Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit03a11dd

Browse files
committed
DMQ Wrapper + SELECT * from partition works
1 parent2ffbe84 commit03a11dd

23 files changed

+984
-781
lines changed

‎contrib/Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ SUBDIRS = \
3131
passwordcheck\
3232
pg_buffercache\
3333
pg_exchange\
34-
pg_execplan\
3534
pg_freespacemap\
3635
pg_prewarm\
3736
pg_standby\

‎contrib/pg_exchange/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ EXTENSION = pg_exchange
55
EXTVERSION = 0.1
66
PGFILEDESC = "pg_exchange - an exchange custom node and rules for the planner"
77
MODULES = pg_exchange
8-
OBJS = pg_exchange.o exchange.o hooks.o common.o nodeDummyscan.o nodeDistPlanExec.o dmq.o$(WIN32RES)
8+
OBJS = pg_exchange.o exchange.o hooks.o common.o nodeDummyscan.o nodeDistPlanExec.o dmq.ostream.o$(WIN32RES)
99

1010
fdw_srcdir =$(top_srcdir)/contrib/postgres_fdw/
1111

‎contrib/pg_exchange/common.c

Lines changed: 1 addition & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -23,96 +23,4 @@
2323
#include"common.h"
2424

2525

26-
/* GUC variables */
27-
intnode_number;
28-
intnodes_at_cluster;
29-
char*pargres_hosts_string=NULL;
30-
char*pargres_ports_string=NULL;
31-
inteports_pool_size=100;
32-
33-
intCoordNode=-1;
34-
boolPargresInitialized= false;
35-
PortStack*PORTS;
36-
37-
38-
Oid
39-
get_pargres_schema(void)
40-
{
41-
Oidresult;
42-
Relationrel;
43-
SysScanDescscandesc;
44-
HeapTupletuple;
45-
ScanKeyDataentry[1];
46-
Oidext_oid;
47-
48-
/* It's impossible to fetch pg_pathman's schema now */
49-
if (!IsTransactionState())
50-
returnInvalidOid;
51-
52-
ext_oid=get_extension_oid("pargres", true);
53-
if (ext_oid==InvalidOid)
54-
returnInvalidOid;/* exit if pg_pathman does not exist */
55-
56-
ScanKeyInit(&entry[0],
57-
ObjectIdAttributeNumber,
58-
BTEqualStrategyNumber,F_OIDEQ,
59-
ObjectIdGetDatum(ext_oid));
60-
61-
rel=heap_open(ExtensionRelationId,AccessShareLock);
62-
scandesc=systable_beginscan(rel,ExtensionOidIndexId, true,
63-
NULL,1,entry);
64-
65-
tuple=systable_getnext(scandesc);
66-
67-
/* We assume that there can be at most one matching tuple */
68-
if (HeapTupleIsValid(tuple))
69-
result= ((Form_pg_extension)GETSTRUCT(tuple))->extnamespace;
70-
else
71-
result=InvalidOid;
72-
73-
systable_endscan(scandesc);
74-
75-
heap_close(rel,AccessShareLock);
76-
77-
returnresult;
78-
}
79-
80-
void
81-
STACK_Init(PortStack*stack,intrange_min,intsize)
82-
{
83-
inti;
84-
85-
LWLockAcquire(&stack->lock,LW_EXCLUSIVE);
86-
87-
stack->size=size;
88-
stack->index=0;
89-
for (i=0;i<stack->size;i++)
90-
stack->values[i]=range_min+i;
91-
92-
LWLockRelease(&stack->lock);
93-
}
94-
95-
int
96-
STACK_Pop(PortStack*stack)
97-
{
98-
intvalue;
99-
100-
LWLockAcquire(&stack->lock,LW_EXCLUSIVE);
101-
102-
Assert(stack->index<stack->size);
103-
value=stack->values[stack->index++];
104-
105-
LWLockRelease(&stack->lock);
106-
returnvalue;
107-
}
108-
109-
void
110-
STACK_Push(PortStack*stack,intvalue)
111-
{
112-
LWLockAcquire(&stack->lock,LW_EXCLUSIVE);
113-
114-
Assert(stack->index>0);
115-
stack->values[--stack->index]=value;
116-
117-
LWLockRelease(&stack->lock);
118-
}
26+
ExchangeSharedState*ExchShmem=NULL;

‎contrib/pg_exchange/common.h

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,28 @@
1717

1818
#include"nodes/pg_list.h"
1919
#include"storage/lock.h"
20+
#include"dmq.h"
2021

2122

2223
typedefstruct
2324
{
24-
LWLocklock;
25-
intsize;
26-
intindex;
27-
intvalues[FLEXIBLE_ARRAY_MEMBER];
28-
}PortStack;
25+
Oidserverid;
26+
DmqDestinationIddest_id;
27+
}DMQDestinations;
2928

29+
typedefstruct
30+
{
31+
intnservers;
32+
DMQDestinations*dests;
33+
intcoordinator_num;
34+
}DMQDestCont;
3035

31-
/* GUC variables */
32-
externchar*pargres_hosts_string;
33-
externchar*pargres_ports_string;
34-
externinteports_pool_size;
35-
36-
externPortStack*PORTS;
37-
externintCoordNode;
38-
externboolPargresInitialized;
39-
36+
typedefstruct
37+
{
38+
LWLock*lock;
39+
HTAB*htab;
40+
}ExchangeSharedState;
4041

41-
externOidget_pargres_schema(void);
42-
externvoidSTACK_Init(PortStack*stack,intrange_min,intsize);
43-
intSTACK_Pop(PortStack*stack);
44-
voidSTACK_Push(PortStack*stack,intvalue);
42+
externExchangeSharedState*ExchShmem;
4543

4644
#endif/* COMMON_H_ */

‎contrib/pg_exchange/dmq.c

Lines changed: 71 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
#defineDMQ_CONNSTR_MAX_LEN 1024
6363

6464
#defineDMQ_MAX_SUBS_PER_BACKEND 100
65-
#defineDMQ_MAX_DESTINATIONS100
65+
#defineDMQ_MAX_DESTINATIONS127
6666
#defineDMQ_MAX_RECEIVERS 100
6767

6868
typedefenum
@@ -118,7 +118,7 @@ struct DmqSharedState
118118

119119

