@@ -125,13 +125,11 @@ static void reconnect(Shub* shub)
125
125
}while (rc < 0 && errno == EINTR );
126
126
127
127
if (rc >=0 || errno == EINPROGRESS ) {
128
- if (rc >=0 ) {
129
- }
130
128
break ;
131
129
}
132
130
}
133
131
if (rc < 0 ) {
134
- if (errno != ENOENT && errno != ECONNREFUSED ) {
132
+ if (errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS ) {
135
133
shub -> params -> error_handler ("Connection can not be establish" ,SHUB_FATAL_ERROR );
136
134
}
137
135
if (max_attempts -- != 0 ) {
@@ -187,6 +185,7 @@ void ShubInitialize(Shub* shub, ShubParams* params)
187
185
FD_ZERO (& shub -> inset );
188
186
FD_SET (shub -> input ,& shub -> inset );
189
187
188
+ shub -> output = -1 ;
190
189
reconnect (shub );
191
190
192
191
shub -> in_buffer = malloc (params -> buffer_size );
@@ -207,52 +206,58 @@ void ShubLoop(Shub* shub)
207
206
while (1 ) {
208
207
fd_set events ;
209
208
struct timeval tm ;
210
- int i ,max_fd , rc ;
211
- unsigned int n , size ;
209
+ int i ,rc ;
210
+ int max_fd = shub -> max_fd ;
212
211
213
212
tm .tv_sec = shub -> params -> delay /1000 ;
214
213
tm .tv_usec = shub -> params -> delay %1000 * 1000 ;
215
214
216
215
events = shub -> inset ;
217
- rc = select (shub -> max_fd + 1 ,& events ,NULL ,NULL ,shub -> in_buffer_used == 0 ?NULL :& tm );
216
+ rc = select (max_fd + 1 ,& events ,NULL ,NULL ,shub -> in_buffer_used == 0 ?NULL :& tm );
218
217
if (rc < 0 ) {
219
218
if (errno != EINTR ) {
220
219
shub -> params -> error_handler ("Select failed" ,SHUB_RECOVERABLE_ERROR );
221
220
recovery (shub );
222
221
}
223
222
}else {
224
223
if (rc > 0 ) {
225
- for (i = 0 , max_fd = shub -> max_fd ;i <=max_fd ;i ++ ) {
224
+ for (i = 0 ;i <=max_fd ;i ++ ) {
226
225
if (FD_ISSET (i ,& events )) {
227
- if (i == shub -> input ) {
226
+ if (i == shub -> input ) {/* accept incomming connection */
228
227
int s = accept (i ,NULL ,NULL );
229
228
if (s < 0 ) {
230
229
shub -> params -> error_handler ("Failed to accept socket" ,SHUB_RECOVERABLE_ERROR );
231
230
}else {
232
- if (s > max_fd ) {
231
+ if (s > shub -> max_fd ) {
233
232
shub -> max_fd = s ;
234
233
}
235
234
FD_SET (s ,& shub -> inset );
236
235
}
237
- }else if (i == shub -> output ) {
236
+ }else if (i == shub -> output ) {/* receive response from server */
237
+ /* try to read as much as possible */
238
238
int available = recv (shub -> output ,shub -> out_buffer + shub -> out_buffer_used ,buffer_size - shub -> out_buffer_used ,0 );
239
239
int pos = 0 ;
240
240
if (available <=0 ) {
241
241
shub -> params -> error_handler ("Failed to read inet socket" ,SHUB_RECOVERABLE_ERROR );
242
242
reconnect (shub );
243
+ continue ;
243
244
}
244
245
shub -> out_buffer_used += available ;
246
+
247
+ /* loop through all received responses */
245
248
while (pos + sizeof (ShubMessageHdr ) <=shub -> out_buffer_used ) {
246
- ShubMessageHdr * hdr = (ShubMessageHdr * )( shub -> out_buffer + pos ) ;
249
+ ShubMessageHdr * hdr = (ShubMessageHdr * )& shub -> out_buffer [ pos ] ;
247
250
int chan = hdr -> chan ;
248
- n = pos + sizeof (ShubMessageHdr )+ hdr -> size <=shub -> out_buffer_used ?hdr -> size + sizeof (ShubMessageHdr ) :shub -> out_buffer_used - pos ;
251
+ unsignedint n = pos + sizeof (ShubMessageHdr )+ hdr -> size <=shub -> out_buffer_used
252
+ ?hdr -> size + sizeof (ShubMessageHdr )
253
+ :shub -> out_buffer_used - pos ;
249
254
if (!write_socket (chan , (char * )hdr ,n )) {
250
255
shub -> params -> error_handler ("Failed to write to local socket" ,SHUB_RECOVERABLE_ERROR );
251
256
close_socket (shub ,chan );
252
257
chan = -1 ;
253
258
}
254
- /* read rest of message if it doesn't fit in buffer */
255
259
if (n != hdr -> size + sizeof (ShubMessageHdr )) {
260
+ /* read rest of message if it doesn't fit in the buffer */
256
261
int tail = hdr -> size + sizeof (ShubMessageHdr )- n ;
257
262
do {
258
263
n = tail < buffer_size ?tail :buffer_size ;
@@ -274,56 +279,73 @@ void ShubLoop(Shub* shub)
274
279
}
275
280
pos += n ;
276
281
}
282
+ /* Move partly fetched message header (if any) to the beginning of byffer */
277
283
memcpy (shub -> out_buffer ,shub -> out_buffer + pos ,shub -> out_buffer_used - pos );
278
284
shub -> out_buffer_used -= pos ;
279
- }else {
285
+ }else {/* receive request from client */
280
286
ShubMessageHdr * hdr = (ShubMessageHdr * )& shub -> in_buffer [shub -> in_buffer_used ];
281
- if (!read_socket (i , (char * )hdr ,sizeof (ShubMessageHdr ))) {
287
+ int chan = i ;
288
+ if (!read_socket (chan , (char * )hdr ,sizeof (ShubMessageHdr ))) {/* fetch message header */
282
289
shub -> params -> error_handler ("Failed to read local socket" ,SHUB_RECOVERABLE_ERROR );
283
290
close_socket (shub ,i );
284
291
}else {
285
- size = hdr -> size ;
286
- hdr -> chan = i ;
292
+ unsigned int size = hdr -> size ;
293
+ hdr -> chan = chan ; /* remember socket descriptor from which this message was read */
287
294
if (size + shub -> in_buffer_used + sizeof (ShubMessageHdr )> buffer_size ) {
288
- if (shub -> in_buffer_used != 0 ) {
295
+ /* message doesn't completely fit in buffer */
296
+ if (shub -> in_buffer_used != 0 ) {/* if buffer is not empty...*/
297
+ /* ... then send it */
289
298
while (!write_socket (shub -> output ,shub -> in_buffer ,shub -> in_buffer_used )) {
290
299
shub -> params -> error_handler ("Failed to write to inet socket" ,SHUB_RECOVERABLE_ERROR );
291
300
reconnect (shub );
292
301
}
302
+ /* move received message header to the beginning of the buffer */
293
303
memcpy (shub -> in_buffer ,shub -> in_buffer + shub -> in_buffer_used ,sizeof (ShubMessageHdr ));
294
304
shub -> in_buffer_used = 0 ;
295
305
}
296
306
}
297
307
shub -> in_buffer_used += sizeof (ShubMessageHdr );
298
308
299
- while ( 1 ) {
309
+ do {
300
310
unsignedint n = size + shub -> in_buffer_used > buffer_size ?buffer_size - shub -> in_buffer_used :size ;
301
- if (!read_socket (i ,shub -> in_buffer + shub -> in_buffer_used ,n )) {
311
+ /* fetch message body */
312
+ if (chan >=0 && !read_socket (chan ,shub -> in_buffer + shub -> in_buffer_used ,n )) {
302
313
shub -> params -> error_handler ("Failed to read local socket" ,SHUB_RECOVERABLE_ERROR );
303
- close_socket (shub ,i );
304
- break ;
305
- }else {
306
- if (n != size ) {
307
- while (!write_socket (shub -> output ,shub -> in_buffer ,n )) {
308
- shub -> params -> error_handler ("Failed to write to inet socket" ,SHUB_RECOVERABLE_ERROR );
309
- reconnect (shub );
310
- }
311
- size -= n ;
312
- shub -> in_buffer_used = 0 ;
313
- }else {
314
- shub -> in_buffer_used += n ;
314
+ close_socket (shub ,chan );
315
+ if (hdr != NULL ) {/* if message header is not yet sent to the server... */
316
+ /* ... then skip this message */
317
+ shub -> in_buffer_used = (char * )hdr - shub -> in_buffer ;
315
318
break ;
319
+ }else {/* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
320
+ chan = -1 ;/* do not try to read rest of body of this message */
316
321
}
322
+ }
323
+ shub -> in_buffer_used += n ;
324
+ size -= n ;
325
+ /* if there is no more free space in the buffer to receive new message header... */
326
+ if (shub -> in_buffer_used + sizeof (ShubMessageHdr )> buffer_size ) {
327
+
328
+ /* ... then send buffer to the server */
329
+ while (!write_socket (shub -> output ,shub -> in_buffer ,shub -> in_buffer_used )) {
330
+ shub -> params -> error_handler ("Failed to write to inet socket" ,SHUB_RECOVERABLE_ERROR );
331
+ reconnect (shub );
332
+ }
333
+ hdr = NULL ;/* message is partly sent to the server: can not skip it any more */
334
+ shub -> in_buffer_used = 0 ;
317
335
}
318
- }
336
+ }while ( size != 0 ); /* repeat until all message body is received */
319
337
}
320
338
}
321
339
}
322
340
}
323
- }else if (shub -> in_buffer_used != 0 ) {
324
- while (!write_socket (shub -> output ,shub -> in_buffer ,shub -> in_buffer_used )) {
325
- shub -> params -> error_handler ("Failed to write to inet socket" ,SHUB_RECOVERABLE_ERROR );
326
- reconnect (shub );
341
+ }else {/* timeout expired */
342
+ if (shub -> in_buffer_used != 0 ) {/* if buffer is not empty... */
343
+ /* ...then send it */
344
+ while (!write_socket (shub -> output ,shub -> in_buffer ,shub -> in_buffer_used )) {
345
+ shub -> params -> error_handler ("Failed to write to inet socket" ,SHUB_RECOVERABLE_ERROR );
346
+ reconnect (shub );
347
+ }
348
+ shub -> in_buffer_used = 0 ;
327
349
}
328
350
}
329
351
}