1717from redis .asyncio .cluster import RedisCluster
1818from taskiq import AsyncResultBackend
1919from taskiq .abc .result_backend import TaskiqResult
20+ from taskiq .abc .serializer import TaskiqSerializer
2021
2122from taskiq_redis .exceptions import (
2223 DuplicateExpireTimeSelectedError ,
2324 ExpireTimeMustBeMoreThanZeroError ,
2425 ResultIsMissingError ,
2526)
27+ from taskiq_redis .serializer import PickleSerializer
2628
2729if sys .version_info >= (3 , 10 ):
2830 from typing import TypeAlias
@@ -303,6 +305,7 @@ def __init__(
303305 result_px_time : Optional [int ] = None ,
304306 min_other_sentinels : int = 0 ,
305307 sentinel_kwargs : Optional [Any ] = None ,
308+ serializer : Optional [TaskiqSerializer ] = None ,
306309 ** connection_kwargs : Any ,
307310 ) -> None :
308311 """
@@ -328,6 +331,9 @@ def __init__(
328331 ** connection_kwargs ,
329332 )
330333 self .master_name = master_name
334+ if serializer is None :
335+ serializer = PickleSerializer ()
336+ self .serializer = serializer
331337 self .keep_results = keep_results
332338 self .result_ex_time = result_ex_time
333339 self .result_px_time = result_px_time
@@ -369,7 +375,7 @@ async def set_result(
369375 """
370376 redis_set_params : Dict [str , Union [str , bytes , int ]] = {
371377 "name" : task_id ,
372- "value" : pickle . dumps (result ),
378+ "value" : self . serializer . dumpb (result ),
373379 }
374380 if self .result_ex_time :
375381 redis_set_params ["ex" ] = self .result_ex_time
0 commit comments