diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 781a736..54caade 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -78,6 +78,8 @@ static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 					   ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
 			  ExplainState *es);
+static void show_agg_batching(AggState *astate, List *ancestors,
+			  ExplainState *es);
 static void show_group_keys(GroupState *gstate, List *ancestors,
 				ExplainState *es);
 static void show_sort_group_keys(PlanState *planstate, const char *qlabel,
@@ -1391,6 +1393,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 										   planstate, es);
 			break;
 		case T_Agg:
+			show_agg_batching((AggState *) planstate, ancestors, es);
 			show_agg_keys((AggState *) planstate, ancestors, es);
 			show_upper_qual(plan->qual, "Filter", planstate, ancestors, es);
 			if (plan->qual)
@@ -1790,6 +1793,36 @@ show_agg_keys(AggState *astate, List *ancestors,
 }
 
 /*
+ * Show the batching info for an Agg node.
+ */
+static void
+show_agg_batching(AggState *astate, List *ancestors,
+			  ExplainState *es)
+{
+	Agg		   *plan = (Agg *) astate->ss.ps.plan;
+
+	if ((es->analyze) && (plan->aggstrategy == AGG_HASHED))
+	{
+		if (es->format == EXPLAIN_FORMAT_TEXT)
+		{
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str, "Batch Count: %d  Rebatches: %d  Smallest: %ldkB  Largest: %ldkB Rescanned: %.0f%%\n",
+							 astate->batch_count, astate->rebatch_count,
+							 astate->batch_min_size / 1024,
+							 astate->batch_max_size / 1024,
+							 astate->ntuples_rescanned * 100.0 / astate->ntuples_scanned);
+		}
+		else
+		{
+			ExplainPropertyLong("Batch Count", astate->batch_count, es);
+			ExplainPropertyLong("Batch Smallest", astate->batch_min_size/1024, es);
+			ExplainPropertyLong("Batch Largest", astate->batch_max_size/1024, es);
+			ExplainPropertyLong("Batch Rescan Rate", (astate->ntuples_rescanned * 100) / astate->ntuples_scanned, es);
+		}
+	}
+}
+
+/*
  * Show the grouping keys for a Group node.
  */
 static void
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 3ae9583..1266faf 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -118,7 +118,7 @@
 #include "utils/datum.h"
 #include "utils/dynahash.h"
 
-#define HASH_DISK_MIN_PARTITIONS		1
+#define HASH_DISK_MIN_PARTITIONS		2
 #define HASH_DISK_DEFAULT_PARTITIONS	4
 #define HASH_DISK_MAX_PARTITIONS		256
 
@@ -328,6 +328,7 @@ typedef struct HashWork
 	BufFile		 *input_file;	/* input partition, NULL for outer plan */
 	int			  input_bits;	/* number of bits for input partition mask */
 
+	double		  ntuples_expected; /* number of tuples expected on input */
 	int			  n_output_partitions; /* number of output partitions */
 	BufFile		**output_partitions; /* output partition files */
 	int			 *output_ntuples; /* number of tuples in each partition */
@@ -356,7 +357,7 @@ static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
 static void build_hash_table(AggState *aggstate, Size tuple_width);
 static AggHashEntry lookup_hash_entry(AggState *aggstate, HashWork * work,
 					uint32 hashvalue, TupleTableSlot *inputslot);
-static HashWork *hash_work(BufFile *input_file, int input_bits);
+static HashWork *hash_work(BufFile *input_file, int input_bits, double ntuples_expected);
 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
 static bool agg_fill_hash_table(AggState *aggstate);
 static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
@@ -376,7 +377,7 @@ static TupleTableSlot *
 read_saved_tuple(BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot);
 static void
 save_tuple(AggState *aggstate, HashWork *work, TupleTableSlot *slot,
-		   uint32 hashvalue);
+		   uint32 hashvalue, int64 ntuples_current);
 
 /*
  * The size of the chunks for dense allocation. This needs to be >8kB
@@ -1588,6 +1589,8 @@ agg_fill_hash_table(AggState *aggstate)
 	TupleTableSlot *outerslot = NULL;
 	HashWork   *work;
 	int			i;
+	int64		ntuples = 0;
+	Size		allocated;
 
 	/*
 	 * get state info from node
@@ -1639,6 +1642,7 @@ agg_fill_hash_table(AggState *aggstate)
 				break;
 
 			hashvalue = compute_hash_value(aggstate, outerslot);
+			aggstate->ntuples_scanned += 1;
 		}
 		else
 		{
@@ -1653,8 +1657,12 @@ agg_fill_hash_table(AggState *aggstate)
 				work->input_file = NULL;
 				break;
 			}
+
+			aggstate->ntuples_rescanned += 1;
 		}
 
+		ntuples++;
+
 		/* Find or build hashtable entry for this tuple's group */
 		entry = lookup_hash_entry(aggstate, work, hashvalue, outerslot);
 
@@ -1672,11 +1680,81 @@ agg_fill_hash_table(AggState *aggstate)
 		} else {
 
 			/* no entry for this tuple, and we've reached work_mem */
-			save_tuple(aggstate, work, outerslot, hashvalue);
+			save_tuple(aggstate, work, outerslot, hashvalue, ntuples);
 
 		}
 	}
 
+	/*
+	 * XXX Idea on estimating the number of partitions necessary in the
+	 *     next step, based on estimating group state size etc.
+	 *
+	 * See how much memory is allocated in the memory context, so that
+	 * we can use it to compute average state size (including ovehead).
+	 * When doing the initial partitioning, we did it when reaching
+	 * work_mem, but the states may have grown further.
+	 *
+	 * Assuming the groups are not of wildly different size, we can
+	 * optimize the partitioning in the following work items like this:
+	 *
+	 * 1) computing average group size
+	 *
+	 *   avg_group_size = allocated_bytes / nentries
+	 *
+	 * 2) computing number of groups that fit into work_mem
+	 *
+	 *   groups_work_mem = (work_mem * 1024L) / avg_group_size
+	 *
+	 * 3) computing tuples per group
+	 *
+	 *   ntuples_per_group = (ntuples / nentries)
+	 *
+	 * 4) computing ntuples_in_partition
+	 *
+	 *   optimal_partition_tuples = ntuples_per_group * groups_work_mem
+	 *
+	 * 5) we know how many tuples we wrote into each partition - we can
+	 *    either compute it as (ntuples/npartitions) which is easy, or
+	 *    track the number per partition (more correct), so we can
+	 *    decide into how many partitions should we split it in the next
+	 *    step
+	 *
+	 *     npartitions = ntuples_per_partition / optimal_partition_tuples
+	 *
+	 *    and we can do this immediately at the beginning, using the
+	 *    hash value (assuming it's a power of 2).
+	 *
+	 * This should minimize the number of times a tuple is read from the
+	 * temporary file, only to be written again because there's not enough
+	 * free memory.
+	 *
+	 * The question is how this will deal with exceptionally large
+	 * groups. Technically, all partitions should receive about the
+	 * same number of groups, but if there's a very frequent group the
+	 * partition may be much larger (many more tuples, belonging to the
+	 * very large group). What we need to prevent is splitting the data
+	 * into needlessly small partitions.
+	 */
+
+	allocated = MemoryContextGetAllocated(aggstate->hashtable->htabctx, true);
+
+	/* keep track of the largest/smallest batch size */
+	if (aggstate->batch_count == 1)
+	{
+		aggstate->batch_min_size = allocated;
+		aggstate->batch_max_size = allocated;
+	}
+	else
+	{
+		if (allocated < aggstate->batch_min_size)
+			aggstate->batch_min_size = allocated;
+		if (allocated > aggstate->batch_max_size)
+			aggstate->batch_max_size = allocated;
+	}
+
+	if (work->n_output_partitions > 0)
+		aggstate->rebatch_count += 1;
+
 	/* add each output partition as a new work item */
 	for (i = 0; i < work->n_output_partitions; i++)
 	{
@@ -1693,11 +1771,27 @@ agg_fill_hash_table(AggState *aggstate)
 					(errcode_for_file_access(),
 					 errmsg("could not rewind HashAgg temporary file: %m")));
 
+		/*
+		 * XXX This assumes all the batches are equally sized, but that
+		 * can easily not be the case - imagine a frequent group. All the
+		 * tuples will get saved into the same batch, thus making it
+		 * much larger than the rest.
+		 *
+		 * At this point, we can also estimate the average group size as
+		 * (allocated memory / nentries), and number of tuples per group
+		 * (ntuples / nentries). Granted, those are pretty rough estimates
+		 * that can go wrong in many ways, but it's better than nothing.
+		 *
+		 * TODO Keep track of number of tuples saved in each group.
+		 */
 		oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
-		aggstate->hash_work = lappend(aggstate->hash_work,
-									  hash_work(file,
-												work->output_bits + work->input_bits));
+		aggstate->hash_work
+			= lappend(aggstate->hash_work,
+					  hash_work(file,
+					  work->output_bits + work->input_bits,
+					  work->output_ntuples[i]));
 		MemoryContextSwitchTo(oldContext);
+		aggstate->batch_count += 1;
 	}
 
 	pfree(work);
@@ -1984,13 +2078,19 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 		aggstate->hash_init_state = true;
 		aggstate->hash_disk = false;
 
+		/* explain (analyze) counters */
+		aggstate->batch_count = 1;
+		aggstate->rebatch_count = 0;
+		aggstate->ntuples_scanned = 0;
+		aggstate->ntuples_rescanned = 0;
+
 		/* Compute the columns we actually need to hash on */
 		aggstate->hash_needed = find_hash_columns(aggstate);
 
 		/* prime with initial work item to read from outer plan */
 		oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
 		aggstate->hash_work = lappend(aggstate->hash_work,
-									  hash_work(NULL, 0));
+									  hash_work(NULL, 0, outerPlan->plan_rows));
 		MemoryContextSwitchTo(oldContext);
 
 	}
