@@ -250,7 +250,8 @@ static HTAB *LockMethodProcLockHash;
250250static HTAB * LockMethodLocalHash ;
251251
252252
253- /* private state for GrantAwaitedLock */
253+ /* private state for error cleanup */
254+ static LOCALLOCK * StrongLockInProgress ;
254255static LOCALLOCK * awaitedLock ;
255256static ResourceOwner awaitedOwner ;
256257
@@ -338,6 +339,8 @@ static void RemoveLocalLock(LOCALLOCK *locallock);
338339static PROCLOCK * SetupLockInTable (LockMethod lockMethodTable ,PGPROC * proc ,
339340const LOCKTAG * locktag ,uint32 hashcode ,LOCKMODE lockmode );
340341static void GrantLockLocal (LOCALLOCK * locallock ,ResourceOwner owner );
342+ static void BeginStrongLockAcquire (LOCALLOCK * locallock ,uint32 fasthashcode );
343+ static void FinishStrongLockAcquire (void );
341344static void WaitOnLock (LOCALLOCK * locallock ,ResourceOwner owner );
342345static void ReleaseLockForOwner (LOCALLOCK * locallock ,ResourceOwner owner );
343346static bool UnGrantLock (LOCK * lock ,LOCKMODE lockmode ,
@@ -738,22 +741,11 @@ LockAcquireExtended(const LOCKTAG *locktag,
738741}
739742else if (FastPathStrongMode (lockmode ))
740743{
741- /*
742- * Adding to a memory location is not atomic, so we take a
743- * spinlock to ensure we don't collide with someone else trying
744- * to bump the count at the same time.
745- *
746- * XXX: It might be worth considering using an atomic fetch-and-add
747- * instruction here, on architectures where that is supported.
748- */
749- Assert (locallock -> holdsStrongLockCount == FALSE);
750- SpinLockAcquire (& FastPathStrongRelationLocks -> mutex );
751- FastPathStrongRelationLocks -> count [fasthashcode ]++ ;
752- locallock -> holdsStrongLockCount = TRUE;
753- SpinLockRelease (& FastPathStrongRelationLocks -> mutex );
744+ BeginStrongLockAcquire (locallock ,fasthashcode );
754745if (!FastPathTransferRelationLocks (lockMethodTable ,locktag ,
755746hashcode ))
756747{
748+ AbortStrongLockAcquire ();
757749if (reportMemoryError )
758750ereport (ERROR ,
759751(errcode (ERRCODE_OUT_OF_MEMORY ),
@@ -779,6 +771,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
779771hashcode ,lockmode );
780772if (!proclock )
781773{
774+ AbortStrongLockAcquire ();
782775LWLockRelease (partitionLock );
783776if (reportMemoryError )
784777ereport (ERROR ,
@@ -820,6 +813,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
820813 */
821814if (dontWait )
822815{
816+ AbortStrongLockAcquire ();
823817if (proclock -> holdMask == 0 )
824818{
825819uint32 proclock_hashcode ;
@@ -884,6 +878,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
884878 */
885879if (!(proclock -> holdMask & LOCKBIT_ON (lockmode )))
886880{
881+ AbortStrongLockAcquire ();
887882PROCLOCK_PRINT ("LockAcquire: INCONSISTENT" ,proclock );
888883LOCK_PRINT ("LockAcquire: INCONSISTENT" ,lock ,lockmode );
889884/* Should we retry ? */
@@ -894,6 +889,12 @@ LockAcquireExtended(const LOCKTAG *locktag,
894889LOCK_PRINT ("LockAcquire: granted" ,lock ,lockmode );
895890}
896891
892+ /*
893+ * Lock state is fully up-to-date now; if we error out after this, no
894+ * special error cleanup is required.
895+ */
896+ FinishStrongLockAcquire ();
897+
897898LWLockRelease (partitionLock );
898899
899900/*
@@ -1349,6 +1350,64 @@ GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
13491350locallock -> numLockOwners ++ ;
13501351}
13511352
1353+ /*
1354+ * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
1355+ * and arrange for error cleanup if it fails
1356+ */
1357+ static void
1358+ BeginStrongLockAcquire (LOCALLOCK * locallock ,uint32 fasthashcode )
1359+ {
1360+ Assert (StrongLockInProgress == NULL );
1361+ Assert (locallock -> holdsStrongLockCount == FALSE);
1362+
1363+ /*
1364+ * Adding to a memory location is not atomic, so we take a
1365+ * spinlock to ensure we don't collide with someone else trying
1366+ * to bump the count at the same time.
1367+ *
1368+ * XXX: It might be worth considering using an atomic fetch-and-add
1369+ * instruction here, on architectures where that is supported.
1370+ */
1371+
1372+ SpinLockAcquire (& FastPathStrongRelationLocks -> mutex );
1373+ FastPathStrongRelationLocks -> count [fasthashcode ]++ ;
1374+ locallock -> holdsStrongLockCount = TRUE;
1375+ StrongLockInProgress = locallock ;
1376+ SpinLockRelease (& FastPathStrongRelationLocks -> mutex );
1377+ }
1378+
1379+ /*
1380+ * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
1381+ * acquisition once it's no longer needed
1382+ */
1383+ static void
1384+ FinishStrongLockAcquire (void )
1385+ {
1386+ StrongLockInProgress = NULL ;
1387+ }
1388+
1389+ /*
1390+ * AbortStrongLockAcquire - undo strong lock state changes performed by
1391+ * BeginStrongLockAcquire.
1392+ */
1393+ void
1394+ AbortStrongLockAcquire (void )
1395+ {
1396+ uint32 fasthashcode ;
1397+ LOCALLOCK * locallock = StrongLockInProgress ;
1398+
1399+ if (locallock == NULL )
1400+ return ;
1401+
1402+ fasthashcode = FastPathStrongLockHashPartition (locallock -> hashcode );
1403+ Assert (locallock -> holdsStrongLockCount == TRUE);
1404+ SpinLockAcquire (& FastPathStrongRelationLocks -> mutex );
1405+ FastPathStrongRelationLocks -> count [fasthashcode ]-- ;
1406+ locallock -> holdsStrongLockCount = FALSE;
1407+ StrongLockInProgress = NULL ;
1408+ SpinLockRelease (& FastPathStrongRelationLocks -> mutex );
1409+ }
1410+
13521411/*
13531412 * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
13541413 *WaitOnLock on.
@@ -1414,7 +1473,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
14141473 * We can and do use a PG_TRY block to try to clean up after failure, but
14151474 * this still has a major limitation: elog(FATAL) can occur while waiting
14161475 * (eg, a "die" interrupt), and then control won't come back here. So all
1417- * cleanup of essential state should happen in LockWaitCancel, not here.
1476+ * cleanup of essential state should happen in LockErrorCleanup, not here.
14181477 * We can use PG_TRY to clear the "waiting" status flags, since doing that
14191478 * is unimportant if the process exits.
14201479 */
@@ -1441,7 +1500,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
14411500}
14421501PG_CATCH ();
14431502{
1444- /* In this path, awaitedLock remains set until LockWaitCancel */
1503+ /* In this path, awaitedLock remains set until LockErrorCleanup */
14451504
14461505/* Report change to non-waiting status */
14471506pgstat_report_waiting (false);