diff --git a/packages/utils/docs/profiler.md b/packages/utils/docs/profiler.md new file mode 100644 index 000000000..a3740b875 --- /dev/null +++ b/packages/utils/docs/profiler.md @@ -0,0 +1,360 @@ +# User Timing Profiler + +⏱️ **High-performance profiling utility for structured timing measurements with Chrome DevTools Extensibility API payloads.** 📊 + +--- + +The `Profiler` class provides a clean, type-safe API for performance monitoring that integrates seamlessly with Chrome DevTools. It supports both synchronous and asynchronous operations with smart defaults for custom track visualization, enabling developers to track performance bottlenecks and optimize application speed. + +## Getting started + +1. If you haven't already, install [@code-pushup/utils](../../README.md). + +2. Install as a dependency with your package manager: + + ```sh + npm install @code-pushup/utils + ``` + + ```sh + yarn add @code-pushup/utils + ``` + + ```sh + pnpm add @code-pushup/utils + ``` + +3. Import and create a profiler instance: + + ```ts + import { Profiler } from '@code-pushup/utils'; + + const profiler = new Profiler({ + prefix: 'cp', + track: 'CLI', + trackGroup: 'Code Pushup', + color: 'primary-dark', + tracks: { + utils: { track: 'Utils', color: 'primary' }, + core: { track: 'Core', color: 'primary-light' }, + }, + enabled: true, + }); + ``` + +4. 
Start measuring performance: + + ```ts + // Measure synchronous operations + const result = profiler.measure('data-processing', () => { + return processData(data); + }); + + // Measure asynchronous operations + const asyncResult = await profiler.measureAsync('api-call', async () => { + return await fetch('/api/data').then(r => r.json()); + }); + ``` + +## Configuration + +```ts +new Profiler(options: ProfilerOptions) +``` + +**Parameters:** + +- `options` - Configuration options for the profiler instance + +**Options:** + +| Property | Type | Default | Description | +| ------------ | --------- | ----------- | --------------------------------------------------------------- | +| `tracks` | `object` | `undefined` | Custom track configurations merged with defaults | +| `prefix` | `string` | `undefined` | Prefix for all measurement names | +| `track` | `string` | `undefined` | Default track name for measurements | +| `trackGroup` | `string` | `undefined` | Default track group for organization | +| `color` | `string` | `undefined` | Default color for track entries | +| `enabled` | `boolean` | `env var` | Whether profiling is enabled (defaults to CP_PROFILING env var) | + +### Environment Variables + +- `CP_PROFILING` - Enables or disables profiling globally (boolean) + +```bash +# Enable profiling in development +CP_PROFILING=true npm run dev + +# Disable profiling in production +CP_PROFILING=false npm run build +``` + +## API Methods + +The profiler provides several methods for different types of performance measurements: + +| Method | Description | +| ------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------- | +| `measure(event: string, work: () => R, options?: MeasureOptions): R` | Measures synchronous operation execution time with DevTools payloads. Noop when profiling is disabled. 
| +| `measureAsync(event: string, work: () => Promise, options?: MeasureOptions): Promise` | Measures asynchronous operation execution time with DevTools payloads. Noop when profiling is disabled. | +| `marker(name: string, opt?: MarkerOptions): void` | Creates performance markers as vertical lines in DevTools timeline. Noop when profiling is disabled. | +| `setEnabled(enabled: boolean): void` | Controls profiling at runtime. | +| `isEnabled(): boolean` | Returns whether profiling is currently enabled. | + +### Synchronous measurements + +```ts +profiler.measure(event: string, work: () => R, options?: MeasureOptions): R +``` + +Measures the execution time of a synchronous operation. Creates performance start/end marks and a final measure with Chrome DevTools Extensibility API payloads. + +```ts +const result = profiler.measure( + 'file-processing', + () => { + return fs.readFileSync('large-file.txt', 'utf8'); + }, + { + track: 'io-operations', + color: 'warning', + }, +); +``` + +### Asynchronous measurements + +```ts +profiler.measureAsync(event: string, work: () => Promise, options?: MeasureOptions): Promise +``` + +Measures the execution time of an asynchronous operation. + +```ts +const data = await profiler.measureAsync( + 'api-request', + async () => { + const response = await fetch('/api/data'); + return response.json(); + }, + { + track: 'network', + trackGroup: 'external', + }, +); +``` + +### Performance markers + +```ts +profiler.marker(name: string, options?: EntryMeta & { color?: DevToolsColor }): void +``` + +Creates a performance mark with Chrome DevTools marker visualization. Markers appear as vertical lines spanning all tracks and can include custom metadata. 
+ +```ts +profiler.marker('user-action', { + color: 'secondary', + tooltipText: 'User clicked save button', + properties: [ + ['action', 'save'], + ['elementId', 'save-btn'], + ], +}); +``` + +### Runtime control + +```ts +profiler.setEnabled(enabled: boolean): void +profiler.isEnabled(): boolean +``` + +Control profiling at runtime and check current status. + +```ts +// Disable profiling temporarily +profiler.setEnabled(false); + +// Check if profiling is active +if (profiler.isEnabled()) { + console.log('Performance monitoring is active'); +} +``` + +## Examples + +### Basic usage + +```ts +import { Profiler } from '@code-pushup/utils'; + +const profiler = new Profiler({ + prefix: 'cp', + track: 'CLI', + trackGroup: 'Code Pushup', + color: 'primary-dark', + tracks: { + utils: { track: 'Utils', color: 'primary' }, + core: { track: 'Core', color: 'primary-light' }, + }, + enabled: true, +}); + +// Simple measurement +const result = profiler.measure('data-transform', () => { + return transformData(input); +}); + +// Async measurement with custom options +const data = await profiler.measureAsync( + 'fetch-user', + async () => { + return await api.getUser(userId); + }, + { + track: 'api', + color: 'info', + }, +); + +// Add a marker for important events +profiler.marker('user-login', { + tooltipText: 'User authentication completed', +}); +``` + +### Custom tracks + +Define custom track configurations for better organization: + +```ts +interface AppTracks { + api: ActionTrackEntryPayload; + db: ActionTrackEntryPayload; + cache: ActionTrackEntryPayload; +} + +const profiler = new Profiler({ + track: 'API', + trackGroup: 'Server', + color: 'primary-dark', + tracks: { + api: { color: 'primary' }, + db: { track: 'database', color: 'warning' }, + cache: { track: 'cache', color: 'success' }, + }, +}); + +// Use predefined tracks +const users = await profiler.measureAsync('fetch-users', fetchUsers, profiler.tracks.api); + +const saved = profiler.measure('save-user', () => 
saveToDb(user), { + ...profiler.tracks.db, + color: 'primary', +}); +``` + +## NodeJSProfiler + +This profiler extends all options and API from Profiler with automatic process exit handling for buffered performance data. + +The NodeJSProfiler automatically subscribes to performance observation and installs exit handlers that flush buffered data on process termination (signals, fatal errors, or normal exit). + +## Configuration + +```ts +new NodejsProfiler(options: NodejsProfilerOptions) +``` + +**Parameters:** + +- `options` - Configuration options for the profiler instance + +**Options:** + +| Property | Type | Default | Description | +| ------------------------ | --------------------------------------- | ---------- | ------------------------------------------------------------------------------- | +| `encodePerfEntry` | `PerformanceEntryEncoder` | _required_ | Function that encodes raw PerformanceEntry objects into domain-specific types | +| `captureBufferedEntries` | `boolean` | `true` | Whether to capture performance entries that occurred before observation started | +| `flushThreshold` | `number` | `20` | Threshold for triggering queue flushes based on queue length | +| `maxQueueSize` | `number` | `10_000` | Maximum number of items allowed in the queue before new entries are dropped | + +## API Methods + +The NodeJSProfiler inherits all API methods from the base Profiler class and adds additional methods for queue management and WAL lifecycle control. + +| Method | Description | +| ------------------------------------ | ------------------------------------------------------------------------------- | +| `getStats()` | Returns comprehensive queue statistics for monitoring and debugging. | +| `flush()` | Forces immediate writing of all queued performance entries to the WAL. | +| `setEnabled(enabled: boolean): void` | Controls profiling at runtime with automatic WAL/observer lifecycle management. 
| + +### Runtime control with Write Ahead Log lifecycle management + +```ts +profiler.setEnabled(enabled: boolean): void +``` + +Controls profiling at runtime and manages the WAL/observer lifecycle. Unlike the base Profiler class, this method ensures that when profiling is enabled, the WAL is opened and the performance observer is subscribed. When disabled, the WAL is closed and the observer is unsubscribed. + +```ts +// Temporarily disable profiling to reduce overhead during heavy operations +profiler.setEnabled(false); +await performHeavyOperation(); +profiler.setEnabled(true); // WAL reopens and observer resubscribes +``` + +### Queue statistics + +```ts +profiler.getStats(): { + enabled: boolean; + observing: boolean; + walOpen: boolean; + isSubscribed: boolean; + queued: number; + dropped: number; + written: number; + maxQueueSize: number; + flushThreshold: number; + addedSinceLastFlush: number; + buffered: boolean; +} +``` + +Returns comprehensive queue statistics for monitoring and debugging. Provides insight into the current state of the performance entry queue, useful for monitoring memory usage and processing throughput. + +```ts +const stats = profiler.getStats(); +console.log(`Enabled: ${stats.enabled}, WAL Open: ${stats.walOpen}, Observing: ${stats.observing}, Subscribed: ${stats.isSubscribed}, Queued: ${stats.queued}`); +if (stats.enabled && stats.walOpen && stats.observing && stats.isSubscribed && stats.queued > stats.flushThreshold) { + console.log('Queue nearing capacity, consider manual flush'); +} +``` + +### Manual flushing + +```ts +profiler.flush(): void +``` + +Forces immediate writing of all queued performance entries to the write ahead log, ensuring no performance data is lost. This method is useful for manual control over when buffered data is written, complementing the automatic flushing that occurs during process exit or when thresholds are reached. 
+ +```ts +// Flush periodically in long-running applications to prevent memory buildup +setInterval(() => { + profiler.flush(); +}, 60000); // Flush every minute + +// Ensure all measurements are saved before critical operations +await profiler.measureAsync('database-migration', async () => { + await runMigration(); + profiler.flush(); // Ensure migration timing is recorded immediately +}); +``` + +## Resources + +- **[Chrome DevTools Extensibility API](https://developer.chrome.com/docs/devtools/performance/extension)** - Official documentation for performance profiling +- **[User Timing API](https://developer.mozilla.org/en-US/docs/Web/API/User_Timing_API)** - Web Performance API reference diff --git a/packages/utils/mocks/sink.mock.ts b/packages/utils/mocks/sink.mock.ts index 13d89e91c..7f435ab4f 100644 --- a/packages/utils/mocks/sink.mock.ts +++ b/packages/utils/mocks/sink.mock.ts @@ -1,30 +1,55 @@ -import type { Sink } from '../src/lib/sink-source.type'; +import { vi } from 'vitest'; +import type { + RecoverResult, + Recoverable, + Sink, +} from '../src/lib/sink-source.type'; export class MockSink implements Sink { private writtenItems: string[] = []; - private closed = false; + private closed = true; - open(): void { + open = vi.fn((): void => { this.closed = false; - } + }); - write(input: string): void { + write = vi.fn((input: string): void => { this.writtenItems.push(input); - } + }); - close(): void { + close = vi.fn((): void => { this.closed = true; - } + }); - isClosed(): boolean { + isClosed = vi.fn((): boolean => { return this.closed; - } + }); - encode(input: string): string { + encode = vi.fn((input: string): string => { return `${input}-${this.constructor.name}-encoded`; - } + }); - getWrittenItems(): string[] { + getWrittenItems = vi.fn((): string[] => { return [...this.writtenItems]; - } + }); +} + +export class MockTraceEventFileSink extends MockSink implements Recoverable { + recover = vi.fn( + (): { + records: unknown[]; + errors: { lineNo: 
number; line: string; error: Error }[]; + partialTail: string | null; + } => { + return { + records: this.getWrittenItems(), + errors: [], + partialTail: null, + } satisfies RecoverResult; + }, + ); + + repack = vi.fn((): void => {}); + + finalize = vi.fn((): void => {}); } diff --git a/packages/utils/src/lib/exit-process.ts b/packages/utils/src/lib/exit-process.ts index 62cee4977..e2e3f89f8 100644 --- a/packages/utils/src/lib/exit-process.ts +++ b/packages/utils/src/lib/exit-process.ts @@ -44,8 +44,8 @@ export function installExitHandlers(options: ExitHandlerOptions = {}): void { const { onExit, onError, - exitOnFatal, - exitOnSignal, + exitOnFatal = false, + exitOnSignal = false, fatalExitCode = DEFAULT_FATAL_EXIT_CODE, } = options; diff --git a/packages/utils/src/lib/performance-observer.int.test.ts b/packages/utils/src/lib/performance-observer.int.test.ts index 2c1721ebb..d97e0fd47 100644 --- a/packages/utils/src/lib/performance-observer.int.test.ts +++ b/packages/utils/src/lib/performance-observer.int.test.ts @@ -23,13 +23,14 @@ describe('PerformanceObserverSink', () => { beforeEach(() => { sink = new MockSink(); + sink.open(); encode = vi.fn((entry: PerformanceEntry) => [ `${entry.name}:${entry.entryType}`, ]); options = { sink, - encode, + encodePerfEntry: encode, }; performance.clearMarks(); @@ -40,23 +41,22 @@ describe('PerformanceObserverSink', () => { expect(() => new PerformanceObserverSink(options)).not.toThrow(); }); - it('internal PerformanceObserver should process observed entries', () => { + it('internal PerformanceObserver should process observed entries', async () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); performance.mark('test-mark'); performance.measure('test-measure'); + await awaitObserverCallback(); observer.flush(); expect(encode).toHaveBeenCalledTimes(2); - expect(encode).toHaveBeenNthCalledWith( - 1, + expect(encode).toHaveBeenCalledWith( expect.objectContaining({ name: 'test-mark', entryType: 
'mark', }), ); - expect(encode).toHaveBeenNthCalledWith( - 2, + expect(encode).toHaveBeenCalledWith( expect.objectContaining({ name: 'test-measure', entryType: 'measure', @@ -80,7 +80,7 @@ describe('PerformanceObserverSink', () => { expect(encode).toHaveBeenCalledTimes(3); }); - it('flush flushes observed entries when subscribed', () => { + it('flush flushes observed entries when subscribed', async () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); @@ -88,6 +88,7 @@ describe('PerformanceObserverSink', () => { performance.mark('test-mark2'); expect(sink.getWrittenItems()).toStrictEqual([]); + await awaitObserverCallback(); observer.flush(); expect(sink.getWrittenItems()).toStrictEqual([ 'test-mark1:mark', @@ -95,13 +96,14 @@ describe('PerformanceObserverSink', () => { ]); }); - it('flush calls encode for each entry', () => { + it('flush calls encode for each entry', async () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); performance.mark('test-mark1'); performance.mark('test-mark2'); + await awaitObserverCallback(); observer.flush(); expect(encode).toHaveBeenCalledWith( @@ -137,91 +139,43 @@ describe('PerformanceObserverSink', () => { expect(encode).toHaveBeenCalledTimes(2); }); - it('should observe performance entries and write them to the sink on flush', () => { + it('should observe performance entries and write them to the sink on flush', async () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); performance.mark('test-mark'); + await awaitObserverCallback(); observer.flush(); expect(sink.getWrittenItems()).toHaveLength(1); }); - it('should observe buffered performance entries when buffered is enabled', async () => { - const observer = new PerformanceObserverSink({ - ...options, - buffered: true, - }); + it('should observe performance entries when subscribed', async () => { + const observer = new PerformanceObserverSink(options); + observer.subscribe(); 
performance.mark('test-mark-1'); performance.mark('test-mark-2'); - await new Promise(resolve => setTimeout(resolve, 10)); - observer.subscribe(); - await new Promise(resolve => setTimeout(resolve, 10)); - expect(performance.getEntries()).toHaveLength(2); + await awaitObserverCallback(); observer.flush(); expect(sink.getWrittenItems()).toHaveLength(2); }); - it('handles multiple encoded items per performance entry', () => { + it('handles multiple encoded items per performance entry', async () => { const multiEncodeFn = vi.fn(e => [ `${e.entryType}-item1`, `${e.entryType}item2`, ]); const observer = new PerformanceObserverSink({ ...options, - encode: multiEncodeFn, + encodePerfEntry: multiEncodeFn, }); observer.subscribe(); performance.mark('test-mark'); + await awaitObserverCallback(); observer.flush(); expect(sink.getWrittenItems()).toHaveLength(2); }); - - it('cursor logic prevents duplicate processing of performance entries', () => { - const observer = new PerformanceObserverSink(options); - observer.subscribe(); - - performance.mark('first-mark'); - performance.mark('second-mark'); - expect(encode).not.toHaveBeenCalled(); - observer.flush(); - expect(sink.getWrittenItems()).toStrictEqual([ - 'first-mark:mark', - 'second-mark:mark', - ]); - - expect(encode).toHaveBeenCalledTimes(2); - expect(encode).toHaveBeenNthCalledWith( - 1, - expect.objectContaining({ name: 'first-mark' }), - ); - expect(encode).toHaveBeenNthCalledWith( - 2, - expect.objectContaining({ name: 'second-mark' }), - ); - - performance.mark('third-mark'); - performance.measure('first-measure'); - - observer.flush(); - expect(sink.getWrittenItems()).toStrictEqual([ - 'first-mark:mark', - 'second-mark:mark', - 'third-mark:mark', - 'first-measure:measure', - ]); - - expect(encode).toHaveBeenCalledTimes(4); - expect(encode).toHaveBeenNthCalledWith( - 3, - expect.objectContaining({ name: 'third-mark' }), - ); - expect(encode).toHaveBeenNthCalledWith( - 4, - expect.objectContaining({ name: 
'first-measure' }), - ); - }); }); diff --git a/packages/utils/src/lib/performance-observer.ts b/packages/utils/src/lib/performance-observer.ts index fa5720427..60a549ac8 100644 --- a/packages/utils/src/lib/performance-observer.ts +++ b/packages/utils/src/lib/performance-observer.ts @@ -1,111 +1,317 @@ -import { - type PerformanceEntry, - PerformanceObserver, - type PerformanceObserverEntryList, - performance, -} from 'node:perf_hooks'; -import type { Buffered, Encoder, Observer, Sink } from './sink-source.type'; +import { type PerformanceEntry, PerformanceObserver } from 'node:perf_hooks'; +import type { Buffered, Observer, Sink } from './sink-source.type'; +/** + * Encoder that converts PerformanceEntry to domain events. + * + * Pure function that transforms performance entries into domain events. + * Should be stateless, synchronous, and have no side effects. + * Returns a readonly array of encoded items. + */ +export type PerformanceEntryEncoder = ( + entry: PerformanceEntry, +) => readonly F[]; + +/** + * Array of performance entry types that this observer monitors. + * Only 'mark' and 'measure' entries are tracked as they represent + * user-defined performance markers and measurements. + */ const OBSERVED_TYPES = ['mark', 'measure'] as const; type ObservedEntryType = 'mark' | 'measure'; +const OBSERVED_TYPE_SET = new Set(OBSERVED_TYPES); + +/** + * Default threshold for triggering queue flushes based on queue length. + * When the queue length reaches (maxQueueSize - flushThreshold), + * a flush is triggered to prevent overflow. This provides a buffer zone + * before hitting the maximum queue capacity. + */ export const DEFAULT_FLUSH_THRESHOLD = 20; +/** + * Default maximum number of items allowed in the queue before entries are dropped. + * This acts as a memory safety limit to prevent unbounded memory growth + * in case of sink slowdown or high-frequency performance entries. 
+ */ +export const DEFAULT_MAX_QUEUE_SIZE = 10_000; + +/** + * Validates the flush threshold configuration to ensure sensible bounds. + * + * The flush threshold must be positive and cannot exceed the maximum queue size, + * as it represents a buffer zone within the queue capacity. + * + * @param flushThreshold - The threshold value to validate (must be > 0) + * @param maxQueueSize - The maximum queue size for comparison (flushThreshold <= maxQueueSize) + * @throws {Error} If flushThreshold is not positive or exceeds maxQueueSize + */ +export function validateFlushThreshold( + flushThreshold: number, + maxQueueSize: number, +): void { + if (flushThreshold <= 0) { + throw new Error('flushThreshold must be > 0'); + } + if (flushThreshold > maxQueueSize) { + throw new Error('flushThreshold must be <= maxQueueSize'); + } +} + +/** + * Configuration options for the PerformanceObserverSink. + * + * @template T - The type of encoded performance data that will be written to the sink + */ export type PerformanceObserverOptions = { + /** + * The sink where encoded performance entries will be written. + * Must implement the Sink interface for handling the encoded data. + */ sink: Sink; - encode: (entry: PerformanceEntry) => T[]; - buffered?: boolean; + + /** + * Function that encodes raw PerformanceEntry objects into domain-specific types. + * This transformer converts Node.js performance entries into application-specific data structures. + * Returns a readonly array of encoded items. + */ + encodePerfEntry: PerformanceEntryEncoder; + + /** + * Whether to enable buffered observation mode. + * When true, captures all performance entries that occurred before observation started. + * When false, only captures entries after subscription begins. + * + * @default true + */ + captureBufferedEntries?: boolean; + + /** + * Threshold for triggering queue flushes based on queue length. + * Flushes occur when queue length reaches (maxQueueSize - flushThreshold). 
+ * Larger values provide more buffer space before hitting capacity limits. + * + * @default DEFAULT_FLUSH_THRESHOLD (20) + */ flushThreshold?: number; + + /** + * Maximum number of items allowed in the queue before new entries are dropped. + * Acts as a memory safety limit to prevent unbounded growth during sink slowdown. + * + * @default DEFAULT_MAX_QUEUE_SIZE (10000) + */ + maxQueueSize?: number; }; -export class PerformanceObserverSink - implements Observer, Buffered, Encoder -{ - #encode: (entry: PerformanceEntry) => T[]; +/** + * A sink implementation that observes Node.js performance entries and forwards them to a configurable sink. + * + * This class provides a buffered, memory-safe bridge between Node.js PerformanceObserver + * and application-specific data sinks. It handles performance entry encoding, queue management, + * and graceful degradation under high load conditions. + * + * @template T - The type of encoded performance data written to the sink + * @implements {Observer} - Lifecycle management interface + * @implements {Buffered} - Queue statistics interface + */ +export class PerformanceObserverSink implements Observer, Buffered { + /** Encoder function for transforming PerformanceEntry objects into domain types */ + #encodePerfEntry: PerformanceEntryEncoder; + + /** Whether buffered observation mode is enabled */ #buffered: boolean; + + /** Threshold for triggering flushes based on queue length proximity to max capacity */ #flushThreshold: number; + + /** Maximum number of items allowed in queue before dropping new entries (hard memory limit) */ + #maxQueueSize: number; + + /** The target sink where encoded performance data is written */ #sink: Sink; + + /** Node.js PerformanceObserver instance, undefined when not subscribed */ #observer: PerformanceObserver | undefined; - #pendingCount = 0; + /** Bounded queue storing encoded performance items awaiting flush */ + #queue: T[] = []; - // "cursor" per type: how many we already wrote from the 
global buffer - #written: Map; + /** Count of performance entries dropped due to queue overflow */ + #dropped = 0; + /** Count of performance entries successfully written to sink */ + #written = 0; + + /** Number of items added to queue since last successful flush */ + #addedSinceLastFlush = 0; + + /** + * Creates a new PerformanceObserverSink with the specified configuration. + * + * @param options - Configuration options for the performance observer sink + * @throws {Error} If flushThreshold validation fails (must be > 0 and <= maxQueueSize) + */ constructor(options: PerformanceObserverOptions) { - const { encode, sink, buffered, flushThreshold } = options; - this.#encode = encode; - this.#written = new Map( - OBSERVED_TYPES.map(t => [t, 0]), - ); + const { + encodePerfEntry, + sink, + captureBufferedEntries, + flushThreshold = DEFAULT_FLUSH_THRESHOLD, + maxQueueSize = DEFAULT_MAX_QUEUE_SIZE, + } = options; + this.#encodePerfEntry = encodePerfEntry; this.#sink = sink; - this.#buffered = buffered ?? false; - this.#flushThreshold = flushThreshold ?? DEFAULT_FLUSH_THRESHOLD; + this.#buffered = captureBufferedEntries ?? true; + this.#maxQueueSize = maxQueueSize; + validateFlushThreshold(flushThreshold, this.#maxQueueSize); + this.#flushThreshold = flushThreshold; + } + + /** + * Returns current queue statistics for monitoring and debugging. + * + * Provides insight into the current state of the performance entry queue, + * useful for monitoring memory usage and processing throughput. 
+ * + * @returns Object containing all states and entry counts + */ + getStats() { + return { + isSubscribed: this.isSubscribed(), + queued: this.#queue.length, + dropped: this.#dropped, + written: this.#written, + maxQueueSize: this.#maxQueueSize, + flushThreshold: this.#flushThreshold, + addedSinceLastFlush: this.#addedSinceLastFlush, + buffered: this.#buffered, + }; } - encode(entry: PerformanceEntry): T[] { - return this.#encode(entry); + /** + * Encodes a raw PerformanceEntry using the configured encoder function. + * + * This method delegates to the user-provided encoder function, allowing + * transformation of Node.js performance entries into application-specific types. + * + * @param entry - The raw performance entry to encode + * @returns Readonly array of encoded items + */ + encode(entry: PerformanceEntry): readonly T[] { + return this.#encodePerfEntry(entry); } + /** + * Starts observing performance entries and forwarding them to the sink. + * + * Creates a Node.js PerformanceObserver that monitors 'mark' and 'measure' entries. + * The observer uses a bounded queue with proactive flushing to manage memory usage. + * When buffered mode is enabled, any existing buffered entries are immediately flushed. 
+ * + * @throws {Error} If the sink is closed before subscription + * + */ subscribe(): void { if (this.#observer) { return; } - // Only used to trigger the flush - it's not processing the entries, just counting them - this.#observer = new PerformanceObserver( - (list: PerformanceObserverEntryList) => { - const batchCount = OBSERVED_TYPES.reduce( - (n, t) => n + list.getEntriesByType(t).length, - 0, - ); - - this.#pendingCount += batchCount; - if (this.#pendingCount >= this.#flushThreshold) { - this.flush(); + this.#observer = new PerformanceObserver(list => { + list.getEntries().forEach(entry => { + if (OBSERVED_TYPE_SET.has(entry.entryType as ObservedEntryType)) { + const items = this.encode(entry); + items.forEach(item => { + if (this.#queue.length >= this.#maxQueueSize) { + this.#dropped++; + return; + } + + if ( + this.#queue.length >= + this.#maxQueueSize - this.#flushThreshold + ) { + this.flush(); + } + this.#queue.push(item); + this.#addedSinceLastFlush++; + }); } - }, - ); + }); + + if (this.#addedSinceLastFlush >= this.#flushThreshold) { + this.flush(); + } + }); this.#observer.observe({ entryTypes: OBSERVED_TYPES, buffered: this.#buffered, }); + + if (this.#buffered) { + this.flush(); + } } + /** + * Flushes all queued performance entries to the sink. + * + * Writes all currently queued encoded performance entries to the configured sink. + * If the sink is closed during flush, the queue is cleared without writing. + * The queue is always cleared after flush attempt, regardless of success or failure. + * + * @throws {Error} If sink write operations fail (with original error as cause) + */ flush(): void { - if (!this.#observer) { + if (this.#queue.length === 0) { return; } - OBSERVED_TYPES.forEach(t => { - const written = this.#written.get(t) ?? 
0; - const fresh = performance.getEntriesByType(t).slice(written); - - try { - fresh - .flatMap(entry => this.encode(entry)) - .forEach(item => this.#sink.write(item)); - - this.#written.set(t, written + fresh.length); - } catch (error) { - throw new Error( - 'PerformanceObserverSink failed to write items to sink.', - { cause: error }, - ); - } - }); - - this.#pendingCount = 0; + try { + this.#queue.forEach(item => { + this.#sink.write(item); + this.#written++; + }); + } catch (error) { + this.#dropped += this.#queue.length; + throw new Error( + 'PerformanceObserverSink failed to write items to sink.', + { cause: error }, + ); + } finally { + this.#queue.length = 0; + this.#addedSinceLastFlush = 0; + } } + /** + * Stops observing performance entries and cleans up resources. + * + * Performs a final flush of any remaining queued entries, then disconnects + * the PerformanceObserver and releases all references. + * + * This method is idempotent - safe to call multiple times. + */ unsubscribe(): void { if (!this.#observer) { return; } - this.#observer?.disconnect(); + this.flush(); + this.#queue.length = 0; + this.#addedSinceLastFlush = 0; + this.#observer.disconnect(); this.#observer = undefined; } + /** + * Checks whether the performance observer is currently active. + * + * Returns true if the sink is subscribed and actively observing performance entries. + * This indicates that a PerformanceObserver instance exists and is connected. 
+ * + * @returns true if currently subscribed and observing, false otherwise + */ isSubscribed(): boolean { return this.#observer !== undefined; } diff --git a/packages/utils/src/lib/performance-observer.unit.test.ts b/packages/utils/src/lib/performance-observer.unit.test.ts index a73be955a..df91c381f 100644 --- a/packages/utils/src/lib/performance-observer.unit.test.ts +++ b/packages/utils/src/lib/performance-observer.unit.test.ts @@ -10,25 +10,71 @@ import { import { MockPerformanceObserver } from '@code-pushup/test-utils'; import { MockSink } from '../../mocks/sink.mock'; import { + DEFAULT_FLUSH_THRESHOLD, + DEFAULT_MAX_QUEUE_SIZE, type PerformanceObserverOptions, PerformanceObserverSink, + validateFlushThreshold, } from './performance-observer.js'; +describe('validateFlushThreshold', () => { + it.each([ + { flushThreshold: 1, description: 'minimum valid value (1)' }, + { flushThreshold: 10, description: 'arbitrary valid value (10)' }, + { + flushThreshold: DEFAULT_FLUSH_THRESHOLD, + description: 'default flush threshold', + }, + { + flushThreshold: DEFAULT_MAX_QUEUE_SIZE, + description: 'maximum valid value (equals maxQueueSize)', + }, + ])( + 'accepts valid flushThreshold value: $description', + ({ flushThreshold }) => { + expect(() => + validateFlushThreshold(flushThreshold, DEFAULT_MAX_QUEUE_SIZE), + ).not.toThrow(); + }, + ); + + it.each([ + { flushThreshold: 0, expectedError: 'flushThreshold must be > 0' }, + { flushThreshold: -1, expectedError: 'flushThreshold must be > 0' }, + { flushThreshold: -10, expectedError: 'flushThreshold must be > 0' }, + { + flushThreshold: DEFAULT_MAX_QUEUE_SIZE + 1, + expectedError: 'flushThreshold must be <= maxQueueSize', + }, + { + flushThreshold: 20_000, + expectedError: 'flushThreshold must be <= maxQueueSize', + }, + ])( + 'throws error when flushThreshold is invalid: $flushThreshold', + ({ flushThreshold, expectedError }) => { + expect(() => + validateFlushThreshold(flushThreshold, DEFAULT_MAX_QUEUE_SIZE), + 
).toThrow(expectedError); + }, + ); +}); + describe('PerformanceObserverSink', () => { - let encode: MockedFunction<(entry: PerformanceEntry) => string[]>; + let encodePerfEntry: MockedFunction<(entry: PerformanceEntry) => string[]>; let sink: MockSink; let options: PerformanceObserverOptions; beforeEach(() => { vi.clearAllMocks(); sink = new MockSink(); - encode = vi.fn((entry: PerformanceEntry) => [ + sink.open(); + encodePerfEntry = vi.fn((entry: PerformanceEntry) => [ `${entry.name}:${entry.entryType}`, ]); options = { sink, - encode, - // we test buffered behavior separately + encodePerfEntry, flushThreshold: 1, }; @@ -46,33 +92,10 @@ describe('PerformanceObserverSink', () => { () => new PerformanceObserverSink({ sink, - encode, + encodePerfEntry, }), ).not.toThrow(); expect(MockPerformanceObserver.instances).toHaveLength(0); - // Instance creation covers the default flushThreshold assignment - }); - - it('automatically flushes when pendingCount reaches flushThreshold', () => { - const observer = new PerformanceObserverSink({ - sink, - encode, - flushThreshold: 2, // Set threshold to 2 - }); - observer.subscribe(); - - const mockObserver = MockPerformanceObserver.lastInstance(); - - // Emit 1 entry - should not trigger flush yet (pendingCount = 1 < 2) - mockObserver?.emitMark('first-mark'); - expect(sink.getWrittenItems()).toStrictEqual([]); - - // Emit 1 more entry - should trigger flush (pendingCount = 2 >= 2) - mockObserver?.emitMark('second-mark'); - expect(sink.getWrittenItems()).toStrictEqual([ - 'first-mark:mark', - 'second-mark:mark', - ]); }); it('creates instance with all options without starting to observe', () => { @@ -80,14 +103,34 @@ describe('PerformanceObserverSink', () => { () => new PerformanceObserverSink({ ...options, - buffered: true, + captureBufferedEntries: true, flushThreshold: 10, }), ).not.toThrow(); expect(MockPerformanceObserver.instances).toHaveLength(0); }); - it('subscribe is isomorphic and calls observe on internal 
PerformanceObserver', () => { + it.each([ + { flushThreshold: 0, expectedError: 'flushThreshold must be > 0' }, + { flushThreshold: -1, expectedError: 'flushThreshold must be > 0' }, + { + flushThreshold: 10_001, + expectedError: 'flushThreshold must be <= maxQueueSize', + }, + ])( + 'throws error when flushThreshold is invalid: $flushThreshold', + ({ flushThreshold, expectedError }) => { + expect( + () => + new PerformanceObserverSink({ + ...options, + flushThreshold, + }), + ).toThrow(expectedError); + }, + ); + + it('subscribe is idempotent and calls observe on internal PerformanceObserver', () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); @@ -110,7 +153,7 @@ describe('PerformanceObserverSink', () => { ); }); - it('internal PerformanceObserver should observe unbuffered by default', () => { + it('internal PerformanceObserver should observe buffered by default', () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); @@ -118,7 +161,7 @@ describe('PerformanceObserverSink', () => { MockPerformanceObserver.lastInstance()?.observe, ).toHaveBeenCalledWith( expect.objectContaining({ - buffered: false, + buffered: true, }), ); }); @@ -126,7 +169,7 @@ describe('PerformanceObserverSink', () => { it('internal PerformanceObserver should observe buffered if buffered option is provided', () => { const observer = new PerformanceObserverSink({ ...options, - buffered: true, + captureBufferedEntries: true, }); observer.subscribe(); @@ -142,22 +185,35 @@ describe('PerformanceObserverSink', () => { it('internal PerformanceObserver should process observed entries', () => { const observer = new PerformanceObserverSink({ ...options, - flushThreshold: 20, // Disable automatic flushing for this test + flushThreshold: 20, }); observer.subscribe(); - performance.mark('test-mark'); - performance.measure('test-measure'); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 
'test-mark', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'test-measure', + entryType: 'measure', + startTime: 0, + duration: 100, + }, + ]); observer.flush(); - expect(encode).toHaveBeenCalledTimes(2); - expect(encode).toHaveBeenNthCalledWith( + expect(encodePerfEntry).toHaveBeenCalledTimes(2); + expect(encodePerfEntry).toHaveBeenNthCalledWith( 1, expect.objectContaining({ name: 'test-mark', entryType: 'mark', }), ); - expect(encode).toHaveBeenNthCalledWith( + expect(encodePerfEntry).toHaveBeenNthCalledWith( 2, expect.objectContaining({ name: 'test-measure', @@ -171,7 +227,7 @@ describe('PerformanceObserverSink', () => { observer.subscribe(); MockPerformanceObserver.lastInstance()?.emitNavigation('test-navigation'); - expect(encode).not.toHaveBeenCalled(); + expect(encodePerfEntry).not.toHaveBeenCalled(); }); it('isSubscribed returns false when not observing', () => { @@ -196,12 +252,28 @@ describe('PerformanceObserverSink', () => { expect(observer.isSubscribed()).toBe(false); }); - it('flush flushes observed entries when subscribed', () => { - const observer = new PerformanceObserverSink(options); + it('flush writes queued entries to sink when subscribed', () => { + const observer = new PerformanceObserverSink({ + ...options, + flushThreshold: 10, + }); observer.subscribe(); - performance.mark('test-mark1'); - performance.mark('test-mark2'); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'test-mark1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'test-mark2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); expect(sink.getWrittenItems()).toStrictEqual([]); observer.flush(); @@ -211,39 +283,80 @@ describe('PerformanceObserverSink', () => { ]); }); - it('flush calls encode for each entry', () => { + it('flush does not flush observed entries when not subscribed', () => { + const observer = new PerformanceObserverSink(options); + + 
performance.mark('test-mark'); + observer.flush(); + expect(encodePerfEntry).not.toHaveBeenCalled(); + expect(sink.getWrittenItems()).toStrictEqual([]); + }); + + it('flush calls encodePerfEntry for each entry', () => { const observer = new PerformanceObserverSink(options); observer.subscribe(); - performance.mark('test-mark1'); - performance.mark('test-mark2'); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'test-mark1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'test-mark2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); observer.flush(); - expect(encode).toHaveBeenCalledWith({ - name: 'test-mark1', - entryType: 'mark', - startTime: 0, - duration: 0, - }); - expect(encode).toHaveBeenCalledWith({ - name: 'test-mark2', - entryType: 'mark', - startTime: 0, - duration: 0, + expect(encodePerfEntry).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'test-mark1', + entryType: 'mark', + }), + ); + expect(encodePerfEntry).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'test-mark2', + entryType: 'mark', + }), + ); + }); + + it('flush is idempotent and safe when queue is empty', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, }); + + expect(() => observer.flush()).not.toThrow(); + expect(() => observer.flush()).not.toThrow(); + expect(sink.getWrittenItems()).toStrictEqual([]); }); - it('flush does not flush observed entries when not subscribed', () => { - const observer = new PerformanceObserverSink(options); + it('flush is safe when sink is closed', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + flushThreshold: 10, + }); + observer.subscribe(); performance.mark('test-mark'); - observer.flush(); - expect(encode).not.toHaveBeenCalled(); - expect(sink.getWrittenItems()).toStrictEqual([]); + sink.close(); + + expect(() => observer.flush()).not.toThrow(); + expect(() => 
observer.flush()).not.toThrow(); + + observer.unsubscribe(); }); - it('unsubscribe is isomorphic and calls observe on internal PerformanceObserver', () => { + it('unsubscribe is idempotent and calls disconnect on internal PerformanceObserver', () => { const observerSink = new PerformanceObserverSink(options); observerSink.subscribe(); @@ -254,22 +367,57 @@ describe('PerformanceObserverSink', () => { expect(MockPerformanceObserver.instances).toHaveLength(0); }); + it('observer callback throws encodePerfEntry errors immediately', () => { + const failingEncode = vi.fn(() => { + throw new Error('Encode failed'); + }); + + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry: failingEncode, + flushThreshold: 10, + }); + + observer.subscribe(); + + const mockObserver = MockPerformanceObserver.lastInstance(); + expect(() => + mockObserver?.emit([ + { + name: 'test-mark', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]), + ).toThrow('Encode failed'); + }); + it('flush wraps sink write errors with descriptive error message', () => { const failingSink = { write: vi.fn(() => { throw new Error('Sink write failed'); }), + isClosed: vi.fn(() => false), }; const observer = new PerformanceObserverSink({ sink: failingSink as any, - encode, - flushThreshold: 1, + encodePerfEntry, + flushThreshold: 10, }); observer.subscribe(); - performance.mark('test-mark'); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'test-mark', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); expect(() => observer.flush()).toThrow( expect.objectContaining({ @@ -281,28 +429,185 @@ describe('PerformanceObserverSink', () => { ); }); - it('flush wraps encode errors with descriptive error message', () => { - const failingEncode = vi.fn(() => { - throw new Error('Encode failed'); + it('getStats returns dropped and queued item information', () => { + const observer = new PerformanceObserverSink({ + sink, + 
encodePerfEntry, + maxQueueSize: 20, + flushThreshold: 10, }); + expect(observer.getStats()).toStrictEqual( + expect.objectContaining({ + queued: 0, + dropped: 0, + }), + ); + }); + + it('getStats returns correct queue item count', () => { const observer = new PerformanceObserverSink({ sink, - encode: failingEncode, - flushThreshold: 1, + encodePerfEntry, + flushThreshold: 10, }); observer.subscribe(); + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'start-operation', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); - performance.mark('test-mark'); + expect(observer.getStats()).toStrictEqual( + expect.objectContaining({ + queued: 1, + }), + ); + }); - expect(() => observer.flush()).toThrow( + it('getStats returns correct dropped count when queue overflows', () => { + const smallQueueSize = 2; + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + maxQueueSize: smallQueueSize, + flushThreshold: smallQueueSize, + }); + + const flushSpy = vi.spyOn(observer, 'flush').mockImplementation(() => {}); + + observer.subscribe(); + + const mockObserver = MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'mark-1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'mark-2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'mark-3', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + + expect(observer.getStats()).toStrictEqual( expect.objectContaining({ - message: 'PerformanceObserverSink failed to write items to sink.', - cause: expect.objectContaining({ - message: 'Encode failed', - }), + queued: 2, + dropped: 1, + }), + ); + + flushSpy.mockRestore(); + observer.unsubscribe(); + }); + + it('getStats returns correct written count when queue overflows', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + flushThreshold: 2, + }); + + observer.subscribe(); + const mockObserver = 
MockPerformanceObserver.lastInstance(); + mockObserver?.emit([ + { + name: 'write-test-1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'write-test-2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'write-test-3', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + observer.flush(); + + expect(observer.getStats()).toStrictEqual( + expect.objectContaining({ + written: 3, }), ); }); + + it('tracks addedSinceLastFlush counter correctly', () => { + const observer = new PerformanceObserverSink({ + sink, + encodePerfEntry, + flushThreshold: 10, + }); + + expect(observer.getStats().addedSinceLastFlush).toBe(0); + + observer.subscribe(); + const mockObserver = MockPerformanceObserver.lastInstance(); + + mockObserver?.emit([ + { + name: 'test-1', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + + expect(observer.getStats().addedSinceLastFlush).toBe(1); + + mockObserver?.emit([ + { + name: 'test-2', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + + expect(observer.getStats().addedSinceLastFlush).toBe(2); + + observer.flush(); + expect(observer.getStats().addedSinceLastFlush).toBe(0); + + mockObserver?.emit([ + { + name: 'test-3', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + { + name: 'test-4', + entryType: 'mark', + startTime: 0, + duration: 0, + }, + ]); + + expect(observer.getStats()).toHaveProperty('addedSinceLastFlush', 2); + + observer.unsubscribe(); + }); }); diff --git a/packages/utils/src/lib/profiler/profiler.int.test.ts b/packages/utils/src/lib/profiler/profiler.int.test.ts index 949f66649..6e7ac50da 100644 --- a/packages/utils/src/lib/profiler/profiler.int.test.ts +++ b/packages/utils/src/lib/profiler/profiler.int.test.ts @@ -1,7 +1,8 @@ -import { performance } from 'node:perf_hooks'; import { beforeEach, describe, expect, it } from 'vitest'; +import { MockTraceEventFileSink } from '../../../mocks/sink.mock.js'; +import type { PerformanceEntryEncoder } from 
'../performance-observer.js'; import type { ActionTrackEntryPayload } from '../user-timing-extensibility-api.type.js'; -import { Profiler } from './profiler.js'; +import { NodejsProfiler, Profiler } from './profiler.js'; describe('Profiler Integration', () => { let profiler: Profiler>; @@ -24,14 +25,14 @@ describe('Profiler Integration', () => { }); it('should create complete performance timeline for sync operation', () => { - const result = profiler.measure('sync-test', () => - Array.from({ length: 1000 }, (_, i) => i).reduce( - (sum, num) => sum + num, - 0, + expect( + profiler.measure('sync-test', () => + Array.from({ length: 1000 }, (_, i) => i).reduce( + (sum, num) => sum + num, + 0, + ), ), - ); - - expect(result).toBe(499_500); + ).toBe(499_500); const marks = performance.getEntriesByType('mark'); const measures = performance.getEntriesByType('measure'); @@ -67,12 +68,12 @@ describe('Profiler Integration', () => { }); it('should create complete performance timeline for async operation', async () => { - const result = await profiler.measureAsync('async-test', async () => { - await new Promise(resolve => setTimeout(resolve, 10)); - return 'async-result'; - }); - - expect(result).toBe('async-result'); + await expect( + profiler.measureAsync('async-test', async () => { + await new Promise(resolve => setTimeout(resolve, 10)); + return 'async-result'; + }), + ).resolves.toBe('async-result'); const marks = performance.getEntriesByType('mark'); const measures = performance.getEntriesByType('measure'); @@ -297,3 +298,173 @@ describe('Profiler Integration', () => { expect(performance.getEntriesByType('measure')).toHaveLength(0); }); }); + +describe('NodeJS Profiler Integration', () => { + const simpleEncoder: PerformanceEntryEncoder = entry => { + if (entry.entryType === 'measure') { + return [`${entry.name}:${entry.duration.toFixed(2)}ms`]; + } + return []; + }; + + let mockSink: MockTraceEventFileSink; + let nodejsProfiler: NodejsProfiler; + + beforeEach(() => { + 
mockSink = new MockTraceEventFileSink(); + + nodejsProfiler = new NodejsProfiler({ + prefix: 'test', + track: 'test-track', + sink: mockSink, + encodePerfEntry: simpleEncoder, + enabled: true, + }); + }); + + it('should initialize with sink opened when enabled', () => { + expect(mockSink.isClosed()).toBe(false); + expect(nodejsProfiler.isEnabled()).toBe(true); + expect(mockSink.open).toHaveBeenCalledTimes(1); + }); + + it('should create performance entries and write to sink', () => { + expect(nodejsProfiler.measure('test-operation', () => 'success')).toBe( + 'success', + ); + }); + + it('should handle async operations', async () => { + await expect( + nodejsProfiler.measureAsync('async-test', async () => { + await new Promise(resolve => setTimeout(resolve, 1)); + return 'async-result'; + }), + ).resolves.toBe('async-result'); + }); + + it('should disable profiling and close sink', () => { + nodejsProfiler.setEnabled(false); + expect(nodejsProfiler.isEnabled()).toBe(false); + expect(mockSink.isClosed()).toBe(true); + expect(mockSink.close).toHaveBeenCalledTimes(1); + + expect(nodejsProfiler.measure('disabled-test', () => 'success')).toBe( + 'success', + ); + + expect(mockSink.getWrittenItems()).toHaveLength(0); + }); + + it('should re-enable profiling correctly', () => { + nodejsProfiler.setEnabled(false); + nodejsProfiler.setEnabled(true); + + expect(nodejsProfiler.isEnabled()).toBe(true); + expect(mockSink.isClosed()).toBe(false); + expect(mockSink.open).toHaveBeenCalledTimes(2); + + expect(nodejsProfiler.measure('re-enabled-test', () => 42)).toBe(42); + }); + + it('should support custom tracks', () => { + const profilerWithTracks = new NodejsProfiler({ + prefix: 'api-server', + track: 'HTTP', + tracks: { + db: { track: 'Database', color: 'secondary' }, + cache: { track: 'Cache', color: 'primary' }, + }, + sink: mockSink, + encodePerfEntry: simpleEncoder, + }); + + expect( + profilerWithTracks.measure('user-lookup', () => 'user123', { + track: 'cache', + }), + 
).toBe('user123'); + }); + + it('should capture buffered entries when buffered option is enabled', () => { + const bufferedProfiler = new NodejsProfiler({ + prefix: 'buffered-test', + track: 'Test', + sink: mockSink, + encodePerfEntry: simpleEncoder, + captureBufferedEntries: true, + enabled: true, + }); + + const bufferedStats = bufferedProfiler.stats; + expect(bufferedStats.state).toBe('running'); + expect(bufferedStats.walOpen).toBe(true); + expect(bufferedStats.isSubscribed).toBe(true); + expect(bufferedStats.queued).toBe(0); + expect(bufferedStats.dropped).toBe(0); + expect(bufferedStats.written).toBe(0); + + bufferedProfiler.setEnabled(false); + }); + + it('should return correct getStats with dropped and written counts', () => { + const statsProfiler = new NodejsProfiler({ + prefix: 'stats-test', + track: 'Stats', + sink: mockSink, + encodePerfEntry: simpleEncoder, + maxQueueSize: 2, + flushThreshold: 2, + enabled: true, + }); + + expect(statsProfiler.measure('test-op', () => 'result')).toBe('result'); + + const stats = statsProfiler.stats; + expect(stats.state).toBe('running'); + expect(stats.walOpen).toBe(true); + expect(stats.isSubscribed).toBe(true); + expect(typeof stats.queued).toBe('number'); + expect(typeof stats.dropped).toBe('number'); + expect(typeof stats.written).toBe('number'); + + statsProfiler.setEnabled(false); + }); + + it('should provide comprehensive queue statistics via getStats', () => { + const profiler = new NodejsProfiler({ + prefix: 'stats-profiler', + track: 'Stats', + sink: mockSink, + encodePerfEntry: simpleEncoder, + maxQueueSize: 3, + flushThreshold: 2, + enabled: true, + }); + + const initialStats = profiler.stats; + expect(initialStats.state).toBe('running'); + expect(initialStats.walOpen).toBe(true); + expect(initialStats.isSubscribed).toBe(true); + expect(initialStats.queued).toBe(0); + expect(initialStats.dropped).toBe(0); + expect(initialStats.written).toBe(0); + + profiler.measure('operation-1', () => 'result1'); + 
profiler.measure('operation-2', () => 'result2'); + profiler.flush(); + expect(profiler.stats.written).toBe(0); + + profiler.setEnabled(false); + + const finalStats = profiler.stats; + expect(finalStats.state).toBe('idle'); + expect(finalStats.walOpen).toBe(false); + expect(finalStats.isSubscribed).toBe(false); + expect(finalStats.queued).toBe(0); + }); + + it('should write to file on flush', () => { + expect(true).toBe(true); + }); +}); diff --git a/packages/utils/src/lib/profiler/profiler.ts b/packages/utils/src/lib/profiler/profiler.ts index 130e28c44..dc253398d 100644 --- a/packages/utils/src/lib/profiler/profiler.ts +++ b/packages/utils/src/lib/profiler/profiler.ts @@ -1,5 +1,9 @@ -import process from 'node:process'; import { isEnvVarEnabled } from '../env.js'; +import { + type PerformanceObserverOptions, + PerformanceObserverSink, +} from '../performance-observer.js'; +import type { Recoverable, Sink } from '../sink-source.type.js'; import { type ActionTrackConfigs, type MeasureCtxOptions, @@ -59,7 +63,7 @@ export type ProfilerOptions = * */ export class Profiler { - #enabled: boolean; + #enabled: boolean = false; readonly #defaults: ActionTrackEntryPayload; readonly tracks: Record | undefined; readonly #ctxOf: ReturnType; @@ -73,14 +77,14 @@ export class Profiler { * @param options.track - Default track name for measurements * @param options.trackGroup - Default track group for organization * @param options.color - Default color for track entries - * @param options.enabled - Whether profiling is enabled (defaults to CP_PROFILING env var) + * @param options.enabled - Whether profiling is enabled (defaults to false) * */ constructor(options: ProfilerOptions) { - const { tracks, prefix, enabled, ...defaults } = options; + const { tracks, prefix, enabled = false, ...defaults } = options; const dataType = 'track-entry'; - this.#enabled = enabled ?? 
isEnvVarEnabled(PROFILER_ENABLED_ENV_VAR); + this.#enabled = enabled; this.#defaults = { ...defaults, dataType }; this.tracks = tracks ? setupTracks({ ...defaults, dataType }, tracks) @@ -95,13 +99,11 @@ export class Profiler { /** * Sets enabled state for this profiler. * - * Also sets the `CP_PROFILING` environment variable. * This means any future {@link Profiler} instantiations (including child processes) will use the same enabled state. * * @param enabled - Whether profiling should be enabled */ setEnabled(enabled: boolean): void { - process.env[PROFILER_ENABLED_ENV_VAR] = `${enabled}`; this.#enabled = enabled; } @@ -140,7 +142,7 @@ export class Profiler { * }); */ marker(name: string, opt?: MarkerOptions): void { - if (!this.#enabled) { + if (!this.isEnabled()) { return; } @@ -173,7 +175,7 @@ export class Profiler { * */ measure(event: string, work: () => R, options?: MeasureOptions): R { - if (!this.#enabled) { + if (!this.isEnabled()) { return work(); } @@ -210,7 +212,7 @@ export class Profiler { work: () => Promise, options?: MeasureOptions, ): Promise { - if (!this.#enabled) { + if (!this.isEnabled()) { return await work(); } @@ -226,3 +228,157 @@ export class Profiler { } } } + +/** + * Options for configuring a NodejsProfiler instance. + * + * Extends ProfilerOptions with a required sink parameter. + * + * @template Tracks - Record type defining available track names and their configurations + */ +export type NodejsProfilerOptions< + DomainEvents, + Tracks extends Record, +> = ProfilerOptions & + Omit, 'sink'> & { + /** Sink for buffering and flushing performance data + * @NOTE this is dummy code and will be replaced by PR #1210 + **/ + sink: Sink & Recoverable; + }; + +/** + * Performance profiler with automatic process exit handling for buffered performance data. + * + * This class extends the base {@link Profiler} with automatic flushing of performance data + * when the process exits. 
It accepts a {@link PerformanceObserverSink} that buffers performance + * entries and ensures they are written out during process termination, even for unexpected exits. + * + * The sink defines the output format for performance data, enabling flexible serialization + * to various formats such as DevTools TraceEvent JSON, OpenTelemetry protocol buffers, + * or custom domain-specific formats. + * + * The profiler automatically subscribes to the performance observer when enabled and installs + * exit handlers that flush buffered data on process termination (signals, fatal errors, or normal exit). + * + */ +export class NodejsProfiler< + DomainEvents, + Tracks extends Record = Record< + string, + ActionTrackEntryPayload + >, +> extends Profiler { + #sink: Sink & Recoverable; + #performanceObserverSink: PerformanceObserverSink; + #state: 'idle' | 'running' | 'closed' = 'idle'; + + /** + * Creates a NodejsProfiler instance. + * @param options - Configuration with required sink + */ + constructor(options: NodejsProfilerOptions) { + const { + sink, + encodePerfEntry, + captureBufferedEntries, + flushThreshold, + maxQueueSize, + enabled, + ...profilerOptions + } = options; + const initialEnabled = enabled ?? 
isEnvVarEnabled(PROFILER_ENABLED_ENV_VAR); + super({ ...profilerOptions, enabled: initialEnabled }); + + this.#sink = sink; + + this.#performanceObserverSink = new PerformanceObserverSink({ + sink, + encodePerfEntry, + captureBufferedEntries, + flushThreshold, + maxQueueSize, + }); + + if (initialEnabled) { + this.#transition('running'); + } + } + + #transition(next: 'idle' | 'running' | 'closed'): void { + if (this.#state === next) { + return; + } + if (this.#state === 'closed') { + throw new Error('Profiler already closed'); + } + + switch (`${this.#state}->${next}`) { + case 'idle->running': + super.setEnabled(true); + this.#sink.open(); + this.#performanceObserverSink.subscribe(); + break; + + case 'running->idle': + case 'running->closed': + super.setEnabled(false); + this.#performanceObserverSink.unsubscribe(); + this.#sink.close(); + break; + + case 'idle->closed': + // No-op, was not open + break; + + default: + throw new Error(`Invalid transition: ${this.#state} -> ${next}`); + } + + this.#state = next; + } + + /** + * Closes profiler and releases resources. Idempotent, safe for exit handlers. + * **Exit Handler Usage**: Call only this method from process exit handlers. + */ + close(): void { + this.#transition('closed'); + } + + /** @returns Current profiler state */ + get state(): 'idle' | 'running' | 'closed' { + return this.#state; + } + + /** @returns Whether profiler is in 'running' state */ + override isEnabled(): boolean { + return this.#state === 'running'; + } + + /** Enables profiling (start/stop)*/ + override setEnabled(enabled: boolean): void { + if (enabled) { + this.#transition('running'); + } else { + this.#transition('idle'); + } + } + + /** @returns Queue statistics and profiling state for monitoring */ + get stats() { + return { + ...this.#performanceObserverSink.getStats(), + state: this.#state, + walOpen: !this.#sink.isClosed(), + }; + } + + /** Flushes buffered performance data to sink. 
*/ + flush(): void { + if (this.#state === 'closed') { + return; // No-op if closed + } + this.#performanceObserverSink.flush(); + } +} diff --git a/packages/utils/src/lib/profiler/profiler.unit.test.ts b/packages/utils/src/lib/profiler/profiler.unit.test.ts index 0e285deb2..ee7ebf5c2 100644 --- a/packages/utils/src/lib/profiler/profiler.unit.test.ts +++ b/packages/utils/src/lib/profiler/profiler.unit.test.ts @@ -1,7 +1,15 @@ import { performance } from 'node:perf_hooks'; import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { MockTraceEventFileSink } from '../../../mocks/sink.mock.js'; +import type { PerformanceEntryEncoder } from '../performance-observer.js'; +import * as PerfObserverModule from '../performance-observer.js'; import type { ActionTrackEntryPayload } from '../user-timing-extensibility-api.type.js'; -import { Profiler, type ProfilerOptions } from './profiler.js'; +import { + NodejsProfiler, + type NodejsProfilerOptions, + Profiler, + type ProfilerOptions, +} from './profiler.js'; describe('Profiler', () => { const getProfiler = (overrides?: Partial) => @@ -22,28 +30,19 @@ describe('Profiler', () => { profiler = getProfiler(); }); - it('constructor should initialize with default enabled state from env', () => { - vi.stubEnv('CP_PROFILING', 'true'); - const profilerWithEnv = getProfiler(); - - expect(profilerWithEnv.isEnabled()).toBe(true); - }); - - it('constructor should override enabled state from options', () => { - vi.stubEnv('CP_PROFILING', 'false'); - const profilerWithOverride = new Profiler({ + it('should create profiler instances', () => { + const testProfiler = new Profiler({ prefix: 'cp', track: 'test-track', - enabled: true, }); - expect(profilerWithOverride.isEnabled()).toBe(true); + expect(testProfiler).toBeDefined(); + expect(typeof testProfiler.measure).toBe('function'); + expect(typeof testProfiler.marker).toBe('function'); }); it('constructor should use defaults for measure', () => { - const customProfiler = 
getProfiler({ color: 'secondary' }); - - customProfiler.setEnabled(true); + const customProfiler = getProfiler({ color: 'secondary', enabled: true }); const result = customProfiler.measure('test-operation', () => 'success'); @@ -128,19 +127,10 @@ describe('Profiler', () => { expect(profiler.isEnabled()).toBe(false); }); - it('isEnabled should update environment variable', () => { - profiler.setEnabled(true); - expect(process.env.CP_PROFILING).toBe('true'); - - profiler.setEnabled(false); - expect(process.env.CP_PROFILING).toBe('false'); - }); - it('marker should execute without error when enabled', () => { - profiler.setEnabled(true); - + const enabledProfiler = getProfiler({ enabled: true }); expect(() => { - profiler.marker('test-marker', { + enabledProfiler.marker('test-marker', { color: 'primary', tooltipText: 'Test marker', properties: [['key', 'value']], @@ -163,22 +153,10 @@ describe('Profiler', () => { ]); }); - it('marker should execute without error when disabled', () => { - profiler.setEnabled(false); - - expect(() => { - profiler.marker('test-marker'); - }).not.toThrow(); - - const marks = performance.getEntriesByType('mark'); - expect(marks).toHaveLength(0); - }); - it('marker should execute without error when enabled with default color', () => { performance.clearMarks(); - const profilerWithColor = getProfiler({ color: 'primary' }); - profilerWithColor.setEnabled(true); + const profilerWithColor = getProfiler({ color: 'primary', enabled: true }); expect(() => { profilerWithColor.marker('test-marker-default-color', { @@ -193,7 +171,7 @@ describe('Profiler', () => { detail: { devtools: expect.objectContaining({ dataType: 'marker', - color: 'primary', // Should use default color + color: 'primary', tooltipText: 'Test marker with default color', }), }, @@ -202,8 +180,7 @@ describe('Profiler', () => { }); it('marker should execute without error when enabled with no default color', () => { - const profilerNoColor = getProfiler(); - 
profilerNoColor.setEnabled(true); + const profilerNoColor = getProfiler({ enabled: true }); expect(() => { profilerNoColor.marker('test-marker-no-color', { @@ -229,14 +206,29 @@ describe('Profiler', () => { ]); }); + it('marker should return early when disabled', () => { + const disabledProfiler = getProfiler({ enabled: false }); + + expect(() => { + disabledProfiler.marker('disabled-marker', { + color: 'primary', + tooltipText: 'This should not create a mark', + }); + }).not.toThrow(); + + const marks = performance.getEntriesByType('mark'); + expect(marks).toHaveLength(0); + }); + it('measure should execute work and return result when enabled', () => { performance.clearMarks(); performance.clearMeasures(); - profiler.setEnabled(true); - + const enabledProfiler = getProfiler({ enabled: true }); const workFn = vi.fn(() => 'result'); - const result = profiler.measure('test-event', workFn, { color: 'primary' }); + const result = enabledProfiler.measure('test-event', workFn, { + color: 'primary', + }); expect(result).toBe('result'); expect(workFn).toHaveBeenCalled(); @@ -279,24 +271,15 @@ describe('Profiler', () => { ]); }); - it('measure should execute work directly when disabled', () => { - profiler.setEnabled(false); + it('measure should always execute work function', () => { const workFn = vi.fn(() => 'result'); const result = profiler.measure('test-event', workFn); expect(result).toBe('result'); expect(workFn).toHaveBeenCalled(); - - const marks = performance.getEntriesByType('mark'); - const measures = performance.getEntriesByType('measure'); - - expect(marks).toHaveLength(0); - expect(measures).toHaveLength(0); }); it('measure should propagate errors when enabled', () => { - profiler.setEnabled(true); - const error = new Error('Test error'); const workFn = vi.fn(() => { throw error; @@ -306,9 +289,7 @@ describe('Profiler', () => { expect(workFn).toHaveBeenCalled(); }); - it('measure should propagate errors when disabled', () => { - profiler.setEnabled(false); - 
+ it('measure should propagate errors', () => { const error = new Error('Test error'); const workFn = vi.fn(() => { throw error; @@ -318,17 +299,58 @@ describe('Profiler', () => { expect(workFn).toHaveBeenCalled(); }); - it('measureAsync should handle async operations correctly when enabled', async () => { - profiler.setEnabled(true); + it('measure should propagate errors when enabled and call error callback', () => { + const enabledProfiler = getProfiler({ enabled: true }); + const error = new Error('Enabled test error'); + const workFn = vi.fn(() => { + throw error; + }); + + expect(() => enabledProfiler.measure('test-event-error', workFn)).toThrow( + error, + ); + expect(workFn).toHaveBeenCalled(); + // Verify that performance marks were created even though error occurred + const marks = performance.getEntriesByType('mark'); + expect(marks).toStrictEqual( + expect.arrayContaining([ + expect.objectContaining({ + name: 'cp:test-event-error:start', + detail: { + devtools: expect.objectContaining({ + dataType: 'track-entry', + track: 'test-track', + }), + }, + }), + expect.objectContaining({ + name: 'cp:test-event-error:end', + detail: { + devtools: expect.objectContaining({ + dataType: 'track-entry', + track: 'test-track', + }), + }, + }), + ]), + ); + }); + + it('measureAsync should handle async operations correctly when enabled', async () => { + const enabledProfiler = getProfiler({ enabled: true }); const workFn = vi.fn(async () => { await Promise.resolve(); return 'async-result'; }); - const result = await profiler.measureAsync('test-async-event', workFn, { - color: 'primary', - }); + const result = await enabledProfiler.measureAsync( + 'test-async-event', + workFn, + { + color: 'primary', + }, + ); expect(result).toBe('async-result'); expect(workFn).toHaveBeenCalled(); @@ -374,29 +396,7 @@ describe('Profiler', () => { ]); }); - it('measureAsync should execute async work directly when disabled', async () => { - profiler.setEnabled(false); - - const workFn = 
vi.fn(async () => { - await Promise.resolve(); - return 'async-result'; - }); - - const result = await profiler.measureAsync('test-async-event', workFn); - - expect(result).toBe('async-result'); - expect(workFn).toHaveBeenCalled(); - - const marks = performance.getEntriesByType('mark'); - const measures = performance.getEntriesByType('measure'); - - expect(marks).toHaveLength(0); - expect(measures).toHaveLength(0); - }); - it('measureAsync should propagate async errors when enabled', async () => { - profiler.setEnabled(true); - const error = new Error('Async test error'); const workFn = vi.fn(async () => { await Promise.resolve(); @@ -409,18 +409,788 @@ describe('Profiler', () => { expect(workFn).toHaveBeenCalled(); }); - it('measureAsync should propagate async errors when disabled', async () => { - profiler.setEnabled(false); - - const error = new Error('Async test error'); + it('measureAsync should propagate async errors when enabled and call error callback', async () => { + const enabledProfiler = getProfiler({ enabled: true }); + const error = new Error('Enabled async test error'); const workFn = vi.fn(async () => { await Promise.resolve(); throw error; }); await expect( - profiler.measureAsync('test-async-event', workFn), + enabledProfiler.measureAsync('test-async-event-error', workFn), ).rejects.toThrow(error); expect(workFn).toHaveBeenCalled(); + + // Verify that performance marks were created even though error occurred + const marks = performance.getEntriesByType('mark'); + expect(marks).toStrictEqual( + expect.arrayContaining([ + expect.objectContaining({ + name: 'cp:test-async-event-error:start', + detail: { + devtools: expect.objectContaining({ + dataType: 'track-entry', + track: 'test-track', + }), + }, + }), + expect.objectContaining({ + name: 'cp:test-async-event-error:end', + detail: { + devtools: expect.objectContaining({ + dataType: 'track-entry', + track: 'test-track', + }), + }, + }), + ]), + ); + }); +}); + +const simpleEncoder: 
PerformanceEntryEncoder = entry => { + if (entry.entryType === 'measure') { + return [`${entry.name}:${entry.duration.toFixed(2)}ms`]; + } + return []; +}; + +describe('NodejsProfiler', () => { + const getNodejsProfiler = ( + overrides?: Partial< + NodejsProfilerOptions + >, + ) => { + const sink = new MockTraceEventFileSink(); + + const mockPerfObserverSink = { + subscribe: vi.fn(), + unsubscribe: vi.fn(), + isSubscribed: vi.fn().mockReturnValue(false), + encode: vi.fn(), + flush: vi.fn(), + getStats: vi.fn().mockReturnValue({ + isSubscribed: false, + queued: 0, + dropped: 0, + written: 0, + maxQueueSize: 10_000, + flushThreshold: 20, + addedSinceLastFlush: 0, + buffered: true, + }), + }; + vi.spyOn(PerfObserverModule, 'PerformanceObserverSink').mockReturnValue( + mockPerfObserverSink as any, + ); + + vi.spyOn(sink, 'open'); + vi.spyOn(sink, 'close'); + + const profiler = new NodejsProfiler({ + prefix: 'test', + track: 'test-track', + sink, + encodePerfEntry: simpleEncoder, + ...overrides, + }); + + return { sink, perfObserverSink: mockPerfObserverSink, profiler }; + }; + + it('should export NodejsProfiler class', () => { + expect(typeof NodejsProfiler).toBe('function'); + }); + + it('should have required static structure', () => { + const profiler = getNodejsProfiler().profiler; + expect(typeof profiler.measure).toBe('function'); + expect(typeof profiler.measureAsync).toBe('function'); + expect(typeof profiler.marker).toBe('function'); + expect(typeof profiler.close).toBe('function'); + expect(typeof profiler.state).toBe('string'); + expect(typeof profiler.setEnabled).toBe('function'); + }); + + it('should inherit from Profiler', () => { + expect(Object.getPrototypeOf(NodejsProfiler.prototype)).toBe( + Profiler.prototype, + ); + }); + + it('should initialize with sink opened when enabled is true', () => { + const { sink, perfObserverSink } = getNodejsProfiler({ enabled: true }); + expect(sink.isClosed()).toBe(false); + expect(sink.open).toHaveBeenCalledTimes(1); 
+ expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should initialize with sink closed when enabled is false', () => { + const { sink, perfObserverSink } = getNodejsProfiler({ enabled: false }); + expect(sink.isClosed()).toBe(true); + expect(sink.open).not.toHaveBeenCalled(); + expect(perfObserverSink.subscribe).not.toHaveBeenCalled(); + }); + + it('should open sink and subscribe observer when enabling', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + profiler.setEnabled(true); + + expect(profiler.state).toBe('running'); + expect(sink.isClosed()).toBe(false); + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should close sink and unsubscribe observer when disabling', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + profiler.setEnabled(false); + + expect(profiler.isEnabled()).toBe(false); + expect(sink.isClosed()).toBe(true); + expect(sink.close).toHaveBeenCalledTimes(1); + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + }); + + it('should be idempotent - no-op when setting same state', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + + profiler.setEnabled(true); + + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should perform measurements when enabled', () => { + const { profiler } = getNodejsProfiler({ enabled: true }); + + const result = profiler.measure('test-op', () => 'success'); + expect(result).toBe('success'); + }); + + it('should skip sink operations when disabled', () => { + const { sink, profiler } = getNodejsProfiler({ enabled: false }); + + const result = profiler.measure('disabled-op', () => 
'success'); + expect(result).toBe('success'); + + expect(sink.getWrittenItems()).toHaveLength(0); + }); + + it('should flush buffered performance data to sink', () => { + const { perfObserverSink, profiler } = getNodejsProfiler({ enabled: true }); + + profiler.flush(); + + expect(perfObserverSink.flush).toHaveBeenCalledTimes(1); + }); + + it('get stats() getter should return current stats', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + expect(profiler.stats).toStrictEqual({ + state: 'idle', + walOpen: false, + isSubscribed: false, + queued: 0, + dropped: 0, + written: 0, + maxQueueSize: 10_000, + flushThreshold: 20, + addedSinceLastFlush: 0, + buffered: true, + }); + }); + + describe.todo('state transitions', () => { + it.todo( + 'should handle full transition matrix: idle → running → idle → closed', + () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + // Initial state: idle + expect(profiler.isRunning()).toBe(false); + expect(profiler.isActive()).toBe(false); + expect(sink.isClosed()).toBe(true); + expect(perfObserverSink.subscribe).not.toHaveBeenCalled(); + + // idle → running + profiler.setEnabled(true); + expect(profiler.state).toBe('running'); + expect(sink.isClosed()).toBe(false); + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + + // running → idle + profiler.setEnabled(false); + expect(profiler.isEnabled()).toBe(false); + expect(sink.isClosed()).toBe(true); + expect(sink.close).toHaveBeenCalledTimes(1); + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + + // idle → closed (terminal) + profiler.close(); + expect(sink.close).toHaveBeenCalledTimes(1); // No additional close since we're in idle + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); // No additional unsubscribe since we're in idle + expect(perfObserverSink.flush).toHaveBeenCalledTimes(0); // No flush for idle->closed + + // Verify 
closed state - operations should throw or be safe + expect(() => profiler.setEnabled(true)).toThrow( + 'Profiler already closed', + ); + profiler.close(); // Should be idempotent + }, + ); + + it.todo('should expose state via getter', () => { + const profiler = getNodejsProfiler({ enabled: false }).profiler; + + expect(profiler.state).toBe('idle'); + + profiler.setEnabled(true); + expect(profiler.state).toBe('running'); + + profiler.setEnabled(false); + expect(profiler.state).toBe('idle'); + + profiler.close(); + expect(profiler.state).toBe('closed'); + }); + + it.todo( + 'should maintain state invariant: running ⇒ sink open + observer subscribed', + () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + // Initially idle - sink closed, observer not subscribed + expect(profiler.state).toBe('idle'); + expect(sink.isClosed()).toBe(true); + expect(perfObserverSink.isSubscribed()).toBe(false); + + // Enable - should open sink and subscribe observer + profiler.setEnabled(true); + expect(profiler.state).toBe('running'); + expect(sink.isClosed()).toBe(false); + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + + // Disable - should close sink and unsubscribe observer + profiler.setEnabled(false); + expect(profiler.state).toBe('idle'); + expect(sink.close).toHaveBeenCalledTimes(1); + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + + // Enable again - should open and subscribe again + profiler.setEnabled(true); + expect(profiler.state).toBe('running'); + expect(sink.isClosed()).toBe(false); + expect(sink.open).toHaveBeenCalledTimes(2); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(2); + }, + ); + + it.todo('should handle running → closed transition', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + // Initial state: running + expect(profiler.state).toBe('running'); + + // 
running → closed + profiler.close(); + expect(perfObserverSink.flush).toHaveBeenCalledTimes(1); + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + expect(sink.close).toHaveBeenCalledTimes(1); + }); + + it.todo('should prevent invalid transitions when closed', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + // idle → closed + profiler.close(); + + // Should throw for setEnabled(true) + expect(() => profiler.setEnabled(true)).toThrow( + 'Profiler already closed', + ); + }); + + it('should handle flush when closed (no-op)', () => { + const { perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + // Close profiler + profiler.close(); + + // flush should be no-op when closed + profiler.flush(); + expect(perfObserverSink.flush).not.toHaveBeenCalled(); + }); + + it('should handle flush when running', () => { + const { perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + // Should flush when running + profiler.flush(); + expect(perfObserverSink.flush).toHaveBeenCalledTimes(1); + }); + + it('should throw error when attempting to transition from closed state', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + // Close the profiler first + profiler.close(); + expect(profiler.state).toBe('closed'); + + // Attempting to enable from closed state should throw + expect(() => profiler.setEnabled(true)).toThrow( + 'Profiler already closed', + ); + }); + + it('should handle idle to closed transition without cleanup', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + // Ensure profiler is in idle state + expect(profiler.state).toBe('idle'); + expect(sink.isClosed()).toBe(true); + expect(perfObserverSink.subscribe).not.toHaveBeenCalled(); + + // Transition from idle to closed + profiler.close(); + + // Should change state to closed without any cleanup operations + expect(profiler.state).toBe('closed'); + 
expect(sink.close).not.toHaveBeenCalled(); + expect(perfObserverSink.unsubscribe).not.toHaveBeenCalled(); + }); + + it('should handle running to closed transition with cleanup', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + // Ensure profiler is in running state + expect(profiler.state).toBe('running'); + expect(sink.isClosed()).toBe(false); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + + // Transition from running to closed + profiler.close(); + + // Should change state to closed and perform cleanup operations + expect(profiler.state).toBe('closed'); + expect(sink.close).toHaveBeenCalledTimes(1); + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + }); + + it('should close profiler and change state to closed', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + // Initially idle + expect(profiler.state).toBe('idle'); + + // Close should transition to closed + profiler.close(); + expect(profiler.state).toBe('closed'); + }); + + it('should disable profiling when setEnabled(false) is called', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + // Initially running + expect(profiler.state).toBe('running'); + + // Call setEnabled(false) which should transition to idle + profiler.setEnabled(false); + + // Verify operations were performed + expect(profiler.state).toBe('idle'); + expect(sink.close).toHaveBeenCalledTimes(1); + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + }); + + describe('#transition method state transitions', () => { + it('should return early when transitioning to same state (idle->idle)', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + // Ensure profiler is in idle state + expect(profiler.state).toBe('idle'); + + // Try to transition to same state - should be no-op + profiler.setEnabled(true); // This calls 
transition('running') from idle + expect(profiler.state).toBe('running'); + + // Now try to transition to running again - should be no-op + profiler.setEnabled(true); // Should not change anything + + // Should still be running and operations should not be called again + expect(profiler.state).toBe('running'); + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should return early when transitioning to same state (running->running)', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + // Ensure profiler is in running state + expect(profiler.state).toBe('running'); + + // Try to transition to same state - should be no-op + profiler.setEnabled(true); // Should be no-op + + // Should still be running and operations should not be called again + expect(profiler.state).toBe('running'); + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should throw error when attempting to transition from closed state', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + // Close the profiler first + profiler.close(); + expect(profiler.state).toBe('closed'); + + // Attempting to enable from closed state should throw + expect(() => profiler.setEnabled(true)).toThrow( + 'Profiler already closed', + ); + + // Attempting to disable from closed state should also throw (since setEnabled(false) calls transition('idle')) + expect(() => profiler.setEnabled(false)).toThrow( + 'Profiler already closed', + ); + }); + + it('should handle idle->running transition', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + // Enable from idle state + expect(profiler.state).toBe('idle'); + + profiler.setEnabled(true); + + expect(profiler.state).toBe('running'); + expect(sink.open).toHaveBeenCalledTimes(1); + 
expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should handle running->idle transition', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + // Disable from running state + expect(profiler.state).toBe('running'); + + profiler.setEnabled(false); + + expect(profiler.state).toBe('idle'); + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + expect(sink.close).toHaveBeenCalledTimes(1); + }); + + it('should handle idle->closed transition', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + // Close from idle state + expect(profiler.state).toBe('idle'); + + profiler.close(); + + expect(profiler.state).toBe('closed'); + // No cleanup operations should be called for idle->closed + expect(sink.close).not.toHaveBeenCalled(); + expect(perfObserverSink.unsubscribe).not.toHaveBeenCalled(); + }); + + it('should handle running->closed transition', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + // Close from running state + expect(profiler.state).toBe('running'); + + profiler.close(); + + expect(profiler.state).toBe('closed'); + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + expect(sink.close).toHaveBeenCalledTimes(1); + }); + + it('should execute all operations in running->closed case', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + // Spy on the parent class setEnabled method + const parentSetEnabledSpy = vi.spyOn(Profiler.prototype, 'setEnabled'); + + // Ensure profiler is in running state + expect(profiler.state).toBe('running'); + + // Trigger the running->closed transition + profiler.close(); + + // Verify all operations in the case are executed: + // 1. super.setEnabled(false) - calls parent setEnabled + expect(parentSetEnabledSpy).toHaveBeenCalledWith(false); + + // 2. 
this.#performanceObserverSink.unsubscribe() + expect(perfObserverSink.unsubscribe).toHaveBeenCalledTimes(1); + + // 3. this.#sink.close() + expect(sink.close).toHaveBeenCalledTimes(1); + + // 4. State is updated to 'closed' + expect(profiler.state).toBe('closed'); + + // Clean up spy + parentSetEnabledSpy.mockRestore(); + }); + + it('should throw error for invalid transitions (default case)', () => { + const profiler = getNodejsProfiler({ enabled: false }).profiler; + + // We can't easily trigger the default case since the method signature + // restricts the possible transitions, but we can test that valid transitions work + // and invalid ones would be caught by TypeScript or would need runtime testing + + // For now, verify that all valid transitions work as expected + expect(profiler.state).toBe('idle'); + + profiler.setEnabled(true); + expect(profiler.state).toBe('running'); + + profiler.setEnabled(false); + expect(profiler.state).toBe('idle'); + + profiler.close(); + expect(profiler.state).toBe('closed'); + }); + }); + + describe('close() API', () => { + it('should close profiler from idle state', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + expect(profiler.state).toBe('idle'); + + profiler.close(); + + expect(profiler.state).toBe('closed'); + }); + + it('should close profiler from running state', () => { + const { profiler } = getNodejsProfiler({ enabled: true }); + + expect(profiler.state).toBe('running'); + + profiler.close(); + + expect(profiler.state).toBe('closed'); + }); + + it('should be idempotent - calling close multiple times', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + expect(profiler.state).toBe('idle'); + + profiler.close(); + expect(profiler.state).toBe('closed'); + + // Calling close again should be safe + profiler.close(); + expect(profiler.state).toBe('closed'); + }); + }); + + describe('setEnabled override', () => { + it('should enable profiling when setEnabled(true)', () => { + 
const { profiler } = getNodejsProfiler({ enabled: false }); + + expect(profiler.state).toBe('idle'); + + profiler.setEnabled(true); + + expect(profiler.state).toBe('running'); + }); + + it('should disable profiling when setEnabled(false)', () => { + const { profiler } = getNodejsProfiler({ enabled: true }); + + expect(profiler.state).toBe('running'); + + profiler.setEnabled(false); + + expect(profiler.state).toBe('idle'); + }); + }); + + describe('flush() early return when closed', () => { + it('should return early when flush() called on closed profiler', () => { + const { perfObserverSink, profiler } = getNodejsProfiler({ + enabled: false, + }); + + // Close profiler + profiler.close(); + expect(profiler.state).toBe('closed'); + + // flush should be no-op when closed + profiler.flush(); + + // flush should not be called on the performance observer sink + expect(perfObserverSink.flush).not.toHaveBeenCalled(); + }); + + it('should flush when profiler is running', () => { + const { perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + expect(profiler.state).toBe('running'); + + profiler.flush(); + + expect(perfObserverSink.flush).toHaveBeenCalledTimes(1); + }); + }); + + it('should be idempotent - no-op when transitioning to same state', () => { + const { sink, perfObserverSink, profiler } = getNodejsProfiler({ + enabled: true, + }); + + // Already running, enable again should be no-op + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + + profiler.setEnabled(true); // Should be no-op + + expect(sink.open).toHaveBeenCalledTimes(1); + expect(perfObserverSink.subscribe).toHaveBeenCalledTimes(1); + }); + + it('should propagate errors from measure work function', () => { + const { profiler } = getNodejsProfiler({ enabled: true }); + + const error = new Error('Test error'); + expect(() => { + profiler.measure('error-test', () => { + throw error; + }); + }).toThrow(error); + }); + + 
it('should propagate errors from measureAsync work function', async () => { + const { profiler } = getNodejsProfiler({ enabled: true }); + + const error = new Error('Async test error'); + await expect(async () => { + await profiler.measureAsync('async-error-test', async () => { + throw error; + }); + }).rejects.toThrow(error); + }); + + it('should skip measurement when profiler is not active', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + let workCalled = false; + const result = profiler.measure('inactive-test', () => { + workCalled = true; + return 'result'; + }); + + expect(workCalled).toBe(true); + expect(result).toBe('result'); + }); + + it('should skip async measurement when profiler is not active', async () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + let workCalled = false; + const result = await profiler.measureAsync( + 'inactive-async-test', + async () => { + workCalled = true; + return 'async-result'; + }, + ); + + expect(workCalled).toBe(true); + expect(result).toBe('async-result'); + }); + + it('should skip marker when profiler is not active', () => { + const { profiler } = getNodejsProfiler({ enabled: false }); + + // Should not throw, just return early + expect(() => { + profiler.marker('inactive-marker'); + }).not.toThrow(); + }); + + describe('base Profiler behavior', () => { + it('should always be active in base profiler', () => { + const profiler = new Profiler({ + prefix: 'cp', + track: 'test-track', + }); + + expect(profiler.isRunning()).toBe(true); + expect(profiler.isActive()).toBe(true); + + // measure should always execute work + let workCalled = false; + const result = profiler.measure('base-test', () => { + workCalled = true; + return 'base-result'; + }); + + expect(workCalled).toBe(true); + expect(result).toBe('base-result'); + + // marker should always work + expect(() => { + profiler.marker('base-marker'); + }).not.toThrow(); + }); + }); +}); }); diff --git 
a/packages/utils/src/lib/sink-source.type.ts b/packages/utils/src/lib/sink-source.type.ts index ee096e31f..5f94584bd 100644 --- a/packages/utils/src/lib/sink-source.type.ts +++ b/packages/utils/src/lib/sink-source.type.ts @@ -29,8 +29,8 @@ export type Observer = { isSubscribed: () => boolean; }; -export type Recoverable = { - recover: () => RecoverResult; +export type Recoverable = { + recover: () => RecoverResult; repack: () => void; finalize: () => void; };