@@ -189,11 +189,29 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
189189 return batch_rinfo ;
190190 }
191191
192+ // Set the batch to be ready by default (optimistic), change it during run.
193+ * batchReady = true;
194+ bool timeout = false;
195+ // If minbatchsize has been set and we are not past it, we check
196+ // if the timeout for min batch has expired, in which case we proceed
197+ // anyway
198+ if (minbatchsize > 0 && minbatchtimeout > 0 ) {
199+ struct timeval now , sub ;
200+ gettimeofday (& now , NULL );
201+
202+ timersub (& now , & rinfo -> queuingTime , & sub );
203+ size_t time_msec = sub .tv_sec * 1000 + sub .tv_usec / 1000 ;
204+
205+ if (time_msec > minbatchtimeout ) {
206+ timeout = true;
207+ }
208+ }
209+
192210 // Get the next item in the queue
193211 queueItem * next_item = queueFront (run_queue_info -> run_queue );
194212
195213 // While we don't reach the end of the queue
196- while (next_item != NULL ) {
214+ while (next_item != NULL && ! timeout ) {
197215 // Get the next run info
198216 RedisAI_RunInfo * next_rinfo = (RedisAI_RunInfo * )next_item -> value ;
199217
@@ -257,12 +275,13 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
257275 size_t time_msec = sub .tv_sec * 1000 + sub .tv_usec / 1000 ;
258276
259277 if (time_msec > minbatchtimeout ) {
260- break ;
278+ timeout = true ;
261279 }
262280 }
263281 }
264282 if (minbatchsize != 0 && current_batchsize < minbatchsize ) {
265- * batchReady = false;
283+ // The batch is ready with respect to minbatch only if there was a timeout.
284+ * batchReady = timeout ;
266285 }
267286 return batch_rinfo ;
268287}
0 commit comments