From 836a6421fc55b8635453a31030527b579a222e0a Mon Sep 17 00:00:00 2001 From: mikoldin Date: Tue, 23 Sep 2025 15:15:53 +0800 Subject: [PATCH] Sequenced message request acceptance and message sending --- .../ConversationVC+Interaction.swift | 72 +++++++++++-------- SessionMessagingKit/Jobs/MessageSendJob.swift | 71 ++++++++++++++---- .../MessageSender+Convenience.swift | 6 +- 3 files changed, 108 insertions(+), 41 deletions(-) diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index ad0541a19d..63d027d17e 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -721,20 +721,24 @@ extension ConversationVC: ) // If this was a message request then approve it - approveMessageRequestIfNeeded( + // For 1-1 messages if approval is required `messageRequestResponse` + // `MessageRequestResponse` object will be returned + let messageRequestResponse = approveMessageRequestIfNeeded( for: self.viewModel.threadData.threadId, threadVariant: self.viewModel.threadData.threadVariant, displayName: self.viewModel.threadData.displayName, isDraft: (self.viewModel.threadData.threadIsDraft == true), - timestampMs: (sentTimestampMs - 1) // Set 1ms earlier as this is used for sorting - ).sinkUntilComplete( - receiveCompletion: { [weak self] _ in - self?.sendMessage(optimisticData: optimisticData) - } + timestampMs: (sentTimestampMs - 1), // Set 1ms earlier as this is used for sorting + shouldSequenceMessageRequestResponse: true // Skips scheduling of `MessageRequestResponse` sending + ) + + sendMessage( + optimisticData: optimisticData, + messageRequestResponse: messageRequestResponse ) } - private func sendMessage(optimisticData: ConversationViewModel.OptimisticMessageData) { + private func sendMessage(optimisticData: ConversationViewModel.OptimisticMessageData, messageRequestResponse: MessageRequestResponse? = nil) { let threadId: String = self.viewModel.threadData.threadId let threadVariant: SessionThread.Variant = self.viewModel.threadData.threadVariant @@ -831,6 +835,7 @@ extension ConversationVC: interaction: insertedInteraction, threadId: threadId, threadVariant: threadVariant, + messageResponse: messageRequestResponse, using: dependencies ) } @@ -2948,13 +2953,15 @@ extension ConversationVC: UIDocumentInteractionControllerDelegate { // MARK: - Message Request Actions extension ConversationVC { + @discardableResult fileprivate func approveMessageRequestIfNeeded( for threadId: String, threadVariant: SessionThread.Variant, displayName: String, isDraft: Bool, - timestampMs: Int64 - ) -> AnyPublisher { + timestampMs: Int64, + shouldSequenceMessageRequestResponse: Bool = false + ) -> MessageRequestResponse? { let updateNavigationBackStack: () -> Void = { // Remove the 'SessionTableViewController' from the nav hierarchy if present DispatchQueue.main.async { [weak self] in @@ -2982,9 +2989,14 @@ extension ConversationVC { Contact.fetchOrCreate(db, id: threadId, using: dependencies) }), !contact.isApproved - else { return Just(()).eraseToAnyPublisher() } + else { return nil } - return viewModel.dependencies[singleton: .storage] + let messageRequestResponse = MessageRequestResponse( + isApproved: true, + sentTimestampMs: UInt64(timestampMs) + ) + + viewModel.dependencies[singleton: .storage] .writePublisher { [dependencies = viewModel.dependencies] db in /// If this isn't a draft thread (ie. sending a message request) then send a `messageRequestResponse` /// back to the sender (this allows the sender to know that they have been approved and can now use this @@ -3002,17 +3014,16 @@ extension ConversationVC { using: dependencies ).inserted(db) - try MessageSender.send( - db, - message: MessageRequestResponse( - isApproved: true, - sentTimestampMs: UInt64(timestampMs) - ), - interactionId: nil, - threadId: threadId, - threadVariant: threadVariant, - using: dependencies - ) + if !shouldSequenceMessageRequestResponse { + try MessageSender.send( + db, + message: messageRequestResponse, + interactionId: nil, + threadId: threadId, + threadVariant: threadVariant, + using: dependencies + ) + } } // Default 'didApproveMe' to true for the person approving the message request @@ -3038,7 +3049,9 @@ extension ConversationVC { updateNavigationBackStack() } ) - .eraseToAnyPublisher() + .sinkUntilComplete() + + return !isDraft && shouldSequenceMessageRequestResponse ? messageRequestResponse : nil case .group: // If the group is not in the invited state then don't bother doing anything @@ -3047,9 +3060,9 @@ extension ConversationVC { try ClosedGroup.fetchOne(db, id: threadId) }), group.invited == true - else { return Just(()).eraseToAnyPublisher() } + else { return nil } - return viewModel.dependencies[singleton: .storage] + viewModel.dependencies[singleton: .storage] .writePublisher { [dependencies = viewModel.dependencies] db in /// Remove any existing `infoGroupInfoInvited` interactions from the group (don't want to have a /// duplicate one from inside the group history) @@ -3102,10 +3115,13 @@ extension ConversationVC { updateNavigationBackStack() } ) - .eraseToAnyPublisher() + .sinkUntilComplete() - default: return Just(()).eraseToAnyPublisher() + default: + return nil } + + return nil } func acceptMessageRequest() { @@ -3115,7 +3131,7 @@ extension ConversationVC { displayName: self.viewModel.threadData.displayName, isDraft: (self.viewModel.threadData.threadIsDraft == true), timestampMs: viewModel.dependencies[cache: .snodeAPI].currentOffsetTimestampMs() - ).sinkUntilComplete() + ) } func declineMessageRequest() { diff --git a/SessionMessagingKit/Jobs/MessageSendJob.swift b/SessionMessagingKit/Jobs/MessageSendJob.swift index bbb2b1c7fe..66d513c686 100644 --- a/SessionMessagingKit/Jobs/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/MessageSendJob.swift @@ -193,13 +193,36 @@ public enum MessageSendJob: JobExecutor { default: break } + let userSessionId: SessionId = dependencies[cache: .general].sessionId + + // Convert and prepare the data for sending + let swarmPublicKey: String = { + switch details.destination { + case .contact(let publicKey): return publicKey + case .syncMessage: return userSessionId.hexString + case .closedGroup(let groupPublicKey): return groupPublicKey + case .openGroup, .openGroupInbox: preconditionFailure() + } + }() + // Store the sentTimestamp from the message in case it fails due to a clockOutOfSync error let originalSentTimestampMs: UInt64? = details.message.sentTimestampMs let startTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970 + // Update `messageType` value used for logging info + var updatedMessageType: String { + guard let messageRequestResponse = job.transientData as? MessageRequestResponse else { + return messageType + } + return "\(messageType) and \(type(of: messageRequestResponse))" + } + /// Perform the actual message sending - this will timeout if the entire process takes longer than `Network.defaultTimeout * 2` /// which can occur if it needs to build a new onion path (which doesn't actually have any limits so can take forever in rare cases) /// + /// Adds sending for `MessageRequestResponse` along with actual message if `job.transientData` is a valid + /// `MessageRequestResponse` type + /// /// **Note:** No need to upload attachments as part of this process as the above logic splits that out into it's own job /// so we shouldn't get here until attachments have already been uploaded dependencies[singleton: .storage] @@ -216,15 +239,39 @@ public enum MessageSendJob: JobExecutor { using: dependencies ) }) - .tryFlatMap { authMethod in - try MessageSender.preparedSend( - message: details.message, - to: details.destination, - namespace: details.destination.defaultNamespace, - interactionId: job.interactionId, - attachments: messageAttachments, - authMethod: authMethod, - onEvent: MessageSender.standardEventHandling(using: dependencies), + .tryFlatMap { authMethod -> AnyPublisher<(ResponseInfoType, Network.BatchResponse), Error> in + return try Network.SnodeAPI.preparedSequence( + requests: [] + .appending(try { + guard let messageRequestResponse = job.transientData as? MessageRequestResponse else { return nil } + + return try MessageSender.preparedSend( + message: messageRequestResponse, + to: details.destination, + namespace: details.destination.defaultNamespace, + interactionId: nil, + attachments: nil, + authMethod: authMethod, + onEvent: MessageSender.standardEventHandling(using: dependencies), + using: dependencies + ) + }()) + .appending(contentsOf: [ + MessageSender.preparedSend( + message: details.message, + to: details.destination, + namespace: details.destination.defaultNamespace, + interactionId: job.interactionId, + attachments: messageAttachments, + authMethod: authMethod, + onEvent: MessageSender.standardEventHandling(using: dependencies), + using: dependencies + ) + ]), + requireAllBatchResponses: true, + swarmPublicKey: swarmPublicKey, + snodeRetrievalRetryCount: 0, // This job has it's own retry mechanism + requestAndPathBuildTimeout: Network.defaultTimeout, using: dependencies ).send(using: dependencies) } @@ -234,12 +281,12 @@ public enum MessageSendJob: JobExecutor { receiveCompletion: { result in switch result { case .finished: - Log.info(.cat, "Completed sending \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage).") + Log.info(.cat, "Completed sending \(updatedMessageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage).") dependencies.setAsync(.hasSentAMessage, true) success(job, false) case .failure(let error): - Log.info(.cat, "Failed to send \(messageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage) due to error: \(error).") + Log.info(.cat, "Failed to send \(updatedMessageType) (\(job.id ?? -1)) after \(.seconds(dependencies.dateNow.timeIntervalSince1970 - startTime), unit: .s)\(previousDeferralsMessage) due to error: \(error).") // Actual error handling switch (error, details.message) { @@ -250,7 +297,7 @@ public enum MessageSendJob: JobExecutor { failure(job, error, true) case (SnodeAPIError.clockOutOfSync, _): - Log.error(.cat, "\(originalSentTimestampMs != nil ? "Permanently Failing" : "Failing") to send \(messageType) (\(job.id ?? -1)) due to clock out of sync issue.") + Log.error(.cat, "\(originalSentTimestampMs != nil ? "Permanently Failing" : "Failing") to send \(updatedMessageType) (\(job.id ?? -1)) due to clock out of sync issue.") failure(job, error, (originalSentTimestampMs != nil)) // Don't bother retrying (it can just send a new one later but allowing retries diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender+Convenience.swift b/SessionMessagingKit/Sending & Receiving/MessageSender+Convenience.swift index f10e1dca4c..a50ff00775 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender+Convenience.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender+Convenience.swift @@ -15,6 +15,7 @@ extension MessageSender { threadId: String, threadVariant: SessionThread.Variant, isSyncMessage: Bool = false, + messageResponse: MessageRequestResponse? = nil, using dependencies: Dependencies ) throws { // Only 'VisibleMessage' types can be sent via this method @@ -32,6 +33,7 @@ extension MessageSender { interactionId: interactionId, to: try Message.Destination.from(db, threadId: threadId, threadVariant: threadVariant), isSyncMessage: isSyncMessage, + messageResponse: messageResponse, using: dependencies ) } @@ -64,6 +66,7 @@ extension MessageSender { interactionId: Int64?, to destination: Message.Destination, isSyncMessage: Bool = false, + messageResponse: MessageRequestResponse? = nil, using dependencies: Dependencies ) { // If it's a sync message then we need to make some slight tweaks before sending so use the proper @@ -89,7 +92,8 @@ extension MessageSender { details: MessageSendJob.Details( destination: destination, message: message - ) + ), + transientData: messageResponse ), canStartJob: true )