diff --git a/CHANGELOG.md b/CHANGELOG.md index d5743e07..c4cfffa6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Fixed +- Managed Streaming Should Handle Throttling Events Correctly - Changed extra's name back to `aio` - Fixed encoding error in `ingest_from_dataframe` when using csv data format. diff --git a/azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py b/azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py index 942a9faf..458cd0a3 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py +++ b/azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py @@ -9,7 +9,7 @@ from azure.core.tracing import SpanKind from azure.kusto.data import KustoConnectionStringBuilder -from azure.kusto.data.exceptions import KustoApiError, KustoClosedError +from azure.kusto.data.exceptions import KustoApiError, KustoClosedError, KustoThrottlingError from azure.kusto.data._telemetry import MonitoredActivity from . import BlobDescriptor, FileDescriptor, IngestionProperties, StreamDescriptor @@ -99,7 +99,9 @@ def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnySt if error.permanent: raise buffered_stream.seek(0, SEEK_SET) - + except KustoThrottlingError: + _ = buffered_stream.seek(0, SEEK_SET) + return self.queued_client.ingest_from_stream(stream_descriptor, ingestion_properties) @distributed_trace(kind=SpanKind.CLIENT) @@ -127,7 +129,9 @@ def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties error = ex.get_api_error() if error.permanent: raise - + except KustoThrottlingError: + pass + return self.queued_client.ingest_from_blob(blob_descriptor, ingestion_properties) def _stream_with_retries(