feat: add basic A2A workflow adapter#1404
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds minimal Agent2Agent (A2A) support to the workflow service by introducing discovery and message forwarding endpoints, plus wiring them into the existing FastAPI router setup.
Changes:
- Introduced A2A Pydantic entities and FastAPI endpoints (agent card discovery + message forwarding).
- Registered new routers (root
/.well-knowndiscovery +/workflow/v1/a2a/*endpoints) in the app routing. - Added the A2A send endpoint to the “open API paths” allowlist.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| core/workflow/main.py | Registers a new root router to serve A2A discovery endpoints. |
| core/workflow/extensions/fastapi/base.py | Adds the A2A send endpoint to the open-path allowlist. |
| core/workflow/domain/entities/a2a.py | Introduces Pydantic models for A2A agent card + message sending. |
| core/workflow/api/v1/router.py | Wires A2A routers into existing v1 routing (both root discovery and workflow-prefixed API). |
| core/workflow/api/v1/chat/a2a.py | Implements A2A discovery endpoints and forwards messages into chat_open. |
| core/workflow/api/v1/chat/init.py | Exports A2A routers for inclusion in the v1 router module. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| CHAT_OPEN_API_PATHS = [ | ||
| "/workflow/v1/chat/completions", | ||
| "/workflow/v1/a2a/message:send", | ||
| "/workflow/v1/resume", | ||
| ] |
| parameters = dict(request.parameters) | ||
| parameters.setdefault("input", request.text()) |
| class A2AMessage(BaseModel): | ||
| """Subset of the A2A message shape needed for text chat.""" | ||
|
|
||
| role: str = "user" | ||
| parts: List[A2ATextPart] | ||
| message_id: Optional[str] = Field("", alias="messageId") |
|
|
||
| def build_agent_card() -> A2AAgentCard: | ||
| """Build public A2A discovery metadata for Astron Agent workflows.""" | ||
| base_url = os.getenv("A2A_PUBLIC_BASE_URL", "") |
| "forwarding to published workflows." | ||
| ), | ||
| version=os.getenv("A2A_AGENT_VERSION", "0.1.0"), | ||
| url=f"{base_url}/workflow/v1/a2a/message:send" if base_url else "", |
There was a problem hiding this comment.
Code Review
This pull request introduces minimal Agent2Agent (A2A) endpoints for published workflows, including public discovery metadata and a synchronous text message adapter mapping A2A messages to the existing workflow chat completion API. Feedback suggests avoiding decorating a single function with multiple routes to prevent duplicate OpenAPI operationIds, adding validation to reject empty messages early, and removing the new endpoint from CHAT_OPEN_API_PATHS to ensure standard JSON error payloads are returned instead of forcing SSE format for early validation failures.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| @discovery_router.get("/.well-known/agent-card.json", response_model=A2AAgentCard) | ||
| @router.get("/agent-card.json", response_model=A2AAgentCard) | ||
| @router.get("/.well-known/agent-card.json", response_model=A2AAgentCard) | ||
| async def agent_card() -> A2AAgentCard: | ||
| """ | ||
| Return public A2A discovery metadata for Astron Agent workflows. | ||
| """ | ||
| return build_agent_card() |
There was a problem hiding this comment.
Decorating a single function with multiple routes across different routers can cause FastAPI to generate duplicate operationIds in the OpenAPI schema, which violates the OpenAPI specification and can break client generation tools. It is highly recommended to define separate, clearly named handler functions for each route to ensure unique operation IDs and cleaner routing.
@discovery_router.get("/.well-known/agent-card.json", response_model=A2AAgentCard)
async def get_well_known_agent_card() -> A2AAgentCard:
"""Return public A2A discovery metadata at the root well-known path."""
return build_agent_card()
@router.get("/agent-card.json", response_model=A2AAgentCard)
async def get_a2a_agent_card() -> A2AAgentCard:
"""Return public A2A discovery metadata at the versioned path."""
return build_agent_card()
@router.get("/.well-known/agent-card.json", response_model=A2AAgentCard)
async def get_a2a_well_known_agent_card() -> A2AAgentCard:
"""Return public A2A discovery metadata at the versioned well-known path."""
return build_agent_card()| async def send_message( | ||
| x_consumer_username: Annotated[str, Header()], | ||
| request: A2ASendMessageRequest, | ||
| ) -> Union[StreamingResponse, JSONResponse]: | ||
| """ | ||
| Map an A2A text message to the existing workflow chat completion API. | ||
| """ | ||
| parameters = dict(request.parameters) | ||
| parameters.setdefault("input", request.text()) |
There was a problem hiding this comment.
If the incoming A2A message contains no text parts or only empty text, request.text() will return an empty string. Forwarding an empty input to the downstream workflow engine can cause unexpected failures or errors. Adding a validation check to reject empty messages early improves robustness and adheres to defensive programming principles.
async def send_message(
x_consumer_username: Annotated[str, Header()],
request: A2ASendMessageRequest,
) -> Union[StreamingResponse, JSONResponse]:
"""
Map an A2A text message to the existing workflow chat completion API.
"""
message_text = request.text().strip()
if not message_text:
from fastapi import HTTPException
raise HTTPException(status_code=400, detail="Message content cannot be empty")
parameters = dict(request.parameters)
parameters.setdefault("input", message_text)| CHAT_OPEN_API_PATHS = [ | ||
| "/workflow/v1/chat/completions", | ||
| "/workflow/v1/a2a/message:send", | ||
| "/workflow/v1/resume", | ||
| ] |
There was a problem hiding this comment.
Including /workflow/v1/a2a/message:send in CHAT_OPEN_API_PATHS forces all validation and middleware errors for this endpoint to be returned as SSE (text/event-stream) responses. However, A2A clients may send non-streaming requests (where stream=False) or expect standard JSON error payloads for HTTP-level failures (like 400 Bad Request). Returning SSE format for early validation errors will break standard HTTP clients. It is safer to remove this path from CHAT_OPEN_API_PATHS so that early errors are returned as standard JSON, while active stream errors are still handled gracefully within the handler.
CHAT_OPEN_API_PATHS = [
"/workflow/v1/chat/completions",
"/workflow/v1/resume",
]Signed-off-by: Ziliang-H <[email protected]>
65b65d3 to
1495b28
Compare
|
|
|
Follow-up update: I pushed commit
Current remaining blocker appears to be CLA assistant, which I will handle from the account side. |
Summary
/.well-known/agent-card.jsonand versioned/workflow/v1/a2a/agent-card.jsonagent card/workflow/v1/a2a/message:sendtext-message adapter that maps A2A requests onto the existing workflow chat completion pathCloses #709
Validation
python -m compileall core\workflow\main.py core\workflow\api\v1\router.py core\workflow\api\v1\chat\__init__.py core\workflow\api\v1\chat\a2a.py core\workflow\domain\entities\a2a.py core\workflow\extensions\fastapi\base.pygit diff --checkNote: full pytest execution was not available in the local environment because
pytestand runtime dependencies such aspydanticare not installed in the system Python.