Skip to content
Closed
33 changes: 26 additions & 7 deletions agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -680,12 +680,17 @@ public int priority() {
*/
private class HookNotifier {

private List<Hook> getSortedHooks() {
return ReActAgent.this.getSortedHooks();
}

Mono<List<Msg>> notifyPreReasoning(AgentBase agent, List<Msg> msgs) {
PreReasoningEvent event =
new PreReasoningEvent(agent, model.getModelName(), null, msgs);

Mono<PreReasoningEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(e -> hook.onEvent(e));
result = result.flatMap(hook::onEvent);
}
return result.map(PreReasoningEvent::getInputMessages);
}
Expand All @@ -694,9 +699,10 @@ Mono<Msg> notifyPostReasoning(Msg reasoningMsg) {
PostReasoningEvent event =
new PostReasoningEvent(
ReActAgent.this, model.getModelName(), null, reasoningMsg);

Mono<PostReasoningEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(e -> hook.onEvent(e));
result = result.flatMap(hook::onEvent);
}
return result.map(PostReasoningEvent::getReasoningMessage);
}
Expand All @@ -705,28 +711,41 @@ Mono<Void> notifyReasoningChunk(Msg chunk, Msg accumulated) {
ReasoningChunkEvent event =
new ReasoningChunkEvent(
ReActAgent.this, model.getModelName(), null, chunk, accumulated);
return Flux.fromIterable(getSortedHooks()).flatMap(hook -> hook.onEvent(event)).then();

Mono<ReasoningChunkEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(hook::onEvent);
}
return result.then();
}

Mono<ToolUseBlock> notifyPreActing(ToolUseBlock toolUse) {
PreActingEvent event = new PreActingEvent(ReActAgent.this, toolkit, toolUse);

Mono<PreActingEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(e -> hook.onEvent(e));
result = result.flatMap(hook::onEvent);
}
return result.map(PreActingEvent::getToolUse);
}

Mono<Void> notifyActingChunk(ToolUseBlock toolUse, ToolResultBlock chunk) {
ActingChunkEvent event = new ActingChunkEvent(ReActAgent.this, toolkit, toolUse, chunk);
return Flux.fromIterable(getSortedHooks()).flatMap(hook -> hook.onEvent(event)).then();

Mono<ActingChunkEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(hook::onEvent);
}
return result.then();
}

Mono<ToolResultBlock> notifyPostActing(ToolUseBlock toolUse, ToolResultBlock toolResult) {
var event = new PostActingEvent(ReActAgent.this, toolkit, toolUse, toolResult);
PostActingEvent event =
new PostActingEvent(ReActAgent.this, toolkit, toolUse, toolResult);

Mono<PostActingEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(e -> hook.onEvent(e));
result = result.flatMap(hook::onEvent);
}
return result.map(PostActingEvent::getToolResult);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.agentscope.core.state.StateModuleBase;
import io.agentscope.core.tracing.TracerRegistry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -91,7 +92,12 @@ public abstract class AgentBase extends StateModuleBase implements Agent {
private final AtomicBoolean running = new AtomicBoolean(false);
private final boolean checkRunning;
private final List<Hook> hooks;
// Cached sorted hooks (invalidated when hooks list changes)
private transient volatile List<Hook> cachedSortedHooks;
private final AtomicBoolean hooksDirty = new AtomicBoolean(true);

private static final List<Hook> systemHooks = new CopyOnWriteArrayList<>();

private final Map<String, List<AgentBase>> hubSubscribers = new ConcurrentHashMap<>();

// Interrupt state management (available to all agents)
Expand Down Expand Up @@ -240,6 +246,8 @@ protected Mono<Msg> doCall(List<Msg> msgs, Class<?> structuredOutputClass) {
"Structured output not supported by " + getClass().getSimpleName()));
}

