+/*-------------------------------------------------------------------------
+ *
+ * barrier.c
+ *    Barriers for synchronizing cooperating processes.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * From Wikipedia[1]: "In parallel computing, a barrier is a type of
+ * synchronization method. A barrier for a group of threads or processes in
+ * the source code means any thread/process must stop at this point and cannot
+ * proceed until all other threads/processes reach this barrier."
+ *
+ * This implementation of barriers allows for static sets of participants
+ * known up front, or dynamic sets of participants which processes can join or
+ * leave at any time. In the dynamic case, a phase number can be used to
+ * track progress through a parallel algorithm, and may be necessary to
+ * synchronize with the current phase of a multi-phase algorithm when a new
+ * participant joins. In the static case, the phase number is used
+ * internally, but it isn't strictly necessary for client code to access it
+ * because the phase can only advance when the declared number of participants
+ * reaches the barrier, so client code should be in no doubt about the current
+ * phase of computation at all times.
+ *
+ * Consider a parallel algorithm that involves separate phases of computation
+ * A, B and C where the output of each phase is needed before the next phase
+ * can begin.
+ *
+ * In the case of a static barrier initialized with 4 participants, each
+ * participant works on phase A, then calls BarrierArriveAndWait to wait until
+ * all 4 participants have reached that point. When BarrierArriveAndWait
+ * returns control, each participant can work on B, and so on. Because the
+ * barrier knows how many participants to expect, the phases of computation
+ * don't need labels or numbers, since each process's program counter implies
+ * the current phase. Even if some of the processes are slow to start up and
+ * begin running phase A, the other participants are expecting them and will
+ * patiently wait at the barrier. The code could be written as follows:
+ *
+ *     perform_a();
+ *     BarrierArriveAndWait(&barrier, ...);
+ *     perform_b();
+ *     BarrierArriveAndWait(&barrier, ...);
+ *     perform_c();
+ *     BarrierArriveAndWait(&barrier, ...);
+ *
+ * If the number of participants is not known up front, then a dynamic barrier
+ * is needed and the number should be set to zero at initialization. New
+ * complications arise because the number necessarily changes over time as
+ * participants attach and detach, and therefore phases B, C or even the end
+ * of processing may be reached before any given participant has started
+ * running and attached. Therefore the client code must perform an initial
+ * test of the phase number after attaching, because it needs to find out
+ * which phase of the algorithm has been reached by any participants that are
+ * already attached in order to synchronize with that work. Once the program
+ * counter or some other representation of current progress is synchronized
+ * with the barrier's phase, normal control flow can be used just as in the
+ * static case. Our example could be written using a switch statement with
+ * cases that fall through, as follows:
+ *
+ *     phase = BarrierAttach(&barrier);
+ *     switch (phase)
+ *     {
+ *     case PHASE_A:
+ *         perform_a();
+ *         BarrierArriveAndWait(&barrier, ...);
+ *     case PHASE_B:
+ *         perform_b();
+ *         BarrierArriveAndWait(&barrier, ...);
+ *     case PHASE_C:
+ *         perform_c();
+ *         BarrierArriveAndWait(&barrier, ...);
+ *     }
+ *     BarrierDetach(&barrier);
+ *
+ * Static barriers behave similarly to POSIX's pthread_barrier_t. Dynamic
+ * barriers behave similarly to Java's java.util.concurrent.Phaser.
+ *
+ * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
+ *
+ * IDENTIFICATION
+ *    src/backend/storage/ipc/barrier.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/barrier.h"
+
+static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);
+
+/*
+ * Initialize this barrier. To use a static party size, provide the number of
+ * participants to wait for at each phase indicating that that number of
+ * backends is implicitly attached. To use a dynamic party size, specify zero
+ * here and then use BarrierAttach() and
+ * BarrierDetach()/BarrierArriveAndDetach() to register and deregister
+ * participants explicitly.
+ */
+void
+BarrierInit(Barrier *barrier, int participants)
+{
+    SpinLockInit(&barrier->mutex);
+    barrier->participants = participants;
+    barrier->arrived = 0;
+    barrier->phase = 0;
+    barrier->elected = 0;
+    barrier->static_party = participants > 0;
+    ConditionVariableInit(&barrier->condition_variable);
+}
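+
+/*
+ * Illustrative sketch (not part of the original file): a leader might place
+ * the barrier in shared memory and initialize it either way. 'shared' and
+ * 'nworkers' are hypothetical names.
+ *
+ *     BarrierInit(&shared->barrier, nworkers);  // static party of nworkers
+ *     BarrierInit(&shared->barrier, 0);         // dynamic party; participants
+ *                                               // call BarrierAttach() later
+ */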
+
+/*
+ * Arrive at this barrier, wait for all other attached participants to arrive
+ * too and then return. Increments the current phase. The caller must be
+ * attached.
+ *
+ * While waiting, pg_stat_activity shows a wait_event_class and wait_event
+ * controlled by the wait_event_info passed in, which should be a value from
+ * one of the WaitEventXXX enums defined in pgstat.h.
+ *
+ * Return true in one arbitrarily chosen participant. Return false in all
+ * others. The return code can be used to elect one participant to execute a
+ * phase of work that must be done serially while other participants wait.
+ */
+bool
+BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
+{
+    bool        release = false;
+    bool        elected;
+    int         start_phase;
+    int         next_phase;
+
+    SpinLockAcquire(&barrier->mutex);
+    start_phase = barrier->phase;
+    next_phase = start_phase + 1;
+    ++barrier->arrived;
+    if (barrier->arrived == barrier->participants)
+    {
+        release = true;
+        barrier->arrived = 0;
+        barrier->phase = next_phase;
+        barrier->elected = next_phase;
+    }
+    SpinLockRelease(&barrier->mutex);
+
+    /*
+     * If we were the last expected participant to arrive, we can release our
+     * peers and return true to indicate that this backend has been elected to
+     * perform any serial work.
+     */
+    if (release)
+    {
+        ConditionVariableBroadcast(&barrier->condition_variable);
+
+        return true;
+    }
+
+    /*
+     * Otherwise we have to wait for the last participant to arrive and
+     * advance the phase.
+     */
+    elected = false;
+    ConditionVariablePrepareToSleep(&barrier->condition_variable);
+    for (;;)
+    {
+        /*
+         * We know that phase must either be start_phase, indicating that we
+         * need to keep waiting, or next_phase, indicating that the last
+         * participant that we were waiting for has either arrived or detached
+         * so that the next phase has begun. The phase cannot advance any
+         * further than that without this backend's participation, because
+         * this backend is attached.
+         */
+        SpinLockAcquire(&barrier->mutex);
+        Assert(barrier->phase == start_phase || barrier->phase == next_phase);
+        release = barrier->phase == next_phase;
+        if (release && barrier->elected != next_phase)
+        {
+            /*
+             * Usually the backend that arrives last and releases the other
+             * backends is elected to return true (see above), so that it can
+             * begin processing serial work while it has a CPU timeslice.
+             * However, if the barrier advanced because someone detached, then
+             * one of the backends that is awoken will need to be elected.
+             */
+            barrier->elected = barrier->phase;
+            elected = true;
+        }
+        SpinLockRelease(&barrier->mutex);
+        if (release)
+            break;
+        ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
+    }
+    ConditionVariableCancelSleep();
+
+    return elected;
+}
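+
+/*
+ * Illustrative sketch (not part of the original file): the return value can
+ * be used to elect one participant to perform a serial step between parallel
+ * phases while the others wait at the next barrier. WAIT_EVENT_EXAMPLE and
+ * build_shared_state() are hypothetical names.
+ *
+ *     if (BarrierArriveAndWait(&barrier, WAIT_EVENT_EXAMPLE))
+ *         build_shared_state();                 // exactly one backend runs this
+ *     BarrierArriveAndWait(&barrier, WAIT_EVENT_EXAMPLE);
+ *     // all backends can now rely on the serially built state
+ */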
+
+/*
+ * Arrive at this barrier, but detach rather than waiting. Returns true if
+ * the caller was the last to detach.
+ */
+bool
+BarrierArriveAndDetach(Barrier *barrier)
+{
+    return BarrierDetachImpl(barrier, true);
+}
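+
+/*
+ * Illustrative sketch (not part of the original file): a participant that has
+ * nothing further to contribute can leave without blocking the others, and
+ * the last one out could tidy up shared state (assuming no later attachers).
+ * cleanup_shared_state() is a hypothetical helper.
+ *
+ *     if (BarrierArriveAndDetach(&barrier))
+ *         cleanup_shared_state();
+ */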
+
+/*
+ * Attach to a barrier. All waiting participants will now wait for this
+ * participant to call BarrierArriveAndWait(), BarrierDetach() or
+ * BarrierArriveAndDetach(). Return the current phase.
+ */
+int
+BarrierAttach(Barrier *barrier)
+{
+    int         phase;
+
+    Assert(!barrier->static_party);
+
+    SpinLockAcquire(&barrier->mutex);
+    ++barrier->participants;
+    phase = barrier->phase;
+    SpinLockRelease(&barrier->mutex);
+
+    return phase;
+}
+
+/*
+ * Detach from a barrier. This may release other waiters from
+ * BarrierArriveAndWait() and advance the phase if they were only waiting for
+ * this backend. Return true if this participant was the last to detach.
+ */
+bool
+BarrierDetach(Barrier *barrier)
+{
+    return BarrierDetachImpl(barrier, false);
+}
+
+/*
+ * Return the current phase of a barrier. The caller must be attached.
+ */
+int
+BarrierPhase(Barrier *barrier)
+{
+    /*
+     * It is OK to read barrier->phase without locking, because it can't
+     * change without us (we are attached to it), and we executed a memory
+     * barrier when we either attached or participated in changing it last
+     * time.
+     */
+    return barrier->phase;
+}
+
+/*
+ * Return an instantaneous snapshot of the number of participants currently
+ * attached to this barrier. For debugging purposes only.
+ */
+int
+BarrierParticipants(Barrier *barrier)
+{
+    int         participants;
+
+    SpinLockAcquire(&barrier->mutex);
+    participants = barrier->participants;
+    SpinLockRelease(&barrier->mutex);
+
+    return participants;
+}
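+
+/*
+ * Illustrative sketch (not part of the original file): suitable only for
+ * ad-hoc tracing, since the count may already be stale by the time the
+ * spinlock has been released.
+ *
+ *     elog(DEBUG1, "%d participants attached", BarrierParticipants(&barrier));
+ */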
+
+/*
+ * Detach from a barrier. If 'arrive' is true then also increment the phase
+ * if there are no other participants. If there are other participants
+ * waiting, then the phase will be advanced and they'll be released if they
+ * were only waiting for the caller. Return true if this participant was the
+ * last to detach.
+ */
+static inline bool
+BarrierDetachImpl(Barrier *barrier, bool arrive)
+{
+    bool        release;
+    bool        last;
+
+    Assert(!barrier->static_party);
+
+    SpinLockAcquire(&barrier->mutex);
+    Assert(barrier->participants > 0);
+    --barrier->participants;
+
+    /*
+     * If any other participants are waiting and we were the last participant
+     * waited for, release them. If no other participants are waiting, but
+     * this is a BarrierArriveAndDetach() call, then advance the phase too.
+     */
+    if ((arrive || barrier->participants > 0) &&
+        barrier->arrived == barrier->participants)
+    {
+        release = true;
+        barrier->arrived = 0;
+        ++barrier->phase;
+    }
+    else
+        release = false;
+
+    last = barrier->participants == 0;
+    SpinLockRelease(&barrier->mutex);
+
+    if (release)
+        ConditionVariableBroadcast(&barrier->condition_variable);
+
+    return last;
+}
| 311 | +} |