본문 바로가기
AI/모델,학습

소스 코드가 포함된 Python A2A 자습서

by 주호파파 2025. 5. 14.
728x90
반응형

Python A2A 튜토리얼 소스 코드

목차

  • 소개
  • 환경 설정
  • 프로젝트 만들기
  • 상담원 스킬
  • 에이전트 카드
  • A2A 서버
  • A2A 서버와 상호 작용
  • 에이전트 기능 추가
  • 로컬 Ollama 모델 사용

소개

이 자습서에서는 Python을 사용하여 간단한 echo A2A 서버를 구축합니다. 이 베어본 구현은 A2A가 제공해야 하는 모든 기능을 보여줍니다. 이 자습서를 따라 Ollama 또는 Google의 에이전트 개발 키트를 사용하여 에이전트 기능을 추가할 수 있습니다.

학습할 내용:

  • A2A의 기본 개념
  • Python에서 A2A 서버를 만드는 방법
  • A2A 서버와 상호 작용
  • 에이전트 역할을 할 학습된 모델 추가

환경 설정

필요한 것

  • 커서/VsCode와 같은 코드 편집기
  • 터미널(Linux), iTerm/Warp(Mac) 또는 커서의 터미널과 같은 명령 프롬프트

Python 환경

uv를 패키지 관리자로 사용하고 프로젝트를 설정할 것입니다.

우리가 사용할 A2A 라이브러리에는 일치하는 버전이 아직 없는 경우 설치할 수 있는 uv가 필요합니다. 우리는 파이썬 3.12를 사용할 것입니다.python >= 3.12

검사

다음 명령을 실행하여 다음 단계를 수행할 준비가 되었는지 확인합니다.

echo 'import sys; print(sys.version)' | uv run -

다음과 유사한 내용이 표시되면 계속 진행할 준비가 된 것입니다!

3.12.3 (main, Feb 4 2025, 14:48:35) [GCC 13.3.0]

내 환경

  • 파이썬 3.13
  • uv : uv 0.7.2 (Homebrew 2025-04-30)
  • Warp
  • ollama 0.6.7(Qwen3 지원)
  • macOs Sequoia 15.4.1

프로젝트 만들기

먼저 를 사용하여 프로젝트를 만들어 보겠습니다. 테스트를 추가하거나 나중에 프로젝트를 게시하려는 경우 플래그를 추가합니다.uv--package

uv init --package my-project
cd my-project

가상 환경 사용

이 프로젝트에 대한 venv를 만들 것입니다. 이 작업은 한 번만 수행하면 됩니다.

uv venv .venv

이 창과 앞으로 여는 터미널 창에 대해이 venv를 가져와야합니다.

source .venv/bin/activate

VS Code와 같은 코드 편집기를 사용하는 경우 코드 완성을 위해 Python 인터프리터를 설정할 수 있습니다. VS Code에서 를 누르고 선택합니다. 그런 다음 프로젝트를 선택한 다음 올바른 파이썬 인터프리터를 선택하십시오. Ctrl-Shift-PPython: Select Interpretermy-projectPython 3.12.3 ('.venv':venv) ./.venv/bin/python

이제 소스 코드는 다음과 유사해야 합니다.

# my-project
tree
.
|____pyproject.toml
|____README.md
|____.venv
| |____bin
| | |____activate.bat
| | |____activate.ps1
| | |____python3
| | |____python
| | |____activate.fish
| | |____pydoc.bat
| | |____activate_this.py
| | |____activate
| | |____activate.nu
| | |____deactivate.bat
| | |____python3.13
| | |____activate.csh
| |____pyvenv.cfg
| |____CACHEDIR.TAG
| |____.gitignore
| |____lib
| | |____python3.13
| | | |____site-packages
| | | | |_____virtualenv.py
| | | | |_____virtualenv.pth
|____.python-version
|____src
| |____my_project
| | |______init__.py

Google-A2A Python 라이브러리 추가

다음으로 Google의 샘플 A2A Python 라이브러리를 추가합니다.

uv add git+https://github.com/google/A2A#subdirectory=samples/python

pyproject.toml을 사용합니다.

[project]
name = "my-project"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
authors = [
    { name = "zhangcheng", email = "zh.milo@gmail.com" }
]
requires-python = ">=3.13"
dependencies = [
    "a2a-samples",
]

