diff --git a/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource new file mode 100644 index 0000000000..3ee3f235e1 --- /dev/null +++ b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource @@ -0,0 +1,15 @@ +SCHEMA > + `segmentId` String, + `memberId` String, + `tenantId` String, + `activityCountState` AggregateFunction(count, String), + `lastActiveState` AggregateFunction(max, DateTime64(3)), + `activityTypesState` AggregateFunction(groupArrayDistinct, String), + `activeOnState` AggregateFunction(groupArrayDistinct, String), + `averageSentimentState` AggregateFunction(avg, Int8), + `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), + `updatedAt` DateTime64(3) + +ENGINE AggregatingMergeTree +ENGINE_PARTITION_KEY toYear(updatedAt) +ENGINE_SORTING_KEY segmentId, memberId diff --git a/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource new file mode 100644 index 0000000000..9ad5d7671e --- /dev/null +++ b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource @@ -0,0 +1,16 @@ +SCHEMA > + `segmentId` String, + `organizationId` String, + `tenantId` String, + `joinedAtState` AggregateFunction(min, DateTime64(3)), + `lastActiveState` AggregateFunction(max, DateTime64(3)), + `activeOnState` AggregateFunction(groupArrayDistinct, String), + `activityCountState` AggregateFunction(count, String), + `memberCountState` AggregateFunction(countDistinct, String), + `avgContributorEngagement` AggregateFunction(avg, Int8), + `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), + `updatedAt` DateTime64(3) + +ENGINE AggregatingMergeTree +ENGINE_PARTITION_KEY toYear(updatedAt) +ENGINE_SORTING_KEY segmentId, organizationId diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..bb177f85c9 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe @@ -0,0 +1,104 @@ +NODE leaf_segment_aggregates +SQL > + % + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + maxMerge(updatedAtState) AS updatedAt + FROM cdp_member_segment_aggregates_ds + {% if defined(bucket_id) %} + WHERE + cityHash64(segmentId) % 10 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY segmentId, memberId + +NODE parent_segment_aggregates +SQL > + % + SELECT + parentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + maxMerge(updatedAtState) AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + cityHash64(segmentId) % 10 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY parentId, memberId + +NODE grandparent_segment_aggregates +SQL > + % + SELECT + grandparentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + maxMerge(updatedAtState) AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + cityHash64(grandparentId) % 10 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY grandparentId, memberId + +NODE cdp_member_segment_aggs_union +SQL > + select * + from leaf_segment_aggregates + union all + select * + from parent_segment_aggregates + union all + select * + from grandparent_segment_aggregates + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE @on-demand +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe new file mode 100644 index 0000000000..709bc09d5a --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe @@ -0,0 +1,48 @@ +NODE members_with_changed_aggs_previous_day +SQL > + select distinct memberId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + +NODE segments_with_changed_aggs_previous_day +SQL > + select id as segmentId + from segments + where + grandparentId in ( + select grandparentId + from segments + where + id in ( + select distinct segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + ) + +NODE grandparent_segment_aggs_updated_previous_day +SQL > + SELECT + grandparentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and memberId in (select memberId from members_with_changed_aggs_previous_day) + GROUP BY grandparentId, memberId + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 30 1 * * * +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe new file mode 100644 index 0000000000..5eaa3a7ae1 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe @@ -0,0 +1,29 @@ +NODE leaf_segment_aggs_updated_previous_day +SQL > + % + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() as updatedAt + FROM cdp_member_segment_aggregates_ds + WHERE + (memberId, segmentId) in ( + select distinct memberId, segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + GROUP BY segmentId, memberId, updatedAt + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 0 1 * * * +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe new file mode 100644 index 0000000000..6e3ff065c2 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe @@ -0,0 +1,49 @@ +NODE members_with_changed_aggs_previous_day +SQL > + select distinct memberId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + +NODE segments_with_changed_aggs_previous_day +SQL > + select id as segmentId + from segments + where + parentId in ( + select parentId + from segments + where + id in ( + select distinct segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + ) + +NODE cdp_member_aggregates_sink_daily_parent_segments_1 +SQL > + % + SELECT + parentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and memberId in (select memberId from members_with_changed_aggs_previous_day) + GROUP BY parentId, memberId + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 0 1 * * * +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe new file mode 100644 index 0000000000..3a71264cb6 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe @@ -0,0 +1,18 @@ +NODE cdp_member_aggregates_sink_initial_snapshot_0 +SQL > + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countState(activityId) AS activityCountState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(type) AS activityTypesState, + groupArrayDistinctState(platform) AS activeOnState, + avgState(sentimentScore) AS averageSentimentState, + maxState(updatedAt) AS lastActivityUpdatedAtState, + now64(3) as updatedAt + FROM activityRelations_enrich_snapshot_MV_ds + GROUP BY segmentId, memberId + +TYPE MATERIALIZED +DATASOURCE cdp_member_segment_aggregates_ds diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe new file mode 100644 index 0000000000..b6a69804fd --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe @@ -0,0 +1,20 @@ +NODE cdp_member_aggregates_sink_initial_snapshot_0 +SQL > + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countState(activityId) AS activityCountState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(type) AS activityTypesState, + groupArrayDistinctState(platform) AS activeOnState, + avgState(sentimentScore) AS averageSentimentState, + maxState(act.updatedAt) as lastActivityUpdatedAtState, + max(act.updatedAt) as updatedAt + FROM activityRelations_enriched_deduplicated_ds act + GROUP BY segmentId, memberId + +TYPE COPY +TARGET_DATASOURCE cdp_member_segment_aggregates_ds +COPY_MODE replace +COPY_SCHEDULE @on-demand diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..710e2c56b5 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe @@ -0,0 +1,110 @@ +NODE leaf_segment_aggregates +SQL > + % + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(segmentId) % 3 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY segmentId, organizationId + +NODE parent_segment_aggregates +SQL > + % + SELECT + parentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(segmentId) % 3 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY parentId, organizationId + +NODE grandparent_segment_aggregates +SQL > + % + SELECT + grandparentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(grandparentId) % 3 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY grandparentId, organizationId + +NODE cdp_organization_segment_aggs_union +SQL > + select * + from leaf_segment_aggregates + union all + select * + from parent_segment_aggregates + union all + select * + from grandparent_segment_aggregates + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE @on-demand +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe new file mode 100644 index 0000000000..e8f8b77256 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe @@ -0,0 +1,51 @@ +NODE organizations_with_changed_aggs_previous_day +SQL > + select distinct organizationId + from cdp_organization_segment_aggregates_ds + where + organizationId <> '' + and updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + +NODE segments_with_changed_aggs_previous_day +SQL > + select id as segmentId + from segments + where + grandparentId in ( + select grandparentId + from segments + where + id in ( + select distinct segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + ) + +NODE grandparent_segment_aggs_updated_previous_day +SQL > + SELECT + grandparentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) + GROUP BY grandparentId, organizationId + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 30 2 * * * +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe new file mode 100644 index 0000000000..e39d226927 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe @@ -0,0 +1,31 @@ +NODE leaf_segment_aggs_updated_previous_day +SQL > + % + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds + WHERE + organizationId <> '' + AND (organizationId, segmentId) in ( + select distinct organizationId, segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + GROUP BY segmentId, organizationId, updatedAt + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 0 2 * * * +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe new file mode 100644 index 0000000000..bd8cb4dc36 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe @@ -0,0 +1,50 @@ +NODE organizations_with_changed_aggs_previous_day +SQL > + select distinct organizationId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + +NODE segments_with_changed_aggs_previous_day +SQL > + select id as segmentId + from segments + where + parentId in ( + select parentId + from segments + where + id in ( + select distinct segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + ) + +NODE cdp_member_aggregates_sink_daily_parent_segments_1 +SQL > + % + SELECT + parentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) + GROUP BY parentId, organizationId + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 0 2 * * * +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe new file mode 100644 index 0000000000..93a5a3e4b5 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe @@ -0,0 +1,19 @@ +NODE cdp_member_aggregates_sink_initial_snapshot_0 +SQL > + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minState(timestamp) as joinedAtState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(platform) AS activeOnState, + countState(activityId) AS activityCountState, + countDistinctState(memberId) as memberCountState, + avgState(score) AS avgContributorEngagement, + maxState(updatedAt) as lastActivityUpdatedAtState, + now64(3) as updatedAt + FROM activityRelations_enrich_snapshot_MV_ds + GROUP BY segmentId, organizationId + +TYPE MATERIALIZED +DATASOURCE cdp_organization_segment_aggregates_ds diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe new file mode 100644 index 0000000000..e1e84adaed --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe @@ -0,0 +1,21 @@ +NODE cdp_org_aggregates_sink_initial_snapshot +SQL > + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minState(timestamp) as joinedAtState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(platform) AS activeOnState, + countState(activityId) AS activityCountState, + countDistinctState(memberId) as memberCountState, + avgState(score) AS avgContributorEngagement, + maxState(act.updatedAt) as lastActivityUpdatedAtState, + max(act.updatedAt) as updatedAt + FROM activityRelations_enriched_deduplicated_ds act + GROUP BY segmentId, organizationId + +TYPE COPY +TARGET_DATASOURCE cdp_organization_segment_aggregates_ds +COPY_MODE replace +COPY_SCHEDULE @on-demand