@@ -2467,7 +2567,7 @@ ExecReScanAgg(AggState *node)
 	if (((Agg *) node->ss.ps.plan)->aggstrategy == AGG_HASHED)
 	{
 		MemoryContext oldContext;
-		Plan * outerPlan = outerPlan((Agg *) node->ss.ps.plan);
+		Plan *outerPlan = outerPlan((Agg *) node->ss.ps.plan);
 
 		/* Rebuild an empty hash table */
 		build_hash_table(node, outerPlan->plan_width);
@@ -2476,10 +2576,16 @@ ExecReScanAgg(AggState *node)
 		node->hash_disk = false;
 		node->hash_work = NIL;
 
+		/* explain (analyze) counters */
+		node->batch_count = 1;
+		node->rebatch_count = 0;
+		node->ntuples_scanned = 0;
+		node->ntuples_rescanned = 0;
+
 		/* prime with initial work item to read from outer plan */
 		oldContext = MemoryContextSwitchTo(node->aggcontext);
 		node->hash_work = lappend(node->hash_work,
-								  hash_work(NULL, 0));
+								  hash_work(NULL, 0, outerPlan->plan_rows));
 		MemoryContextSwitchTo(oldContext);
 	}
 	else
@@ -2992,12 +3098,13 @@ AggHashEntry IteratorGetNext(AggHashTable htab)
  * done. Should be called in the aggregate's memory context.
  */
 static HashWork *
-hash_work(BufFile *input_file, int input_bits)
+hash_work(BufFile *input_file, int input_bits, double ntuples_expected)
 {
 	HashWork *work = palloc(sizeof(HashWork));
 
 	work->input_file = input_file;
 	work->input_bits = input_bits;
+	work->ntuples_expected = ntuples_expected;
 
 	/*
 	 * Will be set only if we run out of memory and need to partition an
@@ -3019,7 +3126,7 @@ hash_work(BufFile *input_file, int input_bits)
  */
 static void
 save_tuple(AggState *aggstate, HashWork *work, TupleTableSlot *slot,
-		   uint32 hashvalue)
+		   uint32 hashvalue, int64 ntuples_current)
 {
 	int					 partition;
 	MinimalTuple		 tuple;
@@ -3028,7 +3135,35 @@ save_tuple(AggState *aggstate, HashWork *work, TupleTableSlot *slot,
 
 	if (work->output_partitions == NULL)
 	{
-		int npartitions = HASH_DISK_DEFAULT_PARTITIONS; //TODO choose
+		/*
+		 * We will choose the number of partitions based on when we
+		 * reached full work_mem. We expect to get work->ntuples in
+		 * total, and we've reached work_mem at ntuples, so we expect
+		 * to get (work->ntuples - ntuples) additional tuples.
+		 * 
+		 * By assuming to reach work_mem every 'ntuples' tuples, we can
+		 * estimate the number of batches as
+		 * 
+		 *    ceil((work->ntuples - ntuples)/ntuples)
+		 * 
+		 * which is actually
+		 * 
+		 *    (int)(1 + (work->ntuples - ntuples)/ntuples)
+		 * 
+		 * and that's just (work->ntuples/ntuples). We'll impose some
+		 * basic safety numbers on those values.
+		 * 
+		 * It's probably better to over-estimate here, under-estimation
+		 * means we'll have to read the tuples and write then again,
+		 * into another set of batches. Not really efficient.
+		 *
+		 * Also, even if there's a huge group in this batch, the state
+		 * size usually grows along with the number of tuples passed
+		 * to the transition function. So even in this case it should
+		 * be a good estimate (e.g. the batching should not be triggered
+		 * too early).
+		 */
+		int npartitions = (work->ntuples_expected / ntuples_current);
 		int partition_bits;
 		int i;
 
@@ -3037,7 +3172,9 @@ save_tuple(AggState *aggstate, HashWork *work, TupleTableSlot *slot,
 		if (npartitions > HASH_DISK_MAX_PARTITIONS)
 			npartitions = HASH_DISK_MAX_PARTITIONS;
 
+		/* make it a power of 2 */
 		partition_bits = my_log2(npartitions);
+		npartitions = (1 << partition_bits);
 
 		/* make sure that we don't exhaust the hash bits */
 		if (partition_bits + work->input_bits >= 32)
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1a61ac7..9b2558e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1731,6 +1731,20 @@ typedef struct AggState
 	bool		hash_disk;		/* have we exceeded memory yet? */
 	List	   *hash_work;		/* remaining work to be done */
 
+	/* counters used mostly for explain (analyze) */
+	int			batch_count;		/* number of batches generated in total */
+	int			rebatch_count;		/* number of rebatches (splitting of a batch) */
+	Size		batch_min_size;		/* minimum batch size (bytes) */
+	Size		batch_max_size;		/* maximum batch size (bytes) */
+
+	/*
+	 * These two counters allow evaluation of how many times the tuples
+	 * were saved/read. With no batching, rescanned=0. With a single
+	 * level of batching (rescanned/scanned < 1.00) and with multi-level
+	 * batching it may happen that (rescanned/scanned > 1.00). */
+	int64		ntuples_scanned;	/* number of input tuples scanned */
+	int64		ntuples_rescanned;	/* number of tuples saved/read */
+
 } AggState;
 
 /* ----------------