[project.scripts]
my-project = "my_project:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.uv.sources]
a2a-samples = { git = "https://github.com/google/A2A", subdirectory = "samples/python" }

프로젝트 구조 설정

이제 나중에 사용할 파일을 만들어 보겠습니다.

touch src/my_project/agent.py
touch src/my_project/task_manager.py

테스트 실행

모든 것이 올바르게 설정되었다면 이제 애플리케이션을 실행할 수 있습니다.

uv run my-project

출력은 다음과 같아야 합니다.

Hello from my-project!

상담원 스킬

에이전트 스킬은 에이전트가 수행할 수 있는 기능 집합입니다. 다음은 에코 에이전트에 대한 모습의 예입니다.

{
  id: "my-project-echo-skill"
  name: "Echo Tool",
  description: "Echos the input given",
  tags: ["echo", "repeater"],
  examples: ["I will see this echoed back to me"],
  inputModes: ["text"],
  outputModes: ["text"]
}

이는 에이전트 카드의 기술 섹션을 준수합니다.

{
  id: string; // unique identifier for the agent's skill
  name: string; //human readable name of the skill
  // description of the skill - will be used by the client or a human
  // as a hint to understand what the skill does.
  description: string;
  // Set of tag words describing classes of capabilities for this specific
  // skill (e.g. "cooking", "customer support", "billing")
  tags: string[];
  // The set of example scenarios that the skill can perform.
  // Will be used by the client as a hint to understand how the skill can be
  // used. (e.g. "I need a recipe for bread")
  examples?: string[]; // example prompts for tasks
  // The set of interaction modes that the skill supports
  // (if different than the default)
  inputModes?: string[]; // supported mime types for input
  outputModes?: string[]; // supported mime types for output
}

구현

이 에이전트 스킬을 코드로 만들어 보겠습니다. 내용을 열고 다음 코드로 바꿉니다.src/my-project/__init__.py

import google_a2a
from google_a2a.common.types import AgentSkill

def main():
  skill = AgentSkill(
    id="my-project-echo-skill",
    name="Echo Tool",
    description="Echos the input given",
    tags=["echo", "repeater"],
    examples=["I will see this echoed back to me"],
    inputModes=["text"],
    outputModes=["text"],
  )
  print(skill)

if __name__ == "__main__":
  main()

모듈에 대한 오류가 발생하면 다음을 시도하십시오.

from common.types import AgentSkill

# same code

테스트 실행

이것을 실행해 보겠습니다.

uv run my-project

출력은 다음과 같아야 합니다.

id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']

에이전트 카드

이제 기술을 정의했으므로 에이전트 카드를 만들 수 있습니다.

원격 에이전트는 인증 메커니즘 외에도 에이전트의 기능과 기술을 설명하는 JSON 형식의 에이전트 카드를 게시해야 합니다. 즉, 이를 통해 전 세계가 에이전트에 대해 알리고 에이전트와 상호 작용하는 방법을 알 수 있습니다.

구현

먼저 명령 줄 인수를 구문 분석하기위한 몇 가지 도우미를 추가 할 수 있습니다. 이것은 나중에 서버를 시작하는 데 도움이 될 것입니다.

uv add click

그리고 코드를 업데이트하십시오.

import logging

import click
import google_a2a
from google_a2a.common.types import AgentSkill, AgentCapabilities, AgentCard

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
def main(host, port):
  skill = AgentSkill(
    id="my-project-echo-skill",
    name="Echo Tool",
    description="Echos the input given",
    tags=["echo", "repeater"],
    examples=["I will see this echoed back to me"],
    inputModes=["text"],
    outputModes=["text"],
  )
  logging.info(skill)

if __name__ == "__main__":
  main()

다음으로 에이전트 카드를 추가하겠습니다.

# ...
def main(host, port):
  # ...
  capabilities = AgentCapabilities()
  agent_card = AgentCard(
    name="Echo Agent",
    description="This agent echos the input given",
    url=f"http://{host}:{port}/",
    version="0.1.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=capabilities,
    skills=[skill]
  )
  logging.info(agent_card)

if __name__ == "__main__":
  main()

테스트 실행

이것을 실행해 보겠습니다.

uv run my-project

출력은 다음과 같아야 합니다.

INFO:root:id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
INFO:root:name='Echo Agent' description='This agent echos the input given' url='http://localhost:10002/' provider=None version='0.1.0' documentationUrl=None capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False) authentication=None defaultInputModes=['text'] defaultOutputModes=['text'] skills=[AgentSkill(id='my-project-echo-skill', name='Echo Tool', description='Echos the input given', tags=['echo', 'repeater'], examples=['I will see this echoed back to me'], inputModes=['text'], outputModes=['text'])]

A2A 서버

서버를 시작할 준비가 거의 다 되었습니다! 우리는 후드 아래에서 uvicorn 서버를 시작하는 클래스를 사용할 것입니다.A2AServerGoogle-A2A

작업 관리자

서버를 만들기 전에 들어오는 요청을 처리하기 위한 작업 관리자가 필요합니다.

InMemoryTaskManager 인터페이스를 구현하려면 두 가지 방법을 구현해야 합니다.

async def on_send_task(
  self,
  request: SendTaskRequest
) -> SendTaskResponse:
  """
  This method queries or creates a task for the agent.
  The caller will receive exactly one response.
  """
  pass

async def on_send_task_subscribe(
  self,
  request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
  """
  This method subscribes the caller to future updates regarding a task.
  The caller will receive a response and additionally receive subscription
  updates over a session established between the client and the server
  """
  pass

다음 코드를 열고 추가합니다. 단순히 직접 에코 응답을 반환하고 세션이나 구독 없이 작업을 즉시 완료로 표시합니다.src/my_project/task_manager.py

from typing import AsyncIterable

import google_a2a
from google_a2a.common.server.task_manager import InMemoryTaskManager
from google_a2a.common.types import (
  Artifact,
  JSONRPCResponse,
  Message,
  SendTaskRequest,
  SendTaskResponse,
  SendTaskStreamingRequest,
  SendTaskStreamingResponse,
  Task,
  TaskState,
  TaskStatus,
  TaskStatusUpdateEvent,
)

class MyAgentTaskManager(InMemoryTaskManager):
  def __init__(self):
    super().__init__()

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    # Upsert a task stored by InMemoryTaskManager
    await self.upsert_task(request.params)

    task_id = request.params.id
    # Our custom logic that simply marks the task as complete
    # and returns the echo text
    received_text = request.params.message.parts[0].text
    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=f"on_send_task received: {received_text}"
    )

    # Send the response
    return SendTaskResponse(id=request.id, result=task)

  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    pass

  async def _update_task(
    self,
    task_id: str,
    task_state: TaskState,
    response_text: str,
  ) -> Task:
    task = self.tasks[task_id]
    agent_response_parts = [
      {
        "type": "text",
        "text": response_text,
      }
    ]
    task.status = TaskStatus(
      state=task_state,
      message=Message(
        role="agent",
        parts=agent_response_parts,
      )
    )
    task.artifacts = [
      Artifact(
        parts=agent_response_parts,
      )
    ]
    return task

A2A 서버

작업 관리자가 완료되면 이제 서버를 만들 수 있습니다.

다음 코드를 열고 추가합니다.src/my_project/__init__.py

# ...
from google_a2a.common.server import A2AServer
from my_project.task_manager import MyAgentTaskManager
# ...
def main(host, port):
  # ...

  task_manager = MyAgentTaskManager()
  server = A2AServer(
    agent_card=agent_card,
    task_manager=task_manager,
    host=host,
    port=port,
  )
  server.start()

테스트 실행

이것을 실행해 보겠습니다.

uv run my-project

출력은 다음과 같아야 합니다.

INFO:root:id='my-project-echo-skill' name='Echo Tool' description='Echos the input given' tags=['echo', 'repeater'] examples=['I will see this echoed back to me'] inputModes=['text'] outputModes=['text']
INFO:root:name='Echo Agent' description='This agent echos the input given' url='http://localhost:10002/' provider=None version='0.1.0' documentationUrl=None capabilities=AgentCapabilities(streaming=False, pushNotifications=False, stateTransitionHistory=False) authentication=None defaultInputModes=['text'] defaultOutputModes=['text'] skills=[AgentSkill(id='my-project-echo-skill', name='Echo Tool', description='Echos the input given', tags=['echo', 'repeater'], examples=['I will see this echoed back to me'], inputModes=['text'], outputModes=['text'])]
INFO:     Started server process [582]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)

