@@ -134,14 +134,17 @@ ReadBufferBI(Relation relation, BlockNumber targetBlock,
134134 * To avoid complexity in the callers, either buffer1 or buffer2 may be
135135 * InvalidBuffer if only one buffer is involved. For the same reason, block2
136136 * may be smaller than block1.
137+ *
138+ * Returns whether buffer locks were temporarily released.
137139 */
138- static void
140+ static bool
139141GetVisibilityMapPins (Relation relation ,Buffer buffer1 ,Buffer buffer2 ,
140142BlockNumber block1 ,BlockNumber block2 ,
141143Buffer * vmbuffer1 ,Buffer * vmbuffer2 )
142144{
143145bool need_to_pin_buffer1 ;
144146bool need_to_pin_buffer2 ;
147+ bool released_locks = false;
145148
146149/*
147150 * Swap buffers around to handle case of a single block/buffer, and to
@@ -175,9 +178,10 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
175178&& PageIsAllVisible (BufferGetPage (buffer2 ))
176179&& !visibilitymap_pin_ok (block2 ,* vmbuffer2 );
177180if (!need_to_pin_buffer1 && !need_to_pin_buffer2 )
178- return ;
181+ break ;
179182
180183/* We must unlock both buffers before doing any I/O. */
184+ released_locks = true;
181185LockBuffer (buffer1 ,BUFFER_LOCK_UNLOCK );
182186if (buffer2 != InvalidBuffer && buffer2 != buffer1 )
183187LockBuffer (buffer2 ,BUFFER_LOCK_UNLOCK );
@@ -203,6 +207,8 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
203207|| (need_to_pin_buffer1 && need_to_pin_buffer2 ))
204208break ;
205209}
210+
211+ return released_locks ;
206212}
207213
208214/*
@@ -365,6 +371,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
365371BlockNumber targetBlock ,
366372otherBlock ;
367373bool needLock ;
374+ bool unlockedTargetBuffer ;
375+ bool recheckVmPins ;
368376
369377len = MAXALIGN (len );/* be conservative */
370378
@@ -645,6 +653,9 @@ RelationGetBufferForTuple(Relation relation, Size len,
645653if (needLock )
646654UnlockRelationForExtension (relation ,ExclusiveLock );
647655
656+ unlockedTargetBuffer = false;
657+ targetBlock = BufferGetBlockNumber (buffer );
658+
648659/*
649660 * We need to initialize the empty new page. Double-check that it really
650661 * is empty (this should never happen, but if it does we don't want to
@@ -654,75 +665,108 @@ RelationGetBufferForTuple(Relation relation, Size len,
654665
655666if (!PageIsNew (page ))
656667elog (ERROR ,"page %u of relation \"%s\" should be empty but is not" ,
657- BufferGetBlockNumber ( buffer ) ,
668+ targetBlock ,
658669RelationGetRelationName (relation ));
659670
660671PageInit (page ,BufferGetPageSize (buffer ),0 );
661672MarkBufferDirty (buffer );
662673
663674/*
664- * The page is empty, pin vmbuffer to set all_frozen bit.
675+ * The page is empty, pin vmbuffer to set all_frozen bit. We don't want to
676+ * do IO while the buffer is locked, so we unlock the page first if IO is
677+ * needed (necessitating checks below).
665678 */
666679if (options & HEAP_INSERT_FROZEN )
667680{
668- Assert (PageGetMaxOffsetNumber (BufferGetPage (buffer ))== 0 );
669- visibilitymap_pin (relation ,BufferGetBlockNumber (buffer ),vmbuffer );
681+ Assert (PageGetMaxOffsetNumber (page )== 0 );
682+
683+ if (!visibilitymap_pin_ok (targetBlock ,* vmbuffer ))
684+ {
685+ unlockedTargetBuffer = true;
686+ LockBuffer (buffer ,BUFFER_LOCK_UNLOCK );
687+ visibilitymap_pin (relation ,targetBlock ,vmbuffer );
688+ }
670689}
671690
672691/*
673- * Lock the other buffer. It's guaranteed to be of a lower page number
674- * than the new page. To conform with the deadlock prevent rules, we ought
675- * to lock otherBuffer first, but that would give other backends a chance
676- * to put tuples on our page. To reduce the likelihood of that, attempt to
677- * lock the other buffer conditionally, that's very likely to work.
678- * Otherwise we need to lock buffers in the correct order, and retry if
679- * the space has been used in the mean time.
692+ * Reacquire locks if necessary.
680693 *
681- * Alternatively, we could acquire the lock on otherBuffer before
682- * extending the relation, but that'd require holding the lock while
683- * performing IO, which seems worse than an unlikely retry.
694+ * If the target buffer was unlocked above, or is unlocked while
695+ * reacquiring the lock on otherBuffer below, it's unlikely, but possible,
696+ * that another backend used space on this page. We check for that below,
697+ * and retry if necessary.
684698 */
685- if (otherBuffer != InvalidBuffer )
699+ recheckVmPins = false;
700+ if (unlockedTargetBuffer )
701+ {
702+ /* released lock on target buffer above */
703+ if (otherBuffer != InvalidBuffer )
704+ LockBuffer (otherBuffer ,BUFFER_LOCK_EXCLUSIVE );
705+ LockBuffer (buffer ,BUFFER_LOCK_EXCLUSIVE );
706+ recheckVmPins = true;
707+ }
708+ else if (otherBuffer != InvalidBuffer )
686709{
710+ /*
711+ * We did not release the target buffer, and otherBuffer is valid,
712+ * need to lock the other buffer. It's guaranteed to be of a lower
713+ * page number than the new page. To conform with the deadlock
714+ * prevent rules, we ought to lock otherBuffer first, but that would
715+ * give other backends a chance to put tuples on our page. To reduce
716+ * the likelihood of that, attempt to lock the other buffer
717+ * conditionally, that's very likely to work.
718+ *
719+ * Alternatively, we could acquire the lock on otherBuffer before
720+ * extending the relation, but that'd require holding the lock while
721+ * performing IO, which seems worse than an unlikely retry.
722+ */
687723Assert (otherBuffer != buffer );
688- targetBlock = BufferGetBlockNumber (buffer );
689724Assert (targetBlock > otherBlock );
690725
691726if (unlikely (!ConditionalLockBuffer (otherBuffer )))
692727{
728+ unlockedTargetBuffer = true;
693729LockBuffer (buffer ,BUFFER_LOCK_UNLOCK );
694730LockBuffer (otherBuffer ,BUFFER_LOCK_EXCLUSIVE );
695731LockBuffer (buffer ,BUFFER_LOCK_EXCLUSIVE );
696732}
733+ recheckVmPins = true;
734+ }
697735
698- /*
699- * Because the buffers were unlocked for a while, it's possible,
700- * although unlikely, that an all-visible flag became set or that
701- * somebody used up the available space in the new page. We can use
702- * GetVisibilityMapPins to deal with the first case. In the second
703- * case, just retry from start.
704- */
705- GetVisibilityMapPins (relation ,otherBuffer ,buffer ,
706- otherBlock ,targetBlock ,vmbuffer_other ,
707- vmbuffer );
736+ /*
737+ * If one of the buffers was unlocked (always the case if otherBuffer is
738+ * valid), it's possible, although unlikely, that an all-visible flag
739+ * became set. We can use GetVisibilityMapPins to deal with that. It's
740+ * possible that GetVisibilityMapPins() might need to temporarily release
741+ * buffer locks, in which case we'll need to check if there's still enough
742+ * space on the page below.
743+ */
744+ if (recheckVmPins )
745+ {
746+ if (GetVisibilityMapPins (relation ,otherBuffer ,buffer ,
747+ otherBlock ,targetBlock ,vmbuffer_other ,
748+ vmbuffer ))
749+ unlockedTargetBuffer = true;
750+ }
708751
709- /*
710- * Note that we have to check the available space even if our
711- * conditional lock succeeded, because GetVisibilityMapPins might've
712- * transiently released lock on the target buffer to acquire a VM pin
713- * for the otherBuffer.
714- */
715- if (len > PageGetHeapFreeSpace (page ))
752+ /*
753+ * If the target buffer was temporarily unlocked since the relation
754+ * extension, it's possible, although unlikely, that all the space on the
755+ * page was already used. If so, we just retry from the start. If we
756+ * didn't unlock, something has gone wrong if there's not enough space -
757+ * the test at the top should have prevented reaching this case.
758+ */
759+ pageFreeSpace = PageGetHeapFreeSpace (page );
760+ if (len > pageFreeSpace )
761+ {
762+ if (unlockedTargetBuffer )
716763{
717- LockBuffer (otherBuffer ,BUFFER_LOCK_UNLOCK );
764+ if (otherBuffer != InvalidBuffer )
765+ LockBuffer (otherBuffer ,BUFFER_LOCK_UNLOCK );
718766UnlockReleaseBuffer (buffer );
719767
720768gotoloop ;
721769}
722- }
723- else if (len > PageGetHeapFreeSpace (page ))
724- {
725- /* We should not get here given the test at the top */
726770elog (PANIC ,"tuple is too big: size %zu" ,len );
727771}
728772
@@ -735,7 +779,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
735779 * current backend to make more insertions or not, which is probably a
736780 * good bet most of the time. So for now, don't add it to FSM yet.
737781 */
738- RelationSetTargetBlock (relation ,BufferGetBlockNumber ( buffer ) );
782+ RelationSetTargetBlock (relation ,targetBlock );
739783
740784return buffer ;
741785}