Skip to content

Conversation

@Arthur-Jacobina
Copy link

feat(dspy): Add DSPy A2A Agent Example with Memory

Summary

This PR adds a new A2A agent example that demonstrates integration with DSPy, a framework for programming with language models. The agent showcases conversational memory persisted by context_id, structured reasoning, and observability.

What's New

  • DSPy A2A Agent: Full implementation of an A2A-compliant agent using DSPy's ChainOfThought module for structured reasoning
  • Memory Integration: Persistent conversational memory using Mem0 for context-aware interactions across sessions
  • Observability: Integrated tracing and monitoring with Braintrust
  • Modular Architecture: Clean separation of concerns with dedicated modules for execution, memory, and agent logic

Files Added

  • samples/python/agents/a2a_dspy/
    • __main__.py - Server entry point with CORS support
    • executor.py - A2A executor implementation with memory operations
    • agents/dspy_example.py - DSPy agent with custom signature and Chain-of-Thought reasoning
    • memory/base.py - Abstract memory interface
    • memory/mem0.py - Mem0 memory implementation
    • logger.py - Logging configuration
    • test_client.py - Test client for interaction
    • pyproject.toml - Dependencies and project metadata
    • .env.sample - Environment configuration template
    • README.md - Comprehensive documentation

Key Features

  • ✅ DSPy ChainOfThought reasoning for structured responses
  • ✅ User-specific memory with semantic retrieval
  • ✅ Persistent context across conversation sessions
  • ✅ Full observability with execution tracing
  • ✅ A2A protocol compliant (tasks + conversations)
  • ✅ Comprehensive documentation and examples

Testing

Start the server:

cd samples/python/agents/a2a_dspy
uv run .

Test with the included client:

uv run test_client.py

Or use the CLI host:

cd samples/python/hosts/cli
uv run . --agent http://localhost:10020

Requirements

  • Python 3.10+
  • OpenAI API Key
  • Mem0 API Key
  • Braintrust API Key (optional)

Related

This example complements existing agent samples by demonstrating:

  • Integration with DSPy for declarative LLM programming
  • Some memory management patterns
  • Acceptable observability practices

CLA is signed by [email protected]
image

Feel free to reach out.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @Arthur-Jacobina, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new, fully functional example of an Agent-to-Agent (A2A) server that leverages the DSPy framework for programming large language models. The primary goal is to demonstrate how to build an A2A-compliant agent capable of structured reasoning, persistent conversational memory via Mem0, and integrated observability through Braintrust. This example provides a robust blueprint for developing context-aware and traceable AI agents within the A2A ecosystem.

Highlights

  • New DSPy A2A Agent Example: Introduces a comprehensive example of an Agent-to-Agent (A2A) server built with DSPy, showcasing structured reasoning using DSPy's ChainOfThought module.
  • Conversational Memory Integration: Implements persistent conversational memory using Mem0, allowing the agent to maintain context across sessions based on a unique context_id.
  • Enhanced Observability: Integrates Braintrust for detailed tracing and monitoring of agent execution, LLM calls, and memory operations, providing insights into the agent's behavior.
  • Modular Architecture: The example follows a modular design with dedicated components for agent logic, execution, and memory management, promoting clarity and maintainability.
  • Comprehensive Documentation and Testing: Includes a detailed README.md for setup and usage, along with a test client to demonstrate multi-turn conversations and streaming capabilities.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new A2A agent example using DSPy, which is a great addition. The implementation is well-structured, demonstrating conversational memory with Mem0 and observability with Braintrust. My review focuses on improving code quality, maintainability, and performance. I've identified several areas for improvement, including handling blocking I/O in an async context, cleaning up unused code, and improving configuration flexibility. The most critical feedback relates to making memory operations non-blocking to ensure the async server performs well. Overall, this is a solid contribution that provides a valuable example for developers.

Comment on lines 38 to 56
@traced
def save(self, user_id: str, user_input: str, assistant_response: str):
"""Save the interaction to Mem0"""
try:
interaction = [
{
"role": "user",
"content": user_input
},
{
"role": "assistant",
"content": assistant_response
}
]
result = mem0.add(interaction, user_id=user_id)
current_span().log(metadata={"memory_saved": result, "user_id": user_id})
print(f"Memory saved successfully: {len(result.get('results', []))} memories added")
except Exception as e:
print(f"Error saving interaction: {e}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The save method performs a blocking I/O operation (mem0.add). In an asyncio application, this will block the event loop and hurt performance. This method should be async, and the blocking call should be run in a separate thread using asyncio.to_thread. You will need to import asyncio at the top of the file.

Suggested change
@traced
def save(self, user_id: str, user_input: str, assistant_response: str):
"""Save the interaction to Mem0"""
try:
interaction = [
{
"role": "user",
"content": user_input
},
{
"role": "assistant",
"content": assistant_response
}
]
result = mem0.add(interaction, user_id=user_id)
current_span().log(metadata={"memory_saved": result, "user_id": user_id})
print(f"Memory saved successfully: {len(result.get('results', []))} memories added")
except Exception as e:
print(f"Error saving interaction: {e}")
@traced
async def save(self, user_id: str, user_input: str, assistant_response: str):
"""Save the interaction to Mem0"""
try:
interaction = [
{
"role": "user",
"content": user_input
},
{
"role": "assistant",
"content": assistant_response
}
]
result = await asyncio.to_thread(mem0.add, interaction, user_id=user_id)
current_span().log(metadata={"memory_saved": result, "user_id": user_id})
print(f"Memory saved successfully: {len(result.get('results', []))} memories added")
except Exception as e:
print(f"Error saving interaction: {e}")

Comment on lines 16 to 37
@traced
def retrieve(self, query: str, user_id: str) -> List[Dict]:
"""Retrieve relevant context from Mem0"""
try:
memories = mem0.search(query=query, user_id=user_id)
memory_list = memories
serialized_memories = ' '.join([mem["memory"] for mem in memory_list])
context = [
{
"role": "system",
"content": f"Relevant information: {serialized_memories}"
},
{
"role": "user",
"content": query
}
]
current_span().log(metadata={"memory_retrieved": context, "query": query, "user_id": user_id})
return context
except Exception as e:
print(f"Error retrieving memories: {e} at {traceback.format_exc()}")
return [{"role": "user", "content": query}]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The retrieve method performs a blocking I/O operation (mem0.search). In an asyncio application, this will block the event loop and degrade performance. This method should be async, and the blocking call should be run in a separate thread using asyncio.to_thread. You will need to import asyncio at the top of the file.

    @traced
    async def retrieve(self, query: str, user_id: str) -> List[Dict]:
        """Retrieve relevant context from Mem0"""
        try:
            memories = await asyncio.to_thread(mem0.search, query=query,  user_id=user_id)
            memory_list = memories
            serialized_memories = ' '.join([mem["memory"] for mem in memory_list])
            context = [
                {
                    "role": "system", 
                    "content": f"Relevant information: {serialized_memories}"
                },
                {
                    "role": "user",
                    "content": query
                }
            ]
            current_span().log(metadata={"memory_retrieved": context, "query": query, "user_id": user_id})
            return context
        except Exception as e:
            print(f"Error retrieving memories: {e} at {traceback.format_exc()}")
            return [{"role": "user", "content": query}]

Comment on lines 6 to 12
@abstractmethod
def save(self, user_id: str, user_input: str, assistant_response: str):
pass

@abstractmethod
def retrieve(self, query: str, user_id: str) -> List[Dict]:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The save and retrieve methods are intended to perform I/O operations and are called from an async context in DspyAgentExecutor. They should be defined as async methods to avoid blocking the event loop. Also, adding a return type hint to save improves type safety.

    @abstractmethod
    async def save(self, user_id: str, user_input: str, assistant_response: str) -> None:
        pass

    @abstractmethod
    async def retrieve(self, query: str, user_id: str) -> List[Dict]:
        pass

Comment on lines +78 to +79
def _validate_request(self, context: RequestContext) -> bool:
return False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _validate_request method is a stub that always returns False. Consequently, the validation check if error: on line 43 will never pass, and validation is effectively disabled. This is misleading. If validation is not required, this method and the check should be removed. If it's a placeholder for future implementation, it should raise a NotImplementedError or include a TODO comment to clarify its status.

Comment on lines 36 to 56
print(f"Error retrieving memories: {e} at {traceback.format_exc()}")
return [{"role": "user", "content": query}]
@traced
def save(self, user_id: str, user_input: str, assistant_response: str):
"""Save the interaction to Mem0"""
try:
interaction = [
{
"role": "user",
"content": user_input
},
{
"role": "assistant",
"content": assistant_response
}
]
result = mem0.add(interaction, user_id=user_id)
current_span().log(metadata={"memory_saved": result, "user_id": user_id})
print(f"Memory saved successfully: {len(result.get('results', []))} memories added")
except Exception as e:
print(f"Error saving interaction: {e}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using print() for logging and error reporting is not ideal for production code. It's better to use a configured logger (e.g., from Python's logging module) for structured, leveled, and configurable logs. Additionally, the except block in the save method silently swallows the exception, which can hide critical issues. The error should be logged, and you should consider re-raising it or handling it in a way that informs the caller of the failure.

"""Retrieve relevant context from Mem0"""
try:
memories = mem0.search(query=query, user_id=user_id)
memory_list = memories
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The line memory_list = memories is redundant. You can use the memories variable directly in the following line.

Comment on lines +31 to +43
description='A simple DSPy agent that can answer questions and remember user interactions.',
tags=['DSPy', 'Memory', 'Mem0'],
examples=[
'What is the capital of France?',
'What did I ask you about earlier?',
'Remember that I prefer morning meetings.',
],
)

agent_executor = DspyAgentExecutor()
agent_card = AgentCard(
name='DSPy Agent',
description='A simple DSPy agent that can answer questions and remember user interactions.',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The description string 'A simple DSPy agent that can answer questions and remember user interactions.' is duplicated in the AgentSkill and AgentCard definitions. It's good practice to define this as a constant to improve maintainability and avoid potential inconsistencies.

Comment on lines 4 to 11
import os

import click
import uvicorn
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, RedirectResponse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

There are several unused imports in this file: os, Middleware, Request, JSONResponse, and RedirectResponse. Please remove them to keep the code clean.

Suggested change
import os
import click
import uvicorn
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, RedirectResponse
import logging
import click
import uvicorn
from starlette.middleware.cors import CORSMiddleware
from a2a.server.apps import A2AStarletteApplication

[project]
name = "a2a-dspy"
version = "0.1.0"
description = "Add your description here"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The project description is a placeholder. Please provide a meaningful description for the project.

Suggested change
description = "Add your description here"
description = "A2A agent example using DSPy with conversational memory."

version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The Python version requirement is set to >=3.13. However, the README.md specifies 3.10+. Please ensure these are consistent. If 3.13 is a strict requirement, the README should be updated. If it can run on older versions, consider lowering this requirement to match the documentation and broaden compatibility.

@Arthur-Jacobina
Copy link
Author

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive example of an A2A agent using DSPy, including memory with Mem0 and observability with Braintrust. The implementation is well-structured and provides a great starting point. My review includes suggestions to improve configuration consistency, robustness in handling user input, and code clarity. Specifically, I've pointed out a mismatch in Python versions between configuration files, potential issues with processing multipart user messages, and opportunities to make the code more idiomatic and maintainable.

line-length = 80 # Google Style Guide §3.2: 80 columns
indent-width = 4 # Google Style Guide §3.4: 4 spaces

target-version = "py310" # Minimum Python version
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The target-version is set to py310, but pyproject.toml and README.md specify Python 3.13. To ensure Ruff's rules and fixes are appropriate for the project's actual Python version, this should be updated for consistency.

Suggested change
target-version = "py310" # Minimum Python version
target-version = "py313" # Minimum Python version


await updater.start_work()

query = context.get_user_input()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The user input from context.get_user_input() is directly used as query. This is not robust, as get_user_input() can return a list[Part]. Casting a list to a string with str() later on will produce its Python representation (e.g., '[<TextPart...>]'), not the user's message.

You should process the input to extract the text content. For example:

user_input = context.get_user_input()
query_text = (
    ' '.join(p.text for p in user_input if isinstance(p, TextPart))
    if isinstance(user_input, list)
    else str(user_input)
)
# Then use query_text in calls to memory.retrieve, agent, etc.

Comment on lines 34 to 57
except Exception as e: # noqa: BLE001
current_span().log(metadata={'error': e, 'traceback': traceback.format_exc()})
return [{'role': 'user', 'content': query}]
else:
return context

@traced
async def save(self, user_id: str, user_input: str, assistant_response: str) -> None:
"""Save the interaction to Mem0."""
try:
interaction = [
{
'role': 'user',
'content': user_input
},
{
'role': 'assistant',
'content': assistant_response
}
]
result = await mem0.add(interaction, user_id=user_id)
current_span().log(metadata={'memory_saved': result, 'user_id': user_id})
except Exception as e: # noqa: BLE001
current_span().log(metadata={'error': e, 'traceback': traceback.format_exc()})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The try...except Exception blocks in both retrieve and save methods catch a broad Exception, which can hide bugs and make debugging difficult. It's better to catch more specific exceptions that you expect from the mem0 client (e.g., for network errors or API errors). Additionally, logging the exception object e directly might leak sensitive information if the exception message contains it. Consider logging str(e) after sanitizing it, or logging a generic error message.

"TD003", # Ignore Missing issue link in TODOs (often not required/available)
"T201", # Ignore print presence
"RUF012", # Ignore Mutable class attributes should be annotated with `typing.ClassVar`
"UP007", # Do not upgrade `Optional[T]` to `T | None` (PEP 604 syntax)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The rule UP007 is ignored, which prevents upgrading type hints like Optional[T] to the more modern T | None syntax (PEP 604). Since the project requires Python 3.13, which fully supports this syntax, it's recommended to remove this from the ignore list to encourage modern, cleaner code.


## Files

* `__main__.py`: The main entry point that configures and starts the A2A server with CORS support.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The documentation refers to __main__.py as the main entry point, but the actual file is named run.py. This should be corrected to avoid confusion for users trying to understand the project structure.

Suggested change
* `__main__.py`: The main entry point that configures and starts the A2A server with CORS support.
* `run.py`: The main entry point that configures and starts the A2A server with CORS support.

Comment on lines +75 to +76
def _validate_request(self, context: RequestContext) -> bool:
return False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The _validate_request method currently performs no validation and always returns False. While this may be acceptable for an example, its purpose and return value are not immediately clear. Adding a docstring and a TODO comment would improve clarity and serve as a reminder to implement proper validation for production use. For example:

def _validate_request(self, context: RequestContext) -> bool:
    """Validate the incoming request. For this example, we assume all requests are valid.

    Returns:
        bool: True if there is a validation error, False otherwise.
    """
    # TODO: Implement request validation for production use.
    return False


set_masking_function(mask_sensitive_data)

logger = init_logger(project='My Project', api_key=os.getenv('BRAINTRUST_API_KEY'))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The Braintrust project name is hardcoded as 'My Project'. It would be more helpful for observability to use a more specific name, such as 'a2a-dspy-agent', to easily identify this agent's logs in Braintrust. Consider making this configurable via an environment variable as well.

Suggested change
logger = init_logger(project='My Project', api_key=os.getenv('BRAINTRUST_API_KEY'))
logger = init_logger(project='a2a-dspy-agent', api_key=os.getenv('BRAINTRUST_API_KEY'))

from pydantic import BaseModel


class Memory(BaseModel):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

The Memory class is defined as a pydantic.BaseModel but is used as an abstract interface with @abstractmethod. For defining an interface, it is more idiomatic in Python to use abc.ABC. While using BaseModel is not incorrect, ABC more clearly communicates the intent that this is an abstract base class defining a contract. You would also need to change the imports from from abc import abstractmethod to from abc import ABC, abstractmethod and remove the pydantic import.

Suggested change
class Memory(BaseModel):
class Memory(ABC):

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/gemini review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing the suggestion! The Memory class now correctly inherits from abc.ABC, which is the idiomatic way to define abstract base classes in Python. This improves the clarity and intent of the interface.

@Arthur-Jacobina
Copy link
Author

@holtskinner I'm tagging you to get some help.

In a short, I ran

uv run ruff check --fix

with some variants like format, --unsafe-fixes etc but I still keep getting this linter error. Is there a specific ruff config other than the one at https://github.com/a2aproject/a2a-samples/blob/main/.ruff.toml?

I'd be really happy to get some help from you (or anyone who knows how to fix these errors) !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant