Skip site navigation (1) Skip section navigation (2)

Re: Fix spinlock usage in UnpinBuffer()

From: Qingqing Zhou <zhouqq(at)cs(dot)toronto(dot)edu>
To: Tom Lane <tgl(at)sss(dot)pgh(dot)pa(dot)us>
Subject: Re: Fix spinlock usage in UnpinBuffer()
Date: 2005-12-29 07:05:35
Message-ID: Pine.LNX.4.58.0512290153070.30003@eon.cs (view raw or flat)
Thread:
Lists: pgsql-patches

On Wed, 28 Dec 2005, Tom Lane wrote:
> "Qingqing Zhou" <zhouqq(at)cs(dot)toronto(dot)edu> writes:
> > I agree on this. But before changing it, we need to inspect those spinlocks
> > one by one to making sure two things (1) if there is out-of-line-call, make
> > sure no CHECK_FOR_INTERRUPTS(); (2) ImmediateInterruptsOK is false (99% sure
> > now).
>
> I'm sure of those things already ... but feel free to look for yourself.
>

Patch is ready.

---
Remove SpinLockAcquire_NoHoldoff() and related. Now SpinLockAcquire() will
not holdoff cancle/die interrupts. This will give cleaner and clearer
useage of spinlock and also save a few cycles.

The difference between the original SpinLockAcquire() and the original
SpinLockAcquire_NoHoldoff() is that the former will HOLD_INTERRUPTS()
before trying to acquire the lock. But in fact we can safely allow
Cancel/die interrupts the spinlock providing we follow the coding rule:
don't use any out-of-line calls that might invoke CHECK_FOR_INTERRUPTS()
while holding the locks. This is reasonable since we also require that the
spinlock holding time must be short. Thus, the "ImmediateInterruptsOK"
will help us to leave the processing to the proper time.

Per discussion with Tom.
---

> > Our tests indicates that BSD version is better ... but it is just a
> > home-brew test.
>
> Better than what is the operative question.
>

Better than the default libc qsort (except BSD machines). That is, if we
agreed, we will link against to the port/qsort.c just as Solaris currently
does. But if we leave as it is, I won't argue hard for it.

Regards,
Qingqing

---

Index: src/backend/access/transam/xlog.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/access/transam/xlog.c,v
retrieving revision 1.224
diff -c -r1.224 xlog.c
*** src/backend/access/transam/xlog.c	28 Dec 2005 23:22:50 -0000	1.224
--- src/backend/access/transam/xlog.c	29 Dec 2005 06:41:24 -0000
***************
*** 695,704 ****
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
  		LogwrtRqst = xlogctl->LogwrtRqst;
  		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
  	}

  	/*
--- 695,704 ----
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire(&xlogctl->info_lck);
  		LogwrtRqst = xlogctl->LogwrtRqst;
  		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease(&xlogctl->info_lck);
  	}

  	/*
***************
*** 940,952 ****
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
  		/* advance global request to include new block(s) */
  		if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
  			xlogctl->LogwrtRqst.Write = WriteRqst;
  		/* update local result copy while I have the chance */
  		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
  	}

  	ProcLastRecEnd = RecPtr;
--- 940,952 ----
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire(&xlogctl->info_lck);
  		/* advance global request to include new block(s) */
  		if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
  			xlogctl->LogwrtRqst.Write = WriteRqst;
  		/* update local result copy while I have the chance */
  		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease(&xlogctl->info_lck);
  	}

  	ProcLastRecEnd = RecPtr;
***************
*** 1175,1185 ****
  			/* use volatile pointer to prevent code rearrangement */
  			volatile XLogCtlData *xlogctl = XLogCtl;

! 			SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
  			if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
  				xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
  			LogwrtResult = xlogctl->LogwrtResult;
! 			SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
  		}

  		update_needed = false;	/* Did the shared-request update */
--- 1175,1185 ----
  			/* use volatile pointer to prevent code rearrangement */
  			volatile XLogCtlData *xlogctl = XLogCtl;

! 			SpinLockAcquire(&xlogctl->info_lck);
  			if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
  				xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
  			LogwrtResult = xlogctl->LogwrtResult;
! 			SpinLockRelease(&xlogctl->info_lck);
  		}

  		update_needed = false;	/* Did the shared-request update */
***************
*** 1560,1572 ****
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
  		xlogctl->LogwrtResult = LogwrtResult;
  		if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
  			xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
  		if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
  			xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
! 		SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
  	}

  	Write->LogwrtResult = LogwrtResult;
--- 1560,1572 ----
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire(&xlogctl->info_lck);
  		xlogctl->LogwrtResult = LogwrtResult;
  		if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
  			xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
  		if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
  			xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
! 		SpinLockRelease(&xlogctl->info_lck);
  	}

  	Write->LogwrtResult = LogwrtResult;
***************
*** 1618,1628 ****
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
  		if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
  			WriteRqstPtr = xlogctl->LogwrtRqst.Write;
  		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
  	}

  	/* done already? */
--- 1618,1628 ----
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire(&xlogctl->info_lck);
  		if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
  			WriteRqstPtr = xlogctl->LogwrtRqst.Write;
  		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease(&xlogctl->info_lck);
  	}

  	/* done already? */
***************
*** 4984,4993 ****
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;

! 	SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
  	Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
  	RedoRecPtr = xlogctl->Insert.RedoRecPtr;
! 	SpinLockRelease_NoHoldoff(&xlogctl->info_lck);

  	return RedoRecPtr;
  }
--- 4984,4993 ----
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;

! 	SpinLockAcquire(&xlogctl->info_lck);
  	Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
  	RedoRecPtr = xlogctl->Insert.RedoRecPtr;
! 	SpinLockRelease(&xlogctl->info_lck);

  	return RedoRecPtr;
  }
***************
*** 5165,5173 ****
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
  		RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
! 		SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
  	}

  	/*
--- 5165,5173 ----
  		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;

! 		SpinLockAcquire(&xlogctl->info_lck);
  		RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
! 		SpinLockRelease(&xlogctl->info_lck);
  	}

  	/*
Index: src/backend/storage/buffer/bufmgr.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v
retrieving revision 1.200
diff -c -r1.200 bufmgr.c
*** src/backend/storage/buffer/bufmgr.c	22 Nov 2005 18:17:19 -0000	1.200
--- src/backend/storage/buffer/bufmgr.c	29 Dec 2005 06:41:25 -0000
***************
*** 442,448 ****
  		/*
  		 * Need to lock the buffer header too in order to change its tag.
  		 */
! 		LockBufHdr_NoHoldoff(buf);

  		/*
  		 * Somebody could have pinned or re-dirtied the buffer while we were
--- 442,448 ----
  		/*
  		 * Need to lock the buffer header too in order to change its tag.
  		 */
! 		LockBufHdr(buf);

  		/*
  		 * Somebody could have pinned or re-dirtied the buffer while we were
***************
*** 453,459 ****
  		if (buf->refcount == 1 && !(buf->flags & BM_DIRTY))
  			break;

! 		UnlockBufHdr_NoHoldoff(buf);
  		BufTableDelete(&newTag);
  		LWLockRelease(BufMappingLock);
  		UnpinBuffer(buf, true, false /* evidently recently used */ );
--- 453,459 ----
  		if (buf->refcount == 1 && !(buf->flags & BM_DIRTY))
  			break;

! 		UnlockBufHdr(buf);
  		BufTableDelete(&newTag);
  		LWLockRelease(BufMappingLock);
  		UnpinBuffer(buf, true, false /* evidently recently used */ );
***************
*** 473,479 ****
  	buf->flags |= BM_TAG_VALID;
  	buf->usage_count = 0;

! 	UnlockBufHdr_NoHoldoff(buf);

  	if (oldFlags & BM_TAG_VALID)
  		BufTableDelete(&oldTag);
--- 473,479 ----
  	buf->flags |= BM_TAG_VALID;
  	buf->usage_count = 0;

