diff --git a/airbyte_cdk/sources/declarative/auth/jwt.py b/airbyte_cdk/sources/declarative/auth/jwt.py index 65eb6d1e5..933856c32 100644 --- a/airbyte_cdk/sources/declarative/auth/jwt.py +++ b/airbyte_cdk/sources/declarative/auth/jwt.py @@ -183,6 +183,16 @@ def _get_secret_key(self) -> JwtKeyTypes: """ secret_key: str = self._secret_key.eval(self.config, json_loads=json.loads) + # Normalize escaped newlines for PEM-style keys + # This handles cases where keys are stored with literal \n characters instead of actual newlines + if ( + isinstance(secret_key, str) + and "\\n" in secret_key + and "-----BEGIN" in secret_key + and "KEY-----" in secret_key + ): + secret_key = secret_key.replace("\\n", "\n") + if self._passphrase: passphrase_value = self._passphrase.eval(self.config, json_loads=json.loads) if passphrase_value: diff --git a/unit_tests/sources/declarative/auth/test_jwt.py b/unit_tests/sources/declarative/auth/test_jwt.py index de6d7ac32..35b1efd8e 100644 --- a/unit_tests/sources/declarative/auth/test_jwt.py +++ b/unit_tests/sources/declarative/auth/test_jwt.py @@ -392,3 +392,45 @@ def test_get_request_headers(self, request_option, expected_header_key): } assert authenticator.get_auth_header() == expected_headers + + def test_get_signed_token_with_escaped_newlines_in_pem_key(self): + """Test that JWT signing works with PEM keys containing escaped newlines.""" + # Generate a test RSA private key + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + + # Get the PEM representation with actual newlines + pem_with_newlines = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode() + + # Create a version with escaped newlines (as stored in some systems) + pem_with_escaped_newlines = pem_with_newlines.replace("\n", "\\n") + + # Test with escaped newlines - should work after normalization + authenticator = JwtAuthenticator( + config={}, + parameters={}, + secret_key=pem_with_escaped_newlines, + algorithm="RS256", + token_duration=1000, + typ="JWT", + iss="test_issuer", + ) + + signed_token = authenticator._get_signed_token() + + # Verify the token is valid + assert isinstance(signed_token, str) + assert len(signed_token.split(".")) == 3 + + # Verify we can decode it with the public key + public_key = private_key.public_key() + decoded_payload = jwt.decode(signed_token, public_key, algorithms=["RS256"]) + + assert decoded_payload["iss"] == "test_issuer" + assert "iat" in decoded_payload + assert "exp" in decoded_payload