축하합니다! 이제 A2A 서버가 실행 중입니다!

A2A 서버와 상호 작용

먼저 Google-A2A의 명령줄 도구를 사용하여 A2A 서버로 요청을 보냅니다. 시도한 후, 이것이 내부적으로 어떻게 작동하는지 확인하기 위해 자체 기본 클라이언트를 작성할 것입니다.

Google-A2A의 명령줄 도구 사용

이전 실행에서 이미 실행 중인 A2A 서버:

# This should already be running in your terminal
$ uv run my-project
INFO:     Started server process [20538]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:10002 (Press CTRL+C to quit)

동일한 디렉토리에서 새 터미널을 엽니다.

source .venv/bin/activate
uv run google-a2a-cli --agent http://localhost:10002

# if got errors, try this (make sure that there is a dir hosts in .venv/lib/python3.13/site-packages):
uv run python -m hosts.cli --agent http://localhost:10002

참고: cli가 이전에 노출되지 않았으므로 이 풀 리퀘스트에서 google-a2a를 설치한 경우에만 작동합니다.

그렇지 않으면 Google/A2A 저장소를 직접 체크아웃하고 저장소로 이동하여 cli를 직접 실행해야 합니다.samples/python

그런 다음 Enter 키를 입력하고 눌러 서버로 메시지를 보낼 수 있습니다.

=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): Hello!

모든 것이 올바르게 작동하면 응답에서 다음을 볼 수 있습니다.

$ uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":false,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): hello
Select a file path to attach? (press enter to skip):

{"jsonrpc":"2.0","id":"5b3b74b7ea80495daff4047ee48a6c48","result":{"id":"740f1e21465b4ee2af4af7b8c6cacad5","sessionId":"7fbd065264cb4d6c91ed96909589fc35","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"on_send_task received: hello"}]},"timestamp":"2025-05-03T22:18:41.649600"},"artifacts":[{"parts":[{"type":"text","text":"on_send_task received: hello"}],"index":0}],"history":[{"role":"user","parts":[{"type":"text","text":"hello"}]}]}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

종료하려면 Enter를 입력하고 키를 누릅니다.:q

에이전트 기능 추가

이제 기본 A2A 서버가 실행 중이므로 몇 가지 기능을 더 추가해 보겠습니다. A2A가 비동기식으로 작동하고 응답을 스트리밍하는 방법을 살펴보겠습니다.

스트리밍

이를 통해 클라이언트는 서버에 가입하고 단일 응답 대신 여러 업데이트를 받을 수 있습니다. 이는 장기 실행 에이전트 작업 또는 여러 아티팩트가 클라이언트로 다시 스트리밍될 수 있는 경우에 유용할 수 있습니다.

먼저 에이전트가 스트리밍 준비가 되었음을 선언합니다. AgentCapabilities 열기 및 업데이트:src/my_project/__init__.py

# ...
def main(host, port):
  # ...
  capabilities = AgentCapabilities(
    streaming=True
  )
  # ...

이제 다음을 구현해야 합니다.src/my_project/task_manager.pyon_send_task_subscribe

import asyncio
# ...
class MyAgentTaskManager(InMemoryTaskManager):
  # ...
  async def _stream_3_messages(self, request: SendTaskStreamingRequest):
    task_id = request.params.id
    received_text = request.params.message.parts[0].text

    text_messages = ["one", "two", "three"]
    for text in text_messages:
      parts = [
        {
          "type": "text",
          "text": f"{received_text}: {text}",
        }
      ]
      message = Message(role="agent", parts=parts)
      is_last = text == text_messages[-1]
      task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
      task_status = TaskStatus(
        state=task_state,
        message=message
      )
      task_update_event = TaskStatusUpdateEvent(
        id=request.params.id,
        status=task_status,
        final=is_last,
      )
      await self.enqueue_events_for_sse(
        request.params.id,
        task_update_event
      )

  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    # Upsert a task stored by InMemoryTaskManager
    await self.upsert_task(request.params)

    task_id = request.params.id
    # Create a queue of work to be done for this task
    sse_event_queue = await self.setup_sse_consumer(task_id=task_id)

    # Start the asynchronous work for this task
    asyncio.create_task(self._stream_3_messages(request))

    # Tell the client to expect future streaming responses
    return self.dequeue_events_for_sse(
      request_id=request.id,
      task_id=task_id,
      sse_event_queue=sse_event_queue,
    )

A2A 서버를 다시 시작하여 새 변경 사항을 선택한 다음 cli를 다시 실행합니다.

$ uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":true,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): Streaming?
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]},"timestamp":"2025-05-03T22:22:31.354656"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]},"timestamp":"2025-05-03T22:22:31.354684"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"c6f21c0b7e5e497caaca4a692aaefd7a","result":{"id":"d7218dd3c122477c89d62e7d897fea0b","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]},"timestamp":"2025-05-03T22:22:31.354698"},"final":true}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

경우에 따라 에이전트에 추가 입력이 필요할 수 있습니다. 예를 들어, 상담원은 클라이언트에게 3개의 메시지를 계속 반복할 것인지 물어볼 수 있습니다. 이 경우 에이전트는 응답한 다음 클라이언트가 에이전트가 요구하는 입력을 제공하는 동일한 업데이트된 메시지로 다시 보냅니다. 서버 측에서 이 경우를 처리하기 위해 업데이트할 것입니다.TaskState.INPUT_REQUIREDsend_task_streamingtask_idsession_idon_send_task_subscribe

import asyncio
from typing import AsyncIterable

from common.server.task_manager import InMemoryTaskManager
from common.types import (
  Artifact,
  JSONRPCResponse,
  Message,
  SendTaskRequest,
  SendTaskResponse,
  SendTaskStreamingRequest,
  SendTaskStreamingResponse,
  Task,
  TaskState,
  TaskStatus,
  TaskStatusUpdateEvent,
)

class MyAgentTaskManager(InMemoryTaskManager):
  def __init__(self):
    super().__init__()

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    # Upsert a task stored by InMemoryTaskManager
    await self.upsert_task(request.params)

    task_id = request.params.id
    # Our custom logic that simply marks the task as complete
    # and returns the echo text
    received_text = request.params.message.parts[0].text
    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=f"on_send_task received: {received_text}"
    )

    # Send the response
    return SendTaskResponse(id=request.id, result=task)

  async def _stream_3_messages(self, request: SendTaskStreamingRequest):
    task_id = request.params.id
    received_text = request.params.message.parts[0].text

    text_messages = ["one", "two", "three"]
    for text in text_messages:
      parts = [
        {
          "type": "text",
          "text": f"{received_text}: {text}",
        }
      ]
      message = Message(role="agent", parts=parts)
      # is_last = text == text_messages[-1]
      task_state = TaskState.WORKING
      # task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
      task_status = TaskStatus(
        state=task_state,
        message=message
      )
      task_update_event = TaskStatusUpdateEvent(
        id=request.params.id,
        status=task_status,
        final=False,
      )
      await self.enqueue_events_for_sse(
        request.params.id,
        task_update_event
      )
    ask_message = Message(
      role="agent",
      parts=[
        {
          "type": "text",
          "text": "Would you like more messages? (Y/N)"
        }
      ]
    )
    task_update_event = TaskStatusUpdateEvent(
      id=request.params.id,
      status=TaskStatus(
        state=TaskState.INPUT_REQUIRED,
        message=ask_message
      ),
      final=True,
    )
    await self.enqueue_events_for_sse(
      request.params.id,
      task_update_event
    )

  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    task_id = request.params.id
    is_new_task = task_id in self.tasks
    # Upsert a task stored by InMemoryTaskManager
    await self.upsert_task(request.params)

    received_text = request.params.message.parts[0].text
    sse_event_queue = await self.setup_sse_consumer(task_id=task_id)
    if not is_new_task and received_text == "N":
      task_update_event = TaskStatusUpdateEvent(
        id=request.params.id,
        status=TaskStatus(
          state=TaskState.COMPLETED,
          message=Message(
            role="agent",
            parts=[
              {
                "type": "text",
                "text": "All done!"
              }
            ]
          )
        ),
        final=True,
      )
      await self.enqueue_events_for_sse(
        request.params.id,
        task_update_event,
      )
    else:
      asyncio.create_task(self._stream_3_messages(request))

    return self.dequeue_events_for_sse(
      request_id=request.id,
      task_id=task_id,
      sse_event_queue=sse_event_queue,
    )

  async def _update_task(
    self,
    task_id: str,
    task_state: TaskState,
    response_text: str,
  ) -> Task:
    task = self.tasks[task_id]
    agent_response_parts = [
      {
        "type": "text",
        "text": response_text,
      }
    ]
    task.status = TaskStatus(
      state=task_state,
      message=Message(
        role="agent",
        parts=agent_response_parts,
      )
    )
    task.artifacts = [
      Artifact(
        parts=agent_response_parts,
      )
    ]
    return task

