Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
82de062
aws_msk_iam: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
219fd90
aws_msk_iam: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
e75e4e6
aws_msk_iam: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
c9d51a0
aws_msk_iam: initialize AWS provider in sync mode for MSK IAM
kalavt Nov 26, 2025
26551da
aws_msk_iam: force credential refresh in provider refresh functions
kalavt Nov 27, 2025
39dfadc
Merge branch 'fluent:master' into fix/aws-msk-iam-optimization
kalavt Nov 27, 2025
bf5d9f2
aws_msk_iam: Minor leak on empty_payload_hex when canonical request …
kalavt Nov 27, 2025
86999b5
aws_msk_iam: optimize MSK IAM authentication and credential management
kalavt Nov 27, 2025
5bc78d1
aws_msk_iam: AWS MSK IAM authentication failures caused by stale cre…
kalavt Nov 27, 2025
7b30c74
aws_msk_iam: optimize MSK IAM authentication and credential management
kalavt Nov 27, 2025
59f143f
aws_msk_iam: fix auth failures on low traffic and missing TLS
kalavt Nov 28, 2025
c5039c8
aws_msk_iam: Fix potential overflow in md_lifetime_ms on 32‑bit time_t
kalavt Nov 28, 2025
2f6d7fb
aws_msk_iam: fix OAuth token expiration and add TLS support
kalavt Nov 28, 2025
922d8e7
aws_msk_iam: fix OAuth token expiration and add TLS support
kalavt Nov 28, 2025
c865635
aws_msk_iam: fix OAuth token expiration and add TLS support
kalavt Nov 28, 2025
210accd
aws_msk_iam: Fix AWS MSK IAM remove cluster_arn dependency
kalavt Nov 28, 2025
6bfbb57
Merge branch 'fluent:master' into feature/rdkafka-sasl-mechanism-aws-…
kalavt Nov 28, 2025
70280b6
workflows: bump actions/checkout from 4 to 6
dependabot[bot] Dec 3, 2025
7e9845f
workflows: bump actions/setup-python from 5 to 6
dependabot[bot] Dec 3, 2025
4f22878
dockerfile: Docker image to support large page sizes
Nov 15, 2025
e800300
dockerfile: allow customization of FLB_JEMALLOC_OPTIONS
Nov 17, 2025
420eb67
dockerfile: update to Debian Trixie
patrick-stephens Oct 9, 2025
b0e4bbe
dockerfile: install systemd libs from normal repo
patrick-stephens Oct 9, 2025
e73dd18
in_elasticsearch: fix missing http config parameter description (#11221)
eschabell Nov 28, 2025
6bc2b79
in_exec_wasi: fix config key typo 'bool' -> 'oneshot'
eschabell Dec 1, 2025
42def27
in_forward: improve configuration parameter descriptions
eschabell Dec 2, 2025
4248c11
github: scripts: commit_linter: Extend a capability of prefix inferences
cosmo0920 Dec 4, 2025
d6590a0
github: scripts: commit_linter: Fix failing test cases
cosmo0920 Dec 4, 2025
492b655
build: prevent the toolchain from emitting an executable stack
edsiper Dec 5, 2025
1b83e5c
in_forward: fix segfault and double-free in trace path handling
edsiper Dec 4, 2025
dd40eff
in_node_exporter_metrics: Increase buffer size to read /proc/stat cor…
piwai Dec 4, 2025
2ba282f
aws_msk_iam: remove cluster_arn dependency
kalavt Nov 28, 2025
1441be5
aws_msk_iam,in_kafka,out_kafka: enable AWS MSK IAM authentication
kalavt Nov 28, 2025
5956f87
aws_msk_iam,in_kafka,out_kafka: enable AWS MSK IAM authentication
kalavt Nov 28, 2025
86e366f
aws_msk_iam: fix use strlen for non-SDS buffer
kalavt Nov 28, 2025
854b3b0
aws_msk_iam: fix type confusion race in OAuth callback registration
kalavt Nov 28, 2025
8be1297
aws_msk_iam: fix critical concurrency and memory issues
kalavt Nov 28, 2025
c8ffbf4
aws_msk_iam: improve log clarity for cluster detection and token refresh
kalavt Nov 28, 2025
a64f819
aws_msk_iam: support VPC endpoint
kalavt Nov 29, 2025
776e6c5
Merge branch 'fluent:master' into feature/rdkafka-sasl-mechanism-aws-…
kalavt Dec 9, 2025
6e3bf3d
Merge branch 'fluent:master' into feature/rdkafka-sasl-mechanism-aws-…
kalavt Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions include/fluent-bit/aws/flb_aws_msk_iam.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@ struct flb_msk_iam_cb {

/*
* Register the oauthbearer refresh callback for MSK IAM authentication.
* Parameters:
* - config: Fluent Bit configuration
* - kconf: rdkafka configuration
* - opaque: Kafka opaque context (will be set with MSK IAM context)
* - brokers: Comma-separated list of broker addresses (used to extract AWS region)
* Returns context pointer on success or NULL on failure.
*/
struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config,
rd_kafka_conf_t *kconf,
const char *cluster_arn,
struct flb_kafka_opaque *opaque);
struct flb_kafka_opaque *opaque,
const char *brokers);
void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx);

#endif
157 changes: 96 additions & 61 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,40 +268,33 @@ static int in_kafka_init(struct flb_input_instance *ins,
return -1;
}

/* Retrieve SASL mechanism if configured */
conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
if (conf) {
ctx->sasl_mechanism = flb_sds_create(conf);
flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);

#ifdef FLB_HAVE_AWS_MSK_IAM
/*
* When MSK IAM auth is enabled, default the required
* security settings so users don't need to specify them.
*/
if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn) {
conf = flb_input_get_property("rdkafka.security.protocol", ins);
if (!conf) {
flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL");
}

conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
if (!conf) {
/* Check if using aws_msk_iam as SASL mechanism */
if (strcasecmp(conf, "aws_msk_iam") == 0) {
/* Mark that user explicitly requested AWS MSK IAM */
ctx->aws_msk_iam = FLB_TRUE;

/* Set SASL mechanism to OAUTHBEARER for librdkafka */
flb_input_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER");
flb_sds_destroy(ctx->sasl_mechanism);
ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER");

/* Ensure security protocol is set */
conf = flb_input_get_property("rdkafka.security.protocol", ins);
if (!conf) {
flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL");
}

flb_plg_info(ins, "AWS MSK IAM authentication enabled via rdkafka.sasl.mechanism");
}
else {
ctx->sasl_mechanism = flb_sds_create(conf);
flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
}
}
else {
#endif

/* Retrieve SASL mechanism if configured */
conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
if (conf) {
ctx->sasl_mechanism = flb_sds_create(conf);
flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
}

#ifdef FLB_HAVE_AWS_MSK_IAM
}
#endif

kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1);
if (!kafka_conf) {
Expand Down Expand Up @@ -351,25 +344,45 @@ static int in_kafka_init(struct flb_input_instance *ins,
flb_kafka_opaque_set(ctx->opaque, ctx, NULL);
rd_kafka_conf_set_opaque(kafka_conf, ctx->opaque);

/*
* Enable SASL queue for all OAUTHBEARER configurations.
* This allows librdkafka to handle OAuth token refresh in a background thread,
* which is essential for idle connections or when poll intervals are large.
* This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
*/
if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
rd_kafka_conf_enable_sasl_queue(kafka_conf, 1);
flb_plg_debug(ins, "SASL queue enabled for OAUTHBEARER mechanism");
}

#ifdef FLB_HAVE_AWS_MSK_IAM
if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism &&
/* Only register MSK IAM if user explicitly requested it via rdkafka.sasl.mechanism=aws_msk_iam */
if (ctx->aws_msk_iam && ctx->sasl_mechanism &&
strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
flb_plg_info(ins, "registering MSK IAM authentication with cluster ARN: %s",
ctx->aws_msk_iam_cluster_arn);
ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config,
kafka_conf,
ctx->aws_msk_iam_cluster_arn,
ctx->opaque);
if (!ctx->msk_iam) {
flb_plg_error(ins, "failed to setup MSK IAM authentication");
}
else {
res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config",
"principal=admin", errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins,
"failed to set sasl.oauthbearer.config: %s",
errstr);
/* Check if brokers are configured for MSK IAM */
if (ctx->kafka.brokers &&
(strstr(ctx->kafka.brokers, ".kafka.") || strstr(ctx->kafka.brokers, ".kafka-serverless.")) &&
strstr(ctx->kafka.brokers, ".amazonaws.com")) {

/* Register MSK IAM OAuth callback - pass brokers string directly */
flb_plg_info(ins, "registering AWS MSK IAM authentication OAuth callback");
ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config,
kafka_conf,
ctx->opaque,
ctx->kafka.brokers);

if (!ctx->msk_iam) {
flb_plg_error(ins, "failed to setup MSK IAM authentication OAuth callback");
goto init_error;
}
else {
res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config",
"principal=admin", errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins,
"failed to set sasl.oauthbearer.config: %s",
errstr);
}
}
}
}
Expand All @@ -380,9 +393,36 @@ static int in_kafka_init(struct flb_input_instance *ins,
/* Create Kafka consumer handle */
if (!ctx->kafka.rk) {
flb_plg_error(ins, "Failed to create new consumer: %s", errstr);
/* rd_kafka_new() did NOT take ownership on failure; kafka_conf is
* still valid and will be destroyed by init_error cleanup path. */
goto init_error;
}

/* rd_kafka_new() takes ownership of kafka_conf on success */
kafka_conf = NULL;

/*
* Enable SASL background callbacks for all OAUTHBEARER configurations.
* This ensures OAuth tokens are refreshed automatically even when:
* - Poll intervals are large
* - Topics have no messages
* - Collector is paused
* This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
*/
if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
rd_kafka_error_t *error;
error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk);
if (error) {
flb_plg_warn(ins, "failed to enable SASL background callbacks: %s. "
"OAuth tokens may not refresh during idle periods.",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
}
else {
flb_plg_info(ins, "OAUTHBEARER: SASL background callbacks enabled");
}
}

