@@ -28,7 +28,7 @@ static void default_error_handler(char const* msg, ShubErrorSeverity severity)
28
28
29
29
void ShubInitParams (ShubParams * params )
30
30
{
31
- memset (params ,0 ,sizeof params );
31
+ memset (params ,0 ,sizeof ( * params ) );
32
32
params -> buffer_size = 64 * 1025 ;
33
33
params -> port = 54321 ;
34
34
params -> queue_size = 100 ;
@@ -65,22 +65,14 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
65
65
66
66
static void close_socket (Shub * shub ,int fd )
67
67
{
68
- int i ,max_fd ;
69
- fd_set copy ;
70
- FD_ZERO (& copy );
71
68
close (fd );
72
- for (i = 0 ,max_fd = shub -> max_fd ;i <=max_fd ;i ++ ) {
73
- if (i != fd && FD_ISSET (i ,& shub -> inset )) {
74
- FD_SET (i ,& copy );
75
- }
76
- }
77
- FD_COPY (& copy ,& shub -> inset );
69
+ FD_CLR (fd ,& shub -> inset );
78
70
}
79
71
80
72
static int read_socket (int sd ,char * buf ,int size )
81
73
{
82
74
while (size != 0 ) {
83
- int n = recv (sd ,buf ,size ,0 );
75
+ int n = recv (sd ,buf ,size ,0 );
84
76
if (n <=0 ) {
85
77
return 0 ;
86
78
}
@@ -159,27 +151,26 @@ static void reconnect(Shub* shub)
159
151
static void recovery (Shub * shub )
160
152
{
161
153
int i ,max_fd ;
162
- fd_set okset ;
163
- fd_set tryset ;
164
154
165
155
for (i = 0 ,max_fd = shub -> max_fd ;i <=max_fd ;i ++ ) {
166
156
if (FD_ISSET (i ,& shub -> inset )) {
167
157
struct timeval tm = {0 ,0 };
158
+ fd_set tryset ;
168
159
FD_ZERO (& tryset );
169
160
FD_SET (i ,& tryset );
170
- if (select (i + 1 ,& tryset ,NULL ,NULL ,& tm )>= 0 ) {
171
- FD_SET ( i , & okset );
161
+ if (select (i + 1 ,& tryset ,NULL ,NULL ,& tm )< 0 ) {
162
+ close_socket ( shub , i );
172
163
}
173
164
}
174
165
}
175
- FD_COPY (& okset ,& shub -> inset );
176
166
}
177
167
178
168
void ShubInitialize (Shub * shub ,ShubParams * params )
179
169
{
180
170
struct sockaddr sock ;
181
171
182
172
shub -> params = params ;
173
+
183
174
sock .sa_family = AF_UNIX ;
184
175
strcpy (sock .sa_data ,params -> file );
185
176
unlink (params -> file );
@@ -203,6 +194,9 @@ void ShubInitialize(Shub* shub, ShubParams* params)
203
194
if (shub -> in_buffer == NULL || shub -> out_buffer == NULL ) {
204
195
shub -> params -> error_handler ("Failed to allocate buffer" ,SHUB_FATAL_ERROR );
205
196
}
197
+ shub -> in_buffer_used = 0 ;
198
+ shub -> out_buffer_used = 0 ;
199
+ shub -> max_fd = -1 ;
206
200
}
207
201
208
202
@@ -219,8 +213,7 @@ void ShubLoop(Shub* shub)
219
213
tm .tv_sec = shub -> params -> delay /1000 ;
220
214
tm .tv_usec = shub -> params -> delay %1000 * 1000 ;
221
215
222
-
223
- FD_COPY (& shub -> inset ,& events );
216
+ events = shub -> inset ;
224
217
rc = select (shub -> max_fd + 1 ,& events ,NULL ,NULL ,shub -> in_buffer_used == 0 ?NULL :& tm );
225
218
if (rc < 0 ) {
226
219
if (errno != EINTR ) {
@@ -250,15 +243,16 @@ void ShubLoop(Shub* shub)
250
243
}
251
244
shub -> out_buffer_used += available ;
252
245
while (pos + sizeof (ShubMessageHdr ) <=shub -> out_buffer_used ) {
253
- ShubMessageHdr * hdr = (ShubMessageHdr * )shub -> out_buffer ;
246
+ ShubMessageHdr * hdr = (ShubMessageHdr * )( shub -> out_buffer + pos ) ;
254
247
int chan = hdr -> chan ;
248
+ n = pos + sizeof (ShubMessageHdr )+ hdr -> size <=shub -> out_buffer_used ?hdr -> size + sizeof (ShubMessageHdr ) :shub -> out_buffer_used - pos ;
255
249
pos += sizeof (ShubMessageHdr );
256
- n = pos + hdr -> size <=shub -> out_buffer_used ?hdr -> size + sizeof (ShubMessageHdr ) :shub -> out_buffer_used - pos ;
257
250
if (!write_socket (chan , (char * )hdr ,n )) {
258
251
shub -> params -> error_handler ("Failed to write to local socket" ,SHUB_RECOVERABLE_ERROR );
259
252
close_socket (shub ,chan );
260
253
chan = -1 ;
261
254
}
255
+ /* read rest of message if it doesn't fit in buffer */
262
256
if (n != hdr -> size + sizeof (ShubMessageHdr )) {
263
257
int tail = hdr -> size + sizeof (ShubMessageHdr )- n ;
264
258
do {
@@ -275,6 +269,7 @@ void ShubLoop(Shub* shub)
275
269
}
276
270
tail -= n ;
277
271
}while (tail != 0 );
272
+ pos = ;
278
273
}
279
274
}
280
275
memcpy (shub -> out_buffer ,shub -> out_buffer + pos ,shub -> out_buffer_used - pos );