Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,30 @@
"./plugins/ecs-protection-manager": {
"import": "./dist/src/plugins/ecs-protection-manager.js",
"types": "./dist/src/plugins/ecs-protection-manager.d.ts"
},
"./interfaces/mutex": {
"import": "./dist/src/interfaces/mutex.js",
"types": "./dist/src/interfaces/mutex.d.ts"
},
"./mutex/redis": {
"import": "./dist/src/drivers/redis-mutex.js",
"types": "./dist/src/drivers/redis-mutex.d.ts"
},
"./mutex/sqlite": {
"import": "./dist/src/drivers/sqlite-mutex.js",
"types": "./dist/src/drivers/sqlite-mutex.d.ts"
},
"./mutex/mongoose": {
"import": "./dist/src/drivers/mongoose-mutex.js",
"types": "./dist/src/drivers/mongoose-mutex.d.ts"
},
"./mutex/file": {
"import": "./dist/src/drivers/file-mutex.js",
"types": "./dist/src/drivers/file-mutex.d.ts"
},
"./mutex/memory": {
"import": "./dist/src/drivers/memory-mutex.js",
"types": "./dist/src/drivers/memory-mutex.d.ts"
}
},
"peerDependencies": {
Expand Down
219 changes: 219 additions & 0 deletions src/drivers/file-mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import { randomUUID } from 'crypto';
import { promises as fs } from 'fs';
import { open } from 'fs/promises';
import path from 'path';
import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts';
import { MutexAcquireError } from '../interfaces/mutex.ts';

interface LockFileContent {
token: string;
expiresAt: number;
pid: number;
}

/**
* File-based distributed mutex implementation.
*
* Uses filesystem-level locking with exclusive file creation (O_EXCL flag)
* to ensure atomic lock acquisition. Lock files contain a token and expiration
* time to handle stale locks from crashed processes.
*
* This implementation is suitable for distributed locking when multiple processes
* have access to the same filesystem (e.g., same machine or shared network filesystem).
*
* @example
* ```typescript
* import { FileMutex } from 'adapter-queue/file-mutex';
*
* const mutex = new FileMutex({ lockDir: '/tmp/myapp/locks' });
*
* await mutex.withLock('critical-section', async () => {
* // Only one process can execute this at a time
* await performCriticalOperation();
* });
* ```
*/
export class FileMutex implements DistributedMutex {
private lockDir: string;
private initialized = false;

/**
* Create a new file-based mutex.
*
* @param options - Configuration options
* @param options.lockDir - Directory to store lock files (default: '/tmp/mutex-locks')
*/
constructor(options?: { lockDir?: string }) {
this.lockDir = options?.lockDir ?? '/tmp/mutex-locks';
}

private async ensureInitialized(): Promise<void> {
if (this.initialized) return;

await fs.mkdir(this.lockDir, { recursive: true, mode: 0o755 });
this.initialized = true;
}

private getLockPath(key: string): string {
// Sanitize key to be filesystem-safe
const safeKey = key.replace(/[^a-zA-Z0-9_-]/g, '_');
return path.join(this.lockDir, `${safeKey}.lock`);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Key sanitization causes different keys to collide

The getLockPath method sanitizes keys by replacing all non-alphanumeric characters (except _ and -) with underscores. This causes different logical keys to map to the same lock file, breaking mutex semantics. For example, "key:1", "key.1", "key/1", and "key_1" all become "key_1.lock". A process acquiring a lock on "resource:abc" would incorrectly block another process trying to lock "resource.abc", even though these are intended to be independent locks.

Fix in Cursor Fix in Web


async tryAcquire(key: string, ttlMs: number): Promise<string | null> {
await this.ensureInitialized();

const lockPath = this.getLockPath(key);
const token = randomUUID();
const expiresAt = Date.now() + ttlMs;

// First, check if an existing lock has expired
try {
const existingContent = await fs.readFile(lockPath, 'utf-8');
const existingLock: LockFileContent = JSON.parse(existingContent);

if (existingLock.expiresAt < Date.now()) {
// Lock is expired, try to remove it
try {
await fs.unlink(lockPath);
} catch {
// Another process may have removed it, continue trying to acquire
}
} else {
// Lock is still valid
return null;
}
} catch (err: any) {
if (err.code !== 'ENOENT') {
// Unexpected error reading lock file
throw err;
}
// Lock file doesn't exist, continue to acquire
}

// Try to create lock file exclusively
let handle;
try {
handle = await open(lockPath, 'wx');

const lockContent: LockFileContent = {
token,
expiresAt,
pid: process.pid,
};

await handle.writeFile(JSON.stringify(lockContent), 'utf-8');
await handle.close();

return token;
} catch (err: any) {
if (handle) {
try {
await handle.close();
} catch {
// Ignore close errors
}
}

if (err.code === 'EEXIST') {
// Lock file was created by another process
return null;
}
throw err;
}
}

async release(key: string, token: string): Promise<boolean> {
await this.ensureInitialized();

const lockPath = this.getLockPath(key);

try {
const content = await fs.readFile(lockPath, 'utf-8');
const lock: LockFileContent = JSON.parse(content);

// Only delete if token matches (we own the lock)
if (lock.token === token) {
await fs.unlink(lockPath);
return true;
}

return false;
} catch (err: any) {
if (err.code === 'ENOENT') {
// Lock file doesn't exist (may have expired and been cleaned up)
return false;
}
throw err;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: File mutex release has TOCTOU race condition

The release method has a time-of-check to time-of-use (TOCTOU) race condition. It first reads the lock file and checks if the token matches (line 132-136), then separately calls fs.unlink() (line 137). Between these non-atomic operations, another process could delete the expired lock and create a new one with a different token. The original process would then delete the new lock, violating mutual exclusion. The other mutex implementations (Redis, SQLite, Mongoose) correctly use atomic check-and-delete operations to prevent this exact scenario.

Fix in Cursor Fix in Web

}

async withLock<T>(
key: string,
fn: () => Promise<T>,
options?: MutexLockOptions
): Promise<T> {
const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {};
const deadline = Date.now() + waitMs;
let token: string | null = null;

// Try to acquire lock with retries
while (Date.now() < deadline) {
token = await this.tryAcquire(key, ttlMs);
if (token) break;
await sleep(retryIntervalMs);
}

if (!token) {
throw new MutexAcquireError(key, waitMs);
}

try {
return await fn();
} finally {
await this.release(key, token);
}
}

/**
* Clean up all expired locks in the lock directory.
* This can be called periodically for maintenance.
*
* @returns Number of expired locks cleaned up
*/
async cleanup(): Promise<number> {
await this.ensureInitialized();

let cleaned = 0;
const now = Date.now();

try {
const files = await fs.readdir(this.lockDir);

for (const file of files) {
if (!file.endsWith('.lock')) continue;

const lockPath = path.join(this.lockDir, file);
try {
const content = await fs.readFile(lockPath, 'utf-8');
const lock: LockFileContent = JSON.parse(content);

if (lock.expiresAt < now) {
await fs.unlink(lockPath);
cleaned++;
}
} catch {
// Ignore errors for individual files
}
}
} catch {
// Ignore errors listing directory
}

return cleaned;
}
}

function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
138 changes: 138 additions & 0 deletions src/drivers/memory-mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { randomUUID } from 'crypto';
import type { DistributedMutex, MutexLockOptions } from '../interfaces/mutex.ts';
import { MutexAcquireError } from '../interfaces/mutex.ts';

interface LockEntry {
token: string;
expiresAt: number;
timeout: NodeJS.Timeout;
}

/**
* In-memory mutex implementation for testing and single-process applications.
*
* This mutex only works within a single process and is not suitable for
* distributed locking across multiple processes or servers. Use it for:
* - Unit testing
* - Development environments
* - Single-process applications that need mutex semantics
*
* @example
* ```typescript
* import { InMemoryMutex } from 'adapter-queue/memory-mutex';
*
* const mutex = new InMemoryMutex();
*
* await mutex.withLock('critical-section', async () => {
* // Only one async operation can execute this at a time
* await performCriticalOperation();
* });
* ```
*/
export class InMemoryMutex implements DistributedMutex {
private locks = new Map<string, LockEntry>();

async tryAcquire(key: string, ttlMs: number): Promise<string | null> {
const existing = this.locks.get(key);

// Check if lock exists and is not expired
if (existing && existing.expiresAt > Date.now()) {
return null;
}

// Clean up expired lock if it exists
if (existing) {
clearTimeout(existing.timeout);
this.locks.delete(key);
}

const token = randomUUID();
const expiresAt = Date.now() + ttlMs;

// Set up automatic cleanup on expiration
const timeout = setTimeout(() => {
const lock = this.locks.get(key);
if (lock && lock.token === token) {
this.locks.delete(key);
}
}, ttlMs);

this.locks.set(key, { token, expiresAt, timeout });

return token;
}

async release(key: string, token: string): Promise<boolean> {
const lock = this.locks.get(key);

if (!lock || lock.token !== token) {
return false;
}

clearTimeout(lock.timeout);
this.locks.delete(key);

return true;
}

async withLock<T>(
key: string,
fn: () => Promise<T>,
options?: MutexLockOptions
): Promise<T> {
const { ttlMs = 30000, waitMs = 5000, retryIntervalMs = 100 } = options ?? {};
const deadline = Date.now() + waitMs;
let token: string | null = null;

// Try to acquire lock with retries
while (Date.now() < deadline) {
token = await this.tryAcquire(key, ttlMs);
if (token) break;
await sleep(retryIntervalMs);
}

if (!token) {
throw new MutexAcquireError(key, waitMs);
}

try {
return await fn();
} finally {
await this.release(key, token);
}
}

/**
* Check if a lock is currently held.
* Useful for testing and debugging.
*
* @param key - The lock key to check
* @returns true if the lock is held and not expired
*/
isLocked(key: string): boolean {
const lock = this.locks.get(key);
return !!lock && lock.expiresAt > Date.now();
}

/**
* Get the number of active locks.
* Useful for testing and debugging.
*/
get size(): number {
return this.locks.size;
}

/**
* Clear all locks. Useful for test cleanup.
*/
clear(): void {
for (const lock of this.locks.values()) {
clearTimeout(lock.timeout);
}
this.locks.clear();
}
}

function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
Loading