Re: Revised Patch to allow multiple table locks in "Unison" - Mailing list pgsql-patches

From Bruce Momjian
Subject Re: Revised Patch to allow multiple table locks in "Unison"
Date
Msg-id 200107262148.f6QLmDS06315@candle.pha.pa.us
Whole thread Raw
In response to Revised Patch to allow multiple table locks in "Unison"  (Neil Padgett <npadgett@redhat.com>)
List pgsql-patches
The problem with this patch is that it doesn't always lock the tables in
the order supplied by the user, leading to possible deadlock.  My guess
is that you need to try locking A, B, C and if B hangs, I think you need
to sleep on B, and when you get it, release the lock on B and try A, B,
C again.  I know this is a pain and could fail multiple times, but I
think we have to do it this ay.


> Here is the revised patch to allow the syntax:
>
> LOCK a,b,c;
>
> It uses the method I described as the "new" method in a previous
> message.
>
> Neil
>
> --
> Neil Padgett
> Red Hat Canada Ltd.                       E-Mail:  npadgett@redhat.com
> 2323 Yonge Street, Suite #300,
> Toronto, ON  M4P 2C9
>
> Index: src/backend/commands/command.c
> ===================================================================
> RCS file:
> /home/projects/pgsql/cvsroot/pgsql/src/backend/commands/command.c,v
> retrieving revision 1.136
> diff -c -p -r1.136 command.c
> *** src/backend/commands/command.c    2001/07/16 05:06:57    1.136
> --- src/backend/commands/command.c    2001/07/26 00:02:24
> *************** needs_toast_table(Relation rel)
> *** 1994,2019 ****
>   void
>   LockTableCommand(LockStmt *lockstmt)
>   {
> !     Relation    rel;
> !     int            aclresult;
>
> !     rel = heap_openr(lockstmt->relname, NoLock);
>
> !     if (rel->rd_rel->relkind != RELKIND_RELATION)
> !         elog(ERROR, "LOCK TABLE: %s is not a table", lockstmt->relname);
>
> !     if (lockstmt->mode == AccessShareLock)
> !         aclresult = pg_aclcheck(lockstmt->relname, GetUserId(), ACL_SELECT);
> !     else
> !         aclresult = pg_aclcheck(lockstmt->relname, GetUserId(),
> !                                 ACL_UPDATE | ACL_DELETE);
>
> !     if (aclresult != ACLCHECK_OK)
> !         elog(ERROR, "LOCK TABLE: permission denied");
>
> !     LockRelation(rel, lockstmt->mode);
>
> !     heap_close(rel, NoLock);    /* close rel, keep lock */
>   }
>
>
> --- 1994,2170 ----
>   void
>   LockTableCommand(LockStmt *lockstmt)
>   {
> !     int         relCnt;
>
> !     relCnt = length(lockstmt -> rellist);
>
> !     /* Handle a single relation lock specially to avoid overhead on
> likely the
> !        most common case */
>
> !     if(relCnt == 1)
> !     {
>
> !         /* Locking a single table */
>
> !         Relation    rel;
> !         int            aclresult;
> !         char *relname;
>
> !         relname = strVal(lfirst(lockstmt->rellist));
> !
> !         freeList(lockstmt->rellist);
> !
> !         rel = heap_openr(relname, NoLock);
> !
> !         if (rel->rd_rel->relkind != RELKIND_RELATION)
> !             elog(ERROR, "LOCK TABLE: %s is not a table", relname);
> !
> !         if (lockstmt->mode == AccessShareLock)
> !             aclresult = pg_aclcheck(relname, GetUserId(),
> !                                     ACL_SELECT);
> !         else
> !             aclresult = pg_aclcheck(relname, GetUserId(),
> !                                     ACL_UPDATE | ACL_DELETE);
> !
> !         if (aclresult != ACLCHECK_OK)
> !             elog(ERROR, "LOCK TABLE: permission denied");
> !
> !         LockRelation(rel, lockstmt->mode);
> !
> !         pfree(relname);
> !
> !         heap_close(rel, NoLock);    /* close rel, keep lock */
> !     }
> !     else
> !     {
> !         List *p;
> !         Relation *relationArray;
> !         Relation *pRel;
> !         Relation *blockingLockTarget;
> !         bool allLocked = false;
> !
> !         /* Locking multiple tables */
> !
> !         /* Create an array of relations */
> !
> !         relationArray = palloc(relCnt * sizeof(Relation));
> !         blockingLockTarget = relationArray;
> !
> !         pRel = relationArray;
> !
> !         /* Iterate over the list and populate the relation array */
> !
> !         foreach(p, lockstmt->rellist)
> !         {
> !             char *relname = strVal(lfirst(p));
> !             int            aclresult;
> !
> !             *pRel = heap_openr(relname, NoLock);
> !
> !             if ((*pRel)->rd_rel->relkind != RELKIND_RELATION)
> !                 elog(ERROR, "LOCK TABLE: %s is not a table",
> !                      relname);
> !
> !             if (lockstmt->mode == AccessShareLock)
> !                 aclresult = pg_aclcheck(relname, GetUserId(),
> !                                         ACL_SELECT);
> !             else
> !                 aclresult = pg_aclcheck(relname, GetUserId(),
> !                                         ACL_UPDATE | ACL_DELETE);
> !
> !             if (aclresult != ACLCHECK_OK)
> !                 elog(ERROR, "LOCK TABLE: permission denied");
> !
> !             pRel++;
> !             pfree(relname);
> !         }
> !
> !         /* Now acquire locks on all the relations */
> !
> !         while(!allLocked)
> !         {
> !
> !             allLocked = true;
> !
> !             /* Lock the blocking lock target (initially the first lock
> !                in the user's list */
> !
> !             LockRelation(*blockingLockTarget, lockstmt->mode);
> !
> !             /* Lock is now obtained on the lock target, now grab locks in a
> !                non-blocking way on the rest of the list */
> !
> !             for(pRel = blockingLockTarget + 1;
> !                 pRel < relationArray + relCnt;
> !                 pRel++)
> !             {
> !                 if(!ConditionalLockRelation(*pRel, lockstmt->mode))
> !                 {
> !                     Relation *pRelUnlock;
> !
> !                     /* Flag that all relations were not successfully
> !                        locked */
> !
> !                     allLocked = false;
> !
> !                     /* A lock wasn't obtained, so unlock all others */
> !
> !                     for(pRelUnlock = blockingLockTarget;
> !                         pRelUnlock < pRel;
> !                         pRelUnlock++)
> !                         UnlockRelation(*pRelUnlock, lockstmt->mode);
> !
> !                     /* Next time, do our blocking lock on the contended lock */
> !
> !                     blockingLockTarget = pRel;
> !
> !                     /* Now break out and try again */
> !
> !                     break;
> !                 }
> !             }
> !
> !             /* Now, lock anything before the blocking lock target in the lock
> !                target array, if we were successful in getting locks on
> !                everything after and including the blocking target */
> !
> !             if(allLocked)
> !             {
> !                 for(pRel = relationArray;
> !                     pRel < blockingLockTarget;
> !                     pRel++)
> !                 {
> !                     if(!ConditionalLockRelation(*pRel, lockstmt->mode))
> !                     {
> !                         Relation *pRelUnlock;
> !
> !                         /* Flag that all relations were not successfully
> !                            locked */
> !
> !                         allLocked = false;
> !
> !                         /* Lock wasn't obtained, so unlock all others */
> !
> !                         for(pRelUnlock = relationArray;
> !                             pRelUnlock < pRel;
> !                             pRelUnlock++)
> !                             UnlockRelation(*pRelUnlock, lockstmt->mode);
> !
> !                         /* Next time, do our blocking lock on the contended
> !                            lock */
> !
> !                         blockingLockTarget = pRel;
> !
> !                         /* Now break out and try again */
> !
> !                         break;
> !                     }
> !                 }
> !             }
> !         }
> !
> !         pfree(relationArray);
> !     }
>   }
>
>
> Index: src/backend/nodes/copyfuncs.c
> ===================================================================
> RCS file:
> /home/projects/pgsql/cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v
> retrieving revision 1.148
> diff -c -p -r1.148 copyfuncs.c
> *** src/backend/nodes/copyfuncs.c    2001/07/16 19:07:37    1.148
> --- src/backend/nodes/copyfuncs.c    2001/07/26 00:02:24
> *************** _copyLockStmt(LockStmt *from)
> *** 2425,2432 ****
>   {
>       LockStmt   *newnode = makeNode(LockStmt);
>
> !     if (from->relname)
> !         newnode->relname = pstrdup(from->relname);
>       newnode->mode = from->mode;
>
>       return newnode;
> --- 2425,2432 ----
>   {
>       LockStmt   *newnode = makeNode(LockStmt);
>
> !     Node_Copy(from, newnode, rellist);
> !
>       newnode->mode = from->mode;
>
>       return newnode;
> Index: src/backend/nodes/equalfuncs.c
> ===================================================================
> RCS file:
> /home/projects/pgsql/cvsroot/pgsql/src/backend/nodes/equalfuncs.c,v
> retrieving revision 1.96
> diff -c -p -r1.96 equalfuncs.c
> *** src/backend/nodes/equalfuncs.c    2001/07/16 19:07:38    1.96
> --- src/backend/nodes/equalfuncs.c    2001/07/26 00:02:24
> *************** _equalDropUserStmt(DropUserStmt *a, Drop
> *** 1283,1289 ****
>   static bool
>   _equalLockStmt(LockStmt *a, LockStmt *b)
>   {
> !     if (!equalstr(a->relname, b->relname))
>           return false;
>       if (a->mode != b->mode)
>           return false;
> --- 1283,1289 ----
>   static bool
>   _equalLockStmt(LockStmt *a, LockStmt *b)
>   {
> !     if (!equal(a->rellist, b->rellist))
>           return false;
>       if (a->mode != b->mode)
>           return false;
> Index: src/backend/parser/gram.y
> ===================================================================
> RCS file: /home/projects/pgsql/cvsroot/pgsql/src/backend/parser/gram.y,v
> retrieving revision 2.238
> diff -c -p -r2.238 gram.y
> *** src/backend/parser/gram.y    2001/07/16 19:07:40    2.238
> --- src/backend/parser/gram.y    2001/07/26 00:02:25
> *************** DeleteStmt:  DELETE FROM relation_expr w
> *** 3280,3290 ****
>                   }
>           ;
>
> ! LockStmt:    LOCK_P opt_table relation_name opt_lock
>                   {
>                       LockStmt *n = makeNode(LockStmt);
>
> !                     n->relname = $3;
>                       n->mode = $4;
>                       $$ = (Node *)n;
>                   }
> --- 3280,3290 ----
>                   }
>           ;
>
> ! LockStmt:    LOCK_P opt_table relation_name_list opt_lock
>                   {
>                       LockStmt *n = makeNode(LockStmt);
>
> !                     n->rellist = $3;
>                       n->mode = $4;
>                       $$ = (Node *)n;
>                   }
> Index: src/include/nodes/parsenodes.h
> ===================================================================
> RCS file:
> /home/projects/pgsql/cvsroot/pgsql/src/include/nodes/parsenodes.h,v
> retrieving revision 1.136
> diff -c -p -r1.136 parsenodes.h
> *** src/include/nodes/parsenodes.h    2001/07/16 19:07:40    1.136
> --- src/include/nodes/parsenodes.h    2001/07/26 00:02:26
> *************** typedef struct VariableResetStmt
> *** 760,766 ****
>   typedef struct LockStmt
>   {
>       NodeTag        type;
> !     char       *relname;        /* relation to lock */
>       int            mode;            /* lock mode */
>   } LockStmt;
>
> --- 760,766 ----
>   typedef struct LockStmt
>   {
>       NodeTag        type;
> !     List       *rellist;        /* relation to lock */
>       int            mode;            /* lock mode */
>   } LockStmt;
>
> Index: src/interfaces/ecpg/preproc/preproc.y
> ===================================================================
> RCS file:
> /home/projects/pgsql/cvsroot/pgsql/src/interfaces/ecpg/preproc/preproc.y,v
> retrieving revision 1.146
> diff -c -p -r1.146 preproc.y
> *** src/interfaces/ecpg/preproc/preproc.y    2001/07/16 05:07:00    1.146
> --- src/interfaces/ecpg/preproc/preproc.y    2001/07/26 00:02:27
> *************** DeleteStmt:  DELETE FROM relation_expr w
> *** 2421,2427 ****
>                   }
>           ;
>
> ! LockStmt:  LOCK_P opt_table relation_name opt_lock
>                   {
>                       $$ = cat_str(4, make_str("lock"), $2, $3, $4);
>                   }
> --- 2421,2427 ----
>                   }
>           ;
>
> ! LockStmt:  LOCK_P opt_table relation_name_list opt_lock
>                   {
>                       $$ = cat_str(4, make_str("lock"), $2, $3, $4);
>                   }
>
> ---------------------------(end of broadcast)---------------------------
> TIP 6: Have you searched our list archives?
>
> http://www.postgresql.org/search.mpl
>

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026

pgsql-patches by date:

Previous
From: Bruce Momjian
Date:
Subject: Re: Revised Patch to allow multiple table locks in "Unison"
Next
From: Tom Lane
Date:
Subject: Re: Revised Patch to allow multiple table locks in "Unison"