! 	UnlockBufHdr(buf);

  	if (oldFlags & BM_TAG_VALID)
  		BufTableDelete(&oldTag);
***************
*** 529,541 ****
  	 */
  	LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);

! 	/* Re-lock the buffer header (NoHoldoff since we have an LWLock) */
! 	LockBufHdr_NoHoldoff(buf);

  	/* If it's changed while we were waiting for lock, do nothing */
  	if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
  	{
! 		UnlockBufHdr_NoHoldoff(buf);
  		LWLockRelease(BufMappingLock);
  		return;
  	}
--- 529,541 ----
  	 */
  	LWLockAcquire(BufMappingLock, LW_EXCLUSIVE);

! 	/* Re-lock the buffer header */
! 	LockBufHdr(buf);

  	/* If it's changed while we were waiting for lock, do nothing */
  	if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
  	{
! 		UnlockBufHdr(buf);
  		LWLockRelease(BufMappingLock);
  		return;
  	}
***************
*** 551,557 ****
  	 */
  	if (buf->refcount != 0)
  	{
! 		UnlockBufHdr_NoHoldoff(buf);
  		LWLockRelease(BufMappingLock);
  		/* safety check: should definitely not be our *own* pin */
  		if (PrivateRefCount[buf->buf_id] != 0)
--- 551,557 ----
  	 */
  	if (buf->refcount != 0)
  	{
! 		UnlockBufHdr(buf);
  		LWLockRelease(BufMappingLock);
  		/* safety check: should definitely not be our *own* pin */
  		if (PrivateRefCount[buf->buf_id] != 0)
***************
*** 569,575 ****
  	buf->flags = 0;
  	buf->usage_count = 0;

! 	UnlockBufHdr_NoHoldoff(buf);

  	/*
  	 * Remove the buffer from the lookup hashtable, if it was in there.
--- 569,575 ----
  	buf->flags = 0;
  	buf->usage_count = 0;

! 	UnlockBufHdr(buf);

  	/*
  	 * Remove the buffer from the lookup hashtable, if it was in there.
***************
*** 729,743 ****

  	if (PrivateRefCount[b] == 0)
  	{
! 		/*
! 		 * Use NoHoldoff here because we don't want the unlock to be a
! 		 * potential place to honor a QueryCancel request. (The caller should
! 		 * be holding off interrupts anyway.)
! 		 */
! 		LockBufHdr_NoHoldoff(buf);
  		buf->refcount++;
  		result = (buf->flags & BM_VALID) != 0;
! 		UnlockBufHdr_NoHoldoff(buf);
  	}
  	else
  	{
--- 729,738 ----

  	if (PrivateRefCount[b] == 0)
  	{
! 		LockBufHdr(buf);
  		buf->refcount++;
  		result = (buf->flags & BM_VALID) != 0;
! 		UnlockBufHdr(buf);
  	}
  	else
  	{
***************
*** 766,779 ****

  	if (PrivateRefCount[b] == 0)
  		buf->refcount++;
! 	/* NoHoldoff since we mustn't accept cancel interrupt here */
! 	UnlockBufHdr_NoHoldoff(buf);
  	PrivateRefCount[b]++;
  	Assert(PrivateRefCount[b] > 0);
  	ResourceOwnerRememberBuffer(CurrentResourceOwner,
  								BufferDescriptorGetBuffer(buf));
- 	/* Now we can accept cancel */
- 	RESUME_INTERRUPTS();
  }

  /*
--- 761,771 ----

  	if (PrivateRefCount[b] == 0)
  		buf->refcount++;
! 	UnlockBufHdr(buf);
  	PrivateRefCount[b]++;
  	Assert(PrivateRefCount[b] > 0);
  	ResourceOwnerRememberBuffer(CurrentResourceOwner,
  								BufferDescriptorGetBuffer(buf));
  }

  /*
***************
*** 811,818 ****
  		Assert(!LWLockHeldByMe(buf->content_lock));
  		Assert(!LWLockHeldByMe(buf->io_in_progress_lock));

! 		/* NoHoldoff ensures we don't lose control before sending signal */
! 		LockBufHdr_NoHoldoff(buf);

  		/* Decrement the shared reference count */
  		Assert(buf->refcount > 0);
--- 803,809 ----
  		Assert(!LWLockHeldByMe(buf->content_lock));
  		Assert(!LWLockHeldByMe(buf->io_in_progress_lock));

! 		LockBufHdr(buf);

  		/* Decrement the shared reference count */
  		Assert(buf->refcount > 0);
***************
*** 841,851 ****
  			int			wait_backend_pid = buf->wait_backend_pid;

  			buf->flags &= ~BM_PIN_COUNT_WAITER;
! 			UnlockBufHdr_NoHoldoff(buf);
  			ProcSendSignal(wait_backend_pid);
  		}
  		else
! 			UnlockBufHdr_NoHoldoff(buf);

  		/*
  		 * If VACUUM is releasing an otherwise-unused buffer, send it to the
--- 832,842 ----
  			int			wait_backend_pid = buf->wait_backend_pid;

  			buf->flags &= ~BM_PIN_COUNT_WAITER;
! 			UnlockBufHdr(buf);
  			ProcSendSignal(wait_backend_pid);
  		}
  		else
! 			UnlockBufHdr(buf);

  		/*
  		 * If VACUUM is releasing an otherwise-unused buffer, send it to the
***************
*** 1300,1308 ****
  	 */

  	/* To check if block content changes while flushing. - vadim 01/17/97 */
! 	LockBufHdr_NoHoldoff(buf);
  	buf->flags &= ~BM_JUST_DIRTIED;
! 	UnlockBufHdr_NoHoldoff(buf);

  	smgrwrite(reln,
  			  buf->tag.blockNum,
--- 1291,1299 ----
  	 */

  	/* To check if block content changes while flushing. - vadim 01/17/97 */
! 	LockBufHdr(buf);
  	buf->flags &= ~BM_JUST_DIRTIED;
! 	UnlockBufHdr(buf);

  	smgrwrite(reln,
  			  buf->tag.blockNum,
***************
*** 1691,1699 ****

  	if (buf)
  	{
! 		HOLD_INTERRUPTS();		/* don't want to die() partway through... */
!
! 		LockBufHdr_NoHoldoff(buf);

  		/*
  		 * Don't complain if flag bit not set; it could have been reset but we
--- 1682,1688 ----

  	if (buf)
  	{
! 		LockBufHdr(buf);

  		/*
  		 * Don't complain if flag bit not set; it could have been reset but we
***************
*** 1703,1715 ****
  			buf->wait_backend_pid == MyProcPid)
  			buf->flags &= ~BM_PIN_COUNT_WAITER;

! 		UnlockBufHdr_NoHoldoff(buf);

  		ProcCancelWaitForSignal();

  		PinCountWaitBuf = NULL;
-
- 		RESUME_INTERRUPTS();
  	}
  }

--- 1692,1702 ----
  			buf->wait_backend_pid == MyProcPid)
  			buf->flags &= ~BM_PIN_COUNT_WAITER;

! 		UnlockBufHdr(buf);

  		ProcCancelWaitForSignal();

  		PinCountWaitBuf = NULL;
  	}
  }

***************
*** 1741,1749 ****
  		 * that it's critical to set dirty bit *before* logging changes with
  		 * XLogInsert() - see comments in SyncOneBuffer().
  		 */
! 		LockBufHdr_NoHoldoff(buf);
  		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
! 		UnlockBufHdr_NoHoldoff(buf);
  	}
  	else
  		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
--- 1728,1736 ----
  		 * that it's critical to set dirty bit *before* logging changes with
  		 * XLogInsert() - see comments in SyncOneBuffer().
  		 */
! 		LockBufHdr(buf);
  		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
! 		UnlockBufHdr(buf);
  	}
  	else
  		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
***************
*** 1773,1781 ****
  		 * that it's critical to set dirty bit *before* logging changes with
  		 * XLogInsert() - see comments in SyncOneBuffer().
  		 */
! 		LockBufHdr_NoHoldoff(buf);
  		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
! 		UnlockBufHdr_NoHoldoff(buf);

  		return true;
  	}
--- 1760,1768 ----
  		 * that it's critical to set dirty bit *before* logging changes with
  		 * XLogInsert() - see comments in SyncOneBuffer().
  		 */
! 		LockBufHdr(buf);
  		buf->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
! 		UnlockBufHdr(buf);

  		return true;
  	}
