From 3b9b4192bf650fc55fd12dafffdccad1448904e8 Mon Sep 17 00:00:00 2001 From: shaun0927 <70629228+shaun0927@users.noreply.github.com> Date: Tue, 28 Apr 2026 18:27:14 +0900 Subject: [PATCH] fix: key cursor telemetry batches by recordingId for safe discard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit discardLatestPending() popped whichever batch happened to be at the back of the queue. With a Stop → Record → Discard sequence, the pending queue can have recording B's batch sitting in front of A's by the time A's finalize callback resolves (because finalizeRecording awaits fixWebmDuration), so the discard targets the wrong recording. Tag each completed batch with the recording id supplied at startSession() time and replace discardLatestPending() with discardBatch(recordingId). takeNextBatch() now returns the full {recordingId, samples} shape so prependBatch() can re-queue it on write-failure without losing the id. The renderer already owns a stable recordingId (Date.now() in useScreenRecorder) and the IPC surface threads it through set-recording-state and discard-cursor-telemetry. Adds a regression test that mirrors FabLrc's scenario in PR #457: two recordings finalize, A is discarded after B has already been queued, and the buffer must drop A while keeping B intact. --- electron/electron-env.d.ts | 4 +- electron/ipc/handlers.ts | 25 +++-- electron/preload.ts | 8 +- src/hooks/useScreenRecorder.ts | 4 +- src/lib/cursorTelemetryBuffer.test.ts | 129 +++++++++++++++++--------- src/lib/cursorTelemetryBuffer.ts | 83 +++++++++++------ 6 files changed, 166 insertions(+), 87 deletions(-) diff --git a/electron/electron-env.d.ts b/electron/electron-env.d.ts index ea364a1..08c06c6 100644 --- a/electron/electron-env.d.ts +++ b/electron/electron-env.d.ts @@ -63,8 +63,8 @@ interface Window { message?: string; error?: string; }>; - setRecordingState: (recording: boolean) => Promise; - discardCursorTelemetry: () => Promise; + setRecordingState: (recording: boolean, recordingId?: number) => Promise; + discardCursorTelemetry: (recordingId: number) => Promise; getCursorTelemetry: (videoPath?: string) => Promise<{ success: boolean; samples: CursorTelemetryPoint[]; diff --git a/electron/ipc/handlers.ts b/electron/ipc/handlers.ts index fc55006..7fe6c52 100644 --- a/electron/ipc/handlers.ts +++ b/electron/ipc/handlers.ts @@ -279,16 +279,20 @@ async function storeRecordedSessionFiles(payload: StoreRecordedSessionInput) { currentProjectPath = null; const telemetryPath = `${screenVideoPath}.cursor.json`; - const pendingSamples: CursorTelemetryPoint[] = cursorTelemetryBuffer.takeNextBatch(); - if (pendingSamples.length > 0) { + const pendingBatch = cursorTelemetryBuffer.takeNextBatch(); + if (pendingBatch && pendingBatch.samples.length > 0) { try { await fs.writeFile( telemetryPath, - JSON.stringify({ version: CURSOR_TELEMETRY_VERSION, samples: pendingSamples }, null, 2), + JSON.stringify( + { version: CURSOR_TELEMETRY_VERSION, samples: pendingBatch.samples }, + null, + 2, + ), "utf-8", ); } catch (err) { - cursorTelemetryBuffer.prependBatch(pendingSamples); + cursorTelemetryBuffer.prependBatch(pendingBatch); throw err; } } @@ -531,10 +535,15 @@ export function registerIpcHandlers( } }); - ipcMain.handle("set-recording-state", (_, recording: boolean) => { + ipcMain.handle("set-recording-state", (_, recording: boolean, recordingId?: number) => { if (recording) { stopCursorCapture(); - cursorTelemetryBuffer.startSession(); + // The renderer is the source of truth for the recording id (it + // uses the same id as the saved fileName). Fall back to a + // timestamp only if the renderer didn't supply one, so the + // buffer always has a stable key per session. + const id = typeof recordingId === "number" ? recordingId : Date.now(); + cursorTelemetryBuffer.startSession(id); cursorCaptureStartTimeMs = Date.now(); sampleCursorPoint(); cursorCaptureInterval = setInterval(sampleCursorPoint, CURSOR_SAMPLE_INTERVAL_MS); @@ -549,8 +558,8 @@ export function registerIpcHandlers( } }); - ipcMain.handle("discard-cursor-telemetry", () => { - cursorTelemetryBuffer.discardLatestPending(); + ipcMain.handle("discard-cursor-telemetry", (_, recordingId: number) => { + cursorTelemetryBuffer.discardBatch(recordingId); }); ipcMain.handle("get-cursor-telemetry", async (_, videoPath?: string) => { diff --git a/electron/preload.ts b/electron/preload.ts index 9367122..962e582 100644 --- a/electron/preload.ts +++ b/electron/preload.ts @@ -47,14 +47,14 @@ contextBridge.exposeInMainWorld("electronAPI", { getRecordedVideoPath: () => { return ipcRenderer.invoke("get-recorded-video-path"); }, - setRecordingState: (recording: boolean) => { - return ipcRenderer.invoke("set-recording-state", recording); + setRecordingState: (recording: boolean, recordingId?: number) => { + return ipcRenderer.invoke("set-recording-state", recording, recordingId); }, getCursorTelemetry: (videoPath?: string) => { return ipcRenderer.invoke("get-cursor-telemetry", videoPath); }, - discardCursorTelemetry: () => { - return ipcRenderer.invoke("discard-cursor-telemetry"); + discardCursorTelemetry: (recordingId: number) => { + return ipcRenderer.invoke("discard-cursor-telemetry", recordingId); }, onStopRecordingFromTray: (callback: () => void) => { const listener = () => callback(); diff --git a/src/hooks/useScreenRecorder.ts b/src/hooks/useScreenRecorder.ts index fd8a307..a95b672 100644 --- a/src/hooks/useScreenRecorder.ts +++ b/src/hooks/useScreenRecorder.ts @@ -225,7 +225,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn { try { const screenBlob = await activeScreenRecorder.recordedBlobPromise; if (discardRecordingId.current === activeRecordingId) { - window.electronAPI?.discardCursorTelemetry(); + window.electronAPI?.discardCursorTelemetry(activeRecordingId); return; } if (screenBlob.size === 0) { @@ -554,7 +554,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn { setRecording(true); setPaused(false); setElapsedSeconds(0); - window.electronAPI?.setRecordingState(true); + window.electronAPI?.setRecordingState(true, recordingId.current); const activeScreenRecorder = screenRecorder.current; const activeWebcamRecorder = webcamRecorder.current; diff --git a/src/lib/cursorTelemetryBuffer.test.ts b/src/lib/cursorTelemetryBuffer.test.ts index 567a1eb..17174ac 100644 --- a/src/lib/cursorTelemetryBuffer.test.ts +++ b/src/lib/cursorTelemetryBuffer.test.ts @@ -2,141 +2,182 @@ import { describe, expect, it, vi } from "vitest"; import { type CursorTelemetryPoint, createCursorTelemetryBuffer } from "./cursorTelemetryBuffer"; function sample(tag: number): CursorTelemetryPoint { - return { timeMs: tag, cx: tag / 10, cy: tag / 10 }; + // Decouple the timestamp tag from the coordinate fixture so cursor + // points stay inside the normalized [0, 1] range that real samples use. + const normalized = (tag % 100) / 100; + return { timeMs: tag, cx: normalized, cy: normalized }; } describe("createCursorTelemetryBuffer", () => { it("stores samples captured during an active session", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); for (let i = 0; i < 3; i++) buf.push(sample(i)); buf.endSession(); const batch = buf.takeNextBatch(); - expect(batch).toHaveLength(3); - expect(batch[0]?.timeMs).toBe(0); + expect(batch?.recordingId).toBe(1); + expect(batch?.samples).toHaveLength(3); + expect(batch?.samples[0]?.timeMs).toBe(0); }); it("trims active samples past maxActiveSamples (ring behaviour)", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 2 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.push(sample(2)); buf.push(sample(3)); buf.endSession(); const batch = buf.takeNextBatch(); - expect(batch).toEqual([sample(2), sample(3)]); + expect(batch?.samples).toEqual([sample(2), sample(3)]); }); it("preserves earlier pending batches when a new session starts before store", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); // Recording 1 - buf.startSession(); + buf.startSession(1); buf.push(sample(101)); buf.push(sample(102)); buf.endSession(); // Recording 2 starts before recording 1's batch has been consumed - buf.startSession(); + buf.startSession(2); buf.push(sample(201)); buf.endSession(); const batch1 = buf.takeNextBatch(); const batch2 = buf.takeNextBatch(); - expect(batch1.map((s) => s.timeMs)).toEqual([101, 102]); - expect(batch2.map((s) => s.timeMs)).toEqual([201]); + expect(batch1?.recordingId).toBe(1); + expect(batch1?.samples.map((s) => s.timeMs)).toEqual([101, 102]); + expect(batch2?.recordingId).toBe(2); + expect(batch2?.samples.map((s) => s.timeMs)).toEqual([201]); }); - it("returns an empty batch when nothing is pending", () => { + it("returns null when nothing is pending", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - expect(buf.takeNextBatch()).toEqual([]); + expect(buf.takeNextBatch()).toBeNull(); }); it("drops empty sessions instead of queuing empty batches", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.endSession(); expect(buf.pendingCount).toBe(0); - expect(buf.takeNextBatch()).toEqual([]); + expect(buf.takeNextBatch()).toBeNull(); }); it("caps the pending queue at maxPendingBatches to bound memory", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 3 }); for (let round = 1; round <= 5; round++) { - buf.startSession(); + buf.startSession(round); buf.push(sample(round)); buf.endSession(); } expect(buf.pendingCount).toBe(3); // Oldest two batches (rounds 1 and 2) should have been dropped - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([3]); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([4]); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([5]); + expect(buf.takeNextBatch()?.recordingId).toBe(3); + expect(buf.takeNextBatch()?.recordingId).toBe(4); + expect(buf.takeNextBatch()?.recordingId).toBe(5); }); it("starting a new session clears in-progress samples but keeps pending batches", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); - buf.startSession(); + buf.startSession(2); buf.push(sample(99)); // Simulate another startSession before endSession (e.g. rapid restart) - buf.startSession(); + buf.startSession(3); expect(buf.activeCount).toBe(0); expect(buf.pendingCount).toBe(1); const batch = buf.takeNextBatch(); - expect(batch.map((s) => s.timeMs)).toEqual([1]); + expect(batch?.recordingId).toBe(1); + expect(batch?.samples.map((s) => s.timeMs)).toEqual([1]); }); - it("discardLatestPending() drops the most recently enqueued batch", () => { + it("discardBatch(id) drops only the batch produced by that recording id", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); - buf.startSession(); + buf.startSession(2); buf.push(sample(2)); buf.endSession(); expect(buf.pendingCount).toBe(2); - buf.discardLatestPending(); + expect(buf.discardBatch(1)).toBe(true); expect(buf.pendingCount).toBe(1); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + expect(buf.takeNextBatch()?.recordingId).toBe(2); }); - it("discardLatestPending() is safe to call on an empty queue", () => { + it("discardBatch(id) targets the correct batch even when a later recording sits in front of it", () => { + // Regression test for the rapid Stop → Record → Discard sequence: + // recording A's finalize callback does async work (fixWebmDuration), + // recording B finishes in the meantime, then A's callback resolves + // with discard intent. The discard must drop A — not B, which + // happens to be the *latest* pending batch by the time discard runs. const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.discardLatestPending(); + + buf.startSession(1); + buf.push(sample(11)); + buf.endSession(); + + buf.startSession(2); + buf.push(sample(22)); + buf.endSession(); + + expect(buf.pendingCount).toBe(2); + expect(buf.discardBatch(1)).toBe(true); + + const remaining = buf.takeNextBatch(); + expect(remaining?.recordingId).toBe(2); + expect(remaining?.samples.map((s) => s.timeMs)).toEqual([22]); + expect(buf.takeNextBatch()).toBeNull(); + }); + + it("discardBatch(id) is a no-op (returns false) when the id is unknown or already drained", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + expect(buf.discardBatch(42)).toBe(false); + + buf.startSession(1); + buf.push(sample(1)); + buf.endSession(); + buf.takeNextBatch(); + expect(buf.discardBatch(1)).toBe(false); expect(buf.pendingCount).toBe(0); }); it("prependBatch() re-inserts a batch at the front of the queue", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); const batch = buf.takeNextBatch(); + expect(batch).not.toBeNull(); expect(buf.pendingCount).toBe(0); - buf.prependBatch(batch); + if (batch) buf.prependBatch(batch); expect(buf.pendingCount).toBe(1); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + const next = buf.takeNextBatch(); + expect(next?.recordingId).toBe(1); + expect(next?.samples.map((s) => s.timeMs)).toEqual([1]); }); it("prependBatch() ignores empty batches", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.prependBatch([]); + buf.prependBatch({ recordingId: 1, samples: [] }); expect(buf.pendingCount).toBe(0); }); @@ -145,13 +186,13 @@ describe("createCursorTelemetryBuffer", () => { const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined); for (let round = 1; round <= 2; round++) { - buf.startSession(); + buf.startSession(round); buf.push(sample(round)); expect(buf.endSession()).toBe(0); } expect(warn).not.toHaveBeenCalled(); - buf.startSession(); + buf.startSession(3); buf.push(sample(3)); const dropped = buf.endSession(); expect(dropped).toBe(1); @@ -168,7 +209,7 @@ describe("createCursorTelemetryBuffer", () => { // Fill the queue to the cap without dropping anything. for (let round = 1; round <= 2; round++) { - buf.startSession(); + buf.startSession(round); buf.push(sample(round)); buf.endSession(); } @@ -177,14 +218,14 @@ describe("createCursorTelemetryBuffer", () => { // Simulate a misuse where a retry prepends without first draining: // queue would grow to 3, so the oldest-trailing entry must be evicted. - buf.prependBatch([sample(99)]); + buf.prependBatch({ recordingId: 99, samples: [sample(99)] }); expect(buf.pendingCount).toBe(2); expect(warn).toHaveBeenCalledTimes(1); expect(warn.mock.calls[0]?.[0]).toMatch(/prependBatch trimmed 1 trailing batch/); // Front is the prepended batch; the preserved trailing batch is round 1. - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([99]); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + expect(buf.takeNextBatch()?.recordingId).toBe(99); + expect(buf.takeNextBatch()?.recordingId).toBe(1); expect(buf.pendingCount).toBe(0); warn.mockRestore(); @@ -198,7 +239,7 @@ describe("createCursorTelemetryBuffer", () => { maxPendingBatches: Number.NaN, }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); expect(() => buf.endSession()).not.toThrow(); expect(buf.pendingCount).toBe(1); @@ -207,7 +248,7 @@ describe("createCursorTelemetryBuffer", () => { maxActiveSamples: -5, maxPendingBatches: 0, }); - buf2.startSession(); + buf2.startSession(2); buf2.push(sample(2)); expect(() => buf2.endSession()).not.toThrow(); expect(buf2.pendingCount).toBe(1); @@ -215,16 +256,16 @@ describe("createCursorTelemetryBuffer", () => { it("reset() clears both active and pending state", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); - buf.startSession(); + buf.startSession(2); buf.push(sample(2)); buf.reset(); expect(buf.activeCount).toBe(0); expect(buf.pendingCount).toBe(0); - expect(buf.takeNextBatch()).toEqual([]); + expect(buf.takeNextBatch()).toBeNull(); }); }); diff --git a/src/lib/cursorTelemetryBuffer.ts b/src/lib/cursorTelemetryBuffer.ts index e97bab8..0c7e0e1 100644 --- a/src/lib/cursorTelemetryBuffer.ts +++ b/src/lib/cursorTelemetryBuffer.ts @@ -12,25 +12,39 @@ export interface CursorTelemetryPoint { cy: number; } +/** + * A completed batch of cursor samples, tagged with the recording id that + * produced them. The id is supplied at `startSession()` time and travels + * with the batch through the pending queue, retries, and discards. + */ +export interface CursorTelemetryBatch { + recordingId: number; + samples: CursorTelemetryPoint[]; +} + /** * Per-session cursor telemetry buffer with bounded memory. * - * Flow: `startSession()` → `push(point)` N times → `endSession()` enqueues - * the collected samples as a completed batch. The main process later - * drains batches in FIFO order via `takeNextBatch()` to persist them to - * disk, and can `prependBatch()` on write failure to retry without losing - * order. + * Flow: `startSession(recordingId)` → `push(point)` N times → `endSession()` + * enqueues the collected samples as a completed batch tagged with that + * `recordingId`. The main process later drains batches in FIFO order via + * `takeNextBatch()` to persist them to disk, and can `prependBatch()` on + * write failure to retry without losing order. A discard request keys on + * the recording id so an asynchronous "discard recording A" decision that + * arrives after recording B has already enqueued its batch still drops + * the right one. * * Memory is bounded by `maxActiveSamples` (ring buffer on the in-progress * batch) and `maxPendingBatches` (FIFO cap across completed batches). */ export interface CursorTelemetryBuffer { /** - * Begin a new recording session. Clears any in-progress active samples - * (without touching already-completed pending batches). Safe to call - * repeatedly — e.g. a rapid Stop → Record sequence. + * Begin a new recording session under the given `recordingId`. Clears + * any in-progress active samples (without touching already-completed + * pending batches). Safe to call repeatedly — e.g. a rapid Stop → + * Record sequence — and the most recent id wins. */ - startSession(): void; + startSession(recordingId: number): void; /** * Append a telemetry sample to the current active session. When the @@ -41,8 +55,8 @@ export interface CursorTelemetryBuffer { /** * Finalize the active session, moving its samples into the pending - * queue as a single batch. Empty sessions are dropped (no empty batch - * is enqueued). + * queue as a single batch tagged with the current recording id. Empty + * sessions are dropped (no empty batch is enqueued). * * If the pending queue would exceed `maxPendingBatches`, the oldest * batches are evicted to bound memory. A `console.warn` is emitted @@ -55,10 +69,10 @@ export interface CursorTelemetryBuffer { endSession(): number; /** - * Remove and return the oldest pending batch, or an empty array if - * the queue is empty. + * Remove and return the oldest pending batch, or `null` if the queue + * is empty. */ - takeNextBatch(): CursorTelemetryPoint[]; + takeNextBatch(): CursorTelemetryBatch | null; /** * Re-insert a batch at the front of the queue, preserving FIFO order @@ -71,14 +85,22 @@ export interface CursorTelemetryBuffer { * normal retry usage this trim is a no-op because the caller has just * removed the batch via `takeNextBatch()`. */ - prependBatch(batch: CursorTelemetryPoint[]): void; + prependBatch(batch: CursorTelemetryBatch): void; /** - * Drop the most recently enqueued pending batch. Used when a recording - * is discarded after `endSession()` but before it has been persisted. - * No-op on an empty queue. + * Drop the pending batch produced by the given `recordingId`. Used + * when a recording is discarded after its `endSession()` has run but + * before it has been persisted. Returns `true` if a batch was + * removed, `false` otherwise (no matching id, or the batch was + * already drained). + * + * Keying on the recording id (rather than "the latest pending batch") + * avoids a real bug: when finalizing a recording does asynchronous + * work like `fixWebmDuration`, a quick Stop → Record → Discard + * sequence can interleave such that the latest pending batch belongs + * to a *later* recording than the one being discarded. */ - discardLatestPending(): void; + discardBatch(recordingId: number): boolean; /** * Clear both the active and pending state. Intended for tests and @@ -121,11 +143,13 @@ export function createCursorTelemetryBuffer( const maxPending = sanitizeLimit(options.maxPendingBatches, DEFAULT_MAX_PENDING_BATCHES); let active: CursorTelemetryPoint[] = []; - let pending: CursorTelemetryPoint[][] = []; + let activeRecordingId: number | null = null; + let pending: CursorTelemetryBatch[] = []; return { - startSession() { + startSession(recordingId) { active = []; + activeRecordingId = recordingId; }, push(point) { active.push(point); @@ -135,14 +159,15 @@ export function createCursorTelemetryBuffer( }, endSession() { let dropped = 0; - if (active.length > 0) { - pending.push(active); + if (active.length > 0 && activeRecordingId !== null) { + pending.push({ recordingId: activeRecordingId, samples: active }); while (pending.length > maxPending) { pending.shift(); dropped++; } } active = []; + activeRecordingId = null; if (dropped > 0) { console.warn( `[cursorTelemetryBuffer] dropped ${dropped} pending batch(es) to stay within maxPendingBatches=${maxPending}`, @@ -151,10 +176,10 @@ export function createCursorTelemetryBuffer( return dropped; }, takeNextBatch() { - return pending.shift() ?? []; + return pending.shift() ?? null; }, prependBatch(batch) { - if (batch.length === 0) return; + if (batch.samples.length === 0) return; pending.unshift(batch); let dropped = 0; while (pending.length > maxPending) { @@ -167,11 +192,15 @@ export function createCursorTelemetryBuffer( ); } }, - discardLatestPending() { - pending.pop(); + discardBatch(recordingId) { + const idx = pending.findIndex((b) => b.recordingId === recordingId); + if (idx === -1) return false; + pending.splice(idx, 1); + return true; }, reset() { active = []; + activeRecordingId = null; pending = []; }, get activeCount() {