diff --git a/package.json b/package.json index 3d87bbb..0d649f8 100644 --- a/package.json +++ b/package.json @@ -91,6 +91,30 @@ "./plugins/ecs-protection-manager": { "import": "./dist/src/plugins/ecs-protection-manager.js", "types": "./dist/src/plugins/ecs-protection-manager.d.ts" + }, + "./interfaces/mutex": { + "import": "./dist/src/interfaces/mutex.js", + "types": "./dist/src/interfaces/mutex.d.ts" + }, + "./mutex/redis": { + "import": "./dist/src/drivers/redis-mutex.js", + "types": "./dist/src/drivers/redis-mutex.d.ts" + }, + "./mutex/sqlite": { + "import": "./dist/src/drivers/sqlite-mutex.js", + "types": "./dist/src/drivers/sqlite-mutex.d.ts" + }, + "./mutex/mongoose": { + "import": "./dist/src/drivers/mongoose-mutex.js", + "types": "./dist/src/drivers/mongoose-mutex.d.ts" + }, + "./mutex/file": { + "import": "./dist/src/drivers/file-mutex.js", + "types": "./dist/src/drivers/file-mutex.d.ts" + }, + "./mutex/memory": { + "import": "./dist/src/drivers/memory-mutex.js", + "types": "./dist/src/drivers/memory-mutex.d.ts" } }, "peerDependencies": { diff --git a/src/drivers/file-mutex.ts b/src/drivers/file-mutex.ts new file mode 100644 index 0000000..9e94f0a --- /dev/null +++ b/src/drivers/file-mutex.ts @@ -0,0 +1,219 @@ +import { randomUUID } from 'crypto'; +import { promises as fs } from 'fs'; +import { open } from 'fs/promises'; +import path from 'path'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; + +interface LockFileContent { + token: string; + expiresAt: number; + pid: number; +} + +/** + * File-based distributed mutex implementation. + * + * Uses filesystem-level locking with exclusive file creation (O_EXCL flag) + * to ensure atomic lock acquisition. Lock files contain a token and expiration + * time to handle stale locks from crashed processes. + * + * This implementation is suitable for distributed locking when multiple processes + * have access to the same filesystem (e.g., same machine or shared network filesystem). + * + * @example + * ```typescript + * import { FileMutex } from 'adapter-queue/file-mutex'; + * + * const mutex = new FileMutex({ lockDir: '/tmp/myapp/locks' }); + * + * await mutex.withLock('critical-section', async () => { + * // Only one process can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class FileMutex implements DistributedMutex { + private lockDir: string; + private initialized = false; + + /** + * Create a new file-based mutex. + * + * @param options - Configuration options + * @param options.lockDir - Directory to store lock files (default: '/tmp/mutex-locks') + */ + constructor(options?: { lockDir?: string }) { + this.lockDir = options?.lockDir ?? '/tmp/mutex-locks'; + } + + private async ensureInitialized(): Promise { + if (this.initialized) return; + + await fs.mkdir(this.lockDir, { recursive: true, mode: 0o755 }); + this.initialized = true; + } + + private getLockPath(key: string): string { + // Sanitize key to be filesystem-safe + const safeKey = key.replace(/[^a-zA-Z0-9_-]/g, '_'); + return path.join(this.lockDir, `${safeKey}.lock`); + } + + async tryAcquire(key: string, ttlMs: number): Promise { + await this.ensureInitialized(); + + const lockPath = this.getLockPath(key); + const token = randomUUID(); + const expiresAt = Date.now() + ttlMs; + + // First, check if an existing lock has expired + try { + const existingContent = await fs.readFile(lockPath, 'utf-8'); + const existingLock: LockFileContent = JSON.parse(existingContent); + + if (existingLock.expiresAt < Date.now()) { + // Lock is expired, try to remove it + try { + await fs.unlink(lockPath); + } catch { + // Another process may have removed it, continue trying to acquire + } + } else { + // Lock is still valid + return null; + } + } catch (err: any) { + if (err.code !== 'ENOENT') { + // Unexpected error reading lock file + throw err; + } + // Lock file doesn't exist, continue to acquire + } + + // Try to create lock file exclusively + let handle; + try { + handle = await open(lockPath, 'wx'); + + const lockContent: LockFileContent = { + token, + expiresAt, + pid: process.pid, + }; + + await handle.writeFile(JSON.stringify(lockContent), 'utf-8'); + await handle.close(); + + return token; + } catch (err: any) { + if (handle) { + try { + await handle.close(); + } catch { + // Ignore close errors + } + } + + if (err.code === 'EEXIST') { + // Lock file was created by another process + return null; + } + throw err; + } + } + + async release(key: string, token: string): Promise { + await this.ensureInitialized(); + + const lockPath = this.getLockPath(key); + + try { + const content = await fs.readFile(lockPath, 'utf-8'); + const lock: LockFileContent = JSON.parse(content); + + // Only delete if token matches (we own the lock) + if (lock.token === token) { + await fs.unlink(lockPath); + return true; + } + + return false; + } catch (err: any) { + if (err.code === 'ENOENT') { + // Lock file doesn't exist (may have expired and been cleaned up) + return false; + } + throw err; + } + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } + + /** + * Clean up all expired locks in the lock directory. + * This can be called periodically for maintenance. + * + * @returns Number of expired locks cleaned up + */ + async cleanup(): Promise { + await this.ensureInitialized(); + + let cleaned = 0; + const now = Date.now(); + + try { + const files = await fs.readdir(this.lockDir); + + for (const file of files) { + if (!file.endsWith('.lock')) continue; + + const lockPath = path.join(this.lockDir, file); + try { + const content = await fs.readFile(lockPath, 'utf-8'); + const lock: LockFileContent = JSON.parse(content); + + if (lock.expiresAt < now) { + await fs.unlink(lockPath); + cleaned++; + } + } catch { + // Ignore errors for individual files + } + } + } catch { + // Ignore errors listing directory + } + + return cleaned; + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/drivers/memory-mutex.ts b/src/drivers/memory-mutex.ts new file mode 100644 index 0000000..e503011 --- /dev/null +++ b/src/drivers/memory-mutex.ts @@ -0,0 +1,138 @@ +import { randomUUID } from 'crypto'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; + +interface LockEntry { + token: string; + expiresAt: number; + timeout: NodeJS.Timeout; +} + +/** + * In-memory mutex implementation for testing and single-process applications. + * + * This mutex only works within a single process and is not suitable for + * distributed locking across multiple processes or servers. Use it for: + * - Unit testing + * - Development environments + * - Single-process applications that need mutex semantics + * + * @example + * ```typescript + * import { InMemoryMutex } from 'adapter-queue/memory-mutex'; + * + * const mutex = new InMemoryMutex(); + * + * await mutex.withLock('critical-section', async () => { + * // Only one async operation can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class InMemoryMutex implements DistributedMutex { + private locks = new Map(); + + async tryAcquire(key: string, ttlMs: number): Promise { + const existing = this.locks.get(key); + + // Check if lock exists and is not expired + if (existing && existing.expiresAt > Date.now()) { + return null; + } + + // Clean up expired lock if it exists + if (existing) { + clearTimeout(existing.timeout); + this.locks.delete(key); + } + + const token = randomUUID(); + const expiresAt = Date.now() + ttlMs; + + // Set up automatic cleanup on expiration + const timeout = setTimeout(() => { + const lock = this.locks.get(key); + if (lock && lock.token === token) { + this.locks.delete(key); + } + }, ttlMs); + + this.locks.set(key, { token, expiresAt, timeout }); + + return token; + } + + async release(key: string, token: string): Promise { + const lock = this.locks.get(key); + + if (!lock || lock.token !== token) { + return false; + } + + clearTimeout(lock.timeout); + this.locks.delete(key); + + return true; + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } + + /** + * Check if a lock is currently held. + * Useful for testing and debugging. + * + * @param key - The lock key to check + * @returns true if the lock is held and not expired + */ + isLocked(key: string): boolean { + const lock = this.locks.get(key); + return !!lock && lock.expiresAt > Date.now(); + } + + /** + * Get the number of active locks. + * Useful for testing and debugging. + */ + get size(): number { + return this.locks.size; + } + + /** + * Clear all locks. Useful for test cleanup. + */ + clear(): void { + for (const lock of this.locks.values()) { + clearTimeout(lock.timeout); + } + this.locks.clear(); + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/drivers/mongoose-mutex.ts b/src/drivers/mongoose-mutex.ts new file mode 100644 index 0000000..2cd8939 --- /dev/null +++ b/src/drivers/mongoose-mutex.ts @@ -0,0 +1,219 @@ +import { randomUUID } from 'crypto'; +import { Schema, model, Model, Document, Connection } from 'mongoose'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; + +/** + * MongoDB document interface for mutex locks. + */ +export interface IMutexLockDocument extends Document { + _id: string; // Using key as _id for uniqueness + token: string; + expiresAt: Date; +} + +/** + * Mongoose schema for mutex locks. + */ +export const MutexLockSchema = new Schema( + { + _id: { type: String, required: true }, // Lock key + token: { type: String, required: true }, + expiresAt: { type: Date, required: true }, + }, + { + collection: 'mutex_locks', + timestamps: false, + } +); + +// TTL index to automatically remove expired locks +MutexLockSchema.index({ expiresAt: 1 }, { expireAfterSeconds: 0 }); + +/** + * Mongoose/MongoDB-based distributed mutex implementation. + * + * Uses a dedicated `mutex_locks` collection with atomic findOneAndUpdate + * for lock acquisition. MongoDB's TTL index automatically removes expired locks. + * + * This implementation is suitable for distributed locking across multiple + * processes or servers connecting to the same MongoDB database. + * + * @example + * ```typescript + * import mongoose from 'mongoose'; + * import { MongooseMutex } from 'adapter-queue/mongoose-mutex'; + * + * await mongoose.connect('mongodb://localhost/myapp'); + * + * const mutex = new MongooseMutex(); + * + * await mutex.withLock('critical-section', async () => { + * // Only one process can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class MongooseMutex implements DistributedMutex { + private model: Model; + + /** + * Create a new Mongoose mutex. + * + * @param options - Configuration options + * @param options.model - Custom Mongoose model for locks (uses default if not provided) + * @param options.connection - Mongoose connection to use (uses default if not provided) + * @param options.collectionName - Name of the collection (default: 'mutex_locks') + */ + constructor(options?: { + model?: Model; + connection?: Connection; + collectionName?: string; + }) { + if (options?.model) { + this.model = options.model; + } else { + this.model = createMutexModel(options?.collectionName, options?.connection); + } + } + + async tryAcquire(key: string, ttlMs: number): Promise { + const token = randomUUID(); + const expiresAt = new Date(Date.now() + ttlMs); + const now = new Date(); + + try { + // Use findOneAndUpdate with upsert to atomically: + // 1. Create the lock if it doesn't exist + // 2. Update if it exists but is expired + // 3. Fail if it exists and is not expired + const result = await this.model.findOneAndUpdate( + { + _id: key, + $or: [ + { expiresAt: { $lt: now } }, // Lock expired + ], + }, + { + $set: { + token, + expiresAt, + }, + }, + { + upsert: false, // Don't create if not matched + new: true, + } + ); + + if (result) { + // Updated an expired lock + return token; + } + + // Try to insert new lock (handles case where lock doesn't exist) + try { + await this.model.create({ + _id: key, + token, + expiresAt, + }); + return token; + } catch (err: any) { + // Duplicate key error means lock was created by another process + if (err.code === 11000) { + return null; + } + throw err; + } + } catch (err) { + // Log but don't throw for expected race conditions + throw err; + } + } + + async release(key: string, token: string): Promise { + const result = await this.model.deleteOne({ + _id: key, + token: token, + }); + + return result.deletedCount > 0; + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } + + /** + * Clean up all expired locks. + * Note: MongoDB TTL index handles this automatically, but this can be + * called for immediate cleanup if needed. + */ + async cleanup(): Promise { + const result = await this.model.deleteMany({ + expiresAt: { $lt: new Date() }, + }); + + return result.deletedCount; + } +} + +/** + * Create a Mongoose model for mutex locks. + * + * @param collectionName - Name of the collection (default: 'mutex_locks') + * @param connection - Mongoose connection to use (uses default if not provided) + */ +export function createMutexModel( + collectionName?: string, + connection?: Connection +): Model { + const modelName = 'MutexLock'; + + // Check if model already exists on the connection + try { + if (connection) { + return connection.model(modelName); + } + return model(modelName); + } catch { + // Model doesn't exist, create it + const schema = MutexLockSchema.clone(); + if (collectionName) { + schema.set('collection', collectionName); + } + + if (connection) { + return connection.model(modelName, schema); + } + return model(modelName, schema); + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/drivers/redis-mutex.ts b/src/drivers/redis-mutex.ts new file mode 100644 index 0000000..865fd0e --- /dev/null +++ b/src/drivers/redis-mutex.ts @@ -0,0 +1,131 @@ +import { randomUUID } from 'crypto'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; +import type { RedisClient } from './redis.ts'; + +/** + * Extended Redis client interface for mutex operations. + * Adds the SET command with NX and PX options needed for distributed locking. + */ +export interface RedisMutexClient extends Pick { + /** + * SET command with options for atomic lock acquisition. + * @param key - The key to set + * @param value - The value to set + * @param options - NX (only set if not exists), PX (expire in milliseconds) + * @returns 'OK' if set, null if NX condition failed + */ + set( + key: string, + value: string, + options?: { NX?: boolean; PX?: number } + ): Promise; +} + +/** + * Redis-based distributed mutex implementation. + * + * Uses the Redis SET command with NX (not exists) and PX (expire) options + * for atomic lock acquisition. Lock release uses a Lua script to ensure + * we only delete the key if we own it (token matches). + * + * This implementation follows the Redis distributed lock pattern described at: + * https://redis.io/docs/manual/patterns/distributed-locks/ + * + * @example + * ```typescript + * import { createClient } from 'redis'; + * import { RedisMutex } from 'adapter-queue/redis-mutex'; + * + * const redis = createClient(); + * await redis.connect(); + * + * const mutex = new RedisMutex(redis, { prefix: 'myapp:lock:' }); + * + * await mutex.withLock('critical-section', async () => { + * // Only one process can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class RedisMutex implements DistributedMutex { + private prefix: string; + + /** + * Create a new Redis mutex. + * + * @param redis - Redis client instance (must support set with NX/PX and eval) + * @param options - Configuration options + * @param options.prefix - Key prefix for lock keys (default: 'mutex:') + */ + constructor( + private redis: RedisMutexClient, + options?: { prefix?: string } + ) { + this.prefix = options?.prefix ?? 'mutex:'; + } + + async tryAcquire(key: string, ttlMs: number): Promise { + const token = randomUUID(); + const lockKey = `${this.prefix}${key}`; + + const result = await this.redis.set(lockKey, token, { NX: true, PX: ttlMs }); + + return result === 'OK' ? token : null; + } + + async release(key: string, token: string): Promise { + const lockKey = `${this.prefix}${key}`; + + // Lua script to atomically check token and delete + // This prevents race conditions where: + // 1. Lock expires + // 2. Another process acquires the lock + // 3. We try to release and accidentally delete the new lock + const script = ` + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) + else + return 0 + end + `; + + const result = await this.redis.eval(script, { + keys: [lockKey], + arguments: [token], + }); + + return result === 1; + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/drivers/sqlite-mutex.ts b/src/drivers/sqlite-mutex.ts new file mode 100644 index 0000000..e126a9b --- /dev/null +++ b/src/drivers/sqlite-mutex.ts @@ -0,0 +1,155 @@ +import { randomUUID } from 'crypto'; +import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts'; +import { MutexAcquireError } from '../interfaces/mutex.ts'; +import type { SQLiteDatabase } from './sqlite.ts'; + +/** + * SQLite-based distributed mutex implementation. + * + * Uses a dedicated `mutex_locks` table with a unique constraint on the key column. + * Lock acquisition uses INSERT with conflict handling, and expired locks are + * automatically cleaned up before acquisition attempts. + * + * This implementation is suitable for distributed locking when multiple processes + * share the same SQLite database file (e.g., on a shared filesystem). + * + * @example + * ```typescript + * import Database from 'better-sqlite3'; + * import { SQLiteMutex } from 'adapter-queue/sqlite-mutex'; + * + * const db = new Database('app.db'); + * const mutex = new SQLiteMutex(db); + * + * await mutex.withLock('critical-section', async () => { + * // Only one process can execute this at a time + * await performCriticalOperation(); + * }); + * ``` + */ +export class SQLiteMutex implements DistributedMutex { + private tableName: string; + private initialized = false; + + /** + * Create a new SQLite mutex. + * + * @param db - SQLite database instance + * @param options - Configuration options + * @param options.tableName - Name of the locks table (default: 'mutex_locks') + */ + constructor( + private db: SQLiteDatabase, + options?: { tableName?: string } + ) { + this.tableName = options?.tableName ?? 'mutex_locks'; + } + + private ensureInitialized(): void { + if (this.initialized) return; + + // Create locks table if it doesn't exist + this.db.exec(` + CREATE TABLE IF NOT EXISTS ${this.tableName} ( + key TEXT PRIMARY KEY, + token TEXT NOT NULL, + expires_at INTEGER NOT NULL + ) + `); + + this.initialized = true; + } + + async tryAcquire(key: string, ttlMs: number): Promise { + this.ensureInitialized(); + + const token = randomUUID(); + const expiresAt = Date.now() + ttlMs; + const now = Date.now(); + + // Delete expired locks first + const deleteStmt = this.db.prepare( + `DELETE FROM ${this.tableName} WHERE expires_at < ?` + ); + deleteStmt.run(now); + + // Try to insert new lock + // SQLite's INSERT OR IGNORE won't tell us if the insert failed due to conflict, + // so we use a different approach: try insert, catch constraint error + try { + const insertStmt = this.db.prepare( + `INSERT INTO ${this.tableName} (key, token, expires_at) VALUES (?, ?, ?)` + ); + insertStmt.run(key, token, expiresAt); + return token; + } catch (err: any) { + // Check if it's a unique constraint violation + if ( + err.code === 'SQLITE_CONSTRAINT' || + err.code === 'SQLITE_CONSTRAINT_PRIMARYKEY' || + (err.message && err.message.includes('UNIQUE constraint failed')) + ) { + return null; // Lock already held + } + throw err; + } + } + + async release(key: string, token: string): Promise { + this.ensureInitialized(); + + const stmt = this.db.prepare( + `DELETE FROM ${this.tableName} WHERE key = ? AND token = ?` + ); + const result = stmt.run(key, token); + + return result.changes > 0; + } + + async withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise { + const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {}; + const deadline = Date.now() + waitMs; + let token: string | null = null; + + // Try to acquire lock with retries + while (Date.now() < deadline) { + token = await this.tryAcquire(key, ttlMs); + if (token) break; + await sleep(retryIntervalMs); + } + + if (!token) { + throw new MutexAcquireError(key, waitMs); + } + + try { + return await fn(); + } finally { + await this.release(key, token); + } + } + + /** + * Clean up all expired locks. + * This is called automatically during tryAcquire, but can be called + * manually for maintenance purposes. + */ + async cleanup(): Promise { + this.ensureInitialized(); + + const stmt = this.db.prepare( + `DELETE FROM ${this.tableName} WHERE expires_at < ?` + ); + const result = stmt.run(Date.now()); + + return result.changes; + } +} + +function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/src/interfaces/mutex.ts b/src/interfaces/mutex.ts new file mode 100644 index 0000000..b01b4b4 --- /dev/null +++ b/src/interfaces/mutex.ts @@ -0,0 +1,123 @@ +/** + * Options for the withLock method. + */ +export interface MutexLockOptions { + /** + * Time-to-live for the lock in milliseconds. + * Lock auto-expires after this time to prevent deadlocks. + * @default 30000 (30 seconds) + */ + ttlMs?: number; + + /** + * Maximum time to wait for lock acquisition in milliseconds. + * If the lock cannot be acquired within this time, an error is thrown. + * @default 5000 (5 seconds) + */ + waitMs?: number; + + /** + * Interval between lock acquisition attempts in milliseconds. + * @default 100 + */ + retryIntervalMs?: number; +} + +/** + * A distributed mutex interface for coordinating access across multiple processes. + * + * Each driver that supports distributed locking should implement this interface. + * The mutex uses a token-based approach where: + * - `tryAcquire` returns a unique token if the lock is acquired + * - `release` requires the token to prevent releasing locks held by other processes + * - Locks have a TTL to prevent deadlocks from crashed processes + * + * @example + * ```typescript + * const mutex = new RedisMutex(redisClient); + * + * // Simple usage with withLock + * await mutex.withLock('my-resource', async () => { + * // Critical section - only one process can execute this at a time + * await doSomethingExclusive(); + * }); + * + * // Manual lock management + * const token = await mutex.tryAcquire('my-resource', 30000); + * if (token) { + * try { + * await doSomethingExclusive(); + * } finally { + * await mutex.release('my-resource', token); + * } + * } + * ``` + */ +export interface DistributedMutex { + /** + * Attempt to acquire a named lock. + * + * This is a non-blocking operation that returns immediately. + * If the lock is already held by another process, returns null. + * + * @param key - Unique identifier for the lock + * @param ttlMs - Lock auto-expires after this time in milliseconds (prevents deadlocks) + * @returns A unique token if lock was acquired, null if lock is held by another process + */ + tryAcquire(key: string, ttlMs: number): Promise; + + /** + * Release a lock using the token from tryAcquire. + * + * Only releases the lock if the token matches the one used to acquire it. + * This prevents accidentally releasing locks held by other processes. + * + * @param key - The lock key to release + * @param token - The token returned from tryAcquire + * @returns true if the lock was released, false if token didn't match or lock expired + */ + release(key: string, token: string): Promise; + + /** + * Execute a function while holding a lock. + * + * This is the recommended way to use the mutex as it handles: + * - Automatic lock acquisition with retries + * - Automatic lock release (even on errors) + * - Timeout handling + * + * @param key - Unique identifier for the lock + * @param fn - The function to execute while holding the lock + * @param options - Lock options (ttl, wait time, retry interval) + * @returns The return value of the function + * @throws Error if lock cannot be acquired within the wait time + * + * @example + * ```typescript + * const result = await mutex.withLock( + * 'process-orders', + * async () => { + * const orders = await getUnprocessedOrders(); + * await processOrders(orders); + * return orders.length; + * }, + * { ttlMs: 60000, waitMs: 10000 } + * ); + * ``` + */ + withLock( + key: string, + fn: () => Promise, + options?: MutexLockOptions + ): Promise; +} + +/** + * Error thrown when a lock cannot be acquired within the specified wait time. + */ +export class MutexAcquireError extends Error { + constructor(key: string, waitMs: number) { + super(`Failed to acquire lock '${key}' within ${waitMs}ms`); + this.name = 'MutexAcquireError'; + } +} diff --git a/tests/mutex/all-mutex.test.ts b/tests/mutex/all-mutex.test.ts new file mode 100644 index 0000000..c383e03 --- /dev/null +++ b/tests/mutex/all-mutex.test.ts @@ -0,0 +1,410 @@ +import { + describe, + it, + expect, + beforeAll, + afterAll, + beforeEach, + afterEach, +} from "vitest"; +import { promises as fs } from "fs"; +import path from "path"; +import os from "os"; +import { GenericContainer, type StartedTestContainer } from "testcontainers"; +import { createClient, type RedisClientType } from "redis"; +import Database from "better-sqlite3"; +import mongoose from "mongoose"; +import type { DistributedMutex } from "../../src/interfaces/mutex.ts"; +import { MutexAcquireError } from "../../src/interfaces/mutex.ts"; +import { InMemoryMutex } from "../../src/drivers/memory-mutex.ts"; +import { FileMutex } from "../../src/drivers/file-mutex.ts"; +import { SQLiteMutex } from "../../src/drivers/sqlite-mutex.ts"; +import { RedisMutex } from "../../src/drivers/redis-mutex.ts"; +import { MongooseMutex } from "../../src/drivers/mongoose-mutex.ts"; + +interface MutexDriverConfig { + name: string; + beforeAll?: () => Promise; + afterAll?: () => Promise; + createMutex: () => Promise; + cleanup?: (mutex: DistributedMutex) => Promise; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +// Driver configurations +const drivers: Array<() => Promise | MutexDriverConfig> = [ + // InMemoryMutex - no external dependencies + () => ({ + name: "InMemoryMutex", + createMutex: async () => new InMemoryMutex(), + cleanup: async (mutex) => { + if ("clear" in mutex && typeof mutex.clear === "function") { + (mutex as InMemoryMutex).clear(); + } + }, + }), + + // FileMutex - file system based + () => { + let baseTempDir: string; + + return { + name: "FileMutex", + beforeAll: async () => { + baseTempDir = await fs.mkdtemp( + path.join(os.tmpdir(), "mutex-file-tests-") + ); + }, + afterAll: async () => { + try { + await fs.rm(baseTempDir, { recursive: true, force: true }); + } catch (error) { + console.warn("Failed to clean up FileMutex test directory:", error); + } + }, + createMutex: async () => { + const lockDir = path.join( + baseTempDir, + `mutex-${Date.now()}-${Math.random().toString(36).substring(2, 9)}` + ); + return new FileMutex({ lockDir }); + }, + cleanup: async (mutex) => { + if ("cleanup" in mutex && typeof mutex.cleanup === "function") { + await (mutex as FileMutex).cleanup(); + } + }, + }; + }, + + // SQLiteMutex - in-memory SQLite + () => ({ + name: "SQLiteMutex", + createMutex: async () => { + const db = new Database(":memory:"); + return new SQLiteMutex(db); + }, + cleanup: async (mutex) => { + if ("cleanup" in mutex && typeof mutex.cleanup === "function") { + await (mutex as SQLiteMutex).cleanup(); + } + }, + }), + + // RedisMutex - requires Redis container + async () => { + let redisContainer: StartedTestContainer; + let redisClient: RedisClientType; + + return { + name: "RedisMutex", + beforeAll: async () => { + redisContainer = await new GenericContainer("valkey/valkey:7-alpine") + .withExposedPorts(6379) + .start(); + + const redisPort = redisContainer.getMappedPort(6379); + const redisHost = redisContainer.getHost(); + + redisClient = createClient({ + url: `redis://${redisHost}:${redisPort}`, + }); + + await redisClient.connect(); + }, + afterAll: async () => { + if (redisClient) { + await redisClient.quit(); + } + if (redisContainer) { + await redisContainer.stop(); + } + }, + createMutex: async () => { + return new RedisMutex(redisClient, { prefix: "test:mutex:" }); + }, + cleanup: async () => { + // Clear all mutex keys between tests + const keys = await redisClient.keys("test:mutex:*"); + if (keys.length > 0) { + await redisClient.del(keys); + } + }, + }; + }, + + // MongooseMutex - requires MongoDB container + async () => { + let mongoContainer: StartedTestContainer; + let mongoUri: string; + + return { + name: "MongooseMutex", + beforeAll: async () => { + mongoContainer = await new GenericContainer("mongo:7") + .withExposedPorts(27017) + .withStartupTimeout(60000) + .start(); + + const mongoPort = mongoContainer.getMappedPort(27017); + const mongoHost = mongoContainer.getHost(); + mongoUri = `mongodb://${mongoHost}:${mongoPort}/test-mutex-db`; + + await mongoose.connect(mongoUri); + }, + afterAll: async () => { + if (mongoose.connection.readyState === 1) { + await mongoose.disconnect(); + } + if (mongoContainer) { + await mongoContainer.stop(); + } + }, + createMutex: async () => { + return new MongooseMutex(); + }, + cleanup: async () => { + // Clean up MongoDB collections between tests + if (mongoose.connection.readyState === 1 && mongoose.connection.db) { + try { + await mongoose.connection.db.collection("mutex_locks").deleteMany({}); + } catch { + // Collection might not exist yet + } + } + }, + }; + }, +]; + +async function resolveAllConfigs(): Promise> { + console.time("resolveAllConfigs"); + const resolvedConfigs = []; + for (const driver of drivers) { + const res = driver(); + const config = res instanceof Promise ? await res : res; + resolvedConfigs.push(config); + } + console.timeEnd("resolveAllConfigs"); + return resolvedConfigs; +} + +const configs = await resolveAllConfigs(); + +describe.concurrent.each(configs)("Mutex Driver: $name", (config) => { + console.time(`${config.name} setup`); + + let mutex: DistributedMutex; + + beforeAll(async () => { + console.time(`${config.name} beforeAll`); + if (config.beforeAll) { + await config.beforeAll(); + } + console.timeEnd(`${config.name} beforeAll`); + }, 60_000); + + afterAll(async () => { + if (config.afterAll) { + await config.afterAll(); + } + }); + + beforeEach(async () => { + mutex = await config.createMutex(); + }); + + afterEach(async () => { + if (config.cleanup) { + await config.cleanup(mutex); + } + }); + + describe.sequential(`${config.name}`, () => { + describe("tryAcquire", () => { + it(`${config.name} should acquire a lock and return a token`, async () => { + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + expect(typeof token).toBe("string"); + }); + + it(`${config.name} should return null if lock is already held`, async () => { + const token1 = await mutex.tryAcquire("test-key", 5000); + expect(token1).toBeTruthy(); + + const token2 = await mutex.tryAcquire("test-key", 5000); + expect(token2).toBeNull(); + }); + + it(`${config.name} should allow acquiring different keys`, async () => { + const token1 = await mutex.tryAcquire("key-1", 5000); + const token2 = await mutex.tryAcquire("key-2", 5000); + + expect(token1).toBeTruthy(); + expect(token2).toBeTruthy(); + expect(token1).not.toBe(token2); + }); + + it(`${config.name} should allow re-acquiring after lock expires`, async () => { + const token1 = await mutex.tryAcquire("test-key", 100); // 100ms TTL + expect(token1).toBeTruthy(); + + // Wait for lock to expire + await sleep(200); + + const token2 = await mutex.tryAcquire("test-key", 5000); + expect(token2).toBeTruthy(); + expect(token2).not.toBe(token1); + }); + }); + + describe("release", () => { + it(`${config.name} should release a lock with correct token`, async () => { + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + + const released = await mutex.release("test-key", token!); + expect(released).toBe(true); + + // Should be able to acquire again + const newToken = await mutex.tryAcquire("test-key", 5000); + expect(newToken).toBeTruthy(); + }); + + it(`${config.name} should not release a lock with wrong token`, async () => { + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + + const released = await mutex.release("test-key", "wrong-token"); + expect(released).toBe(false); + + // Lock should still be held + const newToken = await mutex.tryAcquire("test-key", 5000); + expect(newToken).toBeNull(); + }); + + it(`${config.name} should return false for non-existent lock`, async () => { + const released = await mutex.release("non-existent", "any-token"); + expect(released).toBe(false); + }); + }); + + describe("withLock", () => { + it(`${config.name} should execute function while holding lock`, async () => { + let executed = false; + + await mutex.withLock("test-key", async () => { + executed = true; + // Lock should be held during execution + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeNull(); + }); + + expect(executed).toBe(true); + + // Lock should be released after + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + }); + + it(`${config.name} should return the function result`, async () => { + const result = await mutex.withLock("test-key", async () => { + return "hello world"; + }); + + expect(result).toBe("hello world"); + }); + + it(`${config.name} should release lock even if function throws`, async () => { + const error = new Error("Test error"); + + await expect( + mutex.withLock("test-key", async () => { + throw error; + }) + ).rejects.toThrow("Test error"); + + // Lock should be released + const token = await mutex.tryAcquire("test-key", 5000); + expect(token).toBeTruthy(); + }); + + it(`${config.name} should throw MutexAcquireError if lock cannot be acquired`, async () => { + // Acquire lock first + const token = await mutex.tryAcquire("test-key", 10000); + expect(token).toBeTruthy(); + + // Try to acquire with withLock with short wait time + await expect( + mutex.withLock("test-key", async () => "result", { + waitMs: 100, + retryIntervalMs: 20, + }) + ).rejects.toThrow(MutexAcquireError); + }); + + it(`${config.name} should wait and retry to acquire lock`, async () => { + // Acquire lock with short TTL + const token = await mutex.tryAcquire("test-key", 150); + expect(token).toBeTruthy(); + + // withLock should wait for lock to expire and then acquire + const startTime = Date.now(); + const result = await mutex.withLock( + "test-key", + async () => "acquired", + { waitMs: 500, retryIntervalMs: 30 } + ); + const elapsed = Date.now() - startTime; + + expect(result).toBe("acquired"); + expect(elapsed).toBeGreaterThanOrEqual(100); // Should have waited for TTL + }); + }); + + describe("concurrent access", () => { + it(`${config.name} should serialize concurrent withLock calls`, async () => { + const order: number[] = []; + + const task = async (id: number, delay: number) => { + await mutex.withLock( + "test-key", + async () => { + order.push(id); + await sleep(delay); + }, + { waitMs: 5000 } + ); + }; + + // Start multiple tasks concurrently + await Promise.all([task(1, 50), task(2, 50), task(3, 50)]); + + // All tasks should complete + expect(order).toHaveLength(3); + // Order should contain all three IDs (order depends on timing) + expect(order.sort()).toEqual([1, 2, 3]); + }); + + it(`${config.name} should handle rapid acquire/release cycles`, async () => { + const iterations = 20; + let successCount = 0; + + for (let i = 0; i < iterations; i++) { + const token = await mutex.tryAcquire("test-key", 5000); + if (token) { + successCount++; + await mutex.release("test-key", token); + } + } + + expect(successCount).toBe(iterations); + }); + }); + }); + + console.timeEnd(`${config.name} setup`); +});