-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathlock.ts
174 lines (153 loc) · 4.47 KB
/
lock.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/**
* A mutual exclusion mechanism for concurrent operations and protecting shared
* data.
* @module
*/
import BiMap from "./collections/BiMap.ts";
if (typeof Symbol.dispose === "undefined") {
Object.defineProperty(Symbol, "dispose", { value: Symbol("Symbol.dispose") });
}
const _queue = Symbol.for("queue");
const _value = Symbol.for("value");
const _mutex = Symbol.for("mutex");
const _unlocked = Symbol.for("unlocked");
/**
* Mutual Exclusion prevents multiple coroutines from accessing the same shared
* resource simultaneously.
*
* **NOTE:**
* Currently, the Mutex instance can not be used across multiple threads, but is
* considering adding support for `parallel` threads.
*
* @example
* ```ts
* import { Mutex } from "@ayonli/jsext/lock";
* import { random } from "@ayonli/jsext/number";
* import { sleep } from "@ayonli/jsext/async";
*
* const mutex = new Mutex(1);
*
* async function concurrentOperation() {
* using shared = await mutex.lock();
* const value1 = shared.value;
*
* await otherAsyncOperations();
*
* shared.value += 1
* const value2 = shared.value;
*
* // Without mutex lock, the shared value may have been modified by other
* // calls during `await otherAsyncOperation()`, and the following
* // assertion will fail.
* console.assert(value1 + 1 === value2);
* }
*
* async function otherAsyncOperations() {
* await sleep(100 * random(1, 10));
* }
*
* await Promise.all([
* concurrentOperation(),
* concurrentOperation(),
* concurrentOperation(),
* concurrentOperation(),
* ]);
* ```
*/
export class Mutex<T> {
private [_queue]: (() => void)[] = [];
private [_value]: T;
/**
* @param value The data associated to the mutex instance.
*/
constructor(value: T) {
this[_value] = value;
}
/**
* Acquires the lock of the mutex, optionally for modifying the shared
* resource.
*/
async lock(): Promise<Mutex.Lock<T>> {
await new Promise<void>(resolve => {
if (this[_queue].length) {
this[_queue].push(resolve);
} else {
this[_queue].push(resolve);
resolve();
}
});
const lock = Object.create(Mutex.Lock.prototype) as Mutex.Lock<T>;
lock[_mutex] = this;
return lock;
}
}
export namespace Mutex {
export abstract class Lock<T> {
private [_mutex]: Mutex<T>;
private [_unlocked] = false;
constructor(mutex: Mutex<T>) {
this[_mutex] = mutex;
}
/** Accesses the data associated to the mutex instance. */
get value(): T {
if (this[_unlocked]) {
throw new ReferenceError("trying to access data after unlocked");
}
return this[_mutex][_value];
}
set value(v: T) {
if (this[_unlocked]) {
throw new ReferenceError("trying to access data after unlocked");
}
this[_mutex][_value] = v;
}
/** Releases the current lock of the mutex. */
unlock() {
this[_unlocked] = true;
const queue = this[_mutex][_queue];
queue.shift();
const next = queue[0];
if (next) {
next();
} else if (registry.hasValue(this[_mutex] as any)) {
registry.deleteValue(this[_mutex] as any);
}
}
[Symbol.dispose]() {
this.unlock();
}
}
}
const registry = new BiMap<any, Mutex<undefined>>();
/**
* Acquires a mutex lock for the given key in order to perform concurrent
* operations and prevent conflicts.
*
* If the key is currently being locked by other coroutines, this function will
* block until the lock becomes available again.
*
* @example
* ```ts
* import lock from "@ayonli/jsext/lock";
*
* const key = "lock_key";
*
* export async function concurrentOperation() {
* using ctx = await lock(key);
* void ctx;
*
* // This block will never be run if there are other coroutines holding
* // the lock.
* //
* // Other coroutines trying to lock the same key will also never be run
* // before this function completes.
* }
* ```
*/
export default async function lock(key: any): Promise<Mutex.Lock<undefined>> {
let mutex = registry.get(key);
if (!mutex) {
registry.set(key, mutex = new Mutex(void 0));
}
return await mutex.lock();
}