Re: Refactoring in lock.c - Mailing list pgsql-patches

From Tom Lane
Subject Re: Refactoring in lock.c
Date
Msg-id 10218.1116545830@sss.pgh.pa.us
Whole thread Raw
In response to Re: Refactoring in lock.c  (Alvaro Herrera <alvherre@surnet.cl>)
List pgsql-patches
Alvaro Herrera <alvherre@surnet.cl> writes:
> Ok, new version of this patch, adjusted per your comments.  Thanks.

Applied with revisions.  I backed off the idea of removing LockRelease's
return value after looking at contrib/userlock; it seems possible that
some applications out there are depending on being able to issue a
release for locks they don't hold.  Also fixed the logic: the code as
submitted was slightly wrong since it would fail to waken waiters if any
modes remain held.  ProcLockWakeup may need to be run even if we still
have a proclock.  So I renamed the function to CleanUpLock, which
hopefully is more indicative of what it really does.

Patch as applied is attached.

            regards, tom lane

*** contrib/userlock/user_locks.c.orig    Tue May 17 17:35:04 2005
--- contrib/userlock/user_locks.c    Thu May 19 18:31:26 2005
***************
*** 75,79 ****
  int
  user_unlock_all(void)
  {
!     return LockReleaseAll(USER_LOCKMETHOD, true);
  }
--- 75,81 ----
  int
  user_unlock_all(void)
  {
!     LockReleaseAll(USER_LOCKMETHOD, true);
!
!     return true;
  }
*** src/backend/storage/lmgr/lock.c.orig    Wed May 11 12:13:55 2005
--- src/backend/storage/lmgr/lock.c    Thu May 19 19:21:17 2005
***************
*** 166,171 ****
--- 166,173 ----
                   int *myHolding);
  static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
                          PROCLOCK *proclock, LockMethod lockMethodTable);
+ static void CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock,
+                         PROCLOCK *proclock, bool wakeupNeeded);


  /*
***************
*** 589,601 ****
               * anyone to release the lock object later.
               */
              Assert(SHMQueueEmpty(&(lock->procLocks)));
!             lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
!                                         (void *) &(lock->tag),
!                                         HASH_REMOVE, NULL);
          }
          LWLockRelease(masterLock);
-         if (!lock)                /* hash remove failed? */
-             elog(WARNING, "lock table corrupted");
          ereport(ERROR,
                  (errcode(ERRCODE_OUT_OF_MEMORY),
                   errmsg("out of shared memory"),
--- 591,602 ----
               * anyone to release the lock object later.
               */
              Assert(SHMQueueEmpty(&(lock->procLocks)));
!             if (!hash_search(LockMethodLockHash[lockmethodid],
!                              (void *) &(lock->tag),
!                              HASH_REMOVE, NULL))
!                 elog(PANIC, "lock table corrupted");
          }
          LWLockRelease(masterLock);
          ereport(ERROR,
                  (errcode(ERRCODE_OUT_OF_MEMORY),
                   errmsg("out of shared memory"),
***************
*** 708,718 ****
              {
                  SHMQueueDelete(&proclock->lockLink);
                  SHMQueueDelete(&proclock->procLink);
!                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
!                                                (void *) &(proclock->tag),
!                                                     HASH_REMOVE, NULL);
!                 if (!proclock)
!                     elog(WARNING, "proclock table corrupted");
              }
              else
                  PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
--- 709,718 ----
              {
                  SHMQueueDelete(&proclock->lockLink);
                  SHMQueueDelete(&proclock->procLink);
!                 if (!hash_search(LockMethodProcLockHash[lockmethodid],
!                                  (void *) &(proclock->tag),
!                                  HASH_REMOVE, NULL))
!                     elog(PANIC, "proclock table corrupted");
              }
              else
                  PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
***************
*** 784,793 ****

      pfree(locallock->lockOwners);
      locallock->lockOwners = NULL;
!     locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
!                                           (void *) &(locallock->tag),
!                                           HASH_REMOVE, NULL);
!     if (!locallock)
          elog(WARNING, "locallock table corrupted");
  }

--- 784,792 ----

      pfree(locallock->lockOwners);
      locallock->lockOwners = NULL;
!     if (!hash_search(LockMethodLocalHash[lockmethodid],
!                      (void *) &(locallock->tag),
!                      HASH_REMOVE, NULL))
          elog(WARNING, "locallock table corrupted");
  }

