ProductGraph Integration TRD¶
Author: PlexusOne Date: 2026-04-27 Status: Draft
Overview¶
This document describes the technical architecture for integrating systemforge's observability with ProductGraph for backend-frontend correlation and analytics forwarding.
Architecture¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ Frontend │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ @systemforge/telemetry │ │
│ │ TelemetryProvider → ProductGraphAdapter → POST /v1/events │ │
│ │ │ │
│ │ Headers: X-Session-ID, X-Request-ID │ │
│ └───────────────────────────────────┬──────────────────────────────────┘ │
└───────────────────────────────────────┼─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ systemforge Backend Service │
│ │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ Correlation Middleware │ │
│ │ Extract: X-Session-ID, X-Request-ID → Context │ │
│ └────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────┐ ┌────────────────┴───────────────┐ ┌──────────────┐ │
│ │ omniobserve │ │ productgraph Client │ │ Business │ │
│ │ Provider │ │ │ │ Logic │ │
│ │ (Traces/Metrics)│ │ - Event batching │ │ │ │
│ │ │ │ - OTel semantics │ │ │ │
│ │ │ │ - Session correlation │ │ │ │
│ └────────┬────────┘ └────────────────┬───────────────┘ └──────────────┘ │
└───────────┼────────────────────────────┼────────────────────────────────────┘
│ │
▼ ▼
┌──────────────────────┐ ┌─────────────────────────────────────────────┐
│ OTLP / Datadog / │ │ ProductGraph │
│ New Relic / etc │ │ │
└──────────────────────┘ │ ┌─────────────────┐ ┌──────────────────┐ │
│ │ Storage (PG) │ │ Analytics Adapter│ │
│ └─────────────────┘ └────────┬─────────┘ │
└─────────────────────────────────┼───────────┘
│
┌──────────────────────┴───────┐
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Amplitude │ │ Mixpanel │
└──────────────────┘ └──────────────────┘
Components¶
Correlation Middleware¶
Package: systemforge/observability/correlation
Purpose: Extract frontend correlation IDs and inject into context.
package correlation
import (
"context"
"net/http"
)
type ContextKey string
const (
SessionIDKey ContextKey = "session_id"
RequestIDKey ContextKey = "request_id"
UserIDKey ContextKey = "user_id"
)
// Middleware extracts correlation headers and injects into context.
func Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Extract correlation headers
if sessionID := r.Header.Get("X-Session-ID"); sessionID != "" {
ctx = context.WithValue(ctx, SessionIDKey, sessionID)
}
if requestID := r.Header.Get("X-Request-ID"); requestID != "" {
ctx = context.WithValue(ctx, RequestIDKey, requestID)
}
// Echo back for debugging
if requestID := r.Header.Get("X-Request-ID"); requestID != "" {
w.Header().Set("X-Request-ID", requestID)
}
next.ServeHTTP(w, r.WithContext(ctx))
})
}
// SessionIDFromContext extracts session ID from context.
func SessionIDFromContext(ctx context.Context) string {
if v, ok := ctx.Value(SessionIDKey).(string); ok {
return v
}
return ""
}
// RequestIDFromContext extracts request ID from context.
func RequestIDFromContext(ctx context.Context) string {
if v, ok := ctx.Value(RequestIDKey).(string); ok {
return v
}
return ""
}
ProductGraph Client¶
Package: systemforge/productgraph
Purpose: Send events to ProductGraph from Go backend.
package productgraph
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/google/uuid"
)
// Config configures the ProductGraph client.
type Config struct {
ProjectID string // Required: project identifier
Endpoint string // Required: ProductGraph endpoint
APIKey string // Optional: X-PG-API-Key header
BatchSize int // Default: 50
BatchInterval time.Duration // Default: 5s
HTTPClient *http.Client // Default: http.DefaultClient
}
// Client sends events to ProductGraph.
type Client struct {
config Config
buffer []Event
mu sync.Mutex
done chan struct{}
wg sync.WaitGroup
}
// Event represents a ProductGraph event.
type Event struct {
EventID string `json:"event_id"`
ProjectID string `json:"project_id"`
SessionID string `json:"session.id"`
UserID string `json:"user.id,omitempty"`
EventType string `json:"event.type"`
EventName string `json:"event.name,omitempty"`
Timestamp string `json:"event.timestamp"`
PagePath string `json:"page.path,omitempty"`
APIMethod string `json:"api.method,omitempty"`
APIPath string `json:"api.path,omitempty"`
APIStatus int `json:"api.status_code,omitempty"`
APIDuration int `json:"api.duration_ms,omitempty"`
ErrorType string `json:"error.type,omitempty"`
ErrorMsg string `json:"error.message,omitempty"`
JourneyID string `json:"gen_ai.journey.id,omitempty"`
JourneyStep string `json:"gen_ai.journey.step.id,omitempty"`
JourneyName string `json:"gen_ai.journey.step.name,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// New creates a ProductGraph client.
func New(cfg Config) *Client {
if cfg.BatchSize == 0 {
cfg.BatchSize = 50
}
if cfg.BatchInterval == 0 {
cfg.BatchInterval = 5 * time.Second
}
if cfg.HTTPClient == nil {
cfg.HTTPClient = &http.Client{Timeout: 10 * time.Second}
}
c := &Client{
config: cfg,
buffer: make([]Event, 0, cfg.BatchSize),
done: make(chan struct{}),
}
c.wg.Add(1)
go c.flusher()
return c
}
// Track sends an event to ProductGraph.
func (c *Client) Track(ctx context.Context, event Event) {
// Fill defaults
if event.EventID == "" {
event.EventID = uuid.New().String()
}
if event.ProjectID == "" {
event.ProjectID = c.config.ProjectID
}
if event.Timestamp == "" {
event.Timestamp = time.Now().UTC().Format(time.RFC3339)
}
// Extract session from context if not set
if event.SessionID == "" {
if sessionID, ok := ctx.Value("session_id").(string); ok {
event.SessionID = sessionID
}
}
c.mu.Lock()
c.buffer = append(c.buffer, event)
shouldFlush := len(c.buffer) >= c.config.BatchSize
c.mu.Unlock()
if shouldFlush {
c.flush()
}
}
// TrackAPICall tracks an API request/response.
func (c *Client) TrackAPICall(ctx context.Context, method, path string, status int, duration time.Duration) {
c.Track(ctx, Event{
EventType: "api.response",
APIMethod: method,
APIPath: path,
APIStatus: status,
APIDuration: int(duration.Milliseconds()),
})
}
// TrackError tracks an error event.
func (c *Client) TrackError(ctx context.Context, errType, message string) {
c.Track(ctx, Event{
EventType: "error",
ErrorType: errType,
ErrorMsg: message,
})
}
// TrackJourneyStep tracks a journey step completion.
func (c *Client) TrackJourneyStep(ctx context.Context, journeyID, stepID, stepName string) {
c.Track(ctx, Event{
EventType: "journey.step",
JourneyID: journeyID,
JourneyStep: stepID,
JourneyName: stepName,
})
}
func (c *Client) flusher() {
defer c.wg.Done()
ticker := time.NewTicker(c.config.BatchInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.flush()
case <-c.done:
c.flush() // Final flush
return
}
}
}
func (c *Client) flush() {
c.mu.Lock()
if len(c.buffer) == 0 {
c.mu.Unlock()
return
}
events := c.buffer
c.buffer = make([]Event, 0, c.config.BatchSize)
c.mu.Unlock()
body, _ := json.Marshal(map[string]interface{}{
"events": events,
})
req, _ := http.NewRequest("POST", c.config.Endpoint, bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
if c.config.APIKey != "" {
req.Header.Set("X-PG-API-Key", c.config.APIKey)
}
resp, err := c.config.HTTPClient.Do(req)
if err != nil {
// Log error, consider retry
return
}
defer resp.Body.Close()
}
// Close gracefully shuts down the client.
func (c *Client) Close() error {
close(c.done)
c.wg.Wait()
return nil
}
Observability Integration¶
Package: systemforge/observability
Extend existing observability to include ProductGraph.
// Add to observability/observability.go
// WithProductGraph adds ProductGraph client to provider.
func WithProductGraph(cfg productgraph.Config) ProviderOption {
return func(p *Provider) {
p.productgraph = productgraph.New(cfg)
}
}
// TrackEvent sends event to ProductGraph (if configured).
func (p *Provider) TrackEvent(ctx context.Context, event productgraph.Event) {
if p.productgraph != nil {
p.productgraph.Track(ctx, event)
}
}
Request Tracking Middleware¶
Package: systemforge/observability
Automatically track API requests.
// RequestTracker middleware tracks requests to ProductGraph.
func (p *Provider) RequestTracker(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// Wrap response writer to capture status
ww := &responseWriter{ResponseWriter: w, status: 200}
next.ServeHTTP(ww, r)
duration := time.Since(start)
// Track to ProductGraph
if p.productgraph != nil {
p.productgraph.TrackAPICall(
r.Context(),
r.Method,
r.URL.Path,
ww.status,
duration,
)
}
})
}
type responseWriter struct {
http.ResponseWriter
status int
}
func (w *responseWriter) WriteHeader(status int) {
w.status = status
w.ResponseWriter.WriteHeader(status)
}
Configuration¶
Environment Variables¶
| Variable | Description | Default |
|---|---|---|
PRODUCTGRAPH_ENABLED |
Enable ProductGraph integration | false |
PRODUCTGRAPH_PROJECT_ID |
Project identifier | - |
PRODUCTGRAPH_ENDPOINT |
API endpoint | - |
PRODUCTGRAPH_API_KEY |
API key (optional) | - |
PRODUCTGRAPH_BATCH_SIZE |
Events per batch | 50 |
PRODUCTGRAPH_BATCH_INTERVAL |
Flush interval | 5s |
Configuration Loading¶
func ConfigFromEnv() productgraph.Config {
return productgraph.Config{
ProjectID: os.Getenv("PRODUCTGRAPH_PROJECT_ID"),
Endpoint: os.Getenv("PRODUCTGRAPH_ENDPOINT"),
APIKey: os.Getenv("PRODUCTGRAPH_API_KEY"),
BatchSize: getEnvInt("PRODUCTGRAPH_BATCH_SIZE", 50),
BatchInterval: getEnvDuration("PRODUCTGRAPH_BATCH_INTERVAL", 5*time.Second),
}
}
Event Schema¶
API Response Event¶
{
"event_id": "uuid",
"project_id": "proj_backend",
"session.id": "sess_frontend_123",
"event.type": "api.response",
"event.timestamp": "2026-04-27T10:30:00Z",
"api.method": "POST",
"api.path": "/api/v1/checkout",
"api.status_code": 200,
"api.duration_ms": 150
}
Error Event¶
{
"event_id": "uuid",
"project_id": "proj_backend",
"session.id": "sess_frontend_123",
"event.type": "error",
"event.timestamp": "2026-04-27T10:30:00Z",
"error.type": "ValidationError",
"error.message": "invalid card number"
}
Journey Step Event¶
{
"event_id": "uuid",
"project_id": "proj_backend",
"session.id": "sess_frontend_123",
"event.type": "journey.step",
"event.timestamp": "2026-04-27T10:30:00Z",
"gen_ai.journey.id": "checkout_flow",
"gen_ai.journey.step.id": "payment_confirmed",
"gen_ai.journey.step.name": "Payment Confirmed"
}
Usage Example¶
package main
import (
"net/http"
"github.com/go-chi/chi/v5"
"github.com/grokify/systemforge/observability"
"github.com/grokify/systemforge/observability/correlation"
"github.com/grokify/systemforge/productgraph"
)
func main() {
// Create ProductGraph client
pgClient := productgraph.New(productgraph.Config{
ProjectID: "proj_demo",
Endpoint: "https://api.productgraph.io/v1/events",
APIKey: os.Getenv("PRODUCTGRAPH_API_KEY"),
})
defer pgClient.Close()
// Create observability provider with ProductGraph
provider := observability.New(
observability.ConfigFromEnv(),
observability.WithProductGraph(pgClient),
)
defer provider.Shutdown()
// Setup router
r := chi.NewRouter()
// Middleware chain
r.Use(correlation.Middleware) // Extract session/request IDs
r.Use(provider.TracingMiddleware) // OpenTelemetry tracing
r.Use(provider.RequestTracker) // ProductGraph tracking
r.Post("/api/checkout", func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Business logic...
// Track journey step from backend
pgClient.TrackJourneyStep(ctx, "checkout_flow", "payment_confirmed", "Payment Confirmed")
w.WriteHeader(http.StatusOK)
})
http.ListenAndServe(":8080", r)
}
Testing¶
Unit Tests¶
func TestCorrelationMiddleware(t *testing.T) {
handler := correlation.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sessionID := correlation.SessionIDFromContext(r.Context())
assert.Equal(t, "sess_123", sessionID)
}))
req := httptest.NewRequest("GET", "/", nil)
req.Header.Set("X-Session-ID", "sess_123")
handler.ServeHTTP(httptest.NewRecorder(), req)
}
func TestProductGraphClient(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "POST", r.Method)
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
var payload map[string]interface{}
json.NewDecoder(r.Body).Decode(&payload)
events := payload["events"].([]interface{})
assert.Len(t, events, 1)
w.WriteHeader(http.StatusAccepted)
}))
defer server.Close()
client := productgraph.New(productgraph.Config{
ProjectID: "test",
Endpoint: server.URL,
BatchSize: 1, // Immediate flush
})
defer client.Close()
ctx := context.WithValue(context.Background(), correlation.SessionIDKey, "sess_test")
client.Track(ctx, productgraph.Event{
EventType: "test",
})
time.Sleep(100 * time.Millisecond) // Wait for flush
}
Security¶
Header Validation¶
- Validate session ID format (UUID)
- Sanitize header values
- Rate limit by session
Data Protection¶
- No PII in event payloads
- Redact sensitive fields
- TLS required in production
Performance¶
Targets¶
| Metric | Target |
|---|---|
| Tracking overhead | < 1ms |
| Memory per event | < 1 KB |
| Batch flush latency | < 50ms |
Optimizations¶
- Async batching (non-blocking Track)
- Connection pooling
- Gzip compression for large batches
Related Documents¶
- PRD.md - Product requirements
- PLAN.md - Implementation plan
- TASKS.md - Task breakdown
- Observability TRD