Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite8f78cc

Browse files
committed
Before suppressing stream.c
During Shuffle Join development I saw that conveyor is not work fully with synchronous message passing with delivery (we got a deadlock). We need new message passing interface. But now ,ore simplistic way for a proof-of-concept development we can use current dmq with barriers and reconnection counters.
1 parent9a49249 commite8f78cc

File tree

6 files changed

+485
-174
lines changed

6 files changed

+485
-174
lines changed

‎contrib/pg_exchange/common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
#include"storage/lock.h"
2020
#include"dmq.h"
2121

22+
typedefcharNodeName[256];
2223

2324
typedefstruct
2425
{
2526
Oidserverid;
2627
DmqDestinationIddest_id;
28+
NodeNamenode;
2729
}DMQDestinations;
2830

2931
typedefstruct

‎contrib/pg_exchange/dmq.c

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,19 @@ dmq_sender_at_exit(int status, Datum arg)
324324
LWLockRelease(dmq_state->lock);
325325
}
326326

327+
staticvoid
328+
switch_destination_state(DmqDestinationIddest_id,DmqConnStatestate)
329+
{
330+
DmqDestination*dest;
331+
332+
LWLockAcquire(dmq_state->lock,LW_EXCLUSIVE);
333+
dest=&(dmq_state->destinations[dest_id]);
334+
Assert(dest->active);
335+
336+
dest->state=state;
337+
LWLockRelease(dmq_state->lock);
338+
}
339+
327340
void
328341
dmq_sender_main(Datummain_arg)
329342
{
@@ -402,6 +415,7 @@ dmq_sender_main(Datum main_arg)
402415
conns[i]=*dest;
403416
Assert(conns[i].pgconn==NULL);
404417
conns[i].state=Idle;
418+
dest->state=Idle;
405419
prev_timer_at=0;/* do not wait for timer event */
406420
}
407421
/* close connection to deleted destination */
@@ -443,6 +457,7 @@ dmq_sender_main(Datum main_arg)
443457
{
444458
// Assert(PQstatus(conns[conn_id].pgconn) != CONNECTION_OK);
445459
conns[conn_id].state=Idle;
460+
switch_destination_state(conn_id,Idle);
446461
// DeleteWaitEvent(set, conns[conn_id].pos);
447462

448463
mtm_log(DmqStateFinal,
@@ -532,6 +547,7 @@ dmq_sender_main(Datum main_arg)
532547
if (PQstatus(conns[conn_id].pgconn)==CONNECTION_BAD)
533548
{
534549
conns[conn_id].state=Idle;
550+
switch_destination_state(conn_id,Idle);
535551

536552
mtm_log(DmqStateIntermediate,
537553
"[DMQ] failed to start connection with %s (%s): %s",
@@ -542,6 +558,7 @@ dmq_sender_main(Datum main_arg)
542558
else
543559
{
544560
conns[conn_id].state=Connecting;
561+
switch_destination_state(conn_id,Connecting);
545562
conns[conn_id].pos=AddWaitEventToSet(set,WL_SOCKET_CONNECTED,
546563
PQsocket(conns[conn_id].pgconn),
547564
NULL, (void*)conn_id);
@@ -559,6 +576,7 @@ dmq_sender_main(Datum main_arg)
559576
if (ret<0)
560577
{
561578
conns[conn_id].state=Idle;
579+
switch_destination_state(conn_id,Idle);
562580
// DeleteWaitEvent(set, conns[conn_id].pos);
563581
// Assert(PQstatus(conns[i].pgconn) != CONNECTION_OK);
564582

@@ -622,6 +640,7 @@ dmq_sender_main(Datum main_arg)
622640
sender_name);
623641

624642
conns[conn_id].state=Negotiating;
643+
switch_destination_state(conn_id,Negotiating);
625644
ModifyWaitEvent(set,event.pos,WL_SOCKET_READABLE,NULL);
626645
PQsendQuery(conns[conn_id].pgconn,query);
627646

@@ -632,6 +651,7 @@ dmq_sender_main(Datum main_arg)
632651
elseif (status==PGRES_POLLING_FAILED)
633652
{
634653
conns[conn_id].state=Idle;
654+
switch_destination_state(conn_id,Idle);
635655
DeleteWaitEvent(set,event.pos);
636656

637657
mtm_log(DmqStateIntermediate,
@@ -655,6 +675,7 @@ dmq_sender_main(Datum main_arg)
655675
if (!PQconsumeInput(conns[conn_id].pgconn))
656676
{
657677
conns[conn_id].state=Idle;
678+
switch_destination_state(conn_id,Idle);
658679
DeleteWaitEvent(set,event.pos);
659680

660681
mtm_log(DmqStateIntermediate,
@@ -665,6 +686,7 @@ dmq_sender_main(Datum main_arg)
665686
if (!PQisBusy(conns[conn_id].pgconn))
666687
{
667688
conns[conn_id].state=Active;
689+
switch_destination_state(conn_id,Active);
668690
DeleteWaitEvent(set,event.pos);
669691

670692
mtm_log(DmqStateFinal,
@@ -679,6 +701,7 @@ dmq_sender_main(Datum main_arg)
679701
if (!PQconsumeInput(conns[conn_id].pgconn))
680702
{
681703
conns[conn_id].state=Idle;
704+
switch_destination_state(conn_id,Idle);
682705

683706
mtm_log(DmqStateFinal,
684707
"[DMQ] connection error with %s: %s",
@@ -1645,6 +1668,31 @@ dmq_destination_add(char *connstr, char *sender_name, char *receiver_name,
16451668
returndest_id;
16461669
}
16471670

1671+
/*
1672+
* Check availability of destination node.
1673+
* It is needed before sending process to prevent data loss.
1674+
*/
1675+
DmqConnState
1676+
dmq_get_destination_status(DmqDestinationIddest_id)
1677+
{
1678+
DmqConnStatestate;
1679+
1680+
if ((dest_id<0)|| (dest_id >=DMQ_MAX_DESTINATIONS))
1681+
return-2;
1682+
1683+
LWLockAcquire(dmq_state->lock,LW_EXCLUSIVE);
1684+
DmqDestination*dest=&(dmq_state->destinations[dest_id]);
1685+
if (!dest->active)
1686+
{
1687+
LWLockRelease(dmq_state->lock);
1688+
return-1;
1689+
}
1690+
1691+
state=dest->state;
1692+
LWLockRelease(dmq_state->lock);
1693+
returnstate;
1694+
}
1695+
16481696
void
16491697
dmq_destination_drop(char*receiver_name)
16501698
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp