Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita81e499

Browse files
authored
Merge pull request#725 from carun/example-async-file-uploader
Added example to store large streaming uploads directly to filesystem using async server.
2 parents111438d +597a107 commita81e499

File tree

2 files changed

+266
-0
lines changed

2 files changed

+266
-0
lines changed

‎libs/network/example/CMakeLists.txt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ add_executable(trivial_google trivial_google.cpp)
2020

2121
if (UNIX)
2222
add_executable(fileserverhttp/fileserver.cpp)
23+
add_executable(async_server_file_uploadhttp/async_server_file_upload.cpp)
24+
add_dependencies(async_server_file_uploadcppnetlib-server-parsers)
2325
endif (UNIX)
2426
add_dependencies(http_clientcppnetlib-uricppnetlib-client-connections)
2527
add_dependencies(simple_wgetcppnetlib-uricppnetlib-client-connections)
@@ -136,6 +138,17 @@ if (UNIX)
136138
if (OPENSSL_FOUND)
137139
target_link_libraries(fileserver${OPENSSL_LIBRARIES})
138140
endif(OPENSSL_FOUND)
141+
142+
target_link_libraries(async_server_file_upload
143+
${Boost_LIBRARIES}
144+
${CMAKE_THREAD_LIBS_INIT}
145+
cppnetlib-server-parsers)
146+
if (${CMAKE_SYSTEM_NAME}MATCHES"Linux")
147+
target_link_libraries(async_server_file_uploadrt)
148+
endif ()
149+
if (OPENSSL_FOUND)
150+
target_link_libraries(async_server_file_upload${OPENSSL_LIBRARIES})
151+
endif (OPENSSL_FOUND)
139152
endif (UNIX)
140153

141154
set_target_properties(http_clientPROPERTIESRUNTIME_OUTPUT_DIRECTORY${CPP-NETLIB_BINARY_DIR}/example)
@@ -152,4 +165,5 @@ endif (OPENSSL_FOUND)
152165

153166
if (UNIX)
154167
set_target_properties(fileserverPROPERTIESRUNTIME_OUTPUT_DIRECTORY${CPP-NETLIB_BINARY_DIR}/example)
168+
set_target_properties(async_server_file_uploadPROPERTIESRUNTIME_OUTPUT_DIRECTORY${CPP-NETLIB_BINARY_DIR}/example)
155169
endif (UNIX)
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
//
2+
// Copyright 2017 (c) Arun Chandrasekaran <aruncxy@gmail.com>
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
8+
//
9+
// Example for performing streaming file upload operations directly to
10+
// filesystem using async server
11+
//
12+
// If you use wget, do the following at the client side:
13+
//
14+
// wget localhost:9190/upload?filename=Earth.mp4
15+
// --post-file=$HOME/Videos/Earth-From-Space.mp4
16+
//
17+
#include<boost/shared_ptr.hpp>
18+
#include<boost/network/protocol/http/server.hpp>
19+
#include<boost/network/utils/thread_pool.hpp>
20+
#include<boost/asio.hpp>
21+
22+
#include<chrono>
23+
#include<condition_variable>
24+
#include<mutex>
25+
#include<map>
26+
27+
structconnection_handler;
28+
29+
typedef boost::network::http::server<connection_handler> server;
30+
31+
///
32+
/// Custom exception type
33+
///
34+
structfile_uploader_exception :publicstd::runtime_error {
35+
file_uploader_exception(const std::string err) :
36+
std::runtime_error(err) {
37+
}
38+
};
39+
40+
///
41+
/// Encapsulates request & connection
42+
///
43+
structfile_uploader : std::enable_shared_from_this<file_uploader> {
44+
const server::request& req;
45+
server::connection_ptr conn;
46+
47+
std::mutex mtx;
48+
std::condition_variable condvar;
49+
50+
FILE* fp =NULL;
51+
52+
public:
53+
file_uploader(const server::request& req,const server::connection_ptr& conn)
54+
: req(req)
55+
, conn(conn) {
56+
const std::string dest =destination(req);
57+
58+
if (dest.find("/upload") != std::string::npos) {
59+
auto queries =get_queries(dest);
60+
auto fname = queries.find("filename");
61+
if (fname != queries.end()) {
62+
fp = ::fopen(fname->second.c_str(),"wb");
63+
if (!fp) {
64+
throwfile_uploader_exception("Failed to open file to write");
65+
}
66+
}else {
67+
throwfile_uploader_exception("'filename' cannot be empty");
68+
}
69+
}
70+
}
71+
72+
~file_uploader() {
73+
if (fp) {
74+
::fflush(fp);
75+
::fclose(fp);
76+
}
77+
}
78+
79+
///
80+
/// Non blocking call to initiate the data transfer
81+
///
82+
voidasync_recv() {
83+
std::size_t content_length =0;
84+
autoconst& headers = req.headers;
85+
for (auto item : headers) {
86+
if (boost::to_lower_copy(item.name) =="content-length") {
87+
content_length =std::stoll(item.value);
88+
break;
89+
}
90+
}
91+
92+
read_chunk(conn, content_length);
93+
}
94+
95+
///
96+
/// The client shall wait by calling this until the transfer is done by
97+
/// the IO threadpool
98+
///
99+
voidwait_for_completion() {
100+
std::unique_lock<std::mutex>_(mtx);
101+
condvar.wait(_);
102+
}
103+
104+
private:
105+
///
106+
/// Parses the string and gets the query as a key-value pair
107+
///
108+
/// @param [in] dest String containing the path and the queries, without the fragment,
109+
/// of the form "/path?key1=value1&key2=value2"
110+
///
111+
std::map<std::string, std::string>get_queries(const std::string dest) {
112+
113+
std::size_t pos = dest.find_first_of("?");
114+
115+
std::map<std::string, std::string> queries;
116+
if (pos != std::string::npos) {
117+
std::string query_string = dest.substr(pos +1);
118+
119+
// Replace '&' with space
120+
for (pos =0; pos < query_string.size(); pos++) {
121+
if (query_string[pos] =='&') {
122+
query_string[pos] ='';
123+
}
124+
}
125+
126+
std::istringstreamsin(query_string);
127+
while (sin >> query_string) {
128+
129+
pos = query_string.find_first_of("=");
130+
131+
if (pos != std::string::npos) {
132+
const std::string key = query_string.substr(0, pos);
133+
const std::string value = query_string.substr(pos +1);
134+
queries[key] = value;
135+
}
136+
}
137+
}
138+
139+
return queries;
140+
}
141+
142+
///
143+
/// Reads a chunk of data
144+
///
145+
/// @param [in] conn Connection to read from
146+
/// @param [in] left2read Size to read
147+
///
148+
voidread_chunk(server::connection_ptr conn, std::size_t left2read) {
149+
conn->read(boost::bind(&file_uploader::on_data_ready,
150+
file_uploader::shared_from_this(),
151+
_1, _2, _3, conn, left2read));
152+
}
153+
154+
///
155+
/// Callback that gets called when the data is ready to be consumed
156+
///
157+
voidon_data_ready(server::connection::input_range range,
158+
boost::system::error_code error,
159+
std::size_t size,
160+
server::connection_ptr conn,
161+
std::size_t left2read) {
162+
if (!error) {
163+
::fwrite(boost::begin(range), size, 1, fp);
164+
std::size_t left = left2read - size;
165+
if (left >0)
166+
read_chunk(conn, left);
167+
else
168+
wakeup();
169+
}
170+
}
171+
172+
///
173+
/// Wakesup the waiting thread
174+
///
175+
voidwakeup() {
176+
std::unique_lock<std::mutex>_(mtx);
177+
condvar.notify_one();
178+
}
179+
};
180+
181+
///
182+
/// Functor that gets executed whenever there is a packet on the HTTP port
183+
///
184+
structconnection_handler {
185+
///
186+
/// Gets executed whenever there is a packet on the HTTP port.
187+
///
188+
/// @param [in] req Request object that holds the protobuf data
189+
/// @param [in] conn Connection object
190+
///
191+
voidoperator()(server::requestconst& req,const server::connection_ptr& conn) {
192+
static std::map<std::string, std::string> headers = {
193+
{"Connection","close"},
194+
{"Content-Type","text/plain"}
195+
};
196+
197+
const std::string dest =destination(req);
198+
199+
if (req.method =="POST" && dest.find("/upload") != std::string::npos) {
200+
try {
201+
auto start =std::chrono::high_resolution_clock::now();
202+
// Create a file uploader
203+
std::shared_ptr<file_uploader>uploader(newfile_uploader(req, conn));
204+
// On success to create, start receiving the data
205+
uploader->async_recv();
206+
// Wait until the data transfer is done by the IO threads
207+
uploader->wait_for_completion();
208+
209+
// Respond to the client
210+
conn->set_status(server::connection::ok);
211+
conn->set_headers(headers);
212+
auto end =std::chrono::high_resolution_clock::now();
213+
std::chrono::duration<double, std::milli> diff = end - start;
214+
std::ostringstream stm;
215+
stm <<"Took" << diff.count() <<" milliseconds for the transfer." << std::endl;
216+
conn->write(stm.str());
217+
}catch (const file_uploader_exception& e) {
218+
conn->set_status(server::connection::bad_request);
219+
conn->set_headers(headers);
220+
const std::string err = e.what();
221+
conn->write(err);
222+
}
223+
}else {
224+
conn->set_status(server::connection::bad_request);
225+
conn->set_headers(headers);
226+
conn->write("Only path allowed is /upload.");
227+
}
228+
}
229+
};
230+
231+
intmain(int ac,constchar *av[])
232+
{
233+
if (ac !=2) {
234+
std::cerr <<"Usage:" << av[0] <<" <listener-port>" << std::endl;
235+
return EXIT_SUCCESS;
236+
}
237+
238+
// Create a connection handler
239+
connection_handler handler;
240+
241+
// Setup the async server
242+
serverlocal_server(server::options(handler)
243+
.address("0.0.0.0")
244+
.port(av[1])
245+
.reuse_address(true)
246+
.thread_pool(std::make_shared<boost::network::utils::thread_pool>(2)));
247+
248+
// Start the server eventloop
249+
local_server.run();
250+
251+
return EXIT_SUCCESS;
252+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp