Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit53217b7

Browse files
author
Drew Manlove
committed
simplify and improve extensible buffering
1 parent2fbceb8 commit53217b7

File tree

1 file changed

+24
-47
lines changed

1 file changed

+24
-47
lines changed

‎src/lib.rs

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -149,65 +149,52 @@ pub enum PacketType {
149149
/// Wrapper for TcpStream with some built-in buffering
150150
structConnReader{
151151
stream:Rc<TcpStream>,
152+
packet_buf:Vec<u8>,
152153
read_buf:Vec<u8>,
153-
read_pos:usize,
154154
}
155155

156156
/// Wrapper for TcpStream with some built-in buffering
157157
structConnWriter{
158158
stream:Rc<TcpStream>,
159159
write_buf:Vec<u8>,
160-
write_pos:usize,
161160
}
162161

163162
implConnReader{
164163

165164
fnnew(stream:Rc<TcpStream>) ->Self{
166165
ConnReader{
167166
stream: stream,
168-
read_buf:vec![0u8;4096],
169-
read_pos:0,
167+
packet_buf:Vec::with_capacity(4096),
168+
read_buf:vec![0_u8;4096]
170169
}
171170
}
172171

173172
/// Read from the socket until the status is NotReady
174173
fnread(&mutself) ->Poll<(), io::Error>{
174+
debug!("read()");
175175
loop{
176176
matchself.stream.poll_read(){
177177
Async::Ready(_) =>{
178-
//TODO: ensure capacity first
179-
let n =try_nb!((&*self.stream).read(&mutself.read_buf[self.read_pos..]));
178+
let n =try_nb!((&*self.stream).read(&mutself.read_buf[..]));
180179
if n ==0{
181180
returnErr(Error::new(ErrorKind::Other,"connection closed"));
182181
}
183-
self.read_pos += n;
182+
self.packet_buf.extend_from_slice(&self.read_buf[0..n]);
184183
},
185184
_ =>returnOk(Async::NotReady),
186185
}
187186
}
188187
}
189188

190189
fnnext(&mutself) ->Option<Packet>{
190+
debug!("next()");
191191
// do we have a header
192-
ifself.read_pos >3{
193-
let l =parse_packet_length(&self.read_buf);
192+
ifself.packet_buf.len() >3{
193+
let l =parse_packet_length(&self.packet_buf);
194194
// do we have the whole packet?
195195
let s =4 + l;
196-
ifself.read_pos >= s{
197-
letmut temp:Vec<u8> =Vec::with_capacity(s);
198-
temp.extend_from_slice(&self.read_buf[0..s]);
199-
let p =Packet{bytes: temp};
200-
ifself.read_pos == s{
201-
self.read_pos =0;
202-
}else{
203-
// shift data down
204-
letmut j =0;
205-
for iin s ..self.read_pos{
206-
self.read_buf[j] =self.read_buf[i];
207-
j +=1;
208-
}
209-
self.read_pos -= s;
210-
}
196+
ifself.packet_buf.len() >= s{
197+
let p =Packet{bytes:self.packet_buf.drain(0..s).collect()};
211198
Some(p)
212199
}else{
213200
None
@@ -223,37 +210,27 @@ impl ConnWriter {
223210
fnnew(stream:Rc<TcpStream>) ->Self{
224211
ConnWriter{
225212
stream: stream,
226-
write_buf:vec![0u8;4096],
227-
write_pos:0,
213+
write_buf:Vec::with_capacity(4096),
228214
}
229215
}
230216

231217
/// Write a packet to the write buffer
232218
fnpush(&mutself,p:&Packet){
233-
for iin0 .. p.bytes.len(){
234-
self.write_buf[self.write_pos + i] = p.bytes[i];
235-
}
236-
self.write_pos += p.bytes.len();
219+
// debug!("push() capacity: {} position: {} packet_size: {}",
220+
// self.write_buf.capacity(), self.write_pos, p.bytes.len());
221+
222+
self.write_buf.extend_from_slice(&p.bytes);
223+
debug!("end push()");
237224
}
238225

239226
/// Writes the contents of the write buffer to the socket
240227
fnwrite(&mutself) ->Poll<(), io::Error>{
241-
whileself.write_pos >0{
228+
debug!("write()");
229+
whileself.write_buf.len() >0{
242230
matchself.stream.poll_write(){
243231
Async::Ready(_) =>{
244-
let s = try!((&*self.stream).write(&self.write_buf[0..self.write_pos]));
245-
if s ==self.write_pos{
246-
self.write_pos =0;
247-
}else{
248-
// for a partial write, shift data down
249-
letmut j =0;
250-
for iin s..self.write_pos{
251-
self.write_buf[j] =self.write_buf[i];
252-
j +=1;
253-
}
254-
self.write_pos -= s;
255-
}
256-
232+
let s = try!((&*self.stream).write(&self.write_buf[..]));
233+
let _:Vec<u8> =self.write_buf.drain(0..s).collect();
257234
},
258235
_ =>returnOk(Async::NotReady)
259236
}
@@ -272,9 +249,9 @@ pub struct Pipe<H: PacketHandler + 'static> {
272249

273250
impl<H>Pipe<H>whereH:PacketHandler +'static{
274251
pubfnnew(client:Rc<TcpStream>,
275-
server:Rc<TcpStream>,
276-
handler:H
277-
) ->Pipe<H>{
252+
server:Rc<TcpStream>,
253+
handler:H
254+
) ->Pipe<H>{
278255

279256
Pipe{
280257
client_reader:ConnReader::new(client.clone()),

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp