From fc86e96f1accbca7b2662deb83e48579b2fbd991 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Tue, 15 Jul 2025 11:54:36 -0700 Subject: [PATCH 1/2] stream messages in conversation screen, reconnect when connection drops --- example/src/ConversationScreen.tsx | 185 +++++++++++++++++++++-------- 1 file changed, 133 insertions(+), 52 deletions(-) diff --git a/example/src/ConversationScreen.tsx b/example/src/ConversationScreen.tsx index 1772ccedb..9df8db300 100644 --- a/example/src/ConversationScreen.tsx +++ b/example/src/ConversationScreen.tsx @@ -8,7 +8,7 @@ import * as ImagePicker from 'expo-image-picker' import type { ImagePickerAsset } from 'expo-image-picker' import { PermissionStatus } from 'expo-modules-core' import moment from 'moment' -import React, { useCallback, useMemo, useRef, useState } from 'react' +import React, { useCallback, useMemo, useRef, useState, useEffect } from 'react' import { Button, FlatList, @@ -72,14 +72,10 @@ export default function ConversationScreen({ const [replyingTo, setReplyingTo] = useState(null) const [text, setText] = useState('') const [isShowingAttachmentModal, setShowingAttachmentModal] = useState(false) - const [attachment, setAttachment] = useState(null) const [isAttachmentPreviewing, setAttachmentPreviewing] = useState(false) const [isSending, setSending] = useState(false) - // const { remoteAttachment } = usePrepareRemoteAttachment({ - // fileUri: attachment?.image?.uri || attachment?.file?.uri, - // mimeType: attachment?.file?.mimeType, - // }) + // Update state to handle multiple attachments const [attachments, setAttachments] = useState([]) const [previewingAttachmentIndex, setPreviewingAttachmentIndex] = useState< @@ -100,20 +96,107 @@ export default function ConversationScreen({ } }), }) + const [streamedMessages, setStreamedMessages] = useState([]) + + const streamCleanupRef = useRef<(() => void) | null>(null) + + useEffect(() => { + if (!conversation) return - // const sendRemoteAttachmentMessages = () => { - // if (remoteAttachments && remoteAttachments.length) { - // Promise.all( - // remoteAttachments.map(attachment => - // sendMessage({ remoteAttachment: attachment }) - // ) - // ) - // .then(() => setAttachments([])) - // .catch((e) => { - // console.error('Error sending messages: ', e) - // }) - // } - // } + // If there's already an active stream, clean it up first + if (streamCleanupRef.current) { + console.log('๐Ÿงน Cleaning up existing stream before setting up new one') + streamCleanupRef.current() + streamCleanupRef.current = null + } + + console.log('๐Ÿ”ต Setting up stream for conversation:', conversation.id) + + let isActive = true + let retryTimeout: NodeJS.Timeout | null = null + let retryCount = 0 + const maxRetries = 10 + + const attemptStreamSetup = async () => { + if (!isActive || retryCount >= maxRetries) { + if (retryCount >= maxRetries) { + console.log(`โŒ Max retries (${maxRetries}) reached, giving up on stream`) + } + return + } + + try { + // First, try to sync + console.log(`๐Ÿ”„ Checking sync... (attempt ${retryCount + 1}/${maxRetries})`) + await conversation.sync() + console.log(`โœ… Conversation sync successful`) + + // If sync works, try to start the stream + console.log(`๐Ÿ”„ Starting stream... (attempt ${retryCount + 1}/${maxRetries})`) + const cleanup = await conversation.streamMessages( + async (message) => { + console.log('๐Ÿ“จ Streamed message:', message.id, message.contentTypeId, message.content()) + + if (isActive) { + setStreamedMessages(prev => { + const messageExists = prev.some(msg => msg.id === message.id) + if (messageExists) return prev + + return [message, ...prev] + }) + } + }, + // onClose callback - retry the whole setup + () => { + console.log('โŒ Stream closed for conversation:', conversation.id) + if (isActive && retryCount < maxRetries) { + retryCount++ + console.log(`๐Ÿ”„ Stream closed, retrying in 10 seconds... (attempt ${retryCount}/${maxRetries})`) + retryTimeout = setTimeout(attemptStreamSetup, 10000) + } + } + ) + + // Reset retry count on successful setup + retryCount = 0 + streamCleanupRef.current = cleanup + console.log('โœ… Stream setup complete for conversation:', conversation.id) + + } catch (error) { + console.error('๐Ÿ’ฅ Error during setup (sync or stream):', error) + + // Retry the whole thing (sync + stream) on any error + if (isActive && retryCount < maxRetries) { + retryCount++ + console.log(`๐Ÿ”„ Setup failed, retrying in 10 seconds... (attempt ${retryCount}/${maxRetries})`) + retryTimeout = setTimeout(attemptStreamSetup, 10000) + } + } + } + + // Start the first attempt + attemptStreamSetup() + + return () => { + console.log('๐Ÿงน Cleaning up stream for conversation:', conversation.id) + isActive = false + + // Clear retry timeout if it exists + if (retryTimeout) { + clearTimeout(retryTimeout) + retryTimeout = null + } + + if (streamCleanupRef.current) { + try { + streamCleanupRef.current() + } catch (error: any) { + console.error('๐Ÿ’ฅ Error during cleanup:', error) + } + streamCleanupRef.current = null + } + } + }, [conversation?.id, topic]) const sendMultiRemoteAttachmentMessage = () => { if (remoteAttachments && remoteAttachments.length) { @@ -132,13 +215,27 @@ export default function ConversationScreen({ } } - const filteredMessages = useMemo( - () => - (messages ?? [])?.filter( - (message) => !hiddenMessageTypes.includes(message.contentTypeId) - ), - [messages] - ) + const filteredMessages = useMemo(() => { + // Start with fetched messages + let allMessages = [...(messages ?? [])] + + // Only add streamed messages if they're not already in the fetched messages + const streamedToAdd = streamedMessages.filter( + (streamedMsg) => + !allMessages.some((fetchedMsg) => fetchedMsg.id === streamedMsg.id) + ) + + // Combine them + allMessages = [...allMessages, ...streamedToAdd] + + // Sort by timestamp (newest first since FlatList is inverted) + const sortedMessages = allMessages.sort((a, b) => b.sentNs - a.sentNs) + + // Filter out hidden message types + return sortedMessages.filter( + (message) => !hiddenMessageTypes.includes(message.contentTypeId) + ) + }, [messages, streamedMessages]) const sendMessage = async ( content: ConversationSendPayload @@ -155,7 +252,7 @@ export default function ConversationScreen({ } as ConversationSendPayload) : content await conversation!.send(content) - await refreshMessages() + await handleRefreshMessages() setReplyingTo(null) } catch (e) { console.log('Error sending message', e) @@ -163,15 +260,7 @@ export default function ConversationScreen({ setSending(false) } } - // const sendRemoteAttachmentMessage = () => { - // if (remoteAttachment) { - // sendMessage({ remoteAttachment }) - // .then(() => setAttachment(null)) - // .catch((e) => { - // console.error('Error sending message: ', e) - // }) - // } - // } + const sendTextMessage = () => sendMessage({ text }).then(() => setText('')) const scrollToMessageId = useCallback( (messageId: string) => { @@ -189,6 +278,13 @@ export default function ConversationScreen({ [filteredMessages] ) + const handleRefreshMessages = useCallback(async () => { + // Clear streamed messages since refresh will include them + setStreamedMessages([]) + // Call the original refresh + return await refreshMessages() + }, [refreshMessages]) + return ( scrollToMessageId(replyingTo!)} /> )} - {attachment && ( - <> - setAttachmentPreviewing(true)} - onRemove={() => setAttachment(null)} - /> - setAttachmentPreviewing(false)} - /> - - )} {attachments.length > 0 && ( <> Date: Mon, 28 Jul 2025 11:27:00 -0700 Subject: [PATCH 2/2] fix lint warnings --- example/src/ConversationScreen.tsx | 55 ++++++++++++++++++--------- example/src/tests/contentTypeTests.ts | 20 ++++++---- example/src/tests/dmTests.ts | 2 +- 3 files changed, 51 insertions(+), 26 deletions(-) diff --git a/example/src/ConversationScreen.tsx b/example/src/ConversationScreen.tsx index 9df8db300..b23acf85c 100644 --- a/example/src/ConversationScreen.tsx +++ b/example/src/ConversationScreen.tsx @@ -111,7 +111,7 @@ export default function ConversationScreen({ } console.log('๐Ÿ”ต Setting up stream for conversation:', conversation.id) - + let isActive = true let retryTimeout: NodeJS.Timeout | null = null let retryCount = 0 @@ -120,28 +120,39 @@ export default function ConversationScreen({ const attemptStreamSetup = async () => { if (!isActive || retryCount >= maxRetries) { if (retryCount >= maxRetries) { - console.log(`โŒ Max retries (${maxRetries}) reached, giving up on stream`) + console.log( + `โŒ Max retries (${maxRetries}) reached, giving up on stream` + ) } return } try { // First, try to sync - console.log(`๐Ÿ”„ Checking sync... (attempt ${retryCount + 1}/${maxRetries})`) + console.log( + `๐Ÿ”„ Checking sync... (attempt ${retryCount + 1}/${maxRetries})` + ) await conversation.sync() console.log(`โœ… Conversation sync successful`) // If sync works, try to start the stream - console.log(`๐Ÿ”„ Starting stream... (attempt ${retryCount + 1}/${maxRetries})`) + console.log( + `๐Ÿ”„ Starting stream... (attempt ${retryCount + 1}/${maxRetries})` + ) const cleanup = await conversation.streamMessages( async (message) => { - console.log('๐Ÿ“จ Streamed message:', message.id, message.contentTypeId, message.content()) - + console.log( + '๐Ÿ“จ Streamed message:', + message.id, + message.contentTypeId, + message.content() + ) + if (isActive) { - setStreamedMessages(prev => { - const messageExists = prev.some(msg => msg.id === message.id) + setStreamedMessages((prev) => { + const messageExists = prev.some((msg) => msg.id === message.id) if (messageExists) return prev - + return [message, ...prev] }) } @@ -151,42 +162,50 @@ export default function ConversationScreen({ console.log('โŒ Stream closed for conversation:', conversation.id) if (isActive && retryCount < maxRetries) { retryCount++ - console.log(`๐Ÿ”„ Stream closed, retrying in 10 seconds... (attempt ${retryCount}/${maxRetries})`) + console.log( + `๐Ÿ”„ Stream closed, retrying in 10 seconds... (attempt ${retryCount}/${maxRetries})` + ) retryTimeout = setTimeout(attemptStreamSetup, 10000) } } ) - + // Reset retry count on successful setup retryCount = 0 streamCleanupRef.current = cleanup - console.log('โœ… Stream setup complete for conversation:', conversation.id) - + console.log( + 'โœ… Stream setup complete for conversation:', + conversation.id + ) } catch (error) { console.error('๐Ÿ’ฅ Error during setup (sync or stream):', error) - + // Retry the whole thing (sync + stream) on any error if (isActive && retryCount < maxRetries) { retryCount++ - console.log(`๐Ÿ”„ Setup failed, retrying in 10 seconds... (attempt ${retryCount}/${maxRetries})`) + console.log( + `๐Ÿ”„ Setup failed, retrying in 10 seconds... (attempt ${retryCount}/${maxRetries})` + ) retryTimeout = setTimeout(attemptStreamSetup, 10000) } } } // Start the first attempt - attemptStreamSetup() + attemptStreamSetup().catch((error) => { + console.error('Failed to start initial stream setup:', error) + }) return () => { console.log('๐Ÿงน Cleaning up stream for conversation:', conversation.id) isActive = false - + // Clear retry timeout if it exists if (retryTimeout) { clearTimeout(retryTimeout) retryTimeout = null } - + if (streamCleanupRef.current) { try { streamCleanupRef.current() diff --git a/example/src/tests/contentTypeTests.ts b/example/src/tests/contentTypeTests.ts index 81b93ade0..013a3298e 100644 --- a/example/src/tests/contentTypeTests.ts +++ b/example/src/tests/contentTypeTests.ts @@ -22,13 +22,13 @@ function test(name: string, perform: () => Promise) { test('DecodedMessage.from() should throw informative error on null', async () => { try { - DecodedMessage.from("undefined") + DecodedMessage.from('undefined') } catch (e: any) { assert(e.toString().includes('JSON Parse error'), 'Error: ' + e.toString()) } try { - DecodedMessage.from("") + DecodedMessage.from('') } catch (e: any) { assert(e.toString().includes('JSON Parse error'), 'Error: ' + e.toString()) } @@ -42,23 +42,29 @@ test('DecodedMessage.from() should throw informative error on null', async () => try { DecodedMessage.from(null) } catch (e: any) { - assert(e.toString().includes('Tried to parse null as a DecodedMessage'), 'Error: ' + e.toString()) + assert( + e.toString().includes('Tried to parse null as a DecodedMessage'), + 'Error: ' + e.toString() + ) } try { - DecodedMessage.from("null") + DecodedMessage.from('null') } catch (e: any) { - assert(e.toString().includes('Tried to parse null as a DecodedMessage'), 'Error: ' + e.toString()) + assert( + e.toString().includes('Tried to parse null as a DecodedMessage'), + 'Error: ' + e.toString() + ) } - let json = '{"id": "123", "topic": "123", "contentTypeId": "123", "senderInboxId": "123", "sentNs": 123, "content": "123", "fallback": "123", "deliveryStatus": "123", "childMessages": null}' + const json = + '{"id": "123", "topic": "123", "contentTypeId": "123", "senderInboxId": "123", "sentNs": 123, "content": "123", "fallback": "123", "deliveryStatus": "123", "childMessages": null}' try { DecodedMessage.from(json) } catch (e: any) { assert(false, 'Error: ' + e.toString()) } return true - }) test('can fetch messages with reactions', async () => { diff --git a/example/src/tests/dmTests.ts b/example/src/tests/dmTests.ts index 1c36d6f3f..c1910b182 100644 --- a/example/src/tests/dmTests.ts +++ b/example/src/tests/dmTests.ts @@ -167,7 +167,7 @@ test('can stream dm messages', async () => { await alixDm.streamMessages(async () => { dmMessageCallbacks++ }) - + await delayToPropogate(1000) await alixConversation?.send({ text: `first message` }) await alixDm.send({ text: `first message` })