Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cmd/vmcp/app/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ func runServe(cmd *cobra.Command, _ []string) error {
Timeout: defaults.Timeout,
DegradedThreshold: defaults.DegradedThreshold,
}

// Configure circuit breaker if enabled
if cfg.Operational.FailureHandling.CircuitBreaker != nil && cfg.Operational.FailureHandling.CircuitBreaker.Enabled {
healthMonitorConfig.CircuitBreaker = &health.CircuitBreakerConfig{
Enabled: true,
FailureThreshold: cfg.Operational.FailureHandling.CircuitBreaker.FailureThreshold,
Timeout: time.Duration(cfg.Operational.FailureHandling.CircuitBreaker.Timeout),
}
logger.Infof("Circuit breaker enabled (failure threshold: %d, timeout: %v)",
cfg.Operational.FailureHandling.CircuitBreaker.FailureThreshold,
time.Duration(cfg.Operational.FailureHandling.CircuitBreaker.Timeout))
}

logger.Info("Health monitoring configured from operational settings")
}

Expand Down
107 changes: 62 additions & 45 deletions pkg/vmcp/aggregator/default_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func (a *defaultAggregator) QueryAllCapabilities(

// Query each backend in parallel
for _, backend := range backends {
backend := backend // Capture loop variable
g.Go(func() error {
caps, err := a.QueryCapabilities(ctx, backend)
if err != nil {
Expand Down Expand Up @@ -311,10 +310,25 @@ func (a *defaultAggregator) MergeCapabilities(
}

// Convert resolved tools to final vmcp.Tool format
// The routing table gets ALL tools (for composite tool routing)
// The advertised tools list only gets non-excluded tools (for LLM)
// The routing table gets ALL healthy tools (for composite tool routing)
// The advertised tools list only gets non-excluded healthy tools (for LLM)
tools := make([]vmcp.Tool, 0, len(resolved.Tools))
for _, resolvedTool := range resolved.Tools {
// Look up full backend information from registry
backend := registry.Get(ctx, resolvedTool.BackendID)
if backend == nil {
logger.Warnf("Backend %s not found in registry for tool %s, skipping",
resolvedTool.BackendID, resolvedTool.ResolvedName)
continue
}

// Filter out tools from unhealthy backends
if !backend.HealthStatus.IsHealthyForRouting() {
logger.Debugf("Skipping tool %s from unhealthy backend %s (status: %s)",
resolvedTool.ResolvedName, backend.Name, backend.HealthStatus)
continue
}

// Check if this tool should be excluded from the advertised list
// ExcludeAll only affects advertising, not routing
shouldAdvertise := a.shouldAdvertiseTool(resolvedTool.BackendID)
Expand All @@ -329,58 +343,61 @@ func (a *defaultAggregator) MergeCapabilities(
}

// ALWAYS add to routing table (for composite tools to call excluded backend tools)
// Look up full backend information from registry
backend := registry.Get(ctx, resolvedTool.BackendID)
if backend == nil {
logger.Warnf("Backend %s not found in registry for tool %s, creating minimal target",
resolvedTool.BackendID, resolvedTool.ResolvedName)
routingTable.Tools[resolvedTool.ResolvedName] = &vmcp.BackendTarget{
WorkloadID: resolvedTool.BackendID,
OriginalCapabilityName: resolvedTool.OriginalName,
}
} else {
// Use the backendToTarget helper from registry package
target := vmcp.BackendToTarget(backend)
// Store the original tool name for forwarding to backend
target.OriginalCapabilityName = resolvedTool.OriginalName
routingTable.Tools[resolvedTool.ResolvedName] = target
}
// Use the backendToTarget helper from registry package
target := vmcp.BackendToTarget(backend)
// Store the original tool name for forwarding to backend
target.OriginalCapabilityName = resolvedTool.OriginalName
routingTable.Tools[resolvedTool.ResolvedName] = target
}

// Add resources to routing table
// Add resources to routing table (with health filtering)
resources := make([]vmcp.Resource, 0, len(resolved.Resources))
for _, resource := range resolved.Resources {
backend := registry.Get(ctx, resource.BackendID)
if backend == nil {
logger.Warnf("Backend %s not found in registry for resource %s, creating minimal target",
logger.Warnf("Backend %s not found in registry for resource %s, skipping",
resource.BackendID, resource.URI)
routingTable.Resources[resource.URI] = &vmcp.BackendTarget{
WorkloadID: resource.BackendID,
OriginalCapabilityName: resource.URI,
}
} else {
target := vmcp.BackendToTarget(backend)
// Store the original resource URI for forwarding to backend
target.OriginalCapabilityName = resource.URI
routingTable.Resources[resource.URI] = target
continue
}

// Filter out resources from unhealthy backends
if !backend.HealthStatus.IsHealthyForRouting() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: If a backend's health changes mid-aggregation, checking it separately in the tools, resources, and prompts loops can produce a partial state — for example, the backend's tools are excluded while its resources are included. This is probably not a big deal, but you could prevent it by checking each backend's health only once per aggregation.

logger.Debugf("Skipping resource %s from unhealthy backend %s (status: %s)",
resource.URI, backend.Name, backend.HealthStatus)
continue
}

resources = append(resources, resource)

target := vmcp.BackendToTarget(backend)
// Store the original resource URI for forwarding to backend
target.OriginalCapabilityName = resource.URI
routingTable.Resources[resource.URI] = target
}

// Add prompts to routing table
// Add prompts to routing table (with health filtering)
prompts := make([]vmcp.Prompt, 0, len(resolved.Prompts))
for _, prompt := range resolved.Prompts {
backend := registry.Get(ctx, prompt.BackendID)
if backend == nil {
logger.Warnf("Backend %s not found in registry for prompt %s, creating minimal target",
logger.Warnf("Backend %s not found in registry for prompt %s, skipping",
prompt.BackendID, prompt.Name)
routingTable.Prompts[prompt.Name] = &vmcp.BackendTarget{
WorkloadID: prompt.BackendID,
OriginalCapabilityName: prompt.Name,
}
} else {
target := vmcp.BackendToTarget(backend)
// Store the original prompt name for forwarding to backend
target.OriginalCapabilityName = prompt.Name
routingTable.Prompts[prompt.Name] = target
continue
}

// Filter out prompts from unhealthy backends
if !backend.HealthStatus.IsHealthyForRouting() {
logger.Debugf("Skipping prompt %s from unhealthy backend %s (status: %s)",
prompt.Name, backend.Name, backend.HealthStatus)
continue
}

prompts = append(prompts, prompt)

target := vmcp.BackendToTarget(backend)
// Store the original prompt name for forwarding to backend
target.OriginalCapabilityName = prompt.Name
routingTable.Prompts[prompt.Name] = target
}

// Determine conflict strategy used
Expand All @@ -396,16 +413,16 @@ func (a *defaultAggregator) MergeCapabilities(
// Create final aggregated view
aggregated := &AggregatedCapabilities{
Tools: tools,
Resources: resolved.Resources,
Prompts: resolved.Prompts,
Resources: resources,
Prompts: prompts,
SupportsLogging: resolved.SupportsLogging,
SupportsSampling: resolved.SupportsSampling,
RoutingTable: routingTable,
Metadata: &AggregationMetadata{
BackendCount: 0, // Will be set by caller
ToolCount: len(tools),
ResourceCount: len(resolved.Resources),
PromptCount: len(resolved.Prompts),
ResourceCount: len(resources),
PromptCount: len(prompts),
ConflictStrategy: conflictStrategy,
},
}
Expand Down
Loading
Loading