/* Trigger initial token refresh for OAUTHBEARER */
rd_kafka_poll(ctx->kafka.rk, 0);

Expand Down Expand Up @@ -449,15 +489,23 @@ static int in_kafka_init(struct flb_input_instance *ins,
}
if (ctx->kafka.rk) {
rd_kafka_consumer_close(ctx->kafka.rk);
/* rd_kafka_destroy also destroys the conf that was passed to rd_kafka_new */
rd_kafka_destroy(ctx->kafka.rk);
}
else if (kafka_conf) {
/* If rd_kafka was never created, we need to destroy conf manually */
rd_kafka_conf_destroy(kafka_conf);
}
if (ctx->opaque) {
flb_kafka_opaque_destroy(ctx->opaque);
}
else if (kafka_conf) {
/* conf is already destroyed when rd_kafka is initialized */
rd_kafka_conf_destroy(kafka_conf);

#ifdef FLB_HAVE_AWS_MSK_IAM
if (ctx->msk_iam) {
flb_aws_msk_iam_destroy(ctx->msk_iam);
}
#endif

flb_sds_destroy(ctx->sasl_mechanism);
flb_free(ctx);

Expand Down Expand Up @@ -571,19 +619,6 @@ static struct flb_config_map config_map[] = {
"Rely on kafka auto-commit and commit messages in batches"
},

#ifdef FLB_HAVE_AWS_MSK_IAM
{
FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam_cluster_arn),
"ARN of the MSK cluster when using AWS IAM authentication"
},
{
FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false",
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam),
"Enable AWS MSK IAM authentication"
},
#endif

/* EOF */
{0}
};
Expand Down
3 changes: 1 addition & 2 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@ struct flb_in_kafka_config {
struct flb_kafka_opaque *opaque;

#ifdef FLB_HAVE_AWS_MSK_IAM
flb_sds_t aws_msk_iam_cluster_arn;
struct flb_aws_msk_iam *msk_iam;
int aws_msk_iam; /* Flag to indicate user explicitly requested AWS MSK IAM */
#endif

/* SASL mechanism configured in rdkafka.sasl.mechanism */
int aws_msk_iam;
flb_sds_t sasl_mechanism;
};

Expand Down
13 changes: 0 additions & 13 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,19 +678,6 @@ static struct flb_config_map config_map[] = {
"that key will be sent to Kafka."
},

#ifdef FLB_HAVE_AWS_MSK_IAM
{
FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam_cluster_arn),
"ARN of the MSK cluster when using AWS IAM authentication"
},
{
FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false",
0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam),
"Enable AWS MSK IAM authentication"
},
#endif

/* EOF */
{0}
};
Expand Down
Loading
Loading