-
Notifications
You must be signed in to change notification settings - Fork 39
Open
Description
As for now it's impossible for me to use KafkaPublish task due to swagger issue. I've added a KafkaPublishTask to my workflow, it looks like this:
def send_kafka_message_task(
workflow_id: str,
test_url: str,
test_path: str,
parameters: list[dict]
) -> kafka_publish.KafkaPublishTask:
kafka_value = {
'id': workflow_id,
'triggeredBy': 'conductor',
'test_url': test_url,
'test_path': test_path,
'parameters': parameters
}
kafka_task_input = kafka_publish_input.KafkaPublishInput(
bootstrap_servers='kafka-broker:29092',
topic='test_kafka_message',
value=kafka_value
)
return kafka_publish.KafkaPublishTask(
task_ref_name='send_kafka_message', kafka_publish_input=kafka_task_input
)
Swagger fails to parse KafkaPublishInput since api client tries to access swagger_types and attribute_map attributes of KafkaPublishInput but there is none.
File "/usr/local/lib/python3.9/site-packages/conductor/client/http/api_client.py", line 194, in sanitize_for_serialization
for attr, _ in six.iteritems(obj.swagger_types)
AttributeError: 'KafkaPublishInput' object has no attribute 'swagger_types'
I propose to add swagger attributes to KafkaPublishInput same as we already have in HttpInput
class KafkaPublishInput:
swagger_types = {
'_bootstrap_servers': 'str',
'_key': 'str',
'_key_serializer': 'str',
'_value': 'str',
'_request_timeout_ms': 'str',
'_max_block_ms': 'str',
'_headers': 'dict[str, Any]',
'_topic': 'str',
}
attribute_map = {
'_bootstrap_servers': 'bootStrapServers',
'_key': 'key',
'_key_serializer': 'keySerializer',
'_value': 'value',
'_request_timeout_ms': 'requestTimeoutMs',
'_max_block_ms': 'maxBlockMs',
'_headers': 'headers',
'_topic': 'topic',
}
...
Metadata
Metadata
Assignees
Labels
No labels