Skip to content

Commit 0a296a1

Browse files
committed
0911
1 parent cffa0a0 commit 0a296a1

File tree

7 files changed

+43
-27
lines changed

7 files changed

+43
-27
lines changed

lightllm/models/vit/model.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ def encode(self, images: List[ImageItem]):
178178
for i, img in enumerate(images):
179179
if isinstance(img, ImageItem):
180180
uuids.append(img.uuid)
181-
image_data = read_shm(get_shm_name_data(img.uuid))
181+
image_data = img._preload_data
182+
# image_data = read_shm(get_shm_name_data(img.uuid))
182183
image_data = Image.open(BytesIO(image_data))
183184
t = self.load_image_func(image_data, max_num=img.extra_params["image_patch_max_num"])
184185
img_tensors.append(t)

lightllm/server/embed_cache/impl/memory_cache_with_redis.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,19 @@ def release(self, ids: list[int]) -> None:
3636
self._records[id_].ref -= 1
3737
if self.redis_cache.query(str(id_)):
3838
self.redis_cache.decr(str(id_))
39-
print(self.redis_cache.stats(), flush=True)
39+
# print(self.redis_cache.stats(), flush=True)
4040

4141
# vit 负责set
4242
def set_items_embed(self, ids: list[int]) -> None:
4343
with self.lock:
4444
for id in ids:
4545
self.redis_cache.insert(str(id))
4646
self._records[id].embed = True
47-
self._records[id].ref -= 1
47+
self._records[id].ref -= 1 # vit端alloc之后ref+1 vit完成后ref-1
4848

4949
def get_items_embed(self, ids: list[int]) -> list[Optional[bool]]:
5050
ret = []
5151
for id in ids:
52-
print(f"id is {id}")
53-
print(f"self.redis_cache.query(str(id)) is {self.redis_cache.query(str(id))}")
5452
exist = self.redis_cache.query(str(id))
5553
ret.append(exist)
5654
if exist:

lightllm/server/embed_cache/impl/naive_memory_cache.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,10 @@ def _clear(self, free_max_count: int):
7878
free_shm(get_shm_name_data(id))
7979
if record.embed:
8080
# 仅vit释放掉afs里的, llm端不做释放
81-
if self.args.run_mode == "visual":
82-
free_afs(get_shm_name_embed(id), self.args.image_embed_dir)
83-
elif not self.args.enable_remote_vit:
81+
# if self.args.run_mode == "visual":
82+
# free_afs(get_shm_name_embed(id), self.args.image_embed_dir)
83+
# elif not self.args.enable_remote_vit:
84+
if not self.args.run_mode == "visual":
8485
free_shm(get_shm_name_embed(id))
8586
del self._md5_to_record[record.md5sum]
8687
del self._records[id]

lightllm/server/embed_cache/utils.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
from pathlib import Path
99
import multiprocessing.shared_memory as shm
1010
from lightllm.utils.envs_utils import get_env_start_args
11+
from lightllm.utils.log_utils import init_logger
12+
13+
logger = init_logger(__name__)
1114

1215

1316
def tensor2bytes(t: torch.Tensor):
@@ -247,7 +250,7 @@ def _md5_to_afs_path(self, md5: str) -> str:
247250
"""Convert md5 to AFS file path."""
248251
if not self.image_embed_dir:
249252
return None
250-
filename = md5 + self.path_ext
253+
filename = self.image_embed_dir + md5 + self.path_ext
251254
return filename
252255

253256
def _delete_afs_files(self, victims: List[str]) -> None:
@@ -260,9 +263,9 @@ def _delete_afs_files(self, victims: List[str]) -> None:
260263
file_path = self._md5_to_afs_path(md5)
261264
if file_path and os.path.exists(file_path):
262265
os.remove(file_path)
263-
print(f"Deleted AFS file: {file_path}")
266+
logger.debug(f"Deleted AFS file: {file_path}")
264267
except Exception as e:
265-
print(f"Warning: Failed to delete AFS file for {md5}: {e}")
268+
logger.debug(f"Warning: Failed to delete AFS file for {md5}: {e}")
266269

