fix: key cursor telemetry batches by recordingId for safe discard

discardLatestPending() popped whichever batch happened to be at the
back of the queue. With a Stop → Record → Discard sequence, the
pending queue can have recording B's batch sitting in front of A's by
the time A's finalize callback resolves (because finalizeRecording
awaits fixWebmDuration), so the discard targets the wrong recording.

Tag each completed batch with the recording id supplied at
startSession() time and replace discardLatestPending() with
discardBatch(recordingId). takeNextBatch() now returns the full
{recordingId, samples} shape so prependBatch() can re-queue it on
write-failure without losing the id. The renderer already owns a
stable recordingId (Date.now() in useScreenRecorder) and the IPC
surface threads it through set-recording-state and
discard-cursor-telemetry.

Adds a regression test that mirrors FabLrc's scenario in PR #457:
two recordings finalize, A is discarded after B has already been
queued, and the buffer must drop A while keeping B intact.
This commit is contained in:
shaun0927
2026-04-28 18:27:14 +09:00
parent 96765e483d
commit 3b9b4192bf
6 changed files with 166 additions and 87 deletions
+56 -27
View File
@@ -12,25 +12,39 @@ export interface CursorTelemetryPoint {
cy: number;
}
/**
* A completed batch of cursor samples, tagged with the recording id that
* produced them. The id is supplied at `startSession()` time and travels
* with the batch through the pending queue, retries, and discards.
*/
export interface CursorTelemetryBatch {
recordingId: number;
samples: CursorTelemetryPoint[];
}
/**
* Per-session cursor telemetry buffer with bounded memory.
*
* Flow: `startSession()` → `push(point)` N times → `endSession()` enqueues
* the collected samples as a completed batch. The main process later
* drains batches in FIFO order via `takeNextBatch()` to persist them to
* disk, and can `prependBatch()` on write failure to retry without losing
* order.
* Flow: `startSession(recordingId)` → `push(point)` N times → `endSession()`
* enqueues the collected samples as a completed batch tagged with that
* `recordingId`. The main process later drains batches in FIFO order via
* `takeNextBatch()` to persist them to disk, and can `prependBatch()` on
* write failure to retry without losing order. A discard request keys on
* the recording id so an asynchronous "discard recording A" decision that
* arrives after recording B has already enqueued its batch still drops
* the right one.
*
* Memory is bounded by `maxActiveSamples` (ring buffer on the in-progress
* batch) and `maxPendingBatches` (FIFO cap across completed batches).
*/
export interface CursorTelemetryBuffer {
/**
* Begin a new recording session. Clears any in-progress active samples
* (without touching already-completed pending batches). Safe to call
* repeatedly — e.g. a rapid Stop → Record sequence.
* Begin a new recording session under the given `recordingId`. Clears
* any in-progress active samples (without touching already-completed
* pending batches). Safe to call repeatedly — e.g. a rapid Stop →
* Record sequence — and the most recent id wins.
*/
startSession(): void;
startSession(recordingId: number): void;
/**
* Append a telemetry sample to the current active session. When the
@@ -41,8 +55,8 @@ export interface CursorTelemetryBuffer {
/**
* Finalize the active session, moving its samples into the pending
* queue as a single batch. Empty sessions are dropped (no empty batch
* is enqueued).
* queue as a single batch tagged with the current recording id. Empty
* sessions are dropped (no empty batch is enqueued).
*
* If the pending queue would exceed `maxPendingBatches`, the oldest
* batches are evicted to bound memory. A `console.warn` is emitted
@@ -55,10 +69,10 @@ export interface CursorTelemetryBuffer {
endSession(): number;
/**
* Remove and return the oldest pending batch, or an empty array if
* the queue is empty.
* Remove and return the oldest pending batch, or `null` if the queue
* is empty.
*/
takeNextBatch(): CursorTelemetryPoint[];
takeNextBatch(): CursorTelemetryBatch | null;
/**
* Re-insert a batch at the front of the queue, preserving FIFO order
@@ -71,14 +85,22 @@ export interface CursorTelemetryBuffer {
* normal retry usage this trim is a no-op because the caller has just
* removed the batch via `takeNextBatch()`.
*/
prependBatch(batch: CursorTelemetryPoint[]): void;
prependBatch(batch: CursorTelemetryBatch): void;
/**
* Drop the most recently enqueued pending batch. Used when a recording
* is discarded after `endSession()` but before it has been persisted.
* No-op on an empty queue.
* Drop the pending batch produced by the given `recordingId`. Used
* when a recording is discarded after its `endSession()` has run but
* before it has been persisted. Returns `true` if a batch was
* removed, `false` otherwise (no matching id, or the batch was
* already drained).
*
* Keying on the recording id (rather than "the latest pending batch")
* avoids a real bug: when finalizing a recording does asynchronous
* work like `fixWebmDuration`, a quick Stop → Record → Discard
* sequence can interleave such that the latest pending batch belongs
* to a *later* recording than the one being discarded.
*/
discardLatestPending(): void;
discardBatch(recordingId: number): boolean;
/**
* Clear both the active and pending state. Intended for tests and
@@ -121,11 +143,13 @@ export function createCursorTelemetryBuffer(
const maxPending = sanitizeLimit(options.maxPendingBatches, DEFAULT_MAX_PENDING_BATCHES);
let active: CursorTelemetryPoint[] = [];
let pending: CursorTelemetryPoint[][] = [];
let activeRecordingId: number | null = null;
let pending: CursorTelemetryBatch[] = [];
return {
startSession() {
startSession(recordingId) {
active = [];
activeRecordingId = recordingId;
},
push(point) {
active.push(point);
@@ -135,14 +159,15 @@ export function createCursorTelemetryBuffer(
},
endSession() {
let dropped = 0;
if (active.length > 0) {
pending.push(active);
if (active.length > 0 && activeRecordingId !== null) {
pending.push({ recordingId: activeRecordingId, samples: active });
while (pending.length > maxPending) {
pending.shift();
dropped++;
}
}
active = [];
activeRecordingId = null;
if (dropped > 0) {
console.warn(
`[cursorTelemetryBuffer] dropped ${dropped} pending batch(es) to stay within maxPendingBatches=${maxPending}`,
@@ -151,10 +176,10 @@ export function createCursorTelemetryBuffer(
return dropped;
},
takeNextBatch() {
return pending.shift() ?? [];
return pending.shift() ?? null;
},
prependBatch(batch) {
if (batch.length === 0) return;
if (batch.samples.length === 0) return;
pending.unshift(batch);
let dropped = 0;
while (pending.length > maxPending) {
@@ -167,11 +192,15 @@ export function createCursorTelemetryBuffer(
);
}
},
discardLatestPending() {
pending.pop();
discardBatch(recordingId) {
const idx = pending.findIndex((b) => b.recordingId === recordingId);
if (idx === -1) return false;
pending.splice(idx, 1);
return true;
},
reset() {
active = [];
activeRecordingId = null;
pending = [];
},
get activeCount() {