Merge pull request #457 from shaun0927/fix/cursor-telemetry-session-isolation

fix: isolate cursor telemetry samples per recording session
This commit is contained in:
Sid
2026-04-28 08:08:08 -07:00
committed by GitHub
6 changed files with 529 additions and 29 deletions
+2 -1
View File
@@ -63,7 +63,8 @@ interface Window {
message?: string;
error?: string;
}>;
setRecordingState: (recording: boolean) => Promise<void>;
setRecordingState: (recording: boolean, recordingId?: number) => Promise<void>;
discardCursorTelemetry: (recordingId: number) => Promise<void>;
getCursorTelemetry: (videoPath?: string) => Promise<{
success: boolean;
samples: CursorTelemetryPoint[];
+36 -25
View File
@@ -11,6 +11,10 @@ import {
shell,
systemPreferences,
} from "electron";
import {
type CursorTelemetryPoint,
createCursorTelemetryBuffer,
} from "../../src/lib/cursorTelemetryBuffer";
import {
normalizeProjectMedia,
normalizeRecordingSession,
@@ -275,14 +279,23 @@ async function storeRecordedSessionFiles(payload: StoreRecordedSessionInput) {
currentProjectPath = null;
const telemetryPath = `${screenVideoPath}.cursor.json`;
if (pendingCursorSamples.length > 0) {
await fs.writeFile(
telemetryPath,
JSON.stringify({ version: CURSOR_TELEMETRY_VERSION, samples: pendingCursorSamples }, null, 2),
"utf-8",
);
const pendingBatch = cursorTelemetryBuffer.takeNextBatch();
if (pendingBatch && pendingBatch.samples.length > 0) {
try {
await fs.writeFile(
telemetryPath,
JSON.stringify(
{ version: CURSOR_TELEMETRY_VERSION, samples: pendingBatch.samples },
null,
2,
),
"utf-8",
);
} catch (err) {
cursorTelemetryBuffer.prependBatch(pendingBatch);
throw err;
}
}
pendingCursorSamples = [];
const sessionManifestPath = path.join(
RECORDINGS_DIR,
@@ -302,16 +315,11 @@ const CURSOR_TELEMETRY_VERSION = 1;
const CURSOR_SAMPLE_INTERVAL_MS = 100;
const MAX_CURSOR_SAMPLES = 60 * 60 * 10; // 1 hour @ 10Hz
interface CursorTelemetryPoint {
timeMs: number;
cx: number;
cy: number;
}
let cursorCaptureInterval: NodeJS.Timeout | null = null;
let cursorCaptureStartTimeMs = 0;
let activeCursorSamples: CursorTelemetryPoint[] = [];
let pendingCursorSamples: CursorTelemetryPoint[] = [];
const cursorTelemetryBuffer = createCursorTelemetryBuffer({
maxActiveSamples: MAX_CURSOR_SAMPLES,
});
function clamp(value: number, min: number, max: number) {
return Math.min(max, Math.max(min, value));
@@ -338,15 +346,11 @@ function sampleCursorPoint() {
const cx = clamp((cursor.x - bounds.x) / width, 0, 1);
const cy = clamp((cursor.y - bounds.y) / height, 0, 1);
activeCursorSamples.push({
cursorTelemetryBuffer.push({
timeMs: Math.max(0, Date.now() - cursorCaptureStartTimeMs),
cx,
cy,
});
if (activeCursorSamples.length > MAX_CURSOR_SAMPLES) {
activeCursorSamples.shift();
}
}
export function registerIpcHandlers(
@@ -696,18 +700,21 @@ export function registerIpcHandlers(
}
});
ipcMain.handle("set-recording-state", (_, recording: boolean) => {
ipcMain.handle("set-recording-state", (_, recording: boolean, recordingId?: number) => {
if (recording) {
stopCursorCapture();
activeCursorSamples = [];
pendingCursorSamples = [];
// The renderer is the source of truth for the recording id (it
// uses the same id as the saved fileName). Fall back to a
// timestamp only if the renderer didn't supply one, so the
// buffer always has a stable key per session.
const id = typeof recordingId === "number" ? recordingId : Date.now();
cursorTelemetryBuffer.startSession(id);
cursorCaptureStartTimeMs = Date.now();
sampleCursorPoint();
cursorCaptureInterval = setInterval(sampleCursorPoint, CURSOR_SAMPLE_INTERVAL_MS);
} else {
stopCursorCapture();
pendingCursorSamples = [...activeCursorSamples];
activeCursorSamples = [];
cursorTelemetryBuffer.endSession();
}
const source = selectedSource || { name: "Screen" };
@@ -716,6 +723,10 @@ export function registerIpcHandlers(
}
});
ipcMain.handle("discard-cursor-telemetry", (_, recordingId: number) => {
cursorTelemetryBuffer.discardBatch(recordingId);
});
ipcMain.handle("get-cursor-telemetry", async (_, videoPath?: string) => {
const targetVideoPath = normalizeVideoSourcePath(
videoPath ?? currentRecordingSession?.screenVideoPath,
+5 -2
View File
@@ -51,12 +51,15 @@ contextBridge.exposeInMainWorld("electronAPI", {
getRecordedVideoPath: () => {
return ipcRenderer.invoke("get-recorded-video-path");
},
setRecordingState: (recording: boolean) => {
return ipcRenderer.invoke("set-recording-state", recording);
setRecordingState: (recording: boolean, recordingId?: number) => {
return ipcRenderer.invoke("set-recording-state", recording, recordingId);
},
getCursorTelemetry: (videoPath?: string) => {
return ipcRenderer.invoke("get-cursor-telemetry", videoPath);
},
discardCursorTelemetry: (recordingId: number) => {
return ipcRenderer.invoke("discard-cursor-telemetry", recordingId);
},
onStopRecordingFromTray: (callback: () => void) => {
const listener = () => callback();
ipcRenderer.on("stop-recording-from-tray", listener);
+2 -1
View File
@@ -308,6 +308,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn {
try {
const screenBlob = await activeScreenRecorder.recordedBlobPromise;
if (discardRecordingId.current === activeRecordingId) {
window.electronAPI?.discardCursorTelemetry(activeRecordingId);
return;
}
if (screenBlob.size === 0) {
@@ -772,7 +773,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn {
setRecording(true);
setPaused(false);
setElapsedSeconds(0);
window.electronAPI?.setRecordingState(true);
window.electronAPI?.setRecordingState(true, recordingId.current);
const activeScreenRecorder = screenRecorder.current;
const activeWebcamRecorder = webcamRecorder.current;
+271
View File
@@ -0,0 +1,271 @@
import { describe, expect, it, vi } from "vitest";
import { type CursorTelemetryPoint, createCursorTelemetryBuffer } from "./cursorTelemetryBuffer";
function sample(tag: number): CursorTelemetryPoint {
// Decouple the timestamp tag from the coordinate fixture so cursor
// points stay inside the normalized [0, 1] range that real samples use.
const normalized = (tag % 100) / 100;
return { timeMs: tag, cx: normalized, cy: normalized };
}
describe("createCursorTelemetryBuffer", () => {
it("stores samples captured during an active session", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
buf.startSession(1);
for (let i = 0; i < 3; i++) buf.push(sample(i));
buf.endSession();
const batch = buf.takeNextBatch();
expect(batch?.recordingId).toBe(1);
expect(batch?.samples).toHaveLength(3);
expect(batch?.samples[0]?.timeMs).toBe(0);
});
it("trims active samples past maxActiveSamples (ring behaviour)", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 2 });
buf.startSession(1);
buf.push(sample(1));
buf.push(sample(2));
buf.push(sample(3));
buf.endSession();
const batch = buf.takeNextBatch();
expect(batch?.samples).toEqual([sample(2), sample(3)]);
});
it("preserves earlier pending batches when a new session starts before store", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
// Recording 1
buf.startSession(1);
buf.push(sample(101));
buf.push(sample(102));
buf.endSession();
// Recording 2 starts before recording 1's batch has been consumed
buf.startSession(2);
buf.push(sample(201));
buf.endSession();
const batch1 = buf.takeNextBatch();
const batch2 = buf.takeNextBatch();
expect(batch1?.recordingId).toBe(1);
expect(batch1?.samples.map((s) => s.timeMs)).toEqual([101, 102]);
expect(batch2?.recordingId).toBe(2);
expect(batch2?.samples.map((s) => s.timeMs)).toEqual([201]);
});
it("returns null when nothing is pending", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
expect(buf.takeNextBatch()).toBeNull();
});
it("drops empty sessions instead of queuing empty batches", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
buf.startSession(1);
buf.endSession();
expect(buf.pendingCount).toBe(0);
expect(buf.takeNextBatch()).toBeNull();
});
it("caps the pending queue at maxPendingBatches to bound memory", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 3 });
for (let round = 1; round <= 5; round++) {
buf.startSession(round);
buf.push(sample(round));
buf.endSession();
}
expect(buf.pendingCount).toBe(3);
// Oldest two batches (rounds 1 and 2) should have been dropped
expect(buf.takeNextBatch()?.recordingId).toBe(3);
expect(buf.takeNextBatch()?.recordingId).toBe(4);
expect(buf.takeNextBatch()?.recordingId).toBe(5);
});
it("starting a new session clears in-progress samples but keeps pending batches", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
buf.startSession(1);
buf.push(sample(1));
buf.endSession();
buf.startSession(2);
buf.push(sample(99));
// Simulate another startSession before endSession (e.g. rapid restart)
buf.startSession(3);
expect(buf.activeCount).toBe(0);
expect(buf.pendingCount).toBe(1);
const batch = buf.takeNextBatch();
expect(batch?.recordingId).toBe(1);
expect(batch?.samples.map((s) => s.timeMs)).toEqual([1]);
});
it("discardBatch(id) drops only the batch produced by that recording id", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
buf.startSession(1);
buf.push(sample(1));
buf.endSession();
buf.startSession(2);
buf.push(sample(2));
buf.endSession();
expect(buf.pendingCount).toBe(2);
expect(buf.discardBatch(1)).toBe(true);
expect(buf.pendingCount).toBe(1);
expect(buf.takeNextBatch()?.recordingId).toBe(2);
});
it("discardBatch(id) targets the correct batch even when a later recording sits in front of it", () => {
// Regression test for the rapid Stop → Record → Discard sequence:
// recording A's finalize callback does async work (fixWebmDuration),
// recording B finishes in the meantime, then A's callback resolves
// with discard intent. The discard must drop A — not B, which
// happens to be the *latest* pending batch by the time discard runs.
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
buf.startSession(1);
buf.push(sample(11));
buf.endSession();
buf.startSession(2);
buf.push(sample(22));
buf.endSession();
expect(buf.pendingCount).toBe(2);
expect(buf.discardBatch(1)).toBe(true);
const remaining = buf.takeNextBatch();
expect(remaining?.recordingId).toBe(2);
expect(remaining?.samples.map((s) => s.timeMs)).toEqual([22]);
expect(buf.takeNextBatch()).toBeNull();
});
it("discardBatch(id) is a no-op (returns false) when the id is unknown or already drained", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
expect(buf.discardBatch(42)).toBe(false);
buf.startSession(1);
buf.push(sample(1));
buf.endSession();
buf.takeNextBatch();
expect(buf.discardBatch(1)).toBe(false);
expect(buf.pendingCount).toBe(0);
});
it("prependBatch() re-inserts a batch at the front of the queue", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
buf.startSession(1);
buf.push(sample(1));
buf.endSession();
const batch = buf.takeNextBatch();
expect(batch).not.toBeNull();
expect(buf.pendingCount).toBe(0);
if (batch) buf.prependBatch(batch);
expect(buf.pendingCount).toBe(1);
const next = buf.takeNextBatch();
expect(next?.recordingId).toBe(1);
expect(next?.samples.map((s) => s.timeMs)).toEqual([1]);
});
it("prependBatch() ignores empty batches", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
buf.prependBatch({ recordingId: 1, samples: [] });
expect(buf.pendingCount).toBe(0);
});
it("endSession() returns the number of dropped batches and warns when the cap is exceeded", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 2 });
const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined);
for (let round = 1; round <= 2; round++) {
buf.startSession(round);
buf.push(sample(round));
expect(buf.endSession()).toBe(0);
}
expect(warn).not.toHaveBeenCalled();
buf.startSession(3);
buf.push(sample(3));
const dropped = buf.endSession();
expect(dropped).toBe(1);
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0]?.[0]).toMatch(/dropped 1 pending batch/);
expect(buf.pendingCount).toBe(2);
warn.mockRestore();
});
it("prependBatch() defensively trims and warns when it would exceed the cap", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 2 });
const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined);
// Fill the queue to the cap without dropping anything.
for (let round = 1; round <= 2; round++) {
buf.startSession(round);
buf.push(sample(round));
buf.endSession();
}
expect(buf.pendingCount).toBe(2);
expect(warn).not.toHaveBeenCalled();
// Simulate a misuse where a retry prepends without first draining:
// queue would grow to 3, so the oldest-trailing entry must be evicted.
buf.prependBatch({ recordingId: 99, samples: [sample(99)] });
expect(buf.pendingCount).toBe(2);
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0]?.[0]).toMatch(/prependBatch trimmed 1 trailing batch/);
// Front is the prepended batch; the preserved trailing batch is round 1.
expect(buf.takeNextBatch()?.recordingId).toBe(99);
expect(buf.takeNextBatch()?.recordingId).toBe(1);
expect(buf.pendingCount).toBe(0);
warn.mockRestore();
});
it("sanitizes non-finite or non-positive option values to safe defaults", () => {
// Infinity / NaN / negative would otherwise turn the trim loops
// into infinite loops. The buffer must fall back to defaults.
const buf = createCursorTelemetryBuffer({
maxActiveSamples: Number.POSITIVE_INFINITY,
maxPendingBatches: Number.NaN,
});
buf.startSession(1);
buf.push(sample(1));
expect(() => buf.endSession()).not.toThrow();
expect(buf.pendingCount).toBe(1);
const buf2 = createCursorTelemetryBuffer({
maxActiveSamples: -5,
maxPendingBatches: 0,
});
buf2.startSession(2);
buf2.push(sample(2));
expect(() => buf2.endSession()).not.toThrow();
expect(buf2.pendingCount).toBe(1);
});
it("reset() clears both active and pending state", () => {
const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 });
buf.startSession(1);
buf.push(sample(1));
buf.endSession();
buf.startSession(2);
buf.push(sample(2));
buf.reset();
expect(buf.activeCount).toBe(0);
expect(buf.pendingCount).toBe(0);
expect(buf.takeNextBatch()).toBeNull();
});
});
+213
View File
@@ -0,0 +1,213 @@
/**
* A single cursor telemetry sample captured during a recording session.
*
* Coordinates (`cx`, `cy`) are clamped ratios in the `[0, 1]` range,
* normalised against the captured surface's width and height by the
* main-process `sampleCursorPoint()` before being pushed. `timeMs` is the
* offset (in milliseconds) from the recording's start.
*/
export interface CursorTelemetryPoint {
timeMs: number;
cx: number;
cy: number;
}
/**
* A completed batch of cursor samples, tagged with the recording id that
* produced them. The id is supplied at `startSession()` time and travels
* with the batch through the pending queue, retries, and discards.
*/
export interface CursorTelemetryBatch {
recordingId: number;
samples: CursorTelemetryPoint[];
}
/**
* Per-session cursor telemetry buffer with bounded memory.
*
* Flow: `startSession(recordingId)` → `push(point)` N times → `endSession()`
* enqueues the collected samples as a completed batch tagged with that
* `recordingId`. The main process later drains batches in FIFO order via
* `takeNextBatch()` to persist them to disk, and can `prependBatch()` on
* write failure to retry without losing order. A discard request keys on
* the recording id so an asynchronous "discard recording A" decision that
* arrives after recording B has already enqueued its batch still drops
* the right one.
*
* Memory is bounded by `maxActiveSamples` (ring buffer on the in-progress
* batch) and `maxPendingBatches` (FIFO cap across completed batches).
*/
export interface CursorTelemetryBuffer {
/**
* Begin a new recording session under the given `recordingId`. Clears
* any in-progress active samples (without touching already-completed
* pending batches). Safe to call repeatedly — e.g. a rapid Stop →
* Record sequence — and the most recent id wins.
*/
startSession(recordingId: number): void;
/**
* Append a telemetry sample to the current active session. When the
* active buffer exceeds `maxActiveSamples`, the oldest sample is
* dropped (ring behaviour).
*/
push(point: CursorTelemetryPoint): void;
/**
* Finalize the active session, moving its samples into the pending
* queue as a single batch tagged with the current recording id. Empty
* sessions are dropped (no empty batch is enqueued).
*
* If the pending queue would exceed `maxPendingBatches`, the oldest
* batches are evicted to bound memory. A `console.warn` is emitted
* whenever at least one batch is dropped so that pathological rapid-
* restart scenarios are observable.
*
* @returns the number of pending batches dropped by this call (0 under
* normal operation).
*/
endSession(): number;
/**
* Remove and return the oldest pending batch, or `null` if the queue
* is empty.
*/
takeNextBatch(): CursorTelemetryBatch | null;
/**
* Re-insert a batch at the front of the queue, preserving FIFO order
* on retry paths (e.g. when persisting the batch failed and the
* caller wants the next `takeNextBatch()` to yield it again).
*
* Empty batches are ignored. The pending cap is enforced defensively
* — if prepending would push the queue past `maxPendingBatches`, the
* oldest entries are evicted and a `console.warn` is emitted. In
* normal retry usage this trim is a no-op because the caller has just
* removed the batch via `takeNextBatch()`.
*/
prependBatch(batch: CursorTelemetryBatch): void;
/**
* Drop the pending batch produced by the given `recordingId`. Used
* when a recording is discarded after its `endSession()` has run but
* before it has been persisted. Returns `true` if a batch was
* removed, `false` otherwise (no matching id, or the batch was
* already drained).
*
* Keying on the recording id (rather than "the latest pending batch")
* avoids a real bug: when finalizing a recording does asynchronous
* work like `fixWebmDuration`, a quick Stop → Record → Discard
* sequence can interleave such that the latest pending batch belongs
* to a *later* recording than the one being discarded.
*/
discardBatch(recordingId: number): boolean;
/**
* Clear both the active and pending state. Intended for tests and
* full teardown paths.
*/
reset(): void;
readonly activeCount: number;
readonly pendingCount: number;
}
export interface CursorTelemetryBufferOptions {
maxActiveSamples: number;
maxPendingBatches?: number;
}
const DEFAULT_MAX_PENDING_BATCHES = 8;
const DEFAULT_MAX_ACTIVE_SAMPLES = 10_000;
/** Coerce a numeric option into a safe, finite, positive integer. */
function sanitizeLimit(value: number | undefined, fallback: number): number {
if (typeof value !== "number" || !Number.isFinite(value)) return fallback;
const floored = Math.floor(value);
return floored >= 1 ? floored : fallback;
}
/**
* Create a cursor telemetry buffer.
*
* Numeric options are sanitized: non-finite, negative, or zero values fall
* back to safe defaults so a bad caller cannot disable the memory bounds
* (which would turn the trim loops into infinite loops).
*
* @see CursorTelemetryBuffer for the full lifecycle contract.
*/
export function createCursorTelemetryBuffer(
options: CursorTelemetryBufferOptions,
): CursorTelemetryBuffer {
const maxActive = sanitizeLimit(options.maxActiveSamples, DEFAULT_MAX_ACTIVE_SAMPLES);
const maxPending = sanitizeLimit(options.maxPendingBatches, DEFAULT_MAX_PENDING_BATCHES);
let active: CursorTelemetryPoint[] = [];
let activeRecordingId: number | null = null;
let pending: CursorTelemetryBatch[] = [];
return {
startSession(recordingId) {
active = [];
activeRecordingId = recordingId;
},
push(point) {
active.push(point);
if (active.length > maxActive) {
active.shift();
}
},
endSession() {
let dropped = 0;
if (active.length > 0 && activeRecordingId !== null) {
pending.push({ recordingId: activeRecordingId, samples: active });
while (pending.length > maxPending) {
pending.shift();
dropped++;
}
}
active = [];
activeRecordingId = null;
if (dropped > 0) {
console.warn(
`[cursorTelemetryBuffer] dropped ${dropped} pending batch(es) to stay within maxPendingBatches=${maxPending}`,
);
}
return dropped;
},
takeNextBatch() {
return pending.shift() ?? null;
},
prependBatch(batch) {
if (batch.samples.length === 0) return;
pending.unshift(batch);
let dropped = 0;
while (pending.length > maxPending) {
pending.pop();
dropped++;
}
if (dropped > 0) {
console.warn(
`[cursorTelemetryBuffer] prependBatch trimmed ${dropped} trailing batch(es) to stay within maxPendingBatches=${maxPending}`,
);
}
},
discardBatch(recordingId) {
const idx = pending.findIndex((b) => b.recordingId === recordingId);
if (idx === -1) return false;
pending.splice(idx, 1);
return true;
},
reset() {
active = [];
activeRecordingId = null;
pending = [];
},
get activeCount() {
return active.length;
},
get pendingCount() {
return pending.length;
},
};
}