*** 14recheck/src/backend/postmaster/autovacuum.c	2007-03-27 16:43:31.000000000 -0400
--- 12vacuum/src/backend/postmaster/autovacuum.c	2007-03-27 17:40:19.000000000 -0400
***************
*** 52,57 ****
--- 52,58 ----
  #include "utils/syscache.h"
  
  
+ static volatile sig_atomic_t got_SIGUSR1 = false;
  static volatile sig_atomic_t got_SIGHUP = false;
  static volatile sig_atomic_t avlauncher_shutdown_request = false;
  
***************
*** 59,64 ****
--- 60,66 ----
   * GUC parameters
   */
  bool		autovacuum_start_daemon = false;
+ int			autovacuum_max_workers;
  int			autovacuum_naptime;
  int			autovacuum_vac_thresh;
  double		autovacuum_vac_scale;
***************
*** 69,75 ****
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flag to tell if we are in the autovacuum daemon process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
--- 71,77 ----
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flags to tell if we are in an autovacuum process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
***************
*** 86,91 ****
--- 88,94 ----
  typedef struct autovac_dbase
  {
  	Oid			ad_datid;
+ 	TimestampTz	ad_next_worker;
  	char	   *ad_name;
  	TransactionId ad_frozenxid;
  	PgStat_StatDBEntry *ad_entry;
***************
*** 110,123 ****
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
  typedef struct
  {
! 	Oid		process_db;			/* OID of database to process */
! 	int		worker_pid;			/* PID of the worker process, if any */
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
--- 113,158 ----
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
+ /*-------------
+  * This struct holds information about a single worker's whereabouts.  We keep
+  * an array of these in shared memory, sized according to
+  * autovacuum_max_workers.
+  *
+  * wi_dboid		OID of the database to work on; InvalidOid if entry is unused
+  * wi_tableoid	OID of the table currently being vacuumed
+  * wi_workerpid	PID of the running worker, 0 if not yet started
+  * wi_finished	Set by the worker when done, so the launcher can free the entry
+  *-------------
+  */
+ typedef struct
+ {
+ 	Oid			wi_dboid;
+ 	Oid			wi_tableoid;
+ 	int			wi_workerpid;
+ 	bool		wi_finished;
+ } WorkerInfo;
+ 
  typedef struct
  {
! 	pid_t		av_launcherpid;		/* launcher's PID; reset to 0 when it exits */
! 	WorkerInfo	av_workers[1];		/* really autovacuum_max_workers entries */
! 	/* VARIABLE LENGTH STRUCT */
  } AutoVacuumShmemStruct;
  
+ /* Walk all worker slots: _worker points at av_workers[_i].  Expands to one */
+ /* "for" header, so it is safe unbraced; args are evaluated several times! */
+ #define foreach_worker(_i, _worker) \
+ 	for ((_i) = 0, (_worker) = AutoVacuumShmem->av_workers; \
+ 		 (_i) < autovacuum_max_workers; (_i)++, (_worker)++)
+ 
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
+ /* number of currently free worker slots; only valid in the launcher */
+ static int free_workers;
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+ 
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
***************
*** 125,133 ****
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void do_start_worker(void);
! static void do_autovacuum(Oid dbid);
! static List *autovac_get_database_list(void);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 160,174 ----
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static Oid do_start_worker(void);
! static int launcher_determine_sleep(void);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int avdb_comparator(const void *a, const void *b);
! 
! static void do_autovacuum(WorkerInfo *worker, Oid dbid);
! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
***************
*** 138,144 ****
  											PgStat_StatDBEntry *dbentry);
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
- static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
  						  PgStat_StatDBEntry *shared,
  						  PgStat_StatDBEntry *dbentry);
--- 179,184 ----
***************
*** 149,154 ****
--- 189,195 ----
  static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