***************
*** 1827,1851 ****
  	{
  		/* Try to acquire lock */
  		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! 		LockBufHdr_NoHoldoff(bufHdr);
  		Assert(bufHdr->refcount > 0);
  		if (bufHdr->refcount == 1)
  		{
  			/* Successfully acquired exclusive lock with pincount 1 */
! 			UnlockBufHdr_NoHoldoff(bufHdr);
  			return;
  		}
  		/* Failed, so mark myself as waiting for pincount 1 */
  		if (bufHdr->flags & BM_PIN_COUNT_WAITER)
  		{
! 			UnlockBufHdr_NoHoldoff(bufHdr);
  			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  			elog(ERROR, "multiple backends attempting to wait for pincount 1");
  		}
  		bufHdr->wait_backend_pid = MyProcPid;
  		bufHdr->flags |= BM_PIN_COUNT_WAITER;
  		PinCountWaitBuf = bufHdr;
! 		UnlockBufHdr_NoHoldoff(bufHdr);
  		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  		/* Wait to be signaled by UnpinBuffer() */
  		ProcWaitForSignal();
--- 1814,1838 ----
  	{
  		/* Try to acquire lock */
  		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! 		LockBufHdr(bufHdr);
  		Assert(bufHdr->refcount > 0);
  		if (bufHdr->refcount == 1)
  		{
  			/* Successfully acquired exclusive lock with pincount 1 */
! 			UnlockBufHdr(bufHdr);
  			return;
  		}
  		/* Failed, so mark myself as waiting for pincount 1 */
  		if (bufHdr->flags & BM_PIN_COUNT_WAITER)
  		{
! 			UnlockBufHdr(bufHdr);
  			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  			elog(ERROR, "multiple backends attempting to wait for pincount 1");
  		}
  		bufHdr->wait_backend_pid = MyProcPid;
  		bufHdr->flags |= BM_PIN_COUNT_WAITER;
  		PinCountWaitBuf = bufHdr;
! 		UnlockBufHdr(bufHdr);
  		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  		/* Wait to be signaled by UnpinBuffer() */
  		ProcWaitForSignal();
***************
*** 1926,1933 ****
  		 */
  		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

! 		/* NoHoldoff is OK since we now have an LWLock */
! 		LockBufHdr_NoHoldoff(buf);

  		if (!(buf->flags & BM_IO_IN_PROGRESS))
  			break;
--- 1913,1919 ----
  		 */
  		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

! 		LockBufHdr(buf);

  		if (!(buf->flags & BM_IO_IN_PROGRESS))
  			break;
***************
*** 1938,1944 ****
  		 * an error (see AbortBufferIO).  If that's the case, we must wait for
  		 * him to get unwedged.
  		 */
! 		UnlockBufHdr_NoHoldoff(buf);
  		LWLockRelease(buf->io_in_progress_lock);
  		WaitIO(buf);
  	}
--- 1924,1930 ----
  		 * an error (see AbortBufferIO).  If that's the case, we must wait for
  		 * him to get unwedged.
  		 */
! 		UnlockBufHdr(buf);
  		LWLockRelease(buf->io_in_progress_lock);
  		WaitIO(buf);
  	}
