kavachOS

Event streaming

Real-time SSE feed of auth events, agent lifecycle, and anomalies.

What event streaming is

Event streaming gives you a persistent, real-time connection to KavachOS events via Server-Sent Events (SSE). The moment an agent is revoked, a budget is exceeded, or an anomaly is detected, connected clients receive the event — no polling required.

Events are also persisted to the database so you can replay anything you missed.

Streaming vs webhooks

Both webhooks and SSE carry the same events, but they serve different use cases.

WebhooksEvent streaming
TransportHTTP POST to your serverPersistent HTTP connection to your browser or service
Latency~100–500ms (network + retry)Near-instant
AuthVerified by HMAC signatureBearer token
Missed eventsRetried up to 3 times, then droppedReplay via since cursor
Best forBackend integrations, data pipelinesDashboards, SOC tooling, live monitoring

Use webhooks when you need durable delivery to an external service. Use event streaming when you need a live view — a security dashboard, an admin feed, or a CI script watching for anomaly.detected.

Setup

import { createKavach } from 'kavachos';
import { createEventStreamModule } from 'kavachos/auth';

const kavach = await createKavach({
  database: { provider: 'sqlite', url: 'kavach.db' },
});

const stream = createEventStreamModule({
  db: kavach.db,
  requireAuth: true,
  validateToken: async (token) => {
    // Return the subscriber ID (userId or agentId) on success, null on failure.
    const session = await kavach.session.verify(token);
    return session?.userId ?? null;
  },
});

// Mount in your HTTP handler
app.get('/api/kavach/events/stream', (req, res) => {
  const response = stream.handleRequest(req);
  if (response) return response;
});

// Emit from anywhere in your application
stream.emit({
  id: crypto.randomUUID(),
  type: 'agent.created',
  timestamp: new Date(),
  data: { agentId: agent.id, name: agent.name },
});

handleRequest returns null for any request that is not a valid SSE request (wrong path, wrong Accept header, or non-GET method). This makes it safe to call inside a catch-all handler.

Connecting from a browser

The browser's built-in EventSource API handles reconnection automatically.

const source = new EventSource(
  '/api/kavach/events/stream?token=your-bearer-token',
);

source.addEventListener('agent.created', (event) => {
  const data = JSON.parse(event.data);
  console.log('New agent:', data.agentId);
});

source.addEventListener('anomaly.detected', (event) => {
  const data = JSON.parse(event.data);
  console.warn('Anomaly:', data);
});

source.addEventListener('error', (event) => {
  const data = JSON.parse((event as MessageEvent).data);
  console.error('Stream error:', data.code);
});

// Clean up
source.close();

If you pass the token via the Authorization header instead, use the eventsource package or a fetch-based polyfill, since the native EventSource does not support custom headers.

Connecting from Node.js

import { EventSource } from 'eventsource';

const source = new EventSource('https://your-app.com/api/kavach/events/stream', {
  headers: {
    Authorization: 'Bearer your-bearer-token',
  },
});

source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Event:', data.type, data);
};

source.onerror = () => {
  console.error('Connection lost, reconnecting...');
};

Event types

TypeWhen it fires
auditAny agent action logged to the audit trail
agent.createdA new agent identity is registered
agent.revokedAn agent is permanently revoked
agent.rotatedAn agent token is rotated
auth.signinA user signs in
auth.signoutA user signs out
auth.failedA sign-in attempt fails
delegation.createdA new delegation chain is created
delegation.revokedA delegation is revoked
budget.exceededAn agent exceeds its cost budget
anomaly.detectedThe anomaly detector flags unusual behaviour
cost.recordedA cost event is attributed to an agent

Filtering events

Pass a types query parameter with a comma-separated list to receive only the events you care about.

GET /api/kavach/events/stream?types=anomaly.detected,budget.exceeded
// From a browser
const source = new EventSource(
  '/api/kavach/events/stream?token=tok&types=agent.created,agent.revoked',
);

You can also restrict the types at the module level so no client can subscribe to events outside the allowed set.

const stream = createEventStreamModule({
  db: kavach.db,
  requireAuth: true,
  validateToken,
  eventTypes: ['audit', 'anomaly.detected'], // only these types are ever delivered
});

Replay and cursor

Events are persisted in the kavach_stream_events table. If a client disconnects and reconnects, pass since to receive everything it missed.

GET /api/kavach/events/stream?since=2026-01-15T10:00:00Z

The browser EventSource can also pass the last received event ID via the Last-Event-ID header, which the browser manages automatically when you set the id: field on SSE events. KavachOS uses the Last-Event-ID value as the replay cursor.

// Programmatic replay without an open connection
const result = await stream.replay(new Date('2026-01-15T10:00:00Z'), ['audit', 'auth.failed']);
if (result.success) {
  for (const event of result.data) {
    console.log(event.type, event.data);
  }
}

replay returns up to 1000 events in descending order (newest first). Apply your own pagination on top if you need to page through large windows.

Auth requirements

By default requireAuth: true. Every connection must present a valid Bearer token, either in the Authorization header or as the token query parameter.

Authorization: Bearer <token>
GET /api/kavach/events/stream?token=<token>

When the token is invalid, the stream sends a single error event and closes.

{ "code": "UNAUTHORIZED", "message": "Invalid token" }

To disable auth for local development or internal-only deployments:

const stream = createEventStreamModule({
  db: kavach.db,
  requireAuth: false,
});

Never disable auth in production. The stream exposes audit events and agent lifecycle data.

Configuration

interface EventStreamConfig {
  db: Database;

  // Maximum concurrent connections — returns 503 when exceeded (default: 100)
  maxConnections?: number;

  // Interval between heartbeat comments in ms (default: 30000)
  heartbeatIntervalMs?: number;

  // Module-level event type allow-list — clients cannot exceed this (default: all)
  eventTypes?: EventType[];

  // Require a Bearer token to connect (default: true)
  requireAuth?: boolean;

  // Validate the token and return subscriber ID (userId or agentId), or null
  validateToken?: (token: string) => Promise<string | null>;
}

Connection limits

The stream rejects connections beyond maxConnections with a 503 Too many connections response. Size this based on your deployment: a single process can comfortably handle hundreds of concurrent SSE connections; above that, consider a pub/sub layer (Redis, NATS) in front of the module.

Heartbeat

The server sends : heartbeat comments on the configured interval (default 30 seconds) to keep load balancers and proxies from closing idle connections. No action is required on the client side — EventSource ignores comment lines.

Emitting events from plugins

Any part of your application can emit to the stream.

// After revoking an agent
await kavach.agent.revoke(agentId);

stream.emit({
  id: crypto.randomUUID(),
  type: 'agent.revoked',
  timestamp: new Date(),
  data: { agentId, reason: 'manual-revocation' },
  agentId,
  userId: currentUser.id,
});

Integrate with the webhooks module to fire both a webhook and a stream event from the same action.

Module API

interface EventStreamModule {
  // Emit an event to all connected clients and persist for replay
  emit(event: StreamEvent): void;

  // Handle an SSE connection request — returns a Response or null
  handleRequest(request: Request): Response | null;

  // Number of currently active connections
  getConnectionCount(): number;

  // Replay persisted events since a date, optionally filtered by type
  replay(since: Date, types?: EventType[]): Promise<Result<StreamEvent[]>>;

  // Close all connections and stop the heartbeat timer
  close(): void;
}

On this page