Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,284 changes: 683 additions & 601 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion fairy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fairy"
version = "0.1.0"
edition = "2021"
edition = "2024"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
71 changes: 42 additions & 29 deletions fairy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::process::{exit, Stdio};
use std::sync::Arc;

use futures_util::{Sink, StreamExt, TryStreamExt};
use hyper::{Body, Client, Method, Request};
use log::{debug, error, info, warn, LevelFilter};
use rocket::figment::providers::{Env, Format, Toml};
use rocket::figment::{Figment, Profile};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use std::time::Duration;
use tokio::process::Command;
use tokio_util::codec::{FramedRead, LinesCodec};
use uuid::Uuid;
Expand All @@ -17,7 +16,7 @@ use which::which;

mod error;

use crate::error::{Error, Result};
use crate::error::{Error, Result, TaskExitErr, WaitNixErr};

#[derive(Deserialize)]
pub(crate) struct AppConfig {
Expand All @@ -32,20 +31,20 @@ const CODE_NIX_NOT_INSTALLED: i32 = 1;

fn ensure_nix() {
if which("nix").is_err() {
log::error!("\"nix\" binary not found. Please install nix or run on a nix-os host.");
error!("\"nix\" binary not found. Please install nix or run on a nix-os host.");
exit(CODE_NIX_NOT_INSTALLED);
}
}

fn main() -> Result<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Debug)
.filter_level(LevelFilter::Debug)
.init();

#[cfg(not(feature = "nixless-test-mode"))]
ensure_nix();

log::info!("Fairy starting up.");
info!("Fairy starting up.");

// Took from rocket source code and added .split("__") to be able to add keys in nested structures.
let rocket_config_figment = Figment::from(rocket::Config::default())
Expand All @@ -71,10 +70,20 @@ async fn api<BODY: Serialize, RESPONSE: DeserializeOwned>(
cfg: &AppConfig,
method: Method,
endpoint: &str,
q: &BODY,
q: Option<&BODY>,
) -> Result<RESPONSE> {
let client = Client::new();
let req_data = serde_json::to_vec(q).context(error::SerializeErr)?;
let req_data = q
.map(serde_json::to_vec)
.transpose()
.context(error::SerializeErr)?
.unwrap_or_default();
//
// let content_type = if req_data.is_empty() {
// "text/plain"
// } else {
// "application/json"
// };

let request = Request::builder()
.uri(format!("{}/{}", cfg.vicky_url, endpoint))
Expand All @@ -93,10 +102,14 @@ async fn api<BODY: Serialize, RESPONSE: DeserializeOwned>(
}
);

let resp_data = hyper::body::to_bytes(response.into_body())
.await
.context(error::ReadBodyErr)?;
serde_json::from_slice(&resp_data).context(error::DecodeResponseErr)
if size_of::<RESPONSE>() == 0 {
serde_json::from_str("null").context(error::DecodeResponseErr)
} else {
let resp_data = hyper::body::to_bytes(response.into_body())
.await
.context(error::ReadBodyErr)?;
serde_json::from_slice(&resp_data).context(error::DecodeResponseErr)
}
}

fn log_sink(cfg: Arc<AppConfig>, task_id: Uuid) -> impl Sink<Vec<String>, Error = Error> + Send {
Expand All @@ -107,17 +120,17 @@ fn log_sink(cfg: Arc<AppConfig>, task_id: Uuid) -> impl Sink<Vec<String>, Error
&cfg,
Method::POST,
&format!("api/v1/tasks/{task_id}/logs"),
&serde_json::json!({ "lines": lines }),
Some(&serde_json::json!({ "lines": lines })),
)
.await;

match response {
Ok(_) => {
log::info!("logged {} line(s) from task", lines.len());
info!("logged {} line(s) from task", lines.len());
Ok(())
}
Err(e) => {
log::error!(
error!(
"could not log from task. {} lines were dropped",
lines.len()
);
Expand Down Expand Up @@ -190,7 +203,7 @@ async fn run_task(cfg: Arc<AppConfig>, task: Task) {
#[cfg(not(feature = "nixless-test-mode"))]
let result = match try_run_task(cfg.clone(), &task).await {
Err(e) => {
log::info!("task failed: {} {} ({})", task.id, task.display_name, e);
info!("task failed: {} {} ({:?})", task.id, task.display_name, e);
TaskResult::Error
}
Ok(_) => TaskResult::Success,
Expand All @@ -199,48 +212,48 @@ async fn run_task(cfg: Arc<AppConfig>, task: Task) {
#[cfg(feature = "nixless-test-mode")]
let result = TaskResult::Success;

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let _ = api::<_, ()>(
&cfg,
Method::POST,
&format!("api/v1/tasks/{}/finish", task.id),
&serde_json::json!({ "result": result }),
Some(&serde_json::json!({ "result": result })),
)
.await;
}

async fn try_claim(cfg: Arc<AppConfig>) -> Result<()> {
log::debug!("trying to claim task...");
debug!("trying to claim task...");
if let Some(task) = api::<_, Option<Task>>(
&cfg,
Method::POST,
"api/v1/tasks/claim",
&serde_json::json!({ "features": cfg.features }),
Some(&serde_json::json!({ "features": cfg.features })),
)
.await?
{
log::info!("task claimed: {} {} 🎉", task.id, task.display_name);
log::debug!("{:#?}", task);
info!("task claimed: {} {} 🎉", task.id, task.display_name);
debug!("{:#?}", task);

tokio::task::spawn(run_task(cfg.clone(), task));
} else {
log::debug!("no work available...");
debug!("no work available...");
}

Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn run(cfg: AppConfig) -> Result<()> {
log::info!("config valid, starting communication with vicky");
log::info!("waiting for tasks...");
info!("config valid, starting communication with vicky");
info!("waiting for tasks...");

let cfg = Arc::new(cfg);
loop {
if let Err(e) = try_claim(cfg.clone()).await {
log::error!("{e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
error!("{e}");
tokio::time::sleep(Duration::from_secs(5)).await;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
34 changes: 16 additions & 18 deletions vicky/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
[package]
name = "vicky"
version = "0.1.0"
edition = "2021"
edition = "2024"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { version = "4.3.17", features = ["derive"] }
env_logger = "0.11.3"
log = "0.4.19"
rand = "0.8.5"
clap = { version = "4.5", features = ["derive"] }
env_logger = "0.11"
log = "0.4"
snafu = "0.8"
thiserror = "1.0.43"
thiserror = "2.0"
tokio = { version = "1", features = ["full", "sync"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
async-trait = "0.1.71"
async-trait = "0.1"
# https://github.com/awesomized/crc64fast-nvme/issues/5
aws-sdk-s3 = "=1.34.0"
uuid = { version = "1.18.1", features = ["fast-rng", "v4", "serde"] }
rocket = { version = "0.5.1", features = ["json", "secrets", "uuid"] }
rocket_dyn_templates = { version = "0.2.0", features = ["tera"] }
uuid = { version = "1.19", features = ["fast-rng", "v4", "serde"] }
rocket = { version = "0.5", features = ["json", "secrets", "uuid"] }
rocket_sync_db_pools = { version = "0.1", features = ["diesel_postgres_pool"] }
reqwest = { version = "0.12.4", features = ["json"] }
jwtk = "0.3.0"
diesel = { version = "2.1.6", features = ["postgres", "uuid", "r2d2", "chrono"] }
itertools = { version = "0.13.0" }
diesel_migrations = { version = "2.2.0", features = ["postgres"] }
chrono = { version= "0.4.39", features=["serde"] }
delegate = "0.13.5"
strum = { version = "0.27.2", features = ["derive"] }
reqwest = { version = "0.13", features = ["json"] }
jwtk = "0.4"
diesel = { version = "2.3", features = ["postgres", "uuid", "r2d2", "chrono", "postgres_backend"] }
itertools = { version = "0.14" }
diesel_migrations = { version = "2.3", features = ["postgres"] }
chrono = { version= "0.4", features=["serde"] }
delegate = "0.13"
strum = { version = "0.27", features = ["derive"] }
bon = "3.8"

[[bin]]
Expand Down
2 changes: 1 addition & 1 deletion vicky/src/bin/vicky/auth.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use jwtk::jwk::RemoteJwksVerifier;
use log::warn;
use rocket::http::Status;
use rocket::{request, Request, State};
use rocket::{Request, State, request};
use serde_json::{Map, Value};
use std::str::FromStr;
use uuid::Uuid;
Expand Down
3 changes: 2 additions & 1 deletion vicky/src/bin/vicky/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use log::info;
use rocket::figment::providers::{Env, Format, Toml};
use rocket::figment::{Figment, Profile};
use rocket::serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -66,7 +67,7 @@ impl S3Config {
}

pub fn build_config(&self) -> aws_sdk_s3::Config {
log::info!("building s3 client");
info!("building s3 client");

aws_sdk_s3::Config::builder()
.behavior_version(BehaviorVersion::v2024_03_28())
Expand Down
2 changes: 1 addition & 1 deletion vicky/src/bin/vicky/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use log::error;
use rocket::{http::Status, response::Responder, Request};
use rocket::{Request, http::Status, response::Responder};
use thiserror::Error;
use tokio::sync::broadcast::error::SendError;
use vickylib::errors::VickyError;
Expand Down
2 changes: 1 addition & 1 deletion vicky/src/bin/vicky/events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use rocket::response::stream::{Event, EventStream};
use rocket::{get, State};
use rocket::{State, get};
use serde::{Deserialize, Serialize};
use std::time;
use tokio::sync::broadcast::{self, error::TryRecvError};
Expand Down
30 changes: 16 additions & 14 deletions vicky/src/bin/vicky/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::{build_rocket_config, Config, OIDCConfigResolved};
use crate::events::{get_global_events, GlobalEvent};
use crate::config::{Config, OIDCConfigResolved, build_rocket_config};
use crate::events::{GlobalEvent, get_global_events};
use crate::locks::{
locks_get_active, locks_get_detailed_poisoned, locks_get_poisoned, locks_unlock,
};
Expand All @@ -10,9 +10,10 @@ use crate::tasks::{
};
use crate::user::get_user;
use crate::webconfig::get_web_config;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use errors::AppError;
use jwtk::jwk::RemoteJwksVerifier;
use log::{error, info, trace, warn, LevelFilter};
use rocket::fairing::AdHoc;
use rocket::{routes, Build, Rocket};
use snafu::ResultExt;
Expand All @@ -37,21 +38,21 @@ pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
fn run_migrations(connection: &mut impl MigrationHarness<diesel::pg::Pg>) -> Result<(), AppError> {
match connection.run_pending_migrations(MIGRATIONS) {
Ok(_) => {
log::info!("Migrations successfully completed");
info!("Migrations successfully completed");
Ok(())
}
Err(e) => {
log::error!("Error running migrations {e}");
error!("Error running migrations {e}");
Err(AppError::MigrationError(e.to_string()))
}
}
}

async fn run_rocket_migrations(rocket: Rocket<Build>) -> Result<Rocket<Build>, Rocket<Build>> {
log::info!("Running database migrations");
info!("Running database migrations");

let Some(db) = Database::get_one(&rocket).await else {
log::error!("Failed to get a database connection");
error!("Failed to get a database connection");
return Err(rocket);
};

Expand All @@ -64,31 +65,31 @@ async fn run_rocket_migrations(rocket: Rocket<Build>) -> Result<Rocket<Build>, R
#[tokio::main]
async fn main() {
if let Err(e) = inner_main().await {
log::error!("Fatal: {e}");
error!("Fatal: {e}");
}
}

async fn inner_main() -> Result<()> {
env_logger::builder()
.filter_module("vicky", log::LevelFilter::Debug)
.filter_module("vicky", LevelFilter::Debug)
.init();
log::info!("vicky starting...");
info!("vicky starting...");

log::info!("loading service config...");
info!("loading service config...");
let rocket_config = build_rocket_config();
let app_config = rocket_config
.extract::<Config>()
.context(startup::ConfigErr)?;
let build_rocket = rocket::custom(build_rocket_config());

log::info!(
info!(
"fetching OIDC discovery from {}",
app_config.oidc_config.well_known_uri
);
let oidc_config_resolved =
startup::fetch_oidc_config(&app_config.oidc_config.well_known_uri).await?;

log::info!(
info!(
"Fetched OIDC configuration, found jwks_uri={}",
oidc_config_resolved.jwks_uri
);
Expand Down Expand Up @@ -127,7 +128,8 @@ async fn serve_web_api(
log_drain: LogDrain,
tx_global_events: Sender<GlobalEvent>,
) -> Result<()> {
log::info!("starting web api");
info!("starting web api");

build_rocket
.manage(s3_log_bucket_client)
.manage(log_drain)
Expand Down
Loading
Loading