feat(mcp): add in process work item stream for internal mcp wf routing#83
Open
feat(mcp): add in process work item stream for internal mcp wf routing#83
Conversation
Signed-off-by: Samantha Coyle <sam@diagrid.io>
There was a problem hiding this comment.
Pull request overview
This PR adds an in-process work-item routing path to the existing gRPC-backed executor so internal dapr.mcp.* workflows/activities can be delivered to in-process consumers while all other work items continue to flow over the external SDK gRPC stream.
Changes:
- Introduces
backend.WorkItemSinkandbackend.SinkRegistrarto register sinks by workflow/activity name prefix and route matching work items in-process. - Adds
client.InProcessClientas an in-process sink implementation and factors shared workflow/activity dispatch logic intoclient/dispatcher.go. - Adds tests validating sink registration, routing behavior, and shutdown sink closing.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
backend/executor.go |
Adds sink registration APIs, prefix matching, routing logic in ExecuteWorkflow/ExecuteActivity, and closes sinks first during Shutdown. |
client/worker_inprocess.go |
Adds an in-process worker/sink that drains work items from an internal channel and completes tasks via the backend. |
client/worker_grpc.go |
Refactors workflow/activity processing to use shared dispatch helpers. |
client/dispatcher.go |
Centralizes workflow/activity execution + response construction shared by gRPC and in-process workers. |
tests/sink_test.go |
Adds unit tests covering sink registrar validation, routing, and shutdown close behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: Samantha Coyle <sam@diagrid.io>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: Samantha Coyle <sam@diagrid.io>
7 tasks
JoshVanL
requested changes
Apr 15, 2026
Signed-off-by: Samantha Coyle <sam@diagrid.io>
Signed-off-by: Samantha Coyle <sam@diagrid.io>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Dapr's MCP Server integration registers built-in dapr.mcp.* orchestrations and dapr-mcp-* activities that run inside the sidecar. Initially I tried using a parallel task.TaskExecutor behind a RoutingExecutor wrapper, but that ended up being a completely separate execution path from the normal gRPC-backed engine. That meant that at first, internal MCP workflows couldn't use the standard TaskHubWorker lifecycle, queue-backed parallelism, or backend-driven retries.
With this PR, Dapr can register an InProcessClient as a sink for dapr.mcp.* prefixed names. The gRPC executor routes matching work items to the sink while everything else flows to the external SDK unchanged. One engine, one backend, two delivery paths - internal workflows are authored identically to SDK workflows.
CHanges: