Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commite00fd69

Browse files
committed
Transitional commit.
1 parent3b9ebfd commite00fd69

File tree

5 files changed

+111
-16
lines changed

5 files changed

+111
-16
lines changed

‎contrib/pg_exchange/dmq.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ typedef int8 DmqSenderId;
2121

2222
externvoiddmq_init(constchar*library_name);
2323

24-
externDmqDestinationIddmq_destination_add(char*connstr,char*sender_name,
25-
char*receiver_name,intping_period);
24+
externDmqDestinationIddmq_destination_add(char*connstr,
25+
char*sender_name,
26+
char*receiver_name,
27+
intping_period);
2628
externDmqConnStatedmq_get_destination_status(DmqDestinationIddest_id);
2729
externvoiddmq_destination_drop(constchar*receiver_name);
2830

@@ -37,11 +39,13 @@ extern const char *dmq_sender_name(DmqSenderId id);
3739
externDmqDestinationIddmq_remote_id(constchar*name);
3840

3941
externconstchar*
40-
dmq_pop(DmqSenderId*sender_id,void**msg,Size*len,uint64mask,boolwaitMsg);
42+
dmq_pop(DmqSenderId*sender_id,void**msg,Size*len,uint64mask,
43+
boolwaitMsg);
4144
externbooldmq_pop_nb(DmqSenderId*sender_id,StringInfomsg,uint64mask);
4245

4346
externvoiddmq_push(DmqDestinationIddest_id,char*stream_name,char*msg);
44-
externvoiddmq_push_buffer(DmqDestinationIddest_id,char*stream_name,constvoid*buffer,size_tlen);
47+
externvoiddmq_push_buffer(DmqDestinationIddest_id,char*stream_name,
48+
constvoid*buffer,size_tlen);
4549

4650
typedefvoid (*dmq_receiver_hook_type) (constchar*);
4751
externdmq_receiver_hook_typedmq_receiver_start_hook;

‎contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ static CustomScanMethodsdistplanexec_plan_methods;
4444
staticCustomExecMethodsdistplanexec_exec_methods;
4545

4646
chardestsName[10]="DMQ_DESTS";
47+
char*network_interface;
4748

4849

4950
staticNode*CreateDistPlanExecState(CustomScan*node);
@@ -257,7 +258,7 @@ EstablishDMQConnections(const lcontext *context, const char *serverName,
257258
sprintf(connstr,"host=%s port=%d "
258259
"fallback_application_name=%s",
259260
host,port,senderName);
260-
261+
elog(LOG,"CONN STR: %s",connstr);
261262
sub->dest_id=dmq_destination_add(connstr,senderName,receiverName,10);
262263
memcpy(sub->node,receiverName,strlen(receiverName)+1);
263264
}
@@ -391,12 +392,9 @@ ExplainDistPlanExec(CustomScanState *node, List *ancestors, ExplainState *es)
391392
}
392393

393394
staticstructPlan*
394-
CreateDistExecPlan(PlannerInfo*root,
395-
RelOptInfo*rel,
396-
structCustomPath*best_path,
397-
List*tlist,
398-
List*clauses,
399-
List*custom_plans)
395+
CreateDistExecPlan(PlannerInfo*root,RelOptInfo*rel,
396+
structCustomPath*best_path,
397+
List*tlist,List*clauses,List*custom_plans)
400398
{
401399
CustomScan*distExecNode;
402400

@@ -572,7 +570,7 @@ localize_plan(Plan *node, lcontext *context)
572570
if (IsExchangePlanNode(node))
573571
{
574572
List*private= ((CustomScan*)node)->custom_private;
575-
elog(LOG,"LOCALIZE: exchange");
573+
576574
if (lnext(lnext(list_head(private))))
577575
context->indexinfo= (IndexOptInfo*)lthird(private);
578576
}
@@ -582,7 +580,6 @@ elog(LOG, "LOCALIZE: exchange");
582580
if (context->foreign_scans!=NIL)
583581
{
584582
CustomScan*css= (CustomScan*)node;
585-
//Index scanrelid = ((Scan *) cstmSubPlan1(node))->scanrelid;
586583

587584
Assert(list_length(context->foreign_scans)==1);
588585
css->custom_plans=list_delete_ptr(css->custom_plans,
@@ -722,10 +719,48 @@ FSExtractServerName(Oid fsid, char **host, int *port)
722719
*host=hostname;
723720
}
724721

722+
#include<unistd.h>
723+
#include<sys/types.h>
724+
#include<sys/socket.h>
725+
#include<sys/ioctl.h>
726+
#include<netinet/in.h>
727+
#include<net/if.h>
728+
#include<arpa/inet.h>
729+
#include"common/ip.h"
730+
725731
void
726732
GetMyServerName(char**host,int*port)
727733
{
728-
*host=pstrdup(LOCALHOST);
734+
intfd;
735+
structifreqifr;
736+
structaddrinfohintp;
737+
structaddrinfo*result;
738+
char*sipaddr;
739+
structsockaddr_storagesaddr;
740+
intres;
741+
742+
fd=socket(AF_INET,SOCK_DGRAM,0);
743+
744+
/* I want to get an IPv4 IP address */
745+
ifr.ifr_addr.sa_family=AF_INET;
746+
747+
/* I want IP address attached to "eth0" */
748+
strncpy(ifr.ifr_name,network_interface,IFNAMSIZ-1);
749+
ioctl(fd,SIOCGIFADDR,&ifr);
750+
close(fd);
751+
752+
MemSet(&hintp,0,sizeof(hintp));
753+
hintp.ai_family=AF_INET;
754+
hintp.ai_flags=AI_ALL;
755+
sipaddr=inet_ntoa(((structsockaddr_in*)&ifr.ifr_addr)->sin_addr);
756+
if ((res=pg_getaddrinfo_all(sipaddr,NULL,&hintp,&result))!=0)
757+
elog(FATAL,"Cannot resolve network address %s, error=%d.",sipaddr,res);
758+
memcpy(&saddr,result->ai_addr,result->ai_addrlen);
759+
*host= (char*)palloc0(NI_MAXHOST);
760+
if (pg_getnameinfo_all(&saddr,result->ai_addrlen,*host,NI_MAXHOST,
761+
NULL,0,0)!=0)
762+
elog(FATAL,"Cannot resolve network name");
763+
729764
*port=PostPortNumber;
730765
}
731766

@@ -755,8 +790,9 @@ dmq_init_barrier(DMQDestCont *dmq_data, PlanState *child)
755790
/* Wait for dmq connection establishing */
756791
for (i=0;i<dmq_data->nservers;i++)
757792
while (dmq_get_destination_status(dmq_data->dests[i].dest_id)!=Active);
758-
793+
elog(LOG,"DMQ INIT BARRIER");
759794
init_exchange_channel(child, (void*)dmq_data);
795+
elog(LOG,"END DMQ INIT BARRIER");
760796
}
761797

762798
staticbool
@@ -809,7 +845,7 @@ init_exchange_channel(PlanState *node, void *context)
809845
}
810846
else
811847
state->indexes[i]=j;
812-
848+
elog(LOG,"SendByteMessage: j=%d, dest_id=%d, stream=%s",j,dmq_data->dests[j].dest_id,state->stream);
813849
SendByteMessage(dmq_data->dests[j].dest_id,state->stream,ib);
814850
}
815851
return false;

‎contrib/pg_exchange/nodeDistPlanExec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ typedef struct
2727
}lcontext;
2828

2929

30+
externchar*network_interface;
3031
externchardestsName[10];
3132
#defineDISTEXECPATHNAME"DistExecPath"
3233

‎contrib/pg_exchange/pg_exchange.c

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,25 @@ shmem_size(void)
5555
sizeof(DMQDestinations)));
5656
returnMAXALIGN(size);
5757
}
58+
#include"common/ip.h"
59+
#include"arpa/inet.h"
60+
#include"sys/socket.h"
61+
#include<netinet/in.h>
62+
#include<netdb.h>
63+
#include<sys/un.h>
64+
#include"libpq/pqcomm.h"
5865

5966
/*
6067
* Module load/unload callback
6168
*/
6269
void
6370
_PG_init(void)
6471
{
72+
DefineCustomStringVariable("network_interface",
73+
"Set network interface for EXCHANGE communications",
74+
NULL,&network_interface,"lo",
75+
PGC_SUSET,GUC_NOT_IN_SAMPLE,NULL,NULL,NULL);
76+
6577
EXCHANGE_Init_methods();
6678
DUMMYSCAN_Init_methods();
6779
EXEC_Hooks_init();
@@ -72,6 +84,47 @@ _PG_init(void)
7284

7385
old_dmq_receiver_stop_hook=dmq_receiver_stop_hook;
7486
dmq_receiver_stop_hook=OnNodeDisconnect;
87+
/*{
88+
char host[1024];
89+
FILE *f = fopen("/home/andrey/PostgresPro/pgcluster/hosts.txt", "rt");
90+
struct addrinfo hintp;
91+
struct addrinfo *result;
92+
MemSet(&hintp, 0, sizeof(hintp));
93+
//hintp.ai_socktype = SOCK_STREAM;
94+
hintp.ai_family = AF_UNSPEC;
95+
hintp.ai_flags = AI_ALL;
96+
97+
Assert(f != NULL);
98+
while (!feof(f))
99+
{
100+
struct addrinfo *next;
101+
int i=0;
102+
int res1;
103+
104+
fscanf(f, "%s", host);
105+
res1 = pg_getaddrinfo_all(host, NULL, &hintp, &result);
106+
next = result;
107+
108+
while (next != NULL)
109+
{
110+
SockAddr a1;
111+
int res2;
112+
char node[NI_MAXHOST];
113+
char service[NI_MAXSERV];
114+
char *res;
115+
116+
res = inet_ntoa(((struct sockaddr_in *)next->ai_addr)->sin_addr);
117+
memcpy(&a1.addr, next->ai_addr, next->ai_addrlen);
118+
res2 = pg_getnameinfo_all(&a1.addr, next->ai_addrlen, node, NI_MAXHOST,
119+
service, NI_MAXSERV, 0);
120+
elog(LOG, "[%d] srchost: %s, res: [%d, %d] IP: %s, host: %s, service: %s.",
121+
i++, host, res1, res2, res, node, service);
122+
next = next->ai_next;
123+
}
124+
pg_freeaddrinfo_all(hintp.ai_family, result);
125+
}
126+
fclose(f);
127+
} */
75128
}
76129

77130
Datum

‎contrib/pg_exchange/stream.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ RecvTuple(TupleDesc tupdesc, char *streamName, int *status)
325325
*status=3;
326326
break;
327327
}
328+
328329
pfree(buf);
329330
return (TupleTableSlot*)NULL;
330331
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp