Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,32 @@ public static void scanUnusedClient() {
});
unusedClientMap.forEach((bConfig, value) -> value.forEach(e -> {
try {
RpcClientProxy proxy = (RpcClientProxy) e;
// Double-check before closing: ensure the client is still idle to avoid closing a client in use.
// This prevents race condition where getOrCreateClient() gets a client right before it's closed.
if (!isIdleTimeout(bConfig, proxy)) {
// lastUsedNanos was updated, meaning a business thread is using it, try to put it back
Map<String, RpcClientProxy> clientMap = CLUSTER_MAP.get(bConfig);
if (clientMap != null) {
RpcClientProxy existing = clientMap.putIfAbsent(
proxy.getProtocolConfig().toUniqId(), proxy);
if (existing == null) {
// Successfully put back, do not close
logger.info("RpcClient {} rescued from closing due to recent usage",
proxy.getProtocolConfig().toSimpleString());
return;
}
}
// Failed to put back (a new client already exists), still need to close the old one
}
e.close();
} finally {
logger.warn("RpcClient in clusterName={}, naming={}, remove rpc client{}, due to unused time > {} ms",
bConfig.getName(), bConfig.getNamingOptions().getServiceNaming(),
e.getProtocolConfig().toSimpleString(), bConfig.getIdleTimeout());
} catch (Exception ex) {
logger.error("Failed to close RpcClient in clusterName={}, naming={}, client={}",
bConfig.getName(), bConfig.getNamingOptions().getServiceNaming(),
e.getProtocolConfig().toSimpleString(), ex);
}
}));
}
Expand Down Expand Up @@ -322,4 +343,4 @@ public boolean equals(Object obj) {
}
}

}
}