***************
*** 232,237 ****
--- 273,312 ----
  
  /*
   * Main loop for the autovacuum launcher process.
+  *
+  * The signalling between launcher and worker is as follows:
+  *
+  * When the worker has finished starting up, it stores its PID in wi_workerpid
+  * and sends a SIGUSR1 signal to the launcher.  The launcher then knows that
+  * the postmaster is ready to start a new worker.  We do it this way because
+  * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+  * yet processed the last one, in which case the second signal would be lost.
+  * This is only useful when two workers need to be started close to one
+  * another, which should be rare but it's possible.
+  *
+  * Additionally, when the worker is finished with the vacuum work, it sets the
+  * wi_finished flag and sends a SIGUSR1 signal to the launcher.  Upon receipt
+  * of this signal, the launcher then clears the entry for future use and may
+  * start another worker right away, if need be.
+  *
+  * Note that there is still a race condition here, because a worker may finish
+  * just after the launcher sent the signal to postmaster, but before postmaster
+  * processes it.  At this point, the launcher receives a signal, sees the empty
+  * slot, so it sends the postmaster the signal again to start another worker.
+  * But the postmaster flag was already set, so the signal is lost.  To avoid
+  * this problem, the launcher will not try to start a new worker until all
+  * WorkerInfo entries that have the wi_dboid field set have a PID assigned.
+  * (FIXME -- this part is not yet implemented.)
+  *
+  * There is an additional problem if, for some reason, a worker starts and
+  * is not able to finish its task correctly.  It will not be able to set its
+  * finished flag, so the launcher will believe that it's still starting up.
+  * To prevent this problem, we check the PGPROCs of worker processes, and
+  * clean them up if we find they are not actually running (or they correspond
+  * to processes that are not autovacuum workers.)  We only do it if all 
+  * WorkerInfo structures are in use, thus frequently enough so that this
+  * problem doesn't cause any starvation, but seldom enough so that it's not a
+  * performance hit.  (FIXME -- this part is not yet implemented.)
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
***************
*** 266,274 ****
  	 * Set up signal handlers.	Since this is an auxiliary process, it has
  	 * particular signal requirements -- no deadlock checker or sinval
  	 * catchup, for example.
- 	 *
- 	 * XXX It may be a good idea to receive signals when an avworker process
- 	 * finishes.
  	 */
  	pqsignal(SIGHUP, avl_sighup_handler);
  
--- 341,346 ----
***************
*** 278,284 ****
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, SIG_IGN);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
--- 350,356 ----
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, avl_sigusr1_handler);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
***************
*** 360,380 ****
  	/* We can now handle ereport(ERROR) */
  	PG_exception_stack = &local_sigjmp_buf;
  
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
! 	PG_SETMASK(&UnBlockSig);
  
  	/*
! 	 * take a nap before executing the first iteration, unless we were
! 	 * requested an emergency run.
  	 */
! 	if (autovacuum_start_daemon)
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  
  	for (;;)
  	{
! 		int		worker_pid;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
--- 432,464 ----
  	/* We can now handle ereport(ERROR) */
  	PG_exception_stack = &local_sigjmp_buf;
  
+ 	/* in emergency mode, just start a worker and go away */
+ 	if (!autovacuum_start_daemon)
+ 	{
+ 		do_start_worker();
+ 		proc_exit(0);		/* done */
+ 	}
+ 
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
! 	AutoVacuumShmem->av_launcherpid = MyProcPid;
  
  	/*
! 	 * Create the initial database list.  The invariant we want this list to
! 	 * keep is that it's ordered by decreasing next_time; initially all times
! 	 * are zero, so order doesn't matter.  As soon as an entry is updated to
! 	 * a higher time, it will be moved to the front (which is correct because
! 	 * the only operation is to add autovacuum_naptime to the entry, and time
! 	 * always increases).
  	 */
! 	rebuild_database_list(InvalidOid);
! 	free_workers = autovacuum_max_workers;
  
+ 	PG_SETMASK(&UnBlockSig);
  	for (;;)
  	{
! 		int		millis;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
***************
*** 383,388 ****
--- 467,478 ----
  		if (!PostmasterIsAlive(true))
  			exit(1);
  
+ 		millis = launcher_determine_sleep();
+ 
+ 		/* Sleep for a while according to schedule */
+ 		pg_usleep(millis * 1000);
+ 
+ 		/* the normal shutdown case */
  		if (avlauncher_shutdown_request)
  			break;
  
***************
*** 392,471 ****
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
  		/*
! 		 * if there's a worker already running, sleep until it
! 		 * disappears.
  		 */
! 		LWLockAcquire(AutovacuumLock, LW_SHARED);
! 		worker_pid = AutoVacuumShmem->worker_pid;
! 		LWLockRelease(AutovacuumLock);
  
! 		if (worker_pid != 0)
  		{
! 			PGPROC *proc = BackendPidGetProc(worker_pid);
  
! 			if (proc != NULL && proc->isAutovacuum)
! 				goto sleep;
  			else
  			{
  				/*
! 				 * if the worker is not really running (or it's a process
! 				 * that's not an autovacuum worker), remove the PID from shmem.
! 				 * This should not happen, because either the worker exits
! 				 * cleanly, in which case it'll remove the PID, or it dies, in
! 				 * which case postmaster will cause a system reset cycle.
  				 */
! 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 				worker_pid = 0;
! 				LWLockRelease(AutovacuumLock);
  			}
  		}
  
- 		do_start_worker();
- 
- sleep:
- 		/*
- 		 * in emergency mode, exit immediately so that the postmaster can
- 		 * request another run right away if needed.
- 		 *
- 		 * XXX -- maybe it would be better to handle this inside the launcher
- 		 * itself.
- 		 */
- 		if (!autovacuum_start_daemon)
- 			break;
- 
  		/* have pgstat read the file again next time */
  		pgstat_clear_snapshot();
- 
- 		/* now sleep until the next autovac iteration */
- 		pg_usleep(autovacuum_naptime * 1000000L); 
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
  
  	proc_exit(0);		/* done */
  }
  
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.
   */
! static void
  do_start_worker(void)
  {
  	List	   *dblist;
! 	bool		for_xid_wrap;
! 	autovac_dbase *db;
! 	ListCell *cell;
  	TransactionId xidForceLimit;
  
  	/* Get a list of databases */
! 	dblist = autovac_get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 482,829 ----
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
+ 		/* a worker started up or finished */
+ 		if (got_SIGUSR1)
+ 		{
+ 			WorkerInfo *worker;
+ 			int			i;
+ 
+ 			got_SIGUSR1 = false;
+ 
+ 			/* Walk the workers and clean up finished entries. */
+ 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);	
+ 			foreach_worker(i, worker)
+ 			{
+ 				if (worker->wi_finished)
+ 				{
+ 					worker->wi_tableoid = InvalidOid;
+ 					worker->wi_dboid = InvalidOid;
+ 					worker->wi_workerpid = 0;
+ 					worker->wi_finished = false;
+ 					free_workers++;
+ 				}
+ 			}
+ 			LWLockRelease(AutovacuumLock);
+ 		}
+ 
+ #if 0
  		/*
! 		 * Find and remove all entries corresponding to workers that failed to
! 		 * start.  Problem: how do we detect that it failed to start, yet leave
! 		 * alone those that are still really starting up?
! 		 *
! 		 * Idea: if we find that a database is listed twice, and none of the
! 		 * workers has registered, then something is wrong.  This fails if
! 		 * all workers failed in different databases however.
! 		 *
! 		 * Another idea: wreak havoc if a worker was started longer than
! 		 * autovac_naptime seconds ago and still hasn't registered.
  		 */
! 		if (free_workers == 0)
! 		{
! 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 			foreach_worker(i, worker)
! 			{
! 				if (worker->wi_workerpid == 0 && ...)
! 					clear the worker entry
! 			}
! 			LWLockRelease(AutovacuumLock);
! 		}
! #endif
  
! 		/*
! 		 * See if there's need to start a new worker, and do so if possible.
! 		 * If there are no free worker slots, avoid doing all this work, as
! 		 * we will not be able to start the worker anyway.
! 		 */
! 		if (free_workers > 0)
  		{
! 			TimestampTz	current_time;
! 			Dlelem	   *elem;
! 
! 			elem = DLGetTail(DatabaseList);
  
! 			current_time = GetCurrentTimestamp();
! 
! 			if (elem != NULL)
! 			{
! 				autovac_dbase *avdb = DLE_VAL(elem);
! 				long	secs;
! 				int		usecs;
! 
! 				TimestampDifference(current_time, avdb->ad_next_worker, &secs, &usecs);
! 
! 				/* we have to start a worker now */
! 				if (secs <= 0 && usecs <= 0)
! 					launch_worker(current_time);
! 			}
  			else
  			{
  				/*
! 				 * Special case when the list is empty: start a worker right
! 				 * away.  This covers the initial case, when no database is in
! 				 * pgstats (thus the list is empty).
  				 */
! 				launch_worker(current_time);
  			}
  		}
  
  		/* have pgstat read the file again next time */
  		pgstat_clear_snapshot();
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
+ 	AutoVacuumShmem->av_launcherpid = 0;
  
  	proc_exit(0);		/* done */
  }
  
