@@ -151,6 +151,23 @@ static void reconnect(Shub* shub)
151
151
}
152
152
}
153
153
154
+ static void notify_disconnect (Shub * shub ,int chan )
155
+ {
156
+ ShubMessageHdr * hdr ;
157
+ hdr = (ShubMessageHdr * )& shub -> in_buffer [shub -> in_buffer_used ];
158
+ hdr -> size = 0 ;
159
+ hdr -> chan = chan ;
160
+ hdr -> code = MSG_DISCONNECT ;
161
+ shub -> in_buffer_used += sizeof (ShubMessageHdr );
162
+ if (shub -> in_buffer_used + sizeof (ShubMessageHdr )> shub -> params -> buffer_size ) {
163
+ while (!write_socket (shub -> output ,shub -> in_buffer ,shub -> in_buffer_used )) {
164
+ shub -> params -> error_handler ("Failed to write to inet socket" ,SHUB_RECOVERABLE_ERROR );
165
+ reconnect (shub );
166
+ }
167
+ shub -> in_buffer_used = 0 ;
168
+ }
169
+ }
170
+
154
171
static void recovery (Shub * shub )
155
172
{
156
173
int i ,max_fd ;
@@ -162,6 +179,9 @@ static void recovery(Shub* shub)
162
179
FD_ZERO (& tryset );
163
180
FD_SET (i ,& tryset );
164
181
if (select (i + 1 ,& tryset ,NULL ,NULL ,& tm )< 0 ) {
182
+ if (i != shub -> input && i != shub -> output ) {
183
+ notify_disconnect (shub ,i );
184
+ }
165
185
close_socket (shub ,i );
166
186
}
167
187
}
@@ -259,6 +279,7 @@ void ShubLoop(Shub* shub)
259
279
if (!write_socket (chan , (char * )hdr ,n )) {
260
280
shub -> params -> error_handler ("Failed to write to local socket" ,SHUB_RECOVERABLE_ERROR );
261
281
close_socket (shub ,chan );
282
+ notify_disconnect (shub ,chan );
262
283
chan = -1 ;
263
284
}
264
285
if (n != hdr -> size + sizeof (ShubMessageHdr )) {
@@ -274,6 +295,7 @@ void ShubLoop(Shub* shub)
274
295
if (chan >=0 && !write_socket (chan ,shub -> out_buffer ,n )) {
275
296
shub -> params -> error_handler ("Failed to write to local socket" ,SHUB_RECOVERABLE_ERROR );
276
297
close_socket (shub ,chan );
298
+ notify_disconnect (shub ,chan );
277
299
chan = -1 ;
278
300
}
279
301
tail -= n ;
@@ -295,6 +317,7 @@ void ShubLoop(Shub* shub)
295
317
if (available < sizeof (ShubMessageHdr )) {
296
318
shub -> params -> error_handler ("Failed to read local socket" ,SHUB_RECOVERABLE_ERROR );
297
319
close_socket (shub ,i );
320
+ notify_disconnect (shub ,i );
298
321
}else {
299
322
int pos = 0 ;
300
323
/* loop through all fetched messages */
@@ -333,6 +356,7 @@ void ShubLoop(Shub* shub)
333
356
if (hdr != NULL ) {/* if message header is not yet sent to the server... */
334
357
/* ... then skip this message */
335
358
shub -> in_buffer_used = (char * )hdr - shub -> in_buffer ;
359
+ notify_disconnect (shub ,chan );
336
360
break ;
337
361
}else {/* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
338
362
chan = -1 ;/* do not try to read rest of body of this message */
@@ -351,6 +375,10 @@ void ShubLoop(Shub* shub)
351
375
shub -> in_buffer_used = 0 ;
352
376
}
353
377
}while (size != 0 );/* repeat until all message body is received */
378
+
379
+ if (chan < 0 ) {
380
+ notify_disconnect (shub ,i );
381
+ }
354
382
355
383
pos = available ;
356
384
break ;