120120
/* Backend-local i/o queues. */
121-
struct
121+
staticstruct
122122
{
123123
shm_mq_handle*mq_outh;
124124
intn_inhandles;
@@ -294,14 +294,6 @@ dmq_toc_size()
294294
*
295295
*****************************************************************************/
296296

297-
// static void
298-
// fe_close(PGconn *conn)
299-
// {
300-
// PQputCopyEnd(conn, NULL);
301-
// PQflush(conn);
302-
// PQfinish(conn);
303-
// }
304-
305297
staticint
306298
fe_send(PGconn*conn,char*msg,size_tlen)
307299
{
@@ -435,12 +427,12 @@ dmq_sender_main(Datum main_arg)
435427
res=shm_mq_receive(mq_handles[i],&len,&data, true);
436428
if (res==SHM_MQ_SUCCESS)
437429
{
438-
intconn_id;
430+
DmqDestinationIdconn_id;
439431

440432
/* first byte is connection_id */
441-
conn_id=* (char*)data;
442-
data= (char*)data+1;
443-
len-=1;
433+
conn_id=* (DmqDestinationId*)data;
434+
data= (char*)data+sizeof(DmqDestinationId);
435+
len-=sizeof(DmqDestinationId);
444436
Assert(0 <=conn_id&&conn_id<DMQ_MAX_DESTINATIONS);
445437

446438
if (conns[conn_id].state==Active)
@@ -724,7 +716,9 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
724716
{
725717
constchar*stream_name;
726718
constchar*body;
719+
constchar*msgptr;
727720
intbody_len;
721+
intmsg_len;
728722
boolfound;
729723
DmqStreamSubscription*sub;
730724
shm_mq_resultres;
@@ -734,9 +728,11 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
734728
* as message body with unknown format that we are going to send down to
735729
* the subscribed backend.
736730
*/
737-
stream_name=pq_getmsgrawstring(msg);
738-
body_len=msg->len-msg->cursor;
739-
body=pq_getmsgbytes(msg,body_len);
731+
msg_len=msg->len-msg->cursor;
732+
msgptr=pq_getmsgbytes(msg,msg_len);
733+
stream_name=msgptr;
734+
body=msgptr+strlen(stream_name)+1;
735+
body_len=msg_len- (body-msgptr);
740736
pq_getmsgend(msg);
741737

742738
/*
@@ -773,7 +769,7 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
773769
sub->procno);
774770

775771
/* and send it */
776-
res=shm_mq_send(mq_handles[sub->procno],body_len,body, false);
772+
res=shm_mq_send(mq_handles[sub->procno],msg_len,msgptr, false);
777773
if (res!=SHM_MQ_SUCCESS)
778774
{
779775
mtm_log(WARNING,"[DMQ] can't send to queue %d",sub->procno);
@@ -1345,11 +1341,18 @@ dmq_reattach_shm_mq(int handle_id)
13451341
}
13461342

13471343
DmqSenderId
1348-
dmq_attach_receiver(char*sender_name,intmask_pos)
1344+
dmq_attach_receiver(constchar*sender_name,intmask_pos)
13491345
{
13501346
inti;
13511347
inthandle_id;
13521348

1349+
/* Search for existed receiver. */
1350+
for (i=0;i<dmq_local.n_inhandles;i++)
1351+
{
1352+
if (strcmp(sender_name,dmq_local.inhandles[i].name)==0)
1353+
returni;
1354+
}
1355+
13531356
for (i=0;i<DMQ_MAX_RECEIVERS;i++)
13541357
{
13551358
if (dmq_local.inhandles[i].name[0]=='\0')
@@ -1375,7 +1378,7 @@ dmq_attach_receiver(char *sender_name, int mask_pos)
13751378
}
13761379

13771380
void
1378-
dmq_detach_receiver(char*sender_name)
1381+
dmq_detach_receiver(constchar*sender_name)
13791382
{
13801383
inti;
13811384
inthandle_id=-1;
@@ -1440,6 +1443,36 @@ dmq_stream_unsubscribe(const char *stream_name)
14401443
Assert(found);
14411444
}
14421445

1446+
constchar*
1447+
dmq_sender_name(DmqSenderIdid)
1448+
{
1449+
Assert((id >=0)&& (id<dmq_local.n_inhandles));
1450+
1451+
if (dmq_local.inhandles[id].name[0]=='\0')
1452+
returnNULL;
1453+
returndmq_local.inhandles[id].name;
1454+
}
1455+
1456+
DmqDestinationId
1457+
dmq_remote_id(constchar*name)
1458+
{
1459+
DmqDestinationIdi;
1460+
1461+
LWLockAcquire(dmq_state->lock,LW_SHARED);
1462+
for (i=0;i<DMQ_MAX_DESTINATIONS;i++)
1463+
{
1464+
DmqDestination*dest=&(dmq_state->destinations[i]);
1465+
if (strcmp(name,dest->receiver_name)==0)
1466+
break;
1467+
}
1468+
LWLockRelease(dmq_state->lock);
1469+
1470+
if (i==DMQ_MAX_DESTINATIONS)
1471+
return-1;
1472+
1473+
returni;
1474+
}
1475+
14431476
/*
14441477
* Get a message from input queue. Execution blocking until message will not
14451478
* received. Returns false, if an error is occured.
@@ -1448,10 +1481,12 @@ dmq_stream_unsubscribe(const char *stream_name)
14481481
* msg - buffer that contains received message.
14491482
* len - size of received message.
14501483
*/
1451-
bool
1452-
dmq_pop(DmqSenderId*sender_id,void**msg,Size*len,uint64mask)
1484+
constchar*
1485+
dmq_pop(DmqSenderId*sender_id,void**msg,Size*len,uint64mask,
1486+
boolwaitMsg)
14531487
{
14541488
shm_mq_resultres;
1489+
constchar*stream;
14551490

14561491
Assert(msg&&len);
14571492

@@ -1477,13 +1512,19 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
14771512

14781513
if (res==SHM_MQ_SUCCESS)
14791514
{
1480-
*msg=data;
1515+
/*
1516+
* Stream name is first null-terminated string in
1517+
* the message buffer.
1518+
*/
1519+
stream=data;
1520+
*msg= (void*) ((char*)data+strlen(stream)+1);
1521+
*len-= (char*)(*msg)- (char*)data;
14811522
*sender_id=i;
14821523

14831524
mtm_log(DmqTraceIncoming,
14841525
"[DMQ] dmq_pop: got message %s from %s",
14851526
(char*)data,dmq_local.inhandles[i].name);
1486-
returntrue;
1527+
returnstream;
14871528
}
14881529
elseif (res==SHM_MQ_DETACHED)
14891530
{
@@ -1498,13 +1539,15 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
14981539
else
14991540
{
15001541
*sender_id=i;
1501-
returnfalse;
1542+
returnNULL;
15021543
}
15031544
}
15041545
}
15051546

1506-
if (nowait)
1547+
if (nowait&&waitMsg)
15071548
continue;
1549+
if (!waitMsg)
1550+
returnNULL;
15081551

15091552
// XXX cache that
15101553
rc=WaitLatch(MyLatch,WL_LATCH_SET |WL_TIMEOUT,10.0,
@@ -1516,6 +1559,7 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask)
15161559
if (rc&WL_LATCH_SET)
15171560
ResetLatch(MyLatch);
15181561
}
1562+
returnNULL;
15191563
}
15201564

15211565
bool
@@ -1566,8 +1610,7 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
15661610
* sender_name - a symbolic name of the sender. Remote backend will attach
15671611
* to this channel by sender name.
15681612
* See dmq_attach_receiver() routine for details.
1569-
* Call this function after shared memory initialization. For example,
1570-
* extensions may create channels during 'CREATE EXTENSION' command execution.
1613+
* Call this function after shared memory initialization.
15711614
*/
15721615
DmqDestinationId
15731616
dmq_destination_add(char*connstr,char*sender_name,char*receiver_name,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp