Skip to content

Commit 4b3d39e

Browse files
authored
fix: message proxy support multiple subscribers (#2392)
* fix: message proxy support multiple subscribers Signed-off-by: axel7083 <[email protected]> * fix: rpc readable tests Signed-off-by: axel7083 <[email protected]> --------- Signed-off-by: axel7083 <[email protected]>
1 parent 266db3d commit 4b3d39e

File tree

3 files changed

+100
-8
lines changed

3 files changed

+100
-8
lines changed

packages/frontend/src/stores/rpcReadable.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ vi.mock('../utils/client', async () => {
3737
postMessage: (message: unknown) => {
3838
if (message && typeof message === 'object' && 'channel' in message) {
3939
const f = rpcBrowser.subscribers.get(message.channel as string);
40-
f?.('');
40+
f?.forEach(listener => listener(''));
4141
}
4242
},
4343
} as unknown as PodmanDesktopApi;

packages/shared/src/messages/MessageProxy.spec.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,97 @@ test('getChannel should use CHANNEL property of classType provided', () => {
187187
expect(channel).toBe('dummy-ping');
188188
});
189189

190+
describe('subscribe', () => {
191+
beforeEach(() => {
192+
window.addEventListener = vi.fn();
193+
194+
(defaultNoTimeoutChannels.noTimeoutChannels as string[]) = [];
195+
});
196+
197+
function getMessageListener(): (event: MessageEvent) => void {
198+
expect(window.addEventListener).toHaveBeenCalledOnce();
199+
expect(window.addEventListener).toHaveBeenCalledWith('message', expect.any(Function));
200+
return vi.mocked(window.addEventListener).mock.calls[0][1] as (event: MessageEvent) => void;
201+
}
202+
203+
test('subscriber should be called on event received', async () => {
204+
const rpcBrowser = new RpcBrowser(window, api);
205+
const messageListener = getMessageListener();
206+
207+
const listener = vi.fn();
208+
rpcBrowser.subscribe('example', listener);
209+
210+
messageListener({
211+
data: {
212+
id: 'example',
213+
body: 'hello',
214+
},
215+
} as unknown as MessageEvent);
216+
217+
expect(listener).toHaveBeenCalledOnce();
218+
});
219+
220+
test('all subscribers should be called if multiple exists', async () => {
221+
const rpcBrowser = new RpcBrowser(window, api);
222+
const messageListener = getMessageListener();
223+
224+
const listeners = Array.from({ length: 10 }, _ => vi.fn());
225+
226+
listeners.forEach(listener => rpcBrowser.subscribe('example', listener));
227+
228+
messageListener({
229+
data: {
230+
id: 'example',
231+
body: 'hello',
232+
},
233+
} as unknown as MessageEvent);
234+
235+
for (const listener of listeners) {
236+
expect(listener).toHaveBeenCalledWith('hello');
237+
}
238+
});
239+
240+
test('subscribers which unsubscribe should not be called', async () => {
241+
const rpcBrowser = new RpcBrowser(window, api);
242+
const messageListener = getMessageListener();
243+
244+
const [listenerA, listenerB] = [vi.fn(), vi.fn()];
245+
246+
const unsubscriberA = rpcBrowser.subscribe('example', listenerA);
247+
const unsubscriberB = rpcBrowser.subscribe('example', listenerB);
248+
249+
messageListener({
250+
data: {
251+
id: 'example',
252+
body: 'hello',
253+
},
254+
} as unknown as MessageEvent);
255+
256+
// unsubscriber the listener B
257+
unsubscriberB.unsubscribe();
258+
259+
messageListener({
260+
data: {
261+
id: 'example',
262+
body: 'hello',
263+
},
264+
} as unknown as MessageEvent);
265+
266+
// unsubscriber the listener A
267+
unsubscriberA.unsubscribe();
268+
269+
messageListener({
270+
data: {
271+
id: 'example',
272+
body: 'hello',
273+
},
274+
} as unknown as MessageEvent);
275+
276+
expect(listenerA).toHaveBeenCalledTimes(2);
277+
expect(listenerB).toHaveBeenCalledOnce();
278+
});
279+
});
280+
190281
describe('no timeout channel', () => {
191282
beforeEach(() => {
192283
vi.resetAllMocks();

packages/shared/src/messages/MessageProxy.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,12 @@ export interface Subscriber {
129129
unsubscribe(): void;
130130
}
131131

132+
export type Listener = (value: any) => void;
133+
132134
export class RpcBrowser {
133135
counter: number = 0;
134136
promises: Map<number, { resolve: (value: unknown) => unknown; reject: (value: unknown) => void }> = new Map();
135-
subscribers: Map<string, (msg: any) => void> = new Map();
137+
subscribers: Map<string, Set<Listener>> = new Map();
136138

137139
getUniqueId(): number {
138140
return ++this.counter;
@@ -164,8 +166,7 @@ export class RpcBrowser {
164166
}
165167
this.promises.delete(message.id);
166168
} else if (this.isSubscribedMessage(message)) {
167-
const handler = this.subscribers.get(message.id);
168-
handler?.(message.body);
169+
this.subscribers.get(message.id)?.forEach(handler => handler(message.body));
169170
} else {
170171
console.error('Received incompatible message.', message);
171172
return;
@@ -220,12 +221,12 @@ export class RpcBrowser {
220221
return promise;
221222
}
222223

223-
// TODO(feloy) need to subscribe several times?
224-
subscribe(msgId: string, f: (msg: any) => void): Subscriber {
225-
this.subscribers.set(msgId, f);
224+
subscribe(msgId: string, f: Listener): Subscriber {
225+
this.subscribers.set(msgId, (this.subscribers.get(msgId) ?? new Set()).add(f));
226+
226227
return {
227228
unsubscribe: (): void => {
228-
this.subscribers.delete(msgId);
229+
this.subscribers.get(msgId)?.delete(f);
229230
},
230231
};
231232
}

0 commit comments

Comments
 (0)