77 *
88 *
99 * IDENTIFICATION
10- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.51 1999/05/10 00:45:43 momjian Exp $
10+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.52 1999/05/13 15:55:44 momjian Exp $
1111 *
1212 * NOTES
1313 * Outside modules can create a lock table and acquire/release
@@ -83,9 +83,9 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
8383
8484#define LOCK_PRINT_AUX (where ,lock ,type ) \
8585TPRINTF(TRACE_ALL, \
86- "%s: lock(%x) tbl(%d) rel(%u) db(%d ) obj(%u) mask(%x) " \
87- "hold(%d,%d,%d,%d,%d)=%d " \
88- "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
86+ "%s: lock(%x) tbl(%d) rel(%u) db(%u ) obj(%u) mask(%x) " \
87+ "hold(%d,%d,%d,%d,%d,%d,%d )=%d " \
88+ "act(%d,%d,%d,%d,%d,%d,%d )=%d wait(%d) type(%s)", \
8989 where, \
9090 MAKE_OFFSET(lock), \
9191 lock->tag.lockmethod, \
@@ -98,12 +98,16 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
9898 lock->holders[3], \
9999 lock->holders[4], \
100100 lock->holders[5], \
101+ lock->holders[6], \
102+ lock->holders[7], \
101103 lock->nHolding, \
102104 lock->activeHolders[1], \
103105 lock->activeHolders[2], \
104106 lock->activeHolders[3], \
105107 lock->activeHolders[4], \
106108 lock->activeHolders[5], \
109+ lock->activeHolders[6], \
110+ lock->activeHolders[7], \
107111 lock->nActive, \
108112 lock->waitProcs.size, \
109113 lock_types[type])
@@ -119,8 +123,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
119123
120124#define XID_PRINT_AUX (where ,xidentP ) \
121125TPRINTF(TRACE_ALL, \
122- "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d ) " \
123- "hold(%d,%d,%d,%d,%d)=%d", \
126+ "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%u ) " \
127+ "hold(%d,%d,%d,%d,%d,%d,%d )=%d", \
124128 where, \
125129 MAKE_OFFSET(xidentP), \
126130 xidentP->tag.lock, \
@@ -132,6 +136,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
132136 xidentP->holders[3], \
133137 xidentP->holders[4], \
134138 xidentP->holders[5], \
139+ xidentP->holders[6], \
140+ xidentP->holders[7], \
135141 xidentP->nHolding)
136142
137143#else /* !LOCK_MGR_DEBUG */
@@ -1561,19 +1567,26 @@ LockingDisabled()
15611567 * We have already locked the master lock before being called.
15621568 */
15631569bool
1564- DeadLockCheck (SHM_QUEUE * lockQueue ,LOCK * findlock , bool skip_check )
1570+ DeadLockCheck (void * proc ,LOCK * findlock )
15651571{
1566- int done ;
15671572XIDLookupEnt * xidLook = NULL ;
15681573XIDLookupEnt * tmp = NULL ;
1574+ PROC * thisProc = (PROC * )proc ,
1575+ * waitProc ;
1576+ SHM_QUEUE * lockQueue = & (thisProc -> lockQueue );
15691577SHMEM_OFFSET end = MAKE_OFFSET (lockQueue );
15701578LOCK * lock ;
1579+ PROC_QUEUE * waitQueue ;
1580+ int i ,
1581+ j ;
1582+ bool first_run = (thisProc == MyProc ),
1583+ done ;
15711584
15721585static PROC * checked_procs [MAXBACKENDS ];
15731586static int nprocs ;
15741587
15751588/* initialize at start of recursion */
1576- if (skip_check )
1589+ if (first_run )
15771590{
15781591checked_procs [0 ]= MyProc ;
15791592nprocs = 1 ;
@@ -1593,74 +1606,186 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
15931606
15941607LOCK_PRINT ("DeadLockCheck" ,lock ,0 );
15951608
1609+ if (lock -> tag .relId == 0 )/* user' lock */
1610+ gotonxtl ;
1611+
15961612/*
1597- * This is our only check to see if we found the lock we want.
1598- *
1599- * The lock we are waiting for is already in MyProc->lockQueue so we
1600- * need to skip it here. We are trying to find it in someone
1601- * else's lockQueue. bjm
1613+ * waitLock is always in lockQueue of waiting proc,
1614+ * if !first_run then upper caller will handle waitProcs
1615+ * queue of waitLock.
16021616 */
1603- if (lock == findlock && !skip_check )
1604- return true ;
1617+ if (thisProc -> waitLock == lock && !first_run )
1618+ goto nxtl ;
16051619
1620+ /*
1621+ * If we found proc holding findlock and sleeping on some my
1622+ * other lock then we have to check does it block me or
1623+ * another waiters.
1624+ */
1625+ if (lock == findlock && !first_run )
16061626{
1607- PROC_QUEUE * waitQueue = & (lock -> waitProcs );
1608- PROC * proc ;
1609- int i ;
1610- int j ;
1627+ LOCKMETHODCTL * lockctl =
1628+ LockMethodTable [DEFAULT_LOCKMETHOD ]-> ctl ;
1629+ int lm ;
16111630
1612- proc = ( PROC * ) MAKE_PTR ( waitQueue -> links . prev );
1613- for (i = 0 ; i < waitQueue -> size ; i ++ )
1631+ Assert ( xidLook -> nHolding > 0 );
1632+ for (lm = 1 ; lm <= lockctl -> numLockModes ; lm ++ )
16141633{
1634+ if (xidLook -> holders [lm ]> 0 &&
1635+ lockctl -> conflictTab [lm ]& findlock -> waitMask )
1636+ return true;
1637+ }
1638+ /*
1639+ * Else - get the next lock from thisProc's lockQueue
1640+ */
1641+ gotonxtl ;
1642+ }
1643+
1644+ waitQueue = & (lock -> waitProcs );
1645+ waitProc = (PROC * )MAKE_PTR (waitQueue -> links .prev );
1646+
1647+ for (i = 0 ;i < waitQueue -> size ;i ++ )
1648+ {
1649+ if (waitProc == thisProc )
1650+ {
1651+ Assert (waitProc -> waitLock == lock );
1652+ Assert (waitProc == MyProc );
1653+ waitProc = (PROC * )MAKE_PTR (waitProc -> links .prev );
1654+ continue ;
1655+ }
1656+ if (lock == findlock )/* first_run also true */
1657+ {
1658+ LOCKMETHODCTL * lockctl =
1659+ LockMethodTable [DEFAULT_LOCKMETHOD ]-> ctl ;
1660+
16151661/*
1616- * If I hold some locks on findlock and another proc
1617- * waits on it holding locks too - check if we are
1618- * waiting one another.
1662+ * If me blocked by his holdlock...
16191663 */
1620- if (proc != MyProc &&
1621- lock == findlock && /* skip_check also true */
1622- MyProc -> holdLock )
1664+ if (lockctl -> conflictTab [MyProc -> token ]& waitProc -> holdLock )
16231665{
1624- LOCKMETHODCTL * lockctl =
1625- LockMethodTable [DEFAULT_LOCKMETHOD ]-> ctl ;
1626-
1627- Assert (skip_check );
1628- if (lockctl -> conflictTab [MyProc -> token ]& proc -> holdLock &&
1629- lockctl -> conflictTab [proc -> token ]& MyProc -> holdLock )
1666+ /* and he blocked by me -> deadlock */
1667+ if (lockctl -> conflictTab [waitProc -> token ]& MyProc -> holdLock )
16301668return true;
1669+ /* we shouldn't look at lockQueue of our blockers */
1670+ waitProc = (PROC * )MAKE_PTR (waitProc -> links .prev );
1671+ continue ;
16311672}
1632-
16331673/*
1634- * No sense in looking at the wait queue of the lock we
1635- * are looking for. If lock == findlock, and I got here,
1636- * skip_check must be true too.
1674+ * If he isn't blocked by me and we request non-conflicting
1675+ * lock modes - no deadlock here because of he isn't
1676+ * blocked by me in any sence (explicitle or implicitly).
1677+ * Note that we don't do like test if !first_run
1678+ * (when thisProc is holder and non-waiter on lock) and so
1679+ * we call DeadLockCheck below for every waitProc in
1680+ * thisProc->lockQueue, even for waitProc-s un-blocked
1681+ * by thisProc. Should we? This could save us some time...
16371682 */
1638- if (lock != findlock )
1683+ if (!(lockctl -> conflictTab [waitProc -> token ]& MyProc -> holdLock )&&
1684+ !(lockctl -> conflictTab [waitProc -> token ]& (1 <<MyProc -> token )))
16391685{
1640- for (j = 0 ;j < nprocs ;j ++ )
1641- if (checked_procs [j ]== proc )
1642- break ;
1643- if (j >=nprocs && lock != findlock )
1644- {
1645- Assert (nprocs < MAXBACKENDS );
1646- checked_procs [nprocs ++ ]= proc ;
1686+ waitProc = (PROC * )MAKE_PTR (waitProc -> links .prev );
1687+ continue ;
1688+ }
1689+ }
16471690
1691+ /*
1692+ * Look in lockQueue of this waitProc, if didn't do this before.
1693+ */
1694+ for (j = 0 ;j < nprocs ;j ++ )
1695+ {
1696+ if (checked_procs [j ]== waitProc )
1697+ break ;
1698+ }
1699+ if (j >=nprocs )
1700+ {
1701+ Assert (nprocs < MAXBACKENDS );
1702+ checked_procs [nprocs ++ ]= waitProc ;
1703+
1704+ if (DeadLockCheck (waitProc ,findlock ))
1705+ {
1706+ LOCKMETHODCTL * lockctl =
1707+ LockMethodTable [DEFAULT_LOCKMETHOD ]-> ctl ;
1708+ int holdLock ;
1709+
1710+ /*
1711+ * Ok, but is waitProc waiting for me (thisProc) ?
1712+ */
1713+ if (thisProc -> waitLock == lock )
1714+ {
1715+ Assert (first_run );
1716+ holdLock = thisProc -> holdLock ;
1717+ }
1718+ else /* should we cache holdLock ? */
1719+ {
1720+ int lm ,tmpMask = 2 ;
1721+
1722+ Assert (xidLook -> nHolding > 0 );
1723+ for (holdLock = 0 ,lm = 1 ;
1724+ lm <=lockctl -> numLockModes ;
1725+ lm ++ ,tmpMask <<=1 )
1726+ {
1727+ if (xidLook -> holders [lm ]> 0 )
1728+ holdLock |=tmpMask ;
1729+ }
1730+ Assert (holdLock != 0 );
1731+ }
1732+ if (lockctl -> conflictTab [waitProc -> token ]& holdLock )
1733+ {
16481734/*
1649- * For non-MyProc entries, we are looking only
1650- * waiters, not necessarily people who already
1651- * hold locks and are waiting. Now we check for
1652- * cases where we have two or more tables in a
1653- * deadlock. We do this by continuing to search
1654- * for someone holding a lock bjm
1735+ * Last attempt to avoid deadlock - try to wakeup
1736+ * myself.
16551737 */
1656- if (DeadLockCheck (& (proc -> lockQueue ),findlock , false))
1657- return true;
1738+ if (first_run )
1739+ {
1740+ if (LockResolveConflicts (DEFAULT_LOCKMETHOD ,
1741+ MyProc -> waitLock ,
1742+ MyProc -> token ,
1743+ MyProc -> xid ,
1744+ NULL )== STATUS_OK )
1745+ {
1746+ GrantLock (MyProc -> waitLock ,MyProc -> token );
1747+ (MyProc -> waitLock -> waitProcs .size )-- ;
1748+ ProcWakeup (MyProc ,NO_ERROR );
1749+ return false;
1750+ }
1751+ }
1752+ return true;
1753+ }
1754+ /*
1755+ * Hell! Is he blocked by any (other) holder ?
1756+ */
1757+ if (LockResolveConflicts (DEFAULT_LOCKMETHOD ,
1758+ lock ,
1759+ waitProc -> token ,
1760+ waitProc -> xid ,
1761+ NULL )!= STATUS_OK )
1762+ {
1763+ /*
1764+ * Blocked by others - no deadlock...
1765+ */
1766+ #ifdef DEADLOCK_DEBUG
1767+ LOCK_PRINT ("DeadLockCheck: blocked by others" ,
1768+ lock ,waitProc -> token );
1769+ #endif
1770+ waitProc = (PROC * )MAKE_PTR (waitProc -> links .prev );
1771+ continue ;
16581772}
1773+ /*
1774+ * Well - wakeup this guy! This is the case of
1775+ * implicit blocking: thisProc blocked someone who blocked
1776+ * waitProc by the fact that he (someone) is already
1777+ * waiting for lock (we do this for anti-starving).
1778+ */
1779+ GrantLock (lock ,waitProc -> token );
1780+ waitQueue -> size -- ;
1781+ waitProc = ProcWakeup (waitProc ,NO_ERROR );
1782+ continue ;
16591783}
1660- proc = (PROC * )MAKE_PTR (proc -> links .prev );
16611784}
1785+ waitProc = (PROC * )MAKE_PTR (waitProc -> links .prev );
16621786}
16631787
1788+ nxtl :;
16641789if (done )
16651790break ;
16661791SHMQueueFirst (& xidLook -> queue , (Pointer * )& tmp ,& tmp -> queue );