@@ -149,65 +149,52 @@ pub enum PacketType {
149
149
/// Wrapper for TcpStream with some built-in buffering
150
150
struct ConnReader {
151
151
stream : Rc < TcpStream > ,
152
+ packet_buf : Vec < u8 > ,
152
153
read_buf : Vec < u8 > ,
153
- read_pos : usize ,
154
154
}
155
155
156
156
/// Wrapper for TcpStream with some built-in buffering
157
157
struct ConnWriter {
158
158
stream : Rc < TcpStream > ,
159
159
write_buf : Vec < u8 > ,
160
- write_pos : usize ,
161
160
}
162
161
163
162
impl ConnReader {
164
163
165
164
fn new ( stream : Rc < TcpStream > ) ->Self {
166
165
ConnReader {
167
166
stream : stream,
168
- read_buf : vec ! [ 0u8 ; 4096 ] ,
169
- read_pos : 0 ,
167
+ packet_buf : Vec :: with_capacity ( 4096 ) ,
168
+ read_buf : vec ! [ 0_u8 ; 4096 ]
170
169
}
171
170
}
172
171
173
172
/// Read from the socket until the status is NotReady
174
173
fn read ( & mut self ) ->Poll < ( ) , io:: Error > {
174
+ debug ! ( "read()" ) ;
175
175
loop {
176
176
match self . stream . poll_read ( ) {
177
177
Async :: Ready ( _) =>{
178
- //TODO: ensure capacity first
179
- let n =try_nb ! ( ( & * self . stream) . read( & mut self . read_buf[ self . read_pos..] ) ) ;
178
+ let n =try_nb ! ( ( & * self . stream) . read( & mut self . read_buf[ ..] ) ) ;
180
179
if n ==0 {
181
180
return Err ( Error :: new ( ErrorKind :: Other , "connection closed" ) ) ;
182
181
}
183
- self . read_pos += n ;
182
+ self . packet_buf . extend_from_slice ( & self . read_buf [ 0 ..n ] ) ;
184
183
} ,
185
184
_ =>return Ok ( Async :: NotReady ) ,
186
185
}
187
186
}
188
187
}
189
188
190
189
fn next ( & mut self ) ->Option < Packet > {
190
+ debug ! ( "next()" ) ;
191
191
// do we have a header
192
- if self . read_pos >3 {
193
- let l =parse_packet_length ( & self . read_buf ) ;
192
+ if self . packet_buf . len ( ) >3 {
193
+ let l =parse_packet_length ( & self . packet_buf ) ;
194
194
// do we have the whole packet?
195
195
let s =4 + l;
196
- if self . read_pos >= s{
197
- let mut temp: Vec < u8 > =Vec :: with_capacity ( s) ;
198
- temp. extend_from_slice ( & self . read_buf [ 0 ..s] ) ;
199
- let p =Packet { bytes : temp} ;
200
- if self . read_pos == s{
201
- self . read_pos =0 ;
202
- } else {
203
- // shift data down
204
- let mut j =0 ;
205
- for iin s ..self . read_pos {
206
- self . read_buf [ j] =self . read_buf [ i] ;
207
- j +=1 ;
208
- }
209
- self . read_pos -= s;
210
- }
196
+ if self . packet_buf . len ( ) >= s{
197
+ let p =Packet { bytes : self . packet_buf . drain ( 0 ..s) . collect ( ) } ;
211
198
Some ( p)
212
199
} else {
213
200
None
@@ -223,37 +210,27 @@ impl ConnWriter {
223
210
fn new ( stream : Rc < TcpStream > ) ->Self {
224
211
ConnWriter {
225
212
stream : stream,
226
- write_buf : vec ! [ 0u8 ; 4096 ] ,
227
- write_pos : 0 ,
213
+ write_buf : Vec :: with_capacity ( 4096 ) ,
228
214
}
229
215
}
230
216
231
217
/// Write a packet to the write buffer
232
218
fn push ( & mut self , p : & Packet ) {
233
- for iin 0 .. p. bytes . len ( ) {
234
- self . write_buf [ self . write_pos + i] = p. bytes [ i] ;
235
- }
236
- self . write_pos += p. bytes . len ( ) ;
219
+ // debug!("push() capacity: {} position: {} packet_size: {}",
220
+ // self.write_buf.capacity(), self.write_pos, p.bytes.len());
221
+
222
+ self . write_buf . extend_from_slice ( & p. bytes ) ;
223
+ debug ! ( "end push()" ) ;
237
224
}
238
225
239
226
/// Writes the contents of the write buffer to the socket
240
227
fn write ( & mut self ) ->Poll < ( ) , io:: Error > {
241
- while self . write_pos >0 {
228
+ debug ! ( "write()" ) ;
229
+ while self . write_buf . len ( ) >0 {
242
230
match self . stream . poll_write ( ) {
243
231
Async :: Ready ( _) =>{
244
- let s = try!( ( & * self . stream ) . write ( & self . write_buf [ 0 ..self . write_pos ] ) ) ;
245
- if s ==self . write_pos {
246
- self . write_pos =0 ;
247
- } else {
248
- // for a partial write, shift data down
249
- let mut j =0 ;
250
- for iin s..self . write_pos {
251
- self . write_buf [ j] =self . write_buf [ i] ;
252
- j +=1 ;
253
- }
254
- self . write_pos -= s;
255
- }
256
-
232
+ let s = try!( ( & * self . stream ) . write ( & self . write_buf [ ..] ) ) ;
233
+ let _: Vec < u8 > =self . write_buf . drain ( 0 ..s) . collect ( ) ;
257
234
} ,
258
235
_ =>return Ok ( Async :: NotReady )
259
236
}
@@ -272,9 +249,9 @@ pub struct Pipe<H: PacketHandler + 'static> {
272
249
273
250
impl < H > Pipe < H > where H : PacketHandler +' static {
274
251
pub fn new ( client : Rc < TcpStream > ,
275
- server : Rc < TcpStream > ,
276
- handler : H
277
- ) ->Pipe < H > {
252
+ server : Rc < TcpStream > ,
253
+ handler : H
254
+ ) ->Pipe < H > {
278
255
279
256
Pipe {
280
257
client_reader : ConnReader :: new ( client. clone ( ) ) ,