From a0d541df3f6cd219c63637d10a9a4d3f45622963 Mon Sep 17 00:00:00 2001 From: David Brassely Date: Thu, 14 Aug 2025 15:30:12 +0200 Subject: [PATCH] feat: Make A2AClient agnostic to the transport protocol --- client/pom.xml | 2 +- client/src/main/java/io/a2a/A2A.java | 4 +- .../java/io/a2a/client/A2ACardResolver.java | 4 +- .../main/java/io/a2a/client/A2AClient.java | 325 +++--------------- .../java/io/a2a/client/A2AGrpcClient.java | 200 ----------- .../io/a2a/client/A2ACardResolverTest.java | 4 +- pom.xml | 1 + .../grpc/quarkus/QuarkusGrpcHandler.java | 2 +- .../server/grpc/quarkus/A2ATestResource.java | 2 +- .../server/apps/quarkus/A2AServerRoutes.java | 2 +- .../tasks/BasePushNotificationSender.java | 4 +- .../AbstractA2ARequestHandlerTest.java | 4 +- .../server/apps/common/TestHttpClient.java | 4 +- transport/grpc/pom.xml | 5 + .../grpc/client/EventStreamObserver.java | 16 +- .../transport/grpc/client/GrpcTransport.java | 164 +++++++++ .../grpc/server}/handler/GrpcHandler.java | 2 +- .../io/a2a/grpc/handler/GrpcHandlerTest.java | 1 + transport/jsonrpc/pom.xml | 5 + .../jsonrpc/client}/A2AHttpClient.java | 2 +- .../jsonrpc/client}/A2AHttpResponse.java | 2 +- .../jsonrpc/client/JSONRPCTransport.java | 301 ++++++++++++++++ .../jsonrpc/client}/JdkA2AHttpClient.java | 2 +- .../jsonrpc}/client/sse/SSEEventListener.java | 14 +- .../server}/handler/JSONRPCHandler.java | 2 +- .../jsonrpc/client/JsonStreamingMessages.java | 148 ++++++++ .../client/sse/SSEEventListenerTest.java | 25 +- .../server}/handler/JSONRPCHandlerTest.java | 3 +- transport/spi/pom.xml | 29 ++ .../a2a/transport/spi/client/Transport.java | 109 ++++++ 30 files changed, 857 insertions(+), 531 deletions(-) delete mode 100644 client/src/main/java/io/a2a/client/A2AGrpcClient.java rename client/src/main/java/io/a2a/client/sse/SSEStreamObserver.java => transport/grpc/src/main/java/io/a2a/transport/grpc/client/EventStreamObserver.java (83%) create mode 100644 transport/grpc/src/main/java/io/a2a/transport/grpc/client/GrpcTransport.java rename transport/grpc/src/main/java/io/a2a/{grpc => transport/grpc/server}/handler/GrpcHandler.java (99%) rename {client/src/main/java/io/a2a/http => transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client}/A2AHttpClient.java (96%) rename {client/src/main/java/io/a2a/http => transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client}/A2AHttpResponse.java (70%) create mode 100644 transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JSONRPCTransport.java rename {client/src/main/java/io/a2a/http => transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client}/JdkA2AHttpClient.java (99%) rename {client/src/main/java/io/a2a => transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc}/client/sse/SSEEventListener.java (98%) rename transport/jsonrpc/src/main/java/io/a2a/{jsonrpc => transport/jsonrpc/server}/handler/JSONRPCHandler.java (99%) create mode 100644 transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/JsonStreamingMessages.java rename {client/src/test/java/io/a2a => transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc}/client/sse/SSEEventListenerTest.java (97%) rename transport/jsonrpc/src/test/java/io/a2a/{jsonrpc => transport/jsonrpc/server}/handler/JSONRPCHandlerTest.java (99%) create mode 100644 transport/spi/pom.xml create mode 100644 transport/spi/src/main/java/io/a2a/transport/spi/client/Transport.java diff --git a/client/pom.xml b/client/pom.xml index 0a71440b4..bdf20a9c4 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -29,7 +29,7 @@ ${project.groupId} - a2a-java-sdk-spec-grpc + a2a-java-sdk-transport-jsonrpc ${project.version} diff --git a/client/src/main/java/io/a2a/A2A.java b/client/src/main/java/io/a2a/A2A.java index c8dca80e4..d43ee1738 100644 --- a/client/src/main/java/io/a2a/A2A.java +++ b/client/src/main/java/io/a2a/A2A.java @@ -4,8 +4,8 @@ import java.util.Map; import io.a2a.client.A2ACardResolver; -import io.a2a.http.A2AHttpClient; -import io.a2a.http.JdkA2AHttpClient; +import io.a2a.transport.jsonrpc.client.A2AHttpClient; +import io.a2a.transport.jsonrpc.client.JdkA2AHttpClient; import io.a2a.spec.A2AClientError; import io.a2a.spec.A2AClientJSONError; import io.a2a.spec.AgentCard; diff --git a/client/src/main/java/io/a2a/client/A2ACardResolver.java b/client/src/main/java/io/a2a/client/A2ACardResolver.java index 88d1e351f..ab45039d6 100644 --- a/client/src/main/java/io/a2a/client/A2ACardResolver.java +++ b/client/src/main/java/io/a2a/client/A2ACardResolver.java @@ -9,8 +9,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; -import io.a2a.http.A2AHttpClient; -import io.a2a.http.A2AHttpResponse; +import io.a2a.transport.jsonrpc.client.A2AHttpClient; +import io.a2a.transport.jsonrpc.client.A2AHttpResponse; import io.a2a.spec.A2AClientError; import io.a2a.spec.A2AClientJSONError; import io.a2a.spec.AgentCard; diff --git a/client/src/main/java/io/a2a/client/A2AClient.java b/client/src/main/java/io/a2a/client/A2AClient.java index 80f8c401c..a61c86da3 100644 --- a/client/src/main/java/io/a2a/client/A2AClient.java +++ b/client/src/main/java/io/a2a/client/A2AClient.java @@ -1,68 +1,24 @@ package io.a2a.client; -import static io.a2a.util.Assert.checkNotNullParam; +import io.a2a.A2A; +import io.a2a.spec.*; +import io.a2a.transport.jsonrpc.client.A2AHttpClient; +import io.a2a.transport.jsonrpc.client.JSONRPCTransport; +import io.a2a.transport.jsonrpc.client.JdkA2AHttpClient; +import io.a2a.transport.spi.client.Transport; -import java.io.IOException; +import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import io.a2a.client.sse.SSEEventListener; -import io.a2a.http.A2AHttpClient; -import io.a2a.http.A2AHttpResponse; -import io.a2a.http.JdkA2AHttpClient; -import io.a2a.A2A; -import io.a2a.spec.A2AClientError; -import io.a2a.spec.A2AClientJSONError; -import io.a2a.spec.A2AServerException; -import io.a2a.spec.AgentCard; -import io.a2a.spec.CancelTaskRequest; -import io.a2a.spec.CancelTaskResponse; -import io.a2a.spec.DeleteTaskPushNotificationConfigParams; -import io.a2a.spec.DeleteTaskPushNotificationConfigRequest; -import io.a2a.spec.DeleteTaskPushNotificationConfigResponse; -import io.a2a.spec.GetTaskPushNotificationConfigParams; -import io.a2a.spec.GetTaskPushNotificationConfigRequest; -import io.a2a.spec.GetTaskPushNotificationConfigResponse; -import io.a2a.spec.GetTaskRequest; -import io.a2a.spec.GetTaskResponse; -import io.a2a.spec.JSONRPCError; -import io.a2a.spec.JSONRPCMessage; -import io.a2a.spec.JSONRPCResponse; -import io.a2a.spec.ListTaskPushNotificationConfigParams; -import io.a2a.spec.ListTaskPushNotificationConfigRequest; -import io.a2a.spec.ListTaskPushNotificationConfigResponse; -import io.a2a.spec.MessageSendParams; -import io.a2a.spec.PushNotificationConfig; -import io.a2a.spec.SendMessageRequest; -import io.a2a.spec.SendMessageResponse; -import io.a2a.spec.SendStreamingMessageRequest; -import io.a2a.spec.SetTaskPushNotificationConfigRequest; -import io.a2a.spec.SetTaskPushNotificationConfigResponse; -import io.a2a.spec.StreamingEventKind; -import io.a2a.spec.TaskIdParams; -import io.a2a.spec.TaskPushNotificationConfig; -import io.a2a.spec.TaskQueryParams; -import io.a2a.spec.TaskResubscriptionRequest; -import io.a2a.util.Utils; +import static io.a2a.util.Assert.checkNotNullParam; /** * An A2A client. */ public class A2AClient { - private static final TypeReference SEND_MESSAGE_RESPONSE_REFERENCE = new TypeReference<>() {}; - private static final TypeReference GET_TASK_RESPONSE_REFERENCE = new TypeReference<>() {}; - private static final TypeReference CANCEL_TASK_RESPONSE_REFERENCE = new TypeReference<>() {}; - private static final TypeReference GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {}; - private static final TypeReference SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {}; - private static final TypeReference LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {}; - private static final TypeReference DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {}; - private final A2AHttpClient httpClient; - private final String agentUrl; + private Transport transport; private AgentCard agentCard; @@ -74,8 +30,7 @@ public class A2AClient { public A2AClient(AgentCard agentCard) { checkNotNullParam("agentCard", agentCard); this.agentCard = agentCard; - this.agentUrl = agentCard.url(); - this.httpClient = new JdkA2AHttpClient(); + this.transport = new JSONRPCTransport(agentCard.url(), new JdkA2AHttpClient()); } /** @@ -85,8 +40,7 @@ public A2AClient(AgentCard agentCard) { */ public A2AClient(String agentUrl) { checkNotNullParam("agentUrl", agentUrl); - this.agentUrl = agentUrl; - this.httpClient = new JdkA2AHttpClient(); + this.transport = new JSONRPCTransport(agentUrl, new JdkA2AHttpClient()); } /** @@ -145,7 +99,7 @@ public AgentCard getAgentCard(String relativeCardPath, Map authH * @return the response, may contain a message or a task * @throws A2AServerException if sending the message fails for any reason */ - public SendMessageResponse sendMessage(MessageSendParams messageSendParams) throws A2AServerException { + public EventKind sendMessage(MessageSendParams messageSendParams) throws A2AServerException { return sendMessage(null, messageSendParams); } @@ -157,24 +111,8 @@ public SendMessageResponse sendMessage(MessageSendParams messageSendParams) thro * @return the response, may contain a message or a task * @throws A2AServerException if sending the message fails for any reason */ - public SendMessageResponse sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException { - SendMessageRequest.Builder sendMessageRequestBuilder = new SendMessageRequest.Builder() - .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) - .method(SendMessageRequest.METHOD) - .params(messageSendParams); - - if (requestId != null) { - sendMessageRequestBuilder.id(requestId); - } - - SendMessageRequest sendMessageRequest = sendMessageRequestBuilder.build(); - - try { - String httpResponseBody = sendPostRequest(sendMessageRequest); - return unmarshalResponse(httpResponseBody, SEND_MESSAGE_RESPONSE_REFERENCE); - } catch (IOException | InterruptedException e) { - throw new A2AServerException("Failed to send message: " + e, e.getCause()); - } + public EventKind sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException { + return transport.sendMessage(requestId, messageSendParams); } /** @@ -185,7 +123,7 @@ public SendMessageResponse sendMessage(String requestId, MessageSendParams messa * @return the response containing the task * @throws A2AServerException if retrieving the task fails for any reason */ - public GetTaskResponse getTask(String id) throws A2AServerException { + public Task getTask(String id) throws A2AServerException { return getTask(null, new TaskQueryParams(id)); } @@ -197,7 +135,7 @@ public GetTaskResponse getTask(String id) throws A2AServerException { * @return the response containing the task * @throws A2AServerException if retrieving the task fails for any reason */ - public GetTaskResponse getTask(TaskQueryParams taskQueryParams) throws A2AServerException { + public Task getTask(TaskQueryParams taskQueryParams) throws A2AServerException { return getTask(null, taskQueryParams); } @@ -209,24 +147,8 @@ public GetTaskResponse getTask(TaskQueryParams taskQueryParams) throws A2AServer * @return the response containing the task * @throws A2AServerException if retrieving the task fails for any reason */ - public GetTaskResponse getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException { - GetTaskRequest.Builder getTaskRequestBuilder = new GetTaskRequest.Builder() - .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) - .method(GetTaskRequest.METHOD) - .params(taskQueryParams); - - if (requestId != null) { - getTaskRequestBuilder.id(requestId); - } - - GetTaskRequest getTaskRequest = getTaskRequestBuilder.build(); - - try { - String httpResponseBody = sendPostRequest(getTaskRequest); - return unmarshalResponse(httpResponseBody, GET_TASK_RESPONSE_REFERENCE); - } catch (IOException | InterruptedException e) { - throw new A2AServerException("Failed to get task: " + e, e.getCause()); - } + public Task getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException { + return transport.getTask(requestId, taskQueryParams); } /** @@ -236,7 +158,7 @@ public GetTaskResponse getTask(String requestId, TaskQueryParams taskQueryParams * @return the response indicating if the task was cancelled * @throws A2AServerException if cancelling the task fails for any reason */ - public CancelTaskResponse cancelTask(String id) throws A2AServerException { + public Task cancelTask(String id) throws A2AServerException { return cancelTask(null, new TaskIdParams(id)); } @@ -247,7 +169,7 @@ public CancelTaskResponse cancelTask(String id) throws A2AServerException { * @return the response indicating if the task was cancelled * @throws A2AServerException if cancelling the task fails for any reason */ - public CancelTaskResponse cancelTask(TaskIdParams taskIdParams) throws A2AServerException { + public Task cancelTask(TaskIdParams taskIdParams) throws A2AServerException { return cancelTask(null, taskIdParams); } @@ -259,24 +181,8 @@ public CancelTaskResponse cancelTask(TaskIdParams taskIdParams) throws A2AServer * @return the response indicating if the task was cancelled * @throws A2AServerException if retrieving the task fails for any reason */ - public CancelTaskResponse cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException { - CancelTaskRequest.Builder cancelTaskRequestBuilder = new CancelTaskRequest.Builder() - .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) - .method(CancelTaskRequest.METHOD) - .params(taskIdParams); - - if (requestId != null) { - cancelTaskRequestBuilder.id(requestId); - } - - CancelTaskRequest cancelTaskRequest = cancelTaskRequestBuilder.build(); - - try { - String httpResponseBody = sendPostRequest(cancelTaskRequest); - return unmarshalResponse(httpResponseBody, CANCEL_TASK_RESPONSE_REFERENCE); - } catch (IOException | InterruptedException e) { - throw new A2AServerException("Failed to cancel task: " + e, e.getCause()); - } + public Task cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException { + return transport.cancelTask(requestId, taskIdParams); } /** @@ -286,7 +192,7 @@ public CancelTaskResponse cancelTask(String requestId, TaskIdParams taskIdParams * @return the response containing the push notification configuration * @throws A2AServerException if getting the push notification configuration fails for any reason */ - public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(String taskId) throws A2AServerException { + public TaskPushNotificationConfig getTaskPushNotificationConfig(String taskId) throws A2AServerException { return getTaskPushNotificationConfig(null, new GetTaskPushNotificationConfigParams(taskId)); } @@ -298,7 +204,7 @@ public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(Strin * @return the response containing the push notification configuration * @throws A2AServerException if getting the push notification configuration fails for any reason */ - public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(String taskId, String pushNotificationConfigId) throws A2AServerException { + public TaskPushNotificationConfig getTaskPushNotificationConfig(String taskId, String pushNotificationConfigId) throws A2AServerException { return getTaskPushNotificationConfig(null, new GetTaskPushNotificationConfigParams(taskId, pushNotificationConfigId)); } @@ -309,7 +215,7 @@ public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(Strin * @return the response containing the push notification configuration * @throws A2AServerException if getting the push notification configuration fails for any reason */ - public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException { + public TaskPushNotificationConfig getTaskPushNotificationConfig(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException { return getTaskPushNotificationConfig(null, getTaskPushNotificationConfigParams); } @@ -321,24 +227,8 @@ public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(GetTa * @return the response containing the push notification configuration * @throws A2AServerException if getting the push notification configuration fails for any reason */ - public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException { - GetTaskPushNotificationConfigRequest.Builder getTaskPushNotificationRequestBuilder = new GetTaskPushNotificationConfigRequest.Builder() - .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) - .method(GetTaskPushNotificationConfigRequest.METHOD) - .params(getTaskPushNotificationConfigParams); - - if (requestId != null) { - getTaskPushNotificationRequestBuilder.id(requestId); - } - - GetTaskPushNotificationConfigRequest getTaskPushNotificationRequest = getTaskPushNotificationRequestBuilder.build(); - - try { - String httpResponseBody = sendPostRequest(getTaskPushNotificationRequest); - return unmarshalResponse(httpResponseBody, GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE); - } catch (IOException | InterruptedException e) { - throw new A2AServerException("Failed to get task push notification config: " + e, e.getCause()); - } + public TaskPushNotificationConfig getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException { + return transport.getTaskPushNotificationConfig(requestId, getTaskPushNotificationConfigParams); } /** @@ -349,7 +239,7 @@ public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(Strin * @return the response indicating whether setting the task push notification configuration succeeded * @throws A2AServerException if setting the push notification configuration fails for any reason */ - public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(String taskId, + public TaskPushNotificationConfig setTaskPushNotificationConfig(String taskId, PushNotificationConfig pushNotificationConfig) throws A2AServerException { return setTaskPushNotificationConfig(null, taskId, pushNotificationConfig); } @@ -363,25 +253,9 @@ public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(Strin * @return the response indicating whether setting the task push notification configuration succeeded * @throws A2AServerException if setting the push notification configuration fails for any reason */ - public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(String requestId, String taskId, + public TaskPushNotificationConfig setTaskPushNotificationConfig(String requestId, String taskId, PushNotificationConfig pushNotificationConfig) throws A2AServerException { - SetTaskPushNotificationConfigRequest.Builder setTaskPushNotificationRequestBuilder = new SetTaskPushNotificationConfigRequest.Builder() - .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) - .method(SetTaskPushNotificationConfigRequest.METHOD) - .params(new TaskPushNotificationConfig(taskId, pushNotificationConfig)); - - if (requestId != null) { - setTaskPushNotificationRequestBuilder.id(requestId); - } - - SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest = setTaskPushNotificationRequestBuilder.build(); - - try { - String httpResponseBody = sendPostRequest(setTaskPushNotificationRequest); - return unmarshalResponse(httpResponseBody, SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE); - } catch (IOException | InterruptedException e) { - throw new A2AServerException("Failed to set task push notification config: " + e, e.getCause()); - } + return transport.setTaskPushNotificationConfig(requestId, taskId, pushNotificationConfig); } /** @@ -392,7 +266,7 @@ public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(Strin * @return the response containing the push notification configuration * @throws A2AServerException if getting the push notification configuration fails for any reason */ - public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(String requestId, String taskId) throws A2AServerException { + public List listTaskPushNotificationConfig(String requestId, String taskId) throws A2AServerException { return listTaskPushNotificationConfig(requestId, new ListTaskPushNotificationConfigParams(taskId)); } @@ -403,7 +277,7 @@ public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(Str * @return the response containing the push notification configuration * @throws A2AServerException if getting the push notification configuration fails for any reason */ - public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(String taskId) throws A2AServerException { + public List listTaskPushNotificationConfig(String taskId) throws A2AServerException { return listTaskPushNotificationConfig(null, new ListTaskPushNotificationConfigParams(taskId)); } @@ -414,7 +288,7 @@ public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(Str * @return the response containing the push notification configuration * @throws A2AServerException if getting the push notification configuration fails for any reason */ - public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException { + public List listTaskPushNotificationConfig(ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException { return listTaskPushNotificationConfig(null, listTaskPushNotificationConfigParams); } @@ -426,25 +300,9 @@ public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(Lis * @return the response containing the push notification configuration * @throws A2AServerException if getting the push notification configuration fails for any reason */ - public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(String requestId, + public List listTaskPushNotificationConfig(String requestId, ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException { - ListTaskPushNotificationConfigRequest.Builder listTaskPushNotificationRequestBuilder = new ListTaskPushNotificationConfigRequest.Builder() - .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) - .method(ListTaskPushNotificationConfigRequest.METHOD) - .params(listTaskPushNotificationConfigParams); - - if (requestId != null) { - listTaskPushNotificationRequestBuilder.id(requestId); - } - - ListTaskPushNotificationConfigRequest listTaskPushNotificationRequest = listTaskPushNotificationRequestBuilder.build(); - - try { - String httpResponseBody = sendPostRequest(listTaskPushNotificationRequest); - return unmarshalResponse(httpResponseBody, LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE); - } catch (IOException | InterruptedException e) { - throw new A2AServerException("Failed to list task push notification config: " + e, e.getCause()); - } + return transport.listTaskPushNotificationConfig(requestId, listTaskPushNotificationConfigParams); } /** @@ -456,9 +314,9 @@ public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(Str * @return the response * @throws A2AServerException if deleting the push notification configuration fails for any reason */ - public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig(String requestId, String taskId, + public void deleteTaskPushNotificationConfig(String requestId, String taskId, String pushNotificationConfigId) throws A2AServerException { - return deleteTaskPushNotificationConfig(requestId, new DeleteTaskPushNotificationConfigParams(taskId, pushNotificationConfigId)); + deleteTaskPushNotificationConfig(requestId, new DeleteTaskPushNotificationConfigParams(taskId, pushNotificationConfigId)); } /** @@ -469,9 +327,9 @@ public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig * @return the response * @throws A2AServerException if deleting the push notification configuration fails for any reason */ - public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig(String taskId, + public void deleteTaskPushNotificationConfig(String taskId, String pushNotificationConfigId) throws A2AServerException { - return deleteTaskPushNotificationConfig(null, new DeleteTaskPushNotificationConfigParams(taskId, pushNotificationConfigId)); + deleteTaskPushNotificationConfig(null, new DeleteTaskPushNotificationConfigParams(taskId, pushNotificationConfigId)); } /** @@ -481,8 +339,8 @@ public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig * @return the response * @throws A2AServerException if deleting the push notification configuration fails for any reason */ - public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException { - return deleteTaskPushNotificationConfig(null, deleteTaskPushNotificationConfigParams); + public void deleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException { + deleteTaskPushNotificationConfig(null, deleteTaskPushNotificationConfigParams); } /** @@ -493,25 +351,9 @@ public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig * @return the response * @throws A2AServerException if deleting the push notification configuration fails for any reason */ - public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig(String requestId, + public void deleteTaskPushNotificationConfig(String requestId, DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException { - DeleteTaskPushNotificationConfigRequest.Builder deleteTaskPushNotificationRequestBuilder = new DeleteTaskPushNotificationConfigRequest.Builder() - .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) - .method(DeleteTaskPushNotificationConfigRequest.METHOD) - .params(deleteTaskPushNotificationConfigParams); - - if (requestId != null) { - deleteTaskPushNotificationRequestBuilder.id(requestId); - } - - DeleteTaskPushNotificationConfigRequest deleteTaskPushNotificationRequest = deleteTaskPushNotificationRequestBuilder.build(); - - try { - String httpResponseBody = sendPostRequest(deleteTaskPushNotificationRequest); - return unmarshalResponse(httpResponseBody, DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE); - } catch (IOException | InterruptedException e) { - throw new A2AServerException("Failed to delete task push notification config: " + e, e.getCause()); - } + transport.deleteTaskPushNotificationConfig(requestId, deleteTaskPushNotificationConfigParams); } /** @@ -545,32 +387,7 @@ public void sendStreamingMessage(String requestId, MessageSendParams messageSend checkNotNullParam("errorHandler", errorHandler); checkNotNullParam("failureHandler", failureHandler); - SendStreamingMessageRequest.Builder sendStreamingMessageRequestBuilder = new SendStreamingMessageRequest.Builder() - .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) - .method(SendStreamingMessageRequest.METHOD) - .params(messageSendParams); - - if (requestId != null) { - sendStreamingMessageRequestBuilder.id(requestId); - } - - AtomicReference> ref = new AtomicReference<>(); - SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler); - SendStreamingMessageRequest sendStreamingMessageRequest = sendStreamingMessageRequestBuilder.build(); - try { - A2AHttpClient.PostBuilder builder = createPostBuilder(sendStreamingMessageRequest); - ref.set(builder.postAsyncSSE( - msg -> sseEventListener.onMessage(msg, ref.get()), - throwable -> sseEventListener.onError(throwable, ref.get()), - () -> { - // We don't need to do anything special on completion - })); - - } catch (IOException e) { - throw new A2AServerException("Failed to send streaming message request: " + e, e.getCause()); - } catch (InterruptedException e) { - throw new A2AServerException("Send streaming message request timed out: " + e, e.getCause()); - } + transport.sendStreamingMessage(requestId, messageSendParams, eventHandler, errorHandler, failureHandler); } /** @@ -604,58 +421,6 @@ public void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consu checkNotNullParam("errorHandler", errorHandler); checkNotNullParam("failureHandler", failureHandler); - TaskResubscriptionRequest.Builder taskResubscriptionRequestBuilder = new TaskResubscriptionRequest.Builder() - .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) - .method(TaskResubscriptionRequest.METHOD) - .params(taskIdParams); - - if (requestId != null) { - taskResubscriptionRequestBuilder.id(requestId); - } - - AtomicReference> ref = new AtomicReference<>(); - SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler); - TaskResubscriptionRequest taskResubscriptionRequest = taskResubscriptionRequestBuilder.build(); - try { - A2AHttpClient.PostBuilder builder = createPostBuilder(taskResubscriptionRequest); - ref.set(builder.postAsyncSSE( - msg -> sseEventListener.onMessage(msg, ref.get()), - throwable -> sseEventListener.onError(throwable, ref.get()), - () -> { - // We don't need to do anything special on completion - })); - - } catch (IOException e) { - throw new A2AServerException("Failed to send task resubscription request: " + e, e.getCause()); - } catch (InterruptedException e) { - throw new A2AServerException("Task resubscription request timed out: " + e, e.getCause()); - } - } - - private String sendPostRequest(Object value) throws IOException, InterruptedException { - A2AHttpClient.PostBuilder builder = createPostBuilder(value); - A2AHttpResponse response = builder.post(); - if (!response.success()) { - throw new IOException("Request failed " + response.status()); - } - return response.body(); - } - - private A2AHttpClient.PostBuilder createPostBuilder(Object value) throws JsonProcessingException { - return httpClient.createPost() - .url(agentUrl) - .addHeader("Content-Type", "application/json") - .body(Utils.OBJECT_MAPPER.writeValueAsString(value)); - - } - - private T unmarshalResponse(String response, TypeReference typeReference) - throws A2AServerException, JsonProcessingException { - T value = Utils.unmarshalFrom(response, typeReference); - JSONRPCError error = value.getError(); - if (error != null) { - throw new A2AServerException(error.getMessage() + (error.getData() != null ? ": " + error.getData() : ""), error); - } - return value; + transport.resubscribeToTask(requestId, taskIdParams, eventHandler, errorHandler, failureHandler); } } diff --git a/client/src/main/java/io/a2a/client/A2AGrpcClient.java b/client/src/main/java/io/a2a/client/A2AGrpcClient.java deleted file mode 100644 index 661adc228..000000000 --- a/client/src/main/java/io/a2a/client/A2AGrpcClient.java +++ /dev/null @@ -1,200 +0,0 @@ -package io.a2a.client; - -import static io.a2a.grpc.A2AServiceGrpc.A2AServiceBlockingV2Stub; -import static io.a2a.grpc.A2AServiceGrpc.A2AServiceStub; -import static io.a2a.grpc.utils.ProtoUtils.FromProto; -import static io.a2a.grpc.utils.ProtoUtils.ToProto; -import static io.a2a.util.Assert.checkNotNullParam; - -import java.util.function.Consumer; - -import io.a2a.client.sse.SSEStreamObserver; -import io.a2a.grpc.A2AServiceGrpc; -import io.a2a.grpc.CancelTaskRequest; -import io.a2a.grpc.CreateTaskPushNotificationConfigRequest; -import io.a2a.grpc.GetTaskPushNotificationConfigRequest; -import io.a2a.grpc.GetTaskRequest; -import io.a2a.grpc.SendMessageRequest; -import io.a2a.grpc.SendMessageResponse; -import io.a2a.grpc.StreamResponse; -import io.a2a.grpc.utils.ProtoUtils; -import io.a2a.spec.A2AServerException; -import io.a2a.spec.AgentCard; -import io.a2a.spec.EventKind; -import io.a2a.spec.GetTaskPushNotificationConfigParams; -import io.a2a.spec.MessageSendParams; -import io.a2a.spec.StreamingEventKind; -import io.a2a.spec.Task; -import io.a2a.spec.TaskIdParams; -import io.a2a.spec.TaskPushNotificationConfig; -import io.a2a.spec.TaskQueryParams; -import io.grpc.Channel; -import io.grpc.StatusRuntimeException; -import io.grpc.stub.StreamObserver; - -/** - * A2A Client for interacting with an A2A agent via gRPC. - */ -public class A2AGrpcClient { - - private A2AServiceBlockingV2Stub blockingStub; - private A2AServiceStub asyncStub; - private AgentCard agentCard; - - /** - * Create an A2A client for interacting with an A2A agent via gRPC. - * - * @param channel the gRPC channel - * @param agentCard the agent card for the A2A server this client will be communicating with - */ - public A2AGrpcClient(Channel channel, AgentCard agentCard) { - checkNotNullParam("channel", channel); - checkNotNullParam("agentCard", agentCard); - this.asyncStub = A2AServiceGrpc.newStub(channel); - this.blockingStub = A2AServiceGrpc.newBlockingV2Stub(channel); - this.agentCard = agentCard; - } - - /** - * Send a message to the remote agent. - * - * @param messageSendParams the parameters for the message to be sent - * @return the response, may be a message or a task - * @throws A2AServerException if sending the message fails for any reason - */ - public EventKind sendMessage(MessageSendParams messageSendParams) throws A2AServerException { - SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams); - try { - SendMessageResponse response = blockingStub.sendMessage(request); - if (response.hasMsg()) { - return FromProto.message(response.getMsg()); - } else if (response.hasTask()) { - return FromProto.task(response.getTask()); - } else { - throw new A2AServerException("Server response did not contain a message or task"); - } - } catch (StatusRuntimeException e) { - throw new A2AServerException("Failed to send message: " + e, e); - } - } - - /** - * Retrieves the current state and history of a specific task. - * - * @param taskQueryParams the params for the task to be queried - * @return the task - * @throws A2AServerException if retrieving the task fails for any reason - */ - public Task getTask(TaskQueryParams taskQueryParams) throws A2AServerException { - GetTaskRequest.Builder requestBuilder = GetTaskRequest.newBuilder(); - requestBuilder.setName("tasks/" + taskQueryParams.id()); - if (taskQueryParams.historyLength() != null) { - requestBuilder.setHistoryLength(taskQueryParams.historyLength()); - } - GetTaskRequest getTaskRequest = requestBuilder.build(); - try { - return FromProto.task(blockingStub.getTask(getTaskRequest)); - } catch (StatusRuntimeException e) { - throw new A2AServerException("Failed to get task: " + e, e); - } - } - - /** - * Cancel a task that was previously submitted to the A2A server. - * - * @param taskIdParams the params for the task to be cancelled - * @return the updated task - * @throws A2AServerException if cancelling the task fails for any reason - */ - public Task cancelTask(TaskIdParams taskIdParams) throws A2AServerException { - CancelTaskRequest cancelTaskRequest = CancelTaskRequest.newBuilder() - .setName("tasks/" + taskIdParams.id()) - .build(); - try { - return FromProto.task(blockingStub.cancelTask(cancelTaskRequest)); - } catch (StatusRuntimeException e) { - throw new A2AServerException("Failed to cancel task: " + e, e); - } - } - - /** - * Set push notification configuration for a task. - * - * @param taskPushNotificationConfig the task push notification configuration - * @return the task push notification config - * @throws A2AServerException if setting the push notification configuration fails for any reason - */ - public TaskPushNotificationConfig setTaskPushNotificationConfig(TaskPushNotificationConfig taskPushNotificationConfig) throws A2AServerException { - String configId = taskPushNotificationConfig.pushNotificationConfig().id(); - CreateTaskPushNotificationConfigRequest request = CreateTaskPushNotificationConfigRequest.newBuilder() - .setParent("tasks/" + taskPushNotificationConfig.taskId()) - .setConfig(ToProto.taskPushNotificationConfig(taskPushNotificationConfig)) - .setConfigId(configId == null ? "" : configId) - .build(); - try { - return FromProto.taskPushNotificationConfig(blockingStub.createTaskPushNotificationConfig(request)); - } catch (StatusRuntimeException e) { - throw new A2AServerException("Failed to set the task push notification config: " + e, e); - } - } - - /** - * Get the push notification configuration for a task. - * - * @param getTaskPushNotificationConfigParams the params for the task - * @return the push notification configuration - * @throws A2AServerException if getting the push notification configuration fails for any reason - */ - public TaskPushNotificationConfig getTaskPushNotificationConfig(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException { - GetTaskPushNotificationConfigRequest getTaskPushNotificationConfigRequest = GetTaskPushNotificationConfigRequest.newBuilder() - .setName(getTaskPushNotificationConfigName(getTaskPushNotificationConfigParams)) - .build(); - try { - return FromProto.taskPushNotificationConfig(blockingStub.getTaskPushNotificationConfig(getTaskPushNotificationConfigRequest)); - } catch (StatusRuntimeException e) { - throw new A2AServerException("Failed to get the task push notification config: " + e, e); - } - } - - /** - * Send a streaming message request to the remote agent. - * - * @param messageSendParams the parameters for the message to be sent - * @param eventHandler a consumer that will be invoked for each event received from the remote agent - * @param errorHandler a consumer that will be invoked if an error occurs - * @throws A2AServerException if sending the streaming message fails for any reason - */ - public void sendMessageStreaming(MessageSendParams messageSendParams, Consumer eventHandler, - Consumer errorHandler) throws A2AServerException { - SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams); - StreamObserver streamObserver = new SSEStreamObserver(eventHandler, errorHandler); - try { - asyncStub.sendStreamingMessage(request, streamObserver); - } catch (StatusRuntimeException e) { - throw new A2AServerException("Failed to send streaming message: " + e, e); - } - } - - private SendMessageRequest createGrpcSendMessageRequestFromMessageSendParams(MessageSendParams messageSendParams) { - SendMessageRequest.Builder builder = SendMessageRequest.newBuilder(); - builder.setRequest(ToProto.message(messageSendParams.message())); - if (messageSendParams.configuration() != null) { - builder.setConfiguration(ToProto.messageSendConfiguration(messageSendParams.configuration())); - } - if (messageSendParams.metadata() != null) { - builder.setMetadata(ToProto.struct(messageSendParams.metadata())); - } - return builder.build(); - } - - private String getTaskPushNotificationConfigName(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) { - StringBuilder name = new StringBuilder(); - name.append("tasks/"); - name.append(getTaskPushNotificationConfigParams.id()); - if (getTaskPushNotificationConfigParams.pushNotificationConfigId() != null) { - name.append("/pushNotificationConfigs/"); - name.append(getTaskPushNotificationConfigParams.pushNotificationConfigId()); - } - return name.toString(); - } -} diff --git a/client/src/test/java/io/a2a/client/A2ACardResolverTest.java b/client/src/test/java/io/a2a/client/A2ACardResolverTest.java index 8d9ff0f5b..70fc9883b 100644 --- a/client/src/test/java/io/a2a/client/A2ACardResolverTest.java +++ b/client/src/test/java/io/a2a/client/A2ACardResolverTest.java @@ -11,8 +11,8 @@ import java.util.function.Consumer; import com.fasterxml.jackson.core.type.TypeReference; -import io.a2a.http.A2AHttpClient; -import io.a2a.http.A2AHttpResponse; +import io.a2a.transport.jsonrpc.client.A2AHttpClient; +import io.a2a.transport.jsonrpc.client.A2AHttpResponse; import io.a2a.spec.A2AClientError; import io.a2a.spec.A2AClientJSONError; import io.a2a.spec.AgentCard; diff --git a/pom.xml b/pom.xml index 0a3bfba88..0d92d7651 100644 --- a/pom.xml +++ b/pom.xml @@ -290,6 +290,7 @@ spec-grpc tck tests/server-common + transport/spi transport/jsonrpc transport/grpc diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java index 4a98ace48..816ed6f79 100644 --- a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java +++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java @@ -3,7 +3,7 @@ import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; -import io.a2a.grpc.handler.GrpcHandler; +import io.a2a.transport.grpc.server.handler.GrpcHandler; import io.a2a.server.PublicAgentCard; import io.a2a.server.requesthandlers.CallContextFactory; import io.a2a.server.requesthandlers.RequestHandler; diff --git a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java index 25758da9c..9fb0074ac 100644 --- a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java +++ b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java @@ -19,7 +19,7 @@ import jakarta.ws.rs.core.Response; import io.a2a.server.apps.common.TestUtilsBean; -import io.a2a.grpc.handler.GrpcHandler; +import io.a2a.transport.grpc.server.handler.GrpcHandler; import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.Task; import io.a2a.spec.TaskArtifactUpdateEvent; diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java index 3ebf22ccd..5cdc31e7e 100644 --- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java +++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java @@ -20,7 +20,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.io.JsonEOFException; import com.fasterxml.jackson.databind.JsonNode; -import io.a2a.jsonrpc.handler.JSONRPCHandler; +import io.a2a.transport.jsonrpc.server.handler.JSONRPCHandler; import io.a2a.server.ExtendedAgentCard; import io.a2a.server.ServerCallContext; import io.a2a.server.auth.UnauthenticatedUser; diff --git a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java index 33ac4445c..ac04216f3 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java +++ b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java @@ -10,8 +10,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; -import io.a2a.http.A2AHttpClient; -import io.a2a.http.JdkA2AHttpClient; +import io.a2a.transport.jsonrpc.client.A2AHttpClient; +import io.a2a.transport.jsonrpc.client.JdkA2AHttpClient; import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.Task; import io.a2a.util.Utils; diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java index 7232d1a0c..7c62562f8 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java @@ -12,8 +12,8 @@ import java.util.concurrent.Executors; import java.util.function.Consumer; -import io.a2a.http.A2AHttpClient; -import io.a2a.http.A2AHttpResponse; +import io.a2a.transport.jsonrpc.client.A2AHttpClient; +import io.a2a.transport.jsonrpc.client.A2AHttpResponse; import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; import io.a2a.server.events.EventQueue; diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java index c5deef68f..046e0f8bb 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java @@ -11,8 +11,8 @@ import jakarta.enterprise.context.Dependent; import jakarta.enterprise.inject.Alternative; -import io.a2a.http.A2AHttpClient; -import io.a2a.http.A2AHttpResponse; +import io.a2a.transport.jsonrpc.client.A2AHttpClient; +import io.a2a.transport.jsonrpc.client.A2AHttpResponse; import io.a2a.spec.Task; import io.a2a.util.Utils; diff --git a/transport/grpc/pom.xml b/transport/grpc/pom.xml index 982ae3b8e..18257b2cc 100644 --- a/transport/grpc/pom.xml +++ b/transport/grpc/pom.xml @@ -18,6 +18,11 @@ Java SDK for the Agent2Agent Protocol (A2A) - gRPC + + io.github.a2asdk + a2a-java-sdk-transport-spi + ${project.version} + io.github.a2asdk a2a-java-sdk-server-common diff --git a/client/src/main/java/io/a2a/client/sse/SSEStreamObserver.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/client/EventStreamObserver.java similarity index 83% rename from client/src/main/java/io/a2a/client/sse/SSEStreamObserver.java rename to transport/grpc/src/main/java/io/a2a/transport/grpc/client/EventStreamObserver.java index adc721f42..ae6d705fc 100644 --- a/client/src/main/java/io/a2a/client/sse/SSEStreamObserver.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/client/EventStreamObserver.java @@ -1,22 +1,22 @@ -package io.a2a.client.sse; +package io.a2a.transport.grpc.client; -import static io.a2a.grpc.utils.ProtoUtils.FromProto; +import io.a2a.grpc.StreamResponse; +import io.a2a.spec.StreamingEventKind; +import io.grpc.stub.StreamObserver; import java.util.function.Consumer; import java.util.logging.Logger; -import io.a2a.grpc.StreamResponse; -import io.a2a.spec.StreamingEventKind; -import io.grpc.stub.StreamObserver; +import static io.a2a.grpc.utils.ProtoUtils.FromProto; -public class SSEStreamObserver implements StreamObserver { +public class EventStreamObserver implements StreamObserver { - private static final Logger log = Logger.getLogger(SSEStreamObserver.class.getName()); + private static final Logger log = Logger.getLogger(EventStreamObserver.class.getName()); private final Consumer eventHandler; private final Consumer errorHandler; - public SSEStreamObserver(Consumer eventHandler, Consumer errorHandler) { + public EventStreamObserver(Consumer eventHandler, Consumer errorHandler) { this.eventHandler = eventHandler; this.errorHandler = errorHandler; } diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/client/GrpcTransport.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/client/GrpcTransport.java new file mode 100644 index 000000000..2975948c5 --- /dev/null +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/client/GrpcTransport.java @@ -0,0 +1,164 @@ +package io.a2a.transport.grpc.client; + +import io.a2a.grpc.*; +import io.a2a.grpc.SendMessageRequest; +import io.a2a.grpc.SendMessageResponse; +import io.a2a.grpc.utils.ProtoUtils; +import io.a2a.spec.*; +import io.a2a.spec.AgentCard; +import io.a2a.spec.Task; +import io.a2a.spec.TaskPushNotificationConfig; +import io.a2a.transport.spi.client.Transport; +import io.grpc.Channel; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +import java.util.List; +import java.util.function.Consumer; + +import static io.a2a.util.Assert.checkNotNullParam; + +/** + * @author David BRASSELY (david.brassely at graviteesource.com) + * @author GraviteeSource Team + */ +public class GrpcTransport implements Transport { + + private A2AServiceGrpc.A2AServiceBlockingV2Stub blockingStub; + private A2AServiceGrpc.A2AServiceStub asyncStub; + private AgentCard agentCard; + + /** + * Create an A2A client for interacting with an A2A agent via gRPC. + * + * @param channel the gRPC channel + * @param agentCard the agent card for the A2A server this client will be communicating with + */ + public GrpcTransport(Channel channel, AgentCard agentCard) { + checkNotNullParam("channel", channel); + checkNotNullParam("agentCard", agentCard); + this.asyncStub = A2AServiceGrpc.newStub(channel); + this.blockingStub = A2AServiceGrpc.newBlockingV2Stub(channel); + this.agentCard = agentCard; + } + + @Override + public EventKind sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException { + SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams); + try { + SendMessageResponse response = blockingStub.sendMessage(request); + if (response.hasMsg()) { + return ProtoUtils.FromProto.message(response.getMsg()); + } else if (response.hasTask()) { + return ProtoUtils.FromProto.task(response.getTask()); + } else { + throw new A2AServerException("Server response did not contain a message or task"); + } + } catch (StatusRuntimeException e) { + throw new A2AServerException("Failed to send message: " + e, e); + } + } + + @Override + public Task getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException { + io.a2a.grpc.GetTaskRequest.Builder requestBuilder = io.a2a.grpc.GetTaskRequest.newBuilder(); + requestBuilder.setName("tasks/" + taskQueryParams.id()); + if (taskQueryParams.historyLength() != null) { + requestBuilder.setHistoryLength(taskQueryParams.historyLength()); + } + io.a2a.grpc.GetTaskRequest getTaskRequest = requestBuilder.build(); + try { + return ProtoUtils.FromProto.task(blockingStub.getTask(getTaskRequest)); + } catch (StatusRuntimeException e) { + throw new A2AServerException("Failed to get task: " + e, e); + } + } + + @Override + public Task cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException { + io.a2a.grpc.CancelTaskRequest cancelTaskRequest = io.a2a.grpc.CancelTaskRequest.newBuilder() + .setName("tasks/" + taskIdParams.id()) + .build(); + try { + return ProtoUtils.FromProto.task(blockingStub.cancelTask(cancelTaskRequest)); + } catch (StatusRuntimeException e) { + throw new A2AServerException("Failed to cancel task: " + e, e); + } + } + + @Override + public TaskPushNotificationConfig getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException { + io.a2a.grpc.GetTaskPushNotificationConfigRequest getTaskPushNotificationConfigRequest = io.a2a.grpc.GetTaskPushNotificationConfigRequest.newBuilder() + .setName(getTaskPushNotificationConfigName(getTaskPushNotificationConfigParams)) + .build(); + try { + return ProtoUtils.FromProto.taskPushNotificationConfig(blockingStub.getTaskPushNotificationConfig(getTaskPushNotificationConfigRequest)); + } catch (StatusRuntimeException e) { + throw new A2AServerException("Failed to get the task push notification config: " + e, e); + } + } + + @Override + public TaskPushNotificationConfig setTaskPushNotificationConfig(String requestId, String taskId, TaskPushNotificationConfig taskPushNotificationConfig) throws A2AServerException { + String configId = taskPushNotificationConfig.pushNotificationConfig().id(); + CreateTaskPushNotificationConfigRequest request = CreateTaskPushNotificationConfigRequest.newBuilder() + .setParent("tasks/" + taskPushNotificationConfig.taskId()) + .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushNotificationConfig)) + .setConfigId(configId == null ? "" : configId) + .build(); + try { + return ProtoUtils.FromProto.taskPushNotificationConfig(blockingStub.createTaskPushNotificationConfig(request)); + } catch (StatusRuntimeException e) { + throw new A2AServerException("Failed to set the task push notification config: " + e, e); + } + } + + @Override + public List listTaskPushNotificationConfig(String requestId, ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException { + return List.of(); + } + + @Override + public void deleteTaskPushNotificationConfig(String requestId, DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException { + + } + + @Override + public void sendStreamingMessage(String requestId, MessageSendParams messageSendParams, Consumer eventHandler, Consumer errorHandler, Runnable failureHandler) throws A2AServerException { + SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams); + StreamObserver streamObserver = new EventStreamObserver(eventHandler, errorHandler); + try { + asyncStub.sendStreamingMessage(request, streamObserver); + } catch (StatusRuntimeException e) { + throw new A2AServerException("Failed to send streaming message: " + e, e); + } + } + + @Override + public void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consumer eventHandler, Consumer errorHandler, Runnable failureHandler) throws A2AServerException { + + } + + private SendMessageRequest createGrpcSendMessageRequestFromMessageSendParams(MessageSendParams messageSendParams) { + SendMessageRequest.Builder builder = SendMessageRequest.newBuilder(); + builder.setRequest(ProtoUtils.ToProto.message(messageSendParams.message())); + if (messageSendParams.configuration() != null) { + builder.setConfiguration(ProtoUtils.ToProto.messageSendConfiguration(messageSendParams.configuration())); + } + if (messageSendParams.metadata() != null) { + builder.setMetadata(ProtoUtils.ToProto.struct(messageSendParams.metadata())); + } + return builder.build(); + } + + private String getTaskPushNotificationConfigName(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) { + StringBuilder name = new StringBuilder(); + name.append("tasks/"); + name.append(getTaskPushNotificationConfigParams.id()); + if (getTaskPushNotificationConfigParams.pushNotificationConfigId() != null) { + name.append("/pushNotificationConfigs/"); + name.append(getTaskPushNotificationConfigParams.pushNotificationConfigId()); + } + return name.toString(); + } +} diff --git a/transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/server/handler/GrpcHandler.java similarity index 99% rename from transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java rename to transport/grpc/src/main/java/io/a2a/transport/grpc/server/handler/GrpcHandler.java index 15aa84209..20381b938 100644 --- a/transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/server/handler/GrpcHandler.java @@ -1,4 +1,4 @@ -package io.a2a.grpc.handler; +package io.a2a.transport.grpc.server.handler; import static io.a2a.grpc.utils.ProtoUtils.FromProto; import static io.a2a.grpc.utils.ProtoUtils.ToProto; diff --git a/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java index 1cfeb6626..76cdb6f6e 100644 --- a/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java +++ b/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java @@ -45,6 +45,7 @@ import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.TextPart; import io.a2a.spec.UnsupportedOperationError; +import io.a2a.transport.grpc.server.handler.GrpcHandler; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.internal.testing.StreamRecorder; diff --git a/transport/jsonrpc/pom.xml b/transport/jsonrpc/pom.xml index b66e3b1f5..a00362398 100644 --- a/transport/jsonrpc/pom.xml +++ b/transport/jsonrpc/pom.xml @@ -18,6 +18,11 @@ Java SDK for the Agent2Agent Protocol (A2A) - JSONRPC + + io.github.a2asdk + a2a-java-sdk-transport-spi + ${project.version} + io.github.a2asdk a2a-java-sdk-server-common diff --git a/client/src/main/java/io/a2a/http/A2AHttpClient.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpClient.java similarity index 96% rename from client/src/main/java/io/a2a/http/A2AHttpClient.java rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpClient.java index 7a246843a..3ebd6e8b7 100644 --- a/client/src/main/java/io/a2a/http/A2AHttpClient.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpClient.java @@ -1,4 +1,4 @@ -package io.a2a.http; +package io.a2a.transport.jsonrpc.client; import java.io.IOException; import java.util.concurrent.CompletableFuture; diff --git a/client/src/main/java/io/a2a/http/A2AHttpResponse.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpResponse.java similarity index 70% rename from client/src/main/java/io/a2a/http/A2AHttpResponse.java rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpResponse.java index d6973a5dc..0e7074b8b 100644 --- a/client/src/main/java/io/a2a/http/A2AHttpResponse.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpResponse.java @@ -1,4 +1,4 @@ -package io.a2a.http; +package io.a2a.transport.jsonrpc.client; public interface A2AHttpResponse { int status(); diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JSONRPCTransport.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JSONRPCTransport.java new file mode 100644 index 000000000..dce6ab668 --- /dev/null +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JSONRPCTransport.java @@ -0,0 +1,301 @@ +package io.a2a.transport.jsonrpc.client; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import io.a2a.spec.A2AServerException; +import io.a2a.spec.CancelTaskRequest; +import io.a2a.spec.CancelTaskResponse; +import io.a2a.spec.DeleteTaskPushNotificationConfigParams; +import io.a2a.spec.DeleteTaskPushNotificationConfigRequest; +import io.a2a.spec.DeleteTaskPushNotificationConfigResponse; +import io.a2a.spec.EventKind; +import io.a2a.spec.GetTaskPushNotificationConfigParams; +import io.a2a.spec.GetTaskPushNotificationConfigRequest; +import io.a2a.spec.GetTaskPushNotificationConfigResponse; +import io.a2a.spec.GetTaskRequest; +import io.a2a.spec.GetTaskResponse; +import io.a2a.spec.JSONRPCError; +import io.a2a.spec.JSONRPCMessage; +import io.a2a.spec.JSONRPCResponse; +import io.a2a.spec.ListTaskPushNotificationConfigParams; +import io.a2a.spec.ListTaskPushNotificationConfigRequest; +import io.a2a.spec.ListTaskPushNotificationConfigResponse; +import io.a2a.spec.MessageSendParams; +import io.a2a.spec.PushNotificationConfig; +import io.a2a.spec.SendMessageRequest; +import io.a2a.spec.SendMessageResponse; +import io.a2a.spec.SendStreamingMessageRequest; +import io.a2a.spec.SetTaskPushNotificationConfigRequest; +import io.a2a.spec.SetTaskPushNotificationConfigResponse; +import io.a2a.spec.StreamingEventKind; +import io.a2a.spec.Task; +import io.a2a.spec.TaskIdParams; +import io.a2a.spec.TaskPushNotificationConfig; +import io.a2a.spec.TaskQueryParams; +import io.a2a.spec.TaskResubscriptionRequest; +import io.a2a.transport.jsonrpc.client.sse.SSEEventListener; +import io.a2a.transport.spi.client.Transport; +import io.a2a.util.Utils; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +public class JSONRPCTransport implements Transport { + + private static final TypeReference SEND_MESSAGE_RESPONSE_REFERENCE = new TypeReference<>() {}; + private static final TypeReference GET_TASK_RESPONSE_REFERENCE = new TypeReference<>() {}; + private static final TypeReference CANCEL_TASK_RESPONSE_REFERENCE = new TypeReference<>() {}; + private static final TypeReference GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {}; + private static final TypeReference SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {}; + private static final TypeReference LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {}; + private static final TypeReference DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {}; + + private final String agentUrl; + private final A2AHttpClient httpClient; + + public JSONRPCTransport(String agentUrl) { + this(agentUrl, new JdkA2AHttpClient()); + } + + public JSONRPCTransport(String agentUrl, A2AHttpClient httpClient) { + this.agentUrl = agentUrl; + this.httpClient = httpClient; + } + + @Override + public EventKind sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException { + SendMessageRequest.Builder sendMessageRequestBuilder = new SendMessageRequest.Builder() + .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) + .method(SendMessageRequest.METHOD) + .params(messageSendParams); + + if (requestId != null) { + sendMessageRequestBuilder.id(requestId); + } + + SendMessageRequest sendMessageRequest = sendMessageRequestBuilder.build(); + + try { + String httpResponseBody = sendPostRequest(sendMessageRequest); + return unmarshalResponse(httpResponseBody, SEND_MESSAGE_RESPONSE_REFERENCE).getResult(); + } catch (IOException | InterruptedException e) { + throw new A2AServerException("Failed to send message: " + e, e.getCause()); + } + } + + @Override + public Task getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException { + GetTaskRequest.Builder getTaskRequestBuilder = new GetTaskRequest.Builder() + .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) + .method(GetTaskRequest.METHOD) + .params(taskQueryParams); + + if (requestId != null) { + getTaskRequestBuilder.id(requestId); + } + + GetTaskRequest getTaskRequest = getTaskRequestBuilder.build(); + + try { + String httpResponseBody = sendPostRequest(getTaskRequest); + return unmarshalResponse(httpResponseBody, GET_TASK_RESPONSE_REFERENCE).getResult(); + } catch (IOException | InterruptedException e) { + throw new A2AServerException("Failed to get task: " + e, e.getCause()); + } + } + + @Override + public Task cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException { + CancelTaskRequest.Builder cancelTaskRequestBuilder = new CancelTaskRequest.Builder() + .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) + .method(CancelTaskRequest.METHOD) + .params(taskIdParams); + + if (requestId != null) { + cancelTaskRequestBuilder.id(requestId); + } + + CancelTaskRequest cancelTaskRequest = cancelTaskRequestBuilder.build(); + + try { + String httpResponseBody = sendPostRequest(cancelTaskRequest); + return unmarshalResponse(httpResponseBody, CANCEL_TASK_RESPONSE_REFERENCE).getResult(); + } catch (IOException | InterruptedException e) { + throw new A2AServerException("Failed to cancel task: " + e, e.getCause()); + } + } + + @Override + public TaskPushNotificationConfig getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException { + GetTaskPushNotificationConfigRequest.Builder getTaskPushNotificationRequestBuilder = new GetTaskPushNotificationConfigRequest.Builder() + .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) + .method(GetTaskPushNotificationConfigRequest.METHOD) + .params(getTaskPushNotificationConfigParams); + + if (requestId != null) { + getTaskPushNotificationRequestBuilder.id(requestId); + } + + GetTaskPushNotificationConfigRequest getTaskPushNotificationRequest = getTaskPushNotificationRequestBuilder.build(); + + try { + String httpResponseBody = sendPostRequest(getTaskPushNotificationRequest); + return unmarshalResponse(httpResponseBody, GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE).getResult(); + } catch (IOException | InterruptedException e) { + throw new A2AServerException("Failed to get task push notification config: " + e, e.getCause()); + } + } + + @Override + public TaskPushNotificationConfig setTaskPushNotificationConfig(String requestId, String taskId, PushNotificationConfig pushNotificationConfig) throws A2AServerException { + SetTaskPushNotificationConfigRequest.Builder setTaskPushNotificationRequestBuilder = new SetTaskPushNotificationConfigRequest.Builder() + .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) + .method(SetTaskPushNotificationConfigRequest.METHOD) + .params(new TaskPushNotificationConfig(taskId, pushNotificationConfig)); + + if (requestId != null) { + setTaskPushNotificationRequestBuilder.id(requestId); + } + + SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest = setTaskPushNotificationRequestBuilder.build(); + + try { + String httpResponseBody = sendPostRequest(setTaskPushNotificationRequest); + return unmarshalResponse(httpResponseBody, SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE).getResult(); + } catch (IOException | InterruptedException e) { + throw new A2AServerException("Failed to set task push notification config: " + e, e.getCause()); + } + } + + @Override + public List listTaskPushNotificationConfig(String requestId, ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException { + ListTaskPushNotificationConfigRequest.Builder listTaskPushNotificationRequestBuilder = new ListTaskPushNotificationConfigRequest.Builder() + .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) + .method(ListTaskPushNotificationConfigRequest.METHOD) + .params(listTaskPushNotificationConfigParams); + + if (requestId != null) { + listTaskPushNotificationRequestBuilder.id(requestId); + } + + ListTaskPushNotificationConfigRequest listTaskPushNotificationRequest = listTaskPushNotificationRequestBuilder.build(); + + try { + String httpResponseBody = sendPostRequest(listTaskPushNotificationRequest); + return unmarshalResponse(httpResponseBody, LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE).getResult(); + } catch (IOException | InterruptedException e) { + throw new A2AServerException("Failed to list task push notification config: " + e, e.getCause()); + } + } + + @Override + public void deleteTaskPushNotificationConfig(String requestId, DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException { + DeleteTaskPushNotificationConfigRequest.Builder deleteTaskPushNotificationRequestBuilder = new DeleteTaskPushNotificationConfigRequest.Builder() + .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) + .method(DeleteTaskPushNotificationConfigRequest.METHOD) + .params(deleteTaskPushNotificationConfigParams); + + if (requestId != null) { + deleteTaskPushNotificationRequestBuilder.id(requestId); + } + + DeleteTaskPushNotificationConfigRequest deleteTaskPushNotificationRequest = deleteTaskPushNotificationRequestBuilder.build(); + + try { + String httpResponseBody = sendPostRequest(deleteTaskPushNotificationRequest); + unmarshalResponse(httpResponseBody, DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE); + } catch (IOException | InterruptedException e) { + throw new A2AServerException("Failed to delete task push notification config: " + e, e.getCause()); + } + } + + @Override + public void sendStreamingMessage(String requestId, MessageSendParams messageSendParams, Consumer eventHandler, Consumer errorHandler, Runnable failureHandler) throws A2AServerException { + SendStreamingMessageRequest.Builder sendStreamingMessageRequestBuilder = new SendStreamingMessageRequest.Builder() + .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) + .method(SendStreamingMessageRequest.METHOD) + .params(messageSendParams); + + if (requestId != null) { + sendStreamingMessageRequestBuilder.id(requestId); + } + + AtomicReference> ref = new AtomicReference<>(); + SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler); + SendStreamingMessageRequest sendStreamingMessageRequest = sendStreamingMessageRequestBuilder.build(); + try { + A2AHttpClient.PostBuilder builder = createPostBuilder(sendStreamingMessageRequest); + ref.set(builder.postAsyncSSE( + msg -> sseEventListener.onMessage(msg, ref.get()), + throwable -> sseEventListener.onError(throwable, ref.get()), + () -> { + // We don't need to do anything special on completion + })); + + } catch (IOException e) { + throw new A2AServerException("Failed to send streaming message request: " + e, e.getCause()); + } catch (InterruptedException e) { + throw new A2AServerException("Send streaming message request timed out: " + e, e.getCause()); + } + } + + @Override + public void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consumer eventHandler, Consumer errorHandler, Runnable failureHandler) throws A2AServerException { + TaskResubscriptionRequest.Builder taskResubscriptionRequestBuilder = new TaskResubscriptionRequest.Builder() + .jsonrpc(JSONRPCMessage.JSONRPC_VERSION) + .method(TaskResubscriptionRequest.METHOD) + .params(taskIdParams); + + if (requestId != null) { + taskResubscriptionRequestBuilder.id(requestId); + } + + AtomicReference> ref = new AtomicReference<>(); + SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler); + TaskResubscriptionRequest taskResubscriptionRequest = taskResubscriptionRequestBuilder.build(); + try { + A2AHttpClient.PostBuilder builder = createPostBuilder(taskResubscriptionRequest); + ref.set(builder.postAsyncSSE( + msg -> sseEventListener.onMessage(msg, ref.get()), + throwable -> sseEventListener.onError(throwable, ref.get()), + () -> { + // We don't need to do anything special on completion + })); + + } catch (IOException e) { + throw new A2AServerException("Failed to send task resubscription request: " + e, e.getCause()); + } catch (InterruptedException e) { + throw new A2AServerException("Task resubscription request timed out: " + e, e.getCause()); + } + } + + private String sendPostRequest(Object value) throws IOException, InterruptedException { + A2AHttpClient.PostBuilder builder = createPostBuilder(value); + A2AHttpResponse response = builder.post(); + if (!response.success()) { + throw new IOException("Request failed " + response.status()); + } + return response.body(); + } + + private A2AHttpClient.PostBuilder createPostBuilder(Object value) throws JsonProcessingException { + return httpClient.createPost() + .url(agentUrl) + .addHeader("Content-Type", "application/json") + .body(Utils.OBJECT_MAPPER.writeValueAsString(value)); + + } + + private T unmarshalResponse(String response, TypeReference typeReference) + throws A2AServerException, JsonProcessingException { + T value = Utils.unmarshalFrom(response, typeReference); + JSONRPCError error = value.getError(); + if (error != null) { + throw new A2AServerException(error.getMessage() + (error.getData() != null ? ": " + error.getData() : ""), error); + } + return value; + } +} diff --git a/client/src/main/java/io/a2a/http/JdkA2AHttpClient.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JdkA2AHttpClient.java similarity index 99% rename from client/src/main/java/io/a2a/http/JdkA2AHttpClient.java rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JdkA2AHttpClient.java index c3d5907a2..e61d7fab3 100644 --- a/client/src/main/java/io/a2a/http/JdkA2AHttpClient.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JdkA2AHttpClient.java @@ -1,4 +1,4 @@ -package io.a2a.http; +package io.a2a.transport.jsonrpc.client; import java.io.IOException; import java.net.URI; diff --git a/client/src/main/java/io/a2a/client/sse/SSEEventListener.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListener.java similarity index 98% rename from client/src/main/java/io/a2a/client/sse/SSEEventListener.java rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListener.java index 8ed0e9aa3..7f4ff19fc 100644 --- a/client/src/main/java/io/a2a/client/sse/SSEEventListener.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListener.java @@ -1,10 +1,4 @@ -package io.a2a.client.sse; - -import static io.a2a.util.Utils.OBJECT_MAPPER; - -import java.util.concurrent.Future; -import java.util.function.Consumer; -import java.util.logging.Logger; +package io.a2a.transport.jsonrpc.client.sse; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -12,6 +6,12 @@ import io.a2a.spec.StreamingEventKind; import io.a2a.spec.TaskStatusUpdateEvent; +import java.util.concurrent.Future; +import java.util.function.Consumer; +import java.util.logging.Logger; + +import static io.a2a.util.Utils.OBJECT_MAPPER; + public class SSEEventListener { private static final Logger log = Logger.getLogger(SSEEventListener.class.getName()); private final Consumer eventHandler; diff --git a/transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandler.java similarity index 99% rename from transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandler.java index b2928dfd5..d40f5795f 100644 --- a/transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java +++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandler.java @@ -1,4 +1,4 @@ -package io.a2a.jsonrpc.handler; +package io.a2a.transport.jsonrpc.server.handler; import static io.a2a.server.util.async.AsyncUtils.createTubeConfig; import jakarta.enterprise.context.ApplicationScoped; diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/JsonStreamingMessages.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/JsonStreamingMessages.java new file mode 100644 index 000000000..7bf46054d --- /dev/null +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/JsonStreamingMessages.java @@ -0,0 +1,148 @@ +package io.a2a.transport.jsonrpc.client; + +/** + * Contains JSON strings for testing SSE streaming. + */ +public class JsonStreamingMessages { + + public static final String STREAMING_TASK_EVENT = """ + data: { + "jsonrpc": "2.0", + "id": "1234", + "result": { + "kind": "task", + "id": "task-123", + "contextId": "context-456", + "status": { + "state": "working" + } + } + } + """; + + + public static final String STREAMING_MESSAGE_EVENT = """ + data: { + "jsonrpc": "2.0", + "id": "1234", + "result": { + "kind": "message", + "role": "agent", + "messageId": "msg-123", + "contextId": "context-456", + "parts": [ + { + "kind": "text", + "text": "Hello, world!" + } + ] + } + }"""; + + public static final String STREAMING_STATUS_UPDATE_EVENT = """ + data: { + "jsonrpc": "2.0", + "id": "1234", + "result": { + "taskId": "1", + "contextId": "2", + "status": { + "state": "submitted" + }, + "final": false, + "kind": "status-update" + } + }"""; + + public static final String STREAMING_STATUS_UPDATE_EVENT_FINAL = """ + data: { + "jsonrpc": "2.0", + "id": "1234", + "result": { + "taskId": "1", + "contextId": "2", + "status": { + "state": "completed" + }, + "final": true, + "kind": "status-update" + } + }"""; + + public static final String STREAMING_ARTIFACT_UPDATE_EVENT = """ + data: { + "jsonrpc": "2.0", + "id": "1234", + "result": { + "kind": "artifact-update", + "taskId": "1", + "contextId": "2", + "append": false, + "lastChunk": true, + "artifact": { + "artifactId": "artifact-1", + "parts": [ + { + "kind": "text", + "text": "Why did the chicken cross the road? To get to the other side!" + } + ] + } + } + } + }"""; + + public static final String STREAMING_ERROR_EVENT = """ + data: { + "jsonrpc": "2.0", + "id": "1234", + "error": { + "code": -32602, + "message": "Invalid parameters", + "data": "Missing required field" + } + }"""; + + public static final String SEND_MESSAGE_STREAMING_TEST_REQUEST = """ + { + "jsonrpc": "2.0", + "id": "request-1234", + "method": "message/stream", + "params": { + "message": { + "role": "user", + "parts": [ + { + "kind": "text", + "text": "tell me some jokes" + } + ], + "messageId": "message-1234", + "contextId": "context-1234", + "kind": "message" + }, + "configuration": { + "acceptedOutputModes": ["text"], + "blocking": false + }, + } + }"""; + + static final String SEND_MESSAGE_STREAMING_TEST_RESPONSE = + "event: message\n" + + "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"id\":\"2\",\"contextId\":\"context-1234\",\"status\":{\"state\":\"completed\"},\"artifacts\":[{\"artifactId\":\"artifact-1\",\"name\":\"joke\",\"parts\":[{\"kind\":\"text\",\"text\":\"Why did the chicken cross the road? To get to the other side!\"}]}],\"metadata\":{},\"kind\":\"task\"}}\n\n"; + + static final String TASK_RESUBSCRIPTION_REQUEST_TEST_RESPONSE = + "event: message\n" + + "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"id\":\"2\",\"contextId\":\"context-1234\",\"status\":{\"state\":\"completed\"},\"artifacts\":[{\"artifactId\":\"artifact-1\",\"name\":\"joke\",\"parts\":[{\"kind\":\"text\",\"text\":\"Why did the chicken cross the road? To get to the other side!\"}]}],\"metadata\":{},\"kind\":\"task\"}}\n\n"; + + public static final String TASK_RESUBSCRIPTION_TEST_REQUEST = """ + { + "jsonrpc": "2.0", + "id": "request-1234", + "method": "tasks/resubscribe", + "params": { + "id": "task-1234" + } + }"""; +} \ No newline at end of file diff --git a/client/src/test/java/io/a2a/client/sse/SSEEventListenerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListenerTest.java similarity index 97% rename from client/src/test/java/io/a2a/client/sse/SSEEventListenerTest.java rename to transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListenerTest.java index 1fca0ff9c..5e4fc357c 100644 --- a/client/src/test/java/io/a2a/client/sse/SSEEventListenerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListenerTest.java @@ -1,18 +1,5 @@ -package io.a2a.client.sse; +package io.a2a.transport.jsonrpc.client.sse; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import io.a2a.client.JsonStreamingMessages; import io.a2a.spec.Artifact; import io.a2a.spec.JSONRPCError; import io.a2a.spec.Message; @@ -24,8 +11,18 @@ import io.a2a.spec.TaskStatus; import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.TextPart; +import io.a2a.transport.jsonrpc.client.JsonStreamingMessages; import org.junit.jupiter.api.Test; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.*; + public class SSEEventListenerTest { @Test diff --git a/transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandlerTest.java similarity index 99% rename from transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java rename to transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandlerTest.java index 070df7aa1..7707b5bcc 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandlerTest.java @@ -1,4 +1,4 @@ -package io.a2a.jsonrpc.handler; +package io.a2a.transport.jsonrpc.server.handler; import java.util.ArrayList; import java.util.Collections; @@ -58,6 +58,7 @@ import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.TextPart; import io.a2a.spec.UnsupportedOperationError; +import io.a2a.transport.jsonrpc.server.handler.JSONRPCHandler; import mutiny.zero.ZeroPublisher; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; diff --git a/transport/spi/pom.xml b/transport/spi/pom.xml new file mode 100644 index 000000000..1e284687f --- /dev/null +++ b/transport/spi/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + + io.github.a2asdk + a2a-java-sdk-parent + 0.2.6.Beta1-SNAPSHOT + ../../pom.xml + + a2a-java-sdk-transport-spi + + jar + + Java SDK A2A Transport: SPI + Java SDK for the Agent2Agent Protocol (A2A) - SPI + + + + io.github.a2asdk + a2a-java-sdk-spec + ${project.version} + + + + + diff --git a/transport/spi/src/main/java/io/a2a/transport/spi/client/Transport.java b/transport/spi/src/main/java/io/a2a/transport/spi/client/Transport.java new file mode 100644 index 000000000..9ab643b56 --- /dev/null +++ b/transport/spi/src/main/java/io/a2a/transport/spi/client/Transport.java @@ -0,0 +1,109 @@ +package io.a2a.transport.spi.client; + +import io.a2a.spec.*; + +import java.util.List; +import java.util.function.Consumer; + +public interface Transport { + + /** + * Send a message to the remote agent. + * + * @param requestId the request ID to use + * @param messageSendParams the parameters for the message to be sent + * @return the response, may contain a message or a task + * @throws A2AServerException if sending the message fails for any reason + */ + EventKind sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException; + + /** + * Retrieve the generated artifacts for a task. + * + * @param requestId the request ID to use + * @param taskQueryParams the params for the task to be queried + * @return the response containing the task + * @throws A2AServerException if retrieving the task fails for any reason + */ + Task getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException; + + /** + * Cancel a task that was previously submitted to the A2A server. + * + * @param requestId the request ID to use + * @param taskIdParams the params for the task to be cancelled + * @return the response indicating if the task was cancelled + * @throws A2AServerException if retrieving the task fails for any reason + */ + Task cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException; + + /** + * Get the push notification configuration for a task. + * + * @param requestId the request ID to use + * @param getTaskPushNotificationConfigParams the params for the task + * @return the response containing the push notification configuration + * @throws A2AServerException if getting the push notification configuration fails for any reason + */ + TaskPushNotificationConfig getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException; + + /** + * Set push notification configuration for a task. + * + * @param requestId the request ID to use + * @param taskId the task ID + * @param pushNotificationConfig the push notification configuration + * @return the response indicating whether setting the task push notification configuration succeeded + * @throws A2AServerException if setting the push notification configuration fails for any reason + */ + TaskPushNotificationConfig setTaskPushNotificationConfig(String requestId, String taskId, + PushNotificationConfig pushNotificationConfig) throws A2AServerException; + + /** + * Retrieves the push notification configurations for a specified task. + * + * @param requestId the request ID to use + * @param listTaskPushNotificationConfigParams the params for retrieving the push notification configuration + * @return the response containing the push notification configuration + * @throws A2AServerException if getting the push notification configuration fails for any reason + */ + List listTaskPushNotificationConfig(String requestId, + ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException; + + /** + * Delete the push notification configuration for a specified task. + * + * @param requestId the request ID to use + * @param deleteTaskPushNotificationConfigParams the params for deleting the push notification configuration + * @throws A2AServerException if deleting the push notification configuration fails for any reason + */ + void deleteTaskPushNotificationConfig(String requestId, + DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException; + + /** + * Send a streaming message to the remote agent. + * + * @param requestId the request ID to use + * @param messageSendParams the parameters for the message to be sent + * @param eventHandler a consumer that will be invoked for each event received from the remote agent + * @param errorHandler a consumer that will be invoked if the remote agent returns an error + * @param failureHandler a consumer that will be invoked if a failure occurs when processing events + * @throws A2AServerException if sending the streaming message fails for any reason + */ + void sendStreamingMessage(String requestId, MessageSendParams messageSendParams, Consumer eventHandler, + Consumer errorHandler, Runnable failureHandler) throws A2AServerException; + + /** + * Resubscribe to an ongoing task. + * + * @param requestId the request ID to use + * @param taskIdParams the params for the task to resubscribe to + * @param eventHandler a consumer that will be invoked for each event received from the remote agent + * @param errorHandler a consumer that will be invoked if the remote agent returns an error + * @param failureHandler a consumer that will be invoked if a failure occurs when processing events + * @throws A2AServerException if resubscribing to the task fails for any reason + */ + void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consumer eventHandler, + Consumer errorHandler, Runnable failureHandler) throws A2AServerException; + +}