***************
*** 994,999 ****
--- 993,1047 ----
  }

  /*
+  * CleanUpLock -- clean up after releasing a lock.  We garbage-collect the
+  * proclock and lock objects if possible, and call ProcLockWakeup if there
+  * are remaining requests and the caller says it's OK.  (Normally, this
+  * should be called after UnGrantLock, and wakeupNeeded is the result from
+  * UnGrantLock.)
+  *
+  * The locktable's masterLock must be held at entry, and will be
+  * held at exit.
+  */
+ static void
+ CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock, PROCLOCK *proclock,
+             bool wakeupNeeded)
+ {
+     /*
+      * If this was my last hold on this lock, delete my entry in the
+      * proclock table.
+      */
+     if (proclock->holdMask == 0)
+     {
+         PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
+         SHMQueueDelete(&proclock->lockLink);
+         SHMQueueDelete(&proclock->procLink);
+         if (!hash_search(LockMethodProcLockHash[lockmethodid],
+                          (void *) &(proclock->tag),
+                          HASH_REMOVE, NULL))
+             elog(PANIC, "proclock table corrupted");
+     }
+
+     if (lock->nRequested == 0)
+     {
+         /*
+          * The caller just released the last lock, so garbage-collect the
+          * lock object.
+          */
+         LOCK_PRINT("CleanUpLock: deleting", lock, 0);
+         Assert(SHMQueueEmpty(&(lock->procLocks)));
+         if (!hash_search(LockMethodLockHash[lockmethodid],
+                          (void *) &(lock->tag),
+                          HASH_REMOVE, NULL))
+             elog(PANIC, "lock table corrupted");
+     }
+     else if (wakeupNeeded)
+     {
+         /* There are waiters on this lock, so wake them up. */
+         ProcLockWakeup(LockMethods[lockmethodid], lock);
+     }
+ }
+
+ /*
   * GrantLockLocal -- update the locallock data structures to show
   *        the lock request has been granted.
   *
***************
*** 1160,1189 ****

      /*
       * Delete the proclock immediately if it represents no already-held locks.
!      * This must happen now because if the owner of the lock decides to release
!      * it, and the requested/granted counts then go to zero, LockRelease
!      * expects there to be no remaining proclocks.
!      */
!     if (proclock->holdMask == 0)
!     {
!         PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock);
!         SHMQueueDelete(&proclock->lockLink);
!         SHMQueueDelete(&proclock->procLink);
!         proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
!                                             (void *) &(proclock->tag),
!                                             HASH_REMOVE, NULL);
!         if (!proclock)
!             elog(WARNING, "proclock table corrupted");
!     }
!
!     /*
!      * There should still be some requests for the lock ... else what were
!      * we waiting for?  Therefore no need to delete the lock object.
       */
!     Assert(waitLock->nRequested > 0);
!
!     /* See if any other waiters for the lock can be woken up now */
!     ProcLockWakeup(LockMethods[lockmethodid], waitLock);
  }

  /*
--- 1208,1219 ----

      /*
       * Delete the proclock immediately if it represents no already-held locks.
!      * (This must happen now because if the owner of the lock decides to
!      * release it, and the requested/granted counts then go to zero,
!      * LockRelease expects there to be no remaining proclocks.)
!      * Then see if any other waiters for the lock can be woken up now.
       */
!     CleanUpLock(lockmethodid, waitLock, proclock, true);
  }

  /*
***************
*** 1221,1230 ****
      Assert(lockmethodid < NumLockMethods);
      lockMethodTable = LockMethods[lockmethodid];
      if (!lockMethodTable)
!     {
!         elog(WARNING, "lockMethodTable is null in LockRelease");
!         return FALSE;
!     }

      /*
       * Find the LOCALLOCK entry for this lock and lockmode
--- 1251,1257 ----
      Assert(lockmethodid < NumLockMethods);
      lockMethodTable = LockMethods[lockmethodid];
      if (!lockMethodTable)
!         elog(ERROR, "bad lock method: %d", lockmethodid);

      /*
       * Find the LOCALLOCK entry for this lock and lockmode
***************
*** 1328,1383 ****
          return FALSE;
      }

-     wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
-
      /*
!      * If this was my last hold on this lock, delete my entry in the
!      * proclock table.
       */
!     if (proclock->holdMask == 0)
!     {
!         PROCLOCK_PRINT("LockRelease: deleting proclock", proclock);
!         SHMQueueDelete(&proclock->lockLink);
!         SHMQueueDelete(&proclock->procLink);
!         proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
!                                             (void *) &(proclock->tag),
!                                             HASH_REMOVE, NULL);
!         if (!proclock)
!         {
!             LWLockRelease(masterLock);
!             elog(WARNING, "proclock table corrupted");
!             RemoveLocalLock(locallock);
!             return FALSE;
!         }
!     }