이제 서버를 다시 시작하고 cli를 실행한 후 에이전트에 알릴 때까지 작업이 계속 실행되는 것을 볼 수 있습니다.N

uv run python -m hosts.cli --agent http://localhost:10002
======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":true,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): Streaming?
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: one"}]},"timestamp":"2025-05-04T09:18:18.235994"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: two"}]},"timestamp":"2025-05-04T09:18:18.236021"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?: three"}]},"timestamp":"2025-05-04T09:18:18.236033"},"final":false}}
stream event => {"jsonrpc":"2.0","id":"18357b72fc5841ef8e8ede073b91ac48","result":{"id":"b02f6989e72f44818560778d39fcef18","status":{"state":"input-required","message":{"role":"agent","parts":[{"type":"text","text":"Would you like more messages? (Y/N)"}]},"timestamp":"2025-05-04T09:18:18.236044"},"final":true}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): N
Select a file path to attach? (press enter to skip):
stream event => {"jsonrpc":"2.0","id":"86ce510ba68b4797a5b68061c8c4780b","result":{"id":"64e51665dc354d2da7c31bcc45abc8f9","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"All done!"}]},"timestamp":"2025-05-04T09:22:24.598749"},"final":true}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

축하합니다! 이제 비동기식으로 작업을 수행하고 필요할 때 사용자에게 입력을 요청할 수 있는 에이전트가 있습니다.

로컬 Ollama 모델 사용

이제 흥미로운 부분에 도달했습니다. A2A 서버에 AI를 추가할 예정입니다.

이 자습서에서는 로컬 Ollama 모델을 설정하고 A2A 서버와 통합합니다.

요구 사항

를 설치하고 MCP 도구를 지원하는 올라마 모델을 다운로드할 것입니다(향후 튜토리얼을 위해).ollamalangchain

  1. 다운로드 ollama
  2. ollama 서버를 실행합니다.
# Note: if ollama is already running, you may get an error such as
# Error: listen tcp 127.0.0.1:11434: bind: address already in use
# On linux you can run systemctl stop ollama to stop ollama
ollama serve
  1. 이 목록에서 모델을 다운로드합니다. 태그로 표시된 대로 지원하고 24GB 그래픽 카드에서 실행되는 대로 사용할 것입니다.qwqtools
ollama pull qwq

# or ollama pull qwen3:4b
# only 2.4G
  1. 설치하다:langchain
uv add langchain langchain-ollama langgraph

이제 올라마를 설정했으니 A2A 서버에 통합할 수 있습니다.

Ollama를 A2A 서버에 통합

먼저 열어 보세요.src/my_project/__init__.py

# ...

@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
@click.option("--ollama-host", default="http://127.0.0.1:11434")
@click.option("--ollama-model", default=None)
def main(host, port, ollama_host, ollama_model):
  # ...
  capabilities = AgentCapabilities(
    streaming=False # We'll leave streaming capabilities as an exercise for the reader
  )
  # ...
  task_manager = MyAgentTaskManager(
    ollama_host=ollama_host,
    ollama_model=ollama_mode,
  )
  # ..

이제 AI 기능을 추가해 보겠습니다.src/my_project/agent.py

from langchain_ollama import ChatOllama
from langgraph.prebuilt import create_react_agent
from langgraph.graph.graph import CompiledGraph

def create_ollama_agent(ollama_base_url: str, ollama_model: str):
  ollama_chat_llm = ChatOllama(
    base_url=ollama_base_url,
    model=ollama_model,
    temperature=0.2
  )
  agent = create_react_agent(ollama_chat_llm, tools=[])
  return agent

async def run_ollama(ollama_agent: CompiledGraph, prompt: str):
  agent_response = await ollama_agent.ainvoke(
    {"messages": prompt }
  )
  message = agent_response["messages"][-1].content
  return str(message)

마지막으로 다음에서 올라마 에이전트를 호출해 보겠습니다.src/my_project/task_manager.py

# ...
from my_project.agent import create_ollama_agent, run_ollama

class MyAgentTaskManager(InMemoryTaskManager):
  def __init__(
    self,
    ollama_host: str,
    ollama_model: typing.Union[None, str]
  ):
    super().__init__()
    if ollama_model is not None:
      self.ollama_agent = create_ollama_agent(
        ollama_base_url=ollama_host,
        ollama_model=ollama_model
      )
    else:
      self.ollama_agent = None

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    # ...
    received_text = request.params.message.parts[0].text
    response_text = f"on_send_task received: {received_text}"
    if self.ollama_agent is not None:
      response_text = await run_ollama(ollama_agent=self.ollama_agent, prompt=received_text)

    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=response_text
    )

    # Send the response
    return SendTaskResponse(id=request.id, result=task)

  # ...

테스트해 보겠습니다!

먼저 A2A 서버를 다운로드한 올라마 모델로 교체하여 다시 실행합니다.qwq

uv run my-project --ollama-host http://127.0.0.1:11434 --ollama-model qwen3:4b

그런 다음 cli를 다시 실행하십시오.

uv run python -m hosts.cli --agent http://localhost:10002

대형 모델을 사용하는 경우 로드하는 데 시간이 걸릴 수 있습니다. cli가 시간 초과될 수 있습니다. 이 경우 ollama 서버가 모델 로드를 완료한 후 cli를 다시 실행합니다.

다음과 같은 내용이 표시되어야 합니다.

======= Agent Card ========
{"name":"Echo Agent","description":"This agent echos the input given","url":"http://localhost:10002/","version":"0.1.0","capabilities":{"streaming":false,"pushNotifications":false,"stateTransitionHistory":false},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"my-project-echo-skill","name":"Echo Tool","description":"Echos the input given","tags":["echo","repeater"],"examples":["I will see this echoed back to me"],"inputModes":["text"],"outputModes":["text"]}]}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit): hey
Select a file path to attach? (press enter to skip):

