Skip to content

Commit c6ecd25

Browse files
authored
feat!: Connection functions now return impl Future (#108)
Now that the MSRV is > 1.75 we can start making use of return position impl trait. We need to specify send bounds on the returned futures so we return an impl Trait, but it seems like rust is smart enough to allow us to use an actual async function in the implementations.
1 parent e195471 commit c6ecd25

File tree

6 files changed

+106
-101
lines changed

6 files changed

+106
-101
lines changed

src/doc_utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
use std::{future::Future, pin::Pin};
1+
use std::future::Future;
22

33
use crate::{next::Message, Error};
44

55
pub struct Conn;
66

77
impl crate::next::Connection for Conn {
8-
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>> {
8+
async fn receive(&mut self) -> Option<Message> {
99
unimplemented!()
1010
}
1111

12-
fn send(&mut self, _: Message) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>> {
12+
async fn send(&mut self, _: Message) -> Result<(), Error> {
1313
unimplemented!()
1414
}
1515
}

src/native.rs

Lines changed: 39 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use std::{future::Future, pin::Pin};
2-
3-
use futures::{future::BoxFuture, Sink, SinkExt};
1+
use futures::{Sink, SinkExt};
42
use futures_lite::{Stream, StreamExt};
53
use tungstenite::{self, protocol::CloseFrame};
64

@@ -16,55 +14,47 @@ where
1614
+ Unpin,
1715
<T as Sink<tungstenite::Message>>::Error: std::fmt::Display,
1816
{
19-
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>> {
20-
Box::pin(async move {
21-
loop {
22-
match self.next().await? {
23-
Ok(tungstenite::Message::Text(text)) => {
24-
return Some(crate::next::Message::Text(text))
25-
}
26-
Ok(tungstenite::Message::Ping(_)) => return Some(crate::next::Message::Ping),
27-
Ok(tungstenite::Message::Pong(_)) => return Some(crate::next::Message::Pong),
28-
Ok(tungstenite::Message::Close(frame)) => {
29-
return Some(crate::next::Message::Close {
30-
code: frame.as_ref().map(|frame| frame.code.into()),
31-
reason: frame.map(|frame| frame.reason.to_string()),
32-
})
33-
}
34-
Ok(tungstenite::Message::Frame(_) | tungstenite::Message::Binary(_)) => {
35-
continue
36-
}
37-
Err(error) => {
38-
#[allow(unused)]
39-
let error = error;
40-
crate::logging::warning!("error receiving message: {error:?}");
41-
return None;
42-
}
17+
async fn receive(&mut self) -> Option<Message> {
18+
loop {
19+
match self.next().await? {
20+
Ok(tungstenite::Message::Text(text)) => {
21+
return Some(crate::next::Message::Text(text))
22+
}
23+
Ok(tungstenite::Message::Ping(_)) => return Some(crate::next::Message::Ping),
24+
Ok(tungstenite::Message::Pong(_)) => return Some(crate::next::Message::Pong),
25+
Ok(tungstenite::Message::Close(frame)) => {
26+
return Some(crate::next::Message::Close {
27+
code: frame.as_ref().map(|frame| frame.code.into()),
28+
reason: frame.map(|frame| frame.reason.to_string()),
29+
})
30+
}
31+
Ok(tungstenite::Message::Frame(_) | tungstenite::Message::Binary(_)) => continue,
32+
Err(error) => {
33+
#[allow(unused)]
34+
let error = error;
35+
crate::logging::warning!("error receiving message: {error:?}");
36+
return None;
4337
}
4438
}
45-
})
39+
}
4640
}
4741

48-
fn send(&mut self, message: crate::next::Message) -> BoxFuture<'_, Result<(), Error>> {
49-
Box::pin(async move {
50-
<Self as SinkExt<tungstenite::Message>>::send(
51-
self,
52-
match message {
53-
crate::next::Message::Text(text) => tungstenite::Message::Text(text),
54-
crate::next::Message::Close { code, reason } => {
55-
tungstenite::Message::Close(code.zip(reason).map(|(code, reason)| {
56-
CloseFrame {
57-
code: code.into(),
58-
reason: reason.into(),
59-
}
60-
}))
61-
}
62-
crate::next::Message::Ping => tungstenite::Message::Ping(vec![]),
63-
crate::next::Message::Pong => tungstenite::Message::Pong(vec![]),
64-
},
65-
)
66-
.await
67-
.map_err(|error| Error::Send(error.to_string()))
68-
})
42+
async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> {
43+
<Self as SinkExt<tungstenite::Message>>::send(
44+
self,
45+
match message {
46+
crate::next::Message::Text(text) => tungstenite::Message::Text(text),
47+
crate::next::Message::Close { code, reason } => {
48+
tungstenite::Message::Close(code.zip(reason).map(|(code, reason)| CloseFrame {
49+
code: code.into(),
50+
reason: reason.into(),
51+
}))
52+
}
53+
crate::next::Message::Ping => tungstenite::Message::Ping(vec![]),
54+
crate::next::Message::Pong => tungstenite::Message::Pong(vec![]),
55+
},
56+
)
57+
.await
58+
.map_err(|error| Error::Send(error.to_string()))
6959
}
7060
}

src/next/actor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
};
1414

1515
use super::{
16-
connection::{Connection, Message},
16+
connection::{Message, ObjectSafeConnection},
1717
keepalive::KeepAliveSettings,
1818
ConnectionCommand,
1919
};
@@ -26,15 +26,15 @@ use super::{
2626
/// with an async runtime.
2727
pub struct ConnectionActor {
2828
client: Option<async_channel::Receiver<ConnectionCommand>>,
29-
connection: Box<dyn Connection + Send>,
29+
connection: Box<dyn ObjectSafeConnection>,
3030
operations: HashMap<usize, async_channel::Sender<Value>>,
3131
keep_alive: KeepAliveSettings,
3232
keep_alive_actor: stream::Boxed<ConnectionCommand>,
3333
}
3434

3535
impl ConnectionActor {
3636
pub(super) fn new(
37-
connection: Box<dyn Connection + Send>,
37+
connection: Box<dyn ObjectSafeConnection>,
3838
client: async_channel::Receiver<ConnectionCommand>,
3939
keep_alive: KeepAliveSettings,
4040
) -> Self {

src/next/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{graphql::GraphqlOperation, logging::trace, protocol::Event, Error};
1010

1111
use super::{
1212
actor::ConnectionActor,
13-
connection::{Connection, Message},
13+
connection::{Connection, Message, ObjectSafeConnection},
1414
keepalive::KeepAliveSettings,
1515
production_future::read_from_producer,
1616
Client, Subscription,
@@ -34,7 +34,7 @@ use super::{
3434
pub struct ClientBuilder {
3535
payload: Option<serde_json::Value>,
3636
subscription_buffer_size: Option<usize>,
37-
connection: Box<dyn Connection + Send>,
37+
connection: Box<dyn ObjectSafeConnection>,
3838
keep_alive: KeepAliveSettings,
3939
}
4040

src/next/connection.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,10 @@ use crate::Error;
1010
/// If users wish to add support for a new client they should implement this trait.
1111
pub trait Connection {
1212
/// Receive the next message on this connection.
13-
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>>;
13+
fn receive(&mut self) -> impl Future<Output = Option<Message>> + Send;
1414

1515
/// Send a message with on connection
16-
fn send(
17-
&mut self,
18-
message: Message,
19-
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>;
16+
fn send(&mut self, message: Message) -> impl Future<Output = Result<(), Error>> + Send;
2017
}
2118

2219
/// A websocket message
@@ -69,3 +66,27 @@ impl Message {
6966
)
7067
}
7168
}
69+
70+
/// An object safe wrapper around the Connection trait, allowing us
71+
/// to use it dynamically
72+
pub(crate) trait ObjectSafeConnection: Send {
73+
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>>;
74+
75+
fn send(
76+
&mut self,
77+
message: Message,
78+
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>;
79+
}
80+
81+
impl<T: Connection + Sized + Send> ObjectSafeConnection for T {
82+
fn receive(&mut self) -> Pin<Box<dyn Future<Output = Option<Message>> + Send + '_>> {
83+
Box::pin(Connection::receive(self))
84+
}
85+
86+
fn send(
87+
&mut self,
88+
message: Message,
89+
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>> {
90+
Box::pin(Connection::send(self, message))
91+
}
92+
}

src/ws_stream_wasm.rs

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures::{future::BoxFuture, FutureExt, SinkExt, StreamExt};
1+
use futures::{FutureExt, SinkExt, StreamExt};
22
use pharos::{Observable, ObserveConfig};
33
use ws_stream_wasm::{WsEvent, WsMessage, WsMeta, WsStream};
44

@@ -26,53 +26,47 @@ impl Connection {
2626
}
2727

2828
impl crate::next::Connection for Connection {
29-
fn receive(&mut self) -> BoxFuture<'_, Option<crate::next::Message>> {
30-
Box::pin(async move {
31-
use crate::next::Message;
32-
loop {
33-
match self.next().await? {
34-
EventOrMessage::Event(WsEvent::Closed(close)) => {
35-
return Some(Message::Close {
36-
code: Some(close.code),
37-
reason: Some(close.reason),
38-
});
39-
}
40-
EventOrMessage::Event(WsEvent::Error | WsEvent::WsErr(_)) => {
41-
return None;
42-
}
43-
EventOrMessage::Event(WsEvent::Open | WsEvent::Closing) => {
44-
continue;
45-
}
29+
async fn receive(&mut self) -> Option<crate::next::Message> {
30+
use crate::next::Message;
31+
loop {
32+
match self.next().await? {
33+
EventOrMessage::Event(WsEvent::Closed(close)) => {
34+
return Some(Message::Close {
35+
code: Some(close.code),
36+
reason: Some(close.reason),
37+
});
38+
}
39+
EventOrMessage::Event(WsEvent::Error | WsEvent::WsErr(_)) => {
40+
return None;
41+
}
42+
EventOrMessage::Event(WsEvent::Open | WsEvent::Closing) => {
43+
continue;
44+
}
4645

47-
EventOrMessage::Message(WsMessage::Text(text)) => {
48-
return Some(Message::Text(text))
49-
}
46+
EventOrMessage::Message(WsMessage::Text(text)) => return Some(Message::Text(text)),
5047

51-
EventOrMessage::Message(WsMessage::Binary(_)) => {
52-
// We shouldn't receive binary messages, but ignore them if we do
53-
continue;
54-
}
48+
EventOrMessage::Message(WsMessage::Binary(_)) => {
49+
// We shouldn't receive binary messages, but ignore them if we do
50+
continue;
5551
}
5652
}
57-
})
53+
}
5854
}
5955

60-
fn send(&mut self, message: crate::next::Message) -> BoxFuture<'_, Result<(), Error>> {
56+
async fn send(&mut self, message: crate::next::Message) -> Result<(), Error> {
6157
use crate::next::Message;
6258

63-
Box::pin(async move {
64-
match message {
65-
Message::Text(text) => self.messages.send(WsMessage::Text(text)).await,
66-
Message::Close { code, reason } => match (code, reason) {
67-
(Some(code), Some(reason)) => self.meta.close_reason(code, reason).await,
68-
(Some(code), _) => self.meta.close_code(code).await,
69-
_ => self.meta.close().await,
70-
}
71-
.map(|_| ()),
72-
Message::Ping | Message::Pong => return Ok(()),
59+
match message {
60+
Message::Text(text) => self.messages.send(WsMessage::Text(text)).await,
61+
Message::Close { code, reason } => match (code, reason) {
62+
(Some(code), Some(reason)) => self.meta.close_reason(code, reason).await,
63+
(Some(code), _) => self.meta.close_code(code).await,
64+
_ => self.meta.close().await,
7365
}
74-
.map_err(|error| Error::Send(error.to_string()))
75-
})
66+
.map(|_| ()),
67+
Message::Ping | Message::Pong => return Ok(()),
68+
}
69+
.map_err(|error| Error::Send(error.to_string()))
7670
}
7771
}
7872

0 commit comments

Comments
 (0)