Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions airbyte_cdk/sources/declarative/auth/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ def _get_secret_key(self) -> JwtKeyTypes:
"""
secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads)

# Normalize escaped newlines for PEM-style keys
# This handles cases where keys are stored with literal \n characters instead of actual newlines
if isinstance(secret_key, str) and "\\n" in secret_key and "-----BEGIN" in secret_key and "KEY-----" in secret_key:
secret_key = secret_key.replace("\\n", "\n")

if self._passphrase:
passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads)
if passphrase_value:
Expand Down
42 changes: 42 additions & 0 deletions unit_tests/sources/declarative/auth/test_jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,45 @@ def test_get_request_headers(self, request_option, expected_header_key):
}

assert authenticator.get_auth_header() == expected_headers

def test_get_signed_token_with_escaped_newlines_in_pem_key(self):
"""Test that JWT signing works with PEM keys containing escaped newlines."""
# Generate a test RSA private key
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)

# Get the PEM representation with actual newlines
pem_with_newlines = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode()

# Create a version with escaped newlines (as stored in some systems)
pem_with_escaped_newlines = pem_with_newlines.replace("\n", "\\n")

# Test with escaped newlines - should work after normalization
authenticator = JwtAuthenticator(
config={},
parameters={},
secret_key=pem_with_escaped_newlines,
algorithm="RS256",
token_duration=1000,
typ="JWT",
iss="test_issuer",
)

signed_token = authenticator._get_signed_token()

# Verify the token is valid
assert isinstance(signed_token, str)
assert len(signed_token.split(".")) == 3

# Verify we can decode it with the public key
public_key = private_key.public_key()
decoded_payload = jwt.decode(signed_token, public_key, algorithms=["RS256"])

assert decoded_payload["iss"] == "test_issuer"
assert "iat" in decoded_payload
assert "exp" in decoded_payload
Loading