{"jsonrpc":"2.0","id":"eca7ecf4d6da4a65a4ff99ab0954b957","result":{"id":"62636e021ac0483bb31d40c1473796fa","sessionId":"438927e3540f459389f3d3cb216dd945","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"<think>\nOkay, the user just said \"hey\". That's a pretty open-ended greeting. I need to respond in a friendly and welcoming way. Maybe start with a greeting like \"Hi there!\" to keep it casual. Then, ask how I can assist them. Since they didn't specify a topic, I should keep the response general but inviting. Let me make sure the tone is positive and approachable. Also, check if there's any specific context I should consider, but since there's no prior conversation, it's safe to assume they just want to start a new interaction. Alright, time to put that together.\n</think>\n\nHi there! How can I assist you today? 😊"}]},"timestamp":"2025-05-04T10:01:55.068049"},"artifacts":[{"parts":[{"type":"text","text":"<think>\nOkay, the user just said \"hey\". That's a pretty open-ended greeting. I need to respond in a friendly and welcoming way. Maybe start with a greeting like \"Hi there!\" to keep it casual. Then, ask how I can assist them. Since they didn't specify a topic, I should keep the response general but inviting. Let me make sure the tone is positive and approachable. Also, check if there's any specific context I should consider, but since there's no prior conversation, it's safe to assume they just want to start a new interaction. Alright, time to put that together.\n</think>\n\nHi there! How can I assist you today? 😊"}],"index":0}],"history":[{"role":"user","parts":[{"type":"text","text":"hey"}]}]}}
=========  starting a new task ========

What do you want to send to the agent? (:q or quit to exit):

축하합니다! 이제 AI 모델을 사용하여 응답을 생성하는 A2A 서버가 있습니다!

728x90
반응형