diff --git a/client/pom.xml b/client/pom.xml
index 0a71440b4..bdf20a9c4 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -29,7 +29,7 @@
${project.groupId}
- a2a-java-sdk-spec-grpc
+ a2a-java-sdk-transport-jsonrpc
${project.version}
diff --git a/client/src/main/java/io/a2a/A2A.java b/client/src/main/java/io/a2a/A2A.java
index c8dca80e4..d43ee1738 100644
--- a/client/src/main/java/io/a2a/A2A.java
+++ b/client/src/main/java/io/a2a/A2A.java
@@ -4,8 +4,8 @@
import java.util.Map;
import io.a2a.client.A2ACardResolver;
-import io.a2a.http.A2AHttpClient;
-import io.a2a.http.JdkA2AHttpClient;
+import io.a2a.transport.jsonrpc.client.A2AHttpClient;
+import io.a2a.transport.jsonrpc.client.JdkA2AHttpClient;
import io.a2a.spec.A2AClientError;
import io.a2a.spec.A2AClientJSONError;
import io.a2a.spec.AgentCard;
diff --git a/client/src/main/java/io/a2a/client/A2ACardResolver.java b/client/src/main/java/io/a2a/client/A2ACardResolver.java
index 88d1e351f..ab45039d6 100644
--- a/client/src/main/java/io/a2a/client/A2ACardResolver.java
+++ b/client/src/main/java/io/a2a/client/A2ACardResolver.java
@@ -9,8 +9,8 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
-import io.a2a.http.A2AHttpClient;
-import io.a2a.http.A2AHttpResponse;
+import io.a2a.transport.jsonrpc.client.A2AHttpClient;
+import io.a2a.transport.jsonrpc.client.A2AHttpResponse;
import io.a2a.spec.A2AClientError;
import io.a2a.spec.A2AClientJSONError;
import io.a2a.spec.AgentCard;
diff --git a/client/src/main/java/io/a2a/client/A2AClient.java b/client/src/main/java/io/a2a/client/A2AClient.java
index 80f8c401c..a61c86da3 100644
--- a/client/src/main/java/io/a2a/client/A2AClient.java
+++ b/client/src/main/java/io/a2a/client/A2AClient.java
@@ -1,68 +1,24 @@
package io.a2a.client;
-import static io.a2a.util.Assert.checkNotNullParam;
+import io.a2a.A2A;
+import io.a2a.spec.*;
+import io.a2a.transport.jsonrpc.client.A2AHttpClient;
+import io.a2a.transport.jsonrpc.client.JSONRPCTransport;
+import io.a2a.transport.jsonrpc.client.JdkA2AHttpClient;
+import io.a2a.transport.spi.client.Transport;
-import java.io.IOException;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import io.a2a.client.sse.SSEEventListener;
-import io.a2a.http.A2AHttpClient;
-import io.a2a.http.A2AHttpResponse;
-import io.a2a.http.JdkA2AHttpClient;
-import io.a2a.A2A;
-import io.a2a.spec.A2AClientError;
-import io.a2a.spec.A2AClientJSONError;
-import io.a2a.spec.A2AServerException;
-import io.a2a.spec.AgentCard;
-import io.a2a.spec.CancelTaskRequest;
-import io.a2a.spec.CancelTaskResponse;
-import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
-import io.a2a.spec.DeleteTaskPushNotificationConfigRequest;
-import io.a2a.spec.DeleteTaskPushNotificationConfigResponse;
-import io.a2a.spec.GetTaskPushNotificationConfigParams;
-import io.a2a.spec.GetTaskPushNotificationConfigRequest;
-import io.a2a.spec.GetTaskPushNotificationConfigResponse;
-import io.a2a.spec.GetTaskRequest;
-import io.a2a.spec.GetTaskResponse;
-import io.a2a.spec.JSONRPCError;
-import io.a2a.spec.JSONRPCMessage;
-import io.a2a.spec.JSONRPCResponse;
-import io.a2a.spec.ListTaskPushNotificationConfigParams;
-import io.a2a.spec.ListTaskPushNotificationConfigRequest;
-import io.a2a.spec.ListTaskPushNotificationConfigResponse;
-import io.a2a.spec.MessageSendParams;
-import io.a2a.spec.PushNotificationConfig;
-import io.a2a.spec.SendMessageRequest;
-import io.a2a.spec.SendMessageResponse;
-import io.a2a.spec.SendStreamingMessageRequest;
-import io.a2a.spec.SetTaskPushNotificationConfigRequest;
-import io.a2a.spec.SetTaskPushNotificationConfigResponse;
-import io.a2a.spec.StreamingEventKind;
-import io.a2a.spec.TaskIdParams;
-import io.a2a.spec.TaskPushNotificationConfig;
-import io.a2a.spec.TaskQueryParams;
-import io.a2a.spec.TaskResubscriptionRequest;
-import io.a2a.util.Utils;
+import static io.a2a.util.Assert.checkNotNullParam;
/**
* An A2A client.
*/
public class A2AClient {
- private static final TypeReference SEND_MESSAGE_RESPONSE_REFERENCE = new TypeReference<>() {};
- private static final TypeReference GET_TASK_RESPONSE_REFERENCE = new TypeReference<>() {};
- private static final TypeReference CANCEL_TASK_RESPONSE_REFERENCE = new TypeReference<>() {};
- private static final TypeReference GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
- private static final TypeReference SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
- private static final TypeReference LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
- private static final TypeReference DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
- private final A2AHttpClient httpClient;
- private final String agentUrl;
+ private Transport transport;
private AgentCard agentCard;
@@ -74,8 +30,7 @@ public class A2AClient {
public A2AClient(AgentCard agentCard) {
checkNotNullParam("agentCard", agentCard);
this.agentCard = agentCard;
- this.agentUrl = agentCard.url();
- this.httpClient = new JdkA2AHttpClient();
+ this.transport = new JSONRPCTransport(agentCard.url(), new JdkA2AHttpClient());
}
/**
@@ -85,8 +40,7 @@ public A2AClient(AgentCard agentCard) {
*/
public A2AClient(String agentUrl) {
checkNotNullParam("agentUrl", agentUrl);
- this.agentUrl = agentUrl;
- this.httpClient = new JdkA2AHttpClient();
+ this.transport = new JSONRPCTransport(agentUrl, new JdkA2AHttpClient());
}
/**
@@ -145,7 +99,7 @@ public AgentCard getAgentCard(String relativeCardPath, Map authH
* @return the response, may contain a message or a task
* @throws A2AServerException if sending the message fails for any reason
*/
- public SendMessageResponse sendMessage(MessageSendParams messageSendParams) throws A2AServerException {
+ public EventKind sendMessage(MessageSendParams messageSendParams) throws A2AServerException {
return sendMessage(null, messageSendParams);
}
@@ -157,24 +111,8 @@ public SendMessageResponse sendMessage(MessageSendParams messageSendParams) thro
* @return the response, may contain a message or a task
* @throws A2AServerException if sending the message fails for any reason
*/
- public SendMessageResponse sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException {
- SendMessageRequest.Builder sendMessageRequestBuilder = new SendMessageRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(SendMessageRequest.METHOD)
- .params(messageSendParams);
-
- if (requestId != null) {
- sendMessageRequestBuilder.id(requestId);
- }
-
- SendMessageRequest sendMessageRequest = sendMessageRequestBuilder.build();
-
- try {
- String httpResponseBody = sendPostRequest(sendMessageRequest);
- return unmarshalResponse(httpResponseBody, SEND_MESSAGE_RESPONSE_REFERENCE);
- } catch (IOException | InterruptedException e) {
- throw new A2AServerException("Failed to send message: " + e, e.getCause());
- }
+ public EventKind sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException {
+ return transport.sendMessage(requestId, messageSendParams);
}
/**
@@ -185,7 +123,7 @@ public SendMessageResponse sendMessage(String requestId, MessageSendParams messa
* @return the response containing the task
* @throws A2AServerException if retrieving the task fails for any reason
*/
- public GetTaskResponse getTask(String id) throws A2AServerException {
+ public Task getTask(String id) throws A2AServerException {
return getTask(null, new TaskQueryParams(id));
}
@@ -197,7 +135,7 @@ public GetTaskResponse getTask(String id) throws A2AServerException {
* @return the response containing the task
* @throws A2AServerException if retrieving the task fails for any reason
*/
- public GetTaskResponse getTask(TaskQueryParams taskQueryParams) throws A2AServerException {
+ public Task getTask(TaskQueryParams taskQueryParams) throws A2AServerException {
return getTask(null, taskQueryParams);
}
@@ -209,24 +147,8 @@ public GetTaskResponse getTask(TaskQueryParams taskQueryParams) throws A2AServer
* @return the response containing the task
* @throws A2AServerException if retrieving the task fails for any reason
*/
- public GetTaskResponse getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException {
- GetTaskRequest.Builder getTaskRequestBuilder = new GetTaskRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(GetTaskRequest.METHOD)
- .params(taskQueryParams);
-
- if (requestId != null) {
- getTaskRequestBuilder.id(requestId);
- }
-
- GetTaskRequest getTaskRequest = getTaskRequestBuilder.build();
-
- try {
- String httpResponseBody = sendPostRequest(getTaskRequest);
- return unmarshalResponse(httpResponseBody, GET_TASK_RESPONSE_REFERENCE);
- } catch (IOException | InterruptedException e) {
- throw new A2AServerException("Failed to get task: " + e, e.getCause());
- }
+ public Task getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException {
+ return transport.getTask(requestId, taskQueryParams);
}
/**
@@ -236,7 +158,7 @@ public GetTaskResponse getTask(String requestId, TaskQueryParams taskQueryParams
* @return the response indicating if the task was cancelled
* @throws A2AServerException if cancelling the task fails for any reason
*/
- public CancelTaskResponse cancelTask(String id) throws A2AServerException {
+ public Task cancelTask(String id) throws A2AServerException {
return cancelTask(null, new TaskIdParams(id));
}
@@ -247,7 +169,7 @@ public CancelTaskResponse cancelTask(String id) throws A2AServerException {
* @return the response indicating if the task was cancelled
* @throws A2AServerException if cancelling the task fails for any reason
*/
- public CancelTaskResponse cancelTask(TaskIdParams taskIdParams) throws A2AServerException {
+ public Task cancelTask(TaskIdParams taskIdParams) throws A2AServerException {
return cancelTask(null, taskIdParams);
}
@@ -259,24 +181,8 @@ public CancelTaskResponse cancelTask(TaskIdParams taskIdParams) throws A2AServer
* @return the response indicating if the task was cancelled
* @throws A2AServerException if retrieving the task fails for any reason
*/
- public CancelTaskResponse cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException {
- CancelTaskRequest.Builder cancelTaskRequestBuilder = new CancelTaskRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(CancelTaskRequest.METHOD)
- .params(taskIdParams);
-
- if (requestId != null) {
- cancelTaskRequestBuilder.id(requestId);
- }
-
- CancelTaskRequest cancelTaskRequest = cancelTaskRequestBuilder.build();
-
- try {
- String httpResponseBody = sendPostRequest(cancelTaskRequest);
- return unmarshalResponse(httpResponseBody, CANCEL_TASK_RESPONSE_REFERENCE);
- } catch (IOException | InterruptedException e) {
- throw new A2AServerException("Failed to cancel task: " + e, e.getCause());
- }
+ public Task cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException {
+ return transport.cancelTask(requestId, taskIdParams);
}
/**
@@ -286,7 +192,7 @@ public CancelTaskResponse cancelTask(String requestId, TaskIdParams taskIdParams
* @return the response containing the push notification configuration
* @throws A2AServerException if getting the push notification configuration fails for any reason
*/
- public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(String taskId) throws A2AServerException {
+ public TaskPushNotificationConfig getTaskPushNotificationConfig(String taskId) throws A2AServerException {
return getTaskPushNotificationConfig(null, new GetTaskPushNotificationConfigParams(taskId));
}
@@ -298,7 +204,7 @@ public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(Strin
* @return the response containing the push notification configuration
* @throws A2AServerException if getting the push notification configuration fails for any reason
*/
- public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(String taskId, String pushNotificationConfigId) throws A2AServerException {
+ public TaskPushNotificationConfig getTaskPushNotificationConfig(String taskId, String pushNotificationConfigId) throws A2AServerException {
return getTaskPushNotificationConfig(null, new GetTaskPushNotificationConfigParams(taskId, pushNotificationConfigId));
}
@@ -309,7 +215,7 @@ public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(Strin
* @return the response containing the push notification configuration
* @throws A2AServerException if getting the push notification configuration fails for any reason
*/
- public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException {
+ public TaskPushNotificationConfig getTaskPushNotificationConfig(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException {
return getTaskPushNotificationConfig(null, getTaskPushNotificationConfigParams);
}
@@ -321,24 +227,8 @@ public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(GetTa
* @return the response containing the push notification configuration
* @throws A2AServerException if getting the push notification configuration fails for any reason
*/
- public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException {
- GetTaskPushNotificationConfigRequest.Builder getTaskPushNotificationRequestBuilder = new GetTaskPushNotificationConfigRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(GetTaskPushNotificationConfigRequest.METHOD)
- .params(getTaskPushNotificationConfigParams);
-
- if (requestId != null) {
- getTaskPushNotificationRequestBuilder.id(requestId);
- }
-
- GetTaskPushNotificationConfigRequest getTaskPushNotificationRequest = getTaskPushNotificationRequestBuilder.build();
-
- try {
- String httpResponseBody = sendPostRequest(getTaskPushNotificationRequest);
- return unmarshalResponse(httpResponseBody, GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
- } catch (IOException | InterruptedException e) {
- throw new A2AServerException("Failed to get task push notification config: " + e, e.getCause());
- }
+ public TaskPushNotificationConfig getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException {
+ return transport.getTaskPushNotificationConfig(requestId, getTaskPushNotificationConfigParams);
}
/**
@@ -349,7 +239,7 @@ public GetTaskPushNotificationConfigResponse getTaskPushNotificationConfig(Strin
* @return the response indicating whether setting the task push notification configuration succeeded
* @throws A2AServerException if setting the push notification configuration fails for any reason
*/
- public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(String taskId,
+ public TaskPushNotificationConfig setTaskPushNotificationConfig(String taskId,
PushNotificationConfig pushNotificationConfig) throws A2AServerException {
return setTaskPushNotificationConfig(null, taskId, pushNotificationConfig);
}
@@ -363,25 +253,9 @@ public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(Strin
* @return the response indicating whether setting the task push notification configuration succeeded
* @throws A2AServerException if setting the push notification configuration fails for any reason
*/
- public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(String requestId, String taskId,
+ public TaskPushNotificationConfig setTaskPushNotificationConfig(String requestId, String taskId,
PushNotificationConfig pushNotificationConfig) throws A2AServerException {
- SetTaskPushNotificationConfigRequest.Builder setTaskPushNotificationRequestBuilder = new SetTaskPushNotificationConfigRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(SetTaskPushNotificationConfigRequest.METHOD)
- .params(new TaskPushNotificationConfig(taskId, pushNotificationConfig));
-
- if (requestId != null) {
- setTaskPushNotificationRequestBuilder.id(requestId);
- }
-
- SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest = setTaskPushNotificationRequestBuilder.build();
-
- try {
- String httpResponseBody = sendPostRequest(setTaskPushNotificationRequest);
- return unmarshalResponse(httpResponseBody, SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
- } catch (IOException | InterruptedException e) {
- throw new A2AServerException("Failed to set task push notification config: " + e, e.getCause());
- }
+ return transport.setTaskPushNotificationConfig(requestId, taskId, pushNotificationConfig);
}
/**
@@ -392,7 +266,7 @@ public SetTaskPushNotificationConfigResponse setTaskPushNotificationConfig(Strin
* @return the response containing the push notification configuration
* @throws A2AServerException if getting the push notification configuration fails for any reason
*/
- public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(String requestId, String taskId) throws A2AServerException {
+ public List listTaskPushNotificationConfig(String requestId, String taskId) throws A2AServerException {
return listTaskPushNotificationConfig(requestId, new ListTaskPushNotificationConfigParams(taskId));
}
@@ -403,7 +277,7 @@ public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(Str
* @return the response containing the push notification configuration
* @throws A2AServerException if getting the push notification configuration fails for any reason
*/
- public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(String taskId) throws A2AServerException {
+ public List listTaskPushNotificationConfig(String taskId) throws A2AServerException {
return listTaskPushNotificationConfig(null, new ListTaskPushNotificationConfigParams(taskId));
}
@@ -414,7 +288,7 @@ public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(Str
* @return the response containing the push notification configuration
* @throws A2AServerException if getting the push notification configuration fails for any reason
*/
- public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException {
+ public List listTaskPushNotificationConfig(ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException {
return listTaskPushNotificationConfig(null, listTaskPushNotificationConfigParams);
}
@@ -426,25 +300,9 @@ public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(Lis
* @return the response containing the push notification configuration
* @throws A2AServerException if getting the push notification configuration fails for any reason
*/
- public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(String requestId,
+ public List listTaskPushNotificationConfig(String requestId,
ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException {
- ListTaskPushNotificationConfigRequest.Builder listTaskPushNotificationRequestBuilder = new ListTaskPushNotificationConfigRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(ListTaskPushNotificationConfigRequest.METHOD)
- .params(listTaskPushNotificationConfigParams);
-
- if (requestId != null) {
- listTaskPushNotificationRequestBuilder.id(requestId);
- }
-
- ListTaskPushNotificationConfigRequest listTaskPushNotificationRequest = listTaskPushNotificationRequestBuilder.build();
-
- try {
- String httpResponseBody = sendPostRequest(listTaskPushNotificationRequest);
- return unmarshalResponse(httpResponseBody, LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
- } catch (IOException | InterruptedException e) {
- throw new A2AServerException("Failed to list task push notification config: " + e, e.getCause());
- }
+ return transport.listTaskPushNotificationConfig(requestId, listTaskPushNotificationConfigParams);
}
/**
@@ -456,9 +314,9 @@ public ListTaskPushNotificationConfigResponse listTaskPushNotificationConfig(Str
* @return the response
* @throws A2AServerException if deleting the push notification configuration fails for any reason
*/
- public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig(String requestId, String taskId,
+ public void deleteTaskPushNotificationConfig(String requestId, String taskId,
String pushNotificationConfigId) throws A2AServerException {
- return deleteTaskPushNotificationConfig(requestId, new DeleteTaskPushNotificationConfigParams(taskId, pushNotificationConfigId));
+ deleteTaskPushNotificationConfig(requestId, new DeleteTaskPushNotificationConfigParams(taskId, pushNotificationConfigId));
}
/**
@@ -469,9 +327,9 @@ public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig
* @return the response
* @throws A2AServerException if deleting the push notification configuration fails for any reason
*/
- public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig(String taskId,
+ public void deleteTaskPushNotificationConfig(String taskId,
String pushNotificationConfigId) throws A2AServerException {
- return deleteTaskPushNotificationConfig(null, new DeleteTaskPushNotificationConfigParams(taskId, pushNotificationConfigId));
+ deleteTaskPushNotificationConfig(null, new DeleteTaskPushNotificationConfigParams(taskId, pushNotificationConfigId));
}
/**
@@ -481,8 +339,8 @@ public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig
* @return the response
* @throws A2AServerException if deleting the push notification configuration fails for any reason
*/
- public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException {
- return deleteTaskPushNotificationConfig(null, deleteTaskPushNotificationConfigParams);
+ public void deleteTaskPushNotificationConfig(DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException {
+ deleteTaskPushNotificationConfig(null, deleteTaskPushNotificationConfigParams);
}
/**
@@ -493,25 +351,9 @@ public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig
* @return the response
* @throws A2AServerException if deleting the push notification configuration fails for any reason
*/
- public DeleteTaskPushNotificationConfigResponse deleteTaskPushNotificationConfig(String requestId,
+ public void deleteTaskPushNotificationConfig(String requestId,
DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException {
- DeleteTaskPushNotificationConfigRequest.Builder deleteTaskPushNotificationRequestBuilder = new DeleteTaskPushNotificationConfigRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(DeleteTaskPushNotificationConfigRequest.METHOD)
- .params(deleteTaskPushNotificationConfigParams);
-
- if (requestId != null) {
- deleteTaskPushNotificationRequestBuilder.id(requestId);
- }
-
- DeleteTaskPushNotificationConfigRequest deleteTaskPushNotificationRequest = deleteTaskPushNotificationRequestBuilder.build();
-
- try {
- String httpResponseBody = sendPostRequest(deleteTaskPushNotificationRequest);
- return unmarshalResponse(httpResponseBody, DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
- } catch (IOException | InterruptedException e) {
- throw new A2AServerException("Failed to delete task push notification config: " + e, e.getCause());
- }
+ transport.deleteTaskPushNotificationConfig(requestId, deleteTaskPushNotificationConfigParams);
}
/**
@@ -545,32 +387,7 @@ public void sendStreamingMessage(String requestId, MessageSendParams messageSend
checkNotNullParam("errorHandler", errorHandler);
checkNotNullParam("failureHandler", failureHandler);
- SendStreamingMessageRequest.Builder sendStreamingMessageRequestBuilder = new SendStreamingMessageRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(SendStreamingMessageRequest.METHOD)
- .params(messageSendParams);
-
- if (requestId != null) {
- sendStreamingMessageRequestBuilder.id(requestId);
- }
-
- AtomicReference> ref = new AtomicReference<>();
- SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler);
- SendStreamingMessageRequest sendStreamingMessageRequest = sendStreamingMessageRequestBuilder.build();
- try {
- A2AHttpClient.PostBuilder builder = createPostBuilder(sendStreamingMessageRequest);
- ref.set(builder.postAsyncSSE(
- msg -> sseEventListener.onMessage(msg, ref.get()),
- throwable -> sseEventListener.onError(throwable, ref.get()),
- () -> {
- // We don't need to do anything special on completion
- }));
-
- } catch (IOException e) {
- throw new A2AServerException("Failed to send streaming message request: " + e, e.getCause());
- } catch (InterruptedException e) {
- throw new A2AServerException("Send streaming message request timed out: " + e, e.getCause());
- }
+ transport.sendStreamingMessage(requestId, messageSendParams, eventHandler, errorHandler, failureHandler);
}
/**
@@ -604,58 +421,6 @@ public void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consu
checkNotNullParam("errorHandler", errorHandler);
checkNotNullParam("failureHandler", failureHandler);
- TaskResubscriptionRequest.Builder taskResubscriptionRequestBuilder = new TaskResubscriptionRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(TaskResubscriptionRequest.METHOD)
- .params(taskIdParams);
-
- if (requestId != null) {
- taskResubscriptionRequestBuilder.id(requestId);
- }
-
- AtomicReference> ref = new AtomicReference<>();
- SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler);
- TaskResubscriptionRequest taskResubscriptionRequest = taskResubscriptionRequestBuilder.build();
- try {
- A2AHttpClient.PostBuilder builder = createPostBuilder(taskResubscriptionRequest);
- ref.set(builder.postAsyncSSE(
- msg -> sseEventListener.onMessage(msg, ref.get()),
- throwable -> sseEventListener.onError(throwable, ref.get()),
- () -> {
- // We don't need to do anything special on completion
- }));
-
- } catch (IOException e) {
- throw new A2AServerException("Failed to send task resubscription request: " + e, e.getCause());
- } catch (InterruptedException e) {
- throw new A2AServerException("Task resubscription request timed out: " + e, e.getCause());
- }
- }
-
- private String sendPostRequest(Object value) throws IOException, InterruptedException {
- A2AHttpClient.PostBuilder builder = createPostBuilder(value);
- A2AHttpResponse response = builder.post();
- if (!response.success()) {
- throw new IOException("Request failed " + response.status());
- }
- return response.body();
- }
-
- private A2AHttpClient.PostBuilder createPostBuilder(Object value) throws JsonProcessingException {
- return httpClient.createPost()
- .url(agentUrl)
- .addHeader("Content-Type", "application/json")
- .body(Utils.OBJECT_MAPPER.writeValueAsString(value));
-
- }
-
- private T unmarshalResponse(String response, TypeReference typeReference)
- throws A2AServerException, JsonProcessingException {
- T value = Utils.unmarshalFrom(response, typeReference);
- JSONRPCError error = value.getError();
- if (error != null) {
- throw new A2AServerException(error.getMessage() + (error.getData() != null ? ": " + error.getData() : ""), error);
- }
- return value;
+ transport.resubscribeToTask(requestId, taskIdParams, eventHandler, errorHandler, failureHandler);
}
}
diff --git a/client/src/main/java/io/a2a/client/A2AGrpcClient.java b/client/src/main/java/io/a2a/client/A2AGrpcClient.java
deleted file mode 100644
index 661adc228..000000000
--- a/client/src/main/java/io/a2a/client/A2AGrpcClient.java
+++ /dev/null
@@ -1,200 +0,0 @@
-package io.a2a.client;
-
-import static io.a2a.grpc.A2AServiceGrpc.A2AServiceBlockingV2Stub;
-import static io.a2a.grpc.A2AServiceGrpc.A2AServiceStub;
-import static io.a2a.grpc.utils.ProtoUtils.FromProto;
-import static io.a2a.grpc.utils.ProtoUtils.ToProto;
-import static io.a2a.util.Assert.checkNotNullParam;
-
-import java.util.function.Consumer;
-
-import io.a2a.client.sse.SSEStreamObserver;
-import io.a2a.grpc.A2AServiceGrpc;
-import io.a2a.grpc.CancelTaskRequest;
-import io.a2a.grpc.CreateTaskPushNotificationConfigRequest;
-import io.a2a.grpc.GetTaskPushNotificationConfigRequest;
-import io.a2a.grpc.GetTaskRequest;
-import io.a2a.grpc.SendMessageRequest;
-import io.a2a.grpc.SendMessageResponse;
-import io.a2a.grpc.StreamResponse;
-import io.a2a.grpc.utils.ProtoUtils;
-import io.a2a.spec.A2AServerException;
-import io.a2a.spec.AgentCard;
-import io.a2a.spec.EventKind;
-import io.a2a.spec.GetTaskPushNotificationConfigParams;
-import io.a2a.spec.MessageSendParams;
-import io.a2a.spec.StreamingEventKind;
-import io.a2a.spec.Task;
-import io.a2a.spec.TaskIdParams;
-import io.a2a.spec.TaskPushNotificationConfig;
-import io.a2a.spec.TaskQueryParams;
-import io.grpc.Channel;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.StreamObserver;
-
-/**
- * A2A Client for interacting with an A2A agent via gRPC.
- */
-public class A2AGrpcClient {
-
- private A2AServiceBlockingV2Stub blockingStub;
- private A2AServiceStub asyncStub;
- private AgentCard agentCard;
-
- /**
- * Create an A2A client for interacting with an A2A agent via gRPC.
- *
- * @param channel the gRPC channel
- * @param agentCard the agent card for the A2A server this client will be communicating with
- */
- public A2AGrpcClient(Channel channel, AgentCard agentCard) {
- checkNotNullParam("channel", channel);
- checkNotNullParam("agentCard", agentCard);
- this.asyncStub = A2AServiceGrpc.newStub(channel);
- this.blockingStub = A2AServiceGrpc.newBlockingV2Stub(channel);
- this.agentCard = agentCard;
- }
-
- /**
- * Send a message to the remote agent.
- *
- * @param messageSendParams the parameters for the message to be sent
- * @return the response, may be a message or a task
- * @throws A2AServerException if sending the message fails for any reason
- */
- public EventKind sendMessage(MessageSendParams messageSendParams) throws A2AServerException {
- SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams);
- try {
- SendMessageResponse response = blockingStub.sendMessage(request);
- if (response.hasMsg()) {
- return FromProto.message(response.getMsg());
- } else if (response.hasTask()) {
- return FromProto.task(response.getTask());
- } else {
- throw new A2AServerException("Server response did not contain a message or task");
- }
- } catch (StatusRuntimeException e) {
- throw new A2AServerException("Failed to send message: " + e, e);
- }
- }
-
- /**
- * Retrieves the current state and history of a specific task.
- *
- * @param taskQueryParams the params for the task to be queried
- * @return the task
- * @throws A2AServerException if retrieving the task fails for any reason
- */
- public Task getTask(TaskQueryParams taskQueryParams) throws A2AServerException {
- GetTaskRequest.Builder requestBuilder = GetTaskRequest.newBuilder();
- requestBuilder.setName("tasks/" + taskQueryParams.id());
- if (taskQueryParams.historyLength() != null) {
- requestBuilder.setHistoryLength(taskQueryParams.historyLength());
- }
- GetTaskRequest getTaskRequest = requestBuilder.build();
- try {
- return FromProto.task(blockingStub.getTask(getTaskRequest));
- } catch (StatusRuntimeException e) {
- throw new A2AServerException("Failed to get task: " + e, e);
- }
- }
-
- /**
- * Cancel a task that was previously submitted to the A2A server.
- *
- * @param taskIdParams the params for the task to be cancelled
- * @return the updated task
- * @throws A2AServerException if cancelling the task fails for any reason
- */
- public Task cancelTask(TaskIdParams taskIdParams) throws A2AServerException {
- CancelTaskRequest cancelTaskRequest = CancelTaskRequest.newBuilder()
- .setName("tasks/" + taskIdParams.id())
- .build();
- try {
- return FromProto.task(blockingStub.cancelTask(cancelTaskRequest));
- } catch (StatusRuntimeException e) {
- throw new A2AServerException("Failed to cancel task: " + e, e);
- }
- }
-
- /**
- * Set push notification configuration for a task.
- *
- * @param taskPushNotificationConfig the task push notification configuration
- * @return the task push notification config
- * @throws A2AServerException if setting the push notification configuration fails for any reason
- */
- public TaskPushNotificationConfig setTaskPushNotificationConfig(TaskPushNotificationConfig taskPushNotificationConfig) throws A2AServerException {
- String configId = taskPushNotificationConfig.pushNotificationConfig().id();
- CreateTaskPushNotificationConfigRequest request = CreateTaskPushNotificationConfigRequest.newBuilder()
- .setParent("tasks/" + taskPushNotificationConfig.taskId())
- .setConfig(ToProto.taskPushNotificationConfig(taskPushNotificationConfig))
- .setConfigId(configId == null ? "" : configId)
- .build();
- try {
- return FromProto.taskPushNotificationConfig(blockingStub.createTaskPushNotificationConfig(request));
- } catch (StatusRuntimeException e) {
- throw new A2AServerException("Failed to set the task push notification config: " + e, e);
- }
- }
-
- /**
- * Get the push notification configuration for a task.
- *
- * @param getTaskPushNotificationConfigParams the params for the task
- * @return the push notification configuration
- * @throws A2AServerException if getting the push notification configuration fails for any reason
- */
- public TaskPushNotificationConfig getTaskPushNotificationConfig(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException {
- GetTaskPushNotificationConfigRequest getTaskPushNotificationConfigRequest = GetTaskPushNotificationConfigRequest.newBuilder()
- .setName(getTaskPushNotificationConfigName(getTaskPushNotificationConfigParams))
- .build();
- try {
- return FromProto.taskPushNotificationConfig(blockingStub.getTaskPushNotificationConfig(getTaskPushNotificationConfigRequest));
- } catch (StatusRuntimeException e) {
- throw new A2AServerException("Failed to get the task push notification config: " + e, e);
- }
- }
-
- /**
- * Send a streaming message request to the remote agent.
- *
- * @param messageSendParams the parameters for the message to be sent
- * @param eventHandler a consumer that will be invoked for each event received from the remote agent
- * @param errorHandler a consumer that will be invoked if an error occurs
- * @throws A2AServerException if sending the streaming message fails for any reason
- */
- public void sendMessageStreaming(MessageSendParams messageSendParams, Consumer eventHandler,
- Consumer errorHandler) throws A2AServerException {
- SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams);
- StreamObserver streamObserver = new SSEStreamObserver(eventHandler, errorHandler);
- try {
- asyncStub.sendStreamingMessage(request, streamObserver);
- } catch (StatusRuntimeException e) {
- throw new A2AServerException("Failed to send streaming message: " + e, e);
- }
- }
-
- private SendMessageRequest createGrpcSendMessageRequestFromMessageSendParams(MessageSendParams messageSendParams) {
- SendMessageRequest.Builder builder = SendMessageRequest.newBuilder();
- builder.setRequest(ToProto.message(messageSendParams.message()));
- if (messageSendParams.configuration() != null) {
- builder.setConfiguration(ToProto.messageSendConfiguration(messageSendParams.configuration()));
- }
- if (messageSendParams.metadata() != null) {
- builder.setMetadata(ToProto.struct(messageSendParams.metadata()));
- }
- return builder.build();
- }
-
- private String getTaskPushNotificationConfigName(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) {
- StringBuilder name = new StringBuilder();
- name.append("tasks/");
- name.append(getTaskPushNotificationConfigParams.id());
- if (getTaskPushNotificationConfigParams.pushNotificationConfigId() != null) {
- name.append("/pushNotificationConfigs/");
- name.append(getTaskPushNotificationConfigParams.pushNotificationConfigId());
- }
- return name.toString();
- }
-}
diff --git a/client/src/test/java/io/a2a/client/A2ACardResolverTest.java b/client/src/test/java/io/a2a/client/A2ACardResolverTest.java
index 8d9ff0f5b..70fc9883b 100644
--- a/client/src/test/java/io/a2a/client/A2ACardResolverTest.java
+++ b/client/src/test/java/io/a2a/client/A2ACardResolverTest.java
@@ -11,8 +11,8 @@
import java.util.function.Consumer;
import com.fasterxml.jackson.core.type.TypeReference;
-import io.a2a.http.A2AHttpClient;
-import io.a2a.http.A2AHttpResponse;
+import io.a2a.transport.jsonrpc.client.A2AHttpClient;
+import io.a2a.transport.jsonrpc.client.A2AHttpResponse;
import io.a2a.spec.A2AClientError;
import io.a2a.spec.A2AClientJSONError;
import io.a2a.spec.AgentCard;
diff --git a/pom.xml b/pom.xml
index 0a3bfba88..0d92d7651 100644
--- a/pom.xml
+++ b/pom.xml
@@ -290,6 +290,7 @@
spec-grpc
tck
tests/server-common
+ transport/spi
transport/jsonrpc
transport/grpc
diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java
index 4a98ace48..816ed6f79 100644
--- a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java
+++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java
@@ -3,7 +3,7 @@
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
-import io.a2a.grpc.handler.GrpcHandler;
+import io.a2a.transport.grpc.server.handler.GrpcHandler;
import io.a2a.server.PublicAgentCard;
import io.a2a.server.requesthandlers.CallContextFactory;
import io.a2a.server.requesthandlers.RequestHandler;
diff --git a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java
index 25758da9c..9fb0074ac 100644
--- a/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java
+++ b/reference/grpc/src/test/java/io/a2a/server/grpc/quarkus/A2ATestResource.java
@@ -19,7 +19,7 @@
import jakarta.ws.rs.core.Response;
import io.a2a.server.apps.common.TestUtilsBean;
-import io.a2a.grpc.handler.GrpcHandler;
+import io.a2a.transport.grpc.server.handler.GrpcHandler;
import io.a2a.spec.PushNotificationConfig;
import io.a2a.spec.Task;
import io.a2a.spec.TaskArtifactUpdateEvent;
diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
index 3ebf22ccd..5cdc31e7e 100644
--- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
+++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
@@ -20,7 +20,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.io.JsonEOFException;
import com.fasterxml.jackson.databind.JsonNode;
-import io.a2a.jsonrpc.handler.JSONRPCHandler;
+import io.a2a.transport.jsonrpc.server.handler.JSONRPCHandler;
import io.a2a.server.ExtendedAgentCard;
import io.a2a.server.ServerCallContext;
import io.a2a.server.auth.UnauthenticatedUser;
diff --git a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java
index 33ac4445c..ac04216f3 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java
@@ -10,8 +10,8 @@
import com.fasterxml.jackson.core.JsonProcessingException;
-import io.a2a.http.A2AHttpClient;
-import io.a2a.http.JdkA2AHttpClient;
+import io.a2a.transport.jsonrpc.client.A2AHttpClient;
+import io.a2a.transport.jsonrpc.client.JdkA2AHttpClient;
import io.a2a.spec.PushNotificationConfig;
import io.a2a.spec.Task;
import io.a2a.util.Utils;
diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
index 7232d1a0c..7c62562f8 100644
--- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
+++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
@@ -12,8 +12,8 @@
import java.util.concurrent.Executors;
import java.util.function.Consumer;
-import io.a2a.http.A2AHttpClient;
-import io.a2a.http.A2AHttpResponse;
+import io.a2a.transport.jsonrpc.client.A2AHttpClient;
+import io.a2a.transport.jsonrpc.client.A2AHttpResponse;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java
index c5deef68f..046e0f8bb 100644
--- a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java
+++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java
@@ -11,8 +11,8 @@
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Alternative;
-import io.a2a.http.A2AHttpClient;
-import io.a2a.http.A2AHttpResponse;
+import io.a2a.transport.jsonrpc.client.A2AHttpClient;
+import io.a2a.transport.jsonrpc.client.A2AHttpResponse;
import io.a2a.spec.Task;
import io.a2a.util.Utils;
diff --git a/transport/grpc/pom.xml b/transport/grpc/pom.xml
index 982ae3b8e..18257b2cc 100644
--- a/transport/grpc/pom.xml
+++ b/transport/grpc/pom.xml
@@ -18,6 +18,11 @@
Java SDK for the Agent2Agent Protocol (A2A) - gRPC
+
+ io.github.a2asdk
+ a2a-java-sdk-transport-spi
+ ${project.version}
+
io.github.a2asdk
a2a-java-sdk-server-common
diff --git a/client/src/main/java/io/a2a/client/sse/SSEStreamObserver.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/client/EventStreamObserver.java
similarity index 83%
rename from client/src/main/java/io/a2a/client/sse/SSEStreamObserver.java
rename to transport/grpc/src/main/java/io/a2a/transport/grpc/client/EventStreamObserver.java
index adc721f42..ae6d705fc 100644
--- a/client/src/main/java/io/a2a/client/sse/SSEStreamObserver.java
+++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/client/EventStreamObserver.java
@@ -1,22 +1,22 @@
-package io.a2a.client.sse;
+package io.a2a.transport.grpc.client;
-import static io.a2a.grpc.utils.ProtoUtils.FromProto;
+import io.a2a.grpc.StreamResponse;
+import io.a2a.spec.StreamingEventKind;
+import io.grpc.stub.StreamObserver;
import java.util.function.Consumer;
import java.util.logging.Logger;
-import io.a2a.grpc.StreamResponse;
-import io.a2a.spec.StreamingEventKind;
-import io.grpc.stub.StreamObserver;
+import static io.a2a.grpc.utils.ProtoUtils.FromProto;
-public class SSEStreamObserver implements StreamObserver {
+public class EventStreamObserver implements StreamObserver {
- private static final Logger log = Logger.getLogger(SSEStreamObserver.class.getName());
+ private static final Logger log = Logger.getLogger(EventStreamObserver.class.getName());
private final Consumer eventHandler;
private final Consumer errorHandler;
- public SSEStreamObserver(Consumer eventHandler, Consumer errorHandler) {
+ public EventStreamObserver(Consumer eventHandler, Consumer errorHandler) {
this.eventHandler = eventHandler;
this.errorHandler = errorHandler;
}
diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/client/GrpcTransport.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/client/GrpcTransport.java
new file mode 100644
index 000000000..2975948c5
--- /dev/null
+++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/client/GrpcTransport.java
@@ -0,0 +1,164 @@
+package io.a2a.transport.grpc.client;
+
+import io.a2a.grpc.*;
+import io.a2a.grpc.SendMessageRequest;
+import io.a2a.grpc.SendMessageResponse;
+import io.a2a.grpc.utils.ProtoUtils;
+import io.a2a.spec.*;
+import io.a2a.spec.AgentCard;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskPushNotificationConfig;
+import io.a2a.transport.spi.client.Transport;
+import io.grpc.Channel;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import static io.a2a.util.Assert.checkNotNullParam;
+
+/**
+ * @author David BRASSELY (david.brassely at graviteesource.com)
+ * @author GraviteeSource Team
+ */
+public class GrpcTransport implements Transport {
+
+ private A2AServiceGrpc.A2AServiceBlockingV2Stub blockingStub;
+ private A2AServiceGrpc.A2AServiceStub asyncStub;
+ private AgentCard agentCard;
+
+ /**
+ * Create an A2A client for interacting with an A2A agent via gRPC.
+ *
+ * @param channel the gRPC channel
+ * @param agentCard the agent card for the A2A server this client will be communicating with
+ */
+ public GrpcTransport(Channel channel, AgentCard agentCard) {
+ checkNotNullParam("channel", channel);
+ checkNotNullParam("agentCard", agentCard);
+ this.asyncStub = A2AServiceGrpc.newStub(channel);
+ this.blockingStub = A2AServiceGrpc.newBlockingV2Stub(channel);
+ this.agentCard = agentCard;
+ }
+
+ @Override
+ public EventKind sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException {
+ SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams);
+ try {
+ SendMessageResponse response = blockingStub.sendMessage(request);
+ if (response.hasMsg()) {
+ return ProtoUtils.FromProto.message(response.getMsg());
+ } else if (response.hasTask()) {
+ return ProtoUtils.FromProto.task(response.getTask());
+ } else {
+ throw new A2AServerException("Server response did not contain a message or task");
+ }
+ } catch (StatusRuntimeException e) {
+ throw new A2AServerException("Failed to send message: " + e, e);
+ }
+ }
+
+ @Override
+ public Task getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException {
+ io.a2a.grpc.GetTaskRequest.Builder requestBuilder = io.a2a.grpc.GetTaskRequest.newBuilder();
+ requestBuilder.setName("tasks/" + taskQueryParams.id());
+ if (taskQueryParams.historyLength() != null) {
+ requestBuilder.setHistoryLength(taskQueryParams.historyLength());
+ }
+ io.a2a.grpc.GetTaskRequest getTaskRequest = requestBuilder.build();
+ try {
+ return ProtoUtils.FromProto.task(blockingStub.getTask(getTaskRequest));
+ } catch (StatusRuntimeException e) {
+ throw new A2AServerException("Failed to get task: " + e, e);
+ }
+ }
+
+ @Override
+ public Task cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException {
+ io.a2a.grpc.CancelTaskRequest cancelTaskRequest = io.a2a.grpc.CancelTaskRequest.newBuilder()
+ .setName("tasks/" + taskIdParams.id())
+ .build();
+ try {
+ return ProtoUtils.FromProto.task(blockingStub.cancelTask(cancelTaskRequest));
+ } catch (StatusRuntimeException e) {
+ throw new A2AServerException("Failed to cancel task: " + e, e);
+ }
+ }
+
+ @Override
+ public TaskPushNotificationConfig getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException {
+ io.a2a.grpc.GetTaskPushNotificationConfigRequest getTaskPushNotificationConfigRequest = io.a2a.grpc.GetTaskPushNotificationConfigRequest.newBuilder()
+ .setName(getTaskPushNotificationConfigName(getTaskPushNotificationConfigParams))
+ .build();
+ try {
+ return ProtoUtils.FromProto.taskPushNotificationConfig(blockingStub.getTaskPushNotificationConfig(getTaskPushNotificationConfigRequest));
+ } catch (StatusRuntimeException e) {
+ throw new A2AServerException("Failed to get the task push notification config: " + e, e);
+ }
+ }
+
+ @Override
+ public TaskPushNotificationConfig setTaskPushNotificationConfig(String requestId, String taskId, TaskPushNotificationConfig taskPushNotificationConfig) throws A2AServerException {
+ String configId = taskPushNotificationConfig.pushNotificationConfig().id();
+ CreateTaskPushNotificationConfigRequest request = CreateTaskPushNotificationConfigRequest.newBuilder()
+ .setParent("tasks/" + taskPushNotificationConfig.taskId())
+ .setConfig(ProtoUtils.ToProto.taskPushNotificationConfig(taskPushNotificationConfig))
+ .setConfigId(configId == null ? "" : configId)
+ .build();
+ try {
+ return ProtoUtils.FromProto.taskPushNotificationConfig(blockingStub.createTaskPushNotificationConfig(request));
+ } catch (StatusRuntimeException e) {
+ throw new A2AServerException("Failed to set the task push notification config: " + e, e);
+ }
+ }
+
+ @Override
+ public List listTaskPushNotificationConfig(String requestId, ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException {
+ return List.of();
+ }
+
+ @Override
+ public void deleteTaskPushNotificationConfig(String requestId, DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException {
+
+ }
+
+ @Override
+ public void sendStreamingMessage(String requestId, MessageSendParams messageSendParams, Consumer eventHandler, Consumer errorHandler, Runnable failureHandler) throws A2AServerException {
+ SendMessageRequest request = createGrpcSendMessageRequestFromMessageSendParams(messageSendParams);
+ StreamObserver streamObserver = new EventStreamObserver(eventHandler, errorHandler);
+ try {
+ asyncStub.sendStreamingMessage(request, streamObserver);
+ } catch (StatusRuntimeException e) {
+ throw new A2AServerException("Failed to send streaming message: " + e, e);
+ }
+ }
+
+ @Override
+ public void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consumer eventHandler, Consumer errorHandler, Runnable failureHandler) throws A2AServerException {
+
+ }
+
+ private SendMessageRequest createGrpcSendMessageRequestFromMessageSendParams(MessageSendParams messageSendParams) {
+ SendMessageRequest.Builder builder = SendMessageRequest.newBuilder();
+ builder.setRequest(ProtoUtils.ToProto.message(messageSendParams.message()));
+ if (messageSendParams.configuration() != null) {
+ builder.setConfiguration(ProtoUtils.ToProto.messageSendConfiguration(messageSendParams.configuration()));
+ }
+ if (messageSendParams.metadata() != null) {
+ builder.setMetadata(ProtoUtils.ToProto.struct(messageSendParams.metadata()));
+ }
+ return builder.build();
+ }
+
+ private String getTaskPushNotificationConfigName(GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) {
+ StringBuilder name = new StringBuilder();
+ name.append("tasks/");
+ name.append(getTaskPushNotificationConfigParams.id());
+ if (getTaskPushNotificationConfigParams.pushNotificationConfigId() != null) {
+ name.append("/pushNotificationConfigs/");
+ name.append(getTaskPushNotificationConfigParams.pushNotificationConfigId());
+ }
+ return name.toString();
+ }
+}
diff --git a/transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/server/handler/GrpcHandler.java
similarity index 99%
rename from transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java
rename to transport/grpc/src/main/java/io/a2a/transport/grpc/server/handler/GrpcHandler.java
index 15aa84209..20381b938 100644
--- a/transport/grpc/src/main/java/io/a2a/grpc/handler/GrpcHandler.java
+++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/server/handler/GrpcHandler.java
@@ -1,4 +1,4 @@
-package io.a2a.grpc.handler;
+package io.a2a.transport.grpc.server.handler;
import static io.a2a.grpc.utils.ProtoUtils.FromProto;
import static io.a2a.grpc.utils.ProtoUtils.ToProto;
diff --git a/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java
index 1cfeb6626..76cdb6f6e 100644
--- a/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java
+++ b/transport/grpc/src/test/java/io/a2a/grpc/handler/GrpcHandlerTest.java
@@ -45,6 +45,7 @@
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
import io.a2a.spec.UnsupportedOperationError;
+import io.a2a.transport.grpc.server.handler.GrpcHandler;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.testing.StreamRecorder;
diff --git a/transport/jsonrpc/pom.xml b/transport/jsonrpc/pom.xml
index b66e3b1f5..a00362398 100644
--- a/transport/jsonrpc/pom.xml
+++ b/transport/jsonrpc/pom.xml
@@ -18,6 +18,11 @@
Java SDK for the Agent2Agent Protocol (A2A) - JSONRPC
+
+ io.github.a2asdk
+ a2a-java-sdk-transport-spi
+ ${project.version}
+
io.github.a2asdk
a2a-java-sdk-server-common
diff --git a/client/src/main/java/io/a2a/http/A2AHttpClient.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpClient.java
similarity index 96%
rename from client/src/main/java/io/a2a/http/A2AHttpClient.java
rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpClient.java
index 7a246843a..3ebd6e8b7 100644
--- a/client/src/main/java/io/a2a/http/A2AHttpClient.java
+++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpClient.java
@@ -1,4 +1,4 @@
-package io.a2a.http;
+package io.a2a.transport.jsonrpc.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
diff --git a/client/src/main/java/io/a2a/http/A2AHttpResponse.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpResponse.java
similarity index 70%
rename from client/src/main/java/io/a2a/http/A2AHttpResponse.java
rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpResponse.java
index d6973a5dc..0e7074b8b 100644
--- a/client/src/main/java/io/a2a/http/A2AHttpResponse.java
+++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/A2AHttpResponse.java
@@ -1,4 +1,4 @@
-package io.a2a.http;
+package io.a2a.transport.jsonrpc.client;
public interface A2AHttpResponse {
int status();
diff --git a/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JSONRPCTransport.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JSONRPCTransport.java
new file mode 100644
index 000000000..dce6ab668
--- /dev/null
+++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JSONRPCTransport.java
@@ -0,0 +1,301 @@
+package io.a2a.transport.jsonrpc.client;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.a2a.spec.A2AServerException;
+import io.a2a.spec.CancelTaskRequest;
+import io.a2a.spec.CancelTaskResponse;
+import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
+import io.a2a.spec.DeleteTaskPushNotificationConfigRequest;
+import io.a2a.spec.DeleteTaskPushNotificationConfigResponse;
+import io.a2a.spec.EventKind;
+import io.a2a.spec.GetTaskPushNotificationConfigParams;
+import io.a2a.spec.GetTaskPushNotificationConfigRequest;
+import io.a2a.spec.GetTaskPushNotificationConfigResponse;
+import io.a2a.spec.GetTaskRequest;
+import io.a2a.spec.GetTaskResponse;
+import io.a2a.spec.JSONRPCError;
+import io.a2a.spec.JSONRPCMessage;
+import io.a2a.spec.JSONRPCResponse;
+import io.a2a.spec.ListTaskPushNotificationConfigParams;
+import io.a2a.spec.ListTaskPushNotificationConfigRequest;
+import io.a2a.spec.ListTaskPushNotificationConfigResponse;
+import io.a2a.spec.MessageSendParams;
+import io.a2a.spec.PushNotificationConfig;
+import io.a2a.spec.SendMessageRequest;
+import io.a2a.spec.SendMessageResponse;
+import io.a2a.spec.SendStreamingMessageRequest;
+import io.a2a.spec.SetTaskPushNotificationConfigRequest;
+import io.a2a.spec.SetTaskPushNotificationConfigResponse;
+import io.a2a.spec.StreamingEventKind;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskIdParams;
+import io.a2a.spec.TaskPushNotificationConfig;
+import io.a2a.spec.TaskQueryParams;
+import io.a2a.spec.TaskResubscriptionRequest;
+import io.a2a.transport.jsonrpc.client.sse.SSEEventListener;
+import io.a2a.transport.spi.client.Transport;
+import io.a2a.util.Utils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+public class JSONRPCTransport implements Transport {
+
+ private static final TypeReference SEND_MESSAGE_RESPONSE_REFERENCE = new TypeReference<>() {};
+ private static final TypeReference GET_TASK_RESPONSE_REFERENCE = new TypeReference<>() {};
+ private static final TypeReference CANCEL_TASK_RESPONSE_REFERENCE = new TypeReference<>() {};
+ private static final TypeReference GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
+ private static final TypeReference SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
+ private static final TypeReference LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
+ private static final TypeReference DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {};
+
+ private final String agentUrl;
+ private final A2AHttpClient httpClient;
+
+ public JSONRPCTransport(String agentUrl) {
+ this(agentUrl, new JdkA2AHttpClient());
+ }
+
+ public JSONRPCTransport(String agentUrl, A2AHttpClient httpClient) {
+ this.agentUrl = agentUrl;
+ this.httpClient = httpClient;
+ }
+
+ @Override
+ public EventKind sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException {
+ SendMessageRequest.Builder sendMessageRequestBuilder = new SendMessageRequest.Builder()
+ .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
+ .method(SendMessageRequest.METHOD)
+ .params(messageSendParams);
+
+ if (requestId != null) {
+ sendMessageRequestBuilder.id(requestId);
+ }
+
+ SendMessageRequest sendMessageRequest = sendMessageRequestBuilder.build();
+
+ try {
+ String httpResponseBody = sendPostRequest(sendMessageRequest);
+ return unmarshalResponse(httpResponseBody, SEND_MESSAGE_RESPONSE_REFERENCE).getResult();
+ } catch (IOException | InterruptedException e) {
+ throw new A2AServerException("Failed to send message: " + e, e.getCause());
+ }
+ }
+
+ @Override
+ public Task getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException {
+ GetTaskRequest.Builder getTaskRequestBuilder = new GetTaskRequest.Builder()
+ .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
+ .method(GetTaskRequest.METHOD)
+ .params(taskQueryParams);
+
+ if (requestId != null) {
+ getTaskRequestBuilder.id(requestId);
+ }
+
+ GetTaskRequest getTaskRequest = getTaskRequestBuilder.build();
+
+ try {
+ String httpResponseBody = sendPostRequest(getTaskRequest);
+ return unmarshalResponse(httpResponseBody, GET_TASK_RESPONSE_REFERENCE).getResult();
+ } catch (IOException | InterruptedException e) {
+ throw new A2AServerException("Failed to get task: " + e, e.getCause());
+ }
+ }
+
+ @Override
+ public Task cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException {
+ CancelTaskRequest.Builder cancelTaskRequestBuilder = new CancelTaskRequest.Builder()
+ .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
+ .method(CancelTaskRequest.METHOD)
+ .params(taskIdParams);
+
+ if (requestId != null) {
+ cancelTaskRequestBuilder.id(requestId);
+ }
+
+ CancelTaskRequest cancelTaskRequest = cancelTaskRequestBuilder.build();
+
+ try {
+ String httpResponseBody = sendPostRequest(cancelTaskRequest);
+ return unmarshalResponse(httpResponseBody, CANCEL_TASK_RESPONSE_REFERENCE).getResult();
+ } catch (IOException | InterruptedException e) {
+ throw new A2AServerException("Failed to cancel task: " + e, e.getCause());
+ }
+ }
+
+ @Override
+ public TaskPushNotificationConfig getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException {
+ GetTaskPushNotificationConfigRequest.Builder getTaskPushNotificationRequestBuilder = new GetTaskPushNotificationConfigRequest.Builder()
+ .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
+ .method(GetTaskPushNotificationConfigRequest.METHOD)
+ .params(getTaskPushNotificationConfigParams);
+
+ if (requestId != null) {
+ getTaskPushNotificationRequestBuilder.id(requestId);
+ }
+
+ GetTaskPushNotificationConfigRequest getTaskPushNotificationRequest = getTaskPushNotificationRequestBuilder.build();
+
+ try {
+ String httpResponseBody = sendPostRequest(getTaskPushNotificationRequest);
+ return unmarshalResponse(httpResponseBody, GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE).getResult();
+ } catch (IOException | InterruptedException e) {
+ throw new A2AServerException("Failed to get task push notification config: " + e, e.getCause());
+ }
+ }
+
+ @Override
+ public TaskPushNotificationConfig setTaskPushNotificationConfig(String requestId, String taskId, PushNotificationConfig pushNotificationConfig) throws A2AServerException {
+ SetTaskPushNotificationConfigRequest.Builder setTaskPushNotificationRequestBuilder = new SetTaskPushNotificationConfigRequest.Builder()
+ .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
+ .method(SetTaskPushNotificationConfigRequest.METHOD)
+ .params(new TaskPushNotificationConfig(taskId, pushNotificationConfig));
+
+ if (requestId != null) {
+ setTaskPushNotificationRequestBuilder.id(requestId);
+ }
+
+ SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest = setTaskPushNotificationRequestBuilder.build();
+
+ try {
+ String httpResponseBody = sendPostRequest(setTaskPushNotificationRequest);
+ return unmarshalResponse(httpResponseBody, SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE).getResult();
+ } catch (IOException | InterruptedException e) {
+ throw new A2AServerException("Failed to set task push notification config: " + e, e.getCause());
+ }
+ }
+
+ @Override
+ public List listTaskPushNotificationConfig(String requestId, ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException {
+ ListTaskPushNotificationConfigRequest.Builder listTaskPushNotificationRequestBuilder = new ListTaskPushNotificationConfigRequest.Builder()
+ .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
+ .method(ListTaskPushNotificationConfigRequest.METHOD)
+ .params(listTaskPushNotificationConfigParams);
+
+ if (requestId != null) {
+ listTaskPushNotificationRequestBuilder.id(requestId);
+ }
+
+ ListTaskPushNotificationConfigRequest listTaskPushNotificationRequest = listTaskPushNotificationRequestBuilder.build();
+
+ try {
+ String httpResponseBody = sendPostRequest(listTaskPushNotificationRequest);
+ return unmarshalResponse(httpResponseBody, LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE).getResult();
+ } catch (IOException | InterruptedException e) {
+ throw new A2AServerException("Failed to list task push notification config: " + e, e.getCause());
+ }
+ }
+
+ @Override
+ public void deleteTaskPushNotificationConfig(String requestId, DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException {
+ DeleteTaskPushNotificationConfigRequest.Builder deleteTaskPushNotificationRequestBuilder = new DeleteTaskPushNotificationConfigRequest.Builder()
+ .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
+ .method(DeleteTaskPushNotificationConfigRequest.METHOD)
+ .params(deleteTaskPushNotificationConfigParams);
+
+ if (requestId != null) {
+ deleteTaskPushNotificationRequestBuilder.id(requestId);
+ }
+
+ DeleteTaskPushNotificationConfigRequest deleteTaskPushNotificationRequest = deleteTaskPushNotificationRequestBuilder.build();
+
+ try {
+ String httpResponseBody = sendPostRequest(deleteTaskPushNotificationRequest);
+ unmarshalResponse(httpResponseBody, DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
+ } catch (IOException | InterruptedException e) {
+ throw new A2AServerException("Failed to delete task push notification config: " + e, e.getCause());
+ }
+ }
+
+ @Override
+ public void sendStreamingMessage(String requestId, MessageSendParams messageSendParams, Consumer eventHandler, Consumer errorHandler, Runnable failureHandler) throws A2AServerException {
+ SendStreamingMessageRequest.Builder sendStreamingMessageRequestBuilder = new SendStreamingMessageRequest.Builder()
+ .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
+ .method(SendStreamingMessageRequest.METHOD)
+ .params(messageSendParams);
+
+ if (requestId != null) {
+ sendStreamingMessageRequestBuilder.id(requestId);
+ }
+
+ AtomicReference> ref = new AtomicReference<>();
+ SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler);
+ SendStreamingMessageRequest sendStreamingMessageRequest = sendStreamingMessageRequestBuilder.build();
+ try {
+ A2AHttpClient.PostBuilder builder = createPostBuilder(sendStreamingMessageRequest);
+ ref.set(builder.postAsyncSSE(
+ msg -> sseEventListener.onMessage(msg, ref.get()),
+ throwable -> sseEventListener.onError(throwable, ref.get()),
+ () -> {
+ // We don't need to do anything special on completion
+ }));
+
+ } catch (IOException e) {
+ throw new A2AServerException("Failed to send streaming message request: " + e, e.getCause());
+ } catch (InterruptedException e) {
+ throw new A2AServerException("Send streaming message request timed out: " + e, e.getCause());
+ }
+ }
+
+ @Override
+ public void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consumer eventHandler, Consumer errorHandler, Runnable failureHandler) throws A2AServerException {
+ TaskResubscriptionRequest.Builder taskResubscriptionRequestBuilder = new TaskResubscriptionRequest.Builder()
+ .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
+ .method(TaskResubscriptionRequest.METHOD)
+ .params(taskIdParams);
+
+ if (requestId != null) {
+ taskResubscriptionRequestBuilder.id(requestId);
+ }
+
+ AtomicReference> ref = new AtomicReference<>();
+ SSEEventListener sseEventListener = new SSEEventListener(eventHandler, errorHandler, failureHandler);
+ TaskResubscriptionRequest taskResubscriptionRequest = taskResubscriptionRequestBuilder.build();
+ try {
+ A2AHttpClient.PostBuilder builder = createPostBuilder(taskResubscriptionRequest);
+ ref.set(builder.postAsyncSSE(
+ msg -> sseEventListener.onMessage(msg, ref.get()),
+ throwable -> sseEventListener.onError(throwable, ref.get()),
+ () -> {
+ // We don't need to do anything special on completion
+ }));
+
+ } catch (IOException e) {
+ throw new A2AServerException("Failed to send task resubscription request: " + e, e.getCause());
+ } catch (InterruptedException e) {
+ throw new A2AServerException("Task resubscription request timed out: " + e, e.getCause());
+ }
+ }
+
+ private String sendPostRequest(Object value) throws IOException, InterruptedException {
+ A2AHttpClient.PostBuilder builder = createPostBuilder(value);
+ A2AHttpResponse response = builder.post();
+ if (!response.success()) {
+ throw new IOException("Request failed " + response.status());
+ }
+ return response.body();
+ }
+
+ private A2AHttpClient.PostBuilder createPostBuilder(Object value) throws JsonProcessingException {
+ return httpClient.createPost()
+ .url(agentUrl)
+ .addHeader("Content-Type", "application/json")
+ .body(Utils.OBJECT_MAPPER.writeValueAsString(value));
+
+ }
+
+ private T unmarshalResponse(String response, TypeReference typeReference)
+ throws A2AServerException, JsonProcessingException {
+ T value = Utils.unmarshalFrom(response, typeReference);
+ JSONRPCError error = value.getError();
+ if (error != null) {
+ throw new A2AServerException(error.getMessage() + (error.getData() != null ? ": " + error.getData() : ""), error);
+ }
+ return value;
+ }
+}
diff --git a/client/src/main/java/io/a2a/http/JdkA2AHttpClient.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JdkA2AHttpClient.java
similarity index 99%
rename from client/src/main/java/io/a2a/http/JdkA2AHttpClient.java
rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JdkA2AHttpClient.java
index c3d5907a2..e61d7fab3 100644
--- a/client/src/main/java/io/a2a/http/JdkA2AHttpClient.java
+++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/JdkA2AHttpClient.java
@@ -1,4 +1,4 @@
-package io.a2a.http;
+package io.a2a.transport.jsonrpc.client;
import java.io.IOException;
import java.net.URI;
diff --git a/client/src/main/java/io/a2a/client/sse/SSEEventListener.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListener.java
similarity index 98%
rename from client/src/main/java/io/a2a/client/sse/SSEEventListener.java
rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListener.java
index 8ed0e9aa3..7f4ff19fc 100644
--- a/client/src/main/java/io/a2a/client/sse/SSEEventListener.java
+++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListener.java
@@ -1,10 +1,4 @@
-package io.a2a.client.sse;
-
-import static io.a2a.util.Utils.OBJECT_MAPPER;
-
-import java.util.concurrent.Future;
-import java.util.function.Consumer;
-import java.util.logging.Logger;
+package io.a2a.transport.jsonrpc.client.sse;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -12,6 +6,12 @@
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.TaskStatusUpdateEvent;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+import static io.a2a.util.Utils.OBJECT_MAPPER;
+
public class SSEEventListener {
private static final Logger log = Logger.getLogger(SSEEventListener.class.getName());
private final Consumer eventHandler;
diff --git a/transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandler.java
similarity index 99%
rename from transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java
rename to transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandler.java
index b2928dfd5..d40f5795f 100644
--- a/transport/jsonrpc/src/main/java/io/a2a/jsonrpc/handler/JSONRPCHandler.java
+++ b/transport/jsonrpc/src/main/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandler.java
@@ -1,4 +1,4 @@
-package io.a2a.jsonrpc.handler;
+package io.a2a.transport.jsonrpc.server.handler;
import static io.a2a.server.util.async.AsyncUtils.createTubeConfig;
import jakarta.enterprise.context.ApplicationScoped;
diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/JsonStreamingMessages.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/JsonStreamingMessages.java
new file mode 100644
index 000000000..7bf46054d
--- /dev/null
+++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/JsonStreamingMessages.java
@@ -0,0 +1,148 @@
+package io.a2a.transport.jsonrpc.client;
+
+/**
+ * Contains JSON strings for testing SSE streaming.
+ */
+public class JsonStreamingMessages {
+
+ public static final String STREAMING_TASK_EVENT = """
+ data: {
+ "jsonrpc": "2.0",
+ "id": "1234",
+ "result": {
+ "kind": "task",
+ "id": "task-123",
+ "contextId": "context-456",
+ "status": {
+ "state": "working"
+ }
+ }
+ }
+ """;
+
+
+ public static final String STREAMING_MESSAGE_EVENT = """
+ data: {
+ "jsonrpc": "2.0",
+ "id": "1234",
+ "result": {
+ "kind": "message",
+ "role": "agent",
+ "messageId": "msg-123",
+ "contextId": "context-456",
+ "parts": [
+ {
+ "kind": "text",
+ "text": "Hello, world!"
+ }
+ ]
+ }
+ }""";
+
+ public static final String STREAMING_STATUS_UPDATE_EVENT = """
+ data: {
+ "jsonrpc": "2.0",
+ "id": "1234",
+ "result": {
+ "taskId": "1",
+ "contextId": "2",
+ "status": {
+ "state": "submitted"
+ },
+ "final": false,
+ "kind": "status-update"
+ }
+ }""";
+
+ public static final String STREAMING_STATUS_UPDATE_EVENT_FINAL = """
+ data: {
+ "jsonrpc": "2.0",
+ "id": "1234",
+ "result": {
+ "taskId": "1",
+ "contextId": "2",
+ "status": {
+ "state": "completed"
+ },
+ "final": true,
+ "kind": "status-update"
+ }
+ }""";
+
+ public static final String STREAMING_ARTIFACT_UPDATE_EVENT = """
+ data: {
+ "jsonrpc": "2.0",
+ "id": "1234",
+ "result": {
+ "kind": "artifact-update",
+ "taskId": "1",
+ "contextId": "2",
+ "append": false,
+ "lastChunk": true,
+ "artifact": {
+ "artifactId": "artifact-1",
+ "parts": [
+ {
+ "kind": "text",
+ "text": "Why did the chicken cross the road? To get to the other side!"
+ }
+ ]
+ }
+ }
+ }
+ }""";
+
+ public static final String STREAMING_ERROR_EVENT = """
+ data: {
+ "jsonrpc": "2.0",
+ "id": "1234",
+ "error": {
+ "code": -32602,
+ "message": "Invalid parameters",
+ "data": "Missing required field"
+ }
+ }""";
+
+ public static final String SEND_MESSAGE_STREAMING_TEST_REQUEST = """
+ {
+ "jsonrpc": "2.0",
+ "id": "request-1234",
+ "method": "message/stream",
+ "params": {
+ "message": {
+ "role": "user",
+ "parts": [
+ {
+ "kind": "text",
+ "text": "tell me some jokes"
+ }
+ ],
+ "messageId": "message-1234",
+ "contextId": "context-1234",
+ "kind": "message"
+ },
+ "configuration": {
+ "acceptedOutputModes": ["text"],
+ "blocking": false
+ },
+ }
+ }""";
+
+ static final String SEND_MESSAGE_STREAMING_TEST_RESPONSE =
+ "event: message\n" +
+ "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"id\":\"2\",\"contextId\":\"context-1234\",\"status\":{\"state\":\"completed\"},\"artifacts\":[{\"artifactId\":\"artifact-1\",\"name\":\"joke\",\"parts\":[{\"kind\":\"text\",\"text\":\"Why did the chicken cross the road? To get to the other side!\"}]}],\"metadata\":{},\"kind\":\"task\"}}\n\n";
+
+ static final String TASK_RESUBSCRIPTION_REQUEST_TEST_RESPONSE =
+ "event: message\n" +
+ "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"id\":\"2\",\"contextId\":\"context-1234\",\"status\":{\"state\":\"completed\"},\"artifacts\":[{\"artifactId\":\"artifact-1\",\"name\":\"joke\",\"parts\":[{\"kind\":\"text\",\"text\":\"Why did the chicken cross the road? To get to the other side!\"}]}],\"metadata\":{},\"kind\":\"task\"}}\n\n";
+
+ public static final String TASK_RESUBSCRIPTION_TEST_REQUEST = """
+ {
+ "jsonrpc": "2.0",
+ "id": "request-1234",
+ "method": "tasks/resubscribe",
+ "params": {
+ "id": "task-1234"
+ }
+ }""";
+}
\ No newline at end of file
diff --git a/client/src/test/java/io/a2a/client/sse/SSEEventListenerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListenerTest.java
similarity index 97%
rename from client/src/test/java/io/a2a/client/sse/SSEEventListenerTest.java
rename to transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListenerTest.java
index 1fca0ff9c..5e4fc357c 100644
--- a/client/src/test/java/io/a2a/client/sse/SSEEventListenerTest.java
+++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/client/sse/SSEEventListenerTest.java
@@ -1,18 +1,5 @@
-package io.a2a.client.sse;
+package io.a2a.transport.jsonrpc.client.sse;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import io.a2a.client.JsonStreamingMessages;
import io.a2a.spec.Artifact;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.Message;
@@ -24,8 +11,18 @@
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
+import io.a2a.transport.jsonrpc.client.JsonStreamingMessages;
import org.junit.jupiter.api.Test;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.*;
+
public class SSEEventListenerTest {
@Test
diff --git a/transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandlerTest.java
similarity index 99%
rename from transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java
rename to transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandlerTest.java
index 070df7aa1..7707b5bcc 100644
--- a/transport/jsonrpc/src/test/java/io/a2a/jsonrpc/handler/JSONRPCHandlerTest.java
+++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/server/handler/JSONRPCHandlerTest.java
@@ -1,4 +1,4 @@
-package io.a2a.jsonrpc.handler;
+package io.a2a.transport.jsonrpc.server.handler;
import java.util.ArrayList;
import java.util.Collections;
@@ -58,6 +58,7 @@
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
import io.a2a.spec.UnsupportedOperationError;
+import io.a2a.transport.jsonrpc.server.handler.JSONRPCHandler;
import mutiny.zero.ZeroPublisher;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
diff --git a/transport/spi/pom.xml b/transport/spi/pom.xml
new file mode 100644
index 000000000..1e284687f
--- /dev/null
+++ b/transport/spi/pom.xml
@@ -0,0 +1,29 @@
+
+
+ 4.0.0
+
+
+ io.github.a2asdk
+ a2a-java-sdk-parent
+ 0.2.6.Beta1-SNAPSHOT
+ ../../pom.xml
+
+ a2a-java-sdk-transport-spi
+
+ jar
+
+ Java SDK A2A Transport: SPI
+ Java SDK for the Agent2Agent Protocol (A2A) - SPI
+
+
+
+ io.github.a2asdk
+ a2a-java-sdk-spec
+ ${project.version}
+
+
+
+
+
diff --git a/transport/spi/src/main/java/io/a2a/transport/spi/client/Transport.java b/transport/spi/src/main/java/io/a2a/transport/spi/client/Transport.java
new file mode 100644
index 000000000..9ab643b56
--- /dev/null
+++ b/transport/spi/src/main/java/io/a2a/transport/spi/client/Transport.java
@@ -0,0 +1,109 @@
+package io.a2a.transport.spi.client;
+
+import io.a2a.spec.*;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+public interface Transport {
+
+ /**
+ * Send a message to the remote agent.
+ *
+ * @param requestId the request ID to use
+ * @param messageSendParams the parameters for the message to be sent
+ * @return the response, may contain a message or a task
+ * @throws A2AServerException if sending the message fails for any reason
+ */
+ EventKind sendMessage(String requestId, MessageSendParams messageSendParams) throws A2AServerException;
+
+ /**
+ * Retrieve the generated artifacts for a task.
+ *
+ * @param requestId the request ID to use
+ * @param taskQueryParams the params for the task to be queried
+ * @return the response containing the task
+ * @throws A2AServerException if retrieving the task fails for any reason
+ */
+ Task getTask(String requestId, TaskQueryParams taskQueryParams) throws A2AServerException;
+
+ /**
+ * Cancel a task that was previously submitted to the A2A server.
+ *
+ * @param requestId the request ID to use
+ * @param taskIdParams the params for the task to be cancelled
+ * @return the response indicating if the task was cancelled
+ * @throws A2AServerException if retrieving the task fails for any reason
+ */
+ Task cancelTask(String requestId, TaskIdParams taskIdParams) throws A2AServerException;
+
+ /**
+ * Get the push notification configuration for a task.
+ *
+ * @param requestId the request ID to use
+ * @param getTaskPushNotificationConfigParams the params for the task
+ * @return the response containing the push notification configuration
+ * @throws A2AServerException if getting the push notification configuration fails for any reason
+ */
+ TaskPushNotificationConfig getTaskPushNotificationConfig(String requestId, GetTaskPushNotificationConfigParams getTaskPushNotificationConfigParams) throws A2AServerException;
+
+ /**
+ * Set push notification configuration for a task.
+ *
+ * @param requestId the request ID to use
+ * @param taskId the task ID
+ * @param pushNotificationConfig the push notification configuration
+ * @return the response indicating whether setting the task push notification configuration succeeded
+ * @throws A2AServerException if setting the push notification configuration fails for any reason
+ */
+ TaskPushNotificationConfig setTaskPushNotificationConfig(String requestId, String taskId,
+ PushNotificationConfig pushNotificationConfig) throws A2AServerException;
+
+ /**
+ * Retrieves the push notification configurations for a specified task.
+ *
+ * @param requestId the request ID to use
+ * @param listTaskPushNotificationConfigParams the params for retrieving the push notification configuration
+ * @return the response containing the push notification configuration
+ * @throws A2AServerException if getting the push notification configuration fails for any reason
+ */
+ List listTaskPushNotificationConfig(String requestId,
+ ListTaskPushNotificationConfigParams listTaskPushNotificationConfigParams) throws A2AServerException;
+
+ /**
+ * Delete the push notification configuration for a specified task.
+ *
+ * @param requestId the request ID to use
+ * @param deleteTaskPushNotificationConfigParams the params for deleting the push notification configuration
+ * @throws A2AServerException if deleting the push notification configuration fails for any reason
+ */
+ void deleteTaskPushNotificationConfig(String requestId,
+ DeleteTaskPushNotificationConfigParams deleteTaskPushNotificationConfigParams) throws A2AServerException;
+
+ /**
+ * Send a streaming message to the remote agent.
+ *
+ * @param requestId the request ID to use
+ * @param messageSendParams the parameters for the message to be sent
+ * @param eventHandler a consumer that will be invoked for each event received from the remote agent
+ * @param errorHandler a consumer that will be invoked if the remote agent returns an error
+ * @param failureHandler a consumer that will be invoked if a failure occurs when processing events
+ * @throws A2AServerException if sending the streaming message fails for any reason
+ */
+ void sendStreamingMessage(String requestId, MessageSendParams messageSendParams, Consumer eventHandler,
+ Consumer errorHandler, Runnable failureHandler) throws A2AServerException;
+
+ /**
+ * Resubscribe to an ongoing task.
+ *
+ * @param requestId the request ID to use
+ * @param taskIdParams the params for the task to resubscribe to
+ * @param eventHandler a consumer that will be invoked for each event received from the remote agent
+ * @param errorHandler a consumer that will be invoked if the remote agent returns an error
+ * @param failureHandler a consumer that will be invoked if a failure occurs when processing events
+ * @throws A2AServerException if resubscribing to the task fails for any reason
+ */
+ void resubscribeToTask(String requestId, TaskIdParams taskIdParams, Consumer eventHandler,
+ Consumer errorHandler, Runnable failureHandler) throws A2AServerException;
+
+}