ice-broker scan thread

From: Qingqing Zhou <zhouqq(at)cs(dot)toronto(dot)edu>
To: pgsql-hackers(at)postgresql(dot)org
Subject: ice-broker scan thread
Date: 2005-11-29 03:22:33
Message-ID: Pine.LNX.4.58.0511282217470.13586@josh.db
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers


I am considering adding an "ice-broker scan thread" to accelerate PostgreSQL
sequential scan IO speed. The basic idea of this thread is just like the
"read-ahead" method, but the difference is that this one does not read the data
into the shared buffer pool directly; instead, it reads the data into the file
system cache, which makes the integration easy, and this is unique to
PostgreSQL.

What happens to the original sequential scan:
for (;;)
{
/*
* a physical read may happen, due to current content of
* file system cache and if the kernel is smart enough to
* understand you want to do sequential scan
*/
physical or logical read a page;
process the page;
}

What happens to the sequential scan with ice-broker:
for (;;)
{
/* since the ice-broker has read the page in already */
logical read a page with big chance;
process the page;
}

I wrote a program to simulate the sequential scan in PostgreSQL
with/without ice-broker. The results indicate this technique has the
following characteristics:
(1) The important factor for the speedup is how much CPU time PostgreSQL
uses on each data page. If PG is fast enough, then no speedup occurs; otherwise
a 10% to 20% speedup is expected according to my test.
(2) It uses more CPU - this is easy to understand, since it does more
work;
(3) The benefits also depend on other factors, like how smart your file
system is ...

Here are the test results on my machine:
---
$#uname -a
Linux josh.db 2.4.29-1 #2 Tue Jan 25 17:03:33 EST 2005 i686 unknown
$#cat /proc/meminfo | grep MemTotal
MemTotal: 1030988 kB
$#cat /proc/cpuinfo | grep CPU
model name : Intel(R) Pentium(R) 4 CPU 2.40GHz
$#./seqscan 10 $HOME/pginstall/bin/data/base/10794/18986 50
PostgreSQL sequential scan simulator configuration:
Memory size: 943718400
CPU cost per page: 50
Scan thread read unit size: 4

With scan threads off - duration: 56862.738 ms
With scan threads on - duration: 40611.101 ms
With scan threads off - duration: 46859.207 ms
With scan threads on - duration: 38598.234 ms
With scan threads off - duration: 56919.572 ms
With scan threads on - duration: 47023.606 ms
With scan threads off - duration: 52976.825 ms
With scan threads on - duration: 43056.506 ms
With scan threads off - duration: 54292.979 ms
With scan threads on - duration: 42946.526 ms
With scan threads off - duration: 51893.590 ms
With scan threads on - duration: 42137.684 ms
With scan threads off - duration: 46552.571 ms
With scan threads on - duration: 41892.628 ms
With scan threads off - duration: 45107.800 ms
With scan threads on - duration: 38329.785 ms
With scan threads off - duration: 47527.787 ms
With scan threads on - duration: 38293.581 ms
With scan threads off - duration: 48810.656 ms
With scan threads on - duration: 39018.500 ms
---

Notice that in the above, cpu_cost=50 might look too big (if you look into the
code) - but in a concurrent situation, it is not that huge. Also, on my
Windows box (PIII, 800), a cpu_cost=5 is enough to show a benefit
of 10%.

So in general, it does help in some situations, but it is not rocket science,
since we can't predict the performance of the file system. It is fairly
easy to integrate, and we should add a GUC parameter to control it.

We need more tests, any comments and tests are welcome,

Regards,
Qingqing

---

/*
* seqscan.c
* PostgreSQL sequential scan simulator with helper scan thread
*
* Note
* I wrote this simulator to see if there is any benefit for a sequential scan in
* doing read-ahead with another thread. The only thing you may want to change in the
* source file is MEMSZ; make it big enough to thrash your file system cache.
*
* Use the following command to compile:
* $gcc -O2 -Wall -pthread -lm seqscan.c -o seqscan
* To use it:
* $./seqscan <rounds> <datafile> <cpu_cost>
* In which rounds is how many times you want to run the test (notice each round includes
* two disk-burn tests), datafile is the path to any file (suggested size > 100M), and cpu_cost
* is the cost of processing each page of the file. Try different cpu_cost values.
*/

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>
#include <math.h>
#include <sys/types.h>

#ifdef WIN32
#include <io.h>
#include <windows.h>
#define PG_BINARY O_BINARY
#else
#include <unistd.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/file.h>
#define PG_BINARY 0
#endif

typedef char bool;
#define true ((bool) 1)
#define false ((bool) 0)

#define BLCKSZ 8192
#define UNITSZ 4
#define MEMSZ (950*1024*1024)

char *data_file;
int cpu_cost;
volatile bool stop_scan;
char thread_buffer[BLCKSZ*UNITSZ];

/*
 * cleanup_cache
 *		Try to push previously cached file data out of the file system cache
 *		by allocating MEMSZ bytes and writing to every one of them.
 *
 * The stores go through a volatile pointer on purpose: the block is freed
 * without ever being read, so an optimizing compiler is otherwise entitled
 * to discard a plain memset() (and then the malloc/free pair) as dead code,
 * which would leave the cache untouched and make the "cold" runs warm.
 *
 * Exits the process if the allocation fails.
 */
static void
cleanup_cache(void)
{
	char	   *p;
	volatile char *vp;
	size_t		i;

	if (NULL == (p = (char *)malloc(MEMSZ)))
	{
		fprintf(stderr, "insufficient memory\n");
		exit(-1);
	}

	/* every byte is written, so every page of the block gets dirtied */
	vp = p;
	for (i = 0; i < (size_t) MEMSZ; i++)
		vp[i] = 'a';

	free(p);
}

#ifdef WIN32
bool enable_aio = false;

static const unsigned __int64 epoch = 116444736000000000L;
/*
 * gettimeofday
 *		Windows stand-in for the POSIX call of the same name.
 *
 * Fills *tp from GetSystemTime(): the seconds come from the FILETIME form of
 * the current time minus the file-scope "epoch" constant, and the
 * microseconds are the SYSTEMTIME millisecond field times 1000 (so the
 * result only has millisecond granularity).  tzp is ignored.
 * Always returns 0.
 *
 * NOTE(review): "epoch" is presumably the FILETIME value of 1970-01-01 and
 * 10000000 the number of FILETIME ticks per second - confirm against the
 * Win32 documentation.
 */
static int gettimeofday(struct timeval * tp, struct timezone * tzp)
{
	SYSTEMTIME	now;
	FILETIME	now_ft;
	ULARGE_INTEGER ticks;

	GetSystemTime(&now);
	SystemTimeToFileTime(&now, &now_ft);

	/* reassemble the two 32-bit halves into one 64-bit tick count */
	ticks.HighPart = now_ft.dwHighDateTime;
	ticks.LowPart = now_ft.dwLowDateTime;

	tp->tv_usec = (long) (now.wMilliseconds * 1000);
	tp->tv_sec = (long) ((ticks.QuadPart - epoch) / 10000000L);

	return 0;
}

/*
 * sleep
 *		Windows stand-in for POSIX sleep(): wait for "secs" seconds.
 *
 * Implemented with SleepEx(), which takes milliseconds; "true" is passed as
 * its second argument.
 */
static void
sleep(int secs)
{
	DWORD		millis = secs*1000;

	SleepEx(millis, true);
}

/*
 * thread_open (Windows)
 *		Open data_file for the scan thread with CreateFile().
 *
 * The file is opened for reading with the sequential-scan hint, plus
 * FILE_FLAG_OVERLAPPED when enable_aio is set.
 *
 * Returns the handle cast to int on success.  On failure returns -1 with
 * errno set to ENOENT, EEXIST, EACCES or (for any other Windows error, which
 * is also reported on stderr) EINVAL.
 */
static int
thread_open()
{
	SECURITY_ATTRIBUTES attrs;
	DWORD		flags;
	HANDLE		handle;
	int			errCode;

	attrs.nLength = sizeof(attrs);
	attrs.bInheritHandle = TRUE;
	attrs.lpSecurityDescriptor = NULL;

	flags = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_SEQUENTIAL_SCAN;
	if (enable_aio)
		flags |= FILE_FLAG_OVERLAPPED;

	handle = CreateFile(data_file,
						GENERIC_READ,
						FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
						&attrs,
						OPEN_EXISTING,
						flags,
						NULL);

	if (handle != INVALID_HANDLE_VALUE)
		return (int)handle;

	/* translate the Windows error into an errno value */
	/* EMFILE, ENFILE should not occur from CreateFile. */
	errCode = GetLastError();
	if (errCode == ERROR_PATH_NOT_FOUND || errCode == ERROR_FILE_NOT_FOUND)
		errno = ENOENT;
	else if (errCode == ERROR_FILE_EXISTS)
		errno = EEXIST;
	else if (errCode == ERROR_ACCESS_DENIED)
		errno = EACCES;
	else
	{
		fprintf(stderr, "thread_open failed: %d\n", errCode);
		errno = EINVAL;
	}

	return -1;
}

/*
 * thread_read (Windows)
 *		Read "nblk" blocks starting at block "blkno" from the handle "fd"
 *		into "buf" with ReadFile(), positioning via an OVERLAPPED structure.
 *
 * Returns the byte count reported by ReadFile().  When the call does not
 * complete immediately (ERROR_IO_PENDING) or hits end of file
 * (ERROR_HANDLE_EOF) the count is whatever ReadFile() stored, which starts
 * out as 0 here instead of being left uninitialized.  Any other error is
 * reported on stderr and terminates the process.
 *
 * The byte offset is computed in 64 bits and split across Offset/OffsetHigh
 * so that files larger than 2GB are addressed correctly.
 *
 * NOTE(review): if enable_aio is ever turned on, a pending read would still
 * reference this function's stack-allocated OVERLAPPED after return - that
 * needs to be addressed before enabling it.
 */
static int
thread_read(int fd, int blkno, size_t nblk, char *buf)
{
	unsigned __int64 offset = (unsigned __int64) blkno * BLCKSZ;
	DWORD		nbytes = 0;
	OVERLAPPED	ol;

	memset(&ol, 0, sizeof(OVERLAPPED));
	ol.Offset = (DWORD) (offset & 0xFFFFFFFF);
	ol.OffsetHigh = (DWORD) (offset >> 32);

	if (!ReadFile((HANDLE)fd, buf, (DWORD) (BLCKSZ*nblk), &nbytes, &ol))
	{
		int			errCode;

		switch (errCode = GetLastError())
		{
			case ERROR_IO_PENDING:
			case ERROR_HANDLE_EOF:
				/* not treated as failures */
				break;
			default:
				/* unknown error occurred */
				fprintf(stderr, "asyncread failed: %d\n", errCode);
				exit(-1);
		}
	}

	return (int) nbytes;
}

/*
 * thread_close (Windows)
 *		Release the handle that thread_open() returned as an int.
 */
static void
thread_close(int fd)
{
	HANDLE		handle = (HANDLE)fd;

	CloseHandle(handle);
}

#else /* non-windows platforms */

/*
 * thread_open (non-Windows)
 *		Open data_file for the scan thread and return its descriptor.
 *
 * The file is opened read-only: the scan thread never writes, and asking
 * for write access made the simulator fail on files the user can read but
 * not modify.  No mode argument is passed because the file is never created.
 *
 * Reports the errno value on stderr and exits the process on failure, so
 * the return value is always a valid descriptor.
 */
static int
thread_open(void)
{
	int			fd;

	fd = open(data_file, O_RDONLY | PG_BINARY);
	if (fd < 0)
	{
		fprintf(stderr, "thread_open failed: %d\n", errno);
		exit(-1);
	}

	return fd;
}

/*
 * thread_read (non-Windows)
 *		Read "nblk" blocks starting at block "blkno" from "fd" into "buf".
 *
 * Returns the number of bytes read, which may be less than requested near
 * the end of the file.  A failed seek, a read error or a read of zero bytes
 * is reported on stderr (with the errno value) and terminates the process.
 *
 * The offset is computed in off_t rather than int so the multiplication
 * cannot overflow for large block numbers, and the lseek() result is
 * checked instead of being overwritten by the read() result.
 */
static int
thread_read(int fd, int blkno, size_t nblk, char *buf)
{
	off_t		offset = (off_t) blkno * BLCKSZ;
	ssize_t		nbytes;

	if (lseek(fd, offset, SEEK_SET) < 0)
	{
		fprintf(stderr, "thread_read failed: %d\n", errno);
		exit(-1);
	}

	nbytes = read(fd, buf, BLCKSZ*nblk);
	if (nbytes <= 0)
	{
		fprintf(stderr, "thread_read failed: %d\n", errno);
		exit(-1);
	}

	return (int) nbytes;
}

/*
 * thread_close (non-Windows)
 *		Close the descriptor obtained from thread_open().  The result of
 *		close() is not examined.
 */
static void
thread_close(int fd)
{
	(void) close(fd);
}
#endif

/*
 * scan_thread
 *		Body of the helper ("ice-broker") thread.
 *
 * "args" is not a real pointer: it carries the number of blocks in the data
 * file, cast to a pointer by the creator of the thread.  The thread opens
 * its own handle on the file and reads it front to back, UNITSZ blocks per
 * call, into thread_buffer; the data itself is not examined here.  After
 * each read it checks stop_scan and leaves the loop once that flag is set.
 *
 * Always returns 0.
 */
#ifdef WIN32
static DWORD WINAPI
scan_thread(LPVOID args)
#else
static void *
scan_thread(void *args)
#endif
{
	int			handle;
	int			blk = 0;
	int			limit = (size_t)args;

	handle = thread_open();

	while (blk < limit)
	{
		thread_read(handle, blk, UNITSZ, (char *)thread_buffer);

		/* the main thread sets stop_scan when its own scan is done */
		if (stop_scan == true)
			break;

		blk += UNITSZ;
	}

	thread_close(handle);

	return 0;
}

/*
 * init_scan
 *		Prepare one scan of data_file.
 *
 * Opens the file for do_scan(), stores its length in whole BLCKSZ blocks in
 * *nblocks and, when with_threads is true, clears stop_scan and starts the
 * helper scan thread (which receives the block count as its argument).
 *
 * Returns the open descriptor.  Any failure is reported on stderr and
 * terminates the process.
 *
 * Notes on this version:
 *	- the file length is checked while still a signed off_t; testing the
 *	  unsigned *nblocks for "< 0" could never detect an lseek() failure;
 *	- the file is opened read-only, since the scan only reads it;
 *	- nothing ever joins the helper thread, so it is detached (POSIX) or
 *	  its handle is closed (Windows) right after creation to avoid leaking
 *	  one thread's bookkeeping per round.  Neither affects the running
 *	  thread.
 */
static int
init_scan(bool with_threads, size_t *nblocks)
{
	int			fd;
	off_t		length;

	/* open file for do_scan */
	fd = open(data_file, O_RDONLY | PG_BINARY);
	if (fd < 0)
	{
		fprintf(stderr, "failed to open file %s\n", data_file);
		exit(-1);
	}

	length = lseek(fd, 0, SEEK_END);
	if (length < 0)
	{
		fprintf(stderr, "failed to get file length %s\n", data_file);
		exit(-1);
	}
	*nblocks = (size_t) (length / BLCKSZ);

	if (with_threads)
	{
#ifdef WIN32
		HANDLE		thread;
#else
		pthread_t	thread;
#endif

		/* create scan threads */
		stop_scan = false;
#ifdef WIN32
		thread = CreateThread(NULL, 0,
							  scan_thread, (void *)(*nblocks),
							  0, NULL);
		if (NULL == thread)
#else
		if (pthread_create(&thread, NULL,
						   scan_thread, (void *)(*nblocks)))
#endif
		{
			fprintf(stderr, "failed to start scan thread");
			exit(-1);
		}

		/* the thread is never joined; let its resources go when it ends */
#ifdef WIN32
		CloseHandle(thread);
#else
		pthread_detach(thread);
#endif
	}

	return fd;
}

/*
 * do_scan
 *		The simulated sequential scan: read the first "nblocks" blocks of
 *		"fd" one BLCKSZ page at a time and burn some CPU on each page.
 *
 * A failed seek or a short/failed read is reported on stderr and terminates
 * the process.
 *
 * Changes relative to the original I/O handling:
 *	- the byte offset is computed in off_t, so i*BLCKSZ cannot overflow int;
 *	- the lseek() result is checked rather than discarded;
 *	- the block counter is a size_t like nblocks;
 *	- the page buffer is declared as an int array, because the CPU loop
 *	  accesses it as ints (a char array gives no alignment guarantee).
 *
 * The CPU loop itself is intentionally left exactly as it was so that
 * timings stay comparable: j is used as an int index but starts at and
 * advances by sizeof(int), and the bound is BLCKSZ / (5 * sizeof(int)), so
 * once k*sizeof(int) reaches that bound the inner loop does no work.
 *
 * NOTE(review): the arithmetic on arbitrary page contents can overflow int
 * and convert out-of-range doubles to int; harmless for a CPU burner on the
 * platforms tested, but not strictly defined C.
 */
static void
do_scan(int fd, size_t nblocks)
{
	size_t		i;
	int			j, k;
	int			page[BLCKSZ / sizeof(int)];

	for (i = 0; i < nblocks; i++)
	{
		off_t		offset = (off_t) i * BLCKSZ;

		if (lseek(fd, offset, SEEK_SET) < 0 ||
			read(fd, page, BLCKSZ) != BLCKSZ)
		{
			fprintf(stderr, "do_scan read failed\n");
			exit(-1);
		}

		/* pretend to do some CPU intensive analysis */
		for (k = 0; k < cpu_cost; k++)
		{
			for (j = (k*sizeof(int))%BLCKSZ;
				 j < BLCKSZ / (5 * sizeof(int));
				 j += sizeof(int))
			{
				int			x, y;

				x = page[j];
				x = (int)pow((double)x, (double)(x+1));
				y = (int)sin((double)x*x);
				page[j] = x*y;
			}
		}
	}
}

/*
 * close_scan
 *		Finish a scan: set stop_scan so that a running helper thread leaves
 *		its loop after its current read, then close the scan's descriptor.
 *
 * The parameter now has an explicit type; the original relied on the
 * "implicit int" rule, which was removed from the language in C99.
 */
static void
close_scan(int fd)
{
	stop_scan = true;
	close(fd);
}

/*
 * main
 *		Usage: seqscan <rounds> <datafile> <cpu_cost>
 *
 * Runs 2*rounds timed scans of datafile, alternating between the helper
 * scan thread being off (even iterations) and on (odd iterations).  Before
 * each scan the file system cache is flushed with cleanup_cache() and the
 * program sleeps for two seconds; it sleeps two more seconds afterwards.
 * The elapsed wall-clock time of init_scan + do_scan + close_scan is printed
 * in milliseconds with three decimals.
 *
 * Changes relative to the original: the usage text names the program
 * "seqscan" (as in the file header) instead of "cache", MEMSZ is cast to
 * match its %u conversion, and the computed-but-unused "usecs" local is
 * gone.  The printed timings are unchanged.
 */
int
main(int argc, char *argv[])
{
	int			i, rounds, fd;
	size_t		nblocks;

	if (argc != 4)
	{
		fprintf(stderr, "usage: seqscan <rounds> <datafile> <cpu_cost>\n");
		exit(-1);
	}

	rounds = atoi(argv[1]);
	data_file = argv[2];
	cpu_cost = atoi(argv[3]);

	/* open and close the file once up front so a bad path fails early */
	fd = init_scan(false, &nblocks);
	close_scan(fd);

	fprintf(stdout, "PostgreSQL sequential scan simulator configuration:\n"
			"\tMemory size: %u\n"
			"\tCPU cost per page: %d\n"
			"\tScan thread read unit size: %d\n\n",
			(unsigned int) MEMSZ, cpu_cost, UNITSZ);

	for (i = 0; i < 2*rounds; i++)
	{
		struct timeval start_t, stop_t;
		long		sec_diff, usec_diff;
		bool		enable = i%2?true:false;

		/* eliminate system cached data */
		cleanup_cache();
		sleep(2);

		/* do the scan task */
		gettimeofday(&start_t, NULL);
		fd = init_scan(enable, &nblocks);
		do_scan(fd, nblocks);
		close_scan(fd);
		gettimeofday(&stop_t, NULL);

		/* measure the time; borrow a second if the usec part went negative */
		if (stop_t.tv_usec < start_t.tv_usec)
		{
			stop_t.tv_sec--;
			stop_t.tv_usec += 1000000;
		}
		sec_diff = (long) (stop_t.tv_sec - start_t.tv_sec);
		usec_diff = (long) (stop_t.tv_usec - start_t.tv_usec);

		fprintf (stdout, "With scan threads %s - duration: %ld.%03ld ms\n",
				 enable?"on":"off",
				 sec_diff * 1000 + usec_diff / 1000,
				 usec_diff % 1000);

		sleep(2);
	}

	exit(0);
}

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message David Boreham 2005-11-29 03:50:43 Re: ice-broker scan thread
Previous Message Tom Lane 2005-11-29 02:59:29 Re: Checking a heap page