@@ -167,7 +167,8 @@ static void _BGThread_Execute(RunQueueInfo *run_queue_info, RedisAI_RunInfo **ba
167167
168168static RedisAI_RunInfo * * _BGThread_BatchOperations (RunQueueInfo * run_queue_info ,
169169 RedisAI_RunInfo * rinfo ,
170- RedisAI_RunInfo * * batch_rinfo ) {
170+ RedisAI_RunInfo * * batch_rinfo ,
171+ bool * batchReady ) {
171172 // Since the current op can be batched, then we collect info on batching, namely
172173 // - batchsize
173174 // - minbatchsize
@@ -188,11 +189,29 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
188189 return batch_rinfo ;
189190 }
190191
192+ // Set the batch to be ready by default (optimistic), change it during run.
193+ * batchReady = true;
194+ bool timeout = false;
195+ // If minbatchsize has been set and we are not past it, we check
196+ // if the timeout for min batch has expired, in which case we proceed
197+ // anyway
198+ if (minbatchsize > 0 && minbatchtimeout > 0 ) {
199+ struct timeval now , sub ;
200+ gettimeofday (& now , NULL );
201+
202+ timersub (& now , & rinfo -> queuingTime , & sub );
203+ size_t time_msec = sub .tv_sec * 1000 + sub .tv_usec / 1000 ;
204+
205+ if (time_msec > minbatchtimeout ) {
206+ timeout = true;
207+ }
208+ }
209+
191210 // Get the next item in the queue
192211 queueItem * next_item = queueFront (run_queue_info -> run_queue );
193212
194213 // While we don't reach the end of the queue
195- while (next_item != NULL ) {
214+ while (next_item != NULL && ! timeout ) {
196215 // Get the next run info
197216 RedisAI_RunInfo * next_rinfo = (RedisAI_RunInfo * )next_item -> value ;
198217
@@ -219,13 +238,6 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
219238 continue ;
220239 }
221240
222- // If the new batch size would exceed the prescribed batch
223- // size, then quit searching.
224- // Here we could consider searching further down the queue.
225- if (current_batchsize + next_batchsize > batchsize ) {
226- break ;
227- }
228-
229241 // If all previous checks pass, then keep track of the item
230242 // in the list of evicted items
231243 queueItem * tmp = queueNext (next_item );
@@ -238,11 +250,10 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
238250 // there's anything else to batch
239251 current_batchsize += next_batchsize ;
240252
241- // If minbatchsize hasn't been set, or if the current batch
242- // size exceeds the minimum batch size already, then we're done.
243- // Otherwise, if minbatchsize was set and the size wasn't reached,
244- // loop until there's something new on the queue
245- if (minbatchsize == 0 || current_batchsize >= minbatchsize ) {
253+ // If the new batch size would exceed the prescribed batch
254+ // size, then quit searching.
255+ // Here we could consider searching further down the queue.
256+ if (current_batchsize >= batchsize ) {
246257 break ;
247258 }
248259
@@ -257,10 +268,14 @@ static RedisAI_RunInfo **_BGThread_BatchOperations(RunQueueInfo *run_queue_info,
257268 size_t time_msec = sub .tv_sec * 1000 + sub .tv_usec / 1000 ;
258269
259270 if (time_msec > minbatchtimeout ) {
260- break ;
271+ timeout = true ;
261272 }
262273 }
263274 }
275+ if (minbatchsize != 0 && current_batchsize < minbatchsize ) {
276+ // The batch is ready with respect to minbatch only if there was a timeout.
277+ * batchReady = timeout ;
278+ }
264279 return batch_rinfo ;
265280}
266281
@@ -305,7 +320,18 @@ void *RedisAI_Run_ThreadMain(void *arg) {
305320 }
306321
307322 if (currentOpBatchable ) {
308- batch_rinfo = _BGThread_BatchOperations (run_queue_info , rinfo , batch_rinfo );
323+ bool batchReady = true;
324+ batch_rinfo =
325+ _BGThread_BatchOperations (run_queue_info , rinfo , batch_rinfo , & batchReady );
326+ if (!batchReady ) {
327+ // Batch is not ready - batch size didn't match the expectations from
328+ // minbatchsize
329+ for (int i = array_len (batch_rinfo ) - 1 ; i >= 0 ; i -- ) {
330+ queuePush (run_queue_info -> run_queue , batch_rinfo [i ]);
331+ }
332+ // Exit the loop, give a chance to new tasks to submit.
333+ break ;
334+ }
309335 }
310336 // Run the computation step (batched or not)
311337 // We're done with the queue here, items have been evicted so we can
0 commit comments