diff --git a/geoengine/workflow.py b/geoengine/workflow.py index fd08f377..65bd8da8 100644 --- a/geoengine/workflow.py +++ b/geoengine/workflow.py @@ -25,7 +25,8 @@ from owslib.util import Authentication, ResponseWrapper from owslib.wcs import WebCoverageService from vega import VegaLite -import websockets +from websockets.asyncio.client import connect +from websockets import protocol, exceptions as ws_exceptions import xarray as xr import pyarrow as pa @@ -554,7 +555,7 @@ async def raster_stream( query_rectangle: QueryRectangle, open_timeout: int = 60, bands: Optional[List[int]] = None # TODO: move into query rectangle? - ) -> AsyncIterator[RasterTile2D]: + ): '''Stream the workflow result as series of RasterTile2D (transformable to numpy and xarray)''' if bands is None: @@ -584,21 +585,21 @@ async def raster_stream( if url is None: raise InputException('Invalid websocket url') - async with websockets.asyncio.client.connect( + async with connect( uri=self.__replace_http_with_ws(url), - extra_headers=session.auth_header, + additional_headers=session.auth_header, open_timeout=open_timeout, max_size=None, ) as websocket: tile_bytes: Optional[bytes] = None - while websocket.state == websockets.protocol.State.OPEN: + while websocket.state == protocol.State.OPEN: async def read_new_bytes() -> Optional[bytes]: # already send the next request to speed up the process try: await websocket.send("NEXT") - except websockets.exceptions.ConnectionClosed: + except ws_exceptions.ConnectionClosed: # the websocket connection is already closed, we cannot read anymore return None @@ -610,7 +611,7 @@ async def read_new_bytes() -> Optional[bytes]: raise GeoEngineException({'error': data}) return data - except websockets.exceptions.ConnectionClosedOK: + except ws_exceptions.ConnectionClosedOK: # the websocket connection closed gracefully, so we stop reading return None @@ -791,21 +792,21 @@ def process_bytes(batch_bytes: Optional[bytes]) -> Optional[gpd.GeoDataFrame]: if url is None: raise InputException('Invalid websocket url') - async with websockets.asyncio.client.connect( + async with connect( uri=self.__replace_http_with_ws(url), - extra_headers=session.auth_header, + additional_headers=session.auth_header, open_timeout=open_timeout, max_size=None, # allow arbitrary large messages, since it is capped by the server's chunk size ) as websocket: batch_bytes: Optional[bytes] = None - while websocket.state == websockets.protocol.State.OPEN: + while websocket.state == protocol.State.OPEN: async def read_new_bytes() -> Optional[bytes]: # already send the next request to speed up the process try: await websocket.send("NEXT") - except websockets.exceptions.ConnectionClosed: + except ws_exceptions.ConnectionClosed: # the websocket connection is already closed, we cannot read anymore return None @@ -817,7 +818,7 @@ async def read_new_bytes() -> Optional[bytes]: raise GeoEngineException({'error': data}) return data - except websockets.exceptions.ConnectionClosedOK: + except ws_exceptions.ConnectionClosedOK: # the websocket connection closed gracefully, so we stop reading return None