Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::http::HttpServer;
use futures::channel::oneshot::{self, Sender};
use futures::future::FutureExt;
use futures::{pin_mut, select};
use jsonrpsee_test_utils::types::{Id, StatusCode};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, StatusCode};
use std::net::SocketAddr;

async fn server(server_started_tx: Sender<SocketAddr>) {
Expand Down Expand Up @@ -77,7 +77,9 @@ async fn single_method_call_with_params() {
let server_addr = server_started_rx.await.unwrap();

let req = r#"{"jsonrpc":"2.0","method":"add", "params":[1, 2],"id":1}"#;
let response = http_request(req.into(), to_http_uri(server_addr)).await.unwrap();
let response = http_request(req.into(), to_http_uri(server_addr))
.await
.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(
response.body,
Expand All @@ -92,7 +94,9 @@ async fn should_return_method_not_found() {
let server_addr = server_started_rx.await.unwrap();

let req = r#"{"jsonrpc":"2.0","method":"bar","id":"foo"}"#;
let response = http_request(req.into(), to_http_uri(server_addr)).await.unwrap();
let response = http_request(req.into(), to_http_uri(server_addr))
.await
.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, method_not_found(Id::Str("foo".into())));
}
Expand All @@ -104,7 +108,9 @@ async fn invalid_json_id_missing_value() {
let server_addr = server_started_rx.await.unwrap();

let req = r#"{"jsonrpc":"2.0","method":"say_hello","id"}"#;
let response = http_request(req.into(), to_http_uri(server_addr)).await.unwrap();
let response = http_request(req.into(), to_http_uri(server_addr))
.await
.unwrap();
// If there was an error in detecting the id in the Request object (e.g. Parse error/Invalid Request), it MUST be Null.
assert_eq!(response.body, parse_error(Id::Null));
}
Expand All @@ -116,7 +122,9 @@ async fn invalid_request_object() {
let server_addr = server_started_rx.await.unwrap();

let req = r#"{"jsonrpc":"2.0","method":"bar","id":1,"is_not_request_object":1}"#;
let response = http_request(req.into(), to_http_uri(server_addr)).await.unwrap();
let response = http_request(req.into(), to_http_uri(server_addr))
.await
.unwrap();
assert_eq!(response.status, StatusCode::OK);
assert_eq!(response.body, invalid_request(Id::Num(1)));
}
8 changes: 6 additions & 2 deletions src/ws/raw/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ async fn request_work() {
let (mut server, server_addr) = raw_server().await;

tokio::spawn(async move {
let mut client = WsTransportClient::new(&to_ws_uri_string(server_addr)).await.unwrap();
let mut client = WsTransportClient::new(&to_ws_uri_string(server_addr))
.await
.unwrap();
let call = Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: "hello_world".to_owned(),
Expand Down Expand Up @@ -75,7 +77,9 @@ async fn notification_work() {
let (mut server, server_addr) = raw_server().await;

tokio::spawn(async move {
let mut client = WsTransportClient::new(&to_ws_uri_string(server_addr)).await.unwrap();
let mut client = WsTransportClient::new(&to_ws_uri_string(server_addr))
.await
.unwrap();
let n = Notification {
jsonrpc: Version::V2,
method: "hello_world".to_owned(),
Expand Down
36 changes: 36 additions & 0 deletions src/ws/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,39 @@ async fn register_methods_works() {
"Failed register_subscription should not have side-effects"
);
}

#[tokio::test]
async fn parse_error_request_should_not_close_connection() {
let (server_started_tx, server_started_rx) = oneshot::channel::<SocketAddr>();
tokio::spawn(server(server_started_tx));
let server_addr = server_started_rx.await.unwrap();

let mut client = WebSocketTestClient::new(server_addr).await.unwrap();
let invalid_request = r#"{"jsonrpc":"2.0","method":"bar","params":[1,"id":99}"#;
let response1 = client.send_request_text(invalid_request).await.unwrap();
assert_eq!(response1, parse_error(Id::Null));
let request = r#"{"jsonrpc":"2.0","method":"say_hello","id":33}"#;
let response2 = client.send_request_text(request).await.unwrap();
assert_eq!(
response2,
ok_response(JsonValue::String("hello".to_owned()), Id::Num(33))
);
}

#[tokio::test]
async fn invalid_request_should_not_close_connection() {
let (server_started_tx, server_started_rx) = oneshot::channel::<SocketAddr>();
tokio::spawn(server(server_started_tx));
let server_addr = server_started_rx.await.unwrap();

let mut client = WebSocketTestClient::new(server_addr).await.unwrap();
let req = r#"{"jsonrpc":"2.0","method":"bar","id":1,"is_not_request_object":1}"#;
let response = client.send_request_text(req).await.unwrap();
assert_eq!(response, invalid_request(Id::Num(1)));
let request = r#"{"jsonrpc":"2.0","method":"say_hello","id":33}"#;
let response = client.send_request_text(request).await.unwrap();
assert_eq!(
response,
ok_response(JsonValue::String("hello".to_owned()), Id::Num(33))
);
}
55 changes: 36 additions & 19 deletions src/ws/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,8 @@ impl WsTransportServerBuilder {

/// Processes a single connection.
//
//
// TODO: document this function it is quite hard to understand the outcome when it returns.
// For example it returns when it receives an error and terminates all pending tasks in the connection.
// TODO: document this function it is quite hard to understand the outcome when it returns `Vec<WsRequestId>`
// both when an error or if the actual connection was terminated.
async fn per_connection_task(
socket: TcpStream,
next_request_id: Arc<atomic::AtomicU64>,
Expand Down Expand Up @@ -425,14 +424,20 @@ async fn per_connection_task(
crate::common::Version::V2,
))
.expect("valid JSON; qed");
if let Err(e) = sender.send_text(&response).await {
log::warn!(
"Failed to send: {:?} over WebSocket transport with error: {:?}",
response,
e
);

match sender.send_text(&response).await {
// deserialization failed but the client is still alive
Ok(_) => continue,
// deserialization failed and the client is not alive
Err(e) => {
log::warn!(
"Failed to send: {:?} over WebSocket transport with error: {:?}",
response,
e
);
return pending_requests;
}
}
return pending_requests;
}
};

Expand All @@ -458,15 +463,27 @@ async fn per_connection_task(
})
.now_or_never();

if let Some(Ok(_)) = result {
pending_requests.push(request_id);
} else {
// TODO: why are we terminating all pending requests if only one fails?!.
log::error!(
"Send request={:?} failed; terminating all pending_requests",
request_id
);
return pending_requests;
match result {
// Request was succesfully transmitted to the frontend.
Some(Ok(_)) => pending_requests.push(request_id),
// The channel is down or full.
Some(Err(e)) => {
log::error!(
"send request={:?} to frontend failed because of {:?}, terminating the connection",
request_id,
e,
);
return pending_requests;
}
// The future wasn't ready.
// TODO(niklasad1): verify if this is possible to happen "in practice".
Copy link
Contributor Author

@niklasad1 niklasad1 Oct 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry added some ugly logging to distinguish mpsc::SendError from future is not ready after your review.

Out of scope from this PR:

I'm unsure if this is the right action if the future wasn't ready then we terminate the connection currently.

It might be possible that the receiving side ("frontend") is busy doing other things so it might be better just to save the request and try later alternative ignore the request notify the client and continue process the connection further.

None => {
log::error!(
"send request={:?} to frontend failed future not ready, terminating the connection",
request_id,
);
return pending_requests;
}
}
}

Expand Down