diff --git a/firestore-bigquery-export/CHANGELOG.md b/firestore-bigquery-export/CHANGELOG.md index 0d464b106..4b29aab18 100644 --- a/firestore-bigquery-export/CHANGELOG.md +++ b/firestore-bigquery-export/CHANGELOG.md @@ -1,3 +1,7 @@ +## Version 0.2.3 + +fix: pass full document resource name to BigQuery + ## Version 0.2.2 fix: remove default value on DATABASE_REGION diff --git a/firestore-bigquery-export/extension.yaml b/firestore-bigquery-export/extension.yaml index 917d82ae6..8d4b1baa2 100644 --- a/firestore-bigquery-export/extension.yaml +++ b/firestore-bigquery-export/extension.yaml @@ -13,7 +13,7 @@ # limitations under the License. name: firestore-bigquery-export -version: 0.2.2 +version: 0.2.3 specVersion: v1beta displayName: Stream Firestore to BigQuery diff --git a/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap b/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap index 532142de6..58ae2dab9 100644 --- a/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap +++ b/firestore-bigquery-export/functions/__tests__/__snapshots__/config.test.ts.snap @@ -26,6 +26,7 @@ Object { "maxDispatchesPerSecond": 10, "maxEnqueueAttempts": 3, "maxStaleness": undefined, + "projectId": undefined, "refreshIntervalMinutes": undefined, "tableId": "my_table", "timePartitioning": null, diff --git a/firestore-bigquery-export/functions/src/config.ts b/firestore-bigquery-export/functions/src/config.ts index 00e3b9688..f58d43cf3 100644 --- a/firestore-bigquery-export/functions/src/config.ts +++ b/firestore-bigquery-export/functions/src/config.ts @@ -34,6 +34,7 @@ export function clustering(clusters: string | undefined) { export default { bqProjectId: process.env.BIGQUERY_PROJECT_ID, + projectId: process.env.PROJECT_ID, databaseId: process.env.DATABASE || "(default)", databaseRegion: process.env.DATABASE_REGION, collectionPath: process.env.COLLECTION_PATH, diff --git 
a/firestore-bigquery-export/functions/src/index.ts b/firestore-bigquery-export/functions/src/index.ts index 9fdf3be99..922557243 100644 --- a/firestore-bigquery-export/functions/src/index.ts +++ b/firestore-bigquery-export/functions/src/index.ts @@ -33,9 +33,8 @@ import { import * as logs from "./logs"; import * as events from "./events"; import { getChangeType, getDocumentId } from "./util"; -import { DocumentSnapshot } from "firebase-admin/firestore"; -// Configuration for the Firestore Event History Tracker. +// Configuration for the Firestore Event History Tracker const eventTrackerConfig = { firestoreInstanceId: config.databaseId, tableId: config.tableId, @@ -67,27 +66,27 @@ const eventTrackerConfig = { logLevel: config.logLevel, }; -// Initialize the Firestore Event History Tracker with the given configuration. -const eventTracker: FirestoreBigQueryEventHistoryTracker = - new FirestoreBigQueryEventHistoryTracker(eventTrackerConfig); +const eventTracker = new FirestoreBigQueryEventHistoryTracker( + eventTrackerConfig +); -// Initialize logging. logs.logger.setLogLevel(config.logLevel); logs.init(); -/** Initialize Firebase Admin SDK if not already initialized */ if (admin.apps.length === 0) { admin.initializeApp(); } -// Setup the event channel for EventArc. events.setupEventChannel(); -// Define a type for task data to ensure consistency +/** + * Task data structure for BigQuery synchronization + */ interface SyncBigQueryTaskData { timestamp: string; eventId: string; - documentPath: string; + relativePath: string; + fullResourceName: string; changeType: ChangeType; documentId: string; params: Record | null; @@ -96,39 +95,38 @@ interface SyncBigQueryTaskData { } /** - * Cloud Function to handle enqueued tasks to synchronize Firestore changes to BigQuery. 
+ * Handles enqueued tasks for syncing Firestore changes to BigQuery */ export const syncBigQuery = functions.tasks .taskQueue() .onDispatch(async (taskData: SyncBigQueryTaskData, ctx) => { - const documentName = taskData.documentPath; + const fullResourceName = taskData.fullResourceName; const eventId = taskData.eventId; const operation = taskData.changeType; logs.logEventAction( "Firestore event received by onDispatch trigger", - documentName, + fullResourceName, eventId, operation ); try { - // Use the shared function to write the event to BigQuery await recordEventToBigQuery( taskData.changeType, taskData.documentId, + taskData.fullResourceName, taskData.data, taskData.oldData, taskData ); - // Record a success event in EventArc, if configured await events.recordSuccessEvent({ subject: taskData.documentId, data: { timestamp: taskData.timestamp, operation: taskData.changeType, - documentName: taskData.documentPath, + documentName: taskData.fullResourceName, documentId: taskData.documentId, pathParams: taskData.params, eventId: taskData.eventId, @@ -137,13 +135,11 @@ export const syncBigQuery = functions.tasks }, }); - // Log completion of the task. logs.complete(); } catch (err) { - // Log error and throw it to handle in the calling function. logs.logFailedEventAction( "Failed to write event to BigQuery from onDispatch handler", - documentName, + fullResourceName, eventId, operation, err as Error @@ -153,35 +149,34 @@ export const syncBigQuery = functions.tasks } }); +/** + * Main Cloud Function that triggers on Firestore document changes + * and sends the data to BigQuery + */ export const fsexportbigquery = onDocumentWritten( `${config.collectionPath}/{documentId}`, async (event) => { const { data, ...context } = event; - - // Start logging the function execution. logs.start(); - // Determine the type of change (CREATE, UPDATE, DELETE) from the new event data. 
const changeType = getChangeType(data); const documentId = getDocumentId(data); - - // Check if the document is newly created or deleted. const isCreated = changeType === ChangeType.CREATE; const isDeleted = changeType === ChangeType.DELETE; - // Get the new and old data from the snapshot. const newData = isDeleted ? undefined : data.after.data(); const oldData = isCreated || config.excludeOldData ? undefined : data.before.data(); - // check this is the full doc name - const documentName = context.document; + const relativeName = context.document; + const projectId = config.projectId; + const fullResourceName = `projects/${projectId}/databases/${config.databaseId}/documents/${relativeName}`; const eventId = context.id; const operation = changeType; logs.logEventAction( "Firestore event received by onDocumentWritten trigger", - documentName, + fullResourceName, eventId, operation ); @@ -190,13 +185,12 @@ export const fsexportbigquery = onDocumentWritten( let serializedOldData: any; try { - // Serialize the data before processing. serializedData = eventTracker.serializeData(newData); serializedOldData = eventTracker.serializeData(oldData); } catch (err) { logs.logFailedEventAction( "Failed to serialize data", - documentName, + fullResourceName, eventId, operation, err as Error @@ -205,7 +199,6 @@ export const fsexportbigquery = onDocumentWritten( } try { - // Record the start event in EventArc, if configured. await events.recordStartEvent({ documentId, changeType, @@ -219,16 +212,17 @@ export const fsexportbigquery = onDocumentWritten( } try { - // Write the change event to BigQuery. await recordEventToBigQuery( changeType, documentId, + fullResourceName, serializedData, serializedOldData, { timestamp: context.time, eventId: context.id, - documentPath: context.document, + relativePath: context.document, + fullResourceName, changeType, documentId, params: config.wildcardIds ? 
context.params : null, @@ -238,11 +232,12 @@ export const fsexportbigquery = onDocumentWritten( ); } catch (err) { logs.failedToWriteToBigQueryImmediately(err as Error); - // Handle enqueue errors with retries and backup to GCS. + await attemptToEnqueue(err, { timestamp: context.time, eventId: context.id, - documentPath: context.document, + relativePath: context.document, + fullResourceName: fullResourceName, changeType, documentId, params: config.wildcardIds ? context.params : null, @@ -251,49 +246,49 @@ export const fsexportbigquery = onDocumentWritten( }); } - // Log the successful completion of the function. logs.complete(); } ); /** - * Record the event to the Firestore Event History Tracker and BigQuery. + * Records a Firestore document change event to BigQuery * - * @param changeType - The type of change (CREATE, UPDATE, DELETE). - * @param documentId - The ID of the Firestore document. - * @param serializedData - The serialized new data of the document. - * @param serializedOldData - The serialized old data of the document. - * @param taskData - The task data containing event information. 
+ * @param changeType - The type of change (CREATE, UPDATE, DELETE) + * @param documentId - The ID of the Firestore document + * @param fullResourceName - Fully-qualified Firestore document path + * @param serializedData - The serialized new data + * @param serializedOldData - The serialized old data + * @param taskData - Task metadata containing event information */ async function recordEventToBigQuery( changeType: ChangeType, documentId: string, + fullResourceName: string, serializedData: any, serializedOldData: any, taskData: SyncBigQueryTaskData ) { const event: FirestoreDocumentChangeEvent = { - timestamp: taskData.timestamp, // Cloud Firestore commit timestamp - operation: changeType, // The type of operation performed - documentName: taskData.documentPath, // The document name - documentId, // The document ID + timestamp: taskData.timestamp, + operation: changeType, + documentName: fullResourceName, + documentId, pathParams: taskData.params as | FirestoreDocumentChangeEvent["pathParams"] - | null, // Path parameters, if any - eventId: taskData.eventId, // The event ID from Firestore - data: serializedData, // Serialized new data - oldData: serializedOldData, // Serialized old data + | null, + eventId: taskData.eventId, + data: serializedData, + oldData: serializedOldData, }; - // Record the event in the Firestore Event History Tracker and BigQuery. await eventTracker.record([event]); } /** - * Handle errors when enqueueing tasks to sync BigQuery. + * Handles task enqueueing with retry logic when BigQuery sync fails * - * @param err - The error object. - * @param taskData - The task data to be enqueued. 
+ * @param err - The error that occurred + * @param taskData - The task data to enqueue */ async function attemptToEnqueue(_err: Error, taskData: SyncBigQueryTaskData) { try { @@ -303,36 +298,31 @@ async function attemptToEnqueue(_err: Error, taskData: SyncBigQueryTaskData) { ); let attempts = 0; - const jitter = Math.random() * 100; // Adding jitter to avoid collision - - // Exponential backoff formula with a maximum of 5 + jitter seconds + const jitter = Math.random() * 100; const backoff = (attempt: number) => Math.min(Math.pow(2, attempt) * 100, 5000) + jitter; while (attempts < config.maxEnqueueAttempts) { if (attempts > 0) { - // Wait before retrying to enqueue the task. await new Promise((resolve) => setTimeout(resolve, backoff(attempts))); } attempts++; try { await queue.enqueue(taskData); - break; // Break the loop if enqueuing is successful. + break; } catch (enqueueErr) { - // Throw the error if max attempts are reached. if (attempts === config.maxEnqueueAttempts) { throw enqueueErr; } } } } catch (enqueueErr) { - // Record the error event. await events.recordErrorEvent(enqueueErr as Error); logs.logFailedEventAction( "Failed to enqueue event to Cloud Tasks from onWrite handler", - taskData.documentPath, + taskData.fullResourceName, taskData.eventId, taskData.changeType, enqueueErr as Error @@ -341,18 +331,13 @@ async function attemptToEnqueue(_err: Error, taskData: SyncBigQueryTaskData) { } /** - * Cloud Function to set up BigQuery sync by initializing the event tracker. + * Sets up BigQuery synchronization by initializing the event tracker */ export const setupBigQuerySync = functions.tasks .taskQueue() .onDispatch(async () => { - /** Setup runtime environment */ const runtime = getExtensions().runtime(); - - // Initialize the BigQuery sync. await eventTracker.initialize(); - - // Update the processing state. 
await runtime.setProcessingState( "PROCESSING_COMPLETE", "Sync setup completed" @@ -360,18 +345,13 @@ export const setupBigQuerySync = functions.tasks }); /** - * Cloud Function to initialize BigQuery sync. + * Initializes BigQuery synchronization */ export const initBigQuerySync = functions.tasks .taskQueue() .onDispatch(async () => { - /** Setup runtime environment */ const runtime = getExtensions().runtime(); - - // Initialize the BigQuery sync. await eventTracker.initialize(); - - // Update the processing state. await runtime.setProcessingState( "PROCESSING_COMPLETE", "Sync setup completed"