-
Notifications
You must be signed in to change notification settings - Fork 88
Expand file tree
/
Copy pathProgram.cs
More file actions
121 lines (101 loc) · 4.66 KB
/
Program.cs
File metadata and controls
121 lines (101 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
using System;
using System.ComponentModel;
using System.ClientModel;
using OpenAI;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using DotNetEnv;
// Load environment variables from the .env file, then read the required settings.
Env.Load("../../../../.env");

string githubEndpoint = Environment.GetEnvironmentVariable("GITHUB_ENDPOINT")
    ?? throw new InvalidOperationException("GITHUB_ENDPOINT is not set.");
string githubModelId = "gpt-4o";
string githubToken = Environment.GetEnvironmentVariable("GITHUB_TOKEN")
    ?? throw new InvalidOperationException("GITHUB_TOKEN is not set.");

// Build an OpenAI client that targets the configured endpoint and authenticates with the token.
OpenAIClientOptions clientOptions = new() { Endpoint = new Uri(githubEndpoint) };
OpenAIClient openAIClient = new(new ApiKeyCredential(githubToken), clientOptions);

// Names and instructions for the two agents taking part in the workflow.
const string ResearcherAgentName = "Researcher-Agent";
const string ResearcherAgentInstructions = "You are my travel researcher, working with me to analyze the destination, list relevant attractions, and make detailed plans for each attraction.";
const string PlanAgentName = "Plan-Agent";
const string PlanAgentInstructions = "You are my travel planner, working with me to create a detailed travel plan based on the researcher's findings.";

// Both agents share a single chat client bound to the selected model.
IChatClient chatClient = openAIClient.GetChatClient(githubModelId).AsIChatClient();
var researcherAgent = new ChatClientAgent(
    chatClient,
    name: ResearcherAgentName,
    instructions: ResearcherAgentInstructions);
var plannerAgent = new ChatClientAgent(
    chatClient,
    name: PlanAgentName,
    instructions: PlanAgentInstructions);

// Executors that bracket the agents: one starts the run, the other collects the results.
ConcurrentStartExecutor startExecutor = new();
ConcurrentAggregationExecutor aggregationExecutor = new();

// Wire the graph: start -> (researcher, planner) -> aggregation, with the aggregation
// executor designated as the source of the workflow output.
var workflow = new WorkflowBuilder(startExecutor)
    .AddFanOutEdge(startExecutor, [researcherAgent, plannerAgent])
    .AddFanInBarrierEdge([researcherAgent, plannerAgent], aggregationExecutor)
    .WithOutputFrom(aggregationExecutor)
    .Build();

// Run the workflow in-process and watch the event stream for output events.
const string TripRequest = "Plan a trip to Seattle in December";
await using StreamingRun streamingRun = await InProcessExecution.RunStreamingAsync(workflow, input: TripRequest);

string finalOutput = "";
await foreach (WorkflowEvent workflowEvent in streamingRun.WatchStreamAsync())
{
    // Only output events are of interest here; every other event type is ignored.
    if (workflowEvent is not WorkflowOutputEvent outputEvent)
    {
        continue;
    }

    // Remember the most recent output so it can be printed again after the stream ends.
    finalOutput = outputEvent.Data?.ToString() ?? "";
    Console.WriteLine($"Workflow completed with results:\n{outputEvent.Data}");
}

Console.WriteLine("\n=== Final Output ===");
Console.WriteLine(finalOutput);

// Print the workflow graph as a Mermaid diagram.
Console.WriteLine("\nMermaid string: \n=======");
Console.WriteLine(workflow.ToMermaidString());
Console.WriteLine("=======");

// Write the DOT graph to a file rather than stdout to avoid pipe issues.
const string DotFilePath = "workflow.dot";
File.WriteAllText(DotFilePath, workflow.ToDotString());
Console.WriteLine($"\nDOT graph saved to: {DotFilePath}");
Console.WriteLine("To generate image: dot -Tsvg workflow.dot -o workflow.svg");
Console.WriteLine(" dot -Tpng workflow.dot -o workflow.png");
/// <summary>
/// Entry executor of the concurrent workflow. It forwards the incoming text as a user
/// chat message and then sends a turn token so the connected agents begin processing.
/// </summary>
internal sealed partial class ConcurrentStartExecutor() :
    Executor("ConcurrentStartExecutor")
{
    /// <summary>
    /// Handles the workflow's string input by sending two messages through the context:
    /// first the user chat message, then the turn token.
    /// </summary>
    /// <param name="message">Text to wrap in a <see cref="ChatMessage"/> with the user role.</param>
    /// <param name="context">Workflow context used to send the messages.</param>
    /// <param name="cancellationToken">Token passed through to both send operations.</param>
    [MessageHandler]
    public async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // The chat message goes out first. Receiving agents queue it and wait for a
        // turn token before they start working on it.
        ChatMessage userMessage = new(ChatRole.User, message);
        await context.SendMessageAsync(userMessage, cancellationToken: cancellationToken);

        // The turn token is what actually kicks the agents off.
        TurnToken kickoffToken = new(emitEvents: true);
        await context.SendMessageAsync(kickoffToken, cancellationToken: cancellationToken);
    }
}
/// <summary>
/// Executor that aggregates the results from the concurrent agents. Incoming chat messages
/// are accumulated, and once the accumulated total equals the expected count they are
/// yielded as a single string with one <c>AuthorName: Text</c> line per message.
/// </summary>
/// <param name="expectedMessageCount">
/// Total number of chat messages to collect before the combined output is yielded.
/// Must be greater than zero.
/// </param>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="expectedMessageCount"/> is zero or negative.
/// </exception>
internal sealed class ConcurrentAggregationExecutor(int expectedMessageCount) :
    Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
    /// <summary>
    /// Message count used by the parameterless constructor.
    /// NOTE(review): presumably one message from each of two agents - confirm against the workflow wiring.
    /// </summary>
    private const int DefaultExpectedMessageCount = 2;

    // Validated once at construction; a non-positive threshold could never be reached.
    private readonly int _expectedMessageCount = expectedMessageCount > 0
        ? expectedMessageCount
        : throw new ArgumentOutOfRangeException(
            nameof(expectedMessageCount),
            expectedMessageCount,
            "Expected message count must be greater than zero.");

    // Messages received so far, across all invocations of HandleAsync on this instance.
    private readonly List<ChatMessage> _messages = [];

    /// <summary>
    /// Creates an aggregator that yields its output after two messages have been collected.
    /// </summary>
    public ConcurrentAggregationExecutor()
        : this(DefaultExpectedMessageCount)
    {
    }

    /// <summary>
    /// Appends the received messages to the running collection and yields the formatted
    /// result when the collection reaches exactly the expected size.
    /// </summary>
    /// <param name="message">Batch of chat messages delivered to this executor.</param>
    /// <param name="context">Workflow context used to yield the aggregated output.</param>
    /// <param name="cancellationToken">Token passed through to the yield operation.</param>
    public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.AddRange(message);

        // Exact-match comparison: output is produced only at the moment the total hits
        // the expected count, not on every later delivery.
        if (this._messages.Count == this._expectedMessageCount)
        {
            var formattedMessages = string.Join(Environment.NewLine, this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
            await context.YieldOutputAsync(formattedMessages, cancellationToken);
        }
    }
}