267270
# ---------------- Lua scripts ----------------
268271
_INSERT_LUA = r"""
@@ -273,6 +276,7 @@ def _delete_afs_files(self, victims: List[str]) -> None:
273276
local md5 = ARGV[1]
274277
local capacity = tonumber(ARGV[2])
275278
279+
local unpack = unpack or table.unpack
276280
local ref_key = ref_prefix .. md5
277281
if redis.call('GET', ref_key) then
278282
return {0} -- Already exists
@@ -385,7 +389,7 @@ def _delete_afs_files(self, victims: List[str]) -> None:
385389
local now = redis.call('TIME')[1] * 1000
386390
redis.call('ZADD', zset, now, new_md5)
387391
388-
return {1, table.unpack(victims)} -- success + victims
392+
return {1, unpack(victims)} -- success + victims
389393
else
390394
return {0} -- 逐出失败,没有足够的候选
391395
end

lightllm/server/visualserver/manager.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,21 +153,28 @@ async def loop_for_fwd(self):
153153
processing_group_reqs = []
154154
images_need_infer = []
155155

156-
def _recv_reqs(self):
156+
async def _recv_reqs(self):
157157
if self.remote_vit:
158158
recv_req: GroupReqIndexes = self.vit_receiver.recv_pyobj(zmq.NOBLOCK)
159159
# recv_req.multimodal_params.images[:]= [
160160
# img for img in recv_req.multimodal_params.images
161161
# if not self.cache_client.root.get_item_embed(img.uuid) # embed已存在的被丢弃 , ref +1
162162
# ]
163+
logger.info(f"Receive req {recv_req.group_req_id}, image_count:{len(recv_req.multimodal_params.images)}")
163164
uuids = [img.uuid for img in recv_req.multimodal_params.images]
164165
already_embed = self.cache_client.root.get_items_embed(uuids)
166+
if all(already_embed):
167+
return None
165168
token_nums = []
166169
for img, embed in zip(recv_req.multimodal_params.images, already_embed):
167170
if not embed:
168171
uuids.append(img.uuid)
169172
token_nums.append(img.token_num)
170-
self.cache_client.root.alloc(uuids, token_nums)
173+
while True:
174+
records = self.cache_client.root.alloc(uuids, token_nums)
175+
if records is not None:
176+
break
177+
await asyncio.sleep(0.1)
171178
return recv_req
172179
else:
173180
return self.vit_receiver.recv_pyobj(zmq.NOBLOCK)
@@ -179,11 +186,11 @@ async def loop_for_netio_req(self):
179186
while True:
180187
try:
181188
for _ in range(self.visual_recv_max_count):
182-
recv_req: GroupReqIndexes = self._recv_reqs()
189+
recv_req: GroupReqIndexes = await self._recv_reqs()
190+
if recv_req is None:
191+
continue
183192
if isinstance(recv_req, GroupReqIndexes):
184-
# print(recv_req, flush=True)
185193
self.waiting_reqs.append(recv_req)
186-
print(f"recv_req.multimodal_params is {recv_req.multimodal_params}")
187194
else:
188195
assert False, f"Error Req Inf {recv_req}"
189196
self.visual_recv_max_count = min(self.visual_recv_max_count * 1.3, 256)
@@ -210,11 +217,21 @@ async def loop_for_fwd_visual_only(self):
210217
images_need_infer.append(img)
211218

212219
if len(images_need_infer) == self.infer_batch_size:
220+
_t0 = time.perf_counter()
213221
await self.infer_imgs(images_need_infer)
222+
logger.info(
223+
f"[visual] batch infer complete, image_count: {len(images_need_infer)}, "
224+
f"elapsed_time {(time.perf_counter()-_t0) * 1000}ms"
225+
)
214226
images_need_infer = []
215227

216228
if len(images_need_infer) > 0:
229+
_t1 = time.perf_counter()
217230
await self.infer_imgs(images_need_infer)
231+
logger.info(
232+
f"[visual] batch infer complete, image_count:{len(images_need_infer)}, "
233+
f"elapsed_time {(time.perf_counter()-_t1) * 1000}ms"
234+
)
218235
images_need_infer = []
219236
# 在这里release这个image,ref-1
220237
logger.info(f"req-id {visual_req.group_req_id} has been release ok")

lightllm/server/visualserver/model_infer/model_rpc.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ def exposed_init_model(self, kvargs):
5050
self.dp_rank_id = kvargs["dp_rank_id"]
5151
self.tp_rank_id = kvargs["tp_rank_id"]
5252
kvargs["vit_rank_id"] = self.dp_rank_id * self.args.visual_tp + self.tp_rank_id
53-
print(cache_port)
5453
self.cache_client = rpyc.connect("localhost", cache_port, config={"allow_pickle": True})
5554

5655
init_vision_distributed_env(kvargs)
@@ -87,9 +86,7 @@ def exposed_init_model(self, kvargs):
8786
else:
8887
raise Exception(f"can not support {self.model_type} now")
8988
self.model.load_model(weight_dir)
90-
print("begin load model")
9189
self.model = self.model.cuda()
92-
print("load model OK")
9390
except Exception as e:
9491
print("#" * 16)
9592
print("load model error:", str(e), e, type(e))
@@ -113,12 +110,11 @@ def exposed_encode(self, images: List[ImageItem]):
113110
all_img_embeds = all_img_embeds.to(torch.device("cpu"))
114111

115112
if self.tp_rank_id == 0:
116-
ready_flags = obtain(self.cache_client.root.get_items_embed(uuids))
117-
print(f"ready_flags is {ready_flags}")
113+
# ready_flags = obtain(self.cache_client.root.get_items_embed(uuids))
118114
ids_to_set = []
119-
for i, ready in enumerate(ready_flags):
120-
if ready:
121-
continue
115+
for i, img in enumerate(images):
116+
# if ready:
117+
# continue
122118
uid = uuids[i]
123119
start, end = valid_ids[i]
124120
cur_embed_bytes = tensor2bytes(all_img_embeds[start:end])

lightllm/server/visualserver/vit_connect.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def _update_vit_connections(self, id_to_vit_obj: Dict[int, VIT_Obj]):
137137
if id not in self.remote_vit_instances:
138138
try:
139139
socket = self.context.socket(zmq.PUSH)
140-
print(vit_obj.host_ip_port, self.args.remote_vit_port, flush=True)
140+
# print(vit_obj.host_ip_port, self.args.remote_vit_port, flush=True)
141141
ip, port = vit_obj.host_ip_port.split(":")
142142
socket.connect(f"tcp://{ip}:{port}")
143143
self.remote_vit_instances[id] = socket
@@ -223,7 +223,6 @@ async def _wait_visual_embed_ready(self, req: GroupReqIndexes, timeout_seconds:
223223
return
224224

225225
uuids = req.multimodal_params.get_all_uuids()
226-
print(f"uuids is {uuids}")
227226

228227
async def wait_for_embeds():
229228
while not all(self.cache_client.root.get_items_embed(uuids)):

0 commit comments

Comments (0)