Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8a12ea5

Browse files
committed
[PGPRO-4074] DeleteWaitEvent() for event sets.
Cherry-pick of c82e0d53f5.Note: I've blindly adapted to kqueue, need to test it.tags: multimaster, connpool.(cherry picked from commit 434471857301439c8b2a48dd1594bc608c65d23a)
1 parent652f552 commit8a12ea5

File tree

2 files changed

+157
-43
lines changed

2 files changed

+157
-43
lines changed

‎src/backend/storage/ipc/latch.c

Lines changed: 155 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,36 @@
7878
#error "no wait set implementation available"
7979
#endif
8080

81+
/*
82+
* Connection pooler and mtm need to delete events from event set.
83+
* As far as we have too preserve positions of all other events,
84+
* we can not move events. So we have to maintain list of free events.
85+
* But poll/WaitForMultipleObjects manipulates with array of listened events.
86+
* That is why elements in pollfds and handle arrays should be stored without holes
87+
* and we need to maintain mapping between them and WaitEventSet events.
88+
* This mapping is stored in "permutation" array. Also we need backward mapping
89+
* (from event to descriptors array) which is implemented using "index" field of WaitEvent.
90+
*/
91+
8192
/* typedef in latch.h */
8293
structWaitEventSet
8394
{
8495
intnevents;/* number of registered events */
8596
intnevents_space;/* maximum number of events in this set */
8697

98+
/*
99+
* L1-list of free events linked by "pos" and terminated by -1.
100+
*/
101+
intfree_events;
102+
87103
/*
88104
* Array, of nevents_space length, storing the definition of events this
89105
* set is waiting for.
90106
*/
91107
WaitEvent*events;
92108

109+
int*permutation;/* indexes of used events (see comment above) */
110+
93111
/*
94112
* If WL_LATCH_SET is specified in any wait event, latch is a pointer to
95113
* said latch, and latch_pos the offset in the ->events array. This is
@@ -150,9 +168,9 @@ static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action
150168
#elif defined(WAIT_USE_KQUEUE)
151169
staticvoidWaitEventAdjustKqueue(WaitEventSet*set,WaitEvent*event,intold_events);
152170
#elif defined(WAIT_USE_POLL)
153-
staticvoidWaitEventAdjustPoll(WaitEventSet*set,WaitEvent*event);
171+
staticvoidWaitEventAdjustPoll(WaitEventSet*set,WaitEvent*event,boolremove);
154172
#elif defined(WAIT_USE_WIN32)
155-
staticvoidWaitEventAdjustWin32(WaitEventSet*set,WaitEvent*event);
173+
staticvoidWaitEventAdjustWin32(WaitEventSet*set,WaitEvent*event,boolremove);
156174
#endif
157175

158176
staticinlineintWaitEventSetWaitBlock(WaitEventSet*set,intcur_timeout,
@@ -574,6 +592,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
574592
*/
575593
sz+=MAXALIGN(sizeof(WaitEventSet));
576594
sz+=MAXALIGN(sizeof(WaitEvent)*nevents);
595+
sz+=MAXALIGN(sizeof(int)*nevents);
577596

578597
#if defined(WAIT_USE_EPOLL)
579598
sz+=MAXALIGN(sizeof(structepoll_event)*nevents);
@@ -594,23 +613,23 @@ CreateWaitEventSet(MemoryContext context, int nevents)
594613
set->events= (WaitEvent*)data;
595614
data+=MAXALIGN(sizeof(WaitEvent)*nevents);
596615

616+
set->permutation= (int*)data;
617+
data+=MAXALIGN(sizeof(int)*nevents);
618+
597619
#if defined(WAIT_USE_EPOLL)
598620
set->epoll_ret_events= (structepoll_event*)data;
599-
data+=MAXALIGN(sizeof(structepoll_event)*nevents);
600621
#elif defined(WAIT_USE_KQUEUE)
601622
set->kqueue_ret_events= (structkevent*)data;
602-
data+=MAXALIGN(sizeof(structkevent)*nevents);
603623
#elif defined(WAIT_USE_POLL)
604624
set->pollfds= (structpollfd*)data;
605-
data+=MAXALIGN(sizeof(structpollfd)*nevents);
606625
#elif defined(WAIT_USE_WIN32)
607-
set->handles= (HANDLE)data;
608-
data+=MAXALIGN(sizeof(HANDLE)*nevents);
626+
set->handles= (HANDLE*)data;
609627
#endif
610628

611629
set->latch=NULL;
612630
set->nevents_space=nevents;
613631
set->exit_on_postmaster_death= false;
632+
set->free_events=-1;
614633

615634
#if defined(WAIT_USE_EPOLL)
616635
if (!AcquireExternalFD())
@@ -702,12 +721,11 @@ FreeWaitEventSet(WaitEventSet *set)
702721
close(set->kqueue_fd);
703722
ReleaseExternalFD();
704723
#elif defined(WAIT_USE_WIN32)
705-
WaitEvent*cur_event;
724+
inti;
706725

707-
for (cur_event=set->events;
708-
cur_event< (set->events+set->nevents);
709-
cur_event++)
726+
for (i=0;i<set->nevents;i++)
710727
{
728+
WaitEvent*cur_event=&set->events[set->permutation[i]];
711729
if (cur_event->events&WL_LATCH_SET)
712730
{
713731
/* uses the latch's HANDLE */
@@ -720,7 +738,7 @@ FreeWaitEventSet(WaitEventSet *set)
720738
{
721739
/* Clean up the event object we created for the socket */
722740
WSAEventSelect(cur_event->fd,NULL,0);
723-
WSACloseEvent(set->handles[cur_event->pos+1]);
741+
WSACloseEvent(set->handles[cur_event->index+1]);
724742
}
725743
}
726744
#endif
@@ -761,6 +779,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
761779
void*user_data)
762780
{
763781
WaitEvent*event;
782+
intfree_event;
764783

765784
/* not enough space */
766785
Assert(set->nevents<set->nevents_space);
@@ -790,8 +809,20 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
790809
if (fd==PGINVALID_SOCKET&& (events&WL_SOCKET_MASK))
791810
elog(ERROR,"cannot wait on socket event without a socket");
792811

793-
event=&set->events[set->nevents];
794-
event->pos=set->nevents++;
812+
free_event=set->free_events;
813+
if (free_event >=0)
814+
{
815+
event=&set->events[free_event];
816+
set->free_events=event->pos;
817+
event->pos=free_event;
818+
}
819+
else
820+
{
821+
event=&set->events[set->nevents];
822+
event->pos=set->nevents;
823+
}
824+
set->permutation[set->nevents]=event->pos;
825+
event->index=set->nevents++;
795826
event->fd=fd;
796827
event->events=events;
797828
event->user_data=user_data;
@@ -820,14 +851,54 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
820851
#elif defined(WAIT_USE_KQUEUE)
821852
WaitEventAdjustKqueue(set,event,0);
822853
#elif defined(WAIT_USE_POLL)
823-
WaitEventAdjustPoll(set,event);
854+
WaitEventAdjustPoll(set,event, false);
824855
#elif defined(WAIT_USE_WIN32)
825-
WaitEventAdjustWin32(set,event);
856+
WaitEventAdjustWin32(set,event, false);
826857
#endif
827858

828859
returnevent->pos;
829860
}
830861

862+
/*
863+
* Remove event with specified position in event set.
864+
*
865+
* 'pos' is the id returned by AddWaitEventToSet.
866+
*/
867+
void
868+
DeleteWaitEvent(WaitEventSet*set,intpos)
869+
{
870+
WaitEvent*event;
871+
#if defined(WAIT_USE_KQUEUE)
872+
intold_events;
873+
#endif
874+
875+
Assert(pos<set->nevents_space);
876+
event=&set->events[pos];
877+
878+
#if defined(WAIT_USE_EPOLL)
879+
WaitEventAdjustEpoll(set,event,EPOLL_CTL_DEL);
880+
#elif defined(WAIT_USE_KQUEUE)
881+
old_events=event->events;
882+
event->events=0;
883+
WaitEventAdjustKqueue(set,event,old_events);
884+
#elif defined(WAIT_USE_POLL)
885+
WaitEventAdjustPoll(set,event, true);
886+
#elif defined(WAIT_USE_WIN32)
887+
WaitEventAdjustWin32(set,event, true);
888+
#endif
889+
if (--set->nevents!=0)
890+
{
891+
set->permutation[event->index]=set->permutation[set->nevents];
892+
set->events[set->permutation[set->nevents]].index=event->index;
893+
}
894+
event->fd=PGINVALID_SOCKET;
895+
event->events=0;
896+
event->index=-1;
897+
event->pos=set->free_events;
898+
set->free_events=pos;
899+
}
900+
901+
831902
/*
832903
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
833904
* with the WaitEvent.
@@ -842,7 +913,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
842913
intold_events;
843914
#endif
844915

845-
Assert(pos<set->nevents);
916+
Assert(pos<set->nevents_space);
846917

847918
event=&set->events[pos];
848919
#if defined(WAIT_USE_KQUEUE)
@@ -884,9 +955,9 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
884955
#elif defined(WAIT_USE_KQUEUE)
885956
WaitEventAdjustKqueue(set,event,old_events);
886957
#elif defined(WAIT_USE_POLL)
887-
WaitEventAdjustPoll(set,event);
958+
WaitEventAdjustPoll(set,event, false);
888959
#elif defined(WAIT_USE_WIN32)
889-
WaitEventAdjustWin32(set,event);
960+
WaitEventAdjustWin32(set,event, false);
890961
#endif
891962
}
892963

@@ -933,7 +1004,20 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
9331004
*/
9341005
rc=epoll_ctl(set->epoll_fd,action,event->fd,&epoll_ev);
9351006

936-
if (rc<0)
1007+
/*
1008+
* Skip throwing error in case of EPOLL_CTL_DEL. Upon connection error
1009+
* libpq may or may not close the socket, so epfd can disappear.
1010+
*
1011+
* XXX it is not entirely clear which errnos should be checked
1012+
* here. According to the mans I would say it is 'EBADF' (closed socket is
1013+
* not valid, right?), any simple test on my 5.1.11 debian agrees with
1014+
* that. However, msvs-6-3 bf machine with 2.6.32 spits out ENOENT (under
1015+
* dmq) despite evidently correct usage (we don't DEL the same fd
1016+
* twice). EINVAL was also historically checked here.
1017+
*/
1018+
if (rc<0&&
1019+
!(action==EPOLL_CTL_DEL&&
1020+
(errno==EBADF||errno==EINVAL||errno==ENOENT)))
9371021
ereport(ERROR,
9381022
(errcode_for_socket_access(),
9391023
/* translator: %s is a syscall name, such as "poll()" */
@@ -944,11 +1028,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
9441028

9451029
#if defined(WAIT_USE_POLL)
9461030
staticvoid
947-
WaitEventAdjustPoll(WaitEventSet*set,WaitEvent*event)
1031+
WaitEventAdjustPoll(WaitEventSet*set,WaitEvent*event,boolremove)
9481032
{
949-
structpollfd*pollfd=&set->pollfds[event->pos];
1033+
structpollfd*pollfd=&set->pollfds[event->index];
1034+
1035+
if (remove)
1036+
{
1037+
*pollfd=set->pollfds[set->nevents-1];/* nevents is not decremented yet */
1038+
return;
1039+
}
9501040

951-
pollfd->revents=0;
9521041
pollfd->fd=event->fd;
9531042

9541043
/* prepare pollfd entry once */
@@ -1088,7 +1177,11 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
10881177
if (event->events==WL_POSTMASTER_DEATH&&
10891178
(errno==ESRCH||errno==EACCES))
10901179
set->report_postmaster_not_running= true;
1091-
else
1180+
/*
1181+
* Like in WaitEventAdjustEpoll, don't throw if we are trying to
1182+
* remove already closed socket. FIXME: ensure this check is right.
1183+
*/
1184+
elseif (!(event->events==0&&errno==EBADF ))
10921185
ereport(ERROR,
10931186
(errcode_for_socket_access(),
10941187
/* translator: %s is a syscall name, such as "poll()" */
@@ -1112,9 +1205,21 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
11121205

11131206
#if defined(WAIT_USE_WIN32)
11141207
staticvoid
1115-
WaitEventAdjustWin32(WaitEventSet*set,WaitEvent*event)
1208+
WaitEventAdjustWin32(WaitEventSet*set,WaitEvent*event,boolremove)
11161209
{
1117-
HANDLE*handle=&set->handles[event->pos+1];
1210+
HANDLE*handle=&set->handles[event->index+1];
1211+
1212+
if (remove)
1213+
{
1214+
Assert(event->fd!=PGINVALID_SOCKET);
1215+
1216+
if (*handle!=WSA_INVALID_EVENT)
1217+
WSACloseEvent(*handle);
1218+
1219+
*handle=set->handles[set->nevents];/* nevents is not decremented yet but we need to add 1 to the index */
1220+
set->handles[set->nevents]=WSA_INVALID_EVENT;
1221+
return;
1222+
}
11181223

11191224
if (event->events==WL_LATCH_SET)
11201225
{
@@ -1562,11 +1667,12 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
15621667
{
15631668
intreturned_events=0;
15641669
intrc;
1565-
WaitEvent*cur_event;
1566-
structpollfd*cur_pollfd;
1670+
inti;
1671+
structpollfd*cur_pollfd=set->pollfds;
1672+
WaitEvent*cur_event;
15671673

15681674
/* Sleep */
1569-
rc=poll(set->pollfds,set->nevents, (int)cur_timeout);
1675+
rc=poll(cur_pollfd,set->nevents, (int)cur_timeout);
15701676

15711677
/* Check return code */
15721678
if (rc<0)
@@ -1589,15 +1695,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
15891695
return-1;
15901696
}
15911697

1592-
for (cur_event=set->events,cur_pollfd=set->pollfds;
1593-
cur_event< (set->events+set->nevents)&&
1594-
returned_events<nevents;
1595-
cur_event++,cur_pollfd++)
1698+
for (i=0;i<set->nevents&&returned_events<nevents;i++,cur_pollfd++)
15961699
{
15971700
/* no activity on this FD, skip */
15981701
if (cur_pollfd->revents==0)
15991702
continue;
16001703

1704+
cur_event=&set->events[set->permutation[i]];
16011705
occurred_events->pos=cur_event->pos;
16021706
occurred_events->user_data=cur_event->user_data;
16031707
occurred_events->events=0;
@@ -1688,17 +1792,25 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
16881792
WaitEvent*occurred_events,intnevents)
16891793
{
16901794
intreturned_events=0;
1795+
inti;
16911796
DWORDrc;
1692-
WaitEvent*cur_event;
1797+
WaitEvent*cur_event;
16931798

16941799
/* Reset any wait events that need it */
1695-
for (cur_event=set->events;
1696-
cur_event< (set->events+set->nevents);
1697-
cur_event++)
1698-
{
1699-
if (cur_event->reset)
1700-
{
1701-
WaitEventAdjustWin32(set,cur_event);
1800+
for (i=0;i<set->nevents;i++)
1801+
{
1802+
cur_event=&set->events[set->permutation[i]];
1803+
1804+
/*
1805+
* I have problem at Windows when SSPI connections "hanged" in WaitForMultipleObjects which
1806+
* doesn't signal presence of input data (while it is possible to read this data from the socket).
1807+
* Looks like "reset" logic is not completely correct (resetting event just after
1808+
* receiveing presious read event). Reseting all read events fixes this problem.
1809+
*/
1810+
if (cur_event->events&WL_SOCKET_READABLE)
1811+
/* if (cur_event->reset) */
1812+
{
1813+
WaitEventAdjustWin32(set,cur_event, false);
17021814
cur_event->reset= false;
17031815
}
17041816

@@ -1764,7 +1876,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
17641876
* With an offset of one, due to the always present pgwin32_signal_event,
17651877
* the handle offset directly corresponds to a wait event.
17661878
*/
1767-
cur_event= (WaitEvent*)&set->events[rc-WAIT_OBJECT_0-1];
1879+
cur_event= (WaitEvent*)&set->events[set->permutation[rc-WAIT_OBJECT_0-1]];
17681880

17691881
occurred_events->pos=cur_event->pos;
17701882
occurred_events->user_data=cur_event->user_data;
@@ -1805,7 +1917,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
18051917
elseif (cur_event->events&WL_SOCKET_MASK)
18061918
{
18071919
WSANETWORKEVENTSresEvents;
1808-
HANDLEhandle=set->handles[cur_event->pos+1];
1920+
HANDLEhandle=set->handles[cur_event->index+1];
18091921

18101922
Assert(cur_event->fd);
18111923

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp