diff --git a/Cargo.lock b/Cargo.lock index 43cda6fc4..fb0d3373f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,7 +81,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -218,7 +218,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -281,7 +281,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dd80fa0bd6217e482112d9d87a05af8e0f8dec9e3aa51f34816f761c5cf7da7" dependencies = [ "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -747,7 +747,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -900,7 +900,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.108", + "syn", ] [[package]] @@ -1231,7 +1231,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -1510,7 +1510,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.108", + "syn", ] [[package]] @@ -1521,7 +1521,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2019,7 +2019,7 @@ checksum = "1063ad4c9e094b3f798acee16d9a47bd7372d9699be2de21b05c3bd3f34ab848" dependencies = [ "datafusion-doc", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2209,7 +2209,7 @@ checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2230,7 +2230,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2240,7 +2240,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" dependencies = [ "derive_builder_core", - "syn 2.0.108", + "syn", ] [[package]] @@ -2253,7 +2253,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.108", + "syn", ] [[package]] @@ -2283,7 +2283,7 @@ dependencies = [ "convert_case 0.6.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn", "unicode-xid", ] @@ -2297,7 +2297,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.108", + "syn", "unicode-xid", ] @@ -2320,7 +2320,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2527,7 +2527,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -2706,6 +2706,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac-sha256" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad6880c8d4a9ebf39c6e8b77007ce223f646a4d21ce29d99f70cb16420545425" + [[package]] name = "hostname" version = "0.4.0" @@ -3064,7 +3070,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -3073,16 +3079,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" -[[package]] -name = "idna" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" -dependencies = [ - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "1.1.0" @@ -3585,7 +3581,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -3652,14 +3648,15 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "openid" -version = "0.15.0" +version = "0.18.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "627898ab5b3fff5e5f1dc0e404bafdbb87a4337d815e86149f53640380946ccc" +checksum = "c0a9d93c04da2d5e11578af6207f163c0816698b24c25a7aefae06a71e2d07bb" dependencies = [ "base64 0.22.1", "biscuit", "chrono", - "lazy_static", + "getrandom 0.3.1", + "hmac-sha256", "mime", "reqwest 0.12.12", "serde", @@ -4041,7 +4038,7 @@ checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4084,7 +4081,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.108", + "syn", ] [[package]] @@ -4097,27 +4094,25 @@ dependencies = [ ] [[package]] -name = "proc-macro-error" -version = "1.0.4" +name = "proc-macro-error-attr2" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" dependencies = [ - "proc-macro-error-attr", "proc-macro2", "quote", - "syn 1.0.109", - "version_check", ] [[package]] -name = "proc-macro-error-attr" -version = "1.0.4" +name = "proc-macro-error2" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" dependencies = [ + "proc-macro-error-attr2", "proc-macro2", "quote", - "version_check", + "syn", ] [[package]] @@ -4227,7 +4222,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4240,7 +4235,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4502,7 +4497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" dependencies = [ "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -4700,7 +4695,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.108", + "syn", "unicode-ident", ] @@ -4718,7 +4713,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.108", + "syn", "unicode-ident", ] @@ -4995,7 +4990,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5043,7 +5038,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5228,7 +5223,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5289,7 +5284,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.108", + "syn", ] [[package]] @@ -5298,16 +5293,6 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.108" @@ -5342,7 +5327,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5425,7 +5410,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5436,7 +5421,7 @@ checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5552,7 +5537,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5771,7 +5756,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -5855,27 +5840,12 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" -[[package]] -name = "unicode-bidi" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" - [[package]] name = "unicode-ident" version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" -[[package]] -name = "unicode-normalization" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" -dependencies = [ - "tinyvec", -] - [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -5934,7 +5904,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" dependencies = [ "form_urlencoded", - "idna 1.1.0", + "idna", "percent-encoding", "serde", ] @@ -5970,11 +5940,11 @@ dependencies = [ [[package]] name = "validator" -version = "0.18.1" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db79c75af171630a3148bd3e6d7c4f42b6a9a014c2945bc5ed0020cbb8d9478e" +checksum = "d0b4a29d8709210980a09379f27ee31549b73292c87ab9899beee1c0d3be6303" dependencies = [ - "idna 0.5.0", + "idna", "once_cell", "regex", "serde", @@ -5986,16 +5956,16 @@ dependencies = [ [[package]] name = "validator_derive" -version = "0.18.2" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df0bcf92720c40105ac4b2dda2a4ea3aa717d4d6a862cc217da653a4bd5c6b10" +checksum = "bac855a2ce6f843beb229757e6e570a42e837bcb15e5f449dd48d5747d41bf77" dependencies = [ "darling", "once_cell", - "proc-macro-error", + "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6126,7 +6096,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.108", + "syn", "wasm-bindgen-shared", ] @@ -6161,7 +6131,7 @@ checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6303,7 +6273,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6314,7 +6284,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6668,7 +6638,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", "synstructure", ] @@ -6699,7 +6669,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6710,7 +6680,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] @@ -6730,7 +6700,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", "synstructure", ] @@ -6759,7 +6729,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.108", + "syn", ] [[package]] diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 604ab9c3a..eaaca9254 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -20,6 +20,7 @@ use self::error::StreamError; use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats}; use super::query::update_schema_when_distributed; use crate::event::format::override_data_type; +use crate::handlers::http::modal::utils::logstream_utils::LogstreamAffectedResources; use crate::hottier::{CURRENT_HOT_TIER_VERSION, HotTierManager, StreamHotTier}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; @@ -494,6 +495,26 @@ pub async fn delete_stream_hot_tier( )) } +pub async fn get_affected_resources( + stream_name: Path +) -> Result { + let stream_name = stream_name.into_inner(); + + // For query mode, if the stream not found in memory map, + // check if it exists in the storage + // create stream and schema from storage + if !PARSEABLE.check_or_load_stream(&stream_name).await { + return Err(StreamNotFound(stream_name.clone()).into()); + } + + match LogstreamAffectedResources::load(&stream_name).await { + Ok(affected_resources) + => Ok((web::Json(affected_resources), StatusCode::OK)), + Err(err) + => Err(err.into()) + } +} + #[allow(unused)] fn classify_json_error(kind: serde_json::error::Category) -> StatusCode { match kind { @@ -510,13 +531,13 @@ pub mod error { use http::StatusCode; use crate::{ - hottier::HotTierError, - metastore::MetastoreError, - parseable::StreamNotFound, - storage::ObjectStorageError, + hottier::HotTierError, + metastore::MetastoreError, + parseable::StreamNotFound, + storage::ObjectStorageError, validator::error::{ AlertValidationError, HotTierValidationError, StreamNameValidationError, - }, + } }; #[allow(unused)] diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 0440e857c..088599701 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -272,6 +272,14 @@ impl IngestServer { .authorize_for_resource(Action::GetStats), ), ) + .service( + // GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream + web::resource("/affected-resources").route( + web::get() + .to(logstream::get_affected_resources) + .authorize_for_resource(Action::GetLogstreamAffectedResources), + ), + ) .service( web::scope("/retention").service( web::resource("/cleanup").route( diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index c345d3112..1e534b432 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -313,6 +313,14 @@ impl QueryServer { .authorize_for_resource(Action::GetStats), ), ) + .service( + // GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream + web::resource("/affected-resources").route( + web::get() + .to(logstream::get_affected_resources) + .authorize_for_resource(Action::GetLogstreamAffectedResources), + ), + ) .service( web::resource("/retention") // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 7b145ebb1..a425e4e58 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -488,6 +488,14 @@ impl Server { .authorize_for_resource(Action::GetStats), ), ) + .service( + // GET "/logstream/{logstream}/affected-resources" ==> Get affected resources for given log stream + web::resource("/affected-resources").route( + web::get() + .to(logstream::get_affected_resources) + .authorize_for_resource(Action::GetLogstreamAffectedResources), + ), + ) .service( web::resource("/retention") // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 533d5d86f..0dc046720 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -17,17 +17,30 @@ */ use actix_web::http::header::HeaderMap; +use bytes::Bytes; +use ulid::Ulid; use crate::{ - event::format::LogSource, + alerts::AlertConfig, + event::format::LogSource, handlers::{ CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, - UPDATE_STREAM_KEY, - }, - storage::StreamType, + UPDATE_STREAM_KEY, http::logstream::error::StreamError, + }, + metastore::MetastoreError, + parseable::{PARSEABLE, StreamNotFound}, + rbac::role::{ + ParseableResourceType, + model::DefaultPrivilege + }, + storage::{ObjectStorageError, StorageMetadata, StreamType}, + users::dashboards::Dashboard }; +/// Field in a dashboard's tile that should contain the logstream name +const TILE_FIELD_REFERRING_TO_STREAM: &str = "dbName"; + #[derive(Debug, Default)] pub struct PutStreamHeaders { pub time_partition: String, @@ -74,3 +87,268 @@ impl From<&HeaderMap> for PutStreamHeaders { } } } + +/// Resources that rely on a specific logstream and will be affected if it gets deleted +#[derive(Debug, Default, serde::Serialize)] +pub struct LogstreamAffectedResources { + pub filters: Vec, + pub dashboards: Vec, + pub alerts: Vec, + pub roles: Vec, +} + +#[derive(Debug, Default, serde::Serialize)] +pub struct LogstreamAffectedDashboard { + pub dashboard_id: Ulid, + pub affected_tile_ids: Vec +} + +#[derive(thiserror::Error, Debug)] +pub enum LogstreamAffectedResourcesError { + #[error("(to fetch affected resources) logstream not found: {0}")] + StreamNotFound(#[from] StreamNotFound), + + #[error("(get affected resources) metastore error: {0}")] + MetastoreError(#[from] MetastoreError), + + #[error("(get affected resources) objectstore error: {0}")] + ObjectStorageError(#[from] ObjectStorageError), + + #[error("(get affected resources) could not parse JSON: {0}")] + Bytes2JSONError(#[from] Bytes2JSONError) +} + +impl LogstreamAffectedResources { + pub async fn load(stream_name: &str) -> Result { + Ok(Self { + filters: Self::fetch_affected_filters(stream_name).await?, + dashboards: Self::fetch_affected_dashboards(stream_name).await?, + alerts: Self::fetch_affected_alerts(stream_name).await?, + roles: Self::fetch_affected_roles(stream_name).await?, + }) + } + + pub async fn fetch_affected_filters( + stream_name: &str + ) -> Result, LogstreamAffectedResourcesError> { + if !PARSEABLE.streams.contains(stream_name) { + return Err(StreamNotFound(stream_name.to_string()).into()); + } + + Ok(PARSEABLE.metastore.get_filters().await? + .into_iter() + .filter_map(|filter| { + if filter.stream_name == stream_name && + let Some(f_id) = filter.filter_id { + Some(f_id) + } else { None } + }).collect()) + } + + pub async fn fetch_affected_dashboards( + stream_name: &str + ) -> Result, LogstreamAffectedResourcesError> { + if !PARSEABLE.streams.contains(stream_name) { + return Err(StreamNotFound(stream_name.to_string()).into()); + } + + let all_dashboards = PARSEABLE.metastore.get_dashboards().await?; + let mut parsed_dashboards = Vec::::new(); + + for dashboard_bytes in all_dashboards { + let dashboard = match self::bytes_to_json::(dashboard_bytes) { + Ok(d) => d, + Err(e) => { + tracing::warn!("{}", e.to_string()); + continue; + } + }; + + if !parsed_dashboards.iter().any(|d| d.dashboard_id == dashboard.dashboard_id) { + parsed_dashboards.push(dashboard); + } + } + + let mut affected_dashboards: Vec = vec![]; + + for (dash_i, dashboard) in parsed_dashboards.iter().enumerate() { + let Some(tiles) = dashboard.tiles.as_ref() else { + continue; + }; + + let mut affected_tile_ids = Vec::::new(); + for tile in tiles { + let Some(tile_fields) = tile.other_fields.as_ref() else { + continue; + }; + + let Some(tile_value) = tile_fields.get(TILE_FIELD_REFERRING_TO_STREAM) else { + continue; + }; + + if let Some(db_names) = tile_value.as_array() { + let dbs_have_stream = db_names + .iter() + .any(|db| { + if let Some(db_str) = db.as_str() { + return db_str == stream_name + } else { false } + }); + + if dbs_have_stream && !affected_tile_ids.contains(&tile.tile_id) { + affected_tile_ids.push(tile.tile_id); + } + } + } + + if !affected_tile_ids.is_empty() && dashboard.dashboard_id.is_some() { + affected_dashboards.push(LogstreamAffectedDashboard { + dashboard_id: dashboard.dashboard_id.unwrap(), + affected_tile_ids + }); + } else if !affected_tile_ids.is_empty() { + tracing::warn!("dashboard {}: [id] is missing, skipping -- for logstream {}", dash_i, stream_name); + } + } + + Ok(affected_dashboards) + } + + pub async fn fetch_affected_alerts( + stream_name: &str + ) -> Result, LogstreamAffectedResourcesError> { + if !PARSEABLE.streams.contains(stream_name) { + return Err(StreamNotFound(stream_name.to_string()).into()); + } + + let all_alerts = PARSEABLE.metastore.get_alerts().await?; + + let mut stream_alerts = Vec::::new(); + for alert_bytes in all_alerts { + let alert = match self::bytes_to_json::(alert_bytes) { + Ok(alert_val) => alert_val, + Err(e) => { + tracing::warn!("{}", e.to_string()); + continue; + } + }; + + if !alert.datasets.iter().any(|s| s == stream_name) { + continue + }; + + if !stream_alerts.contains(&alert.id) { + stream_alerts.push(alert.id); + } + } + + Ok(stream_alerts) + } + + + pub async fn fetch_affected_roles( + stream_name: &str + ) -> Result, LogstreamAffectedResourcesError> { + if !PARSEABLE.streams.contains(stream_name) { + return Err(StreamNotFound(stream_name.to_string()).into()); + } + + let metadata_bytes = PARSEABLE + .metastore + .get_parseable_metadata() + .await + .map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))? + .ok_or_else(|| ObjectStorageError::Custom("parseable metadata not initialized".into()))?; + + let metadata = self::bytes_to_json::(metadata_bytes)?; + + let mut stream_associated_roles = Vec::::new(); + for (role_name, privileges) in &metadata.roles { + for privilege in privileges { + + let associated_stream = match privilege { + DefaultPrivilege::Ingestor { resource } => { + match resource { + ParseableResourceType::Stream(stream) => stream, + _ => continue + } + }, + + DefaultPrivilege::Reader { resource } => { + match resource { + ParseableResourceType::Stream(stream) => stream, + _ => continue + } + }, + + DefaultPrivilege::Writer { resource } => { + match resource { + ParseableResourceType::Stream(stream) => stream, + _ => continue + } + }, + + _ => continue + }; + + if associated_stream == stream_name && !stream_associated_roles.contains(role_name) { + stream_associated_roles.push(role_name.to_string()); + + // if any role privilege matches the input stream, + // add the role to the set and break + break; + } + + } + } + + Ok(stream_associated_roles) + } +} + + +impl From for StreamError { + fn from(err: LogstreamAffectedResourcesError) -> Self { + match err { + LogstreamAffectedResourcesError::StreamNotFound(e) => { + StreamError::StreamNotFound(e) + } + LogstreamAffectedResourcesError::MetastoreError(e) => { + StreamError::MetastoreError(e) + } + other => { + StreamError::Anyhow(anyhow::anyhow!(other.to_string())) + } + } + } +} + + +// utility: + +#[derive(Debug, thiserror::Error)] +pub enum Bytes2JSONError { + #[error("zero sized Bytes")] + ZeroSizedBytes, + + #[error("failed to parse bytes to JSON: {0}")] + FailedToParse(String) +} + +fn bytes_to_json(json_bytes: Bytes) -> Result { + if json_bytes.is_empty() { + return Err(Bytes2JSONError::ZeroSizedBytes); + } + + let json_bytes_value = match serde_json::from_slice::(&json_bytes) { + Ok(value) => value, + Err(err) => { + return Err(Bytes2JSONError::FailedToParse(format!("{:#?}", err))) + } + }; + + return match serde_json::from_value::(json_bytes_value.clone()) { + Ok(parsed_object) => Ok(parsed_object), + Err(e) => Err(Bytes2JSONError::FailedToParse(format!("deserialization failed: {:#?}", e))) + }; +} \ No newline at end of file diff --git a/src/rbac/role.rs b/src/rbac/role.rs index 9e54bb96a..cc9679e8b 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -32,6 +32,7 @@ pub enum Action { DetectSchema, GetSchema, GetStats, + GetLogstreamAffectedResources, DeleteStream, GetRetention, PutRetention, @@ -164,6 +165,7 @@ impl RoleBuilder { | Action::GetSchema | Action::DetectSchema | Action::GetStats + | Action::GetLogstreamAffectedResources | Action::GetRetention | Action::PutRetention | Action::All => Permission::Resource(action, self.resource_type.clone().unwrap()), @@ -237,6 +239,7 @@ pub mod model { Action::DetectSchema, Action::GetSchema, Action::GetStats, + Action::GetLogstreamAffectedResources, Action::GetRetention, Action::PutRetention, Action::PutHotTierEnabled, @@ -273,6 +276,7 @@ pub mod model { Action::ListStream, Action::GetSchema, Action::GetStats, + Action::GetLogstreamAffectedResources, Action::PutRetention, Action::PutAlert, Action::GetAlert, @@ -313,6 +317,7 @@ pub mod model { Action::ListStream, Action::GetSchema, Action::GetStats, + Action::GetLogstreamAffectedResources, Action::GetLLM, Action::QueryLLM, Action::ListLLM,