diff --git a/electron/electron-env.d.ts b/electron/electron-env.d.ts index ea364a1..08c06c6 100644 --- a/electron/electron-env.d.ts +++ b/electron/electron-env.d.ts @@ -63,8 +63,8 @@ interface Window { message?: string; error?: string; }>; - setRecordingState: (recording: boolean) => Promise; - discardCursorTelemetry: () => Promise; + setRecordingState: (recording: boolean, recordingId?: number) => Promise; + discardCursorTelemetry: (recordingId: number) => Promise; getCursorTelemetry: (videoPath?: string) => Promise<{ success: boolean; samples: CursorTelemetryPoint[]; diff --git a/electron/ipc/handlers.ts b/electron/ipc/handlers.ts index fc55006..7fe6c52 100644 --- a/electron/ipc/handlers.ts +++ b/electron/ipc/handlers.ts @@ -279,16 +279,20 @@ async function storeRecordedSessionFiles(payload: StoreRecordedSessionInput) { currentProjectPath = null; const telemetryPath = `${screenVideoPath}.cursor.json`; - const pendingSamples: CursorTelemetryPoint[] = cursorTelemetryBuffer.takeNextBatch(); - if (pendingSamples.length > 0) { + const pendingBatch = cursorTelemetryBuffer.takeNextBatch(); + if (pendingBatch && pendingBatch.samples.length > 0) { try { await fs.writeFile( telemetryPath, - JSON.stringify({ version: CURSOR_TELEMETRY_VERSION, samples: pendingSamples }, null, 2), + JSON.stringify( + { version: CURSOR_TELEMETRY_VERSION, samples: pendingBatch.samples }, + null, + 2, + ), "utf-8", ); } catch (err) { - cursorTelemetryBuffer.prependBatch(pendingSamples); + cursorTelemetryBuffer.prependBatch(pendingBatch); throw err; } } @@ -531,10 +535,15 @@ export function registerIpcHandlers( } }); - ipcMain.handle("set-recording-state", (_, recording: boolean) => { + ipcMain.handle("set-recording-state", (_, recording: boolean, recordingId?: number) => { if (recording) { stopCursorCapture(); - cursorTelemetryBuffer.startSession(); + // The renderer is the source of truth for the recording id (it + // uses the same id as the saved fileName). Fall back to a + // timestamp only if the renderer didn't supply one, so the + // buffer always has a stable key per session. + const id = typeof recordingId === "number" ? recordingId : Date.now(); + cursorTelemetryBuffer.startSession(id); cursorCaptureStartTimeMs = Date.now(); sampleCursorPoint(); cursorCaptureInterval = setInterval(sampleCursorPoint, CURSOR_SAMPLE_INTERVAL_MS); @@ -549,8 +558,8 @@ export function registerIpcHandlers( } }); - ipcMain.handle("discard-cursor-telemetry", () => { - cursorTelemetryBuffer.discardLatestPending(); + ipcMain.handle("discard-cursor-telemetry", (_, recordingId: number) => { + cursorTelemetryBuffer.discardBatch(recordingId); }); ipcMain.handle("get-cursor-telemetry", async (_, videoPath?: string) => { diff --git a/electron/preload.ts b/electron/preload.ts index 9367122..962e582 100644 --- a/electron/preload.ts +++ b/electron/preload.ts @@ -47,14 +47,14 @@ contextBridge.exposeInMainWorld("electronAPI", { getRecordedVideoPath: () => { return ipcRenderer.invoke("get-recorded-video-path"); }, - setRecordingState: (recording: boolean) => { - return ipcRenderer.invoke("set-recording-state", recording); + setRecordingState: (recording: boolean, recordingId?: number) => { + return ipcRenderer.invoke("set-recording-state", recording, recordingId); }, getCursorTelemetry: (videoPath?: string) => { return ipcRenderer.invoke("get-cursor-telemetry", videoPath); }, - discardCursorTelemetry: () => { - return ipcRenderer.invoke("discard-cursor-telemetry"); + discardCursorTelemetry: (recordingId: number) => { + return ipcRenderer.invoke("discard-cursor-telemetry", recordingId); }, onStopRecordingFromTray: (callback: () => void) => { const listener = () => callback(); diff --git a/src/hooks/useScreenRecorder.ts b/src/hooks/useScreenRecorder.ts index fd8a307..a95b672 100644 --- a/src/hooks/useScreenRecorder.ts +++ b/src/hooks/useScreenRecorder.ts @@ -225,7 +225,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn { try { const screenBlob = await activeScreenRecorder.recordedBlobPromise; if (discardRecordingId.current === activeRecordingId) { - window.electronAPI?.discardCursorTelemetry(); + window.electronAPI?.discardCursorTelemetry(activeRecordingId); return; } if (screenBlob.size === 0) { @@ -554,7 +554,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn { setRecording(true); setPaused(false); setElapsedSeconds(0); - window.electronAPI?.setRecordingState(true); + window.electronAPI?.setRecordingState(true, recordingId.current); const activeScreenRecorder = screenRecorder.current; const activeWebcamRecorder = webcamRecorder.current; diff --git a/src/lib/cursorTelemetryBuffer.test.ts b/src/lib/cursorTelemetryBuffer.test.ts index 567a1eb..17174ac 100644 --- a/src/lib/cursorTelemetryBuffer.test.ts +++ b/src/lib/cursorTelemetryBuffer.test.ts @@ -2,141 +2,182 @@ import { describe, expect, it, vi } from "vitest"; import { type CursorTelemetryPoint, createCursorTelemetryBuffer } from "./cursorTelemetryBuffer"; function sample(tag: number): CursorTelemetryPoint { - return { timeMs: tag, cx: tag / 10, cy: tag / 10 }; + // Decouple the timestamp tag from the coordinate fixture so cursor + // points stay inside the normalized [0, 1] range that real samples use. + const normalized = (tag % 100) / 100; + return { timeMs: tag, cx: normalized, cy: normalized }; } describe("createCursorTelemetryBuffer", () => { it("stores samples captured during an active session", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); for (let i = 0; i < 3; i++) buf.push(sample(i)); buf.endSession(); const batch = buf.takeNextBatch(); - expect(batch).toHaveLength(3); - expect(batch[0]?.timeMs).toBe(0); + expect(batch?.recordingId).toBe(1); + expect(batch?.samples).toHaveLength(3); + expect(batch?.samples[0]?.timeMs).toBe(0); }); it("trims active samples past maxActiveSamples (ring behaviour)", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 2 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.push(sample(2)); buf.push(sample(3)); buf.endSession(); const batch = buf.takeNextBatch(); - expect(batch).toEqual([sample(2), sample(3)]); + expect(batch?.samples).toEqual([sample(2), sample(3)]); }); it("preserves earlier pending batches when a new session starts before store", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); // Recording 1 - buf.startSession(); + buf.startSession(1); buf.push(sample(101)); buf.push(sample(102)); buf.endSession(); // Recording 2 starts before recording 1's batch has been consumed - buf.startSession(); + buf.startSession(2); buf.push(sample(201)); buf.endSession(); const batch1 = buf.takeNextBatch(); const batch2 = buf.takeNextBatch(); - expect(batch1.map((s) => s.timeMs)).toEqual([101, 102]); - expect(batch2.map((s) => s.timeMs)).toEqual([201]); + expect(batch1?.recordingId).toBe(1); + expect(batch1?.samples.map((s) => s.timeMs)).toEqual([101, 102]); + expect(batch2?.recordingId).toBe(2); + expect(batch2?.samples.map((s) => s.timeMs)).toEqual([201]); }); - it("returns an empty batch when nothing is pending", () => { + it("returns null when nothing is pending", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - expect(buf.takeNextBatch()).toEqual([]); + expect(buf.takeNextBatch()).toBeNull(); }); it("drops empty sessions instead of queuing empty batches", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.endSession(); expect(buf.pendingCount).toBe(0); - expect(buf.takeNextBatch()).toEqual([]); + expect(buf.takeNextBatch()).toBeNull(); }); it("caps the pending queue at maxPendingBatches to bound memory", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 3 }); for (let round = 1; round <= 5; round++) { - buf.startSession(); + buf.startSession(round); buf.push(sample(round)); buf.endSession(); } expect(buf.pendingCount).toBe(3); // Oldest two batches (rounds 1 and 2) should have been dropped - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([3]); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([4]); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([5]); + expect(buf.takeNextBatch()?.recordingId).toBe(3); + expect(buf.takeNextBatch()?.recordingId).toBe(4); + expect(buf.takeNextBatch()?.recordingId).toBe(5); }); it("starting a new session clears in-progress samples but keeps pending batches", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); - buf.startSession(); + buf.startSession(2); buf.push(sample(99)); // Simulate another startSession before endSession (e.g. rapid restart) - buf.startSession(); + buf.startSession(3); expect(buf.activeCount).toBe(0); expect(buf.pendingCount).toBe(1); const batch = buf.takeNextBatch(); - expect(batch.map((s) => s.timeMs)).toEqual([1]); + expect(batch?.recordingId).toBe(1); + expect(batch?.samples.map((s) => s.timeMs)).toEqual([1]); }); - it("discardLatestPending() drops the most recently enqueued batch", () => { + it("discardBatch(id) drops only the batch produced by that recording id", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); - buf.startSession(); + buf.startSession(2); buf.push(sample(2)); buf.endSession(); expect(buf.pendingCount).toBe(2); - buf.discardLatestPending(); + expect(buf.discardBatch(1)).toBe(true); expect(buf.pendingCount).toBe(1); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + expect(buf.takeNextBatch()?.recordingId).toBe(2); }); - it("discardLatestPending() is safe to call on an empty queue", () => { + it("discardBatch(id) targets the correct batch even when a later recording sits in front of it", () => { + // Regression test for the rapid Stop → Record → Discard sequence: + // recording A's finalize callback does async work (fixWebmDuration), + // recording B finishes in the meantime, then A's callback resolves + // with discard intent. The discard must drop A — not B, which + // happens to be the *latest* pending batch by the time discard runs. const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.discardLatestPending(); + + buf.startSession(1); + buf.push(sample(11)); + buf.endSession(); + + buf.startSession(2); + buf.push(sample(22)); + buf.endSession(); + + expect(buf.pendingCount).toBe(2); + expect(buf.discardBatch(1)).toBe(true); + + const remaining = buf.takeNextBatch(); + expect(remaining?.recordingId).toBe(2); + expect(remaining?.samples.map((s) => s.timeMs)).toEqual([22]); + expect(buf.takeNextBatch()).toBeNull(); + }); + + it("discardBatch(id) is a no-op (returns false) when the id is unknown or already drained", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + expect(buf.discardBatch(42)).toBe(false); + + buf.startSession(1); + buf.push(sample(1)); + buf.endSession(); + buf.takeNextBatch(); + expect(buf.discardBatch(1)).toBe(false); expect(buf.pendingCount).toBe(0); }); it("prependBatch() re-inserts a batch at the front of the queue", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); const batch = buf.takeNextBatch(); + expect(batch).not.toBeNull(); expect(buf.pendingCount).toBe(0); - buf.prependBatch(batch); + if (batch) buf.prependBatch(batch); expect(buf.pendingCount).toBe(1); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + const next = buf.takeNextBatch(); + expect(next?.recordingId).toBe(1); + expect(next?.samples.map((s) => s.timeMs)).toEqual([1]); }); it("prependBatch() ignores empty batches", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.prependBatch([]); + buf.prependBatch({ recordingId: 1, samples: [] }); expect(buf.pendingCount).toBe(0); }); @@ -145,13 +186,13 @@ describe("createCursorTelemetryBuffer", () => { const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined); for (let round = 1; round <= 2; round++) { - buf.startSession(); + buf.startSession(round); buf.push(sample(round)); expect(buf.endSession()).toBe(0); } expect(warn).not.toHaveBeenCalled(); - buf.startSession(); + buf.startSession(3); buf.push(sample(3)); const dropped = buf.endSession(); expect(dropped).toBe(1); @@ -168,7 +209,7 @@ describe("createCursorTelemetryBuffer", () => { // Fill the queue to the cap without dropping anything. for (let round = 1; round <= 2; round++) { - buf.startSession(); + buf.startSession(round); buf.push(sample(round)); buf.endSession(); } @@ -177,14 +218,14 @@ describe("createCursorTelemetryBuffer", () => { // Simulate a misuse where a retry prepends without first draining: // queue would grow to 3, so the oldest-trailing entry must be evicted. - buf.prependBatch([sample(99)]); + buf.prependBatch({ recordingId: 99, samples: [sample(99)] }); expect(buf.pendingCount).toBe(2); expect(warn).toHaveBeenCalledTimes(1); expect(warn.mock.calls[0]?.[0]).toMatch(/prependBatch trimmed 1 trailing batch/); // Front is the prepended batch; the preserved trailing batch is round 1. - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([99]); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + expect(buf.takeNextBatch()?.recordingId).toBe(99); + expect(buf.takeNextBatch()?.recordingId).toBe(1); expect(buf.pendingCount).toBe(0); warn.mockRestore(); @@ -198,7 +239,7 @@ describe("createCursorTelemetryBuffer", () => { maxPendingBatches: Number.NaN, }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); expect(() => buf.endSession()).not.toThrow(); expect(buf.pendingCount).toBe(1); @@ -207,7 +248,7 @@ describe("createCursorTelemetryBuffer", () => { maxActiveSamples: -5, maxPendingBatches: 0, }); - buf2.startSession(); + buf2.startSession(2); buf2.push(sample(2)); expect(() => buf2.endSession()).not.toThrow(); expect(buf2.pendingCount).toBe(1); @@ -215,16 +256,16 @@ describe("createCursorTelemetryBuffer", () => { it("reset() clears both active and pending state", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); - buf.startSession(); + buf.startSession(2); buf.push(sample(2)); buf.reset(); expect(buf.activeCount).toBe(0); expect(buf.pendingCount).toBe(0); - expect(buf.takeNextBatch()).toEqual([]); + expect(buf.takeNextBatch()).toBeNull(); }); }); diff --git a/src/lib/cursorTelemetryBuffer.ts b/src/lib/cursorTelemetryBuffer.ts index e97bab8..0c7e0e1 100644 --- a/src/lib/cursorTelemetryBuffer.ts +++ b/src/lib/cursorTelemetryBuffer.ts @@ -12,25 +12,39 @@ export interface CursorTelemetryPoint { cy: number; } +/** + * A completed batch of cursor samples, tagged with the recording id that + * produced them. The id is supplied at `startSession()` time and travels + * with the batch through the pending queue, retries, and discards. + */ +export interface CursorTelemetryBatch { + recordingId: number; + samples: CursorTelemetryPoint[]; +} + /** * Per-session cursor telemetry buffer with bounded memory. * - * Flow: `startSession()` → `push(point)` N times → `endSession()` enqueues - * the collected samples as a completed batch. The main process later - * drains batches in FIFO order via `takeNextBatch()` to persist them to - * disk, and can `prependBatch()` on write failure to retry without losing - * order. + * Flow: `startSession(recordingId)` → `push(point)` N times → `endSession()` + * enqueues the collected samples as a completed batch tagged with that + * `recordingId`. The main process later drains batches in FIFO order via + * `takeNextBatch()` to persist them to disk, and can `prependBatch()` on + * write failure to retry without losing order. A discard request keys on + * the recording id so an asynchronous "discard recording A" decision that + * arrives after recording B has already enqueued its batch still drops + * the right one. * * Memory is bounded by `maxActiveSamples` (ring buffer on the in-progress * batch) and `maxPendingBatches` (FIFO cap across completed batches). */ export interface CursorTelemetryBuffer { /** - * Begin a new recording session. Clears any in-progress active samples - * (without touching already-completed pending batches). Safe to call - * repeatedly — e.g. a rapid Stop → Record sequence. + * Begin a new recording session under the given `recordingId`. Clears + * any in-progress active samples (without touching already-completed + * pending batches). Safe to call repeatedly — e.g. a rapid Stop → + * Record sequence — and the most recent id wins. */ - startSession(): void; + startSession(recordingId: number): void; /** * Append a telemetry sample to the current active session. When the @@ -41,8 +55,8 @@ export interface CursorTelemetryBuffer { /** * Finalize the active session, moving its samples into the pending - * queue as a single batch. Empty sessions are dropped (no empty batch - * is enqueued). + * queue as a single batch tagged with the current recording id. Empty + * sessions are dropped (no empty batch is enqueued). * * If the pending queue would exceed `maxPendingBatches`, the oldest * batches are evicted to bound memory. A `console.warn` is emitted @@ -55,10 +69,10 @@ export interface CursorTelemetryBuffer { endSession(): number; /** - * Remove and return the oldest pending batch, or an empty array if - * the queue is empty. + * Remove and return the oldest pending batch, or `null` if the queue + * is empty. */ - takeNextBatch(): CursorTelemetryPoint[]; + takeNextBatch(): CursorTelemetryBatch | null; /** * Re-insert a batch at the front of the queue, preserving FIFO order @@ -71,14 +85,22 @@ export interface CursorTelemetryBuffer { * normal retry usage this trim is a no-op because the caller has just * removed the batch via `takeNextBatch()`. */ - prependBatch(batch: CursorTelemetryPoint[]): void; + prependBatch(batch: CursorTelemetryBatch): void; /** - * Drop the most recently enqueued pending batch. Used when a recording - * is discarded after `endSession()` but before it has been persisted. - * No-op on an empty queue. + * Drop the pending batch produced by the given `recordingId`. Used + * when a recording is discarded after its `endSession()` has run but + * before it has been persisted. Returns `true` if a batch was + * removed, `false` otherwise (no matching id, or the batch was + * already drained). + * + * Keying on the recording id (rather than "the latest pending batch") + * avoids a real bug: when finalizing a recording does asynchronous + * work like `fixWebmDuration`, a quick Stop → Record → Discard + * sequence can interleave such that the latest pending batch belongs + * to a *later* recording than the one being discarded. */ - discardLatestPending(): void; + discardBatch(recordingId: number): boolean; /** * Clear both the active and pending state. Intended for tests and @@ -121,11 +143,13 @@ export function createCursorTelemetryBuffer( const maxPending = sanitizeLimit(options.maxPendingBatches, DEFAULT_MAX_PENDING_BATCHES); let active: CursorTelemetryPoint[] = []; - let pending: CursorTelemetryPoint[][] = []; + let activeRecordingId: number | null = null; + let pending: CursorTelemetryBatch[] = []; return { - startSession() { + startSession(recordingId) { active = []; + activeRecordingId = recordingId; }, push(point) { active.push(point); @@ -135,14 +159,15 @@ export function createCursorTelemetryBuffer( }, endSession() { let dropped = 0; - if (active.length > 0) { - pending.push(active); + if (active.length > 0 && activeRecordingId !== null) { + pending.push({ recordingId: activeRecordingId, samples: active }); while (pending.length > maxPending) { pending.shift(); dropped++; } } active = []; + activeRecordingId = null; if (dropped > 0) { console.warn( `[cursorTelemetryBuffer] dropped ${dropped} pending batch(es) to stay within maxPendingBatches=${maxPending}`, @@ -151,10 +176,10 @@ export function createCursorTelemetryBuffer( return dropped; }, takeNextBatch() { - return pending.shift() ?? []; + return pending.shift() ?? null; }, prependBatch(batch) { - if (batch.length === 0) return; + if (batch.samples.length === 0) return; pending.unshift(batch); let dropped = 0; while (pending.length > maxPending) { @@ -167,11 +192,15 @@ export function createCursorTelemetryBuffer( ); } }, - discardLatestPending() { - pending.pop(); + discardBatch(recordingId) { + const idx = pending.findIndex((b) => b.recordingId === recordingId); + if (idx === -1) return false; + pending.splice(idx, 1); + return true; }, reset() { active = []; + activeRecordingId = null; pending = []; }, get activeCount() {