Skip to content

Commit 2e88c72

Browse files
committed
Remove the sync::spsc module for now
It no longer has as much functionality as the `mpsc` module and it's not clear that the perf is all that different to buy two separate queue implementations. The `spsc` queues have been moved to a `futures-spsc` crate for now and if they prove themselves in the long run we can move them back into this crate.
1 parent 4ede3af commit 2e88c72

File tree

13 files changed

+123
-1106
lines changed

13 files changed

+123
-1106
lines changed

src/stream/channel.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#![cfg(feature = "with-deprecated")]
2-
#![deprecated(since = "0.1.4", note = "use sync::spsc::channel instead")]
2+
#![deprecated(since = "0.1.4", note = "use sync::mpsc::channel instead")]
33
#![allow(deprecated)]
44

55
use std::any::Any;
@@ -8,7 +8,7 @@ use std::fmt;
88

99
use {Poll, Async, Stream, Future, Sink};
1010
use sink::Send;
11-
use sync::spsc;
11+
use sync::mpsc;
1212

1313
/// Creates an in-memory channel implementation of the `Stream` trait.
1414
///
@@ -22,15 +22,15 @@ use sync::spsc;
2222
/// The `Receiver` returned implements the `Stream` trait and has access to any
2323
/// number of the associated combinators for transforming the result.
2424
pub fn channel<T, E>() -> (Sender<T, E>, Receiver<T, E>) {
25-
let (tx, rx) = spsc::channel();
25+
let (tx, rx) = mpsc::channel(0);
2626
(Sender { inner: tx }, Receiver { inner: rx })
2727
}
2828

2929
/// The transmission end of a channel which is used to send values.
3030
///
3131
/// This is created by the `channel` method in the `stream` module.
3232
pub struct Sender<T, E> {
33-
inner: spsc::Sender<Result<T, E>>,
33+
inner: mpsc::Sender<Result<T, E>>,
3434
}
3535

3636
/// The receiving end of a channel which implements the `Stream` trait.
@@ -40,15 +40,15 @@ pub struct Sender<T, E> {
4040
/// `channel` method in the `stream` module.
4141
#[must_use = "streams do nothing unless polled"]
4242
pub struct Receiver<T, E> {
43-
inner: spsc::Receiver<Result<T, E>>,
43+
inner: mpsc::Receiver<Result<T, E>>,
4444
}
4545

4646
/// Error type for sending, used when the receiving end of the channel is dropped
4747
pub struct SendError<T, E>(Result<T, E>);
4848

4949
/// Future returned by `Sender::send`.
5050
pub struct FutureSender<T, E> {
51-
inner: Send<spsc::Sender<Result<T, E>>>,
51+
inner: Send<mpsc::Sender<Result<T, E>>>,
5252
}
5353

5454
impl<T, E> fmt::Debug for SendError<T, E> {

src/stream/mod.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,9 @@ pub trait Stream {
196196
///
197197
/// ```
198198
/// use futures::stream::*;
199-
/// use futures::sync::spsc;
199+
/// use futures::sync::mpsc;
200200
///
201-
/// let (_tx, rx) = spsc::channel();
201+
/// let (_tx, rx) = mpsc::channel(1);
202202
/// let a: BoxStream<i32, ()> = rx.boxed();
203203
/// ```
204204
#[cfg(feature = "use_std")]
@@ -237,9 +237,9 @@ pub trait Stream {
237237
///
238238
/// ```
239239
/// use futures::Stream;
240-
/// use futures::sync::spsc;
240+
/// use futures::sync::mpsc;
241241
///
242-
/// let (_tx, rx) = spsc::channel::<i32>();
242+
/// let (_tx, rx) = mpsc::channel::<i32>(1);
243243
/// let rx = rx.map(|x| x + 3);
244244
/// ```
245245
fn map<U, F>(self, f: F) -> Map<Self, F>
@@ -263,9 +263,9 @@ pub trait Stream {
263263
///
264264
/// ```
265265
/// use futures::Stream;
266-
/// use futures::sync::spsc;
266+
/// use futures::sync::mpsc;
267267
///
268-
/// let (_tx, rx) = spsc::channel::<i32>();
268+
/// let (_tx, rx) = mpsc::channel::<i32>(1);
269269
/// let rx = rx.map_err(|()| 3);
270270
/// ```
271271
fn map_err<U, F>(self, f: F) -> MapErr<Self, F>
@@ -293,9 +293,9 @@ pub trait Stream {
293293
///
294294
/// ```
295295
/// use futures::Stream;
296-
/// use futures::sync::spsc;
296+
/// use futures::sync::mpsc;
297297
///
298-
/// let (_tx, rx) = spsc::channel::<i32>();
298+
/// let (_tx, rx) = mpsc::channel::<i32>(1);
299299
/// let evens = rx.filter(|x| x % 0 == 2);
300300
/// ```
301301
fn filter<F>(self, f: F) -> Filter<Self, F>
@@ -323,9 +323,9 @@ pub trait Stream {
323323
///
324324
/// ```
325325
/// use futures::Stream;
326-
/// use futures::sync::spsc;
326+
/// use futures::sync::mpsc;
327327
///
328-
/// let (_tx, rx) = spsc::channel::<i32>();
328+
/// let (_tx, rx) = mpsc::channel::<i32>(1);
329329
/// let evens_plus_one = rx.filter_map(|x| {
330330
/// if x % 0 == 2 {
331331
/// Some(x + 1)
@@ -362,9 +362,9 @@ pub trait Stream {
362362
///
363363
/// ```
364364
/// use futures::Stream;
365-
/// use futures::sync::spsc;
365+
/// use futures::sync::mpsc;
366366
///
367-
/// let (_tx, rx) = spsc::channel::<i32>();
367+
/// let (_tx, rx) = mpsc::channel::<i32>(1);
368368
///
369369
/// let rx = rx.then(|result| {
370370
/// match result {
@@ -405,9 +405,9 @@ pub trait Stream {
405405
///
406406
/// ```
407407
/// use futures::stream::*;
408-
/// use futures::sync::spsc;
408+
/// use futures::sync::mpsc;
409409
///
410-
/// let (_tx, rx) = spsc::channel::<i32>();
410+
/// let (_tx, rx) = mpsc::channel::<i32>(1);
411411
///
412412
/// let rx = rx.and_then(|result| {
413413
/// if result % 2 == 0 {
@@ -468,9 +468,9 @@ pub trait Stream {
468468
/// use std::thread;
469469
///
470470
/// use futures::{Stream, Future, Sink};
471-
/// use futures::sync::spsc;
471+
/// use futures::sync::mpsc;
472472
///
473-
/// let (mut tx, rx) = spsc::channel();
473+
/// let (mut tx, rx) = mpsc::channel(1);
474474
///
475475
/// thread::spawn(|| {
476476
/// for i in (0..5).rev() {
@@ -530,11 +530,11 @@ pub trait Stream {
530530
/// use std::thread;
531531
///
532532
/// use futures::{Future, Stream, Poll, Sink};
533-
/// use futures::sync::spsc;
533+
/// use futures::sync::mpsc;
534534
///
535-
/// let (tx1, rx1) = spsc::channel::<i32>();
536-
/// let (tx2, rx2) = spsc::channel::<i32>();
537-
/// let (tx3, rx3) = spsc::channel();
535+
/// let (tx1, rx1) = mpsc::channel::<i32>(1);
536+
/// let (tx2, rx2) = mpsc::channel::<i32>(1);
537+
/// let (tx3, rx3) = mpsc::channel(1);
538538
///
539539
/// thread::spawn(|| {
540540
/// tx1.send(1).wait().unwrap()

src/sync/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
//! instead work at the task level.
77
88
pub mod oneshot;
9-
pub mod spsc;
109
pub mod mpsc;
1110
mod bilock;
1211

src/sync/mpsc/mod.rs

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,19 @@
6767
// happens-before semantics required for the acquire / release semantics used
6868
// by the queue structure.
6969

70-
mod queue;
71-
72-
use self::queue::{Queue, PopResult};
70+
use std::any::Any;
71+
use std::error::Error;
72+
use std::fmt;
73+
use std::sync::atomic::{AtomicUsize, Ordering};
74+
use std::sync::{Arc, Mutex};
75+
use std::thread;
76+
use std::usize;
7377

74-
use {Async, AsyncSink, Poll, StartSend};
78+
use sync::mpsc::queue::{Queue, PopResult};
7579
use task::{self, Task};
76-
use sink::{Sink};
77-
use stream::Stream;
78-
79-
use std::{thread, usize};
80-
use std::sync::{Arc, Mutex};
81-
use std::sync::atomic::{AtomicUsize, Ordering};
80+
use {Async, AsyncSink, Poll, StartSend, Sink, Stream};
8281

83-
pub use std::sync::mpsc::SendError;
82+
mod queue;
8483

8584
/// The transmission end of a channel which is used to send values.
8685
///
@@ -121,6 +120,39 @@ pub struct Receiver<T> {
121120
/// `unbounded` method.
122121
pub struct UnboundedReceiver<T>(Receiver<T>);
123122

123+
/// Error type for sending, used when the receiving end of the channel is
124+
/// dropped
125+
pub struct SendError<T>(T);
126+
127+
impl<T> fmt::Debug for SendError<T> {
128+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
129+
fmt.debug_tuple("SendError")
130+
.field(&"...")
131+
.finish()
132+
}
133+
}
134+
135+
impl<T> fmt::Display for SendError<T> {
136+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
137+
write!(fmt, "send failed because receiver is gone")
138+
}
139+
}
140+
141+
impl<T> Error for SendError<T>
142+
where T: Any
143+
{
144+
fn description(&self) -> &str {
145+
"send failed because receiver is gone"
146+
}
147+
}
148+
149+
impl<T> SendError<T> {
150+
/// Returns the message that was attempted to be sent but failed.
151+
pub fn into_inner(self) -> T {
152+
self.0
153+
}
154+
}
155+
124156
struct Inner<T> {
125157
// Max buffer size of the channel. If `None` then the channel is unbounded.
126158
buffer: Option<usize>,
@@ -677,6 +709,16 @@ impl<T> Stream for Receiver<T> {
677709
}
678710
}
679711

712+
impl<T> Drop for Receiver<T> {
713+
fn drop(&mut self) {
714+
// Drain the channel of all pending messages
715+
self.close();
716+
while self.next_message().is_ready() {
717+
// ...
718+
}
719+
}
720+
}
721+
680722
impl<T> UnboundedReceiver<T> {
681723
/// Closes the receiving half
682724
///

0 commit comments

Comments
 (0)