Skip to content

Commit 23c232c

Browse files
committed
Start using tokio_io
1 parent 9876ed2 commit 23c232c

File tree

4 files changed

+55
-31
lines changed

4 files changed

+55
-31
lines changed

tokio-postgres/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ with-uuid = ["postgres-shared/with-uuid"]
2222
with-openssl = ["tokio-openssl", "openssl"]
2323

2424
[dependencies]
25+
bytes = "0.4"
2526
fallible-iterator = "0.1.3"
2627
futures = "0.1.7"
2728
futures-state-stream = "0.1"
2829
postgres-protocol = { version = "0.2", path = "../postgres-protocol" }
2930
postgres-shared = { version = "0.2", path = "../postgres-shared" }
3031
tokio-core = "0.1"
3132
tokio-dns-unofficial = "0.1"
33+
tokio-io = "0.1"
3234

3335
tokio-openssl = { version = "0.1", optional = true }
3436
openssl = { version = "0.9", optional = true }

tokio-postgres/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,14 @@
5454
#![doc(html_root_url="https://docs.rs/tokio-postgres/0.2.1")]
5555
#![warn(missing_docs)]
5656

57+
extern crate bytes;
5758
extern crate fallible_iterator;
5859
extern crate futures_state_stream;
5960
extern crate postgres_shared;
6061
extern crate postgres_protocol;
6162
extern crate tokio_core;
6263
extern crate tokio_dns;
64+
extern crate tokio_io;
6365

6466
#[macro_use]
6567
extern crate futures;
@@ -86,7 +88,7 @@ use std::io;
8688
use std::sync::Arc;
8789
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
8890
use std::sync::mpsc::{self, Sender, Receiver};
89-
use tokio_core::io::IoFuture;
91+
use tokio_io::IoFuture;
9092
use tokio_core::reactor::Handle;
9193

9294
#[doc(inline)]

tokio-postgres/src/stream.rs

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
use futures::{BoxFuture, Future, IntoFuture, Async, Sink, Stream as FuturesStream};
1+
use bytes::{BytesMut, BufMut};
2+
use futures::{BoxFuture, Future, IntoFuture, Sink, Stream as FuturesStream, Poll};
23
use futures::future::Either;
34
use postgres_shared::params::Host;
45
use postgres_protocol::message::backend::{self, ParseResult};
56
use postgres_protocol::message::frontend;
67
use std::io::{self, Read, Write};
7-
use tokio_core::io::{Io, Codec, EasyBuf, Framed};
8+
use tokio_io::{AsyncRead, AsyncWrite};
9+
use tokio_io::codec::{Encoder, Decoder, Framed};
810
use tokio_core::net::TcpStream;
911
use tokio_core::reactor::Handle;
1012
use tokio_dns;
@@ -132,65 +134,85 @@ impl Write for Stream {
132134
}
133135
}
134136

135-
impl Io for Stream {
136-
fn poll_read(&mut self) -> Async<()> {
137+
impl AsyncRead for Stream {
138+
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
137139
match self.0 {
138-
InnerStream::Tcp(ref mut s) => s.poll_read(),
140+
InnerStream::Tcp(ref s) => s.prepare_uninitialized_buffer(buf),
139141
#[cfg(unix)]
140-
InnerStream::Unix(ref mut s) => s.poll_read(),
142+
InnerStream::Unix(ref s) => s.prepare_uninitialized_buffer(buf),
141143
}
142144
}
143145

144-
fn poll_write(&mut self) -> Async<()> {
146+
fn read_buf<B>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
147+
where B: BufMut
148+
{
145149
match self.0 {
146-
InnerStream::Tcp(ref mut s) => s.poll_write(),
150+
InnerStream::Tcp(ref mut s) => s.read_buf(buf),
147151
#[cfg(unix)]
148-
InnerStream::Unix(ref mut s) => s.poll_write(),
152+
InnerStream::Unix(ref mut s) => s.read_buf(buf),
153+
}
154+
}
155+
}
156+
157+
impl AsyncWrite for Stream {
158+
fn shutdown(&mut self) -> Poll<(), io::Error> {
159+
match self.0 {
160+
InnerStream::Tcp(ref mut s) => s.shutdown(),
161+
#[cfg(unix)]
162+
InnerStream::Unix(ref mut s) => s.shutdown(),
149163
}
150164
}
151165
}
152166

153167
pub struct PostgresCodec;
154168

155-
impl Codec for PostgresCodec {
156-
type In = backend::Message<Vec<u8>>;
157-
type Out = Vec<u8>;
169+
impl Decoder for PostgresCodec {
170+
type Item = backend::Message<Vec<u8>>;
171+
type Error = io::Error;
158172

159173
// FIXME ideally we'd avoid re-copying the data
160-
fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>> {
174+
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
161175
match backend::Message::parse_owned(buf.as_ref())? {
162176
ParseResult::Complete { message, consumed } => {
163-
buf.drain_to(consumed);
177+
buf.split_to(consumed);
164178
Ok(Some(message))
165179
}
166180
ParseResult::Incomplete { .. } => Ok(None),
167181
}
168182
}
183+
}
184+
185+
impl Encoder for PostgresCodec {
186+
type Item = Vec<u8>;
187+
type Error = io::Error;
169188

170-
fn encode(&mut self, msg: Vec<u8>, buf: &mut Vec<u8>) -> io::Result<()> {
171-
buf.extend_from_slice(&msg);
189+
fn encode(&mut self, msg: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
190+
buf.extend(&msg);
172191
Ok(())
173192
}
174193
}
175194

176195
struct SslCodec;
177196

178-
impl Codec for SslCodec {
179-
type In = u8;
180-
type Out = Vec<u8>;
197+
impl Decoder for SslCodec {
198+
type Item = u8;
199+
type Error = io::Error;
181200

182-
fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<u8>> {
183-
if buf.as_slice().is_empty() {
201+
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u8>> {
202+
if buf.is_empty() {
184203
Ok(None)
185204
} else {
186-
let byte = buf.as_slice()[0];
187-
buf.drain_to(1);
188-
Ok(Some(byte))
205+
Ok(Some(buf.split_to(1)[0]))
189206
}
190207
}
208+
}
209+
210+
impl Encoder for SslCodec {
211+
type Item = Vec<u8>;
212+
type Error = io::Error;
191213

192-
fn encode(&mut self, msg: Vec<u8>, buf: &mut Vec<u8>) -> io::Result<()> {
193-
buf.extend_from_slice(&msg);
214+
fn encode(&mut self, msg: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
215+
buf.extend(&msg);
194216
Ok(())
195217
}
196218
}

tokio-postgres/src/tls/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,22 @@
22
33
use futures::BoxFuture;
44
use std::error::Error;
5-
use tokio_core::io::Io;
5+
use tokio_io::{AsyncRead, AsyncWrite};
66

77
pub use stream::Stream;
88

99
#[cfg(feature = "with-openssl")]
1010
pub mod openssl;
1111

1212
/// A trait implemented by streams returned from `Handshake` implementations.
13-
pub trait TlsStream: Io + Send {
13+
pub trait TlsStream: AsyncRead + AsyncWrite + Send {
1414
/// Returns a shared reference to the inner stream.
1515
fn get_ref(&self) -> &Stream;
1616

1717
/// Returns a mutable reference to the inner stream.
1818
fn get_mut(&mut self) -> &mut Stream;
1919
}
2020

21-
impl Io for Box<TlsStream> {}
22-
2321
impl TlsStream for Stream {
2422
fn get_ref(&self) -> &Stream {
2523
self

0 commit comments

Comments
 (0)