3
3
*
4
4
*/
5
5
6
+ #include "sbuf.h"
6
7
#include "stream.h"
7
8
#include "miscadmin.h"
8
9
#include "unistd.h"
9
10
#include "utils/memutils.h" /* MemoryContexts */
10
11
11
- #define IsDeliveryMessage (msg )(msg->tot_len ==MinSizeOfSendBuf )
12
+ #define IsDeliveryMessage (msg )(msg->datalen ==0 )
12
13
13
14
static List * istreams = NIL ;
14
15
static List * ostreams = NIL ;
@@ -41,6 +42,8 @@ get_stream(List *streams, const char *name)
41
42
for (lc = list_head (streams );lc != NULL ;lc = lnext (lc ))
42
43
{
43
44
char * streamName = (char * )lfirst (lc );
45
+
46
+ /* streamName is a first field of IStream and OStream structures. */
44
47
if (strcmp (name ,streamName )== 0 )
45
48
return streamName ;
46
49
}
@@ -153,12 +156,13 @@ RecvIfAny(void)
153
156
DmqDestinationId dest_id ;
154
157
155
158
/* If message is not delivery message, send delivery. */
156
- dbuf .tot_len = MinSizeOfSendBuf ;
159
+ dbuf .datalen = 0 ;
157
160
dbuf .index = msg -> index ;
158
161
dest_id = dmq_dest_id (sender_id );
159
162
Assert (dest_id >=0 );
160
163
161
- dmq_push_buffer (dest_id ,istream -> streamName ,& dbuf ,dbuf .tot_len , false);
164
+ dmq_push_buffer (dest_id ,istream -> streamName ,& dbuf ,
165
+ MinSizeOfSendBuf , false);
162
166
}
163
167
}
164
168
}
@@ -193,17 +197,18 @@ checkDelivery(OStream *ostream)
193
197
{
194
198
RecvBuf * buf = lfirst (lc );
195
199
196
- istream -> msgs = list_delete_ptr (istream -> msgs ,buf );
200
+ istream -> deliveries = list_delete_ptr (istream -> deliveries ,buf );
197
201
pfree (buf );
198
202
}
203
+ list_free (temp );
199
204
return found ;
200
205
}
201
206
202
207
static void
203
208
StreamRepeatSend (OStream * ostream )
204
209
{
205
210
while (!dmq_push_buffer (ostream -> dest_id ,ostream -> streamName ,ostream -> buf ,
206
- ostream -> buf -> tot_len , true))
211
+ buf_len ( ostream -> buf ) , true))
207
212
RecvIfAny ();
208
213
}
209
214
@@ -215,36 +220,31 @@ ISendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
215
220
int tupsize ;
216
221
SendBuf * buf ;
217
222
OStream * ostream ;
223
+ int tot_len ;
218
224
219
225
RecvIfAny ();
220
226
221
227
ostream = (OStream * )get_stream (ostreams ,stream );
222
228
Assert (ostream && !ostream -> buf );
223
229
224
- if (!TupIsNull (slot ))
225
- {
226
- int tot_len ;
230
+ Assert (!TupIsNull (slot ));
227
231
228
- if (slot -> tts_tuple == NULL )
229
- ExecMaterializeSlot (slot );
232
+ if (slot -> tts_tuple == NULL )
233
+ ExecMaterializeSlot (slot );
230
234
231
- tuple = slot -> tts_tuple ;
232
- tupsize = offsetof(HeapTupleData ,t_data );
233
-
234
- tot_len = MinSizeOfSendBuf + tupsize + tuple -> t_len ;
235
- buf = palloc (tot_len );
236
- buf -> tot_len = tot_len ;
237
- memcpy (buf -> data ,tuple ,tupsize );
238
- memcpy (buf -> data + tupsize ,tuple -> t_data ,tuple -> t_len );
239
- }
240
- else
241
- Assert (0 );
235
+ tuple = slot -> tts_tuple ;
236
+ tupsize = offsetof(HeapTupleData ,t_data );
237
+ tot_len = MinSizeOfSendBuf + tupsize + tuple -> t_len ;
238
+ buf = palloc (tot_len );
239
+ buf -> datalen = tot_len - MinSizeOfSendBuf ;
240
+ memcpy (buf -> data ,tuple ,tupsize );
241
+ memcpy (buf -> data + tupsize ,tuple -> t_data ,tuple -> t_len );
242
242
243
243
buf -> index = ++ (ostream -> index );
244
244
buf -> needConfirm = needConfirm ;
245
245
ostream -> dest_id = dest_id ;
246
246
247
- while (!dmq_push_buffer (dest_id ,stream ,buf ,buf -> tot_len , true))
247
+ while (!dmq_push_buffer (dest_id ,stream ,buf ,buf_len ( buf ) , true))
248
248
RecvIfAny ();
249
249
250
250
if (buf -> needConfirm )
@@ -284,15 +284,15 @@ SendByteMessage(DmqDestinationId dest_id, char *stream, char tag)
284
284
Assert (ostream && !ostream -> buf );
285
285
286
286
buf = palloc (MinSizeOfSendBuf + 1 );
287
- buf -> tot_len = MinSizeOfSendBuf + 1 ;
287
+ buf -> datalen = 1 ;
288
288
buf -> data [0 ]= tag ;
289
289
buf -> index = ++ (ostream -> index );
290
290
buf -> needConfirm = true;
291
291
292
292
ostream -> buf = buf ;
293
293
ostream -> dest_id = dest_id ;
294
294
295
- while (!dmq_push_buffer (dest_id ,stream ,buf ,buf -> tot_len , true))
295
+ while (!dmq_push_buffer (dest_id ,stream ,buf ,buf_len ( buf ) , true))
296
296
RecvIfAny ();
297
297
298
298
wait_for_delivery (ostream );
@@ -335,7 +335,7 @@ RecvByteMessage(const char *streamName, const char *sender)
335
335
*/
336
336
void
337
337
SendTuple (DmqDestinationId dest_id ,char * stream ,TupleTableSlot * slot ,
338
- bool needConfirm )
338
+ bool needConfirm )
339
339
{
340
340
OStream * ostream ;
341
341