Skip to content

Commit fc87889

Browse files
committed
[ws server]: fix bug in subscription response.
`.await` was missing in RegisteredSubscription::send() and no responses were actually sent which this commit fixes.
1 parent 37825cd commit fc87889

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

src/ws/server.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,13 +238,13 @@ impl Server {
238238
subscribe_method_name: String,
239239
unsubscribe_method_name: String,
240240
) -> Result<RegisteredSubscription, ()> {
241+
log::debug!("[register_subscription]: subscribe_method={}, unsubscribe_method={}", subscribe_method_name, unsubscribe_method_name);
241242
{
242243
let mut registered_methods = self.registered_methods.lock();
243244
if !registered_methods.insert(subscribe_method_name.clone()) {
244245
return Err(());
245246
}
246247
if !registered_methods.insert(unsubscribe_method_name.clone()) {
247-
registered_methods.remove(&subscribe_method_name);
248248
return Err(());
249249
}
250250
}
@@ -253,13 +253,13 @@ impl Server {
253253
.next_subscription_unique_id
254254
.fetch_add(1, atomic::Ordering::Relaxed);
255255

256-
let _ = self
256+
self
257257
.to_back
258258
.unbounded_send(FrontToBack::RegisterSubscription {
259259
unique_id,
260260
subscribe_method: subscribe_method_name,
261261
unsubscribe_method: unsubscribe_method_name,
262-
});
262+
}).map_err(|_| ())?;
263263

264264
Ok(RegisteredSubscription {
265265
to_back: self.to_back.clone(),
@@ -299,11 +299,12 @@ impl RegisteredMethod {
299299

300300
impl RegisteredSubscription {
301301
/// Sends out a value to all the registered clients.
302+
// TODO: return `Result<(), ()>`
302303
pub async fn send(&mut self, value: JsonValue) {
303304
let _ = self.to_back.send(FrontToBack::SendOutNotif {
304305
unique_id: self.unique_id,
305306
notification: value,
306-
});
307+
}).await;
307308
}
308309
}
309310

@@ -314,6 +315,7 @@ impl IncomingRequest {
314315
}
315316

316317
/// Respond to the request.
318+
// TODO: return `Result<(), ()>`
317319
pub async fn respond(mut self, response: impl Into<Result<JsonValue, common::Error>>) {
318320
let _ = self
319321
.to_back
@@ -381,6 +383,7 @@ async fn background_task(
381383
subscribe_method,
382384
unsubscribe_method,
383385
})) => {
386+
log::debug!("server register subscription=id={:?}, subscribe_method:{}, unsubscribe_method={}", unique_id, subscribe_method, unsubscribe_method);
384387
debug_assert_ne!(subscribe_method, unsubscribe_method);
385388
debug_assert!(!subscribe_methods.contains_key(&subscribe_method));
386389
debug_assert!(!subscribe_methods.contains_key(&unsubscribe_method));
@@ -399,15 +402,19 @@ async fn background_task(
399402
unique_id,
400403
notification,
401404
})) => {
405+
log::debug!("server send response to subscription={:?}", unique_id);
402406
debug_assert!(subscribed_clients.contains_key(&unique_id));
403407
if let Some(clients) = subscribed_clients.get(&unique_id) {
408+
log::debug!("{} client(s) is subscribing to {:?}", clients.len(), unique_id);
404409
for client in clients {
405410
debug_assert_eq!(active_subscriptions.get(client), Some(&unique_id));
406411
debug_assert!(server.subscription_by_id(*client).is_some());
407412
if let Some(sub) = server.subscription_by_id(*client) {
408413
sub.push(notification.clone()).await;
409414
}
410415
}
416+
} else {
417+
log::warn!("server subscription: {:?} not found", unique_id);
411418
}
412419
}
413420
Either::Right(RawServerEvent::Notification(notification)) => {

0 commit comments

Comments
 (0)