!     if (lock->nRequested == 0)
!     {
!         /*
!          * We've just released the last lock, so garbage-collect the lock
!          * object.
!          */
!         LOCK_PRINT("LockRelease: deleting lock", lock, lockmode);
!         Assert(SHMQueueEmpty(&(lock->procLocks)));
!         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
!                                     (void *) &(lock->tag),
!                                     HASH_REMOVE, NULL);
!         if (!lock)
!         {
!             LWLockRelease(masterLock);
!             elog(WARNING, "lock table corrupted");
!             RemoveLocalLock(locallock);
!             return FALSE;
!         }
!     }
!     else
!     {
!         /*
!          * Wake up waiters if needed.
!          */
!         if (wakeupNeeded)
!             ProcLockWakeup(lockMethodTable, lock);
!     }

      LWLockRelease(masterLock);

--- 1355,1366 ----
          return FALSE;
      }

      /*
!      * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
       */
!     wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

!     CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);

      LWLockRelease(masterLock);

***************
*** 1397,1403 ****
   * allxids == false: release all locks with Xid != 0
   * (zero is the Xid used for "session" locks).
   */
! bool
  LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
  {
      HASH_SEQ_STATUS status;
--- 1380,1386 ----
   * allxids == false: release all locks with Xid != 0
   * (zero is the Xid used for "session" locks).
   */
! void
  LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
  {
      HASH_SEQ_STATUS status;
***************
*** 1418,1427 ****
      Assert(lockmethodid < NumLockMethods);
      lockMethodTable = LockMethods[lockmethodid];
      if (!lockMethodTable)
!     {
!         elog(WARNING, "bad lock method: %d", lockmethodid);
!         return FALSE;
!     }

      numLockModes = lockMethodTable->numLockModes;
      masterLock = lockMethodTable->masterLock;
--- 1401,1407 ----
      Assert(lockmethodid < NumLockMethods);
      lockMethodTable = LockMethods[lockmethodid];
      if (!lockMethodTable)
!         elog(ERROR, "bad lock method: %d", lockmethodid);

      numLockModes = lockMethodTable->numLockModes;
      masterLock = lockMethodTable->masterLock;
***************
*** 1516,1563 ****
          Assert(lock->nGranted <= lock->nRequested);
          LOCK_PRINT("LockReleaseAll: updated", lock, 0);

!         PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
!
!         /*
!          * Remove the proclock entry from the linked lists
!          */
!         SHMQueueDelete(&proclock->lockLink);
!         SHMQueueDelete(&proclock->procLink);
!
!         /*
!          * remove the proclock entry from the hashtable
!          */
!         proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
!                                             (void *) &(proclock->tag),
!                                             HASH_REMOVE,
!                                             NULL);
!         if (!proclock)
!         {
!             LWLockRelease(masterLock);
!             elog(WARNING, "proclock table corrupted");
!             return FALSE;
!         }

!         if (lock->nRequested == 0)
!         {
!             /*
!              * We've just released the last lock, so garbage-collect the
!              * lock object.
!              */
!             LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
!             Assert(SHMQueueEmpty(&(lock->procLocks)));
!             lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
!                                         (void *) &(lock->tag),
!                                         HASH_REMOVE, NULL);
!             if (!lock)
!             {
!                 LWLockRelease(masterLock);
!                 elog(WARNING, "lock table corrupted");
!                 return FALSE;
!             }
!         }
!         else if (wakeupNeeded)
!             ProcLockWakeup(lockMethodTable, lock);

  next_item:
          proclock = nextHolder;
--- 1496,1505 ----
          Assert(lock->nGranted <= lock->nRequested);
          LOCK_PRINT("LockReleaseAll: updated", lock, 0);

!         Assert(proclock->holdMask == 0);

!         /* CleanUpLock will wake up waiters if needed. */
!         CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);

  next_item:
          proclock = nextHolder;
***************
*** 1569,1576 ****
      if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
          elog(LOG, "LockReleaseAll done");
  #endif
-
-     return TRUE;
  }

  /*
--- 1511,1516 ----
*** src/include/storage/lock.h.orig    Fri Apr 29 18:28:24 2005
--- src/include/storage/lock.h    Thu May 19 18:31:09 2005
***************
*** 361,367 ****
              TransactionId xid, LOCKMODE lockmode, bool dontWait);
  extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
              TransactionId xid, LOCKMODE lockmode);
! extern bool LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids);
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern int LockCheckConflicts(LockMethod lockMethodTable,
--- 361,367 ----
              TransactionId xid, LOCKMODE lockmode, bool dontWait);
  extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
              TransactionId xid, LOCKMODE lockmode);
! extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids);
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern int LockCheckConflicts(LockMethod lockMethodTable,

pgsql-patches by date:

Previous
From: Abhijit Menon-Sen
Date:
Subject: Re: md5(bytea)
Next
From: Neil Conway
Date:
Subject: Re: md5(bytea)