***************
*** 1948,1961 ****
  	if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
  	{
  		/* someone else already did the I/O */
! 		UnlockBufHdr_NoHoldoff(buf);
  		LWLockRelease(buf->io_in_progress_lock);
  		return false;
  	}

  	buf->flags |= BM_IO_IN_PROGRESS;

! 	UnlockBufHdr_NoHoldoff(buf);

  	InProgressBuf = buf;
  	IsForInput = forInput;
--- 1934,1947 ----
  	if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
  	{
  		/* someone else already did the I/O */
! 		UnlockBufHdr(buf);
  		LWLockRelease(buf->io_in_progress_lock);
  		return false;
  	}

  	buf->flags |= BM_IO_IN_PROGRESS;

! 	UnlockBufHdr(buf);

  	InProgressBuf = buf;
  	IsForInput = forInput;
***************
*** 1986,1993 ****
  {
  	Assert(buf == InProgressBuf);

! 	/* NoHoldoff is OK since we must have an LWLock */
! 	LockBufHdr_NoHoldoff(buf);

  	Assert(buf->flags & BM_IO_IN_PROGRESS);
  	buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
--- 1972,1978 ----
  {
  	Assert(buf == InProgressBuf);

! 	LockBufHdr(buf);

  	Assert(buf->flags & BM_IO_IN_PROGRESS);
  	buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
***************
*** 1995,2001 ****
  		buf->flags &= ~BM_DIRTY;
  	buf->flags |= set_flag_bits;

! 	UnlockBufHdr_NoHoldoff(buf);

  	InProgressBuf = NULL;

--- 1980,1986 ----
  		buf->flags &= ~BM_DIRTY;
  	buf->flags |= set_flag_bits;

! 	UnlockBufHdr(buf);

  	InProgressBuf = NULL;

***************
*** 2026,2040 ****
  		 */
  		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

! 		/* NoHoldoff is OK since we now have an LWLock */
! 		LockBufHdr_NoHoldoff(buf);
  		Assert(buf->flags & BM_IO_IN_PROGRESS);
  		if (IsForInput)
  		{
  			Assert(!(buf->flags & BM_DIRTY));
  			/* We'd better not think buffer is valid yet */
  			Assert(!(buf->flags & BM_VALID));
! 			UnlockBufHdr_NoHoldoff(buf);
  		}
  		else
  		{
--- 2011,2024 ----
  		 */
  		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

! 		LockBufHdr(buf);
  		Assert(buf->flags & BM_IO_IN_PROGRESS);
  		if (IsForInput)
  		{
  			Assert(!(buf->flags & BM_DIRTY));
  			/* We'd better not think buffer is valid yet */
  			Assert(!(buf->flags & BM_VALID));
! 			UnlockBufHdr(buf);
  		}
  		else
  		{
***************
*** 2042,2048 ****

  			sv_flags = buf->flags;
  			Assert(sv_flags & BM_DIRTY);
! 			UnlockBufHdr_NoHoldoff(buf);
  			/* Issue notice if this is not the first failure... */
  			if (sv_flags & BM_IO_ERROR)
  			{
--- 2026,2032 ----

  			sv_flags = buf->flags;
  			Assert(sv_flags & BM_DIRTY);
! 			UnlockBufHdr(buf);
  			/* Issue notice if this is not the first failure... */
  			if (sv_flags & BM_IO_ERROR)
  			{
Index: src/backend/storage/lmgr/lwlock.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v
retrieving revision 1.36
diff -c -r1.36 lwlock.c
*** src/backend/storage/lmgr/lwlock.c	11 Dec 2005 21:02:18 -0000	1.36
--- src/backend/storage/lmgr/lwlock.c	29 Dec 2005 06:41:25 -0000
***************
*** 301,307 ****
  		bool		mustwait;

  		/* Acquire mutex.  Time spent holding mutex should be short! */
! 		SpinLockAcquire_NoHoldoff(&lock->mutex);

  		/* If retrying, allow LWLockRelease to release waiters again */
  		if (retry)
--- 301,307 ----
  		bool		mustwait;

  		/* Acquire mutex.  Time spent holding mutex should be short! */
! 		SpinLockAcquire(&lock->mutex);

  		/* If retrying, allow LWLockRelease to release waiters again */
  		if (retry)
***************
*** 352,358 ****
  		lock->tail = proc;

  		/* Can release the mutex now */
! 		SpinLockRelease_NoHoldoff(&lock->mutex);

  		/*
  		 * Wait until awakened.
--- 352,358 ----
  		lock->tail = proc;

  		/* Can release the mutex now */
! 		SpinLockRelease(&lock->mutex);

  		/*
  		 * Wait until awakened.
***************
*** 384,390 ****
  	}

  	/* We are done updating shared state of the lock itself. */
! 	SpinLockRelease_NoHoldoff(&lock->mutex);

  	/* Add lock to list of locks held by this backend */
  	held_lwlocks[num_held_lwlocks++] = lockid;
--- 384,390 ----
  	}

  	/* We are done updating shared state of the lock itself. */
! 	SpinLockRelease(&lock->mutex);

  	/* Add lock to list of locks held by this backend */
  	held_lwlocks[num_held_lwlocks++] = lockid;
***************
*** 423,429 ****
  	HOLD_INTERRUPTS();

  	/* Acquire mutex.  Time spent holding mutex should be short! */
! 	SpinLockAcquire_NoHoldoff(&lock->mutex);

  	/* If I can get the lock, do so quickly. */
  	if (mode == LW_EXCLUSIVE)
--- 423,429 ----
  	HOLD_INTERRUPTS();

  	/* Acquire mutex.  Time spent holding mutex should be short! */
! 	SpinLockAcquire(&lock->mutex);

  	/* If I can get the lock, do so quickly. */
  	if (mode == LW_EXCLUSIVE)
***************
*** 448,454 ****
  	}

  	/* We are done updating shared state of the lock itself. */
! 	SpinLockRelease_NoHoldoff(&lock->mutex);

  	if (mustwait)
  	{
--- 448,454 ----
  	}

  	/* We are done updating shared state of the lock itself. */
! 	SpinLockRelease(&lock->mutex);

  	if (mustwait)
  	{
***************
*** 494,500 ****
  		held_lwlocks[i] = held_lwlocks[i + 1];

  	/* Acquire mutex.  Time spent holding mutex should be short! */
! 	SpinLockAcquire_NoHoldoff(&lock->mutex);

  	/* Release my hold on lock */
  	if (lock->exclusive > 0)
--- 494,500 ----
  		held_lwlocks[i] = held_lwlocks[i + 1];

  	/* Acquire mutex.  Time spent holding mutex should be short! */
! 	SpinLockAcquire(&lock->mutex);

  	/* Release my hold on lock */
  	if (lock->exclusive > 0)
***************
*** 542,548 ****
  	}

  	/* We are done updating shared state of the lock itself. */
! 	SpinLockRelease_NoHoldoff(&lock->mutex);

  	/*
  	 * Awaken any waiters I removed from the queue.
--- 542,548 ----
  	}

  	/* We are done updating shared state of the lock itself. */
! 	SpinLockRelease(&lock->mutex);

  	/*
  	 * Awaken any waiters I removed from the queue.
Index: src/include/storage/buf_internals.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/storage/buf_internals.h,v
retrieving revision 1.83
diff -c -r1.83 buf_internals.h
*** src/include/storage/buf_internals.h	22 Nov 2005 18:17:31 -0000	1.83
--- src/include/storage/buf_internals.h	29 Dec 2005 06:41:26 -0000
***************
*** 138,147 ****
  #define FREENEXT_NOT_IN_LIST	(-2)

  /*
!  * Macros for acquiring/releasing a buffer header's spinlock.  The
!  * NoHoldoff cases may be used when we know that we hold some LWLock
!  * and therefore interrupts are already held off.  Do not apply these
!  * to local buffers!
   *
   * Note: as a general coding rule, if you are using these then you probably
   * want to be using a volatile-qualified pointer to the buffer header, to
--- 138,145 ----
  #define FREENEXT_NOT_IN_LIST	(-2)

  /*
!  * Macros for acquiring/releasing a buffer header's spinlock. Do not apply
!  * these to local buffers!
   *
   * Note: as a general coding rule, if you are using these then you probably
   * want to be using a volatile-qualified pointer to the buffer header, to
***************
*** 152,162 ****
  	SpinLockAcquire(&(bufHdr)->buf_hdr_lock)
  #define UnlockBufHdr(bufHdr)  \
  	SpinLockRelease(&(bufHdr)->buf_hdr_lock)
- #define LockBufHdr_NoHoldoff(bufHdr)  \
- 	SpinLockAcquire_NoHoldoff(&(bufHdr)->buf_hdr_lock)
- #define UnlockBufHdr_NoHoldoff(bufHdr)	\
- 	SpinLockRelease_NoHoldoff(&(bufHdr)->buf_hdr_lock)
-

  /* in buf_init.c */
  extern BufferDesc *BufferDescriptors;
--- 150,155 ----
Index: src/include/storage/spin.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/storage/spin.h,v
retrieving revision 1.26
diff -c -r1.26 spin.h
*** src/include/storage/spin.h	13 Oct 2005 06:17:34 -0000	1.26
--- src/include/storage/spin.h	29 Dec 2005 06:41:26 -0000
***************
*** 14,30 ****
   *		Acquire a spinlock, waiting if necessary.
   *		Time out and abort() if unable to acquire the lock in a
   *		"reasonable" amount of time --- typically ~ 1 minute.
-  *		Cancel/die interrupts are held off until the lock is released.
   *
   *	void SpinLockRelease(volatile slock_t *lock)
   *		Unlock a previously acquired lock.
-  *		Release the cancel/die interrupt holdoff.
-  *
-  *	void SpinLockAcquire_NoHoldoff(volatile slock_t *lock)
-  *	void SpinLockRelease_NoHoldoff(volatile slock_t *lock)
-  *		Same as above, except no interrupt holdoff processing is done.
-  *		This pair of macros may be used when there is a surrounding
-  *		interrupt holdoff.
   *
   *	bool SpinLockFree(slock_t *lock)
   *		Tests if the lock is free. Returns TRUE if free, FALSE if locked.
--- 14,22 ----
***************
*** 33,38 ****
--- 25,37 ----
   *	Callers must beware that the macro argument may be evaluated multiple
   *	times!
   *
+  *	We can safely allow Cancel/die interrupts the spinlock providing we
+  *	follow the coding rule: don't use any out-of-line calls that might
+  *	invoke CHECK_FOR_INTERRUPTS() while holding the locks. This is quite
+  *	reasonable since we also require that the spinlock holding time must
+  *	be short. Thus, the "ImmediateInterruptsOK" will help us to leave the
+  *	processing to the proper time.
+  *
   *	CAUTION: Care must be taken to ensure that loads and stores of
   *	shared memory values are not rearranged around spinlock acquire
   *	and release. This is done using the "volatile" qualifier: the C
***************
*** 63,83 ****

  #define SpinLockInit(lock)	S_INIT_LOCK(lock)

! #define SpinLockAcquire(lock) \
! 	do { \
! 		HOLD_INTERRUPTS(); \
! 		S_LOCK(lock); \
! 	} while (0)
!
! #define SpinLockAcquire_NoHoldoff(lock) S_LOCK(lock)
!
! #define SpinLockRelease(lock) \
! 	do { \
! 		S_UNLOCK(lock); \
! 		RESUME_INTERRUPTS(); \
! 	} while (0)

! #define SpinLockRelease_NoHoldoff(lock) S_UNLOCK(lock)

  #define SpinLockFree(lock)	S_LOCK_FREE(lock)

--- 62,70 ----

  #define SpinLockInit(lock)	S_INIT_LOCK(lock)

! #define SpinLockAcquire(lock) S_LOCK(lock)

! #define SpinLockRelease(lock) S_UNLOCK(lock)

  #define SpinLockFree(lock)	S_LOCK_FREE(lock)


In response to

Responses

pgsql-patches by date

Next:From: Neil ConwayDate: 2005-12-29 16:54:29
Subject: pl/python: fix ref leak on elog
Previous:From: David FetterDate: 2005-12-29 04:53:15
Subject: Re: sending mail from Postgres

Privacy Policy | About PostgreSQL
Copyright © 1996-2014 The PostgreSQL Global Development Group