Event streaming
Real-time SSE feed of auth events, agent lifecycle, and anomalies.
What event streaming is
Event streaming gives you a persistent, real-time connection to KavachOS events via Server-Sent Events (SSE). The moment an agent is revoked, a budget is exceeded, or an anomaly is detected, connected clients receive the event — no polling required.
Events are also persisted to the database so you can replay anything you missed.
Streaming vs webhooks
Both webhooks and SSE carry the same events, but they serve different use cases.
| Webhooks | Event streaming | |
|---|---|---|
| Transport | HTTP POST to your server | Persistent HTTP connection to your browser or service |
| Latency | ~100–500ms (network + retry) | Near-instant |
| Auth | Verified by HMAC signature | Bearer token |
| Missed events | Retried up to 3 times, then dropped | Replay via since cursor |
| Best for | Backend integrations, data pipelines | Dashboards, SOC tooling, live monitoring |
Use webhooks when you need durable delivery to an external service. Use event streaming when you need a live view — a security dashboard, an admin feed, or a CI script watching for anomaly.detected.
Setup
import { createKavach } from 'kavachos';
import { createEventStreamModule } from 'kavachos/auth';
const kavach = await createKavach({
database: { provider: 'sqlite', url: 'kavach.db' },
});
const stream = createEventStreamModule({
db: kavach.db,
requireAuth: true,
validateToken: async (token) => {
// Return the subscriber ID (userId or agentId) on success, null on failure.
const session = await kavach.session.verify(token);
return session?.userId ?? null;
},
});
// Mount in your HTTP handler
app.get('/api/kavach/events/stream', (req, res) => {
const response = stream.handleRequest(req);
if (response) return response;
});
// Emit from anywhere in your application
stream.emit({
id: crypto.randomUUID(),
type: 'agent.created',
timestamp: new Date(),
data: { agentId: agent.id, name: agent.name },
});handleRequest returns null for any request that is not a valid SSE request (wrong path, wrong Accept header, or non-GET method). This makes it safe to call inside a catch-all handler.
Connecting from a browser
The browser's built-in EventSource API handles reconnection automatically.
const source = new EventSource(
'/api/kavach/events/stream?token=your-bearer-token',
);
source.addEventListener('agent.created', (event) => {
const data = JSON.parse(event.data);
console.log('New agent:', data.agentId);
});
source.addEventListener('anomaly.detected', (event) => {
const data = JSON.parse(event.data);
console.warn('Anomaly:', data);
});
source.addEventListener('error', (event) => {
const data = JSON.parse((event as MessageEvent).data);
console.error('Stream error:', data.code);
});
// Clean up
source.close();If you pass the token via the Authorization header instead, use the eventsource package or a fetch-based polyfill, since the native EventSource does not support custom headers.
Connecting from Node.js
import { EventSource } from 'eventsource';
const source = new EventSource('https://your-app.com/api/kavach/events/stream', {
headers: {
Authorization: 'Bearer your-bearer-token',
},
});
source.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Event:', data.type, data);
};
source.onerror = () => {
console.error('Connection lost, reconnecting...');
};Event types
| Type | When it fires |
|---|---|
audit | Any agent action logged to the audit trail |
agent.created | A new agent identity is registered |
agent.revoked | An agent is permanently revoked |
agent.rotated | An agent token is rotated |
auth.signin | A user signs in |
auth.signout | A user signs out |
auth.failed | A sign-in attempt fails |
delegation.created | A new delegation chain is created |
delegation.revoked | A delegation is revoked |
budget.exceeded | An agent exceeds its cost budget |
anomaly.detected | The anomaly detector flags unusual behaviour |
cost.recorded | A cost event is attributed to an agent |
Filtering events
Pass a types query parameter with a comma-separated list to receive only the events you care about.
GET /api/kavach/events/stream?types=anomaly.detected,budget.exceeded// From a browser
const source = new EventSource(
'/api/kavach/events/stream?token=tok&types=agent.created,agent.revoked',
);You can also restrict the types at the module level so no client can subscribe to events outside the allowed set.
const stream = createEventStreamModule({
db: kavach.db,
requireAuth: true,
validateToken,
eventTypes: ['audit', 'anomaly.detected'], // only these types are ever delivered
});Replay and cursor
Events are persisted in the kavach_stream_events table. If a client disconnects and reconnects, pass since to receive everything it missed.
GET /api/kavach/events/stream?since=2026-01-15T10:00:00ZThe browser EventSource can also pass the last received event ID via the Last-Event-ID header, which the browser manages automatically when you set the id: field on SSE events. KavachOS uses the Last-Event-ID value as the replay cursor.
// Programmatic replay without an open connection
const result = await stream.replay(new Date('2026-01-15T10:00:00Z'), ['audit', 'auth.failed']);
if (result.success) {
for (const event of result.data) {
console.log(event.type, event.data);
}
}replay returns up to 1000 events in descending order (newest first). Apply your own pagination on top if you need to page through large windows.
Auth requirements
By default requireAuth: true. Every connection must present a valid Bearer token, either in the Authorization header or as the token query parameter.
Authorization: Bearer <token>
GET /api/kavach/events/stream?token=<token>When the token is invalid, the stream sends a single error event and closes.
{ "code": "UNAUTHORIZED", "message": "Invalid token" }To disable auth for local development or internal-only deployments:
const stream = createEventStreamModule({
db: kavach.db,
requireAuth: false,
});Never disable auth in production. The stream exposes audit events and agent lifecycle data.
Configuration
interface EventStreamConfig {
db: Database;
// Maximum concurrent connections — returns 503 when exceeded (default: 100)
maxConnections?: number;
// Interval between heartbeat comments in ms (default: 30000)
heartbeatIntervalMs?: number;
// Module-level event type allow-list — clients cannot exceed this (default: all)
eventTypes?: EventType[];
// Require a Bearer token to connect (default: true)
requireAuth?: boolean;
// Validate the token and return subscriber ID (userId or agentId), or null
validateToken?: (token: string) => Promise<string | null>;
}Connection limits
The stream rejects connections beyond maxConnections with a 503 Too many connections response. Size this based on your deployment: a single process can comfortably handle hundreds of concurrent SSE connections; above that, consider a pub/sub layer (Redis, NATS) in front of the module.
Heartbeat
The server sends : heartbeat comments on the configured interval (default 30 seconds) to keep load balancers and proxies from closing idle connections. No action is required on the client side — EventSource ignores comment lines.
Emitting events from plugins
Any part of your application can emit to the stream.
// After revoking an agent
await kavach.agent.revoke(agentId);
stream.emit({
id: crypto.randomUUID(),
type: 'agent.revoked',
timestamp: new Date(),
data: { agentId, reason: 'manual-revocation' },
agentId,
userId: currentUser.id,
});Integrate with the webhooks module to fire both a webhook and a stream event from the same action.
Module API
interface EventStreamModule {
// Emit an event to all connected clients and persist for replay
emit(event: StreamEvent): void;
// Handle an SSE connection request — returns a Response or null
handleRequest(request: Request): Response | null;
// Number of currently active connections
getConnectionCount(): number;
// Replay persisted events since a date, optionally filtered by type
replay(since: Date, types?: EventType[]): Promise<Result<StreamEvent[]>>;
// Close all connections and stop the heartbeat timer
close(): void;
}