zoobzio January 15, 2025 Edit this page

Reliability Guide

Chit uses pipz for composable reliability patterns. This guide covers how to add retry logic, timeouts, rate limiting, circuit breakers, and custom middleware to your chat applications.

Pipeline Architecture

When you create a Chat, chit builds a processing pipeline:

Input → [Middleware] → [Reliability Wrappers] → Processor → Result

The processor (your brain) is the terminal. Everything else wraps it, executing in order.

Reliability Options

Retry

Retry failed processing up to N times:

chat := chit.New(processor, emitter,
    chit.WithRetry(3), // Retry up to 3 times
)

Backoff

Retry with exponential backoff between attempts:

chat := chit.New(processor, emitter,
    chit.WithBackoff(3, 100*time.Millisecond), // 3 attempts, 100ms base delay
)
// Delays: 100ms, 200ms, 400ms

Timeout

Cancel processing if it takes too long:

chat := chit.New(processor, emitter,
    chit.WithTimeout(30 * time.Second),
)

Circuit Breaker

Stop calling a failing processor to prevent cascade failures:

chat := chit.New(processor, emitter,
    chit.WithCircuitBreaker(5, 30*time.Second), // Open after 5 failures, recover after 30s
)

States:

  • Closed: Normal operation
  • Open: All calls fail immediately (after N consecutive failures)
  • Half-Open: After recovery timeout, allows one test call

Rate Limiting

Limit requests per second:

chat := chit.New(processor, emitter,
    chit.WithRateLimit(10, 5), // 10 requests/second, burst of 5
)

Combining Options

Options wrap inside-out. Order matters:

chat := chit.New(processor, emitter,
    chit.WithMiddleware(chit.UseValidation(4096)), // Innermost (runs first)
    chit.WithRetry(3),
    chit.WithTimeout(30 * time.Second),
    chit.WithRateLimit(10, 5), // Outermost (checked first)
)

Execution order:

  1. Rate limit check
  2. Timeout wrapper starts
  3. Retry loop begins
  4. Middleware runs (validation)
  5. Processor executes

Continuation Reliability

Reliability options apply to both fresh processor calls and continuation resumptions.

When a processor yields a continuation:

return &chit.Yield{
    Prompt: "What's your name?",
    Continuation: func(ctx context.Context, name string) (chit.Result, error) {
        // This continuation has the same reliability as the initial call
        return &chit.Response{Content: "Hello, " + name}, nil
    },
}, nil

The next Handle() call resumes the continuation through the same pipeline:

Input → [Middleware] → [Reliability Wrappers] → Continuation → Result

This means:

  • Retry: If the continuation fails transiently, it retries
  • Timeout: Slow continuations are cancelled
  • Rate limiting: Continuation calls count against rate limits
  • Middleware: Validation, logging, etc. apply to continuation input
chat := chit.New(processor, emitter,
    chit.WithMiddleware(chit.UseValidation(100)),
    chit.WithRetry(3),
    chit.WithTimeout(5 * time.Second),
)

// First call - processor runs through pipeline
chat.Handle(ctx, "start") // yields continuation

// Second call - continuation runs through SAME pipeline
chat.Handle(ctx, "Alice") // retry, timeout, validation all apply

The continuation is wrapped in a ContinuationTerminal and the same PipelineOption functions are applied, ensuring consistent behavior.

Custom Middleware

Middleware runs before the processor. Use it for validation, logging, enrichment, or transformation.

Built-in Middleware

// Validate input length
chit.UseValidation(maxLength int)

// Log requests
chit.UseLogging(logger *slog.Logger)

// Record metrics
chit.UseMetrics(recorder MetricsRecorder)

// Enrich requests (best-effort)
chit.UseEnrich(enricher Enricher)

Custom Middleware with pipz

Use pipz primitives directly for full control:

import "github.com/zoobz-io/pipz"

// Apply: transform or fail
customValidator := pipz.Apply(
    pipz.NewIdentity("myapp:validate", "Custom validation"),
    func(ctx context.Context, req *chit.ChatRequest) (*chit.ChatRequest, error) {
        if containsBadWords(req.Input) {
            return nil, errors.New("inappropriate content")
        }
        return req, nil
    },
)

// Effect: side effect, no modification
auditLogger := pipz.Effect(
    pipz.NewIdentity("myapp:audit", "Audit logging"),
    func(ctx context.Context, req *chit.ChatRequest) error {
        log.Printf("User %s: %s", req.ChatID, req.Input)
        return nil
    },
)

// Transform: modify without failing
normalizer := pipz.Transform(
    pipz.NewIdentity("myapp:normalize", "Input normalization"),
    func(ctx context.Context, req *chit.ChatRequest) *chit.ChatRequest {
        req.Input = strings.TrimSpace(req.Input)
        return req
    },
)

chat := chit.New(processor, emitter,
    chit.WithMiddleware(normalizer, customValidator, auditLogger),
)

Middleware Helpers

Chit provides convenience wrappers:

// UseApply - transform or fail
chit.UseApply(id, func(ctx, req) (*ChatRequest, error))

// UseEffect - side effect only
chit.UseEffect(id, func(ctx, req) error)

// UseTransform - transform without failing
chit.UseTransform(id, func(ctx, req) *ChatRequest)

// UseMutate - conditional transform
chit.UseMutate(id, transformFn, conditionFn)

Error Handling

Add an error handler pipeline for observability:

errorHandler := pipz.Effect(
    pipz.NewIdentity("myapp:error-handler", "Error handler"),
    func(ctx context.Context, err *pipz.Error[*chit.ChatRequest]) error {
        log.Printf("Processing failed: %v (path: %v)", err.Err, err.Path)
        metrics.RecordError(err.Err)
        return nil
    },
)

chat := chit.New(processor, emitter,
    chit.WithErrorHandler(errorHandler),
)

The error handler receives rich context:

  • err.Err — the underlying error
  • err.Path — the pipeline path where it failed
  • err.Duration — how long before failure
  • err.InputData — the request that failed

Fallback

Use a backup processor when the primary fails:

primaryChat := chit.New(primaryProcessor, emitter)
fallbackChat := chit.New(fallbackProcessor, emitter)

chat := chit.New(primaryProcessor, emitter,
    chit.WithFallback(fallbackChat),
)

Both must implement ChatProvider. The fallback's pipeline is used when the primary fails.

ChatRequest

The ChatRequest struct flows through the pipeline:

type ChatRequest struct {
    Input     string       // User input
    Session   *zyn.Session // Conversation history
    ChatID    string       // Chat instance ID
    RequestID string       // Unique request ID
    Result    Result       // Output (populated by terminal)
}

Middleware can read/modify any field except Result (set by the terminal).

Observability

pipz emits signals for reliability events:

SignalWhen
pipz.SignalRetryAttemptStartBefore each retry attempt
pipz.SignalRetryAttemptFailAfter a failed retry attempt
pipz.SignalRetryExhaustedAll retries exhausted
pipz.SignalTimeoutTriggeredTimeout exceeded
pipz.SignalCircuitBreakerOpenedCircuit opened
pipz.SignalCircuitBreakerClosedCircuit closed
pipz.SignalRateLimitThrottledRate limit hit

Subscribe via capitan for logging, metrics, or alerting.

Next Steps