out_kafka: support AWS MSK IAM authentication #11224
base: master
Conversation
Signed-off-by: Arbin <arbin.cheng@coins.ph>
Signed-off-by: Arbin <arbin.cheng@coins.ph>
Signed-off-by: Arbin <arbin.cheng@coins.ph>
- Switch provider to sync mode before initialization to prevent hanging
- Initialize provider with sync mode (required before event loop is available)
- Switch back to async mode after successful initialization
- Follows the pattern used by other AWS credential providers

This fixes potential credential initialization failures in IRSA/EKS deployments where HTTP requests during init would hang without the event loop.

Signed-off-by: Arbin <arbin.cheng@coins.ph>
- Add force refresh logic to EC2, STS, and EKS credential providers
- Set next_refresh to 0 in refresh functions to ensure an immediate credential update
- Fixes MSK IAM authentication failures after ~1 hour due to stale credentials
- Aligns with AWS SDK behavior, where refresh() means force refresh

This resolves the issue where the OAuth token refresh (every ~15 minutes) would not actually refresh AWS credentials until the next_refresh time was reached (typically 1 hour later), causing MSK connection failures with 'Access denied' errors. The fix ensures that every OAuth callback fetches fresh credentials from AWS, matching the behavior of the official AWS SDKs (Python, Java).

Signed-off-by: Arbin <arbin.cheng@coins.ph>
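The force-refresh pattern this commit describes can be sketched as below. The names (`provider_ctx`, `next_refresh`, `fetch_credentials`) are illustrative stand-ins, not the actual fluent-bit symbols:

```c
#include <time.h>

/*
 * Hypothetical sketch of the "force refresh" pattern: a scheduled
 * refresh time gates re-fetching, and refresh() zeroes the schedule
 * so the next access always fetches fresh credentials.
 */
struct provider_ctx {
    time_t next_refresh;   /* epoch time of the next scheduled refresh */
    int creds_version;     /* bumped whenever credentials are re-fetched */
};

static void fetch_credentials(struct provider_ctx *ctx)
{
    /* placeholder for the HTTP call that fetches fresh credentials */
    ctx->creds_version++;
    ctx->next_refresh = time(NULL) + 3600;  /* normally ~1 hour out */
}

/* get_credentials(): only re-fetch when the schedule says so */
static void get_credentials(struct provider_ctx *ctx, time_t now)
{
    if (now >= ctx->next_refresh) {
        fetch_credentials(ctx);
    }
}

/* refresh(): force an immediate re-fetch on the next access */
static void force_refresh(struct provider_ctx *ctx)
{
    ctx->next_refresh = 0;  /* 0 always compares <= now */
}
```

Without the `next_refresh = 0` step, a periodic OAuth callback would keep returning cached credentials until the hour-long schedule expired, which is the stale-credential failure the commit fixes.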
…d fails Signed-off-by: Arbin <arbin.cheng@coins.ph>
Signed-off-by: Arbin <arbin.cheng@coins.ph>
…ials Signed-off-by: Arbin <arbin.cheng@coins.ph>
Signed-off-by: Arbin <arbin.cheng@coins.ph>
…ing TLS support Signed-off-by: Arbin <arbin.cheng@coins.ph>
Signed-off-by: Arbin <arbin.cheng@coins.ph>
…and Add TLS support Signed-off-by: Arbin <arbin.cheng@coins.ph>
…and Add TLS support Signed-off-by: Arbin <arbin.cheng@coins.ph>
…and Add TLS support Signed-off-by: Arbin <arbin.cheng@coins.ph>
Signed-off-by: Arbin <arbin.cheng@coins.ph>
Note: CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Replaced the MSK cluster-ARN model with broker-derived region/serverless detection; added a TLS-backed provider lifecycle and synchronized credential refresh in MSK IAM; changed the OAuth callback API to accept brokers/opaque; plugins register MSK IAM only when broker hostnames match AWS MSK patterns and OAUTHBEARER is in use; removed legacy config keys.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Plugin as Kafka Plugin (in/out)
    participant rdkafka as librdkafka
    participant OauthCB as flb_aws_msk_iam
    participant AWSProv as AWS Provider (IMDS/STS/Profile)
    participant TLS as TLS layer
    Plugin->>rdkafka: configure SASL (detect aws_msk_iam -> set OAUTHBEARER)
    Plugin->>rdkafka: enable sasl.queue / background callbacks
    Plugin->>OauthCB: register oauthbearer token refresh callback (opaque: plugin ctx, brokers)
    Note right of OauthCB: derive region & is_serverless from broker hostnames
    OauthCB->>TLS: create TLS context (on register)
    OauthCB->>AWSProv: initialize or reuse provider (with TLS)
    OauthCB->>AWSProv: refresh credentials (synchronized)
    AWSProv-->>OauthCB: return temporary credentials
    OauthCB->>OauthCB: build presigned payload using creds, region, host
    OauthCB-->>rdkafka: deliver oauthbearer token to librdkafka
    rdkafka->>Plugin: use token to authenticate to MSK broker
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks: ❌ 1 failed (warning), ✅ 2 passed
Actionable comments posted: 0
🧹 Nitpick comments (3)

src/aws/flb_aws_msk_iam.c (1)

664-669: Redundant null check. The check at line 666 (`if (!ctx->region)`) is redundant since `region_str` was already validated as non-null at lines 651-659 before assignment to `ctx->region` at line 664. While harmless, it could be removed for clarity.

```diff
     ctx->region = region_str;
-    if (!ctx->region) {
-        flb_free(ctx);
-        return NULL;
-    }
     /* Create TLS instance */
```

plugins/in_kafka/in_kafka.c (1)

344-370: Consider making OAuth callback registration failure fatal. If `flb_aws_msk_iam_register_oauth_cb` fails (line 357), the code logs an error but continues initialization. Since MSK IAM is explicitly configured, proceeding without proper authentication will result in connection failures.

```diff
     if (!ctx->msk_iam) {
         flb_plg_error(ins, "failed to setup MSK IAM authentication");
+        goto init_error;
     }
     else {
```

plugins/out_kafka/kafka_config.c (1)

206-239: Same concern as in_kafka: OAuth callback registration failure should be fatal. Similar to the input plugin, if MSK IAM setup fails at line 225, the code logs but continues. This will lead to connection failures at runtime. Additionally, `rd_kafka_conf_enable_sasl_queue` at line 218 is correctly called before OAuth callback registration to enable background thread handling.

```diff
     if (!ctx->msk_iam) {
         flb_plg_error(ctx->ins, "failed to setup MSK IAM authentication");
+        flb_out_kafka_destroy(ctx);
+        return NULL;
     }
     else {
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)

- include/fluent-bit/aws/flb_aws_msk_iam.h (0 hunks)
- plugins/in_kafka/in_kafka.c (2 hunks)
- plugins/in_kafka/in_kafka.h (0 hunks)
- plugins/out_kafka/kafka.c (0 hunks)
- plugins/out_kafka/kafka_config.c (3 hunks)
- plugins/out_kafka/kafka_config.h (0 hunks)
- src/aws/flb_aws_credentials_ec2.c (1 hunk)
- src/aws/flb_aws_credentials_profile.c (1 hunk)
- src/aws/flb_aws_credentials_sts.c (2 hunks)
- src/aws/flb_aws_msk_iam.c (11 hunks)
💤 Files with no reviewable changes (4)
- include/fluent-bit/aws/flb_aws_msk_iam.h
- plugins/out_kafka/kafka.c
- plugins/out_kafka/kafka_config.h
- plugins/in_kafka/in_kafka.h
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (2)
src/aws/flb_aws_msk_iam.c (6)

- src/flb_sds.c (4): flb_sds_create_len (58-76), flb_sds_create_size (92-95), flb_sds_printf (336-387), flb_sds_destroy (389-399)
- src/aws/flb_aws_credentials.c (2): flb_aws_credentials_destroy (752-767), flb_standard_chain_provider_create (269-327)
- include/fluent-bit/flb_mem.h (2): flb_calloc (84-96), flb_free (126-128)
- src/tls/flb_tls.c (2): flb_tls_create (183-232), flb_tls_destroy (258-277)
- src/aws/flb_aws_util.c (1): flb_aws_client_generator (280-283)
- src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)

plugins/in_kafka/in_kafka.c (3)

- src/flb_input.c (2): flb_input_get_property (776-780), flb_input_set_property (557-774)
- src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)
- src/aws/flb_aws_msk_iam.c (1): flb_aws_msk_iam_register_oauth_cb (590-716)
🔇 Additional comments (12)
src/aws/flb_aws_credentials_sts.c (1)
478-489:

src/aws/flb_aws_credentials_profile.c (1)
664-673: LGTM - Appropriate log level adjustment for missing credentials file. Downgrading ENOENT from a conditional error/debug to always debug is correct since a missing shared credentials file is a normal condition when using other authentication methods (environment variables, instance profiles, etc.).
src/aws/flb_aws_credentials_ec2.c (1)

132-133: No concerns - formatting change only.

src/aws/flb_aws_msk_iam.c (5)
41-53: LGTM - Token lifetime and struct changes are appropriate. The 5-minute token lifetime aligns with AWS SDK conventions, and the struct changes properly reflect the new broker-derived region approach.
135-188: Region extraction logic looks correct. The function properly handles both MSK Standard (`*.kafka.<region>.amazonaws.com`) and MSK Serverless (`*.kafka-serverless.<region>.amazonaws.com`) broker formats. The backwards scan from `.amazonaws.com` correctly locates the region segment.
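The backwards scan described here can be illustrated with a minimal standalone sketch. `extract_region` is an assumed helper for illustration, not the actual fluent-bit function:

```c
#include <string.h>

/*
 * Extract the region segment that immediately precedes ".amazonaws.com"
 * in an MSK broker hostname. Handles both documented formats:
 *   b-1.cluster.abc123.kafka.<region>.amazonaws.com
 *   boot-xxxx.kafka-serverless.<region>.amazonaws.com
 * Illustrative reimplementation only.
 */
static int extract_region(const char *broker, char *out, size_t out_len)
{
    const char *suffix = strstr(broker, ".amazonaws.com");
    const char *p;
    size_t len;

    if (!suffix || suffix == broker) {
        return -1;  /* not an AWS hostname */
    }

    /* scan backwards from ".amazonaws.com" to the previous dot */
    p = suffix - 1;
    while (p > broker && *p != '.') {
        p--;
    }
    if (*p != '.') {
        return -1;  /* no dot before the region segment */
    }

    len = (size_t)(suffix - p - 1);
    if (len == 0 || len >= out_len) {
        return -1;
    }
    memcpy(out, p + 1, len);
    out[len] = '\0';
    return 0;
}
```

For `b-1.c.abc.kafka.us-east-1.amazonaws.com` this yields `us-east-1`; a hostname without the `.amazonaws.com` suffix is rejected.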
190-488: Payload generation logic is correctly implemented. The SigV4 signing, canonical request construction, and base64 URL encoding follow AWS specifications. Resource cleanup in both success and error paths is thorough.
490-587: OAuth callback implementation is well-structured. The callback properly:

- Refreshes credentials before retrieval (lines 532-536)
- Destroys credentials after use (lines 550, 573)
- Handles all error paths with appropriate `rd_kafka_oauthbearer_set_token_failure` calls
- Uses a fixed 5-minute lifetime which librdkafka will refresh at ~80% (4 minutes)
718-737: LGTM - Proper resource cleanup. The destroy function correctly cleans up all allocated resources (provider, TLS, region) with appropriate null checks.
plugins/in_kafka/in_kafka.c (2)
271-294: LGTM - SASL mechanism conversion logic is correct. The automatic conversion from `aws_msk_iam` to `OAUTHBEARER` and defaulting `security.protocol` to `SASL_SSL` provides the expected simplified configuration experience.
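The conversion behavior described here can be sketched with a hypothetical helper (`resolve_sasl` and the struct are illustrative, not the real plugin code):

```c
#include <string.h>
#include <strings.h>

/*
 * Map the user-facing "aws_msk_iam" mechanism to the real SASL
 * mechanism OAUTHBEARER, defaulting security.protocol to SASL_SSL
 * when the user did not set it. Other mechanisms pass through.
 */
struct sasl_conf {
    char mechanism[32];
    char security_protocol[32];
};

static void resolve_sasl(struct sasl_conf *c, const char *user_mechanism,
                         const char *user_protocol)
{
    if (strcasecmp(user_mechanism, "aws_msk_iam") == 0) {
        strcpy(c->mechanism, "OAUTHBEARER");
        /* default the protocol only when the user left it unset */
        strcpy(c->security_protocol,
               user_protocol ? user_protocol : "SASL_SSL");
    }
    else {
        strcpy(c->mechanism, user_mechanism);
        strcpy(c->security_protocol,
               user_protocol ? user_protocol : "PLAINTEXT");
    }
}
```

The point of the design is that users write one intent-level value (`aws_msk_iam`) and the plugin derives the two librdkafka settings that MSK IAM actually requires.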
336-342: LGTM - Opaque context correctly includes Kafka context. Passing `&ctx->kafka` to `flb_kafka_opaque_set` enables the MSK IAM module to access broker configuration for region extraction.
61-84: LGTM - Consistent SASL mechanism handling with in_kafka. The conversion logic mirrors the input plugin implementation, ensuring consistent behavior across both Kafka plugins.
251-270: LGTM - SASL background callbacks handling is appropriate. Enabling background callbacks ensures OAuth tokens refresh even on idle connections. Making failure non-fatal is reasonable since token refresh will still work through `rd_kafka_poll`, though with less reliability on idle connections.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
Signed-off-by: Arbin <arbin.cheng@coins.ph>
Force-pushed 0fcfb97 to 79a7e60
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/aws/flb_aws_msk_iam.c (2)
135-188: LGTM! Region extraction logic is sound. The function correctly handles both MSK Standard and Serverless broker formats by extracting the region component before `.amazonaws.com`. Edge cases (empty broker, missing domain, port numbers) are properly handled. The 64-character upper bound for region length (line 178) is quite generous; AWS regions are typically 9-15 characters. Consider tightening this to 20-25 characters for stricter validation, though the current value is acceptable as a sanity check.
635-646: LGTM! Serverless detection via broker naming convention. The serverless detection correctly identifies MSK Serverless clusters by checking for `.kafka-serverless.` in the broker hostname. This approach is appropriate given MSK's strict and stable naming conventions. Consider adding a comment documenting the dependency on AWS MSK broker naming conventions, noting that this detection will break if AWS changes their hostname patterns (though this is unlikely given backward compatibility requirements).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- src/aws/flb_aws_msk_iam.c (11 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🔇 Additional comments (9)
src/aws/flb_aws_msk_iam.c (9)
41-53: LGTM! Well-documented token lifetime and appropriate struct changes. The 5-minute token lifetime aligns with AWS standards, and the struct refactoring correctly removes the cluster ARN dependency while adding the necessary fields for provider and TLS lifecycle management.

190-488: LGTM! Clean refactoring with explicit credential handling. The updated signature removes on-demand provider creation, making the function more testable and its dependencies explicit. The comprehensive error handling ensures all intermediate allocations are properly cleaned up. The addition of the User-Agent parameter aids in tracking and debugging.

672-681: LGTM! Correct TLS setup for credential provider. The TLS context is properly configured for the AWS credentials provider to make HTTPS calls to credential services (EC2 metadata, ECS task roles, etc.). Client mode with verification enabled ensures secure communication.

718-737: LGTM! Complete resource cleanup. The destroy function properly cleans up all resources in the correct order, destroying the provider (which depends on cred_tls) before destroying the TLS context itself.

414-415: LGTM! User-Agent parameter improves observability. Adding the User-Agent to the presigned URL helps with tracking and debugging MSK IAM authentication requests. This follows AWS best practices for client identification.

555-562: LGTM! Well-documented token lifecycle. The comment clearly explains the token lifetime and automatic refresh behavior. The 5-minute lifetime with refresh at 80% (4 minutes) ensures tokens remain valid even for idle connections, matching AWS SDK implementations.
608-620: Verify broker context access pattern. The code validates `opaque` and `opaque->ptr` for NULL (lines 609-613) before casting to `struct flb_kafka *`, so the null checks provide sufficient runtime protection; this follows a defensive programming pattern common in C codebases. Optional improvement: add a comment documenting that `opaque->ptr` is expected to point to `struct flb_kafka`.
699-709: The provider initialization sequence is correct and follows the standard fluent-bit pattern. The `sync() → init() → async()` sequence is the documented and widely-used pattern across fluent-bit AWS providers. Calling `sync()` before `init()` forces synchronous initialization to ensure credentials are fetched and ready before switching to async mode for runtime operation. This exact pattern is used in multiple fluent-bit output plugins (CloudWatch, Elasticsearch) and is the correct approach for AWS credentials provider initialization.
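The sync() → init() → async() sequence can be sketched with a toy provider; the types and function names below are illustrative, not fluent-bit's actual vtable:

```c
/*
 * Toy model of the provider lifecycle: init() performs blocking I/O,
 * which is only safe in sync mode because the event loop that async
 * mode relies on is not running yet during plugin initialization.
 */
enum io_mode { MODE_SYNC, MODE_ASYNC };

struct provider {
    enum io_mode mode;
    int initialized;
};

static void provider_sync(struct provider *p)  { p->mode = MODE_SYNC; }
static void provider_async(struct provider *p) { p->mode = MODE_ASYNC; }

static int provider_init(struct provider *p)
{
    /* blocking credential fetch: refuse to run in async mode */
    if (p->mode != MODE_SYNC) {
        return -1;
    }
    p->initialized = 1;
    return 0;
}

static int provider_setup(struct provider *p)
{
    provider_sync(p);          /* force blocking I/O for init */
    if (provider_init(p) != 0) {
        return -1;
    }
    provider_async(p);         /* hand off to the event loop */
    return 0;
}
```

The ordering matters: skipping the initial `provider_sync()` would leave init attempting network I/O through an event loop that does not exist yet, which is exactly the hang the earlier commit message describes.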
490-587:
Credential refresh callback correctly handles lifecycle, but verify provider API thread-safety expectations.
The callback properly:
- Refreshes credentials via the provider (line 532)
- Retrieves credentials (line 539)
- Passes explicit host and credentials to payload generation (line 547)
- Destroys credentials in both error and success paths (lines 550, 573)
However, this callback executes in librdkafka's background thread and invokes `config->provider->provider_vtable->refresh()` and `get_credentials()`. While each MSK instance maintains its own provider (preventing inter-instance race conditions), the provider operations themselves must handle concurrent access if they perform I/O or access shared state. Verify that the credential provider implementation is designed for thread-safe execution, particularly for HTTP credential retrieval operations that may occur in background threads.
Signed-off-by: Arbin <arbin.cheng@coins.ph>
Actionable comments posted: 0
🧹 Nitpick comments (4)
src/aws/flb_aws_msk_iam.c (2)
608-664: Clarify the dual-purpose usage of `msk_iam_ctx` in opaque context.
The `msk_iam_ctx` field is used for two different purposes:
- Before registration (lines 609-616): expected to contain `struct flb_kafka *` to extract broker information
- After registration (line 712): overwritten with `struct flb_aws_msk_iam *` for the OAuth callback
This dual-use pattern is confusing and error-prone. Consider adding a comment documenting this contract, or using a separate field for passing the kafka context to the registration function.

```diff
+    /*
+     * IMPORTANT: The caller must set opaque->msk_iam_ctx to point to the
+     * struct flb_kafka before calling this function. After successful
+     * registration, this field is overwritten with struct flb_aws_msk_iam*
+     * for use by the OAuth token refresh callback.
+     */
     /* Extract region from broker address */
     if (!opaque || !opaque->msk_iam_ctx) {
```
671-708: Consider making TLS debug level configurable.
The TLS context is created with `FLB_LOG_DEBUG` (line 674), which may produce excessive logging in production environments. Consider using a lower default or making it configurable.

```diff
     ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
                                    FLB_TRUE,
-                                   FLB_LOG_DEBUG,
+                                   0, /* TLS debug off by default */
                                    NULL, NULL, NULL,
                                    NULL, NULL, NULL);
```

plugins/out_kafka/kafka_config.c (1)
220-248: Consider making MSK IAM registration failure fatal when explicitly requested.
When the user explicitly requests `aws_msk_iam`, a failure to register the OAuth callback (lines 234-236) only logs an error but allows the plugin to continue. This could lead to confusing authentication failures at runtime.

```diff
     ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, ctx->conf, ctx->opaque);
     if (!ctx->msk_iam) {
         flb_plg_error(ctx->ins, "failed to setup MSK IAM authentication OAuth callback");
+        flb_out_kafka_destroy(ctx);
+        return NULL;
     }
-    else {
+    else {
```

plugins/in_kafka/in_kafka.c (1)
358-386: Consider making MSK IAM registration failure fatal when explicitly requested.
Similar to the output plugin, when the user explicitly requests `aws_msk_iam`, a failure to register the OAuth callback should prevent the plugin from starting, rather than continuing with broken authentication.

```diff
     ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, kafka_conf, ctx->opaque);
     if (!ctx->msk_iam) {
         flb_plg_error(ins, "failed to setup MSK IAM authentication OAuth callback");
+        goto init_error;
     }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- plugins/in_kafka/in_kafka.c (3 hunks)
- plugins/in_kafka/in_kafka.h (1 hunks)
- plugins/out_kafka/kafka_config.c (3 hunks)
- plugins/out_kafka/kafka_config.h (1 hunks)
- src/aws/flb_aws_msk_iam.c (11 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/in_kafka/in_kafka.h
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (3)
plugins/out_kafka/kafka_config.c (3)
- src/flb_output.c (1): flb_output_get_property (1108-1111)
- src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)
- src/aws/flb_aws_msk_iam.c (1): flb_aws_msk_iam_register_oauth_cb (590-716)
plugins/in_kafka/in_kafka.c (3)
- src/flb_input.c (2): flb_input_get_property (776-780), flb_input_set_property (557-774)
- src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)
- src/aws/flb_aws_msk_iam.c (1): flb_aws_msk_iam_register_oauth_cb (590-716)
src/aws/flb_aws_msk_iam.c (6)
- src/flb_sds.c (4): flb_sds_create_len (58-76), flb_sds_create_size (92-95), flb_sds_printf (336-387), flb_sds_destroy (389-399)
- src/aws/flb_aws_credentials.c (2): flb_aws_credentials_destroy (752-767), flb_aws_provider_destroy (769-791)
- include/fluent-bit/flb_mem.h (2): flb_calloc (84-96), flb_free (126-128)
- src/tls/flb_tls.c (2): flb_tls_create (183-232), flb_tls_destroy (258-277)
- src/aws/flb_aws_util.c (1): flb_aws_client_generator (280-283)
- src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)
🔇 Additional comments (12)
plugins/out_kafka/kafka_config.h (1)
128-137: LGTM! The struct changes correctly replace the `aws_msk_iam_cluster_arn` field with a simple `aws_msk_iam` flag, aligning with the PR's goal of deriving region from broker addresses rather than requiring explicit cluster ARN configuration.
src/aws/flb_aws_msk_iam.c (5)
41-53: LGTM! The 5-minute token lifetime aligns with AWS MSK IAM standards, and the struct appropriately manages TLS and provider lifecycle.
135-188: LGTM! The region extraction logic correctly handles both MSK Standard and MSK Serverless broker address formats. The backward search from `.amazonaws.com` properly isolates the region segment.
190-260: LGTM! The input validation is thorough, checking for region, host, and credentials before proceeding with payload generation.
490-587: LGTM! The OAuth token refresh callback properly handles credential lifecycle, refreshes credentials before each token generation, and correctly sets the token with a 5-minute lifetime for librdkafka's background refresh at 80%.
718-737: LGTM! The destroy function properly cleans up all allocated resources in the correct order (provider before TLS).
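The backward-search approach discussed above can be sketched as a standalone helper. This is a hedged illustration, not the actual flb_aws_msk_iam.c code; the function name `msk_region_from_broker` is hypothetical. It locates the `.amazonaws.com` suffix and takes the dot-delimited label immediately before it as the region, which covers both MSK Standard (`...kafka.<region>.amazonaws.com`) and MSK Serverless (`...kafka-serverless.<region>.amazonaws.com`) hostnames:

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper (not the Fluent Bit implementation): extract the AWS
 * region from an MSK broker hostname by searching backwards from the
 * ".amazonaws.com" suffix to the previous '.', e.g.
 * "b-1.x.c2.kafka.us-east-1.amazonaws.com" -> "us-east-1". */
static int msk_region_from_broker(const char *broker, char *out, size_t len)
{
    const char *suffix;
    const char *start;

    suffix = strstr(broker, ".amazonaws.com");
    if (!suffix || suffix == broker) {
        return -1;                       /* not an AWS-hosted broker */
    }

    /* walk backwards to the '.' that opens the region label */
    start = suffix - 1;
    while (start > broker && *start != '.') {
        start--;
    }
    if (*start == '.') {
        start++;
    }

    if ((size_t)(suffix - start) >= len) {
        return -1;                       /* region would not fit */
    }
    memcpy(out, start, suffix - start);
    out[suffix - start] = '\0';
    return 0;
}
```

A caller would pass the first broker from the configured list and fail registration when the helper returns -1, mirroring the validation path the review describes.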
plugins/out_kafka/kafka_config.c (3)
61-87: LGTM! The `aws_msk_iam` mechanism is correctly treated as an explicit opt-in that gets converted to `OAUTHBEARER` internally, addressing the concern from previous reviews about auto-enabling MSK IAM for generic OAUTHBEARER configurations.
205-207: LGTM! The opaque context is correctly configured with `&ctx->kafka` for broker information extraction during MSK IAM registration.
260-277: LGTM! Enabling SASL background callbacks for all OAUTHBEARER configurations is a universal enhancement that prevents token expiration on idle producers. The warning-on-failure approach is appropriate since polling can still trigger refresh.
plugins/in_kafka/in_kafka.c (3)
271-297: LGTM! The SASL mechanism handling is consistent with the output plugin, correctly treating `aws_msk_iam` as an explicit opt-in that converts to `OAUTHBEARER`.
397-417: LGTM! The SASL background callbacks implementation is consistent with the output plugin and ensures token refresh for idle consumers.
534-538: LGTM! The MSK IAM context is properly cleaned up in the exit path.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Signed-off-by: Arbin <arbin.cheng@coins.ph>
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/aws/flb_aws_msk_iam.c (1)
135-188: LGTM! The region extraction logic correctly handles both MSK Standard and MSK Serverless broker formats. The implementation properly validates input and safely extracts the region component.
Minor suggestion: consider validating the extracted region against AWS region naming patterns (e.g., `[a-z]+-[a-z]+-[0-9]+`) to catch malformed broker addresses early, though the current approach is acceptable.
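The suggested sanity check could look like this hedged sketch (the helper name `region_looks_valid` is illustrative, and it mirrors the `[a-z]+-[a-z]+-[0-9]+` shape from the comment; regions in other AWS partitions such as `us-gov-west-1` have more segments and would need a looser rule):

```c
#include <assert.h>
#include <ctype.h>

/* Illustrative check only: accept strings shaped like
 * lowercase-letters '-' lowercase-letters '-' digits, e.g. "us-east-1". */
static int region_looks_valid(const char *region)
{
    const char *p = region;

    if (!p || !islower((unsigned char)*p)) return 0;
    while (islower((unsigned char)*p)) p++;     /* e.g. "us"   */
    if (*p++ != '-') return 0;
    if (!islower((unsigned char)*p)) return 0;
    while (islower((unsigned char)*p)) p++;     /* e.g. "east" */
    if (*p++ != '-') return 0;
    if (!isdigit((unsigned char)*p)) return 0;
    while (isdigit((unsigned char)*p)) p++;     /* e.g. "1"    */
    return *p == '\0';
}
```

Rejecting a string that fails this shape would surface a malformed broker address at registration time instead of as an opaque SASL failure later.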
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- src/aws/flb_aws_msk_iam.c (11 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (1)
src/aws/flb_aws_msk_iam.c (5)
- src/flb_sds.c (2): flb_sds_create_len (58-76), flb_sds_printf (336-387)
- src/aws/flb_aws_credentials.c (2): flb_aws_credentials_destroy (752-767), flb_standard_chain_provider_create (269-327)
- src/tls/flb_tls.c (2): flb_tls_create (183-232), flb_tls_destroy (258-277)
- src/aws/flb_aws_util.c (1): flb_aws_client_generator (280-283)
- src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)
🔇 Additional comments (6)
src/aws/flb_aws_msk_iam.c (6)
41-45: LGTM! The token lifetime constant is well-documented and follows industry standards. The 5-minute lifetime with automatic refresh at 80% provides a good balance between security and connection stability.
47-53: LGTM! The struct modifications properly support the new TLS-based credential fetching and automatic cluster type detection. The addition of `is_serverless`, `cred_tls`, and `provider` fields aligns well with the PR objectives.
191-488: LGTM! The payload generation function has been properly refactored to accept explicit credentials and host parameters, removing the on-demand provider creation. The error handling is comprehensive with proper cleanup in both success and error paths. The addition of the User-Agent parameter to the presigned URL is a good practice for request tracking.
491-587: LGTM! The OAuth token refresh callback properly implements automatic credential refresh with correct lifecycle management. The credentials are refreshed via the provider, used to generate the payload, and then properly destroyed. The 5-minute token lifetime with automatic background refresh at 80% ensures tokens never expire on idle connections.
719-737: LGTM! The destroy function properly cleans up all resources in the correct order with appropriate null checks. The provider is destroyed before TLS, which is correct since the provider depends on the TLS context.
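The 5-minute/80% interplay mentioned above works out as follows (illustrative arithmetic only; the function names are hypothetical, and the wallclock math mirrors what a caller of librdkafka's `rd_kafka_oauthbearer_set_token()` computes for the token's expiry in milliseconds):

```c
#include <assert.h>

/* A 300 s token expires at now + 300,000 ms ... */
static long long token_expiry_ms(long long now_ms, int lifetime_sec)
{
    return now_ms + (long long) lifetime_sec * 1000;
}

/* ... and librdkafka re-runs the refresh callback at 80% of the lifetime,
 * i.e. roughly every 240 s for a 5-minute token. */
static int refresh_interval_sec(int lifetime_sec)
{
    return lifetime_sec * 80 / 100;
}
```

So even a completely idle connection regenerates its token about a minute before expiry, which is why the review considers the 5-minute lifetime safe.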
…llback registration Signed-off-by: Arbin <arbin.cheng@coins.ph>
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/aws/flb_aws_msk_iam.c (1)
490-587: Based on comprehensive web search verification, I've found a critical concurrency issue that the original review failed to identify. The review's claim that repeated `refresh`/`get_credentials` calls are "safe in the expected concurrency model" is contradicted by authoritative sources.
Critical concurrency safety issue: credential provider requires mutex protection
The `oauthbearer_token_refresh_cb` callback executes in librdkafka's internal thread context (not necessarily the application thread), yet the code directly calls:
- `config->provider->provider_vtable->refresh(config->provider)` (line 541)
- `config->provider->provider_vtable->get_credentials(config->provider)` (line 546)
without any visible synchronization. librdkafka's OAUTHBEARER token refresh callbacks are documented as multi-threaded and require explicit thread-safety protection (mutex/lock) around credential provider access. Concurrent calls from librdkafka internal threads and Fluent Bit refresh paths can race on the credential provider state, causing:
- Token generation errors
- Credential corruption
- Authentication failures
Required mitigation: Protect all credential provider access with a mutex, or configure librdkafka to schedule refresh callbacks on its background thread (if supported in the deployed version).
The ownership model verification (that `flb_aws_credentials_destroy()` is correct) is confirmed and accurate.
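A minimal sketch of the mutex mitigation, assuming a pthread mutex serializes the refresh-then-read sequence. All names below (`cred_state`, `provider_refresh`, `fetch_credentials_locked`) are illustrative stand-ins, not Fluent Bit or librdkafka APIs:

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

struct cred_state {
    pthread_mutex_t lock;      /* guards refresh + read as one unit */
    char access_key[64];
    int refresh_count;
};

/* Stand-in for provider->provider_vtable->refresh() followed by
 * get_credentials(); in real code this performs HTTP I/O. */
static void provider_refresh(struct cred_state *st)
{
    st->refresh_count++;
    snprintf(st->access_key, sizeof(st->access_key),
             "AKIA-REFRESH-%d", st->refresh_count);
}

/* What the OAuth callback would call: refresh and copy out under the lock,
 * so a concurrent caller never observes a half-updated credential. */
static int fetch_credentials_locked(struct cred_state *st, char *out, size_t len)
{
    pthread_mutex_lock(&st->lock);
    provider_refresh(st);
    strncpy(out, st->access_key, len - 1);
    out[len - 1] = '\0';
    pthread_mutex_unlock(&st->lock);
    return 0;
}
```

The key design point is that refresh and retrieval form one critical section; locking each call separately would still let another thread interleave between them and read stale or partially written state.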
♻️ Duplicate comments (1)
src/aws/flb_aws_msk_iam.c (1)
589-717: Opaque context reuse for two pointer types is now safe but still brittle
The registration path now:
- Expects `opaque->msk_iam_ctx` to be a `struct flb_kafka *` so it can read `kafka_ctx->brokers` and extract the region from the first broker.
- After creating and initializing `struct flb_aws_msk_iam *ctx`, calls `flb_kafka_opaque_set(opaque, NULL, ctx)` so `opaque->msk_iam_ctx` thereafter points to the MSK IAM context used by `oauthbearer_token_refresh_cb`.
Because the OAuth callback isn't registered until after `opaque->msk_iam_ctx` is repointed and `rd_kafka_t` hasn't been created yet, there's no runtime race in the current design, and the earlier compilation issue (`opaque->kafka_ctx`) is fixed.
However, the same `msk_iam_ctx` field still has two logical types over its lifetime (first `struct flb_kafka *`, then `struct flb_aws_msk_iam *`), which is easy to misuse in future refactors and not obvious at call sites.
If you want to harden this for maintainability and avoid repeating this subtle contract elsewhere, consider one of:
- Passing the brokers string directly into `flb_aws_msk_iam_register_oauth_cb()` and treating `opaque->msk_iam_ctx` as only `struct flb_aws_msk_iam *` (as suggested in the earlier review), or
- Using `opaque->ptr` for the Kafka context (as it already holds the plugin context) and reserving `opaque->msk_iam_ctx` exclusively for the MSK IAM context.
That would eliminate the need for callers to preload `opaque->msk_iam_ctx` with a specific type before registration and make the type expectations much clearer.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- plugins/in_kafka/in_kafka.c (3 hunks)
- plugins/out_kafka/kafka_config.c (4 hunks)
- src/aws/flb_aws_msk_iam.c (11 hunks)
- src/flb_kafka.c (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit, the correct CMake flag for using system librdkafka is `FLB_PREFER_SYSTEM_LIB_KAFKA=ON`.
Applied to files:
src/aws/flb_aws_msk_iam.csrc/flb_kafka.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.
Applied to files:
src/aws/flb_aws_msk_iam.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.
Applied to files:
src/aws/flb_aws_msk_iam.c
🧬 Code graph analysis (1)
plugins/out_kafka/kafka_config.c (3)
- src/flb_output.c (1): flb_output_set_property (843-1068)
- src/flb_kafka.c (1): flb_kafka_opaque_set (242-256)
- src/aws/flb_aws_msk_iam.c (1): flb_aws_msk_iam_register_oauth_cb (590-718)
🔇 Additional comments (12)
src/flb_kafka.c (1)
96-100: Correct destructor for rd_kafka_conf_t in error path
Switching from `flb_free(kafka_cfg)` to `rd_kafka_conf_destroy(kafka_cfg)` matches librdkafka's ownership rules and avoids undefined behavior when tearing down the config on failure.
plugins/out_kafka/kafka_config.c (4)
61-87: Explicit aws_msk_iam opt-in flow looks correct
Reading `rdkafka.sasl.mechanism`, setting `ctx->aws_msk_iam` only when it equals `aws_msk_iam`, then rewriting the mechanism to `OAUTHBEARER` (and defaulting `security.protocol` to `SASL_SSL` when unset) gives a clean, explicit opt-in for MSK IAM without touching generic OAUTHBEARER setups. This addresses the earlier concern about auto-enabling IAM on any OAUTHBEARER configuration against MSK brokers.
221-248: MSK IAM registration now correctly gated on explicit opt-in and MSK brokers
The MSK IAM OAuth registration is now conditioned on both:
- `ctx->aws_msk_iam == FLB_TRUE` (user asked for IAM via `rdkafka.sasl.mechanism=aws_msk_iam`), and
- `ctx->sasl_mechanism == "OAUTHBEARER"` with brokers matching `.kafka.` or `.kafka-serverless.` and `.amazonaws.com`.
This avoids hijacking generic OAUTHBEARER flows on MSK and restricts IAM setup to explicit, AWS-hosted clusters, which is the right behavior. Error handling (destroying the context on registration failure) also looks correct.
197-218: Based on my verification, I now have sufficient information to rewrite the review comment. Key findings:
- `rd_kafka_conf_enable_sasl_queue()` was introduced in librdkafka v1.9.0 (June 2022) and is a stable public API that has not been deprecated or removed
- Current Fluent Bit (v4.0.4, July 2025) bundles librdkafka v2.10.1, which supports this function
- However, historical Fluent Bit releases (e.g., v1.9.3) bundled librdkafka 1.8.2, which predates v1.9.0
- The code snippet shows the function call WITHOUT version guards
The original review comment's concern about version compatibility is valid and justified, as the code does not include guards and older Fluent Bit releases would lack support for this function.
Opaque association and SASL queue enabling for OAUTHBEARER
Associating `ctx` and `&ctx->kafka` with the shared `flb_kafka_opaque` before setting it on the conf ensures both logging callbacks and the MSK IAM layer have the context they need. Enabling the SASL queue when `ctx->sasl_mechanism == "OAUTHBEARER"` is also a good move to ensure token refreshes can be handled even when the main thread isn't polling.
⚠️ Version compatibility concern: `rd_kafka_conf_enable_sasl_queue()` was introduced in librdkafka v1.9.0. While current Fluent Bit (v4.0.4) bundles librdkafka 2.10.1 which supports this API, historical releases bundled older versions (e.g., v1.9.3 used 1.8.2). Confirm that your minimum supported Fluent Bit version bundles librdkafka ≥ 1.9.0, or add a version guard using `#ifdef` / feature detection if backward compatibility with older librdkafka versions is required.
plugins/in_kafka/in_kafka.c (3)
271-297: Input SASL mechanism handling and aws_msk_iam opt-in
Mirroring the output plugin, reading `rdkafka.sasl.mechanism`, storing it in `ctx->sasl_mechanism`, and only setting `ctx->aws_msk_iam` (plus rewriting the mechanism to `OAUTHBEARER` and defaulting `security.protocol` to `SASL_SSL`) when the user chose `aws_msk_iam` provides a clear opt-in path and avoids affecting other OAUTHBEARER usages.
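A compile-time guard of the sort discussed above can be expressed against librdkafka's `RD_KAFKA_VERSION` hex macro, which encodes MM.mm.rr.xx as 0xMMmmrrxx (0xff marking a final release), so 1.9.0 is 0x010900ff. The sketch below defines the macro locally only to stay self-contained; in real code it comes from `<librdkafka/rdkafka.h>`:

```c
#include <assert.h>

#ifndef RD_KAFKA_VERSION
#define RD_KAFKA_VERSION 0x020a01ff   /* pretend the build sees 2.10.1 */
#endif

/* Guarded feature check: rd_kafka_conf_enable_sasl_queue() needs >= 1.9.0.
 * A real caller would invoke the function inside the #if branch. */
static int sasl_queue_supported(void)
{
#if RD_KAFKA_VERSION >= 0x010900ff
    return 1;
#else
    return 0;                          /* fall back to main-thread polling */
#endif
}
```

With a guard like this, builds against a pre-1.9.0 librdkafka still compile and simply skip the SASL-queue path instead of failing at link time.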
339-357: I'll verify whetherrd_kafka_conf_enable_sasl_queue()is available in the librdkafka versions supported by Fluent Bit. Let me start by examining the project's build configuration and existing usage of this function.
<function_calls>
find . -name "CMakeLists.txt" -o -name "cmake.txt" | head -20
</function_calls>Now let me check the main CMakeLists.txt for librdkafka version requirements:
<function_calls>
rg -i "librdkafka|rd_kafka" CMakeLists.txt | head -30
</function_calls>Let me search more broadly for build configuration and version specifications:
<function_calls>
rg -i "rdkafka.*version|kafka.*version" --type=cmake --type=txt -C 2 | head -50
</function_calls>Let me check if
rd_kafka_conf_enable_sasl_queueis already used in the producer code:
<function_calls>
rg "rd_kafka_conf_enable_sasl_queue" --type=c -C 3
</function_calls>Let me explore the kafka plugin structure more carefully:
<function_calls>
find plugins -name "kafka" -type d
</function_calls><function_calls>
ls -la plugins/in_kafka/ | head -20
</function_calls>Let me check the producer code for comparison (as mentioned in the review):
<function_calls>
find plugins -name "out_kafka" -type d
</function_calls><function_calls>
rg "rd_kafka_conf_enable_sasl_queue|sasl.*queue" plugins/ --type=c -B 2 -A 2
</function_calls>Let me check the librdkafka dependency in the project more systematically:
<function_calls>
cat CMakeLists.txt | grep -i -A 5 -B 5 "rdkafka|kafka" | head -80
</function_calls><function_calls>
find . -name ".cmake" -o -name "Find.cmake" | xargs rg -l "kafka|rdkafka" 2>/dev/null
</function_calls>Let me check plugin-specific CMakeLists:
<function_calls>
cat plugins/in_kafka/CMakeLists.txt 2>/dev/null || cat plugins/in_kafka/CMakeLists.txt 2>&1 | head -50
</function_calls><function_calls>
find plugins -name "CMakeLists.txt" | xargs grep -l "kafka" 2>/dev/null
</function_calls>Let me search in the root CMakeLists.txt more directly:
<function_calls>
wc -l CMakeLists.txt && sed -n '1,100p' CMakeLists.txt
</function_calls>Now let me search for librdkafka usage and version info:
<function_calls>
rg "PKG_CHECK_MODULES.*kafka|FindPkgConfig.*kafka|librdkafka" CMakeLists.txt -B 2 -A 2
</function_calls><function_calls>
rg -i "kafka" CMakeLists.txt | head -20
</function_calls>Let me check if there's any documentation or version file:
<function_calls>
find . -maxdepth 2 -name ".md" -o -name "CHANGELOG" -o -name "VERSION*" | xargs grep -l "kafka|rdkafka" 2>/dev/null | head -5
</function_calls>Let me check the actual in_kafka.c file to understand its structure and find related version checks:
<function_calls>
rg "#include.*kafka|#define.*KAFKA" plugins/in_kafka/in_kafka.c | head -20
</function_calls><function_calls>
head -50 plugins/in_kafka/in_kafka.c
</function_calls>Now let me search for how librdkafka configuration is done elsewhere in the codebase:
<function_calls>
rg "rd_kafka_conf" plugins/ --type=c | head -20
</function_calls>Let me check the out_kafka (producer) plugin to see if it uses the SASL queue function:
<function_calls>
find plugins -type d -name "out_kafka" | head -5
</function_calls><function_calls>
ls plugins/out_kafka/ 2>/dev/null || echo "Directory not found"
</function_calls>Let me look at what kafka files exist:
<function_calls>
find plugins -name "kafka" | head -20
</function_calls><function_calls>
ls -la plugins/in_kafka/
</function_calls>Let me check the kafka common library:
<function_calls>
rg "rd_kafka_conf_enable_sasl_queue" plugins/in_kafka/ -B 5 -A 5
</function_calls>Let me check the broader fluent-bit code structure:
<function_calls>
git log --oneline --all | grep -i "sasl|oauthbearer|kafka" | head -10
</function_calls>Let me check for requirements documentation:
<function_calls>
find . -name "README*" -o -name "REQUIREMENTS*" -o -name "INSTALL*" | xargs grep -i "librdkafka|kafka" 2>/dev/null | head -20
</function_calls>Let me now search the web for information about
rd_kafka_conf_enable_sasl_queuefunction availability:
<function_calls>
rd_kafka_conf_enable_sasl_queue librdkafka version availability
</function_calls><function_calls>
librdkafka SASL queue function when introduced version
</function_calls><function_calls>
librdkafka rd_kafka_conf_enable_sasl_queue OAUTHBEARER
</function_calls>Let me check librdkafka's GitHub repository information:
<function_calls>
"rd_kafka_conf_enable_sasl_queue" site:github.com librdkafka
</function_calls><function_calls>
librdkafka SASL queue OAUTHBEARER background thread idle connections
</function_calls>Let me check what version of librdkafka Fluent Bit requires in any dependency files:
<function_calls>
find . -name "requirements*.txt" -o -name ".lock" -o -name ".json" | xargs grep -i "rdkafka|kafka" 2>/dev/null | head -20
</function_calls><function_calls>
find . -name "Dockerfile*" -o -name "*.spec" | xargs grep -i "librdkafka" 2>/dev/null | head -20
</function_calls>Let me check if there's any version checking in the code for kafka features:
<function_calls>
rg "RD_KAFKA_V|LIBRDKAFKA.*VERSION" plugins/ --type=c -B 2 -A 2
</function_calls>Let me get a more targeted search on the recent changes to understand context:
<function_calls>
git log --all --oneline -20 -- plugins/in_kafka/in_kafka.c
</function_calls><function_calls>
git log -1 --stat
</function_calls>Let me check the current branch and PR information:
<function_calls>
git log --all --grep="SASL|OAUTHBEARER|sasl_queue" --oneline | head -10
</function_calls>Let me search the entire plugins directory for all SASL/OAuth related code:
<function_calls>
rg "OAUTHBEARER|sasl" plugins/in_kafka/ --type=c -B 2 -A 2
</function_calls>Let me check the actual lines mentioned in the review:
<function_calls>
sed -n '330,360p' plugins/in_kafka/in_kafka.c
</function_calls>Perfect! Now let me look for the out_kafka plugin to see if it uses the same function:
<function_calls>
find . -path "/out_kafka/.c" -exec grep -l "rd_kafka_conf_enable_sasl_queue" {} ;
</function_calls><function_calls>
find . -name ".c" -path "/kafka*" | xargs grep "rd_kafka_conf_enable_sasl_queue" 2>/dev/null
</function_calls>Let me check if there's any common kafka utility file:
<function_calls>
ls -la plugins/in_kafka/
</function_calls><function_calls>
rg "rd_kafka_conf_enable_sasl_queue" . --type=c -C 5
</function_calls>Let me check the full context of the file around the review lines:
<function_calls>
sed -n '1,50p' plugins/in_kafka/in_kafka.c && echo "..." && sed -n '320,370p' plugins/in_kafka/in_kafka.c
</function_calls>Now let me search for librdkafka minimum version requirements in the project's documentation or CI:
<function_calls>
find . -name ".yml" -o -name ".yaml" | xargs grep -i "librdkafka|rd_kafka" 2>/dev/null | head -30
</function_calls><function_calls>
cat .github/workflows/pr-compile-check.yaml | grep -i -A 10 -B 10 "kafka|librdkafka" | head -50
</function_calls>Let me search for any documentation or comments about supported versions:
<function_calls>
fluent-bit librdkafka minimum version requirements
Verify rd_kafka_conf_enable_sasl_queue() function availability with your build configuration

The code at lines 339-357 correctly calls rd_kafka_conf_enable_sasl_queue() for OAUTHBEARER consumer configurations, which aligns with the pattern for enabling background SASL token refresh. However, I cannot conclusively verify through publicly available documentation the exact librdkafka version in which this function was introduced.

The function is called without version guards in your code. Confirm against the librdkafka release notes that rd_kafka_conf_enable_sasl_queue() is available in all supported versions, particularly the minimum version specified in your build configuration (via FLB_PREFER_SYSTEM_LIB_KAFKA=ON or your distribution's default librdkafka).
400-420: Verify minimum librdkafka version support is enforced

The web research confirms your concern is valid: rd_kafka_sasl_background_callbacks_enable() was introduced in librdkafka v1.9.0 (June 2022). However, Fluent Bit has shipped with varying librdkafka versions across releases; for example, Fluent Bit v1.9.3 bundled librdkafka 1.8.2, which predates this function.

Without access to the repository to verify:
- Whether the code includes version guards (#if RD_KAFKA_VERSION >= ...) around the function call
- Whether CMakeLists.txt was updated to enforce librdkafka >= 1.9.0 as a minimum dependency
- Which Fluent Bit version this PR targets

Please confirm that either (a) the minimum librdkafka version constraint has been updated in the build configuration, or (b) conditional compilation guards protect this call on older versions. Otherwise, this code will fail at compile time or runtime on systems with librdkafka < 1.9.0.
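A guard of the kind suggested here could look like the following. This is a hypothetical sketch, not code from the PR: RD_KAFKA_VERSION is librdkafka's real hex version macro (format 0xMMmmrrpp, so release 1.9.0 is 0x010900ff), but it is stubbed below only so the sketch compiles without the librdkafka headers.

```c
#include <stdio.h>

/* Hypothetical sketch of the version guard discussed above.
   RD_KAFKA_VERSION is librdkafka's hex version macro (0xMMmmrrpp,
   so 1.9.0 is 0x010900ff); stubbed here only so this compiles
   without librdkafka installed. */
#ifndef RD_KAFKA_VERSION
#define RD_KAFKA_VERSION 0x020300ff  /* stand-in: pretend 2.3.0 */
#endif

/* Returns 1 when the build's librdkafka is new enough for the
   background SASL callback APIs introduced in 1.9.0. */
static int sasl_background_supported(void)
{
#if RD_KAFKA_VERSION >= 0x010900ff
    return 1;
#else
    return 0;
#endif
}
```

At the real call sites, the same `#if` would wrap the calls to rd_kafka_conf_enable_sasl_queue() and rd_kafka_sasl_background_callbacks_enable(), with a logged fallback on older versions.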
src/aws/flb_aws_msk_iam.c (4)
41-53: Struct and lifetime constants align with MSK IAM usage

Defining a single MSK_IAM_TOKEN_LIFETIME_SECONDS constant and extending struct flb_aws_msk_iam with is_serverless, cred_tls, and provider fields gives a clear, centralized place to manage token lifetime and AWS credential/TLS state for MSK IAM. This matches the intended 5-minute OAuth lifetime and supports both standard and serverless clusters.
135-188: Region extraction from broker hostname covers standard and serverless MSK

extract_region_from_broker() locates .amazonaws.com and then walks backward to the preceding dot to slice out the region, which works for both patterns documented in the comment (...kafka.<region>.amazonaws.com and ...kafka-serverless.<region>.amazonaws.com). The length sanity check and the handling of empty or malformed inputs should prevent bogus regions from slipping through.
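The approach described in this comment can be sketched standalone. This is an assumed simplification for illustration, not the PR's actual extract_region_from_broker(): find the ".amazonaws.com" suffix, walk back to the previous dot, and slice out the label in between.

```c
#include <string.h>
#include <stdlib.h>

/* Standalone sketch of the region-extraction idea described above
   (not the PR's actual extract_region_from_broker()): locate
   ".amazonaws.com", walk backward to the preceding dot, and copy
   the label in between, e.g. "us-east-1". Returns a malloc'd
   string, or NULL on malformed input; the caller frees it. */
static char *extract_region(const char *host)
{
    const char *suffix = strstr(host, ".amazonaws.com");
    if (suffix == NULL || suffix == host) {
        return NULL;
    }

    /* walk backward from the suffix to the preceding dot */
    const char *p = suffix - 1;
    while (p > host && *p != '.') {
        p--;
    }
    if (*p != '.') {
        return NULL;                 /* no dot before the region label */
    }
    const char *start = p + 1;

    size_t len = (size_t)(suffix - start);
    if (len == 0 || len > 32) {      /* sanity check on region length */
        return NULL;
    }

    char *region = malloc(len + 1);
    if (region == NULL) {
        return NULL;
    }
    memcpy(region, start, len);
    region[len] = '\0';
    return region;
}
```

The same walk works for both broker hostname shapes, since in each case the region is the hostname label immediately before ".amazonaws.com".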
190-488: MSK IAM payload builder now correctly depends on explicit creds and region

The refactored build_msk_iam_payload():
- Validates that config->region, host, and creds (access key and secret key, optional session token) are all present before proceeding.
- Uses SigV4-style canonical request and signing (including the empty payload hash, derived signing key, and hex signature) with presigned URL query parameters and a fixed X-Amz-Expires value.
- Encodes the resulting presigned URL into Base64, converts it to URL-safe Base64, and strips padding, matching the expected MSK IAM token form.
- Carefully cleans up all allocated SDS strings both on the success path and in the error: block (including empty_payload_hex and session_token_enc), avoiding leaks and double frees.

Overall this looks consistent and robust for constructing the MSK IAM OAuth bearer payload.
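The final encoding step mentioned above is small enough to sketch on its own. This is an assumed illustration of that step, not the PR's exact code: turn a standard Base64 string into the URL-safe alphabet and drop the padding.

```c
#include <string.h>

/* Sketch of the Base64 -> URL-safe Base64 step described above
   (assumed illustration, not the PR's exact code): map '+' to '-'
   and '/' to '_', then strip trailing '=' padding. Modifies the
   buffer in place. */
static void base64_to_urlsafe(char *s)
{
    size_t len = strlen(s);

    for (size_t i = 0; i < len; i++) {
        if (s[i] == '+') {
            s[i] = '-';
        }
        else if (s[i] == '/') {
            s[i] = '_';
        }
    }
    while (len > 0 && s[len - 1] == '=') {
        s[--len] = '\0';
    }
}
```

Operating in place is safe here because the URL-safe form is never longer than the input, so no reallocation is needed.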
720-739: MSK IAM destroy path properly tears down provider, TLS, and region
flb_aws_msk_iam_destroy() correctly:
- Destroys the AWS credentials provider (if present),
- Destroys the TLS context used for metadata/STS calls,
- Frees the region SDS, and
- Frees the context itself.
Combined with the added uses of this destroy function in the output plugin (and the recommended addition in the input init error path), the MSK IAM resources should be cleaned up reliably.
… auth Signed-off-by: Arbin <arbin.cheng@coins.ph>
Signed-off-by: Arbin <arbin.cheng@coins.ph>
Hi @edsiper @cosmo0920 @koleini, this PR implements #10516. The implementation has been thoroughly tested. Ready for your review now.
Signed-off-by: Arbin <arbin.cheng@coins.ph>
Hi @patrick-stephens @edsiper @fujimotos, would you please take a look and review the code?
cosmo0920
left a comment
We don't use the fix(component) style of prefix in commits.
Instead, we use a component: prefix and start the commit message with a present-tense verb.
@cosmo0920, do I need to fix the fuzzing test? The failure seems to be caused by the OSS-Fuzz environment missing libssl-dev.
There's an open issue for the fuzzing, so it can be ignored.
Please guide me on the next steps to merge this into master.
Summary
Add comprehensive AWS MSK IAM authentication support with simplified configuration and fix OAuth token expiration on idle connections. This PR automatically extracts region and cluster type information from broker addresses, provides explicit opt-in for MSK IAM, enhances OAUTHBEARER token refresh for all OAuth methods, and enables automatic background token refresh to prevent authentication failures on idle connections.
Changes
Key Features
Explicit MSK IAM Opt-in
- New mechanism value: rdkafka.sasl.mechanism=aws_msk_iam
- Internal aws_msk_iam flag to track user intent

Simplified Configuration
- No cluster_arn parameter
- rdkafka.sasl.mechanism=aws_msk_iam maps to OAUTHBEARER internally and registers the OAuth callback

Automatic Region Extraction

Automatic Cluster Type Detection

Universal OAUTHBEARER Enhancements

OAuth Token Lifetime Management
- Background token refresh that does not depend on rd_kafka_poll()

TLS Support for AWS Credentials
Technical Details
Explicit MSK IAM Activation:
Configuration Simplification:
- rdkafka.sasl.mechanism=aws_msk_iam maps to OAUTHBEARER internally and registers the OAuth callback
- Defaults rdkafka.security.protocol to SASL_SSL (if not configured)

Region Extraction Logic:
- Parses the broker hostname (e.g. b-1.example.kafka.us-east-1.amazonaws.com)
- Matches *.kafka.<region>.amazonaws.com
- Matches *.kafka-serverless.<region>.amazonaws.com

Cluster Type Detection:
- Checks the broker hostname for .kafka-serverless. to determine the cluster type
- Selects the corresponding service name (kafka or kafka-serverless)

Universal OAUTHBEARER Background Processing:
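The cluster type detection described above reduces to a substring check. The sketch below is an assumed simplification, not the PR's exact code; the returned names follow the kafka / kafka-serverless distinction stated in this description.

```c
#include <string.h>

/* Sketch of the cluster-type rule described above (assumed,
   simplified): serverless MSK brokers contain ".kafka-serverless."
   in their hostname; everything else is treated as a standard
   cluster. The names mirror the description's kafka /
   kafka-serverless distinction. */
static const char *detect_service_name(const char *broker_host)
{
    if (strstr(broker_host, ".kafka-serverless.") != NULL) {
        return "kafka-serverless";
    }
    return "kafka";
}
```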
Modified Files
- include/fluent-bit/aws/flb_aws_msk_iam.h - Updated function signature (removed cluster_arn parameter)
- src/aws/flb_aws_msk_iam.c - Refactored region extraction and cluster type detection logic
- plugins/in_kafka/in_kafka.h - Added aws_msk_iam flag, removed deprecated fields
- plugins/in_kafka/in_kafka.c - Added explicit MSK IAM activation, universal OAUTHBEARER support
- plugins/out_kafka/kafka_config.h - Added aws_msk_iam flag, removed deprecated fields
- plugins/out_kafka/kafka_config.c - Added explicit MSK IAM activation, universal OAUTHBEARER support
- plugins/out_kafka/kafka.c - Removed deprecated configuration mapping

Configuration
Simple AWS MSK IAM Setup:
[INPUT]
    Name                     kafka
    Brokers                  b-1.example.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism   aws_msk_iam

No cluster_arn or additional AWS-specific parameters needed!

Supported Configurations
This PR ensures compatibility with multiple OAuth scenarios:
1. AWS MSK IAM (Fluent Bit convenience syntax)
[INPUT]
    Name                     kafka
    Brokers                  b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism   aws_msk_iam

2. librdkafka OIDC (unaffected by MSK IAM)

[INPUT]
    Name                                         kafka
    Brokers                                      b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism                       OAUTHBEARER
    rdkafka.sasl.oauthbearer.method              oidc
    rdkafka.sasl.oauthbearer.client.id           my_client_id
    rdkafka.sasl.oauthbearer.client.secret       my_secret
    rdkafka.sasl.oauthbearer.token.endpoint.url  https://auth.example.com/token

3. librdkafka AWS method (unaffected by MSK IAM)

[INPUT]
    Name                              kafka
    Brokers                           b-1.my-cluster.kafka.us-east-1.amazonaws.com:9098
    rdkafka.sasl.mechanism            OAUTHBEARER
    rdkafka.sasl.oauthbearer.method   aws

All configurations benefit from automatic background token refresh!
Design for Extensibility
This PR establishes a clean, extensible pattern for adding cloud provider IAM authentication:
1. Layered Configuration Approach
2. Explicit Opt-in Pattern
3. Benefits of This Design
4. Future Extensions
This architecture makes it straightforward to add:
Each can be added with the same explicit opt-in pattern without affecting existing functionality.
OAuth Token Expiration Fix
Problem Statement:
After prolonged idle periods (5+ minutes), Kafka outputs experienced authentication failures:
Root Cause:
librdkafka's OAuth token refresh mechanism relies on rd_kafka_poll() being called regularly. For idle connections, rd_kafka_poll() is only called when producing messages. This is documented in librdkafka issue #3871.

Timeline without background callbacks:
Solution: Background Callbacks
librdkafka v1.9.0+ provides rd_kafka_sasl_background_callbacks_enable() specifically for this use case.

Timeline with background callbacks:
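The timing behind these timelines can be made concrete. The numbers below are assumptions for illustration: a 300-second token lifetime matching the 5-minute token used in this PR, and librdkafka's usual policy of scheduling OAUTHBEARER refresh at roughly 80% of the token's lifetime.

```c
/* Illustration only. Assumptions: a 300 s (5 min) token lifetime as
   used in this PR, and a refresh scheduled at ~80% of the lifetime,
   which is librdkafka's usual OAUTHBEARER refresh policy. With
   background callbacks enabled, this refresh fires on a background
   thread even when the application never calls rd_kafka_poll(). */
static int refresh_after_seconds(int lifetime_s)
{
    return lifetime_s * 80 / 100;
}
```

Under these assumptions a 300-second token is refreshed about 240 seconds in, always before expiry, regardless of whether the connection is idle.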
Benefits:
- Token refresh happens on a background thread (rd_kafka_poll() not required)

TLS Support
This PR includes proper TLS support for AWS credential fetching:
Features:
Usage:
Testing
Packaging
Apply the ok-package-test label to test for all targets (requires a maintainer to do)

Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor