Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added custom event #24

Open
wants to merge 5 commits into
base: hotfix
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/backend/agents/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from context.cosmos_memory import CosmosBufferedChatCompletionContext
from models.messages import (ActionRequest, ActionResponse,
AgentMessage, Step, StepStatus)
from azure.monitor.events.extension import track_event

class BaseAgent(RoutedAgent):
def __init__(
Expand Down Expand Up @@ -94,14 +95,53 @@ async def handle_action_request(
step_id=message.step_id,
)
)

track_event(
"Base agent - Added into the cosmos",
{
"session_id": message.session_id,
"user_id": self._user_id,
"plan_id": message.plan_id,
"content": f"{result}",
"source": self._agent_name,
"step_id": message.step_id,
},
)

except Exception as e:
print(f"Error during LLM call: {e}")
logging.exception(f"Error during LLM call: {e}")
track_event(
"Base agent - Error during llm call, captured into the cosmos",
{
"session_id": message.session_id,
"user_id": self._user_id,
"plan_id": message.plan_id,
"content": f"{e}",
"source": self._agent_name,
"step_id": message.step_id,
},
)

return
print(f"Task completed: {result}")

step.status = StepStatus.completed
step.agent_reply = result
await self._model_context.update_step(step)

track_event(
"Base agent - Updated step and updated into the cosmos",
{
"status": StepStatus.completed,
"session_id": message.session_id,
"agent_reply": f"{result}",
"user_id": self._user_id,
"plan_id": message.plan_id,
"content": f"{result}",
"source": self._agent_name,
"step_id": message.step_id,
},
)

action_response = ActionResponse(
step_id=step.id,
Expand Down
79 changes: 76 additions & 3 deletions src/backend/agents/group_chat_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
from typing import Dict, List

from autogen_core.base import AgentId, MessageContext
from autogen_core.components import (RoutedAgent, default_subscription,
message_handler)
from autogen_core.components import RoutedAgent, default_subscription, message_handler
from autogen_core.components.models import AzureOpenAIChatCompletionClient

from context.cosmos_memory import CosmosBufferedChatCompletionContext
Expand All @@ -28,6 +27,7 @@

from datetime import datetime
from typing import List
from azure.monitor.events.extension import track_event


@default_subscription
Expand All @@ -36,7 +36,7 @@ def __init__(
self,
model_client: AzureOpenAIChatCompletionClient,
session_id: str,
user_id:str,
user_id: str,
memory: CosmosBufferedChatCompletionContext,
agent_ids: Dict[BAgentType, AgentId],
):
Expand Down Expand Up @@ -66,6 +66,17 @@ async def handle_input_task(
step_id="",
)
)

track_event(
"Group Chat Manager - Received and added input task into the cosmos",
{
"session_id": message.session_id,
"user_id": self._user_id,
"content": message.description,
"source": "HumanAgent",
},
)

# Send the InputTask to the PlannerAgent
planner_agent_id = self._agent_ids.get(BAgentType.planner_agent)
plan: Plan = await self.send_message(message, planner_agent_id)
Expand Down Expand Up @@ -158,6 +169,16 @@ class Step(BaseDataModel):
step.status = StepStatus.rejected
step.human_approval_status = HumanFeedbackStatus.rejected
self._memory.update_step(step)
track_event(
"Group Chat Manager - Steps has been rejected and updated into the cosmos",
{
"status": StepStatus.rejected,
"session_id": message.session_id,
"user_id": self._user_id,
"human_approval_status": HumanFeedbackStatus.rejected,
"source": step.agent,
},
)
else:
# Update and execute all steps if no specific step_id is provided
for step in steps:
Expand All @@ -172,6 +193,16 @@ class Step(BaseDataModel):
step.status = StepStatus.rejected
step.human_approval_status = HumanFeedbackStatus.rejected
self._memory.update_step(step)
track_event(
"Group Chat Manager - Step has been rejected and updated into the cosmos",
{
"status": StepStatus.rejected,
"session_id": message.session_id,
"user_id": self._user_id,
"human_approval_status": HumanFeedbackStatus.rejected,
"source": step.agent,
},
)

# Function to update step status and add feedback
async def _update_step_status(
Expand All @@ -187,6 +218,16 @@ async def _update_step_status(
step.human_feedback = received_human_feedback
step.status = StepStatus.completed
await self._memory.update_step(step)
track_event(
"Group Chat Manager - Received human feedback, Updating step and updated into the cosmos",
{
"status": StepStatus.completed,
"session_id": step.session_id,
"user_id": self._user_id,
"human_feedback": received_human_feedback,
"source": step.agent,
},
)
# TODO: Agent verbosity
# await self._memory.add_item(
# AgentMessage(
Expand All @@ -205,6 +246,15 @@ async def _execute_step(self, session_id: str, step: Step):
# Update step status to 'action_requested'
step.status = StepStatus.action_requested
await self._memory.update_step(step)
track_event(
"Group Chat Manager - Update step to action_requested and updated into the cosmos",
{
"status": StepStatus.action_requested,
"session_id": step.session_id,
"user_id": self._user_id,
"source": step.agent,
},
)

# generate conversation history for the invoked agent
plan = await self._memory.get_plan_by_session(session_id=session_id)
Expand Down Expand Up @@ -261,6 +311,18 @@ async def _execute_step(self, session_id: str, step: Step):
)
)

track_event(
f"Group Chat Manager - Requesting {step.agent.value.title()} to perform the action and added into the cosmos",
{
"session_id": session_id,
"user_id": self._user_id,
"plan_id": step.plan_id,
"content": f"Requesting {step.agent.value.title()} to perform action: {step.action}",
"source": "GroupChatManager",
"step_id": step.id,
},
)

agent_id = self._agent_ids.get(step.agent)
# If the agent_id is not found, send the request to the PlannerAgent for re-planning
# TODO: re-think for the demo scenario
Expand All @@ -283,6 +345,17 @@ async def _execute_step(self, session_id: str, step: Step):
logging.info(
"Marking the step as complete - Since we have received the human feedback"
)
track_event(
"Group Chat Manager - Steps completed - Received the human feedback and updated into the cosmos",
{
"session_id": session_id,
"user_id": self._user_id,
"plan_id": step.plan_id,
"content": "Marking the step as complete - Since we have received the human feedback",
"source": step.agent,
"step_id": step.id,
},
)
else:
await self.send_message(action_request, agent_id)
logging.info(f"Sent ActionRequest to {step.agent.value}")
23 changes: 23 additions & 0 deletions src/backend/agents/human.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
AgentMessage,
Step,
)
from azure.monitor.events.extension import track_event


@default_subscription
Expand Down Expand Up @@ -59,6 +60,17 @@ async def handle_step_feedback(
)
)
logging.info(f"HumanAgent received feedback for step: {step}")
track_event(
f"Human Agent - Received feedback for step: {step} and added into the cosmos",
{
"session_id": message.session_id,
"user_id": self.user_id,
"plan_id": step.plan_id,
"content": f"Received feedback for step: {step.action}",
"source": "HumanAgent",
"step_id": message.step_id,
},
)

# Notify the GroupChatManager that the step has been completed
await self._memory.add_item(
Expand All @@ -71,3 +83,14 @@ async def handle_step_feedback(
)
)
logging.info(f"HumanAgent sent approval request for step: {step}")

track_event(
f"Human Agent - Approval request sent for step {step} and added into the cosmos",
{
"session_id": message.session_id,
"user_id": self.user_id,
"plan_id": step.plan_id,
"step_id": message.step_id,
"agent_id": self.group_chat_manager_id,
},
)
82 changes: 81 additions & 1 deletion src/backend/agents/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
HumanFeedbackStatus,
)
from typing import Optional
from azure.monitor.events.extension import track_event

@default_subscription
class PlannerAgent(RoutedAgent):
Expand Down Expand Up @@ -70,6 +71,17 @@ async def handle_input_task(self, message: InputTask, ctx: MessageContext) -> Pl
)
)
logging.info(f"Plan generated: {plan.summary}")

track_event(
f"Planner - Generated a plan with {len(steps)} steps and added plan into the cosmos",
{
"session_id": message.session_id,
"user_id": self._user_id,
"plan_id": plan.id,
"content": f"Generated a plan with {len(steps)} steps. Click the blue check box beside each step to complete it, click the x to remove this step.",
"source": "PlannerAgent",
},
)

if plan.human_clarification_request is not None:
# if the plan identified that user information was required, send a message asking the user for it
Expand All @@ -86,6 +98,17 @@ async def handle_input_task(self, message: InputTask, ctx: MessageContext) -> Pl
logging.info(
f"Additional information requested: {plan.human_clarification_request}"
)

track_event(
"Planner - Additional information requested and added into the cosmos",
{
"session_id": message.session_id,
"user_id": self._user_id,
"plan_id": plan.id,
"content": f"I require additional information before we can proceed: {plan.human_clarification_request}",
"source": "PlannerAgent",
},
)

return plan

Expand All @@ -112,6 +135,17 @@ async def handle_plan_clarification(
step_id="",
)
)

track_event(
"Planner - Store HumanAgent clarification and added into the cosmos",
{
"session_id": message.session_id,
"user_id": self._user_id,
"content": f"{message.human_clarification}",
"source": "HumanAgent",
},
)

await self._memory.add_item(
AgentMessage(
session_id=message.session_id,
Expand All @@ -123,6 +157,16 @@ async def handle_plan_clarification(
)
)
logging.info("Plan updated with HumanClarification.")

track_event(
"Planner - Updated with HumanClarification and added into the cosmos",
{
"session_id": message.session_id,
"user_id": self._user_id,
"content": "Thanks. The plan has been updated.",
"source": "PlannerAgent",
},
)

def _generate_instruction(self, objective: str) -> str:

Expand Down Expand Up @@ -221,6 +265,19 @@ class StructuredOutputPlan(BaseModel):
)
# Store the plan in memory
await self._memory.add_plan(plan)

track_event(
"Planner - Initial plan and added into the cosmos",
{
"session_id": self._session_id,
"user_id": self._user_id,
"initial_goal": structured_plan.initial_goal,
"overall_status": PlanStatus.in_progress,
"source": "PlannerAgent",
"summary": structured_plan.summary_plan_and_steps,
"human_clarification_request": structured_plan.human_clarification_request,
},
)

# Create the Step instances and store them in memory
steps = []
Expand All @@ -235,12 +292,35 @@ class StructuredOutputPlan(BaseModel):
human_approval_status=HumanFeedbackStatus.requested,
)
await self._memory.add_step(step)
track_event(
"Planner - Added planned individual step into the cosmos",
{
"plan_id": plan.id,
"action": step_data.action,
"agent": step_data.agent,
"status": StepStatus.planned,
"session_id": self._session_id,
"user_id": self._user_id,
"human_approval_status": HumanFeedbackStatus.requested,
},
)
steps.append(step)

return plan, steps

except Exception as e:
logging.error(f"Error in create_structured_plan: {e}")
logging.exception(f"Error in create_structured_plan: {e}")
track_event(
f"Planner - Error in create_structured_plan: {e} into the cosmos",
{
"session_id": self._session_id,
"user_id": self._user_id,
"initial_goal": "Error generating plan",
"overall_status": PlanStatus.failed,
"source": "PlannerAgent",
"summary": "Error generating plan",
},
)
# Handle the error, possibly by creating a plan with an error step
plan = Plan(
id=str(uuid.uuid4()),
Expand Down
Loading