@@ -391,19 +391,16 @@ void *RedisAI_Run_ThreadMain(void *arg) {
391391
392392 // Run is over, now iterate over the run info structs in the batch
393393 // and see if any error was generated
394- int dagError = 0 ;
394+ int first_dag_error = 0 ;
395395 for (long long i = 0 ; i < array_len (batch_rinfo ); i ++ ) {
396396 RedisAI_RunInfo * rinfo = batch_rinfo [i ];
397- // We lock on the DAG because error could be set from
398- // other threads operating on the same DAG (TODO: use atomic)
399- dagError = __atomic_load_n (rinfo -> dagError , __ATOMIC_RELAXED );
400-
401397 // We record that there was an error for later on
402- run_error = dagError ;
403-
398+ run_error = __atomic_load_n (rinfo -> dagError , __ATOMIC_RELAXED );
399+ if (i == 0 && run_error == 1 )
400+ first_dag_error = 1 ;
404401 // If there was an error and the reference count for the dag
405402 // has gone to zero and the client is still around, we unblock
406- if (dagError ) {
403+ if (run_error ) {
407404 RedisAI_RunInfo * orig = rinfo -> orig_copy ;
408405 long long dagRefCount = RAI_DagRunInfoFreeShallowCopy (rinfo );
409406 if (dagRefCount == 0 ) {
@@ -415,6 +412,10 @@ void *RedisAI_Run_ThreadMain(void *arg) {
415412 __atomic_add_fetch (rinfo -> dagCompleteOpCount , 1 , __ATOMIC_RELAXED );
416413 }
417414 }
415+ if (first_dag_error ) {
416+ run_queue_len = queueLength (run_queue_info -> run_queue );
417+ continue ;
418+ }
418419 }
419420
420421 // We initialize variables where we'll store the fact hat, after the current
0 commit comments