/*
 * Two-thread stress harness for a minimal AIO handle state machine.
 *
 * The main thread fills in an IO handle, publishes it by moving it to
 * SUBMITTED, and spins until the IO thread marks it COMPLETED_SHARED; it then
 * reclaims the handle and starts over.  The asserts check that the two
 * threads always observe a consistent (state, op) pair.
 */
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

#define read_barrier() __atomic_thread_fence(__ATOMIC_ACQUIRE)
#define write_barrier() __atomic_thread_fence(__ATOMIC_RELEASE)

enum PgAioOp {
    /* intentionally the zero value, to help catch zeroed memory etc */
    PGAIO_OP_INVALID = 0,

    PGAIO_OP_READV = 1,
};

typedef enum PgAioTargetID {
    /* intentionally the zero value, to help catch zeroed memory etc */
    PGAIO_TID_INVALID = 0,
    PGAIO_TID_SMGR,
} PgAioTargetID;

typedef enum PgAioHandleState {
    /* not in use */
    PGAIO_HS_IDLE = 0,

    /*
     * Returned by pgaio_io_acquire(). The next state is either DEFINED (if
     * pgaio_io_start_*() is called), or IDLE (if pgaio_io_release() is
     * called).
     */
    PGAIO_HS_HANDED_OUT,

    /*
     * pgaio_io_start_*() has been called, but IO is not yet staged. At this
     * point the handle has all the information for the IO to be executed.
     */
    PGAIO_HS_DEFINED,

    /*
     * stage() callbacks have been called, handle ready to be submitted for
     * execution. Unless in batchmode (c.f. pgaio_enter_batchmode()), the
     * IO will be submitted immediately after.
     */
    PGAIO_HS_STAGED,

    /* IO has been submitted to the IO method for execution */
    PGAIO_HS_SUBMITTED,

    /* IO finished, but result has not yet been processed */
    PGAIO_HS_COMPLETED_IO,

    /*
     * IO completed, shared completion has been called.
     *
     * If the IO completion occurs in the issuing backend, local callbacks
     * will immediately be called. Otherwise the handle stays in
     * COMPLETED_SHARED until the issuing backend waits for the completion of
     * the IO.
     */
    PGAIO_HS_COMPLETED_SHARED,

    /*
     * IO completed, local completion has been called.
     *
     * After this the handle will be made reusable and go into IDLE state.
     */
    PGAIO_HS_COMPLETED_LOCAL,
} PgAioHandleState;

/*
 * One IO handle, shared between the issuing thread and the IO thread.
 *
 * The members are deliberately separate bytes rather than adjacent
 * bit-fields: adjacent bit-fields form a single memory location, so storing
 * to one of them is a read-modify-write of the unit that also holds the
 * others, and the barrier in update_state() could not order the state store
 * against stores to op/target.
 */
typedef struct PgAioHandle {
    /*
     * A PgAioHandleState value.  All updates should go through
     * update_state(), all cross-thread reads through read_state().
     */
    uint8_t state;

    /* what are we operating on (a PgAioTargetID value) */
    uint8_t target;

    /* which IO operation (a PgAioOp value) */
    uint8_t op;
} PgAioHandle;

/*
 * Read the handle's state with a relaxed atomic load.
 *
 * The atomic load keeps the compiler from caching the value across spin-loop
 * iterations.  It provides no ordering by itself; callers that go on to read
 * other handle fields must issue read_barrier() first.
 */
static inline int read_state(PgAioHandle *ioh)
{
    return __atomic_load_n(&ioh->state, __ATOMIC_RELAXED);
}

/*
 * Move the handle to new_state.
 *
 * The release fence orders every earlier store (e.g. to ioh->op) before the
 * state store, so a thread that observes new_state and then issues
 * read_barrier() also observes those earlier stores.
 */
void update_state(PgAioHandle *ioh, int new_state)
{
    write_barrier();
    __atomic_store_n(&ioh->state, (uint8_t) new_state, __ATOMIC_RELAXED);
}

/*
 * Return a completed handle to IDLE and clear its op/target.
 *
 * A handle in COMPLETED_SHARED is first moved through COMPLETED_LOCAL.  The
 * handle must not already be IDLE.  The only caller, wait_for_free(), issues
 * read_barrier() after observing the completed state and before calling this.
 */
void reclaim(PgAioHandle *ioh)
{
    assert(read_state(ioh) != PGAIO_HS_IDLE);

    if (read_state(ioh) == PGAIO_HS_COMPLETED_SHARED) {
        update_state(ioh, PGAIO_HS_COMPLETED_LOCAL);
    }

    /* the state is published as IDLE before op/target are reset */
    update_state(ioh, PGAIO_HS_IDLE);

    ioh->op = PGAIO_OP_INVALID;
    ioh->target = PGAIO_TID_INVALID;
}

/*
 * Busy-wait until the handle reaches COMPLETED_SHARED, then reclaim it.
 */
void wait_for_free(PgAioHandle *ioh)
{
    while (read_state(ioh) != PGAIO_HS_COMPLETED_SHARED) {
        /* spin */
    }

    /* pairs with the release fence of the update_state() that completed it */
    read_barrier();
    reclaim(ioh);
}

/*
 * IO thread body: arg is the shared PgAioHandle.
 *
 * Repeatedly waits for the handle to become SUBMITTED, checks that the
 * submitter's op is visible, and marks the IO COMPLETED_SHARED.  Never
 * returns.
 */
void *io_thread(void *arg)
{
    PgAioHandle *ioh = arg;

    while (1) {
        while (read_state(ioh) != PGAIO_HS_SUBMITTED) {
            /* spin */
        }

        /* pairs with the release fence in the submitter's update_state() */
        read_barrier();
        assert(ioh->op != PGAIO_OP_INVALID);

        update_state(ioh, PGAIO_HS_COMPLETED_SHARED);
    }
}

/*
 * Start the IO thread, then submit and reclaim the single handle forever.
 *
 * Returns 1 if the IO thread cannot be created; otherwise runs until an
 * assertion fails or the process is killed.
 */
int main(void)
{
    PgAioHandle ioh = {
        .state = PGAIO_HS_IDLE,
        .target = PGAIO_TID_INVALID,
        .op = PGAIO_OP_INVALID,
    };
    pthread_t thread;

    /* without the IO thread, wait_for_free() below would spin forever */
    if (pthread_create(&thread, NULL, io_thread, &ioh) != 0) {
        return 1;
    }

    while (1) {
        assert(ioh.op == PGAIO_OP_INVALID);
        assert(read_state(&ioh) == PGAIO_HS_IDLE);

        ioh.op = PGAIO_OP_READV;
        update_state(&ioh, PGAIO_HS_SUBMITTED);

        wait_for_free(&ioh);
    }
}