*** src/backend/storage/lmgr/lwlock.c.orig Sat Dec 29 19:48:03 2001 --- src/backend/storage/lmgr/lwlock.c Sun Dec 30 12:11:47 2001 *************** *** 30,35 **** --- 30,36 ---- typedef struct LWLock { slock_t mutex; /* Protects LWLock and queue of PROCs */ + bool releaseOK; /* T if ok to release waiters */ char exclusive; /* # of exclusive holders (0 or 1) */ int shared; /* # of shared holders (0..MaxBackends) */ PROC *head; /* head of list of waiting PROCs */ *************** *** 67,75 **** PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock) { if (Trace_lwlocks) ! elog(DEBUG, "%s(%d): excl %d shared %d head %p", where, (int) lockid, ! (int) lock->exclusive, lock->shared, lock->head); } inline static void --- 68,77 ---- PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock) { if (Trace_lwlocks) ! elog(DEBUG, "%s(%d): excl %d shared %d head %p rOK %d", where, (int) lockid, ! (int) lock->exclusive, lock->shared, lock->head, ! (int) lock->releaseOK); } inline static void *************** *** 153,158 **** --- 155,161 ---- for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++) { SpinLockInit(&lock->mutex); + lock->releaseOK = true; lock->exclusive = 0; lock->shared = 0; lock->head = NULL; *************** *** 195,201 **** LWLockAcquire(LWLockId lockid, LWLockMode mode) { volatile LWLock *lock = LWLockArray + lockid; ! bool mustwait; PRINT_LWDEBUG("LWLockAcquire", lockid, lock); --- 198,206 ---- LWLockAcquire(LWLockId lockid, LWLockMode mode) { volatile LWLock *lock = LWLockArray + lockid; ! PROC *proc = MyProc; ! bool retry = false; ! int extraWaits = 0; PRINT_LWDEBUG("LWLockAcquire", lockid, lock); *************** *** 206,248 **** */ HOLD_INTERRUPTS(); ! /* Acquire mutex. Time spent holding mutex should be short! */ ! SpinLockAcquire_NoHoldoff(&lock->mutex); ! ! /* If I can get the lock, do so quickly. */ ! if (mode == LW_EXCLUSIVE) { ! if (lock->exclusive == 0 && lock->shared == 0) { ! lock->exclusive++; ! mustwait = false; } else - mustwait = true; - } - else - { - /* - * If there is someone waiting (presumably for exclusive access), - * queue up behind him even though I could get the lock. This - * prevents a stream of read locks from starving a writer. - */ - if (lock->exclusive == 0 && lock->head == NULL) { ! lock->shared++; ! mustwait = false; } - else - mustwait = true; - } ! if (mustwait) ! { ! /* Add myself to wait queue */ ! PROC *proc = MyProc; ! int extraWaits = 0; /* * If we don't have a PROC structure, there's no way to wait. This * should never occur, since MyProc should only be null during * shared memory initialization. --- 211,271 ---- */ HOLD_INTERRUPTS(); ! /* ! * Loop here to try to acquire lock after each time we are signaled ! * by LWLockRelease. ! * ! * NOTE: it might seem better to have LWLockRelease actually grant us ! * the lock, rather than retrying and possibly having to go back to ! * sleep. But in practice that is no good because it means a process ! * swap for every lock acquisition when two or more processes are ! * contending for the same lock. Since LWLocks are normally used to ! * protect not-very-long sections of computation, a process needs to ! * be able to acquire and release the same lock many times during a ! * single process dispatch cycle, even in the presence of contention. ! * The efficiency of being able to do that outweighs the inefficiency of ! * sometimes wasting a dispatch cycle because the lock is not free when a ! * released waiter gets to run. See pgsql-hackers archives for 29-Dec-01. ! */ ! for (;;) { ! bool mustwait; ! ! /* Acquire mutex. Time spent holding mutex should be short! */ ! SpinLockAcquire_NoHoldoff(&lock->mutex); ! ! /* If retrying, allow LWLockRelease to release waiters again */ ! if (retry) ! lock->releaseOK = true; ! ! /* If I can get the lock, do so quickly. */ ! if (mode == LW_EXCLUSIVE) { ! if (lock->exclusive == 0 && lock->shared == 0) ! { ! lock->exclusive++; ! mustwait = false; ! } ! else ! mustwait = true; } else { ! if (lock->exclusive == 0) ! { ! lock->shared++; ! mustwait = false; ! } ! else ! mustwait = true; } ! if (!mustwait) ! break; /* got the lock */ /* + * Add myself to wait queue. + * * If we don't have a PROC structure, there's no way to wait. This * should never occur, since MyProc should only be null during * shared memory initialization. *************** *** 267,275 **** * * Since we share the process wait semaphore with the regular lock * manager and ProcWaitForSignal, and we may need to acquire an ! * LWLock while one of those is pending, it is possible that we ! * get awakened for a reason other than being granted the LWLock. ! * If so, loop back and wait again. Once we've gotten the lock, * re-increment the sema by the number of additional signals * received, so that the lock manager or signal manager will see * the received signal when it next waits. --- 290,298 ---- * * Since we share the process wait semaphore with the regular lock * manager and ProcWaitForSignal, and we may need to acquire an ! * LWLock while one of those is pending, it is possible that we get ! * awakened for a reason other than being signaled by LWLockRelease. ! * If so, loop back and wait again. Once we've gotten the LWLock, * re-increment the sema by the number of additional signals * received, so that the lock manager or signal manager will see * the received signal when it next waits. *************** *** 287,309 **** LOG_LWDEBUG("LWLockAcquire", lockid, "awakened"); ! /* ! * The awakener already updated the lock struct's state, so we ! * don't need to do anything more to it. Just need to fix the ! * semaphore count. ! */ ! while (extraWaits-- > 0) ! IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum); ! } ! else ! { ! /* Got the lock without waiting */ ! SpinLockRelease_NoHoldoff(&lock->mutex); } /* Add lock to list of locks held by this backend */ Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS); held_lwlocks[num_held_lwlocks++] = lockid; } /* --- 310,331 ---- LOG_LWDEBUG("LWLockAcquire", lockid, "awakened"); ! /* Now loop back and try to acquire lock again. */ ! retry = true; } + /* We are done updating shared state of the lock itself. */ + SpinLockRelease_NoHoldoff(&lock->mutex); + /* Add lock to list of locks held by this backend */ Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS); held_lwlocks[num_held_lwlocks++] = lockid; + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum); } /* *************** *** 344,355 **** } else { ! /* ! * If there is someone waiting (presumably for exclusive access), ! * queue up behind him even though I could get the lock. This ! * prevents a stream of read locks from starving a writer. ! */ ! if (lock->exclusive == 0 && lock->head == NULL) { lock->shared++; mustwait = false; --- 366,372 ---- } else { ! if (lock->exclusive == 0) { lock->shared++; mustwait = false; *************** *** 419,451 **** /* * See if I need to awaken any waiters. If I released a non-last ! * shared hold, there cannot be anything to do. */ head = lock->head; if (head != NULL) { ! if (lock->exclusive == 0 && lock->shared == 0) { /* ! * Remove the to-be-awakened PROCs from the queue, and update ! * the lock state to show them as holding the lock. */ proc = head; ! if (proc->lwExclusive) ! lock->exclusive++; ! else { - lock->shared++; while (proc->lwWaitLink != NULL && !proc->lwWaitLink->lwExclusive) { proc = proc->lwWaitLink; - lock->shared++; } } /* proc is now the last PROC to be released */ lock->head = proc->lwWaitLink; proc->lwWaitLink = NULL; } else { --- 436,469 ---- /* * See if I need to awaken any waiters. If I released a non-last ! * shared hold, there cannot be anything to do. Also, do not awaken ! * any waiters if someone has already awakened waiters that haven't ! * yet acquired the lock. */ head = lock->head; if (head != NULL) { ! if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK) { /* ! * Remove the to-be-awakened PROCs from the queue. If the ! * front waiter wants exclusive lock, awaken him only. ! * Otherwise awaken as many waiters as want shared access. */ proc = head; ! if (!proc->lwExclusive) { while (proc->lwWaitLink != NULL && !proc->lwWaitLink->lwExclusive) { proc = proc->lwWaitLink; } } /* proc is now the last PROC to be released */ lock->head = proc->lwWaitLink; proc->lwWaitLink = NULL; + /* prevent additional wakeups until retryer gets to run */ + lock->releaseOK = false; } else {