@@ -51,12 +51,15 @@ static void gistdoinsert(Relation r,
51
51
Size freespace ,
52
52
GISTSTATE * GISTstate );
53
53
static void gistfixsplit (GISTInsertState * state ,GISTSTATE * giststate );
54
+ static bool gistinserttuple (GISTInsertState * state ,GISTInsertStack * stack ,
55
+ GISTSTATE * giststate ,IndexTuple tuple ,OffsetNumber oldoffnum );
54
56
static bool gistinserttuples (GISTInsertState * state ,GISTInsertStack * stack ,
55
57
GISTSTATE * giststate ,
56
58
IndexTuple * tuples ,int ntup ,OffsetNumber oldoffnum ,
57
- Buffer leftchild );
59
+ Buffer leftchild ,Buffer rightchild ,
60
+ bool unlockbuf ,bool unlockleftchild );
58
61
static void gistfinishsplit (GISTInsertState * state ,GISTInsertStack * stack ,
59
- GISTSTATE * giststate ,List * splitinfo );
62
+ GISTSTATE * giststate ,List * splitinfo , bool releasebuf );
60
63
61
64
62
65
#define ROTATEDIST (d ) do { \
@@ -755,15 +758,15 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
755
758
/*
756
759
* Update the tuple.
757
760
*
758
- * We still hold the lock aftergistinserttuples (), but it
761
+ * We still hold the lock aftergistinserttuple (), but it
759
762
* might have to split the page to make the updated tuple fit.
760
763
* In that case the updated tuple might migrate to the other
761
764
* half of the split, so we have to go back to the parent and
762
765
* descend back to the half that's a better fit for the new
763
766
* tuple.
764
767
*/
765
- if (gistinserttuples (& state ,stack ,giststate ,& newtup , 1 ,
766
- stack -> childoffnum , InvalidBuffer ))
768
+ if (gistinserttuple (& state ,stack ,giststate ,newtup ,
769
+ stack -> childoffnum ))
767
770
{
768
771
/*
769
772
* If this was a root split, the root page continues to be
@@ -848,8 +851,8 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
848
851
849
852
/* now state.stack->(page, buffer and blkno) points to leaf page */
850
853
851
- gistinserttuples (& state ,stack ,giststate ,& itup , 1 ,
852
- InvalidOffsetNumber , InvalidBuffer );
854
+ gistinserttuple (& state ,stack ,giststate ,itup ,
855
+ InvalidOffsetNumber );
853
856
LockBuffer (stack -> buffer ,GIST_UNLOCK );
854
857
855
858
/* Release any pins we might still hold before exiting */
@@ -1190,49 +1193,104 @@ gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
1190
1193
}
1191
1194
1192
1195
/* Insert the downlinks */
1193
- gistfinishsplit (state ,stack ,giststate ,splitinfo );
1196
+ gistfinishsplit (state ,stack ,giststate ,splitinfo , false );
1194
1197
}
1195
1198
1196
1199
/*
1197
- * Inserttuples to stack->buffer. If 'oldoffnum' is valid, the new tuples
1198
- *replace an old tuple at oldoffnum. The caller must hold an exclusive lock
1199
- *on the page.
1200
+ * Insertor replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
1201
+ * tuple at' oldoffnum' is replaced, otherwise the tuple is inserted as new.
1202
+ *'stack' represents thepath from the root to the page being updated .
1200
1203
*
1201
- * If leftchild is valid, we're inserting/updating the downlink for the
1202
- * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and
1203
- * update NSN on leftchild, atomically with the insertion of the downlink.
1204
+ * The caller must hold an exclusive lock on stack->buffer. The lock is still
1205
+ * held on return, but the page might not contain the inserted tuple if the
1206
+ * page was split. The function returns true if the page was split, false
1207
+ * otherwise.
1208
+ */
1209
+ static bool
1210
+ gistinserttuple (GISTInsertState * state ,GISTInsertStack * stack ,
1211
+ GISTSTATE * giststate ,IndexTuple tuple ,OffsetNumber oldoffnum )
1212
+ {
1213
+ return gistinserttuples (state ,stack ,giststate ,& tuple ,1 ,oldoffnum ,
1214
+ InvalidBuffer ,InvalidBuffer , false, false);
1215
+ }
1216
+
1217
+ /* ----------------
1218
+ * An extended workhorse version of gistinserttuple(). This version allows
1219
+ * inserting multiple tuples, or replacing a single tuple with multiple tuples.
1220
+ * This is used to recursively update the downlinks in the parent when a page
1221
+ * is split.
1204
1222
*
1205
- * Returns 'true' if the page had to be split. On return, we will continue
1206
- * to hold an exclusive lock on state->stack->buffer, but if we had to split
1207
- * the page, it might not contain the tuple we just inserted/updated.
1223
+ * If leftchild and rightchild are valid, we're inserting/replacing the
1224
+ * downlink for rightchild, and leftchild is its left sibling. We clear the
1225
+ * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
1226
+ * insertion of the downlink.
1227
+ *
1228
+ * To avoid holding locks for longer than necessary, when recursing up the
1229
+ * tree to update the parents, the locking is a bit peculiar here. On entry,
1230
+ * the caller must hold an exclusive lock on stack->buffer, as well as
1231
+ * leftchild and rightchild if given. On return:
1232
+ *
1233
+ *- Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
1234
+ * always kept pinned, however.
1235
+ *- Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
1236
+ * is kept pinned.
1237
+ *- Lock and pin on 'rightchild' are always released.
1238
+ *
1239
+ * Returns 'true' if the page had to be split. Note that if the page had
1240
+ * be split, the inserted/updated might've been inserted to a right sibling
1241
+ * of stack->buffer instead of stack->buffer itself.
1208
1242
*/
1209
1243
static bool
1210
1244
gistinserttuples (GISTInsertState * state ,GISTInsertStack * stack ,
1211
1245
GISTSTATE * giststate ,
1212
1246
IndexTuple * tuples ,int ntup ,OffsetNumber oldoffnum ,
1213
- Buffer leftchild )
1247
+ Buffer leftchild ,Buffer rightchild ,
1248
+ bool unlockbuf ,bool unlockleftchild )
1214
1249
{
1215
1250
List * splitinfo ;
1216
1251
bool is_split ;
1217
1252
1253
+ /* Insert the tuple(s) to the page, splitting the page if necessary */
1218
1254
is_split = gistplacetopage (state ,giststate ,stack -> buffer ,
1219
1255
tuples ,ntup ,oldoffnum ,
1220
- leftchild ,
1221
- & splitinfo );
1256
+ leftchild ,& splitinfo );
1257
+
1258
+ /*
1259
+ * Before recursing up in case the page was split, release locks on the
1260
+ * child pages. We don't need to keep them locked when updating the
1261
+ * parent.
1262
+ */
1263
+ if (BufferIsValid (rightchild ))
1264
+ UnlockReleaseBuffer (rightchild );
1265
+ if (BufferIsValid (leftchild )&& unlockleftchild )
1266
+ LockBuffer (leftchild ,GIST_UNLOCK );
1267
+
1268
+ /*
1269
+ * If we had to split, insert/update the downlinks in the parent. If the
1270
+ * caller requested us to release the lock on stack->buffer, tell
1271
+ * gistfinishsplit() to do that as soon as it's safe to do so. If we
1272
+ * didn't have to split, release it ourselves.
1273
+ */
1222
1274
if (splitinfo )
1223
- gistfinishsplit (state ,stack ,giststate ,splitinfo );
1275
+ gistfinishsplit (state ,stack ,giststate ,splitinfo ,unlockbuf );
1276
+ else if (unlockbuf )
1277
+ LockBuffer (stack -> buffer ,GIST_UNLOCK );
1224
1278
1225
1279
return is_split ;
1226
1280
}
1227
1281
1228
1282
/*
1229
- * Finish an incomplete split by inserting/updating the downlinks in
1230
- * parent page. 'splitinfo' contains all the child pages, exclusively-locked,
1231
- * involved in the split, from left-to-right.
1283
+ * Finish an incomplete split by inserting/updating the downlinks in parent
1284
+ * page. 'splitinfo' contains all the child pages involved in the split,
1285
+ * from left-to-right.
1286
+ *
1287
+ * On entry, the caller must hold a lock on stack->buffer and all the child
1288
+ * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
1289
+ * released on return. The child pages are always unlocked and unpinned.
1232
1290
*/
1233
1291
static void
1234
1292
gistfinishsplit (GISTInsertState * state ,GISTInsertStack * stack ,
1235
- GISTSTATE * giststate ,List * splitinfo )
1293
+ GISTSTATE * giststate ,List * splitinfo , bool unlockbuf )
1236
1294
{
1237
1295
ListCell * lc ;
1238
1296
List * reversed ;
@@ -1261,6 +1319,10 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1261
1319
LockBuffer (stack -> parent -> buffer ,GIST_EXCLUSIVE );
1262
1320
gistFindCorrectParent (state -> r ,stack );
1263
1321
1322
+ /*
1323
+ * insert downlinks for the siblings from right to left, until there are
1324
+ * only two siblings left.
1325
+ */
1264
1326
while (list_length (reversed )> 2 )
1265
1327
{
1266
1328
right = (GISTPageSplitInfo * )linitial (reversed );
@@ -1269,15 +1331,15 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1269
1331
if (gistinserttuples (state ,stack -> parent ,giststate ,
1270
1332
& right -> downlink ,1 ,
1271
1333
InvalidOffsetNumber ,
1272
- left -> buf ))
1334
+ left -> buf , right -> buf , false, false ))
1273
1335
{
1274
1336
/*
1275
1337
* If the parent page was split, need to relocate the original
1276
1338
* parent pointer.
1277
1339
*/
1278
1340
gistFindCorrectParent (state -> r ,stack );
1279
1341
}
1280
- UnlockReleaseBuffer ( right -> buf );
1342
+ /* gistinserttuples() released the lock on right->buf. */
1281
1343
reversed = list_delete_first (reversed );
1282
1344
}
1283
1345
@@ -1294,9 +1356,10 @@ gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
1294
1356
gistinserttuples (state ,stack -> parent ,giststate ,
1295
1357
tuples ,2 ,
1296
1358
stack -> parent -> childoffnum ,
1297
- left -> buf );
1298
- LockBuffer (stack -> parent -> buffer ,GIST_UNLOCK );
1299
- UnlockReleaseBuffer (right -> buf );
1359
+ left -> buf ,right -> buf ,
1360
+ true,/* Unlock parent */
1361
+ unlockbuf /* Unlock stack->buffer if caller wants that */
1362
+ );
1300
1363
Assert (left -> buf == stack -> buffer );
1301
1364
}
1302
1365