diff --git a/src/aws/flb_aws_credentials_ec2.c b/src/aws/flb_aws_credentials_ec2.c index 2722e26d223..9446083126b 100644 --- a/src/aws/flb_aws_credentials_ec2.c +++ b/src/aws/flb_aws_credentials_ec2.c @@ -125,12 +125,26 @@ struct flb_aws_credentials *get_credentials_fn_ec2(struct flb_aws_provider return creds; } +/** + * Force an immediate refresh of EC2 IMDS credentials for the given provider. + * + * Attempts to acquire the provider lock and, if successful, triggers an immediate + * credentials refresh from the EC2 Instance Metadata Service. If the lock cannot + * be acquired the function does not perform a refresh. + * + * @param provider The AWS provider whose EC2 IMDS credentials should be refreshed. + * @returns `0` on successful credential refresh, `-1` if the refresh failed or did not occur. + */ int refresh_fn_ec2(struct flb_aws_provider *provider) { struct flb_aws_provider_ec2 *implementation = provider->implementation; int ret = -1; flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider"); + if (try_lock_provider(provider)) { + /* Set to 1 (epoch start) to trigger immediate refresh via time check */ + implementation->next_refresh = 1; + ret = get_creds_ec2(implementation); unlock_provider(provider); } @@ -379,4 +393,4 @@ static int ec2_credentials_request(struct flb_aws_provider_ec2 flb_sds_destroy(credentials_response); return 0; -} +} \ No newline at end of file diff --git a/src/aws/flb_aws_credentials_http.c b/src/aws/flb_aws_credentials_http.c index 8ba78b788fd..a4bdec8b90c 100644 --- a/src/aws/flb_aws_credentials_http.c +++ b/src/aws/flb_aws_credentials_http.c @@ -152,12 +152,26 @@ struct flb_aws_credentials *get_credentials_fn_http(struct flb_aws_provider return NULL; } +/** + * Trigger an immediate credentials refresh for an HTTP provider. + * + * If the provider can be locked, forces an immediate refresh and performs a + * credential fetch using the provider's HTTP implementation; the lock is + * released after the fetch completes. If the provider lock cannot be + * acquired, no refresh is attempted. + * + * @param provider AWS provider that contains the HTTP implementation to refresh. + * @returns `0` on successful credential retrieval and update, `-1` on failure or if the provider lock could not be acquired. + */ int refresh_fn_http(struct flb_aws_provider *provider) { struct flb_aws_provider_http *implementation = provider->implementation; int ret = -1; flb_debug("[aws_credentials] Refresh called on the http provider"); if (try_lock_provider(provider)) { + /* Set to 1 (epoch start) to trigger immediate refresh via time check */ + implementation->next_refresh = 1; + ret = http_credentials_request(implementation); unlock_provider(provider); } @@ -690,4 +704,4 @@ struct flb_aws_credentials *flb_parse_json_credentials(char *response, flb_aws_credentials_destroy(creds); flb_free(tokens); return NULL; -} +} \ No newline at end of file diff --git a/src/aws/flb_aws_credentials_profile.c b/src/aws/flb_aws_credentials_profile.c index 48cb9299572..6abd1ee8498 100644 --- a/src/aws/flb_aws_credentials_profile.c +++ b/src/aws/flb_aws_credentials_profile.c @@ -643,6 +643,19 @@ static int get_shared_config_credentials(char* config_path, return result; } +/** + * Load AWS credentials for the given profile from the shared credentials file. + * + * Allocates and fills a flb_aws_credentials structure pointed to by `*creds` when a matching + * profile is found in the file at `credentials_path`. On failure the function frees any + * allocated resources and sets `*creds` to NULL. + * + * @param credentials_path Path to the shared credentials file. + * @param profile Name of the profile to load. + * @param creds Output pointer that will receive an allocated credentials structure on success. + * @param debug_only If non-zero, suppresses warning-level messages in favor of debug-level logging. + * @return `0` on success (credentials populated in `*creds`), `-1` on failure (`*creds` is set to NULL). + */ static int get_shared_credentials(char* credentials_path, char* profile, struct flb_aws_credentials** creds, @@ -663,8 +676,7 @@ static int get_shared_credentials(char* credentials_path, if (flb_read_file(credentials_path, &buf, &size) < 0) { if (errno == ENOENT) { - AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Shared credentials file %s does not exist", - credentials_path); + AWS_CREDS_DEBUG("Shared credentials file %s does not exist", credentials_path); } else { flb_errno(); AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared credentials file %s", @@ -750,4 +762,4 @@ static int refresh_credentials(struct flb_aws_provider_profile *implementation, error: flb_aws_credentials_destroy(creds); return -1; -} +} \ No newline at end of file diff --git a/src/aws/flb_aws_credentials_sts.c b/src/aws/flb_aws_credentials_sts.c index 554fac20353..9c896d4c579 100644 --- a/src/aws/flb_aws_credentials_sts.c +++ b/src/aws/flb_aws_credentials_sts.c @@ -170,13 +170,25 @@ struct flb_aws_credentials *get_credentials_fn_sts(struct flb_aws_provider return NULL; } +/** + * Trigger an immediate refresh of STS credentials for the given provider. + * + * Sets the provider's next_refresh to epoch start to force an immediate AssumeRole + * request and attempts to perform the STS AssumeRole call to update cached credentials. + * + * @param provider The AWS provider instance whose STS implementation will be refreshed. + * @returns `0` if the credentials were successfully refreshed; `-1` on failure or if the provider lock could not be acquired. + */ int refresh_fn_sts(struct flb_aws_provider *provider) { int ret = -1; struct flb_aws_provider_sts *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the STS provider"); - + if (try_lock_provider(provider)) { + /* Set to 1 (epoch start) to trigger immediate refresh via time check */ + implementation->next_refresh = 1; + ret = sts_assume_role_request(implementation->sts_client, &implementation->creds, implementation->uri, &implementation->next_refresh); @@ -475,12 +487,24 @@ struct flb_aws_credentials *get_credentials_fn_eks(struct flb_aws_provider return NULL; } +/** + * Trigger a credentials refresh for the EKS provider. + * + * Attempts to acquire the provider lock, forces an immediate refresh window, and requests new credentials using the web-identity flow. + * + * @param provider EKS provider instance. + * @returns 0 on success, -1 on failure or if the provider lock could not be acquired. + */ int refresh_fn_eks(struct flb_aws_provider *provider) { int ret = -1; struct flb_aws_provider_eks *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the EKS provider"); + if (try_lock_provider(provider)) { + /* Set to 1 (epoch start) to trigger immediate refresh via time check */ + implementation->next_refresh = 1; + ret = assume_with_web_identity(implementation); unlock_provider(provider); } @@ -955,4 +979,4 @@ static flb_sds_t get_node(char *cred_node, char* node_name, int node_name_len, c } return val; -} +} \ No newline at end of file diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index cf8af7d0cc8..ac222c13c55 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -37,11 +38,13 @@ #include #include -/* Lightweight config - NO persistent AWS provider */ +/* Lightweight config - provider manages credential caching and refresh internally */ struct flb_aws_msk_iam { - struct flb_config *flb_config; /* For creating AWS provider on-demand */ + struct flb_config *flb_config; flb_sds_t region; flb_sds_t cluster_arn; + struct flb_tls *cred_tls; /* TLS instance for AWS credentials (STS) */ + struct flb_aws_provider *provider; /* AWS credentials provider (created once, reused) */ }; /* Utility functions - same as before */ @@ -125,6 +128,17 @@ static int hmac_sha256_sign(unsigned char out[32], return 0; } +/** + * Extracts the AWS region component from an ARN. + * + * Parses an ARN of the form "arn:partition:service:region:account-id/..." and returns + * the region substring. + * + * @param arn ARN string (expected format: "arn:partition:service:region:...") + * @returns Newly allocated NUL-terminated string containing the region on success; + * `NULL` if the ARN is malformed or memory allocation fails. Caller is + * responsible for freeing the returned string. + */ static char *extract_region(const char *arn) { const char *p; @@ -162,12 +176,22 @@ static char *extract_region(const char *arn) return out; } -/* Stateless payload generator - creates AWS provider on demand */ +/** + * Build a Base64 URL-encoded presigned URL payload for AWS MSK IAM authentication. + * + * Given an MSK IAM configuration, target host, and AWS credentials, construct a + * presigned URL suitable for MSK IAM OAuth bearer tokens and return it encoded + * using URL-safe Base64 (no padding). + * + * @param config MSK IAM configuration containing at least a valid region. + * @param host DNS host name that will be used as the request target (must be non-empty). + * @param creds AWS credentials containing access key ID and secret access key; may include a session token. + * @returns A newly allocated SDS string containing the URL-safe Base64-encoded presigned URL (no padding), + * or NULL on failure. The caller is responsible for destroying the returned SDS string. static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, - const char *host) + const char *host, + struct flb_aws_credentials *creds) { - struct flb_aws_provider *temp_provider = NULL; - struct flb_aws_credentials *creds = NULL; flb_sds_t payload = NULL; int encode_result; char *p; @@ -214,37 +238,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, return NULL; } - flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", - host, config->region); - - /* Create AWS provider on-demand */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (!temp_provider) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider"); - return NULL; - } + flb_debug("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", + host, config->region); - if (temp_provider->provider_vtable->init(temp_provider) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider"); - flb_aws_provider_destroy(temp_provider); - return NULL; - } - - /* Get credentials */ - creds = temp_provider->provider_vtable->get_credentials(temp_provider); + /* Validate credentials */ if (!creds) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials"); - flb_aws_provider_destroy(temp_provider); + flb_error("[aws_msk_iam] build_msk_iam_payload: credentials are NULL"); return NULL; } if (!creds->access_key_id || !creds->secret_access_key) { flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials"); - flb_aws_credentials_destroy(creds); - flb_aws_provider_destroy(temp_provider); return NULL; } @@ -547,12 +551,6 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (session_token_enc) { flb_sds_destroy(session_token_enc); } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } return payload; @@ -594,18 +592,27 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (session_token_enc) { flb_sds_destroy(session_token_enc); } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); + if (empty_payload_hex) { + flb_sds_destroy(empty_payload_hex); } return NULL; } -/* Stateless callback - creates AWS provider on-demand for each refresh */ +/** + * Refreshes AWS MSK IAM credentials, builds an MSK IAM presigned URL payload, + * and supplies an OAuth bearer token to librdkafka. + * + * On success this callback sets an OAuth bearer token derived from the + * provider-supplied AWS credentials (token lifetime set to 15 minutes). + * On failure it notifies librdkafka of the token acquisition failure. + * + * @param rk Kafka client instance used to report the refreshed token or failure. + * @param oauthbearer_config Unused by this implementation; provided by librdkafka. + * @param opaque Opaque pointer expected to be a flb_kafka_opaque containing + * a valid msk_iam_ctx (struct flb_aws_msk_iam *). + */ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) @@ -622,7 +629,6 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, struct flb_aws_msk_iam *config; struct flb_aws_credentials *creds = NULL; struct flb_kafka_opaque *kafka_opaque; - struct flb_aws_provider *temp_provider = NULL; (void) oauthbearer_config; kafka_opaque = (struct flb_kafka_opaque *) opaque; @@ -644,57 +650,76 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, return; } - /* Determine host endpoint */ + /* + * Use MSK generic endpoint for IAM authentication. + * AWS MSK IAM supports both cluster-specific and generic regional endpoints. + * Generic endpoints are recommended as they work across all brokers in the region. + */ if (config->cluster_arn) { arn_len = strlen(config->cluster_arn); suffix_len = strlen(s3_suffix); if (arn_len >= suffix_len && strcmp(config->cluster_arn + arn_len - suffix_len, s3_suffix) == 0) { snprintf(host, sizeof(host), "kafka-serverless.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] MSK Serverless cluster, using generic endpoint: %s", host); + flb_debug("[aws_msk_iam] using MSK Serverless generic endpoint: %s", host); } else { snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); + flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host); } } else { snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); + flb_debug("[aws_msk_iam] using MSK generic endpoint: %s", host); } - flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); + flb_debug("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); + + /* + * Refresh credentials before generating OAuth token. + * This is necessary because provider's passive refresh only triggers when + * get_credentials is called and detects expiration. However, OAuth tokens + * are refreshed every ~15 minutes while IAM credentials expire after ~1 hour. + * If OAuth callbacks are spaced far apart, the passive refresh may not trigger + * before credentials expire, causing authentication failures. + */ + int rc = config->provider->provider_vtable->refresh(config->provider); + if (rc < 0) { + flb_warn("[aws_msk_iam] AWS provider refresh() failed (rc=%d), continuing to get_credentials()", rc); + } + + /* Get credentials from provider */ + creds = config->provider->provider_vtable->get_credentials(config->provider); + if (!creds) { + flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); + rd_kafka_oauthbearer_set_token_failure(rk, "credential retrieval failed"); + return; + } - /* Generate payload using stateless function - creates and destroys AWS provider internally */ - payload = build_msk_iam_payload(config, host); + /* Generate payload using credentials from provider */ + payload = build_msk_iam_payload(config, host, creds); if (!payload) { flb_error("[aws_msk_iam] failed to generate MSK IAM payload"); + flb_aws_credentials_destroy(creds); rd_kafka_oauthbearer_set_token_failure(rk, "payload generation failed"); return; } - /* Get credentials for principal (create temporary provider just for this) */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (temp_provider) { - if (temp_provider->provider_vtable->init(temp_provider) == 0) { - creds = temp_provider->provider_vtable->get_credentials(temp_provider); - } - } - now = time(NULL); md_lifetime_ms = (now + 900) * 1000; err = rd_kafka_oauthbearer_set_token(rk, payload, md_lifetime_ms, - creds ? creds->access_key_id : "unknown", + creds->access_key_id, NULL, 0, errstr, sizeof(errstr)); + /* Destroy credentials immediately after use (standard pattern) */ + flb_aws_credentials_destroy(creds); + creds = NULL; + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { flb_error("[aws_msk_iam] failed to set OAuth bearer token: %s", errstr); rd_kafka_oauthbearer_set_token_failure(rk, errstr); @@ -703,20 +728,23 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, flb_info("[aws_msk_iam] OAuth bearer token successfully set"); } - /* Clean up everything immediately - no memory leaks possible! */ - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } - + /* Clean up - payload only (creds already destroyed) */ if (payload) { flb_sds_destroy(payload); } } -/* Register callback with lightweight config - keeps your current interface */ +/** + * Create and initialize an MSK IAM context, register the OAuth bearer token refresh callback, + * and prepare an AWS credentials provider for subsequent token refreshes. + * + * @param config Fluent Bit main configuration (used to create the AWS provider and TLS). + * @param kconf librdkafka configuration where the OAuth bearer token refresh callback will be set. + * @param cluster_arn ARN of the MSK cluster; used to extract the AWS region and validate configuration. + * @param opaque Opaque Kafka state object that will store the created MSK IAM context. + * + * @returns Pointer to an initialized `flb_aws_msk_iam` context on success, or `NULL` on failure. + */ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config, rd_kafka_conf_t *kconf, const char *cluster_arn, @@ -771,6 +799,59 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con flb_info("[aws_msk_iam] extracted region: %s", ctx->region); + /* Create TLS instance for AWS credentials (STS) - CRITICAL FIX */ + ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + FLB_LOG_DEBUG, + NULL, /* vhost */ + NULL, /* ca_path */ + NULL, /* ca_file */ + NULL, /* crt_file */ + NULL, /* key_file */ + NULL); /* key_passwd */ + if (!ctx->cred_tls) { + flb_error("[aws_msk_iam] failed to create TLS instance for AWS credentials"); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } + + flb_info("[aws_msk_iam] TLS instance created for AWS credentials"); + + /* Create AWS provider once - will be reused for credential refresh */ + ctx->provider = flb_standard_chain_provider_create(config, + ctx->cred_tls, + ctx->region, + NULL, /* sts_endpoint */ + NULL, /* proxy */ + flb_aws_client_generator(), + NULL); /* profile */ + if (!ctx->provider) { + flb_error("[aws_msk_iam] failed to create AWS credentials provider"); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } + + /* Initialize provider in sync mode (required before event loop is available) */ + ctx->provider->provider_vtable->sync(ctx->provider); + if (ctx->provider->provider_vtable->init(ctx->provider) != 0) { + flb_error("[aws_msk_iam] failed to initialize AWS credentials provider"); + flb_aws_provider_destroy(ctx->provider); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); + flb_sds_destroy(ctx->cluster_arn); + flb_free(ctx); + return NULL; + } + /* Switch back to async mode */ + ctx->provider->provider_vtable->async(ctx->provider); + + flb_info("[aws_msk_iam] AWS credentials provider created and initialized successfully"); + /* Set the callback and opaque */ rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb); flb_kafka_opaque_set(opaque, NULL, ctx); @@ -781,7 +862,14 @@ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *con return ctx; } -/* Simple destroy - just config cleanup, no AWS provider to leak! */ +/** + * Release all resources associated with an MSK IAM configuration. + * + * Destroys the AWS credentials provider, TLS instance used for credential retrieval, + * region and cluster ARN strings, and frees the context structure. + * + * @param ctx Pointer to the MSK IAM context to destroy. If NULL no action is taken. + */ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) { if (!ctx) { @@ -790,7 +878,17 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) flb_info("[aws_msk_iam] destroying MSK IAM config"); - /* NO AWS provider to destroy! */ + /* Destroy AWS provider (provider manages its own credential caching) */ + if (ctx->provider) { + flb_aws_provider_destroy(ctx->provider); + } + + /* Clean up TLS instance - caller owns TLS lifecycle with flb_standard_chain_provider_create */ + if (ctx->cred_tls) { + flb_tls_destroy(ctx->cred_tls); + } + + /* Clean up other resources */ if (ctx->region) { flb_sds_destroy(ctx->region); } @@ -798,4 +896,6 @@ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) flb_sds_destroy(ctx->cluster_arn); } flb_free(ctx); -} + + flb_info("[aws_msk_iam] MSK IAM config destroyed"); +} \ No newline at end of file