diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_total_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_total_sink.pipe
new file mode 100644
index 0000000000..72d5c32300
--- /dev/null
+++ b/services/libs/tinybird/pipes/cdp_segment_metrics_total_sink.pipe
@@ -0,0 +1,96 @@
+DESCRIPTION >
+    Global metrics for activities, organizations, and members used for CDP dashboard.
+    Activities are counted from the latest snapshot of activityRelations_enriched_deduplicated_ds.
+    Members and organizations use cdp_member_segment_aggregates_MV and
+    cdp_organization_segment_aggregates_MV.
+    "Last30Days" is based on timestamp >= now() - 30 days for activities and on
+    lastActive >= now() - 30 days for members and organizations.
+
+NODE activitiesGlobalAllTimeState
+SQL >
+    -- Global total activities from event-level data (latest snapshot only).
+    -- Use countState() (not countStateIf) to keep AggregateFunction(count) consistent.
+    SELECT countState() AS activitiesTotalState
+    FROM activityRelations_enriched_deduplicated_ds
+    WHERE
+        snapshotId = (SELECT max(snapshotId) FROM activityRelations_enriched_deduplicated_ds)
+        AND segmentId IS NOT NULL
+        AND segmentId != ''
+
+NODE activitiesGlobalLast30State
+SQL >
+    -- Global last-30-days activities from event-level data (latest snapshot only).
+    -- Use WHERE filtering (not countStateIf) to keep AggregateFunction(count) consistent.
+    SELECT countState() AS activitiesLast30DaysState
+    FROM activityRelations_enriched_deduplicated_ds
+    WHERE
+        snapshotId = (SELECT max(snapshotId) FROM activityRelations_enriched_deduplicated_ds)
+        AND segmentId IS NOT NULL
+        AND segmentId != ''
+        AND timestamp >= now() - INTERVAL 30 DAY
+
+NODE cdpDashboardMetricsTotal
+SQL >
+    -- Combines the three metric groups into one output row: every sub-select below
+    -- aggregates without GROUP BY, so each side of the CROSS JOINs is a single row.
+    -- NOTE(review): uniqCombined/uniqCombinedIf are approximate. The inner GROUP BYs already
+    -- yield one row per memberId / organizationId, so count()/countIf() might give exact
+    -- totals; confirm how NULL ids should be treated before switching.
+    SELECT
+        a.activitiesTotal,
+        a.activitiesLast30Days,
+        ms.membersTotal,
+        ms.membersLast30Days,
+        os.organizationsTotal,
+        os.organizationsLast30Days
+    FROM
+        (
+            -- activities (global): finalize the count states built by the two nodes above
+            SELECT
+                countMerge(ag.activitiesTotalState) AS activitiesTotal,
+                countMerge(al30.activitiesLast30DaysState) AS activitiesLast30Days
+            FROM activitiesGlobalAllTimeState AS ag
+            CROSS JOIN activitiesGlobalLast30State AS al30
+        ) AS a
+    CROSS JOIN
+        (
+            -- member-based global metrics (single scan over cdp_member_segment_aggregates_MV)
+            SELECT
+                uniqCombined(memberId) AS membersTotal,
+                uniqCombinedIf(memberId, lastActive >= now() - INTERVAL 30 DAY) AS membersLast30Days
+            FROM
+                (
+                    -- finalize AggregateFunction states per member; only lastActive is
+                    -- consumed by the outer query, so no other state is merged here
+                    SELECT
+                        memberId,
+                        maxMerge(lastActiveState) AS lastActive
+                    FROM cdp_member_segment_aggregates_MV
+                    GROUP BY memberId
+                ) AS m
+        ) AS ms
+    CROSS JOIN
+        (
+            -- organization-based global metrics (single scan over
+            -- cdp_organization_segment_aggregates_MV)
+            SELECT
+                uniqCombined(organizationId) AS organizationsTotal,
+                uniqCombinedIf(
+                    organizationId, lastActive >= now() - INTERVAL 30 DAY
+                ) AS organizationsLast30Days
+            FROM
+                (
+                    -- finalize AggregateFunction states per organization
+                    SELECT organizationId, maxMerge(lastActiveState) AS lastActive
+                    FROM cdp_organization_segment_aggregates_MV
+                    GROUP BY organizationId
+                ) AS o
+        ) AS os
+
+TYPE SINK
+EXPORT_SERVICE kafka
+EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
+EXPORT_SCHEDULE 0 9 * * *
+EXPORT_FORMAT json
+EXPORT_STRATEGY @new
+EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_total_sink