+ 
+ /*
+  * Determine the time to sleep, in milliseconds, based on the database list.
+  */
+ static int
+ launcher_determine_sleep(void)
+ {
+ 	long	secs;
+ 	int		usecs;
+ 	Dlelem *elem;
+ 
+ 	/*
+ 	 * We sleep until the next scheduled vacuum, which is the next_worker time
+ 	 * of the list's tail entry (the list is kept sorted from highest to
+ 	 * lowest next_worker).  If no positive amount of time is left until
+ 	 * then, we will sleep a small nominal time instead; see below.
+ 	 */
+ 	elem = DLGetTail(DatabaseList);
+ 	if (elem != NULL)
+ 	{
+ 		autovac_dbase  *avdb = DLE_VAL(elem);
+ 		TimestampTz		current_time = GetCurrentTimestamp();
+ 		TimestampTz		next_wakeup;
+ 
+ 		next_wakeup = avdb->ad_next_worker;
+ 		TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ 	}
+ 	else
+ 	{
+ 		/* list is empty, sleep for a whole autovacuum_naptime (seconds) */
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 	/*
+ 	 * no time left to wait (entry already due, or bogus); sleep nominally
+ 	 */
+ 	if (secs <= 0L && usecs <= 0)
+ 	{
+ 		secs = 0;
+ 		usecs = 500000;	/* 500 ms */
+ 	}
+ 
+ 	return secs * 1000 + usecs / 1000;	/* convert to milliseconds */
+ }
+ 
+ /*
+  * Build an updated DatabaseList.  It must only contain databases that appear
+  * in pgstats, and must be sorted by next_worker from highest to lowest.
+  *
+  * Receives the Oid of the database that made this list be generated (we call
+  * this the "new" database, because when the database was already present on
+  * the list, we expect that this function is not called at all).  The new
+  * database is scheduled at the end of the interval, that is, at the current
+  * time plus autovacuum_naptime.  Databases found on the preexisting list, if
+  * any, keep their next_worker value; the remaining ones are distributed
+  * regularly across the next autovacuum_naptime interval.
+  */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ 	Dllist	   *newdllist;
+ 	List	   *dblist;
+ 	ListCell   *cell;
+ 	autovac_dbase *dbary;
+ 	int			i;
+ 	TimestampTz	current_time;
+ 	TimestampTz	initial_time;
+ 	int			millis_increment = 0;
+ 	int			total_dbs;
+ 	int			unset_dbs = 0;
+ 	MemoryContext newcxt;
+ 	MemoryContext oldcxt;
+ 
+ 	newcxt = AllocSetContextCreate(TopMemoryContext,
+ 								   "dblist context",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	oldcxt = MemoryContextSwitchTo(newcxt);
+ 
+ 	/*
+ 	 * To build a sorted dllist, we first store the elements in a fixed-size
+ 	 * array, which we sort, and finally we store the individual elements in
+ 	 * the doubly linked list.
+ 	 */
+ 	dblist = get_database_list();
+ 	current_time = GetCurrentTimestamp();
+ 
+ 	/*
+ 	 * The new database array.  We must not free it: these will become the
+ 	 * elements of the Dllist.
+ 	 */
+ 	dbary = palloc(sizeof(autovac_dbase) * list_length(dblist));
+ 
+ 	i = 0;
+ 	foreach(cell, dblist)
+ 	{
+ 		autovac_dbase  *avdb = lfirst(cell);
+ 		PgStat_StatDBEntry *dbentry;
+ 		Dlelem		   *elm;
+ 		
+ 		Assert(avdb->ad_next_worker == 0);
+ 
+ 		dbentry = pgstat_fetch_stat_dbentry(avdb->ad_datid);
+ 		/* skip DBs without pgstat entry */
+ 		if (dbentry == NULL)
+ 			continue;
+ 
+ 		/* We set the new database to "now + autovacuum_naptime" */
+ 		if (avdb->ad_datid == newdb)
+ 		{
+ 			avdb->ad_next_worker =
+ 				TimestampTzPlusMilliseconds(current_time,
+ 											autovacuum_naptime * 1000);
+ 		}
+ 		else
+ 		{
+ 			/*
+ 			 * Otherwise, if the database has an entry on the old list, copy
+ 			 * the next_worker field into the new list.
+ 			 */
+ 			elm = DatabaseList ? DLGetHead(DatabaseList) : NULL;
+ 			while (elm != NULL)
+ 			{
+ 				autovac_dbase	*tmp = DLE_VAL(elm);
+ 
+ 				if (tmp->ad_datid == avdb->ad_datid)
+ 				{
+ 					avdb->ad_next_worker = tmp->ad_next_worker;
+ 					break;
+ 				}
+ 				elm = DLGetSucc(elm);
+ 			}
+ 		}
+ 
+ 		/* databases still at zero get their time assigned further below */
+ 		if (avdb->ad_next_worker == 0)
+ 			unset_dbs++;
+ 
+ 		/* copy the entry into the array */
+ 		memcpy(&(dbary[i++]), avdb, sizeof(autovac_dbase));
+ 	}
+ 
+ 	total_dbs = i;
+ 	if (unset_dbs > 0)
+ 		millis_increment = autovacuum_naptime * 1000 / unset_dbs;
+ 	initial_time = TimestampTzPlusMilliseconds(current_time, millis_increment);
+ 
+ 	/* now assign evenly spaced times to the databases that have none yet */
+ 	for (i = 0; i < total_dbs; i++)
+ 	{
+ 		autovac_dbase	*avdb = &dbary[i];
+ 
+ 		if (avdb->ad_next_worker == 0)
+ 		{
+ 			avdb->ad_next_worker = initial_time;
+ 			initial_time = TimestampTzPlusMilliseconds(initial_time,
+ 													   millis_increment);
+ 		}
+ 	}
+ 
+ 	/* sort the array by next_worker (i equals total_dbs at this point) */
+ 	qsort(dbary, i, sizeof(autovac_dbase), avdb_comparator);
+ 
+ 	/* enter each array element into the new dl list */
+ 	newdllist = DLNewList();
+ 	for (i = 0; i < total_dbs; i++)
+ 	{
+ 		Dlelem		   *elem;
+ 
+ 		/* insert the i-th array element as a dlelem into the new list */
+ 		elem = DLNewElem(&dbary[i]);
+ 		DLAddHead(newdllist, elem);
+ 	}
+ 
+ 	/* Free the old list, and set the new one up */
+ 	if (DatabaseListCxt != NULL)
+ 		MemoryContextDelete(DatabaseListCxt);
+ 	DatabaseListCxt = newcxt;
+ 	DatabaseList = newdllist;
+ 
+ 	MemoryContextSwitchTo(oldcxt);
+ }
+ 
+ /* qsort comparator: order autovac_dbase entries by ascending next_worker */
+ static int
+ avdb_comparator(const void *a, const void *b)
+ {
+ 	TimestampTz	lhs = ((const autovac_dbase *) a)->ad_next_worker;
+ 	TimestampTz	rhs = ((const autovac_dbase *) b)->ad_next_worker;
+ 
+ 	return (lhs == rhs) ? 0 : ((lhs > rhs) ? 1 : -1);
+ }
+ 
+ 
+ 
+ 
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.  It fails gracefully if invoked when
!  * autovacuum_max_workers workers are already active.
!  *
!  * Return value is the OID of the database that the worker is going to process,
!  * or InvalidOid if no worker was actually started.
   */
! static Oid
  do_start_worker(void)
  {
  	List	   *dblist;
! 	WorkerInfo *worker;
! 	int			i;
! 	ListCell   *cell;
  	TransactionId xidForceLimit;
+ 	bool		for_xid_wrap;
+ 	autovac_dbase *avdb;
+ 	TimestampTz		current_time;
+ 
+ 	/*
+ 	 * Find an unused WorkerInfo entry to set up.  If there is none, give up
+ 	 * and return InvalidOid.
+ 	 *
+ 	 * NB: we only read the array here, and save a pointer where we'll
+ 	 * write the entry later.  Since this is the only process that creates
+ 	 * new entries into the array, there's no risk that somebody else will
+ 	 * use that pointer while we weren't looking.
+ 	 */
+ 	LWLockAcquire(AutovacuumLock, LW_SHARED);
+ 	foreach_worker(i, worker)
+ 	{
+ 		/* Invalid database OID means unused worker entry; use it */
+ 		if (!OidIsValid(worker->wi_dboid))
+ 			break;
+ 	}
+ 	LWLockRelease(AutovacuumLock);
+ 
+ 	/* they're all used up */
+ 	if (i >= autovacuum_max_workers)
+ 		return InvalidOid;
  
  	/* Get a list of databases */
! 	dblist = get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
***************
*** 497,507 ****
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	db = NULL;
  	for_xid_wrap = false;
  	foreach(cell, dblist)
  	{
  		autovac_dbase *tmp = lfirst(cell);
  
  		/* Find pgstat entry if any */
  		tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
--- 855,868 ----
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	avdb = NULL;
  	for_xid_wrap = false;
+ 	current_time = GetCurrentTimestamp();
  	foreach(cell, dblist)
  	{
  		autovac_dbase *tmp = lfirst(cell);
+ 		bool		skipit;
+ 		Dlelem	   *elem;
  
  		/* Find pgstat entry if any */
  		tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
***************
*** 509,517 ****
  		/* Check to see if this one is at risk of wraparound */
  		if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
  		{
! 			if (db == NULL ||
! 				TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
! 				db = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
--- 870,878 ----
  		/* Check to see if this one is at risk of wraparound */
  		if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
  		{
! 			if (avdb == NULL ||
! 				TransactionIdPrecedes(tmp->ad_frozenxid, avdb->ad_frozenxid))
! 				avdb = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
***************
*** 526,548 ****
  			continue;
  
  		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (db == NULL ||
! 			tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
! 			db = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (db != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 		AutoVacuumShmem->process_db = db->ad_datid;
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
  	}
  }
  
  /* SIGHUP: set flag to re-read config file at next convenient time */
--- 887,1012 ----
  			continue;
  
  		/*
+ 		 * Also, skip a database that appears on DatabaseList as having been
+ 		 * processed recently (less than autovacuum_naptime seconds ago).  We
+ 		 * do this so that we don't select a database which we just selected,
+ 		 * but for which pgstat hasn't gotten around to updating the last
+ 		 * autovacuum time yet.
+ 		 */
+ 		skipit = false;
+ 		elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
+ 		while (elem != NULL)
+ 		{
+ 			autovac_dbase *dbp = DLE_VAL(elem);
+ 
+ 			if (dbp->ad_datid == tmp->ad_datid)
+ 			{
+ 				TimestampTz		curr_plus_naptime;
+ 				TimestampTz		next = dbp->ad_next_worker;
+ 				
+ 				curr_plus_naptime =
+ 					TimestampTzPlusMilliseconds(current_time,
+ 												autovacuum_naptime * 1000);
+ 
+ 				/*
+ 				 * What we want here is to skip if next_worker falls between
+ 				 * the current time and the current time plus naptime.
+ 				 */
+ 				if (timestamp_cmp_internal(current_time, next) > 0)
+ 					skipit = false;
+ 				else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
+ 					skipit = false;
+ 				else
+ 					skipit = true;
+ 
+ 				break;
+ 			}
+ 			elem = DLGetPred(elem);
+ 		}
+ 		if (skipit)
+ 			continue;
+ 
+ 		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (avdb == NULL ||
! 			tmp->ad_entry->last_autovac_time < avdb->ad_entry->last_autovac_time)
! 			avdb = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (avdb != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 		Assert(!OidIsValid(worker->wi_dboid));
! 		worker->wi_dboid = avdb->ad_datid;
! 		worker->wi_workerpid = 0;
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+ 
+ 		return avdb->ad_datid;
+ 	}
+ 
+ 	return InvalidOid;
+ }
+ 
+ /*
+  * launch_worker
+  *
+  * Wrapper for starting a worker from the launcher.  Besides actually starting
+  * it, update the database list to reflect the next time that another one will
+  * need to be started on the selected database.  The actual database choice is
+  * left to do_start_worker.
+  *
+  * This routine is also expected to insert an entry into the database list if
+  * the selected database was previously absent from the list; it does so by
+  * rebuilding the list.  "now" is the time base for the next_worker update.
+  */
+ static void
+ launch_worker(TimestampTz now)
+ {
+ 	Oid		dbid;
+ 	Dlelem *elem;
+ 
+ 	dbid = do_start_worker();
+ 	if (OidIsValid(dbid))
+ 		free_workers--;
+ 
+ 	/*
+ 	 * Walk the database list and update corresponding entry.  If it's not on
+ 	 * the list, we'll recreate the list.
+ 	 */
+ 	elem = DLGetHead(DatabaseList);
+ 	while (elem != NULL)
+ 	{
+ 		autovac_dbase *avdb = DLE_VAL(elem);
+ 
+ 		if (avdb->ad_datid == dbid)
+ 		{
+ 			/*
+ 			 * add autovacuum_naptime seconds to the current time, and use that
+ 			 * as the new "next_worker" field for this database.
+ 			 */
+ 			avdb->ad_next_worker =
+ 				TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+ 
+ 			DLMoveToFront(elem);
+ 			break;
+ 		}
+ 		elem = DLGetSucc(elem);
  	}
+ 
+ 	/*
+ 	 * The database was not found on the list, so rebuild the list.  The
+ 	 * database may still be left out (e.g. it has no pgstat entry); that is
+ 	 * fine, since we don't want to schedule workers regularly into those.
+ 	 * NOTE(review): this also appears to run when do_start_worker() returned
+ 	 * InvalidOid (no worker started) -- confirm that rebuild is intended.
+ 	 */
+ 	if (elem == NULL)
+ 		rebuild_database_list(dbid);
  }
  
  /* SIGHUP: set flag to re-read config file at next convenient time */
***************
*** 552,557 ****
--- 1016,1028 ----
  	got_SIGHUP = true;
  }
  
+ /* SIGUSR1: a worker started up or finished; just set a flag for the main loop */
+ static void
+ avl_sigusr1_handler(SIGNAL_ARGS)
+ {
+ 	got_SIGUSR1 = true;
+ }
+ 
  static void
  avlauncher_shutdown(SIGNAL_ARGS)
  {
***************
*** 667,673 ****
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 1138,1146 ----
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid = InvalidOid;
! 	WorkerInfo *worker;
! 	int			i;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
***************
*** 765,780 ****
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Get the database Id we're going to work on, and announce our PID
! 	 * in the shared memory area.  We remove the database OID immediately
! 	 * from the shared memory area.
  	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 
! 	dbid = AutoVacuumShmem->process_db;
! 	AutoVacuumShmem->process_db = InvalidOid;
! 	AutoVacuumShmem->worker_pid = MyProcPid;
! 
  	LWLockRelease(AutovacuumLock);
  
  	if (OidIsValid(dbid))
--- 1238,1259 ----
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Walk the WorkerInfo array, and get the database OID we're going to work
! 	 * on.  Use the first entry with PID 0 in the list, and advertise our PID
! 	 * on it, thus marking it used.
  	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	foreach_worker(i, worker)
! 	{
! 		if (worker->wi_workerpid == 0)
! 		{
! 			dbid = worker->wi_dboid;
! 			worker->wi_workerpid = MyProcPid;
! 			break;
! 		}
! 	}
! 	if (AutoVacuumShmem->av_launcherpid != 0)
! 		kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
  	LWLockRelease(AutovacuumLock);
  
  	if (OidIsValid(dbid))
***************
*** 812,826 ****
  
  		/* And do an appropriate amount of work */
  		recentXid = ReadNewTransactionId();
! 		do_autovacuum(dbid);
  	}
  
- 	/*
- 	 * Now remove our PID from shared memory, so that the launcher can start
- 	 * another worker as soon as appropriate.
- 	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	AutoVacuumShmem->worker_pid = 0;
  	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
--- 1291,1318 ----
  
  		/* And do an appropriate amount of work */
  		recentXid = ReadNewTransactionId();
! 		do_autovacuum(worker, dbid);
  	}
  
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	if (!autovacuum_start_daemon)
! 	{
! 		/* in emergency mode we must cleanup after ourselves */
! 		worker->wi_workerpid = 0;
! 		worker->wi_dboid = InvalidOid;
! 		worker->wi_finished = false;
! 	}
! 	else
! 	{
! 		/*
! 		 * Otherwise, let the launcher know we're done.  Warning: must set the
! 		 * flag before sending the signal.  XXX do we need to prevent compiler
! 		 * overenthusiastic optimization here?
! 		 */
! 		worker->wi_finished = true;
! 		if (AutoVacuumShmem->av_launcherpid != 0)
! 			kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
! 	}
  	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
***************
*** 828,840 ****
  }
  
  /*
!  * autovac_get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! autovac_get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
--- 1320,1332 ----
  }
  
  /*
!  * get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
***************
*** 862,867 ****
--- 1354,1360 ----
  		avdb->ad_name = pstrdup(thisname);
  		avdb->ad_frozenxid = db_frozenxid;
  		/* this gets set later: */
+ 		avdb->ad_next_worker = 0;
  		avdb->ad_entry = NULL;
  
  		dblist = lappend(dblist, avdb);
***************
*** 880,886 ****
   * order not to ignore shutdown commands for too long.
   */
  static void
! do_autovacuum(Oid dbid)
  {
  	Relation	classRel,
  				avRel;
--- 1373,1379 ----
   * order not to ignore shutdown commands for too long.
   */
  static void
! do_autovacuum(WorkerInfo *worker, Oid dbid)
  {
  	Relation	classRel,
  				avRel;
***************
*** 1027,1035 ****
--- 1520,1550 ----
  	{
  		Oid		relid = lfirst_oid(cell);
  		autovac_table *tab;
+ 		WorkerInfo *other_worker;
+ 		int			i;
+ 		bool		skipit;
  
  		CHECK_FOR_INTERRUPTS();
  
+ 		LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+ 
+ 		/*
+ 		 * Check whether the table is being vacuumed concurrently by another
+ 		 * worker.
+ 		 */
+ 		skipit = false;
+ 		foreach_worker(i, other_worker)
+ 		{
+ 			if (other_worker->wi_tableoid == relid)
+ 			{
+ 				LWLockRelease(AutovacuumScheduleLock);
+ 				skipit = true;
+ 				break;
+ 			}
+ 		}
+ 		if (skipit)
+ 			continue;
+ 
  		/*
  		 * Check whether pgstat data still says we need to vacuum this table.
  		 * It could have changed if other worker processed the table while we
***************
*** 1046,1054 ****
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
  			continue;
  		}
! 		/* Ok, good to go! */
  
  		/* Set the vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
--- 1561,1576 ----
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
+ 			LWLockRelease(AutovacuumScheduleLock);
  			continue;
  		}
! 
! 		/*
! 		 * Ok, good to go.  Store the table in shared memory before releasing
! 		 * the lock so that other workers don't vacuum it concurrently.
! 		 */
! 		worker->wi_tableoid = relid;
! 		LWLockRelease(AutovacuumScheduleLock);
  
  		/* Set the vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
***************
*** 1615,1621 ****
  Size
  AutoVacuumShmemSize(void)
  {
! 	return sizeof(AutoVacuumShmemStruct);
  }
  
  /*
--- 2137,2144 ----
  Size
  AutoVacuumShmemSize(void)
  {
! 	return add_size(offsetof(AutoVacuumShmemStruct, av_workers),
!  					mul_size(autovacuum_max_workers, sizeof(WorkerInfo)));
  }
  
  /*
***************
*** 1638,1642 ****
  	if (found)
  		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
  }
--- 2161,2165 ----
  	if (found)
  		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, AutoVacuumShmemSize());
  }
