Skip to content

Commit 1c16247

Browse files
authored
improve the scheduler (#989)
1 parent 6653437 commit 1c16247

File tree

5 files changed

+40
-7
lines changed

5 files changed

+40
-7
lines changed

docs/CN/source/tutorial/api_server_args_zh.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,10 @@ attention类型选择参数
236236

237237
多结果输出模式
238238

239+
.. option:: --schedule_time_interval
240+
241+
调度时间间隔,默认为 ``0.03``,单位为秒
242+
239243

240244
输出约束参数
241245
-----------

docs/EN/source/tutorial/api_server_args_zh.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,10 @@ Scheduling Parameters
236236

237237
Multi-result output mode
238238

239+
.. option:: --schedule_time_interval
240+
241+
Schedule time interval, default is ``0.03``, unit is seconds
242+
239243
Output Constraint Parameters
240244
---------------------------
241245

lightllm/server/api_cli.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,4 +471,10 @@ def make_argument_parser() -> argparse.ArgumentParser:
471471
default=None,
472472
help="""Path of the kv quant calibration config. It can be used for llama and qwen model.""",
473473
)
474+
parser.add_argument(
475+
"--schedule_time_interval",
476+
type=float,
477+
default=0.03,
478+
help="""The interval of the schedule time, default is 30ms.""",
479+
)
474480
return parser

lightllm/server/router/manager.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def __init__(self, args: StartArgs, router_port, detokenization_port, metric_por
4141
self.nnodes = args.nnodes
4242
self.node_rank = args.node_rank
4343
self.dp_size = args.dp
44+
self.schedule_time_interval = args.schedule_time_interval # 默认30ms 的调度周期
4445
# 兼容多机纯tp的运行模式,这时候 1 // 2 == 0, 需要兼容
4546
self.dp_size_in_node = max(1, args.dp // self.nnodes)
4647
self.is_multinode_tp = args.nnodes > 1 and args.dp == 1
@@ -195,6 +196,14 @@ async def wait_to_model_ready(self):
195196

196197
return
197198

199+
def _get_schedule_time_interval(self):
200+
if self.running_batch is None:
201+
# 没有运行中的 batch 时,每 10ms 触发一次请求调度
202+
return 0.01
203+
204+
# dp 模式,为了更好的配平,需要更长的调度间隔,以便于能收到更多的请求
205+
return self.schedule_time_interval
206+
198207
async def loop_for_fwd(
199208
self,
200209
):
@@ -249,7 +258,7 @@ async def loop_for_fwd(
249258
logger.debug(f"dp_i {dp_i} frozen token num: {frozen_token_num} \n")
250259
logger.debug(f"dp_i {dp_i} estimated_peak_token_count: {estimated_peak_token_count} \n")
251260

252-
await asyncio.sleep(0.03) # 30ms
261+
await asyncio.sleep(self._get_schedule_time_interval())
253262

254263
async def _step(self):
255264
"""

lightllm/server/visualserver/manager.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(
3030
cache_port,
3131
visual_model_rpc_ports,
3232
):
33-
context = zmq.asyncio.Context(2)
33+
context = zmq.Context(2)
3434
self.send_to_next_module = context.socket(zmq.PUSH) # router or audio server (if --enable_multimodal_audio)
3535
self.send_to_next_module.connect(f"{args.zmq_mode}127.0.0.1:{next_module_port}")
3636

@@ -150,12 +150,22 @@ async def loop_for_fwd(self):
150150
images_need_infer = []
151151

152152
async def loop_for_netio_req(self):
153+
if not hasattr(self, "visual_recv_max_count"):
154+
self.visual_recv_max_count = 64
155+
153156
while True:
154-
recv_req: GroupReqIndexes = await self.recv_from_httpserver.recv_pyobj()
155-
if isinstance(recv_req, GroupReqIndexes):
156-
self.waiting_reqs.append(recv_req)
157-
else:
158-
assert False, f"Error Req Inf {recv_req}"
157+
try:
158+
for _ in range(self.visual_recv_max_count):
159+
recv_req: GroupReqIndexes = self.recv_from_httpserver.recv_pyobj(zmq.NOBLOCK)
160+
if isinstance(recv_req, GroupReqIndexes):
161+
self.waiting_reqs.append(recv_req)
162+
else:
163+
assert False, f"Error Req Inf {recv_req}"
164+
self.visual_recv_max_count = min(self.visual_recv_max_count * 1.3, 256)
165+
except zmq.ZMQError:
166+
# 当队列已经开始清空的时候,将一次接受数量下调
167+
self.visual_recv_max_count = 64
168+
await asyncio.sleep(0.01)
159169

160170
def clean_up(self):
161171
for model_rpc in self.model_rpcs:

0 commit comments

Comments
 (0)