Skip to content

Commit 83f2b70

Browse files
reeceyangConvex, Inc.
authored andcommitted
database migration for backfilling the webhook log sink HMAC secret (#43305)
GitOrigin-RevId: 932731797091e8ab4221308e91adc71e8dd5df34
1 parent ef4ee69 commit 83f2b70

File tree

9 files changed

+893
-1
lines changed

9 files changed

+893
-1
lines changed

crates/migrations_model/src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@ use value::{
2424

2525
pub mod migr_119;
2626
pub mod migr_121;
27+
pub mod migr_124;
2728

2829
pub type DatabaseVersion = i64;
2930
// The version for the format of the database. We support all previous
3031
// migrations unless explicitly dropping support.
3132
// Add a user name next to the version when you make a change to highlight merge
3233
// conflicts.
33-
pub const DATABASE_VERSION: DatabaseVersion = 123; // emma
34+
pub const DATABASE_VERSION: DatabaseVersion = 124; // reece
3435

3536
pub struct MigrationExecutor<RT: Runtime> {
3637
pub db: Database<RT>,
@@ -86,6 +87,14 @@ impl<RT: Runtime> MigrationExecutor<RT> {
8687
// table for each component, _scheduled_job_args
8788
MigrationCompletionCriterion::MigrationComplete(to_version)
8889
},
90+
124 => {
91+
let mut tx = self.db.begin_system().await?;
92+
migr_124::run_migration(&mut tx).await?;
93+
self.db
94+
.commit_with_write_source(tx, "migration_124")
95+
.await?;
96+
MigrationCompletionCriterion::MigrationComplete(to_version)
97+
},
8998
// NOTE: Make sure to increase DATABASE_VERSION when adding new migrations.
9099
_ => anyhow::bail!("Version did not define a migration! {}", to_version),
91100
};
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use std::sync::LazyLock;
2+
3+
use common::{
4+
self,
5+
document::ParsedDocument,
6+
runtime::Runtime,
7+
};
8+
use database::{
9+
system_tables::{
10+
SystemIndex,
11+
SystemTable,
12+
},
13+
Transaction,
14+
};
15+
use value::{
16+
TableName,
17+
TableNamespace,
18+
};
19+
20+
pub mod types;
21+
use types::{
22+
LogSinksRow,
23+
SinkState,
24+
SinkType,
25+
};
26+
27+
pub static LOG_SINKS_TABLE: LazyLock<TableName> = LazyLock::new(|| {
28+
"_log_sinks"
29+
.parse()
30+
.expect("Invalid built-in _log_sinks table")
31+
});
32+
33+
pub struct LogSinksTable;
34+
impl SystemTable for LogSinksTable {
35+
type Metadata = LogSinksRow;
36+
37+
fn table_name() -> &'static TableName {
38+
&LOG_SINKS_TABLE
39+
}
40+
41+
fn indexes() -> Vec<SystemIndex<Self>> {
42+
vec![]
43+
}
44+
}
45+
46+
pub struct LogSinksModel<'a, RT: Runtime> {
47+
tx: &'a mut Transaction<RT>,
48+
}
49+
50+
impl<'a, RT: Runtime> LogSinksModel<'a, RT> {
51+
pub fn new(tx: &'a mut Transaction<RT>) -> Self {
52+
Self { tx }
53+
}
54+
55+
pub async fn get_by_provider(
56+
&mut self,
57+
provider: SinkType,
58+
) -> anyhow::Result<Option<ParsedDocument<LogSinksRow>>> {
59+
let mut result: Vec<_> = self
60+
.get_by_provider_including_tombstoned(provider.clone())
61+
.await?
62+
.into_iter()
63+
.filter(|doc| doc.status != SinkState::Tombstoned)
64+
.collect();
65+
anyhow::ensure!(
66+
result.len() <= 1,
67+
"Multiple sinks found of the same type: {:?}",
68+
provider
69+
);
70+
Ok(result.pop())
71+
}
72+
73+
async fn get_by_provider_including_tombstoned(
74+
&mut self,
75+
provider: SinkType,
76+
) -> anyhow::Result<Vec<ParsedDocument<LogSinksRow>>> {
77+
let result: Vec<_> = self
78+
.get_all()
79+
.await?
80+
.into_iter()
81+
.filter(|doc| doc.config.sink_type() == provider)
82+
.collect();
83+
Ok(result)
84+
}
85+
86+
pub async fn get_all(&mut self) -> anyhow::Result<Vec<ParsedDocument<LogSinksRow>>> {
87+
let result = self
88+
.tx
89+
.query_system(
90+
TableNamespace::Global,
91+
&SystemIndex::<LogSinksTable>::by_id(),
92+
)?
93+
.all()
94+
.await?
95+
.into_iter()
96+
.map(|arc_row| (*arc_row).clone())
97+
.collect();
98+
Ok(result)
99+
}
100+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use std::{
2+
fmt,
3+
str::FromStr,
4+
};
5+
6+
use common::{
7+
log_streaming::LogEventFormatVersion,
8+
pii::PII,
9+
};
10+
use serde::{
11+
Deserialize,
12+
Serialize,
13+
};
14+
15+
#[derive(Debug, Clone, PartialEq)]
16+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
17+
pub struct AxiomConfig {
18+
pub api_key: PII<String>,
19+
pub dataset_name: String,
20+
pub attributes: Vec<AxiomAttribute>,
21+
pub version: LogEventFormatVersion,
22+
}
23+
24+
#[derive(Serialize, Deserialize)]
25+
#[serde(rename_all = "camelCase")]
26+
pub struct SerializedAxiomConfig {
27+
pub api_key: String,
28+
pub dataset_name: String,
29+
pub attributes: Vec<SerializedAxiomAttribute>,
30+
pub version: Option<String>,
31+
}
32+
33+
impl From<AxiomConfig> for SerializedAxiomConfig {
34+
fn from(value: AxiomConfig) -> Self {
35+
Self {
36+
api_key: value.api_key.0,
37+
dataset_name: value.dataset_name,
38+
attributes: value
39+
.attributes
40+
.into_iter()
41+
.map(SerializedAxiomAttribute::from)
42+
.collect(),
43+
version: Some(value.version.to_string()),
44+
}
45+
}
46+
}
47+
48+
impl TryFrom<SerializedAxiomConfig> for AxiomConfig {
49+
type Error = anyhow::Error;
50+
51+
fn try_from(value: SerializedAxiomConfig) -> Result<Self, Self::Error> {
52+
Ok(Self {
53+
api_key: PII(value.api_key),
54+
dataset_name: value.dataset_name,
55+
attributes: value
56+
.attributes
57+
.into_iter()
58+
.map(AxiomAttribute::from)
59+
.collect(),
60+
version: value
61+
.version
62+
.map(|v| LogEventFormatVersion::from_str(v.as_str()))
63+
.transpose()?
64+
.unwrap_or(LogEventFormatVersion::V1),
65+
})
66+
}
67+
}
68+
69+
impl fmt::Display for AxiomConfig {
70+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71+
write!(f, "AxiomConfig {{ version:{:?} ... }}", self.version)
72+
}
73+
}
74+
75+
#[derive(Deserialize, Debug, Clone, PartialEq, Eq)]
76+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
77+
pub struct AxiomAttribute {
78+
pub key: String,
79+
pub value: String,
80+
}
81+
82+
#[derive(Serialize, Deserialize)]
83+
#[serde(rename_all = "camelCase")]
84+
pub struct SerializedAxiomAttribute {
85+
pub key: String,
86+
pub value: String,
87+
}
88+
89+
impl From<AxiomAttribute> for SerializedAxiomAttribute {
90+
fn from(attribute: AxiomAttribute) -> Self {
91+
Self {
92+
key: attribute.key,
93+
value: attribute.value,
94+
}
95+
}
96+
}
97+
98+
impl From<SerializedAxiomAttribute> for AxiomAttribute {
99+
fn from(value: SerializedAxiomAttribute) -> Self {
100+
Self {
101+
key: value.key,
102+
value: value.value,
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)