diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 7727fc1fe..0ff9496c8 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -91,6 +91,7 @@ impl ParseableSinkProcessor {
             schema_version,
             StreamType::UserDefined,
             &p_custom_fields,
+            TelemetryType::Logs,
         )?;
 
         Ok(p_event)
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 5cd862b21..76b857f0e 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -31,7 +31,9 @@ use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
 use super::EventFormat;
-use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
+use crate::{
+    handlers::TelemetryType, metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field,
+};
 
 pub struct Event {
     pub json: Value,
@@ -147,6 +149,7 @@ impl EventFormat for Event {
         schema_version: SchemaVersion,
         stream_type: StreamType,
         p_custom_fields: &HashMap<String, String>,
+        telemetry_type: TelemetryType,
     ) -> Result<super::Event, anyhow::Error> {
         let custom_partition_values = match custom_partitions.as_ref() {
             Some(custom_partition) => {
@@ -179,6 +182,7 @@
             time_partition: None,
             custom_partition_values,
             stream_type,
+            telemetry_type,
         })
     }
 }
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 9ed9e6052..56d3a676d 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -31,6 +31,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
 use crate::{
+    handlers::TelemetryType,
     metadata::SchemaVersion,
     storage::StreamType,
     utils::arrow::{add_parseable_fields, get_field},
@@ -220,6 +221,7 @@ pub trait EventFormat: Sized {
         schema_version: SchemaVersion,
         stream_type: StreamType,
         p_custom_fields: &HashMap<String, String>,
+        telemetry_type: TelemetryType,
     ) -> Result<super::Event, anyhow::Error>;
 }
diff --git a/src/event/mod.rs b/src/event/mod.rs
index aed646927..110ce2828 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
 use self::error::EventError;
 use crate::{
     LOCK_EXPECT,
+    handlers::TelemetryType,
     metadata::update_stats,
     metrics::{increment_events_ingested_by_date, increment_events_ingested_size_by_date},
     parseable::{PARSEABLE, StagingError},
@@ -52,6 +53,7 @@ pub struct Event {
     pub time_partition: Option<String>,
     pub custom_partition_values: HashMap<String, String>,
     pub stream_type: StreamType,
+    pub telemetry_type: TelemetryType,
 }
 
 // Events holds the schema related to a each event for a single log stream
@@ -92,7 +94,7 @@ impl Event {
         // Track billing metrics for event ingestion
         let date_string = self.parsed_timestamp.date().to_string();
         increment_events_ingested_by_date(self.rb.num_rows() as u64, &date_string);
-        increment_events_ingested_size_by_date(self.origin_size, &date_string);
+        increment_events_ingested_size_by_date(self.origin_size, &date_string, self.telemetry_type);
 
         crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
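
Reviewer note: the hunks above thread a new `telemetry_type` value from every ingest surface through `EventFormat::into_event` into the final `Event`. The enum itself lives in `crate::handlers` and is not shown in this diff; a minimal sketch consistent with how the patch uses it (passed by value, so presumably `Copy`; the variant list is inferred from the match arms in `increment_events_ingested_size_by_date` below):

```rust
// Sketch only: the real definition lives in crate::handlers and may carry
// more derives or variants than shown here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TelemetryType {
    Logs,
    Metrics,
    Traces,
    Events,
}

fn main() {
    // Passed by value throughout the patch, hence the Copy derive above.
    let t = TelemetryType::Logs;
    println!("{t:?}");
}
```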
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 332b4c42d..29527c734 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -100,6 +100,11 @@ struct BillingMetricsCollector {
     pub total_input_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>, // provider -> model -> date -> count
     pub total_output_llm_tokens_by_date: HashMap<String, HashMap<String, HashMap<String, u64>>>,
     pub total_metrics_collected_by_date: HashMap<String, u64>,
+    pub total_metrics_collected_size_by_date: HashMap<String, u64>,
+    pub total_logs_collected_by_date: HashMap<String, u64>,
+    pub total_logs_collected_size_by_date: HashMap<String, u64>,
+    pub total_traces_collected_by_date: HashMap<String, u64>,
+    pub total_traces_collected_size_by_date: HashMap<String, u64>,
     pub event_time: chrono::NaiveDateTime,
 }
 
@@ -202,6 +207,41 @@ impl BillingMetricsCollector {
                 &self.total_metrics_collected_by_date,
             );
         }
+        if !self.total_metrics_collected_size_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_metrics_collected_size",
+                &self.total_metrics_collected_size_by_date,
+            );
+        }
+        if !self.total_logs_collected_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_logs_collected",
+                &self.total_logs_collected_by_date,
+            );
+        }
+        if !self.total_logs_collected_size_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_logs_collected_size",
+                &self.total_logs_collected_size_by_date,
+            );
+        }
+        if !self.total_traces_collected_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_traces_collected",
+                &self.total_traces_collected_by_date,
+            );
+        }
+        if !self.total_traces_collected_size_by_date.is_empty() {
+            add_simple_metric(
+                events,
+                "total_traces_collected_size",
+                &self.total_traces_collected_size_by_date,
+            );
+        }
     }
 
     /// Add object store metrics (method-based) to the events vector
@@ -1273,6 +1313,11 @@ fn is_simple_metric(metric: &str) -> bool {
         | "parseable_total_files_scanned_in_query_by_date"
         | "parseable_total_bytes_scanned_in_query_by_date"
         | "parseable_total_metrics_collected_by_date"
+        | "parseable_total_metrics_collected_size_by_date"
+        | "parseable_total_logs_collected_by_date"
+        | "parseable_total_logs_collected_size_by_date"
+        | "parseable_total_traces_collected_by_date"
+        | "parseable_total_traces_collected_size_by_date"
     )
 }
 
@@ -1344,6 +1389,31 @@ fn process_simple_metric(
             .total_metrics_collected_by_date
            .insert(date.to_string(), value);
         }
+        "parseable_total_metrics_collected_size_by_date" => {
+            collector
+                .total_metrics_collected_size_by_date
+                .insert(date.to_string(), value);
+        }
+        "parseable_total_logs_collected_by_date" => {
+            collector
+                .total_logs_collected_by_date
+                .insert(date.to_string(), value);
+        }
+        "parseable_total_logs_collected_size_by_date" => {
+            collector
+                .total_logs_collected_size_by_date
+                .insert(date.to_string(), value);
+        }
+        "parseable_total_traces_collected_by_date" => {
+            collector
+                .total_traces_collected_by_date
+                .insert(date.to_string(), value);
+        }
+        "parseable_total_traces_collected_size_by_date" => {
+            collector
+                .total_traces_collected_size_by_date
+                .insert(date.to_string(), value);
+        }
         _ => {}
     }
 }
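
Reviewer note: each new collector field follows the existing pattern — skip empty maps, then emit one entry per metric via `add_simple_metric`, and teach `is_simple_metric`/`process_simple_metric` the new metric names. `add_simple_metric` is pre-existing and not part of this diff; a hypothetical stand-in, for orientation only:

```rust
use std::collections::HashMap;
use serde_json::{Value, json};

// Hypothetical stand-in for the existing helper (not in this diff);
// the real implementation may shape events differently.
fn add_simple_metric(events: &mut Vec<Value>, metric_name: &str, by_date: &HashMap<String, u64>) {
    for (date, value) in by_date {
        events.push(json!({ "metric": metric_name, "date": date, "value": value }));
    }
}

fn main() {
    let mut events = Vec::new();
    let mut by_date = HashMap::new();
    by_date.insert("2024-01-01".to_string(), 42u64);
    add_simple_metric(&mut events, "total_logs_collected", &by_date);
    println!("{events:?}");
}
```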
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 254224eb1..39d11f42e 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -125,7 +125,15 @@ pub async fn ingest(
         .add_update_log_source(&stream_name, log_source_entry)
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        None,
+        telemetry_type,
+    )
+    .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -149,6 +157,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
         SchemaVersion::V0,
         StreamType::Internal,
         &p_custom_fields,
+        TelemetryType::Logs,
     )?
     .process()?;
@@ -235,6 +244,7 @@ async fn process_otel_content(
     body: web::Bytes,
     stream_name: &str,
     log_source: &LogSource,
+    telemetry_type: TelemetryType,
 ) -> Result<(), PostError> {
     let p_custom_fields = get_custom_fields_from_header(req);
 
@@ -251,6 +261,7 @@
             log_source,
             &p_custom_fields,
             None,
+            telemetry_type,
         )
         .await?;
     } else if content_type == CONTENT_TYPE_PROTOBUF {
@@ -289,7 +300,7 @@ pub async fn handle_otel_logs_ingestion(
     )
     .await?;
 
-    process_otel_content(&req, body, &stream_name, &log_source).await?;
+    process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Logs).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -309,7 +320,14 @@ pub async fn handle_otel_metrics_ingestion(
     )
     .await?;
 
-    process_otel_content(&req, body, &stream_name, &log_source).await?;
+    process_otel_content(
+        &req,
+        body,
+        &stream_name,
+        &log_source,
+        TelemetryType::Metrics,
+    )
+    .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -329,7 +347,7 @@ pub async fn handle_otel_traces_ingestion(
     )
     .await?;
 
-    process_otel_content(&req, body, &stream_name, &log_source).await?;
+    process_otel_content(&req, body, &stream_name, &log_source, TelemetryType::Traces).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -396,7 +414,15 @@ pub async fn post_event(
     //return error if the stream log source is otel traces or otel metrics
     validate_stream_for_ingestion(&stream_name)?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        None,
+        TelemetryType::Logs,
+    )
+    .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -415,6 +441,7 @@ pub async fn push_logs_unchecked(
         is_first_event: true,                    // NOTE: Maybe should be false
         custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
         stream_type: StreamType::UserDefined,
+        telemetry_type: TelemetryType::Logs,
    };
    unchecked_event.process_unchecked()?;
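
Reviewer note: routing is now explicit — each OTLP endpoint pins its own `TelemetryType`, while plain JSON ingestion (`ingest`, `post_event`), internal streams, and unchecked pushes all default to `Logs`. A self-contained sketch of that mapping (handler names are descriptive labels from the hunks above, not real route paths):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TelemetryType { Logs, Metrics, Traces }

// Mirrors the pinning in ingest.rs; everything not an OTLP metrics/traces
// endpoint falls back to Logs, matching the diff.
fn pinned_type(handler: &str) -> TelemetryType {
    match handler {
        "handle_otel_metrics_ingestion" => TelemetryType::Metrics,
        "handle_otel_traces_ingestion" => TelemetryType::Traces,
        // handle_otel_logs_ingestion, ingest, post_event, internal streams
        _ => TelemetryType::Logs,
    }
}

fn main() {
    assert_eq!(pinned_type("handle_otel_traces_ingestion"), TelemetryType::Traces);
}
```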
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index d544bece4..f9c5be680 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -32,7 +32,7 @@ use crate::{
         format::{EventFormat, LogSource, json},
     },
     handlers::{
-        EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY,
+        EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TelemetryType,
         http::{
             ingest::PostError,
             kinesis::{Message, flatten_kinesis_logs},
@@ -54,6 +54,7 @@ pub async fn flatten_and_push_logs(
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
     time_partition: Option<String>,
+    telemetry_type: TelemetryType,
 ) -> Result<(), PostError> {
     // Verify the dataset fields count
     verify_dataset_fields_count(stream_name)?;
@@ -70,6 +71,7 @@
                 log_source,
                 p_custom_fields,
                 time_partition,
+                telemetry_type,
             )
             .await?;
         }
@@ -83,6 +85,7 @@
                 log_source,
                 p_custom_fields,
                 time_partition.clone(),
+                telemetry_type,
             )
             .await?;
         }
@@ -97,6 +100,7 @@
                 log_source,
                 p_custom_fields,
                 time_partition.clone(),
+                telemetry_type,
             )
             .await?;
         }
@@ -111,6 +115,7 @@
                 log_source,
                 p_custom_fields,
                 time_partition.clone(),
+                telemetry_type,
             )
             .await?;
         }
@@ -122,6 +127,7 @@
             log_source,
             p_custom_fields,
             time_partition,
+            telemetry_type,
         )
         .await?
     }
@@ -136,6 +142,7 @@ pub async fn push_logs(
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
     time_partition: Option<String>,
+    telemetry_type: TelemetryType,
 ) -> Result<(), PostError> {
     let stream = PARSEABLE.get_stream(stream_name)?;
     let time_partition_limit = PARSEABLE
@@ -169,6 +176,7 @@
             schema_version,
             StreamType::UserDefined,
             p_custom_fields,
+            telemetry_type,
         )?
         .process()?;
     }
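
Reviewer note: `flatten_and_push_logs` forwards the same `telemetry_type` down every source-specific branch so the final `push_logs` call can hand it to `into_event`. A toy version of that forwarding shape (names and variants illustrative, not the real `LogSource` enum):

```rust
#[derive(Debug, Clone, Copy)]
enum TelemetryType { Logs }

#[derive(Debug)]
enum LogSource { Kinesis, OtelLogs, Json }

// Every branch forwards telemetry_type unchanged, as in the diff.
fn flatten_and_push(source: &LogSource, payload: &str, telemetry_type: TelemetryType) {
    match source {
        LogSource::Kinesis => push(payload, telemetry_type),
        LogSource::OtelLogs => push(payload, telemetry_type),
        LogSource::Json => push(payload, telemetry_type),
    }
}

fn push(payload: &str, telemetry_type: TelemetryType) {
    println!("pushing {payload} as {telemetry_type:?}");
}

fn main() {
    flatten_and_push(&LogSource::Json, r#"{"msg":"hi"}"#, TelemetryType::Logs);
}
```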
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index 784df157c..2b3200f67 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -17,7 +17,10 @@
  */
 
 pub mod prom_utils;
-use crate::{handlers::http::metrics_path, stats::FullStats};
+use crate::{
+    handlers::{TelemetryType, http::metrics_path},
+    stats::FullStats,
+};
 use actix_web::Responder;
 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use error::MetricsError;
@@ -380,7 +383,67 @@ pub static TOTAL_METRICS_COLLECTED_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
             "Total metrics collected by date",
         )
         .namespace(METRICS_NAMESPACE),
-        &["date"],
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static TOTAL_METRICS_COLLECTED_SIZE_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_metrics_collected_size_by_date",
+            "Total metrics collected size in bytes by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static TOTAL_LOGS_COLLECTED_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_logs_collected_by_date",
+            "Total logs collected by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static TOTAL_LOGS_COLLECTED_SIZE_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_logs_collected_size_by_date",
+            "Total logs collected size in bytes by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static TOTAL_TRACES_COLLECTED_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_traces_collected_by_date",
+            "Total traces collected by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static TOTAL_TRACES_COLLECTED_SIZE_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new(
+            "total_traces_collected_size_by_date",
+            "Total traces collected size in bytes by date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["team", "date"],
     )
     .expect("metric can be created")
 });
@@ -487,6 +550,21 @@ fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(TOTAL_METRICS_COLLECTED_BY_DATE.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_METRICS_COLLECTED_SIZE_BY_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_LOGS_COLLECTED_BY_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_LOGS_COLLECTED_SIZE_BY_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_TRACES_COLLECTED_BY_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_TRACES_COLLECTED_SIZE_BY_DATE.clone()))
+        .expect("metric can be registered");
 }
 
 pub fn build_metrics_handler() -> PrometheusMetrics {
@@ -553,10 +631,31 @@ pub fn increment_events_ingested_by_date(count: u64, date: &str) {
         .inc_by(count);
 }
 
-pub fn increment_events_ingested_size_by_date(size: u64, date: &str) {
+pub fn increment_events_ingested_size_by_date(
+    size: u64,
+    date: &str,
+    telemetry_type: TelemetryType,
+) {
     TOTAL_EVENTS_INGESTED_SIZE_BY_DATE
         .with_label_values(&[date])
         .inc_by(size);
+    match telemetry_type {
+        TelemetryType::Logs | TelemetryType::Events => {
+            TOTAL_LOGS_COLLECTED_SIZE_BY_DATE
+                .with_label_values(&["all", date])
+                .inc_by(size);
+        }
+        TelemetryType::Metrics => {
+            TOTAL_METRICS_COLLECTED_SIZE_BY_DATE
+                .with_label_values(&["all", date])
+                .inc_by(size);
+        }
+        TelemetryType::Traces => {
+            TOTAL_TRACES_COLLECTED_SIZE_BY_DATE
+                .with_label_values(&["all", date])
+                .inc_by(size);
+        }
+    }
 }
 
 pub fn increment_parquets_stored_by_date(date: &str) {
@@ -634,10 +733,22 @@ pub fn increment_reasoning_llm_tokens_by_date(
         .inc_by(tokens);
 }
 
-pub fn increment_metrics_collected_by_date(date: &str) {
+pub fn increment_metrics_collected_by_date(count: u64, date: &str) {
     TOTAL_METRICS_COLLECTED_BY_DATE
-        .with_label_values(&[date])
-        .inc();
+        .with_label_values(&["all", date])
+        .inc_by(count);
+}
+
+pub fn increment_logs_collected_by_date(count: u64, date: &str) {
+    TOTAL_LOGS_COLLECTED_BY_DATE
+        .with_label_values(&["all", date])
+        .inc_by(count);
+}
+
+pub fn increment_traces_collected_by_date(count: u64, date: &str) {
+    TOTAL_TRACES_COLLECTED_BY_DATE
+        .with_label_values(&["all", date])
+        .inc_by(count);
 }
 
 use actix_web::HttpResponse;
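
Reviewer note: the size counter now fans out by telemetry type, with `Events` billed under logs, and every new counter carries a `team` label that is currently always `"all"`. Note this also widens a pre-existing series: `parseable_total_metrics_collected_by_date` changes from `["date"]` to `["team", "date"]`, which any scraper keying on the old label set will need to absorb. A standalone, compilable sketch of the pattern using the real `prometheus` and `once_cell` crates (names mirror the diff, but this is not the production code):

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts, Registry};

#[allow(dead_code)]
#[derive(Clone, Copy)]
enum TelemetryType { Logs, Metrics, Traces, Events }

static LOGS_SIZE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("total_logs_collected_size_by_date", "Log bytes by date")
            .namespace("parseable"),
        &["team", "date"], // team is fixed to "all" for now, per the diff
    )
    .expect("metric can be created")
});

fn record_size(size: u64, date: &str, t: TelemetryType) {
    match t {
        // Events are billed as logs, matching the diff's `Logs | Events` arm.
        TelemetryType::Logs | TelemetryType::Events => {
            LOGS_SIZE.with_label_values(&["all", date]).inc_by(size)
        }
        _ => { /* metrics/traces counters elided for brevity */ }
    }
}

fn main() {
    let registry = Registry::new();
    registry.register(Box::new(LOGS_SIZE.clone())).unwrap();
    record_size(1024, "2024-01-01", TelemetryType::Logs);
}
```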
diff --git a/src/otel/logs.rs b/src/otel/logs.rs
index a370c4a9e..ae1abeb4e 100644
--- a/src/otel/logs.rs
+++ b/src/otel/logs.rs
@@ -18,6 +18,7 @@
 use super::otel_utils::collect_json_from_values;
 use super::otel_utils::convert_epoch_nano_to_timestamp;
 use super::otel_utils::insert_attributes;
+use crate::metrics::increment_logs_collected_by_date;
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::logs::v1::LogRecord;
 use opentelemetry_proto::tonic::logs::v1::LogsData;
@@ -146,6 +147,9 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
         vec_scope_log_json.push(combined_json);
     }
 
+    let date = chrono::Utc::now().date_naive().to_string();
+    increment_logs_collected_by_date(scope_log.log_records.len() as u64, &date);
+
     vec_scope_log_json
 }
diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs
index ac2e82083..810baa54d 100644
--- a/src/otel/metrics.rs
+++ b/src/otel/metrics.rs
@@ -502,8 +502,6 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec<Map<String, Value>> {
         data_points_json.push(metric_json);
     }
 
-    let current_date = chrono::Utc::now().date_naive().to_string();
-    increment_metrics_collected_by_date(&current_date);
     data_points_json
 }
 
@@ -544,6 +542,9 @@ fn process_resource_metrics(
         vec_scope_metrics_json.extend(flatten_metrics_record(get_metric(metric)));
     }
 
+    let date = chrono::Utc::now().date_naive().to_string();
+    increment_metrics_collected_by_date(metrics.len() as u64, &date);
+
     if let Some(scope) = get_scope(scope_metric) {
         scope_metrics_json
             .insert("scope_name".to_string(), Value::String(scope.name.clone()));
diff --git a/src/otel/traces.rs b/src/otel/traces.rs
index 3ed89c3f8..34eaed13d 100644
--- a/src/otel/traces.rs
+++ b/src/otel/traces.rs
@@ -25,6 +25,7 @@ use opentelemetry_proto::tonic::trace::v1::span::Event;
 use opentelemetry_proto::tonic::trace::v1::span::Link;
 use serde_json::{Map, Value};
 
+use crate::metrics::increment_traces_collected_by_date;
 use crate::otel::otel_utils::flatten_attributes;
 
 use super::otel_utils::convert_epoch_nano_to_timestamp;
@@ -74,6 +75,9 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec<Map<String, Value>> {
         vec_scope_span_json.extend(span_record_json);
     }
 
+    let date = chrono::Utc::now().date_naive().to_string();
+    increment_traces_collected_by_date(scope_span.spans.len() as u64, &date);
+
     if let Some(scope) = &scope_span.scope {
         scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
         scope_span_json.insert(
@@ -404,6 +408,7 @@ fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
             }
         }
     }
+
     span_records_json
 }
diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs
index 521baa626..817a66cd3 100644
--- a/src/storage/field_stats.rs
+++ b/src/storage/field_stats.rs
@@ -161,6 +161,7 @@ pub async fn calculate_field_stats(
         SchemaVersion::V1,
         StreamType::Internal,
         &p_custom_fields,
+        TelemetryType::Logs,
     )?
     .process()?;
 }
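
Reviewer note: metrics counting moves from per-record (`flatten_metrics_record`) to per-scope-batch (`process_resource_metrics`), matching how logs and traces are counted, and all three modules stamp the count with the UTC ingestion date rather than the record's own timestamp. A minimal illustration of that per-batch pattern (assumes only the `chrono` crate):

```rust
use chrono::Utc;

// Bump a per-date counter once per scope batch, as the otel hunks do:
// derive today's date once, then increment by the batch length.
fn count_batch<T>(records: &[T], bump: impl Fn(u64, &str)) {
    let date = Utc::now().date_naive().to_string(); // UTC ingest date, not event time
    bump(records.len() as u64, &date);
}

fn main() {
    let spans = vec!["span-a", "span-b"];
    count_batch(&spans, |n, date| println!("{n} traces on {date}"));
}
```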