// Note: system hooks are applied at agent construction time;
// dynamic system hook changes do not affect existing agents.
public static void addSystemHook(Hook hook) {
systemHooks.add(hook);
}
Expand Down Expand Up @@ -395,22 +403,66 @@ protected Mono<Void> doObserve(Msg msg) {

/**
* Get the list of hooks for this agent.
* Protected to allow subclasses to access hooks for custom notification logic.
*
* @return List of hooks
* <p>Returns an immutable snapshot of the internal hook list.
* Callers must not attempt to modify the returned list.
* To add or remove hooks, use {@link #addHook(Hook)} or
* {@link #removeHook(Hook)}.
*
* <p>This is a breaking change from previous behavior where
* callers could mutate the returned list directly.
*
* @return Immutable list of hooks
*/
public List<Hook> getHooks() {
return hooks;
return List.copyOf(hooks);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also fix here

Image

}
Comment on lines 402 to +419
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc for getHooks() should be updated to reflect that it now returns an immutable copy rather than the original mutable list. The previous behavior allowed callers to directly manipulate the hooks list, but now returns List.copyOf(hooks) which creates an immutable snapshot.

This is a breaking API change that should be documented so callers understand they can no longer modify the returned list directly and must use the new addHook/removeHook methods instead.

Copilot uses AI. Check for mistakes.

/**
* Add a hook to this agent.
*
* <p>Hooks should generally be added during agent setup,
* before execution begins. Modifying hooks during execution
* is not thread-safe and may lead to undefined behavior.
*
* @param hook Hook to add
*/
public void addHook(Hook hook) {
hooks.add(hook);
hooksDirty.set(true);
}

/**
* Remove a hook from this agent.
*
* <p>Hooks should generally be removed during agent setup.
* Modifying hooks during execution is not thread-safe.
*
* @param hook Hook to remove
*/
public void removeHook(Hook hook) {
hooks.remove(hook);
hooksDirty.set(true);
}

/**
* Get hooks sorted by priority (lower value = higher priority).
* Hooks with the same priority maintain registration order.
*
* <p>Results may be cached until the hook list changes.
*
* @return Sorted list of hooks
*/
protected List<Hook> getSortedHooks() {
return hooks.stream().sorted(java.util.Comparator.comparingInt(Hook::priority)).toList();
if (!hooksDirty.get() && cachedSortedHooks != null) {
return cachedSortedHooks;
}

List<Hook> sorted = hooks.stream().sorted(Comparator.comparingInt(Hook::priority)).toList();

cachedSortedHooks = sorted;
hooksDirty.set(false);
Comment on lines +457 to +464
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concurrent issue here

return sorted;
Comment on lines +457 to +465
Copy link

Copilot AI Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caching implementation has a race condition. Between checking hooksDirty.get() and reading cachedSortedHooks, another thread could modify the hooks list, set the dirty flag, and clear the cache. This violates the documented single-threaded execution constraint and could cause the method to return a stale or null cached list.

Since the class documentation states agents are not designed for concurrent execution, consider either:

  1. Removing the volatile and atomic constructs to align with the single-threaded design
  2. If thread-safety is needed, implement proper synchronization or use a thread-safe caching pattern
Suggested change
if (!hooksDirty.get() && cachedSortedHooks != null) {
return cachedSortedHooks;
}
List<Hook> sorted = hooks.stream().sorted(Comparator.comparingInt(Hook::priority)).toList();
cachedSortedHooks = sorted;
hooksDirty.set(false);
return sorted;
List<Hook> currentCache = cachedSortedHooks;
if (!hooksDirty.get() && currentCache != null) {
return currentCache;
}
synchronized (this) {
if (!hooksDirty.get() && cachedSortedHooks != null) {
return cachedSortedHooks;
}
List<Hook> sorted = hooks.stream()
.sorted(Comparator.comparingInt(Hook::priority))
.toList();
cachedSortedHooks = sorted;
hooksDirty.set(false);
return sorted;
}

Copilot uses AI. Check for mistakes.
}

/**
Expand Down Expand Up @@ -460,7 +512,11 @@ private Mono<Msg> notifyPostCall(Msg finalMsg) {
*/
private Mono<Void> notifyError(Throwable error) {
ErrorEvent event = new ErrorEvent(this, error);
return Flux.fromIterable(getSortedHooks()).flatMap(hook -> hook.onEvent(event)).then();
Mono<ErrorEvent> result = Mono.just(event);
for (Hook hook : getSortedHooks()) {
result = result.flatMap(hook::onEvent);
}
return result.then();
}

/**
Expand Down Expand Up @@ -626,15 +682,15 @@ private Flux<Event> createEventStream(StreamOptions options, Supplier<Mono<Msg>>
StreamingHook streamingHook = new StreamingHook(sink, options);

// Add temporary hook
hooks.add(streamingHook);
addHook(streamingHook);

// Execute call and manage hook lifecycle
callSupplier
.get()
.doFinally(
signalType -> {
// Remove temporary hook
hooks.remove(streamingHook);
removeHook(streamingHook);
})
.subscribe(
finalMsg -> {
Expand Down
Loading