Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3ae2e92

Browse files
authored
Merge pull request#15 from AgilData/issue_14
Issue 14
2 parentsf024718 +027872d commit3ae2e92

File tree

2 files changed

+119
-2
lines changed

2 files changed

+119
-2
lines changed

‎examples/passthrough.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
//! MySQL Proxy Server
2+
externcrate mysql_proxy;
3+
use mysql_proxy::*;
4+
5+
#[macro_use]
6+
externcrate log;
7+
externcrate env_logger;
8+
#[macro_use]
9+
externcrate futures;
10+
#[macro_use]
11+
externcrate tokio_core;
12+
externcrate byteorder;
13+
14+
use std::rc::Rc;
15+
use std::env;
16+
use std::net::{SocketAddr};
17+
use std::str;
18+
19+
use futures::{Future};
20+
use futures::stream::Stream;
21+
use tokio_core::net::{TcpStream,TcpListener};
22+
use tokio_core::reactor::{Core};
23+
24+
fnmain(){
25+
env_logger::init().unwrap();
26+
27+
// determine address for the proxy to bind to
28+
let bind_addr = env::args().nth(1).unwrap_or("127.0.0.1:3307".to_string());
29+
let bind_addr = bind_addr.parse::<SocketAddr>().unwrap();
30+
31+
// determine address of the MySQL instance we are proxying for
32+
let mysql_addr = env::args().nth(2).unwrap_or("127.0.0.1:3306".to_string());
33+
let mysql_addr = mysql_addr.parse::<SocketAddr>().unwrap();
34+
35+
// Create the tokio event loop that will drive this server
36+
letmut l =Core::new().unwrap();
37+
38+
// Get a reference to the reactor event loop
39+
let handle = l.handle();
40+
41+
// Create a TCP listener which will listen for incoming connections
42+
let socket =TcpListener::bind(&bind_addr,&l.handle()).unwrap();
43+
println!("Listening on: {}", bind_addr);
44+
45+
// for each incoming connection
46+
let done = socket.incoming().for_each(move |(socket, _)|{
47+
48+
// create a future to serve requests
49+
let future =TcpStream::connect(&mysql_addr,&handle)
50+
.and_then(move |mysql|{Ok((socket, mysql))})
51+
.and_then(move |(client, server)|
52+
{Pipe::new(Rc::new(client),Rc::new(server),PassthroughHandler{})
53+
});
54+
55+
// tell the tokio reactor to run the future
56+
handle.spawn(future.map_err(|err|{
57+
println!("Failed to spawn future: {:?}", err);
58+
}));
59+
60+
// everything is great!
61+
Ok(())
62+
63+
});
64+
l.run(done).unwrap();
65+
}
66+
67+
structPassthroughHandler{}
68+
69+
implPacketHandlerforPassthroughHandler{
70+
71+
fnhandle_request(&mutself,p:&Packet) ->Action{
72+
print_packet_chars(&p.bytes);
73+
Action::Forward
74+
}
75+
76+
fnhandle_response(&mutself, _:&Packet) ->Action{
77+
// forward all responses to the client
78+
Action::Forward
79+
}
80+
81+
}
82+
83+
#[allow(dead_code)]
84+
pubfnprint_packet_chars(buf:&[u8]){
85+
print!("[");
86+
for iin0..buf.len(){
87+
print!("{} ", buf[i]aschar);
88+
}
89+
println!("]");
90+
}
91+
92+
#[allow(dead_code)]
93+
pubfnprint_packet_bytes(buf:&[u8]){
94+
print!("[");
95+
for iin0..buf.len(){
96+
if i%8==0{println!("");}
97+
print!("{:#04x} ",buf[i]);
98+
}
99+
println!("]");
100+
}

‎src/lib.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,16 @@ impl ConnReader {
172172

173173
/// Read from the socket until the status is NotReady
174174
fnread(&mutself) ->Poll<(), io::Error>{
175+
debug!("read()");
175176
loop{
176177
matchself.stream.poll_read(){
177178
Async::Ready(_) =>{
178-
//TODO: ensure capacity first
179+
// extend if needed
180+
ifself.read_pos >=self.read_buf.len(){
181+
self.read_buf.extend_from_slice(&vec![0u8;4096]);
182+
}
179183
let n =try_nb!((&*self.stream).read(&mutself.read_buf[self.read_pos..]));
184+
180185
if n ==0{
181186
returnErr(Error::new(ErrorKind::Other,"connection closed"));
182187
}
@@ -188,6 +193,7 @@ impl ConnReader {
188193
}
189194

190195
fnnext(&mutself) ->Option<Packet>{
196+
debug!("next()");
191197
// do we have a header
192198
ifself.read_pos >3{
193199
let l =parse_packet_length(&self.read_buf);
@@ -230,14 +236,25 @@ impl ConnWriter {
230236

231237
/// Write a packet to the write buffer
232238
fnpush(&mutself,p:&Packet){
239+
debug!("push() capacity: {} position: {} packet_size: {}",
240+
self.write_buf.capacity(),self.write_pos, p.bytes.len());
241+
// Conditionally extend
242+
if(self.write_pos + p.bytes.len()) >=self.write_buf.capacity(){
243+
let size =(self.write_pos + p.bytes.len()) -self.write_buf.capacity();
244+
self.write_buf.extend_from_slice(&vec![0u8; size]);
245+
debug!("push() extend to capacity {}",self.write_buf.capacity());
246+
}
247+
233248
for iin0 .. p.bytes.len(){
234-
self.write_buf[self.write_pos + i] =p.bytes[i];
249+
self.write_buf.insert(self.write_pos + iasusize,p.bytes[i]);
235250
}
236251
self.write_pos += p.bytes.len();
252+
debug!("end push()");
237253
}
238254

239255
/// Writes the contents of the write buffer to the socket
240256
fnwrite(&mutself) ->Poll<(), io::Error>{
257+
debug!("write()");
241258
whileself.write_pos >0{
242259
matchself.stream.poll_write(){
243260
Async::Ready(_) =>{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp