diff --git a/src/lib/cursorTelemetryBuffer.test.ts b/src/lib/cursorTelemetryBuffer.test.ts index 5ffbc7a..309df7e 100644 --- a/src/lib/cursorTelemetryBuffer.test.ts +++ b/src/lib/cursorTelemetryBuffer.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { type CursorTelemetryPoint, createCursorTelemetryBuffer } from "./cursorTelemetryBuffer"; function sample(tag: number): CursorTelemetryPoint { @@ -140,6 +140,56 @@ describe("createCursorTelemetryBuffer", () => { expect(buf.pendingCount).toBe(0); }); + it("endSession() returns the number of dropped batches and warns when the cap is exceeded", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 2 }); + const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined); + + for (let round = 1; round <= 2; round++) { + buf.startSession(); + buf.push(sample(round)); + expect(buf.endSession()).toBe(0); + } + expect(warn).not.toHaveBeenCalled(); + + buf.startSession(); + buf.push(sample(3)); + const dropped = buf.endSession(); + expect(dropped).toBe(1); + expect(warn).toHaveBeenCalledTimes(1); + expect(warn.mock.calls[0]?.[0]).toMatch(/dropped 1 pending batch/); + expect(buf.pendingCount).toBe(2); + + warn.mockRestore(); + }); + + it("prependBatch() defensively trims and warns when it would exceed the cap", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 2 }); + const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined); + + // Fill the queue to the cap without dropping anything. + for (let round = 1; round <= 2; round++) { + buf.startSession(); + buf.push(sample(round)); + buf.endSession(); + } + expect(buf.pendingCount).toBe(2); + expect(warn).not.toHaveBeenCalled(); + + // Simulate a misuse where a retry prepends without first draining: + // queue would grow to 3, so the oldest-trailing entry must be evicted. + buf.prependBatch([sample(99)]); + expect(buf.pendingCount).toBe(2); + expect(warn).toHaveBeenCalledTimes(1); + expect(warn.mock.calls[0]?.[0]).toMatch(/prependBatch trimmed 1 trailing batch/); + + // Front is the prepended batch; the preserved trailing batch is round 1. + expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([99]); + expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + expect(buf.pendingCount).toBe(0); + + warn.mockRestore(); + }); + it("reset() clears both active and pending state", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); buf.startSession(); diff --git a/src/lib/cursorTelemetryBuffer.ts b/src/lib/cursorTelemetryBuffer.ts index d812610..57db2ed 100644 --- a/src/lib/cursorTelemetryBuffer.ts +++ b/src/lib/cursorTelemetryBuffer.ts @@ -1,17 +1,89 @@ +/** + * A single cursor telemetry sample captured during a recording session. + * + * Coordinates (`cx`, `cy`) are device-pixel positions relative to the + * captured surface; `timeMs` is the offset from the recording's start. + */ export interface CursorTelemetryPoint { timeMs: number; cx: number; cy: number; } +/** + * Per-session cursor telemetry buffer with bounded memory. + * + * Flow: `startSession()` → `push(point)` N times → `endSession()` enqueues + * the collected samples as a completed batch. The main process later + * drains batches in FIFO order via `takeNextBatch()` to persist them to + * disk, and can `prependBatch()` on write failure to retry without losing + * order. + * + * Memory is bounded by `maxActiveSamples` (ring buffer on the in-progress + * batch) and `maxPendingBatches` (FIFO cap across completed batches). + */ export interface CursorTelemetryBuffer { + /** + * Begin a new recording session. Clears any in-progress active samples + * (without touching already-completed pending batches). Safe to call + * repeatedly — e.g. a rapid Stop → Record sequence. + */ startSession(): void; + + /** + * Append a telemetry sample to the current active session. When the + * active buffer exceeds `maxActiveSamples`, the oldest sample is + * dropped (ring behaviour). + */ push(point: CursorTelemetryPoint): void; - endSession(): void; + + /** + * Finalize the active session, moving its samples into the pending + * queue as a single batch. Empty sessions are dropped (no empty batch + * is enqueued). + * + * If the pending queue would exceed `maxPendingBatches`, the oldest + * batches are evicted to bound memory. A `console.warn` is emitted + * whenever at least one batch is dropped so that pathological rapid- + * restart scenarios are observable. + * + * @returns the number of pending batches dropped by this call (0 under + * normal operation). + */ + endSession(): number; + + /** + * Remove and return the oldest pending batch, or an empty array if + * the queue is empty. + */ takeNextBatch(): CursorTelemetryPoint[]; + + /** + * Re-insert a batch at the front of the queue, preserving FIFO order + * on retry paths (e.g. when persisting the batch failed and the + * caller wants the next `takeNextBatch()` to yield it again). + * + * Empty batches are ignored. The pending cap is enforced defensively + * — if prepending would push the queue past `maxPendingBatches`, the + * oldest entries are evicted and a `console.warn` is emitted. In + * normal retry usage this trim is a no-op because the caller has just + * removed the batch via `takeNextBatch()`. + */ prependBatch(batch: CursorTelemetryPoint[]): void; + + /** + * Drop the most recently enqueued pending batch. Used when a recording + * is discarded after `endSession()` but before it has been persisted. + * No-op on an empty queue. + */ discardLatestPending(): void; + + /** + * Clear both the active and pending state. Intended for tests and + * full teardown paths. + */ reset(): void; + readonly activeCount: number; readonly pendingCount: number; } @@ -23,6 +95,11 @@ export interface CursorTelemetryBufferOptions { const DEFAULT_MAX_PENDING_BATCHES = 8; +/** + * Create a cursor telemetry buffer. + * + * @see CursorTelemetryBuffer for the full lifecycle contract. + */ export function createCursorTelemetryBuffer( options: CursorTelemetryBufferOptions, ): CursorTelemetryBuffer { @@ -43,20 +120,37 @@ export function createCursorTelemetryBuffer( } }, endSession() { + let dropped = 0; if (active.length > 0) { pending.push(active); while (pending.length > maxPending) { pending.shift(); + dropped++; } } active = []; + if (dropped > 0) { + console.warn( + `[cursorTelemetryBuffer] dropped ${dropped} pending batch(es) to stay within maxPendingBatches=${maxPending}`, + ); + } + return dropped; }, takeNextBatch() { return pending.shift() ?? []; }, prependBatch(batch) { - if (batch.length > 0) { - pending.unshift(batch); + if (batch.length === 0) return; + pending.unshift(batch); + let dropped = 0; + while (pending.length > maxPending) { + pending.pop(); + dropped++; + } + if (dropped > 0) { + console.warn( + `[cursorTelemetryBuffer] prependBatch trimmed ${dropped} trailing batch(es) to stay within maxPendingBatches=${maxPending}`, + ); } }, discardLatestPending() {