From 6d40c3d13fefe9394c4a18ecd29f27776eaca377 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Dec 2025 01:40:21 +0000 Subject: [PATCH 1/2] feat: add distributed mutex interface and implementations Add a DistributedMutex interface for coordinating access across multiple processes, with implementations for all supported drivers: - RedisMutex: Uses SET NX PX for atomic lock acquisition with Lua script for safe release - SQLiteMutex: Uses dedicated locks table with unique constraint - MongooseMutex: Uses findOneAndUpdate with TTL index for auto-cleanup - FileMutex: Uses exclusive file creation (O_EXCL) with JSON lock files - InMemoryMutex: Map-based implementation for testing/single-process Each implementation supports: - tryAcquire(key, ttlMs): Non-blocking lock acquisition - release(key, token): Token-based release to prevent wrong releases - withLock(key, fn, options): Convenience method with auto-retry This provides the foundation for distributed coordination needed for cron job scheduling across multiple scheduler instances. --- package.json | 24 +++ src/drivers/file-mutex.ts | 219 +++++++++++++++++++++ src/drivers/memory-mutex.ts | 138 +++++++++++++ src/drivers/mongoose-mutex.ts | 219 +++++++++++++++++++++ src/drivers/redis-mutex.ts | 131 +++++++++++++ src/drivers/sqlite-mutex.ts | 155 +++++++++++++++ src/interfaces/mutex.ts | 123 ++++++++++++ tests/mutex/mutex.test.ts | 351 ++++++++++++++++++++++++++++++++++ 8 files changed, 1360 insertions(+) create mode 100644 src/drivers/file-mutex.ts create mode 100644 src/drivers/memory-mutex.ts create mode 100644 src/drivers/mongoose-mutex.ts create mode 100644 src/drivers/redis-mutex.ts create mode 100644 src/drivers/sqlite-mutex.ts create mode 100644 src/interfaces/mutex.ts create mode 100644 tests/mutex/mutex.test.ts diff --git a/package.json b/package.json index 3d87bbb..0d649f8 100644 --- a/package.json +++ b/package.json @@ -91,6 +91,30 @@ "./plugins/ecs-protection-manager": { "import": "./dist/src/plugins/ecs-protection-manager.js", "types": "./dist/src/plugins/ecs-protection-manager.d.ts" + }, + "./interfaces/mutex": { + "import": "./dist/src/interfaces/mutex.js", + "types": "./dist/src/interfaces/mutex.d.ts" + }, + "./mutex/redis": { + "import": "./dist/src/drivers/redis-mutex.js", + "types": "./dist/src/drivers/redis-mutex.d.ts" + }, + "./mutex/sqlite": { + "import": "./dist/src/drivers/sqlite-mutex.js", + "types": "./dist/src/drivers/sqlite-mutex.d.ts" + }, + "./mutex/mongoose": { + "import": "./dist/src/drivers/mongoose-mutex.js", + "types": "./dist/src/drivers/mongoose-mutex.d.ts" + }, + "./mutex/file": { + "import": "./dist/src/drivers/file-mutex.js", + "types": "./dist/src/drivers/file-mutex.d.ts" + }, + "./mutex/memory": { + "import": "./dist/src/drivers/memory-mutex.js", + "types": "./dist/src/drivers/memory-mutex.d.ts" } }, "peerDependencies": { diff --git a/src/drivers/file-mutex.ts b/src/drivers/file-mutex.ts new file mode 100644 index 0000000..9e94f0a --- /dev/null +++ b/src/drivers/file-mutex.ts @@ -0,0 +1,219 @@ +import { randomUUID } from 'crypto'; +import { promises as fs } from 'fs'; +import { open } from 'fs/promises'; +import path from 'path'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; + +interface LockFileContent { + token: string; + expiresAt: number; + pid: number; +} + +/** + * File-based distributed mutex implementation. + * + * Uses filesystem-level locking with exclusive file creation (O_EXCL flag) + * to ensure atomic lock acquisition. Lock files contain a token and expiration + * time to handle stale locks from crashed processes. + * + * This implementation is suitable for distributed locking when multiple processes + * have access to the same filesystem (e.g., same machine or shared network filesystem). + * + * @example + * ```typescript + * import { FileMutex } from 'adapter-queue/file-mutex'; + * + * const mutex = new FileMutex({ lockDir: '/tmp/myapp/locks' }); + * + * await mutex.withLock('critical-section', async () => { + * // Only one process can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class FileMutex implements DistributedMutex { + private lockDir: string; + private initialized = false; + + /** + * Create a new file-based mutex. + * + * @param options - Configuration options + * @param options.lockDir - Directory to store lock files (default: '/tmp/mutex-locks') + */ + constructor(options?: { lockDir?: string }) { + this.lockDir = options?.lockDir ?? '/tmp/mutex-locks'; + } + + private async ensureInitialized(): Promise { + if (this.initialized) return; + + await fs.mkdir(this.lockDir, { recursive: true, mode: 0o755 }); + this.initialized = true; + } + + private getLockPath(key: string): string { + // Sanitize key to be filesystem-safe + const safeKey = key.replace(/[^a-zA-Z0-9_-]/g, '_'); + return path.join(this.lockDir, `${safeKey}.lock`); + } + + async tryAcquire(key: string, ttlMs: number): Promise { + await this.ensureInitialized(); + + const lockPath = this.getLockPath(key); + const token = randomUUID(); + const expiresAt = Date.now() + ttlMs; + + // First, check if an existing lock has expired + try { + const existingContent = await fs.readFile(lockPath, 'utf-8'); + const existingLock: LockFileContent = JSON.parse(existingContent); + + if (existingLock.expiresAt < Date.now()) { + // Lock is expired, try to remove it + try { + await fs.unlink(lockPath); + } catch { + // Another process may have removed it, continue trying to acquire + } + } else { + // Lock is still valid + return null; + } + } catch (err: any) { + if (err.code !== 'ENOENT') { + // Unexpected error reading lock file + throw err; + } + // Lock file doesn't exist, continue to acquire + } + + // Try to create lock file exclusively + let handle; + try { + handle = await open(lockPath, 'wx'); + + const lockContent: LockFileContent = { + token, + expiresAt, + pid: process.pid, + }; + + await handle.writeFile(JSON.stringify(lockContent), 'utf-8'); + await handle.close(); + + return token; + } catch (err: any) { + if (handle) { + try { + await handle.close(); + } catch { + // Ignore close errors + } + } + + if (err.code === 'EEXIST') { + // Lock file was created by another process + return null; + } + throw err; + } + } + + async release(key: string, token: string): Promise { + await this.ensureInitialized(); + + const lockPath = this.getLockPath(key); + + try { + const content = await fs.readFile(lockPath, 'utf-8'); + const lock: LockFileContent = JSON.parse(content); + + // Only delete if token matches (we own the lock) + if (lock.token === token) { + await fs.unlink(lockPath); + return true; + } + + return false; + } catch (err: any) { + if (err.code === 'ENOENT') { + // Lock file doesn't exist (may have expired and been cleaned up) + return false; + } + throw err; + } + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } + + /** + * Clean up all expired locks in the lock directory. + * This can be called periodically for maintenance. + * + * @returns Number of expired locks cleaned up + */ + async cleanup(): Promise { + await this.ensureInitialized(); + + let cleaned = 0; + const now = Date.now(); + + try { + const files = await fs.readdir(this.lockDir); + + for (const file of files) { + if (!file.endsWith('.lock')) continue; + + const lockPath = path.join(this.lockDir, file); + try { + const content = await fs.readFile(lockPath, 'utf-8'); + const lock: LockFileContent = JSON.parse(content); + + if (lock.expiresAt < now) { + await fs.unlink(lockPath); + cleaned++; + } + } catch { + // Ignore errors for individual files + } + } + } catch { + // Ignore errors listing directory + } + + return cleaned; + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/drivers/memory-mutex.ts b/src/drivers/memory-mutex.ts new file mode 100644 index 0000000..e503011 --- /dev/null +++ b/src/drivers/memory-mutex.ts @@ -0,0 +1,138 @@ +import { randomUUID } from 'crypto'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; + +interface LockEntry { + token: string; + expiresAt: number; + timeout: NodeJS.Timeout; +} + +/** + * In-memory mutex implementation for testing and single-process applications. + * + * This mutex only works within a single process and is not suitable for + * distributed locking across multiple processes or servers. Use it for: + * - Unit testing + * - Development environments + * - Single-process applications that need mutex semantics + * + * @example + * ```typescript + * import { InMemoryMutex } from 'adapter-queue/memory-mutex'; + * + * const mutex = new InMemoryMutex(); + * + * await mutex.withLock('critical-section', async () => { + * // Only one async operation can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class InMemoryMutex implements DistributedMutex { + private locks = new Map(); + + async tryAcquire(key: string, ttlMs: number): Promise { + const existing = this.locks.get(key); + + // Check if lock exists and is not expired + if (existing && existing.expiresAt > Date.now()) { + return null; + } + + // Clean up expired lock if it exists + if (existing) { + clearTimeout(existing.timeout); + this.locks.delete(key); + } + + const token = randomUUID(); + const expiresAt = Date.now() + ttlMs; + + // Set up automatic cleanup on expiration + const timeout = setTimeout(() => { + const lock = this.locks.get(key); + if (lock && lock.token === token) { + this.locks.delete(key); + } + }, ttlMs); + + this.locks.set(key, { token, expiresAt, timeout }); + + return token; + } + + async release(key: string, token: string): Promise { + const lock = this.locks.get(key); + + if (!lock || lock.token !== token) { + return false; + } + + clearTimeout(lock.timeout); + this.locks.delete(key); + + return true; + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } + + /** + * Check if a lock is currently held. + * Useful for testing and debugging. + * + * @param key - The lock key to check + * @returns true if the lock is held and not expired + */ + isLocked(key: string): boolean { + const lock = this.locks.get(key); + return !!lock && lock.expiresAt > Date.now(); + } + + /** + * Get the number of active locks. + * Useful for testing and debugging. + */ + get size(): number { + return this.locks.size; + } + + /** + * Clear all locks. Useful for test cleanup. + */ + clear(): void { + for (const lock of this.locks.values()) { + clearTimeout(lock.timeout); + } + this.locks.clear(); + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/drivers/mongoose-mutex.ts b/src/drivers/mongoose-mutex.ts new file mode 100644 index 0000000..a3e85c0 --- /dev/null +++ b/src/drivers/mongoose-mutex.ts @@ -0,0 +1,219 @@ +import { randomUUID } from 'crypto'; +import { Schema, model, Model, Document, Connection } from 'mongoose'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; + +/** + * MongoDB document interface for mutex locks. + */ +export interface IMutexLockDocument extends Document { + _id: string; // Using key as _id for uniqueness + token: string; + expiresAt: Date; +} + +/** + * Mongoose schema for mutex locks. + */ +export const MutexLockSchema = new Schema( + { + _id: { type: String, required: true }, // Lock key + token: { type: String, required: true }, + expiresAt: { type: Date, required: true, index: true }, + }, + { + collection: 'mutex_locks', + timestamps: false, + } +); + +// TTL index to automatically remove expired locks +MutexLockSchema.index({ expiresAt: 1 }, { expireAfterSeconds: 0 }); + +/** + * Mongoose/MongoDB-based distributed mutex implementation. + * + * Uses a dedicated `mutex_locks` collection with atomic findOneAndUpdate + * for lock acquisition. MongoDB's TTL index automatically removes expired locks. + * + * This implementation is suitable for distributed locking across multiple + * processes or servers connecting to the same MongoDB database. + * + * @example + * ```typescript + * import mongoose from 'mongoose'; + * import { MongooseMutex } from 'adapter-queue/mongoose-mutex'; + * + * await mongoose.connect('mongodb://localhost/myapp'); + * + * const mutex = new MongooseMutex(); + * + * await mutex.withLock('critical-section', async () => { + * // Only one process can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class MongooseMutex implements DistributedMutex { + private model: Model; + + /** + * Create a new Mongoose mutex. + * + * @param options - Configuration options + * @param options.model - Custom Mongoose model for locks (uses default if not provided) + * @param options.connection - Mongoose connection to use (uses default if not provided) + * @param options.collectionName - Name of the collection (default: 'mutex_locks') + */ + constructor(options?: { + model?: Model; + connection?: Connection; + collectionName?: string; + }) { + if (options?.model) { + this.model = options.model; + } else { + this.model = createMutexModel(options?.collectionName, options?.connection); + } + } + + async tryAcquire(key: string, ttlMs: number): Promise { + const token = randomUUID(); + const expiresAt = new Date(Date.now() + ttlMs); + const now = new Date(); + + try { + // Use findOneAndUpdate with upsert to atomically: + // 1. Create the lock if it doesn't exist + // 2. Update if it exists but is expired + // 3. Fail if it exists and is not expired + const result = await this.model.findOneAndUpdate( + { + _id: key, + $or: [ + { expiresAt: { $lt: now } }, // Lock expired + ], + }, + { + $set: { + token, + expiresAt, + }, + }, + { + upsert: false, // Don't create if not matched + new: true, + } + ); + + if (result) { + // Updated an expired lock + return token; + } + + // Try to insert new lock (handles case where lock doesn't exist) + try { + await this.model.create({ + _id: key, + token, + expiresAt, + }); + return token; + } catch (err: any) { + // Duplicate key error means lock was created by another process + if (err.code === 11000) { + return null; + } + throw err; + } + } catch (err) { + // Log but don't throw for expected race conditions + throw err; + } + } + + async release(key: string, token: string): Promise { + const result = await this.model.deleteOne({ + _id: key, + token: token, + }); + + return result.deletedCount > 0; + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } + + /** + * Clean up all expired locks. + * Note: MongoDB TTL index handles this automatically, but this can be + * called for immediate cleanup if needed. + */ + async cleanup(): Promise { + const result = await this.model.deleteMany({ + expiresAt: { $lt: new Date() }, + }); + + return result.deletedCount; + } +} + +/** + * Create a Mongoose model for mutex locks. + * + * @param collectionName - Name of the collection (default: 'mutex_locks') + * @param connection - Mongoose connection to use (uses default if not provided) + */ +export function createMutexModel( + collectionName?: string, + connection?: Connection +): Model { + const modelName = 'MutexLock'; + + // Check if model already exists on the connection + try { + if (connection) { + return connection.model(modelName); + } + return model(modelName); + } catch { + // Model doesn't exist, create it + const schema = MutexLockSchema.clone(); + if (collectionName) { + schema.set('collection', collectionName); + } + + if (connection) { + return connection.model(modelName, schema); + } + return model(modelName, schema); + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/drivers/redis-mutex.ts b/src/drivers/redis-mutex.ts new file mode 100644 index 0000000..865fd0e --- /dev/null +++ b/src/drivers/redis-mutex.ts @@ -0,0 +1,131 @@ +import { randomUUID } from 'crypto'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; +import type { RedisClient } from './redis.ts'; + +/** + * Extended Redis client interface for mutex operations. + * Adds the SET command with NX and PX options needed for distributed locking. + */ +export interface RedisMutexClient extends Pick { + /** + * SET command with options for atomic lock acquisition. + * @param key - The key to set + * @param value - The value to set + * @param options - NX (only set if not exists), PX (expire in milliseconds) + * @returns 'OK' if set, null if NX condition failed + */ + set( + key: string, + value: string, + options?: { NX?: boolean; PX?: number } + ): Promise; +} + +/** + * Redis-based distributed mutex implementation. + * + * Uses the Redis SET command with NX (not exists) and PX (expire) options + * for atomic lock acquisition. Lock release uses a Lua script to ensure + * we only delete the key if we own it (token matches). + * + * This implementation follows the Redis distributed lock pattern described at: + * https://redis.io/docs/manual/patterns/distributed-locks/ + * + * @example + * ```typescript + * import { createClient } from 'redis'; + * import { RedisMutex } from 'adapter-queue/redis-mutex'; + * + * const redis = createClient(); + * await redis.connect(); + * + * const mutex = new RedisMutex(redis, { prefix: 'myapp:lock:' }); + * + * await mutex.withLock('critical-section', async () => { + * // Only one process can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class RedisMutex implements DistributedMutex { + private prefix: string; + + /** + * Create a new Redis mutex. + * + * @param redis - Redis client instance (must support set with NX/PX and eval) + * @param options - Configuration options + * @param options.prefix - Key prefix for lock keys (default: 'mutex:') + */ + constructor( + private redis: RedisMutexClient, + options?: { prefix?: string } + ) { + this.prefix = options?.prefix ?? 'mutex:'; + } + + async tryAcquire(key: string, ttlMs: number): Promise { + const token = randomUUID(); + const lockKey = `${this.prefix}${key}`; + + const result = await this.redis.set(lockKey, token, { NX: true, PX: ttlMs }); + + return result === 'OK' ? token : null; + } + + async release(key: string, token: string): Promise { + const lockKey = `${this.prefix}${key}`; + + // Lua script to atomically check token and delete + // This prevents race conditions where: + // 1. Lock expires + // 2. Another process acquires the lock + // 3. We try to release and accidentally delete the new lock + const script = ` + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) + else + return 0 + end + `; + + const result = await this.redis.eval(script, { + keys: [lockKey], + arguments: [token], + }); + + return result === 1; + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/drivers/sqlite-mutex.ts b/src/drivers/sqlite-mutex.ts new file mode 100644 index 0000000..e126a9b --- /dev/null +++ b/src/drivers/sqlite-mutex.ts @@ -0,0 +1,155 @@ +import { randomUUID } from 'crypto'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; +import type { SQLiteDatabase } from './sqlite.ts'; + +/** + * SQLite-based distributed mutex implementation. + * + * Uses a dedicated `mutex_locks` table with a unique constraint on the key column. + * Lock acquisition uses INSERT with conflict handling, and expired locks are + * automatically cleaned up before acquisition attempts. + * + * This implementation is suitable for distributed locking when multiple processes + * share the same SQLite database file (e.g., on a shared filesystem). + * + * @example + * ```typescript + * import Database from 'better-sqlite3'; + * import { SQLiteMutex } from 'adapter-queue/sqlite-mutex'; + * + * const db = new Database('app.db'); + * const mutex = new SQLiteMutex(db); + * + * await mutex.withLock('critical-section', async () => { + * // Only one process can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class SQLiteMutex implements DistributedMutex { + private tableName: string; + private initialized = false; + + /** + * Create a new SQLite mutex. + * + * @param db - SQLite database instance + * @param options - Configuration options + * @param options.tableName - Name of the locks table (default: 'mutex_locks') + */ + constructor( + private db: SQLiteDatabase, + options?: { tableName?: string } + ) { + this.tableName = options?.tableName ?? 'mutex_locks'; + } + + private ensureInitialized(): void { + if (this.initialized) return; + + // Create locks table if it doesn't exist + this.db.exec(` + CREATE TABLE IF NOT EXISTS ${this.tableName} ( + key TEXT PRIMARY KEY, + token TEXT NOT NULL, + expires_at INTEGER NOT NULL + ) + `); + + this.initialized = true; + } + + async tryAcquire(key: string, ttlMs: number): Promise { + this.ensureInitialized(); + + const token = randomUUID(); + const expiresAt = Date.now() + ttlMs; + const now = Date.now(); + + // Delete expired locks first + const deleteStmt = this.db.prepare( + `DELETE FROM ${this.tableName} WHERE expires_at < ?` + ); + deleteStmt.run(now); + + // Try to insert new lock + // SQLite's INSERT OR IGNORE won't tell us if the insert failed due to conflict, + // so we use a different approach: try insert, catch constraint error + try { + const insertStmt = this.db.prepare( + `INSERT INTO ${this.tableName} (key, token, expires_at) VALUES (?, ?, ?)` + ); + insertStmt.run(key, token, expiresAt); + return token; + } catch (err: any) { + // Check if it's a unique constraint violation + if ( + err.code === 'SQLITE_CONSTRAINT' || + err.code === 'SQLITE_CONSTRAINT_PRIMARYKEY' || + (err.message && err.message.includes('UNIQUE constraint failed')) + ) { + return null; // Lock already held + } + throw err; + } + } + + async release(key: string, token: string): Promise { + this.ensureInitialized(); + + const stmt = this.db.prepare( + `DELETE FROM ${this.tableName} WHERE key = ? AND token = ?` + ); + const result = stmt.run(key, token); + + return result.changes > 0; + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } + + /** + * Clean up all expired locks. + * This is called automatically during tryAcquire, but can be called + * manually for maintenance purposes. + */ + async cleanup(): Promise { + this.ensureInitialized(); + + const stmt = this.db.prepare( + `DELETE FROM ${this.tableName} WHERE expires_at < ?` + ); + const result = stmt.run(Date.now()); + + return result.changes; + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/interfaces/mutex.ts b/src/interfaces/mutex.ts new file mode 100644 index 0000000..b01b4b4 --- /dev/null +++ b/src/interfaces/mutex.ts @@ -0,0 +1,123 @@ +/** + * Options for the withLock method. + */ +export interface MutexLockOptions { + /** + * Time-to-live for the lock in milliseconds. + * Lock auto-expires after this time to prevent deadlocks. + * @default 30000 (30 seconds) + */ + ttlMs?: number; + + /** + * Maximum time to wait for lock acquisition in milliseconds. + * If the lock cannot be acquired within this time, an error is thrown. + * @default 5000 (5 seconds) + */ + waitMs?: number; + + /** + * Interval between lock acquisition attempts in milliseconds. + * @default 100 + */ + retryIntervalMs?: number; +} + +/** + * A distributed mutex interface for coordinating access across multiple processes. + * + * Each driver that supports distributed locking should implement this interface. + * The mutex uses a token-based approach where: + * - `tryAcquire` returns a unique token if the lock is acquired + * - `release` requires the token to prevent releasing locks held by other processes + * - Locks have a TTL to prevent deadlocks from crashed processes + * + * @example + * ```typescript + * const mutex = new RedisMutex(redisClient); + * + * // Simple usage with withLock + * await mutex.withLock('my-resource', async () => { + * // Critical section - only one process can execute this at a time + * await doSomethingExclusive(); + * }); + * + * // Manual lock management + * const token = await mutex.tryAcquire('my-resource', 30000); + * if (token) { + * try { + * await doSomethingExclusive(); + * } finally { + * await mutex.release('my-resource', token); + * } + * } + * ``` + */ +export interface DistributedMutex { + /** + * Attempt to acquire a named lock. + * + * This is a non-blocking operation that returns immediately. + * If the lock is already held by another process, returns null. + * + * @param key - Unique identifier for the lock + * @param ttlMs - Lock auto-expires after this time in milliseconds (prevents deadlocks) + * @returns A unique token if lock was acquired, null if lock is held by another process + */ + tryAcquire(key: string, ttlMs: number): Promise; + + /** + * Release a lock using the token from tryAcquire. + * + * Only releases the lock if the token matches the one used to acquire it. + * This prevents accidentally releasing locks held by other processes. + * + * @param key - The lock key to release + * @param token - The token returned from tryAcquire + * @returns true if the lock was released, false if token didn't match or lock expired + */ + release(key: string, token: string): Promise; + + /** + * Execute a function while holding a lock. + * + * This is the recommended way to use the mutex as it handles: + * - Automatic lock acquisition with retries + * - Automatic lock release (even on errors) + * - Timeout handling + * + * @param key - Unique identifier for the lock + * @param fn - The function to execute while holding the lock + * @param options - Lock options (ttl, wait time, retry interval) + * @returns The return value of the function + * @throws Error if lock cannot be acquired within the wait time + * + * @example + * ```typescript + * const result = await mutex.withLock( + * 'process-orders', + * async () => { + * const orders = await getUnprocessedOrders(); + * await processOrders(orders); + * return orders.length; + * }, + * { ttlMs: 60000, waitMs: 10000 } + * ); + * ``` + */ + withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise; +} + +/** + * Error thrown when a lock cannot be acquired within the specified wait time. + */ +export class MutexAcquireError extends Error { + constructor(key: string, waitMs: number) { + super(`Failed to acquire lock '${key}' within ${waitMs}ms`); + this.name = 'MutexAcquireError'; + } +} diff --git a/tests/mutex/mutex.test.ts b/tests/mutex/mutex.test.ts new file mode 100644 index 0000000..986ffb5 --- /dev/null +++ b/tests/mutex/mutex.test.ts @@ -0,0 +1,351 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { InMemoryMutex } from '../../src/drivers/memory-mutex.ts'; +import { FileMutex } from '../../src/drivers/file-mutex.ts'; +import { MutexAcquireError } from '../../src/interfaces/mutex.ts'; +import type { DistributedMutex } from '../../src/interfaces/mutex.ts'; +import { promises as fs } from 'fs'; +import path from 'path'; +import os from 'os'; + +describe('DistributedMutex Interface', () => { + // Test all mutex implementations that don't require external services + const implementations: Array<{ + name: string; + create: () => DistributedMutex; + cleanup?: () => Promise; + }> = [ + { + name: 'InMemoryMutex', + create: () => new InMemoryMutex(), + cleanup: async () => {}, + }, + { + name: 'FileMutex', + create: () => { + const lockDir = path.join(os.tmpdir(), `mutex-test-${Date.now()}-${Math.random().toString(36).slice(2)}`); + return new FileMutex({ lockDir }); + }, + cleanup: async () => { + // Clean up test directories + const tmpDir = os.tmpdir(); + const entries = await fs.readdir(tmpDir); + for (const entry of entries) { + if (entry.startsWith('mutex-test-')) { + try { + await fs.rm(path.join(tmpDir, entry), { recursive: true }); + } catch { + // Ignore cleanup errors + } + } + } + }, + }, + ]; + + for (const impl of implementations) { + describe(impl.name, () => { + let mutex: DistributedMutex; + + beforeEach(() => { + mutex = impl.create(); + }); + + afterEach(async () => { + if ('clear' in mutex && typeof mutex.clear === 'function') { + (mutex as any).clear(); + } + if (impl.cleanup) { + await impl.cleanup(); + } + }); + + describe('tryAcquire', () => { + it('should acquire a lock and return a token', async () => { + const token = await mutex.tryAcquire('test-key', 5000); + expect(token).toBeTruthy(); + expect(typeof token).toBe('string'); + }); + + it('should return null if lock is already held', async () => { + const token1 = await mutex.tryAcquire('test-key', 5000); + expect(token1).toBeTruthy(); + + const token2 = await mutex.tryAcquire('test-key', 5000); + expect(token2).toBeNull(); + }); + + it('should allow acquiring different keys', async () => { + const token1 = await mutex.tryAcquire('key-1', 5000); + const token2 = await mutex.tryAcquire('key-2', 5000); + + expect(token1).toBeTruthy(); + expect(token2).toBeTruthy(); + expect(token1).not.toBe(token2); + }); + + it('should allow re-acquiring after lock expires', async () => { + const token1 = await mutex.tryAcquire('test-key', 50); // 50ms TTL + expect(token1).toBeTruthy(); + + // Wait for lock to expire + await sleep(100); + + const token2 = await mutex.tryAcquire('test-key', 5000); + expect(token2).toBeTruthy(); + expect(token2).not.toBe(token1); + }); + }); + + describe('release', () => { + it('should release a lock with correct token', async () => { + const token = await mutex.tryAcquire('test-key', 5000); + expect(token).toBeTruthy(); + + const released = await mutex.release('test-key', token!); + expect(released).toBe(true); + + // Should be able to acquire again + const newToken = await mutex.tryAcquire('test-key', 5000); + expect(newToken).toBeTruthy(); + }); + + it('should not release a lock with wrong token', async () => { + const token = await mutex.tryAcquire('test-key', 5000); + expect(token).toBeTruthy(); + + const released = await mutex.release('test-key', 'wrong-token'); + expect(released).toBe(false); + + // Lock should still be held + const newToken = await mutex.tryAcquire('test-key', 5000); + expect(newToken).toBeNull(); + }); + + it('should return false for non-existent lock', async () => { + const released = await mutex.release('non-existent', 'any-token'); + expect(released).toBe(false); + }); + }); + + describe('withLock', () => { + it('should execute function while holding lock', async () => { + let executed = false; + + await mutex.withLock('test-key', async () => { + executed = true; + // Lock should be held during execution + const token = await mutex.tryAcquire('test-key', 5000); + expect(token).toBeNull(); + }); + + expect(executed).toBe(true); + + // Lock should be released after + const token = await mutex.tryAcquire('test-key', 5000); + expect(token).toBeTruthy(); + }); + + it('should return the function result', async () => { + const result = await mutex.withLock('test-key', async () => { + return 'hello world'; + }); + + expect(result).toBe('hello world'); + }); + + it('should release lock even if function throws', async () => { + const error = new Error('Test error'); + + await expect( + mutex.withLock('test-key', async () => { + throw error; + }) + ).rejects.toThrow('Test error'); + + // Lock should be released + const token = await mutex.tryAcquire('test-key', 5000); + expect(token).toBeTruthy(); + }); + + it('should throw MutexAcquireError if lock cannot be acquired', async () => { + // Acquire lock first + const token = await mutex.tryAcquire('test-key', 10000); + expect(token).toBeTruthy(); + + // Try to acquire with withLock with short wait time + await expect( + mutex.withLock( + 'test-key', + async () => 'result', + { waitMs: 100, retryIntervalMs: 20 } + ) + ).rejects.toThrow(MutexAcquireError); + }); + + it('should wait and retry to acquire lock', async () => { + // Acquire lock with short TTL + const token = await mutex.tryAcquire('test-key', 100); + expect(token).toBeTruthy(); + + // withLock should wait for lock to expire and then acquire + const startTime = Date.now(); + const result = await mutex.withLock( + 'test-key', + async () => 'acquired', + { waitMs: 500, retryIntervalMs: 20 } + ); + const elapsed = Date.now() - startTime; + + expect(result).toBe('acquired'); + expect(elapsed).toBeGreaterThanOrEqual(80); // Should have waited for TTL + }); + }); + + describe('concurrent access', () => { + it('should serialize concurrent withLock calls', async () => { + const order: number[] = []; + + const task = async (id: number, delay: number) => { + await mutex.withLock('test-key', async () => { + order.push(id); + await sleep(delay); + }); + }; + + // Start multiple tasks concurrently + await Promise.all([ + task(1, 50), + task(2, 50), + task(3, 50), + ]); + + // All tasks should complete + expect(order).toHaveLength(3); + // Order should contain all three IDs (order depends on timing) + expect(order.sort()).toEqual([1, 2, 3]); + }); + + it('should handle rapid acquire/release cycles', async () => { + const iterations = 20; + let successCount = 0; + + for (let i = 0; i < iterations; i++) { + const token = await mutex.tryAcquire('test-key', 5000); + if (token) { + successCount++; + await mutex.release('test-key', token); + } + } + + expect(successCount).toBe(iterations); + }); + }); + }); + } +}); + +describe('InMemoryMutex specific', () => { + let mutex: InMemoryMutex; + + beforeEach(() => { + mutex = new InMemoryMutex(); + }); + + afterEach(() => { + mutex.clear(); + }); + + it('should report isLocked correctly', async () => { + expect(mutex.isLocked('test-key')).toBe(false); + + const token = await mutex.tryAcquire('test-key', 5000); + expect(mutex.isLocked('test-key')).toBe(true); + + await mutex.release('test-key', token!); + expect(mutex.isLocked('test-key')).toBe(false); + }); + + it('should report correct size', async () => { + expect(mutex.size).toBe(0); + + await mutex.tryAcquire('key-1', 5000); + expect(mutex.size).toBe(1); + + await mutex.tryAcquire('key-2', 5000); + expect(mutex.size).toBe(2); + }); + + it('should clear all locks', async () => { + await mutex.tryAcquire('key-1', 5000); + await mutex.tryAcquire('key-2', 5000); + expect(mutex.size).toBe(2); + + mutex.clear(); + expect(mutex.size).toBe(0); + + // Should be able to acquire again + const token = await mutex.tryAcquire('key-1', 5000); + expect(token).toBeTruthy(); + }); + + it('should auto-cleanup expired locks', async () => { + await mutex.tryAcquire('test-key', 50); // 50ms TTL + expect(mutex.isLocked('test-key')).toBe(true); + + // Wait for auto-cleanup + await sleep(100); + + expect(mutex.isLocked('test-key')).toBe(false); + expect(mutex.size).toBe(0); + }); +}); + +describe('FileMutex specific', () => { + let mutex: FileMutex; + let lockDir: string; + + beforeEach(async () => { + lockDir = path.join(os.tmpdir(), `mutex-test-${Date.now()}-${Math.random().toString(36).slice(2)}`); + mutex = new FileMutex({ lockDir }); + }); + + afterEach(async () => { + try { + await fs.rm(lockDir, { recursive: true }); + } catch { + // Ignore cleanup errors + } + }); + + it('should create lock directory if it does not exist', async () => { + const token = await mutex.tryAcquire('test-key', 5000); + expect(token).toBeTruthy(); + + const stat = await fs.stat(lockDir); + expect(stat.isDirectory()).toBe(true); + }); + + it('should cleanup expired locks', async () => { + // Create an expired lock + await mutex.tryAcquire('expired-key', 50); + await sleep(100); + + const cleaned = await mutex.cleanup(); + expect(cleaned).toBe(1); + }); + + it('should sanitize lock keys for filesystem', async () => { + // Key with special characters + const token = await mutex.tryAcquire('path/to/resource:with:colons', 5000); + expect(token).toBeTruthy(); + + // Should create a valid file + const files = await fs.readdir(lockDir); + expect(files.length).toBe(1); + expect(files[0]).toMatch(/\.lock$/); + }); +}); + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} From 23e0fc74b363b6d98a8680978a528fce21cf6e3b Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Dec 2025 01:49:41 +0000 Subject: [PATCH 2/2] test: improve mutex tests to follow all-queues pattern - Rewrite tests to use testcontainers for all drivers (Redis, MongoDB) - Add SQLiteMutex tests using in-memory database - Test all 5 mutex implementations with comprehensive test suite - Fix duplicate Mongoose index warning on expiresAt field --- src/drivers/mongoose-mutex.ts | 2 +- tests/mutex/all-mutex.test.ts | 410 ++++++++++++++++++++++++++++++++++ tests/mutex/mutex.test.ts | 351 ----------------------------- 3 files changed, 411 insertions(+), 352 deletions(-) create mode 100644 tests/mutex/all-mutex.test.ts delete mode 100644 tests/mutex/mutex.test.ts diff --git a/src/drivers/mongoose-mutex.ts b/src/drivers/mongoose-mutex.ts index a3e85c0..2cd8939 100644 --- a/src/drivers/mongoose-mutex.ts +++ b/src/drivers/mongoose-mutex.ts @@ -19,7 +19,7 @@ export const MutexLockSchema = new Schema( { _id: { type: String, required: true }, // Lock key token: { type: String, required: true }, - expiresAt: { type: Date, required: true, index: true }, + expiresAt: { type: Date, required: true }, }, { collection: 'mutex_locks', diff --git a/tests/mutex/all-mutex.test.ts b/tests/mutex/all-mutex.test.ts new file mode 100644 index 0000000..c383e03 --- /dev/null +++ b/tests/mutex/all-mutex.test.ts @@ -0,0 +1,410 @@ +import { + describe, + it, + expect, + beforeAll, + afterAll, + beforeEach, + afterEach, +} from "vitest"; +import { promises as fs } from "fs"; +import path from "path"; +import os from "os"; +import { GenericContainer, type StartedTestContainer } from "testcontainers"; +import { createClient, type RedisClientType } from "redis"; +import Database from "better-sqlite3"; +import mongoose from "mongoose"; +import type { DistributedMutex } from "../../src/interfaces/mutex.ts"; +import { MutexAcquireError } from "../../src/interfaces/mutex.ts"; +import { InMemoryMutex } from "../../src/drivers/memory-mutex.ts"; +import { FileMutex } from "../../src/drivers/file-mutex.ts"; +import { SQLiteMutex } from "../../src/drivers/sqlite-mutex.ts"; +import { RedisMutex } from "../../src/drivers/redis-mutex.ts"; +import { MongooseMutex } from "../../src/drivers/mongoose-mutex.ts"; + +interface MutexDriverConfig { + name: string; + beforeAll?: () => Promise; + afterAll?: () => Promise; + createMutex: () => Promise; + cleanup?: (mutex: DistributedMutex) => Promise; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +// Driver configurations +const drivers: Array<() => Promise | MutexDriverConfig> = [ + // InMemoryMutex - no external dependencies + () => ({ + name: "InMemoryMutex", + createMutex: async () => new InMemoryMutex(), + cleanup: async (mutex) => { + if ("clear" in mutex && typeof mutex.clear === "function") { + (mutex as InMemoryMutex).clear(); + } + }, + }), + + // FileMutex - file system based + () => { + let baseTempDir: string; + + return { + name: "FileMutex", + beforeAll: async () => { + baseTempDir = await fs.mkdtemp( + path.join(os.tmpdir(), "mutex-file-tests-") + ); + }, + afterAll: async () => { + try { + await fs.rm(baseTempDir, { recursive: true, force: true }); + } catch (error) { + console.warn("Failed to clean up FileMutex test directory:", error); + } + }, + createMutex: async () => { + const lockDir = path.join( + baseTempDir, + `mutex-${Date.now()}-${Math.random().toString(36).substring(2, 9)}` + ); + return new FileMutex({ lockDir }); + }, + cleanup: async (mutex) => { + if ("cleanup" in mutex && typeof mutex.cleanup === "function") { + await (mutex as FileMutex).cleanup(); + } + }, + }; + }, + + // SQLiteMutex - in-memory SQLite + () => ({ + name: "SQLiteMutex", + createMutex: async () => { + const db = new Database(":memory:"); + return new SQLiteMutex(db); + }, + cleanup: async (mutex) => { + if ("cleanup" in mutex && typeof mutex.cleanup === "function") { + await (mutex as SQLiteMutex).cleanup(); + } + }, + }), + + // RedisMutex - requires Redis container + async () => { + let redisContainer: StartedTestContainer; + let redisClient: RedisClientType; + + return { + name: "RedisMutex", + beforeAll: async () => { + redisContainer = await new GenericContainer("valkey/valkey:7-alpine") + .withExposedPorts(6379) + .start(); + + const redisPort = redisContainer.getMappedPort(6379); + const redisHost = redisContainer.getHost(); + + redisClient = createClient({ + url: `redis://${redisHost}:${redisPort}`, + }); + + await redisClient.connect(); + }, + afterAll: async () => { + if (redisClient) { + await redisClient.quit(); + } + if (redisContainer) { + await redisContainer.stop(); + } + }, + createMutex: async () => { + return new RedisMutex(redisClient, { prefix: "test:mutex:" }); + }, + cleanup: async () => { + // Clear all mutex keys between tests + const keys = await redisClient.keys("test:mutex:*"); + if (keys.length > 0) { + await redisClient.del(keys); + } + }, + }; + }, + + // MongooseMutex - requires MongoDB container + async () => { + let mongoContainer: StartedTestContainer; + let mongoUri: string; + + return { + name: "MongooseMutex", + beforeAll: async () => { + mongoContainer = await new GenericContainer("mongo:7") + .withExposedPorts(27017) + .withStartupTimeout(60000) + .start(); + + const mongoPort = mongoContainer.getMappedPort(27017); + const mongoHost = mongoContainer.getHost(); + mongoUri = `mongodb://${mongoHost}:${mongoPort}/test-mutex-db`; + + await mongoose.connect(mongoUri); + }, + afterAll: async () => { + if (mongoose.connection.readyState === 1) { + await mongoose.disconnect(); + } + if (mongoContainer) { + await mongoContainer.stop(); + } + }, + createMutex: async () => { + return new MongooseMutex(); + }, + cleanup: async () => { + // Clean up MongoDB collections between tests + if (mongoose.connection.readyState === 1 && mongoose.connection.db) { + try { + await mongoose.connection.db.collection("mutex_locks").deleteMany({}); + } catch { + // Collection might not exist yet + } + } + }, + }; + }, +]; + +async function resolveAllConfigs(): Promise> { + console.time("resolveAllConfigs"); + const resolvedConfigs = []; + for (const driver of drivers) { + const res = driver(); + const config = res instanceof Promise ? await res : res; + resolvedConfigs.push(config); + } + console.timeEnd("resolveAllConfigs"); + return resolvedConfigs; +} + +const configs = await resolveAllConfigs(); + +describe.concurrent.each(configs)("Mutex Driver: $name", (config) => { + console.time(`${config.name} setup`); + + let mutex: DistributedMutex; + + beforeAll(async () => { + console.time(`${config.name} beforeAll`); + if (config.beforeAll) { + await config.beforeAll(); + } + console.timeEnd(`${config.name} beforeAll`); + }, 60_000); + + afterAll(async () => { + if (config.afterAll) { + await config.afterAll(); + } + }); + + beforeEach(async () => { + mutex = await config.createMutex(); + }); + + afterEach(async () => { + if (config.cleanup) { + await config.cleanup(mutex); + } + }); + + describe.sequential(`${config.name}`, () => { + describe("tryAcquire", () => { + it(`${config.name} should acquire a lock and return a token`, async () => { + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + expect(typeof token).toBe("string"); + }); + + it(`${config.name} should return null if lock is already held`, async () => { + const token1 = await mutex.tryAcquire("test-key", 5000); + expect(token1).toBeTruthy(); + + const token2 = await mutex.tryAcquire("test-key", 5000); + expect(token2).toBeNull(); + }); + + it(`${config.name} should allow acquiring different keys`, async () => { + const token1 = await mutex.tryAcquire("key-1", 5000); + const token2 = await mutex.tryAcquire("key-2", 5000); + + expect(token1).toBeTruthy(); + expect(token2).toBeTruthy(); + expect(token1).not.toBe(token2); + }); + + it(`${config.name} should allow re-acquiring after lock expires`, async () => { + const token1 = await mutex.tryAcquire("test-key", 100); // 100ms TTL + expect(token1).toBeTruthy(); + + // Wait for lock to expire + await sleep(200); + + const token2 = await mutex.tryAcquire("test-key", 5000); + expect(token2).toBeTruthy(); + expect(token2).not.toBe(token1); + }); + }); + + describe("release", () => { + it(`${config.name} should release a lock with correct token`, async () => { + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + + const released = await mutex.release("test-key", token!); + expect(released).toBe(true); + + // Should be able to acquire again + const newToken = await mutex.tryAcquire("test-key", 5000); + expect(newToken).toBeTruthy(); + }); + + it(`${config.name} should not release a lock with wrong token`, async () => { + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + + const released = await mutex.release("test-key", "wrong-token"); + expect(released).toBe(false); + + // Lock should still be held + const newToken = await mutex.tryAcquire("test-key", 5000); + expect(newToken).toBeNull(); + }); + + it(`${config.name} should return false for non-existent lock`, async () => { + const released = await mutex.release("non-existent", "any-token"); + expect(released).toBe(false); + }); + }); + + describe("withLock", () => { + it(`${config.name} should execute function while holding lock`, async () => { + let executed = false; + + await mutex.withLock("test-key", async () => { + executed = true; + // Lock should be held during execution + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeNull(); + }); + + expect(executed).toBe(true); + + // Lock should be released after + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + }); + + it(`${config.name} should return the function result`, async () => { + const result = await mutex.withLock("test-key", async () => { + return "hello world"; + }); + + expect(result).toBe("hello world"); + }); + + it(`${config.name} should release lock even if function throws`, async () => { + const error = new Error("Test error"); + + await expect( + mutex.withLock("test-key", async () => { + throw error; + }) + ).rejects.toThrow("Test error"); + + // Lock should be released + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + }); + + it(`${config.name} should throw MutexAcquireError if lock cannot be acquired`, async () => { + // Acquire lock first + const token = await mutex.tryAcquire("test-key", 10000); + expect(token).toBeTruthy(); + + // Try to acquire with withLock with short wait time + await expect( + mutex.withLock("test-key", async () => "result", { + waitMs: 100, + retryIntervalMs: 20, + }) + ).rejects.toThrow(MutexAcquireError); + }); + + it(`${config.name} should wait and retry to acquire lock`, async () => { + // Acquire lock with short TTL + const token = await mutex.tryAcquire("test-key", 150); + expect(token).toBeTruthy(); + + // withLock should wait for lock to expire and then acquire + const startTime = Date.now(); + const result = await mutex.withLock( + "test-key", + async () => "acquired", + { waitMs: 500, retryIntervalMs: 30 } + ); + const elapsed = Date.now() - startTime; + + expect(result).toBe("acquired"); + expect(elapsed).toBeGreaterThanOrEqual(100); // Should have waited for TTL + }); + }); + + describe("concurrent access", () => { + it(`${config.name} should serialize concurrent withLock calls`, async () => { + const order: number[] = []; + + const task = async (id: number, delay: number) => { + await mutex.withLock( + "test-key", + async () => { + order.push(id); + await sleep(delay); + }, + { waitMs: 5000 } + ); + }; + + // Start multiple tasks concurrently + await Promise.all([task(1, 50), task(2, 50), task(3, 50)]); + + // All tasks should complete + expect(order).toHaveLength(3); + // Order should contain all three IDs (order depends on timing) + expect(order.sort()).toEqual([1, 2, 3]); + }); + + it(`${config.name} should handle rapid acquire/release cycles`, async () => { + const iterations = 20; + let successCount = 0; + + for (let i = 0; i < iterations; i++) { + const token = await mutex.tryAcquire("test-key", 5000); + if (token) { + successCount++; + await mutex.release("test-key", token); + } + } + + expect(successCount).toBe(iterations); + }); + }); + }); + + console.timeEnd(`${config.name} setup`); +}); diff --git a/tests/mutex/mutex.test.ts b/tests/mutex/mutex.test.ts deleted file mode 100644 index 986ffb5..0000000 --- a/tests/mutex/mutex.test.ts +++ /dev/null @@ -1,351 +0,0 @@ -import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; -import { InMemoryMutex } from '../../src/drivers/memory-mutex.ts'; -import { FileMutex } from '../../src/drivers/file-mutex.ts'; -import { MutexAcquireError } from '../../src/interfaces/mutex.ts'; -import type { DistributedMutex } from '../../src/interfaces/mutex.ts'; -import { promises as fs } from 'fs'; -import path from 'path'; -import os from 'os'; - -describe('DistributedMutex Interface', () => { - // Test all mutex implementations that don't require external services - const implementations: Array<{ - name: string; - create: () => DistributedMutex; - cleanup?: () => Promise; - }> = [ - { - name: 'InMemoryMutex', - create: () => new InMemoryMutex(), - cleanup: async () => {}, - }, - { - name: 'FileMutex', - create: () => { - const lockDir = path.join(os.tmpdir(), `mutex-test-${Date.now()}-${Math.random().toString(36).slice(2)}`); - return new FileMutex({ lockDir }); - }, - cleanup: async () => { - // Clean up test directories - const tmpDir = os.tmpdir(); - const entries = await fs.readdir(tmpDir); - for (const entry of entries) { - if (entry.startsWith('mutex-test-')) { - try { - await fs.rm(path.join(tmpDir, entry), { recursive: true }); - } catch { - // Ignore cleanup errors - } - } - } - }, - }, - ]; - - for (const impl of implementations) { - describe(impl.name, () => { - let mutex: DistributedMutex; - - beforeEach(() => { - mutex = impl.create(); - }); - - afterEach(async () => { - if ('clear' in mutex && typeof mutex.clear === 'function') { - (mutex as any).clear(); - } - if (impl.cleanup) { - await impl.cleanup(); - } - }); - - describe('tryAcquire', () => { - it('should acquire a lock and return a token', async () => { - const token = await mutex.tryAcquire('test-key', 5000); - expect(token).toBeTruthy(); - expect(typeof token).toBe('string'); - }); - - it('should return null if lock is already held', async () => { - const token1 = await mutex.tryAcquire('test-key', 5000); - expect(token1).toBeTruthy(); - - const token2 = await mutex.tryAcquire('test-key', 5000); - expect(token2).toBeNull(); - }); - - it('should allow acquiring different keys', async () => { - const token1 = await mutex.tryAcquire('key-1', 5000); - const token2 = await mutex.tryAcquire('key-2', 5000); - - expect(token1).toBeTruthy(); - expect(token2).toBeTruthy(); - expect(token1).not.toBe(token2); - }); - - it('should allow re-acquiring after lock expires', async () => { - const token1 = await mutex.tryAcquire('test-key', 50); // 50ms TTL - expect(token1).toBeTruthy(); - - // Wait for lock to expire - await sleep(100); - - const token2 = await mutex.tryAcquire('test-key', 5000); - expect(token2).toBeTruthy(); - expect(token2).not.toBe(token1); - }); - }); - - describe('release', () => { - it('should release a lock with correct token', async () => { - const token = await mutex.tryAcquire('test-key', 5000); - expect(token).toBeTruthy(); - - const released = await mutex.release('test-key', token!); - expect(released).toBe(true); - - // Should be able to acquire again - const newToken = await mutex.tryAcquire('test-key', 5000); - expect(newToken).toBeTruthy(); - }); - - it('should not release a lock with wrong token', async () => { - const token = await mutex.tryAcquire('test-key', 5000); - expect(token).toBeTruthy(); - - const released = await mutex.release('test-key', 'wrong-token'); - expect(released).toBe(false); - - // Lock should still be held - const newToken = await mutex.tryAcquire('test-key', 5000); - expect(newToken).toBeNull(); - }); - - it('should return false for non-existent lock', async () => { - const released = await mutex.release('non-existent', 'any-token'); - expect(released).toBe(false); - }); - }); - - describe('withLock', () => { - it('should execute function while holding lock', async () => { - let executed = false; - - await mutex.withLock('test-key', async () => { - executed = true; - // Lock should be held during execution - const token = await mutex.tryAcquire('test-key', 5000); - expect(token).toBeNull(); - }); - - expect(executed).toBe(true); - - // Lock should be released after - const token = await mutex.tryAcquire('test-key', 5000); - expect(token).toBeTruthy(); - }); - - it('should return the function result', async () => { - const result = await mutex.withLock('test-key', async () => { - return 'hello world'; - }); - - expect(result).toBe('hello world'); - }); - - it('should release lock even if function throws', async () => { - const error = new Error('Test error'); - - await expect( - mutex.withLock('test-key', async () => { - throw error; - }) - ).rejects.toThrow('Test error'); - - // Lock should be released - const token = await mutex.tryAcquire('test-key', 5000); - expect(token).toBeTruthy(); - }); - - it('should throw MutexAcquireError if lock cannot be acquired', async () => { - // Acquire lock first - const token = await mutex.tryAcquire('test-key', 10000); - expect(token).toBeTruthy(); - - // Try to acquire with withLock with short wait time - await expect( - mutex.withLock( - 'test-key', - async () => 'result', - { waitMs: 100, retryIntervalMs: 20 } - ) - ).rejects.toThrow(MutexAcquireError); - }); - - it('should wait and retry to acquire lock', async () => { - // Acquire lock with short TTL - const token = await mutex.tryAcquire('test-key', 100); - expect(token).toBeTruthy(); - - // withLock should wait for lock to expire and then acquire - const startTime = Date.now(); - const result = await mutex.withLock( - 'test-key', - async () => 'acquired', - { waitMs: 500, retryIntervalMs: 20 } - ); - const elapsed = Date.now() - startTime; - - expect(result).toBe('acquired'); - expect(elapsed).toBeGreaterThanOrEqual(80); // Should have waited for TTL - }); - }); - - describe('concurrent access', () => { - it('should serialize concurrent withLock calls', async () => { - const order: number[] = []; - - const task = async (id: number, delay: number) => { - await mutex.withLock('test-key', async () => { - order.push(id); - await sleep(delay); - }); - }; - - // Start multiple tasks concurrently - await Promise.all([ - task(1, 50), - task(2, 50), - task(3, 50), - ]); - - // All tasks should complete - expect(order).toHaveLength(3); - // Order should contain all three IDs (order depends on timing) - expect(order.sort()).toEqual([1, 2, 3]); - }); - - it('should handle rapid acquire/release cycles', async () => { - const iterations = 20; - let successCount = 0; - - for (let i = 0; i < iterations; i++) { - const token = await mutex.tryAcquire('test-key', 5000); - if (token) { - successCount++; - await mutex.release('test-key', token); - } - } - - expect(successCount).toBe(iterations); - }); - }); - }); - } -}); - -describe('InMemoryMutex specific', () => { - let mutex: InMemoryMutex; - - beforeEach(() => { - mutex = new InMemoryMutex(); - }); - - afterEach(() => { - mutex.clear(); - }); - - it('should report isLocked correctly', async () => { - expect(mutex.isLocked('test-key')).toBe(false); - - const token = await mutex.tryAcquire('test-key', 5000); - expect(mutex.isLocked('test-key')).toBe(true); - - await mutex.release('test-key', token!); - expect(mutex.isLocked('test-key')).toBe(false); - }); - - it('should report correct size', async () => { - expect(mutex.size).toBe(0); - - await mutex.tryAcquire('key-1', 5000); - expect(mutex.size).toBe(1); - - await mutex.tryAcquire('key-2', 5000); - expect(mutex.size).toBe(2); - }); - - it('should clear all locks', async () => { - await mutex.tryAcquire('key-1', 5000); - await mutex.tryAcquire('key-2', 5000); - expect(mutex.size).toBe(2); - - mutex.clear(); - expect(mutex.size).toBe(0); - - // Should be able to acquire again - const token = await mutex.tryAcquire('key-1', 5000); - expect(token).toBeTruthy(); - }); - - it('should auto-cleanup expired locks', async () => { - await mutex.tryAcquire('test-key', 50); // 50ms TTL - expect(mutex.isLocked('test-key')).toBe(true); - - // Wait for auto-cleanup - await sleep(100); - - expect(mutex.isLocked('test-key')).toBe(false); - expect(mutex.size).toBe(0); - }); -}); - -describe('FileMutex specific', () => { - let mutex: FileMutex; - let lockDir: string; - - beforeEach(async () => { - lockDir = path.join(os.tmpdir(), `mutex-test-${Date.now()}-${Math.random().toString(36).slice(2)}`); - mutex = new FileMutex({ lockDir }); - }); - - afterEach(async () => { - try { - await fs.rm(lockDir, { recursive: true }); - } catch { - // Ignore cleanup errors - } - }); - - it('should create lock directory if it does not exist', async () => { - const token = await mutex.tryAcquire('test-key', 5000); - expect(token).toBeTruthy(); - - const stat = await fs.stat(lockDir); - expect(stat.isDirectory()).toBe(true); - }); - - it('should cleanup expired locks', async () => { - // Create an expired lock - await mutex.tryAcquire('expired-key', 50); - await sleep(100); - - const cleaned = await mutex.cleanup(); - expect(cleaned).toBe(1); - }); - - it('should sanitize lock keys for filesystem', async () => { - // Key with special characters - const token = await mutex.tryAcquire('path/to/resource:with:colons', 5000); - expect(token).toBeTruthy(); - - // Should create a valid file - const files = await fs.readdir(lockDir); - expect(files.length).toBe(1); - expect(files[0]).toMatch(/\.lock$/); - }); -}); - -function sleep(ms: number): Promise { - return new Promise(resolve => setTimeout(resolve, ms)); -}