@@ -645,50 +645,54 @@ async def handle_loop(self):
645645 except asyncio .TimeoutError :
646646 pass
647647
648- for group_req_id_ in list (self .req_id_to_out_inf .keys ()):
649- req_status = self .req_id_to_out_inf .get (group_req_id_ , None )
650- if req_status is None :
651- continue
648+ try :
649+ for group_req_id_ in list (self .req_id_to_out_inf .keys ()):
650+ req_status = self .req_id_to_out_inf .get (group_req_id_ , None )
651+ if req_status is None :
652+ continue
652653
653- token_list = []
654- for req in req_status .group_req_objs .shm_req_objs :
655- req_id = req .request_id
656- read_token_count = 1
657- if req .out_tokens_queue .is_full ():
658- read_token_count = LIGHTLLM_OUT_TOKEN_QUEUE_SIZE
659-
660- for _ in range (read_token_count ):
661- if not req .out_tokens_queue .is_empty ():
662-
663- text , src_index , special , count_output_tokens = req .out_tokens_queue .peek ()
664- req .cumlogprob += float (req .shm_logprobs .arr [src_index ])
665- metadata = {
666- "id" : int (req .shm_prompt_ids .arr [src_index ]),
667- "logprob" : float (req .shm_logprobs .arr [src_index ]),
668- "cumlogprob" : float (req .cumlogprob ) / count_output_tokens ,
669- "special" : special ,
670- "count_output_tokens" : count_output_tokens ,
671- "prompt_cache_len" : req .prompt_cache_len ,
672- "mtp_accepted_token_num" : req .mtp_accepted_token_num ,
673- }
674- if self .args .return_all_prompt_logprobs :
675- metadata .update (req .get_all_prompt_metadata ())
676- if self .args .use_reward_model :
677- metadata ["score" ] = float (req .reward_score )
678-
679- req .out_tokens_queue .pop_no_ret ()
680-
681- if req .finish_token_index != src_index :
682- token_list .append ((req_id , text , metadata , FinishStatus ()))
654+ token_list = []
655+ for req in req_status .group_req_objs .shm_req_objs :
656+ req_id = req .request_id
657+ read_token_count = 1
658+ if req .out_tokens_queue .is_full ():
659+ read_token_count = LIGHTLLM_OUT_TOKEN_QUEUE_SIZE
660+
661+ for _ in range (read_token_count ):
662+ if not req .out_tokens_queue .is_empty ():
663+
664+ text , src_index , special , count_output_tokens = req .out_tokens_queue .peek ()
665+ req .cumlogprob += float (req .shm_logprobs .arr [src_index ])
666+ metadata = {
667+ "id" : int (req .shm_prompt_ids .arr [src_index ]),
668+ "logprob" : float (req .shm_logprobs .arr [src_index ]),
669+ "cumlogprob" : float (req .cumlogprob ) / count_output_tokens ,
670+ "special" : special ,
671+ "count_output_tokens" : count_output_tokens ,
672+ "prompt_cache_len" : req .prompt_cache_len ,
673+ "mtp_accepted_token_num" : req .mtp_accepted_token_num ,
674+ }
675+ if self .args .return_all_prompt_logprobs :
676+ metadata .update (req .get_all_prompt_metadata ())
677+ if self .args .use_reward_model :
678+ metadata ["score" ] = float (req .reward_score )
679+
680+ req .out_tokens_queue .pop_no_ret ()
681+
682+ if req .finish_token_index != src_index :
683+ token_list .append ((req_id , text , metadata , FinishStatus ()))
684+ else :
685+ finish_status = FinishStatus (req .finish_status .status )
686+ token_list .append ((req_id , text , metadata , finish_status ))
683687 else :
684- finish_status = FinishStatus ( req . finish_status . status )
685- token_list . append (( req_id , text , metadata , finish_status ))
686- else :
687- break
688-
689- async with req_status . lock :
690- req_status . out_token_info_list . extend ( token_list )
691- req_status . event . set ()
688+ break
689+
690+ async with req_status . lock :
691+ req_status . out_token_info_list . extend ( token_list )
692+ req_status . event . set ()
693+ except BaseException as e :
694+ logger . exception ( str ( e ) )
695+ raise e
692696
693697 self .recycle_event .set ()
694698 return
0 commit comments