From f4065c1cae848812515f028dfb90e05f5fbe53fc Mon Sep 17 00:00:00 2001 From: Nadav Shatz Date: Tue, 6 Jan 2026 12:41:50 +0200 Subject: [PATCH] feat(load_balance): add in-memory table mutation tracking Introduces 'dml_adaptive_global' as a new value for disable_load_balance_on_write. This mode is a superset of dml_adaptive: it performs per-transaction local tracking AND cross-session shared-memory tracking of recently written tables, routing reads to primary until a TTL (based on measured replication delay) expires. Sub-parameters (track_table_mutation_*) control TTL factor, cold start duration, hash table sizing, and query parse cache sizing. diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml index ee19fabebab2210cd4abe59a711a036ac0ac8943..1838a57913e9acb933bfcbf70cce32122740a490 100644 --- a/doc/src/sgml/loadbalance.sgml +++ b/doc/src/sgml/loadbalance.sgml @@ -1108,6 +1108,18 @@ app_name_redirect_preference_list > database_redirect_preference_list > us Dependent functions, triggers, and views on the tables can be configured using + + + If this parameter is set to dml_adaptive_global, + Pgpool-II behaves like dml_adaptive + (per-transaction write tracking) and additionally uses shared memory to track + recently written tables across all sessions cluster-wide. When a table is + written in any session, subsequent reads of that table from any session are + routed to primary until a TTL (based on measured replication delay) expires. + This prevents stale reads after writes even across different connections. + See for the sub-parameters + that control the shared-memory tracking behavior. + @@ -1193,4 +1205,321 @@ dml_adaptive_object_relationship_list = 'table_1:table_2' + + + Table Mutation Map Configuration (Lagless Replica Reads) + + + These parameters configure the track table mutation feature, which is activated by setting + to dml_adaptive_global. 
+ The feature tracks recently written tables to prevent stale reads from replica nodes during + replication lag, implementing the "lagless" architecture pattern for distributed systems + with read replicas. + + + + When a table is modified (INSERT/UPDATE/DELETE), it is marked as "stale" for a TTL period + (replication_delay * track_table_mutation_ttl_factor). Any SELECT queries on stale tables are routed + to the primary node instead of replicas, ensuring read-after-write consistency. + + + + This feature requires to be configured + for monitoring replication delay from replicas. + + + + + Enabling dml_adaptive_global increases shared memory consumption. With default settings, + the feature requires approximately 6.4 MB of shared memory (0.1 MB for table tracking + 6.3 MB for query cache). + Memory usage scales with configuration parameters: + + + + + Table tracking: track_table_mutation_table_size * 40 bytes (default: 2048 * 40 = ~80 KB) + + + + + Query cache: track_table_mutation_query_parse_cache_size * 640 bytes (default: 10000 * 640 = ~6.3 MB) + + + + + For high-traffic systems with large cache sizes (e.g., track_table_mutation_query_parse_cache_size = 100000), + memory usage can reach 64 MB or more. Consider your system's available shared memory when using dml_adaptive_global. + + + + + + + track_table_mutation_ttl_factor (floating point) + + track_table_mutation_ttl_factor configuration parameter + + + + + Multiplier for calculating the TTL: TTL = replication_delay * track_table_mutation_ttl_factor. + Higher values provide more safety margin but may reduce read replica utilization. + + + Valid range: 1.0-100.0. Default is 5.0. + This parameter can be changed by reloading the Pgpool-II configurations. 
+ + + + + + track_table_mutation_max_staleness (integer) + + track_table_mutation_max_staleness configuration parameter + + + + + Maximum duration in milliseconds that a single table entry can continuously force queries to primary, + measured from when the table was first marked stale. When this cap is reached, the entry is expired + regardless of recent writes. If the table is written to again after expiry, a fresh tracking entry + is created. + + + This parameter bounds the cross-session impact of table mutation tracking. Even if a table is written + to in a tight loop, its effect on other sessions' load balancing is limited to this duration. For + legitimately busy tables, the gap between forced expiry and the next write re-marking the table is + negligible (typically milliseconds). + + + Set to 0 to disable the cap (not recommended for production). + Valid range: 0-3600000 ms. Default is 60000 (60 seconds). + This parameter can be changed by reloading the Pgpool-II configurations. + + + + + + track_table_mutation_cold_start_duration (integer) + + track_table_mutation_cold_start_duration configuration parameter + + + + + Duration in milliseconds to route all queries to primary after a child process starts. + This prevents stale reads when a new connection is established before the track table mutation + is populated with recent write history. + + + When watchdog is enabled and the local node becomes the leader, Pgpool-II also triggers a + global cold start for this duration to avoid stale reads after leadership changes. + + + Valid range: 0-60000 ms. Default is 2000 (2 seconds). + Set to 0 to disable cold start behavior. + This parameter can be changed by reloading the Pgpool-II configurations. + + + + + + track_table_mutation_table_buckets (integer) + + track_table_mutation_table_buckets configuration parameter + + + + + Number of hash buckets for the track table mutation hash table. + Higher values reduce hash collisions and improve lookup performance. 
+ + + Valid range: 64-65536. Default is 1024. + This parameter can only be set at server start. + + + + + + track_table_mutation_table_size (integer) + + track_table_mutation_table_size configuration parameter + + + + + Maximum number of tables that can be tracked simultaneously in the track table mutation. + When full, oldest entries are evicted using a simple eviction strategy. + + + Valid range: 128-131072. Default is 2048. + Memory usage: approximately 40 bytes per entry. + This parameter can only be set at server start. + + + + + + track_table_mutation_query_buckets (integer) + + track_table_mutation_query_buckets configuration parameter + + + + + Number of hash buckets for the query parse cache. The cache stores normalized + query strings mapped to their table dependencies to avoid repeated parsing. + + + Valid range: 64-65536. Default is 2048. + This parameter can only be set at server start. + + + + + + track_table_mutation_query_parse_cache_size (integer) + + track_table_mutation_query_parse_cache_size configuration parameter + + + + + Maximum number of query parse results to cache. Uses LRU eviction when full. + Larger caches reduce parsing overhead but consume more shared memory. + + + Valid range: 100-1000000. Default is 10000. + Memory usage: approximately 640 bytes per entry (~6.3 MB for default, ~64 MB for 100000 entries). + This parameter can only be set at server start. 
+ + + + + + + + Track Table Mutation Configuration Example + + To enable track table mutation with replication delay monitoring: + + +# Enable dml_adaptive_global mode (includes track table mutation) +disable_load_balance_on_write = 'dml_adaptive_global' +track_table_mutation_ttl_factor = 5.0 +track_table_mutation_max_staleness = 60000 +track_table_mutation_cold_start_duration = 2000 + +# Configure external replication delay monitoring +replication_delay_source_cmd = '/path/to/get-replication-delay.sh' +replication_delay_source_timeout = 10 + +# Adjust cache sizes based on workload (increases memory usage) +track_table_mutation_table_size = 4096 # Track up to 4096 tables (~160 KB) +track_table_mutation_query_parse_cache_size = 50000 # Cache 50k queries (~31 MB) + + + Total shared memory required for above configuration: approximately 31.2 MB (31 MB query cache + 0.2 MB table map + overhead). + Default configuration (10000 query cache entries, 2048 tables) requires approximately 6.4 MB. + + + + + Limitations + + The track table mutation feature has the following limitation: + + + + + PREPARE statements are not tracked. When a prepared statement + containing data modification is executed, the table mutation is not recorded. + + + + + If your application uses prepared statements and requires read-after-write consistency, + consider using explicit transaction routing or the /*NO LOAD BALANCE*/ + comment directive for affected queries. + + + The following statement types are tracked and will mark tables as stale: + + + + + INSERT, UPDATE, DELETE + statements (including those with RETURNING clauses). + + + + + TRUNCATE statements (including multiple tables). + + + + + MERGE statements (PostgreSQL 15+). + + + + + WITH clauses containing data modifications (Common Table Expressions + with INSERT, UPDATE, or DELETE). + For example, WITH deleted AS (DELETE FROM t1 RETURNING *) SELECT * FROM deleted + will properly mark table t1 as stale. 
+ + + + + Transaction Rollback Behavior: Within explicit transactions, tables + are only marked as stale in shared memory when the transaction is committed. If the + transaction is rolled back, no tables are marked, since no actual data modification + occurred on replicas. This prevents rolled-back transactions from unnecessarily + disabling load balancing. For autocommit statements (outside explicit transactions), + tables are marked immediately upon command completion. + + + + Cross-Session Impact and Safety Bounds: + Unlike dml_adaptive (which only affects the session that issued the write), + dml_adaptive_global affects all sessions reading the same table in the same database. + The following safety mechanisms bound this cross-session impact: + + + + + Maximum staleness cap: The + parameter (default: 60 seconds) limits how long any single table entry can continuously force primary + routing. Even under sustained writes, the entry expires after this period and is only renewed by + subsequent committed writes. + + + + + Database isolation: Table staleness tracking is scoped by database OID. Writes + in one database never affect load balancing decisions for sessions connected to a different database. + In multi-tenant deployments where tenants use separate databases, one tenant's write activity cannot + influence another tenant's query routing. + + + + + Committed writes only: Only committed transactions mark tables as stale. + Rolled-back transactions have no effect on the shared tracking state. + + + + + Bounded table map size: The shared memory table map has a fixed maximum size + (). At most this many tables can be marked stale + simultaneously, providing a natural ceiling on the feature's impact. 
+ + + + + + + diff --git a/src/Makefile.am b/src/Makefile.am index 4678ab53055e828a37b6477801640aff17ff84a7..39588af58deba045dffc01ae932115b8a9dbfcf2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \ rewrite/pool_timestamp.c \ rewrite/pool_lobj.c \ utils/pool_select_walker.c \ + utils/pool_track_table_mutation.c \ utils/strlcpy.c \ utils/psprintf.c \ utils/pool_params.c \ diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c index ce13c42f6a81cbecd87ef35c5507d0ff2d7a7f85..a6b909d427f30366fab7325fa2169068c489a263 100644 --- a/src/config/pool_config_variables.c +++ b/src/config/pool_config_variables.c @@ -290,6 +290,7 @@ static const struct config_enum_entry disable_load_balance_on_write_options[] = {"trans_transaction", DLBOW_TRANS_TRANSACTION, false}, {"always", DLBOW_ALWAYS, false}, {"dml_adaptive", DLBOW_DML_ADAPTIVE, false}, + {"dml_adaptive_global", DLBOW_DML_ADAPTIVE_GLOBAL, false}, {NULL, 0, false} }; @@ -1777,6 +1778,19 @@ static struct config_int_array ConfigureNamesIntArray[] = static struct config_double ConfigureNamesDouble[] = { + { + {"track_table_mutation_ttl_factor", + CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "TTL multiplier for track table mutation " + "(TTL = replication_delay * factor)", + CONFIG_VAR_TYPE_DOUBLE, false, 0 + }, + &g_pool_config.track_table_mutation_ttl_factor, + 5.0, /* boot value: 5x replication delay */ + 1.0, 100.0, /* min, max */ + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_DOUBLE }; @@ -2397,6 +2411,81 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"track_table_mutation_max_staleness", + CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "Maximum duration in milliseconds that a " + "table can be marked stale from its first " + "write. 
0 disables the cap.", + CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS + }, + &g_pool_config.track_table_mutation_max_staleness, + 60000, /* 60 seconds */ + 0, 3600000, /* 0 to 1 hour */ + NULL, NULL, NULL + }, + + { + {"track_table_mutation_cold_start_duration", + CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "Duration in milliseconds to force queries " + "to primary after child process starts.", + CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS + }, + &g_pool_config.track_table_mutation_cold_start_duration, + 2000, /* 2 seconds */ + 0, 60000, /* 0 to 60 seconds */ + NULL, NULL, NULL + }, + + { + {"track_table_mutation_table_buckets", + CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Number of hash buckets for track table mutation.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.track_table_mutation_table_buckets, + 1024, + 64, 65536, + NULL, NULL, NULL + }, + + { + {"track_table_mutation_table_size", + CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Maximum number of entries in track table mutation.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.track_table_mutation_table_size, + 2048, + 128, 131072, + NULL, NULL, NULL + }, + + { + {"track_table_mutation_query_buckets", + CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Number of hash buckets for query parse cache.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.track_table_mutation_query_buckets, + 2048, + 64, 65536, + NULL, NULL, NULL + }, + + { + {"track_table_mutation_query_parse_cache_size", + CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Maximum number of entries in query parse cache.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.track_table_mutation_query_parse_cache_size, + 10000, + 100, 1000000, + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_INT }; diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c index 7cf9813eb7d58678bc86a0aaa38bd3c6445b6687..683b0ec66fabd708a5a61a54ba0697bf869ecafe 100644 --- a/src/context/pool_query_context.c +++ b/src/context/pool_query_context.c @@ -29,6 +29,7 @@ 
#include "utils/statistics.h" #include "utils/pool_select_walker.h" #include "utils/pool_stream.h" +#include "utils/pool_track_table_mutation.h" #include "context/pool_session_context.h" #include "context/pool_query_context.h" #include "parser/nodes.h" @@ -1828,15 +1829,23 @@ is_in_list(char *name, List *list) static bool is_select_object_in_temp_write_list(Node *node, void *context) { - if (node == NULL || pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE) + if (node == NULL || + !DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write)) return false; if (IsA(node, RangeVar)) { RangeVar *rgv = (RangeVar *) node; - POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false); + POOL_SESSION_CONTEXT *session_context; + bool is_adaptive; + + session_context = pool_get_session_context(false); + is_adaptive = DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write); - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context->is_in_transaction) + if (is_adaptive && + session_context->is_in_transaction) { ereport(DEBUG1, (errmsg("is_select_object_in_temp_write_list: \"%s\", found relation \"%s\"", (char *) context, rgv->relname))); @@ -1880,7 +1889,13 @@ static char *get_associated_object_from_dml_adaptive_relations void check_object_relationship_list(char *name, bool is_func_name) { - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && pool_config->parsed_dml_adaptive_object_relationship_list) + bool is_adaptive; + + is_adaptive = DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write); + + if (is_adaptive && + pool_config->parsed_dml_adaptive_object_relationship_list) { POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false); @@ -1944,7 +1959,7 @@ add_object_into_temp_write_list(Node *node, void *context) static void dml_adaptive(Node *node, char *query) { - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE) + if 
(DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write)) { /* Set/Unset transaction status flags */ if (IsA(node, TransactionStmt)) @@ -1963,6 +1978,46 @@ dml_adaptive(Node *node, char *query) } else if (is_commit_or_rollback_query(node)) { + /* + * For dml_adaptive_global: on COMMIT, flush + * the accumulated table writes to shared + * memory. On ROLLBACK, skip -- the writes + * never committed so no stale-read risk + * exists. This prevents polluting the table + * map with rolled-back transactions. + */ + int dlbow = + pool_config->disable_load_balance_on_write; + List *wlist = + session_context->transaction_temp_write_list; + + if (dlbow == DLBOW_DML_ADAPTIVE_GLOBAL && + is_commit_query(node) && + wlist != NIL) + { + ListCell *cell; + int dboid; + + dboid = + pool_track_table_mutation_get_database_oid(); + if (dboid > 0) + { + foreach(cell, wlist) + { + char *tname; + int toid; + + tname = (char *) lfirst(cell); + toid = + pool_table_name_to_oid(tname); + + if (toid > 0) + pool_track_table_mutation_mark_table_written( + toid, dboid); + } + } + } + session_context->is_in_transaction = false; if (session_context->transaction_temp_write_list != NIL) @@ -2010,6 +2065,20 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node if (dest == POOL_PRIMARY) { pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); + + /* + * Resolve table and database OIDs now to populate relcache. + * This avoids potential hangs in CommandComplete where we shouldn't + * be running new queries against the backend. + */ + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + int *oids; + + pool_extract_table_oids(node, &oids); + pool_track_table_mutation_get_database_oid(); + } } /* Should be sent to both primary and standby? 
*/ else if (dest == POOL_BOTH) @@ -2139,6 +2208,154 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node { pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); } + /* + * Check track table mutation for recently + * written tables. If in cold start or any + * table was recently written, route to + * primary to avoid stale reads. + */ + else if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + bool force_primary = false; + int lb_node; + POOL_QUERY_CONTEXT *qctx = + session_context->query_context; + + if (pool_track_table_mutation_in_cold_start()) + { + ereport(DEBUG1, + (errmsg("could not load balance" + " because of track table" + " mutation cold start"), + errdetail("destination = PRIMARY" + " for query= \"%s\"", + query))); + force_primary = true; + } + else + { + SelectContext ctx; + int dboid; + int num_oids; + int i; + + memset(&ctx, 0, sizeof(ctx)); + num_oids = + pool_extract_table_oids_from_select_stmt( + node, &ctx); + if (num_oids > 0) + { + dboid = + pool_track_table_mutation_get_database_oid(); + + if (dboid <= 0) + { + ereport(DEBUG1, + (errmsg("could not load" + " balance because" + " database oid was" + " unavailable"), + errdetail("destination" + " = PRIMARY for" + " query= \"%s\"", + query))); + force_primary = true; + } + else + { + for (i = 0; i < num_oids; i++) + { + bool stale; + + stale = + pool_track_table_mutation_table_is_stale( + ctx.table_oids[i], + dboid); + if (stale) + { + ereport(DEBUG1, + (errmsg("could not load" + " balance because" + " table \"%s\" was" + " recently written", + ctx.table_names[i]), + errdetail("destination" + " = PRIMARY for" + " query= \"%s\"", + query))); + force_primary = true; + break; + } + } + } + } + } + + if (force_primary) + { + pool_set_node_to_be_sent( + query_context, + PRIMARY_NODE_ID); + } + else + { + if (pool_config->statement_level_load_balance) + { + session_context->load_balance_node_id = + select_load_balancing_node(); + } + + /* 
+ * If replication delay is too much, + * and prefer_lower_delay_standby is + * true then elect the lowest-delayed + * node, otherwise send to primary. + */ + lb_node = + session_context->load_balance_node_id; + if (STREAM && + check_replication_delay(lb_node)) + { + ereport(DEBUG1, + (errmsg("could not load" + " balance because of" + " too much replication" + " delay"), + errdetail("destination" + " = %d for" + " query= \"%s\"", + dest, query))); + + if (pool_config->prefer_lower_delay_standby) + { + lb_node = + select_load_balancing_node(); + session_context->load_balance_node_id = + lb_node; + qctx->load_balance_node_id = + lb_node; + pool_set_node_to_be_sent( + query_context, + lb_node); + } + else + { + pool_set_node_to_be_sent( + query_context, + PRIMARY_NODE_ID); + } + } + else + { + qctx->load_balance_node_id = + session_context->load_balance_node_id; + pool_set_node_to_be_sent( + query_context, + qctx->load_balance_node_id); + } + } + } else { if (pool_config->statement_level_load_balance) diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c index ded41c7fc64ceba1d1fafd6f4a9f10a750872374..3ebd68e105adc4e94fd8ef96c871d4b04bed8ae0 100644 --- a/src/context/pool_session_context.c +++ b/src/context/pool_session_context.c @@ -532,7 +532,7 @@ dump_sent_message(char *caller, POOL_SENT_MESSAGE *m) static void dml_adaptive_init(void) { - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE) + if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write)) { session_context->is_in_transaction = false; session_context->transaction_temp_write_list = NIL; @@ -542,7 +542,9 @@ dml_adaptive_init(void) static void dml_adaptive_destroy(void) { - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context) + if (DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write) && + session_context) { if (session_context->transaction_temp_write_list != NIL) 
list_free_deep(session_context->transaction_temp_write_list); @@ -738,10 +740,13 @@ void pool_set_writing_transaction(void) { /* - * If disable_transaction_on_write is 'off' or 'dml_adaptive', then never - * turn on writing transaction flag. + * If disable_load_balance_on_write is 'off' or 'dml_adaptive' or + * 'dml_adaptive_global', then never turn on writing transaction flag. */ - if (pool_config->disable_load_balance_on_write != DLBOW_OFF && pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE) + if (pool_config->disable_load_balance_on_write != + DLBOW_OFF && + !DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write)) { pool_get_session_context(false)->writing_transaction = true; ereport(DEBUG5, diff --git a/src/include/pool.h b/src/include/pool.h index ea6f87e120af866b8ed3a15790d9d8a8e009fe91..7168c1aea877856b5978de332ad636325eb9c30c 100644 --- a/src/include/pool.h +++ b/src/include/pool.h @@ -424,7 +424,7 @@ typedef enum #define Min(x, y) ((x) < (y) ? (x) : (y)) -#define MAX_NUM_SEMAPHORES 8 +#define MAX_NUM_SEMAPHORES 10 #define CONN_COUNTER_SEM 0 #define REQUEST_INFO_SEM 1 #define QUERY_CACHE_STATS_SEM 2 @@ -434,6 +434,8 @@ typedef enum #define FOLLOW_PRIMARY_SEM 6 #define MAIN_EXIT_HANDLER_SEM 7 /* used in exit_hander in pgpool main * process */ +#define TRACK_TABLE_MUTATION_TABLE_SEM 8 +#define TRACK_TABLE_MUTATION_QUERY_SEM 9 #define MAX_REQUEST_QUEUE_SIZE 10 #define MAX_SEC_WAIT_FOR_CLUSTER_TRANSACTION 10 /* time in seconds to keep diff --git a/src/include/pool_config.h b/src/include/pool_config.h index 9a397d1666b408dc16dad743a955a718ccbf23f5..c1e6ecc6f0ce62b2fa9d7560f0a199b40126908e 100644 --- a/src/include/pool_config.h +++ b/src/include/pool_config.h @@ -105,9 +105,13 @@ typedef enum DLBOW_OPTION DLBOW_TRANSACTION, DLBOW_TRANS_TRANSACTION, DLBOW_ALWAYS, - DLBOW_DML_ADAPTIVE + DLBOW_DML_ADAPTIVE, + DLBOW_DML_ADAPTIVE_GLOBAL } DLBOW_OPTION; +#define DLBOW_IS_DML_ADAPTIVE(opt) \ + ((opt) == DLBOW_DML_ADAPTIVE || (opt) == 
DLBOW_DML_ADAPTIVE_GLOBAL) + typedef enum RELQTARGET_OPTION { RELQTARGET_PRIMARY = 1, @@ -365,6 +369,24 @@ typedef struct * replication check */ char *replication_delay_source_cmd; /* external command for replication delay */ int replication_delay_source_timeout; /* timeout for external command in seconds */ + + /* Track table mutation configuration */ + double track_table_mutation_ttl_factor; /* TTL multiplier for + * replication delay */ + int track_table_mutation_max_staleness; /* max staleness + * duration ms */ + int track_table_mutation_cold_start_duration; /* cold start + * duration ms */ + int track_table_mutation_table_buckets; /* hash buckets for + * table map */ + int track_table_mutation_table_size; /* max table map + * entries */ + int track_table_mutation_query_buckets; /* hash buckets for + * query cache */ + int track_table_mutation_query_parse_cache_size; /* max query + * cache + * entries */ + char *failover_command; /* execute command when failover happens */ char *follow_primary_command; /* execute command when failover is * ended */ diff --git a/src/include/utils/pool_track_table_mutation.h b/src/include/utils/pool_track_table_mutation.h new file mode 100644 index 0000000000000000000000000000000000000000..b0de2d8093430500c9c1796c418c9bd1d0edd4b3 --- /dev/null +++ b/src/include/utils/pool_track_table_mutation.h @@ -0,0 +1,245 @@ +/* -*-pgsql-c-*- */ +/* + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2026 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the 
software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. + * + * pool_track_table_mutation.h: In-memory tracking of + * recently written tables to prevent stale reads. + */ + +#ifndef POOL_TRACK_TABLE_MUTATION_H +#define POOL_TRACK_TABLE_MUTATION_H + +#include "pool.h" +#include + +/* + * Maximum table name length including schema: "schema"."table" + * Using NAMEDATALEN * 2 + 4 for quotes and dot + */ +#define TRACK_TABLE_MUTATION_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4) + +/* + * Maximum number of tables we track per query + */ +#define TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY 8 + +/* + * Invalid index marker for linked lists + */ +#define TRACK_TABLE_MUTATION_INVALID_INDEX (-1) + +/* + * Default TTL in microseconds (100ms) used when replication delay is unknown + */ +#define TRACK_TABLE_MUTATION_DEFAULT_TTL_US (100 * 1000) + +/* + * Entry in the table mutation hash table (keyed by table/database oids) + */ +typedef struct TrackTableMutationEntry +{ + int table_oid; /* Table oid */ + int dboid; /* Database oid */ + struct timeval first_write_time; /* When the entry was first created */ + struct timeval last_write_time; /* When the table was last written */ + uint32 hash; /* Pre-computed hash value */ + int next; /* Next in collision chain */ + bool in_use; /* Is this entry in use? 
*/ +} TrackTableMutationEntry; + +/* + * Header for the table mutation hash table in shared memory + */ +typedef struct TrackTableMutationHashTable +{ + int num_buckets; /* Number of hash buckets */ + int max_entries; /* Maximum entries allowed */ + int num_entries; /* Current number of entries */ + int free_list_head; /* Head of free entry list */ + /* Flexible array members follow in shared memory: + * int buckets[num_buckets]; + * TrackTableMutationEntry entries[max_entries]; + */ +} TrackTableMutationHashTable; + +/* + * Entry in the query parse cache + */ +typedef struct QueryParseEntry +{ + uint64 query_hash; /* Hash of normalized query */ + bool is_write; /* True if INSERT/UPDATE/DELETE */ + int num_tables; /* Number of tables in query */ + char table_names + [TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY] + [TRACK_TABLE_MUTATION_TABLE_NAME_LEN]; + int next; /* Next entry in collision chain */ + int lru_prev; /* Previous in LRU list */ + int lru_next; /* Next in LRU list */ + bool in_use; /* Is this entry in use? */ +} QueryParseEntry; + +/* + * Header for the query parse cache in shared memory + */ +typedef struct QueryParseCache +{ + int num_buckets; /* Number of hash buckets */ + int max_entries; /* Maximum entries allowed */ + int num_entries; /* Current number of entries */ + int free_list_head; /* Head of free entry list */ + int lru_head; /* Most recently used */ + int lru_tail; /* Least recently used */ + /* Flexible array members follow in shared memory: + * int buckets[num_buckets]; + * QueryParseEntry entries[max_entries]; + */ +} QueryParseCache; + +/* + * Global state for track table mutation feature + */ +typedef struct TrackTableMutationState +{ + bool initialized; /* Shmem initialized? 
*/ + uint64 current_ttl_us; /* Current TTL in microseconds */ + struct timeval ttl_last_updated; /* When TTL was last updated */ + struct timeval last_cleanup_time; /* When last expired cleanup ran */ + struct timeval global_cold_start_until; /* Global cold start end time */ + uint32 stats_queries_checked; /* Queries checked */ + uint32 stats_forced_primary; /* Forced to primary */ + uint32 stats_allowed_replica; /* Allowed to replica */ +} TrackTableMutationState; + +/* + * Main shared memory structure containing all components + */ +typedef struct TrackTableMutationShmem +{ + TrackTableMutationState state; + TrackTableMutationHashTable *table_map; + QueryParseCache *query_cache; +} TrackTableMutationShmem; + +/* ---------------- + * Public API functions + * ---------------- + */ + +/* + * Initialize shared memory structures for track table mutation. + * Called from pgpool_main.c after pool_init_pool_info(). + */ +extern void pool_track_table_mutation_init(void); + +/* + * Initialize per-child process state for track table mutation. + * Called from child.c when a new child process starts. + * Sets up cold start tracking. + */ +extern void pool_track_table_mutation_child_init(void); + +/* + * Check if the child process is in cold start period. + * During cold start, all queries are routed to primary. + * Returns true if in cold start, false otherwise. + */ +extern bool pool_track_table_mutation_in_cold_start(void); + +/* + * Trigger a global cold start period for all processes. + * Used after watchdog leader change to avoid stale reads. + */ +extern void pool_track_table_mutation_trigger_global_cold_start(void); + +/* + * Get oid of current database. + */ +extern int pool_track_table_mutation_get_database_oid(void); + +/* + * Check if a table was recently written to (is "stale"). + * If stale, reads from this table should go to primary. + * Returns true if table is stale (recently written), false otherwise. 
+ */ +extern bool pool_track_table_mutation_table_is_stale( + int table_oid, int dboid); + +/* + * Mark tables as recently written. + * Called after INSERT/UPDATE/DELETE queries complete. + * table_oids: array of table oids + * num_tables: number of tables in array + * dboid: database oid + */ +extern void pool_track_table_mutation_mark_tables_written( + const int *table_oids, int num_tables, int dboid); + +/* + * Convenience function to mark a single table as written. + * table_oid: table oid + * dboid: database oid + */ +extern void pool_track_table_mutation_mark_table_written( + int table_oid, int dboid); + +/* + * Update the TTL based on current replication delay. + * Called from pool_worker_child.c when replication delay is updated. + * delay_us: replication delay in microseconds + */ +extern void pool_track_table_mutation_update_ttl(uint64 delay_us); + +/* + * Look up cached parse result for a query. + * hash: hash of normalized query + * is_write: output - true if query is a write + * table_names: output - array to fill with table names + * num_tables: output - number of tables found + * Returns true if found in cache, false otherwise. + */ +extern bool pool_track_table_mutation_get_cached_parse( + uint64 hash, bool *is_write, + char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN], + int *num_tables); + +/* + * Cache a parse result for a query. + * hash: hash of normalized query + * is_write: true if query is a write + * table_names: array of table names + * num_tables: number of tables + */ +extern void pool_track_table_mutation_cache_parse( + uint64 hash, bool is_write, + const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN], + int num_tables); + +/* + * Normalize a query and compute its hash. + * Strips comments, normalizes whitespace and literals. 
+ * query: input SQL query string + * Returns: 64-bit hash of normalized query + */ +extern uint64 pool_track_table_mutation_normalize_and_hash(const char *query); + +/* + * Calculate required shared memory size for track table mutation. + */ +extern Size pool_track_table_mutation_shmem_size(void); + +#endif /* POOL_TRACK_TABLE_MUTATION_H */ diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c index fa05e15e7ac435e072298063f918c70aa4e5680c..395191a1c53a1d76438ce52148375ea89f4f32cf 100644 --- a/src/main/pgpool_main.c +++ b/src/main/pgpool_main.c @@ -57,6 +57,7 @@ #include "auth/pool_passwd.h" #include "auth/pool_hba.h" #include "query_cache/pool_memqcache.h" +#include "utils/pool_track_table_mutation.h" #include "watchdog/wd_internal_commands.h" #include "watchdog/wd_lifecheck.h" #include "watchdog/watchdog.h" @@ -1485,11 +1486,14 @@ sigusr1_interrupt_processor(void) if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED]) { + WD_STATES wd_state; + ereport(LOG, (errmsg("Pgpool-II parent process received watchdog state change signal from watchdog"))); user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false; - if (wd_internal_get_watchdog_local_node_state() == WD_STANDBY) + wd_state = wd_internal_get_watchdog_local_node_state(); + if (wd_state == WD_STANDBY) { ereport(LOG, (errmsg("we have joined the watchdog cluster as STANDBY node"), @@ -1503,6 +1507,12 @@ sigusr1_interrupt_processor(void) */ pool_release_follow_primary_lock(true); } + else if (wd_state == WD_COORDINATOR && + pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + pool_track_table_mutation_trigger_global_cold_start(); + } } if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT]) { @@ -3068,6 +3078,16 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps) elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size())); } + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + 
size += MAXALIGN(pool_track_table_mutation_shmem_size()); + elog(DEBUG1, + "track_table_mutation: %zu bytes requested" + " for shared memory", + MAXALIGN(pool_track_table_mutation_shmem_size())); + } + initialize_shared_memory_main_segment(size); /* Move the backend descriptors to shared memory */ @@ -3184,6 +3204,13 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps) wd_ipc_initialize_data(); } + /* Initialize track table mutation for recently written tables */ + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + pool_track_table_mutation_init(); + } + } /* diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c index a3b8f0ea194ffecc79e58566be80562a46eb75ab..a4ec83f938c74a339ab6a1b8bca2dc547cc5c219 100644 --- a/src/protocol/CommandComplete.c +++ b/src/protocol/CommandComplete.c @@ -38,6 +38,8 @@ #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/pool_stream.h" +#include "utils/pool_track_table_mutation.h" +#include "query_cache/pool_memqcache.h" static int extract_ntuples(char *message); static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete); @@ -304,6 +306,33 @@ handle_query_context(POOL_CONNECTION_POOL *backend) node = session_context->query_context->parse_tree; + /* + * Track table writes for dml_adaptive_global feature. + * For autocommit statements (not in explicit transaction), mark tables + * immediately. For explicit transactions, marking is deferred to COMMIT + * in dml_adaptive() so that ROLLBACKed writes don't pollute the shared + * memory table map. 
+ */ + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL && + node != NULL && + !session_context->is_in_transaction) + { + int *oids; + int num_oids; + + num_oids = pool_extract_table_oids(node, &oids); + if (num_oids > 0) + { + int dboid; + + dboid = pool_track_table_mutation_get_database_oid(); + if (dboid > 0) + pool_track_table_mutation_mark_tables_written( + oids, num_oids, dboid); + } + } + if (IsA(node, PrepareStmt)) { if (session_context->uncompleted_message) diff --git a/src/protocol/child.c b/src/protocol/child.c index c34f057281be62feaf39db1bb605062f56dc398c..316b76239d163bfdb428f03446384059261f34be 100644 --- a/src/protocol/child.c +++ b/src/protocol/child.c @@ -57,6 +57,7 @@ #include "utils/elog.h" #include "utils/ps_status.h" #include "utils/timestamp.h" +#include "utils/pool_track_table_mutation.h" #include "context/pool_process_context.h" #include "context/pool_session_context.h" @@ -213,6 +214,13 @@ do_child(int *fds) /* Initialize per process context */ pool_init_process_context(); + /* Initialize track table mutation child state for cold start tracking */ + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + pool_track_table_mutation_child_init(); + } + /* initialize connection pool */ if (pool_init_cp()) { diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c index f9458bb557acb8128a6f0d3411d4f08c1f598c29..706abff5bdbd24fee407ee2a82e8911f74695ea6 100644 --- a/src/protocol/pool_proto_modules.c +++ b/src/protocol/pool_proto_modules.c @@ -1461,7 +1461,9 @@ Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, pool_where_to_send(query_context, query_context->original_query, query_context->parse_tree); - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && strlen(name) != 0) + if (DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write) + && strlen(name) != 0) pool_setall_node_to_be_sent(query_context); if (REPLICATION) 
@@ -1804,7 +1806,7 @@ Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
 		return POOL_END;
 	}
 
-	if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE &&
+	if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) &&
 		TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T')
 	{
 		pool_where_to_send(query_context, query_context->original_query,
diff --git a/src/query_cache/pool_memqcache.c b/src/query_cache/pool_memqcache.c
index f38f711469576342ce59469b085c97365116004c..dca93334e9e47bb7978064edece5ca0e40021ce3 100644
--- a/src/query_cache/pool_memqcache.c
+++ b/src/query_cache/pool_memqcache.c
@@ -1305,6 +1305,12 @@
 		}
 		return num_oids;
 	}
+	else if (IsA(node, MergeStmt))
+	{
+		MergeStmt  *stmt = (MergeStmt *) node;
+
+		table = make_table_name_from_rangevar(stmt->relation);
+	}
 	else if (IsA(node, ExplainStmt))
 	{
 		ListCell   *cell;
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index 1ac982907d2de3ad8cb8d1c70a73dc41428c1327..00132d534f4c78f6844df16aee67365a8d102a61 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -478,6 +478,14 @@ backend_clustering_mode = streaming_replication
                                    #     modified within the current explicit transaction will
                                    #     not be load balanced until the end of the transaction.
                                    #
+                                   # dml_adaptive_global:
+                                   #     Superset of dml_adaptive. In addition to per-transaction
+                                   #     tracking, uses shared memory to track recently written
+                                   #     tables across all sessions. Reads from recently written
+                                   #     tables are routed to primary until a TTL (based on
+                                   #     replication delay) expires. Requires additional shared
+                                   #     memory. See track_table_mutation_* parameters below.
+                                   #
                                    # always:
                                    #     if a write query is issued, read queries will
                                    #     not be load balanced until the session ends.
@@ -499,6 +507,54 @@ backend_clustering_mode = streaming_replication
 
 #statement_level_load_balance = off
                                    # Enables statement level load balancing
+# - Track Table Mutation (used by dml_adaptive_global) -
+                                   # WARNING: dml_adaptive_global increases shared memory usage
+                                   # Default settings require ~6.4 MB shared memory
+                                   # (0.1 MB table tracking + 6.3 MB query cache)
+
+#track_table_mutation_ttl_factor = 5.0
+                                   # TTL multiplier: TTL = replication_delay * factor
+                                   # Higher values provide more safety margin
+                                   # Range: 1.0-100.0 (default: 5.0)
+                                   # (change requires reload)
+
+#track_table_mutation_max_staleness = 60000
+                                   # Maximum duration (ms) a table can be marked stale
+                                   # from its first write. Bounds cross-session impact:
+                                   # even under continuous writes, staleness expires
+                                   # after this period and is only renewed by new writes.
+                                   # 0 disables the cap. Range: 0-3600000 (default: 60000 = 60s)
+                                   # (change requires reload)
+
+#track_table_mutation_cold_start_duration = 2000
+                                   # Duration in milliseconds to route all queries to primary
+                                   # after child process starts (cold start period)
+                                   # Range: 0-60000 ms (default: 2000 ms = 2 seconds)
+                                   # Set to 0 to disable cold start behavior
+                                   # (change requires reload)
+
+#track_table_mutation_table_buckets = 1024
+                                   # Number of hash buckets for track table mutation
+                                   # Higher values reduce hash collisions
+                                   # Range: 64-65536 (default: 1024)
+                                   # (change requires restart)
+
+#track_table_mutation_table_size = 2048
+                                   # Maximum number of tables to track simultaneously
+                                   # Range: 128-131072 (default: 2048)
+                                   # (change requires restart)
+
+#track_table_mutation_query_buckets = 2048
+                                   # Number of hash buckets for query parse cache
+                                   # Range: 64-65536 (default: 2048)
+                                   # (change requires restart)
+
+#track_table_mutation_query_parse_cache_size = 10000
+                                   # Maximum number of query parse results to cache
+                                   # Range: 100-1000000 (default: 10000)
+                                   # Memory usage: ~640 bytes per entry (~6.3 MB default, ~64 MB for 100000)
+                                   # (change requires restart)
+
 #------------------------------------------------------------------------------
 # STREAMING REPLICATION MODE
 #------------------------------------------------------------------------------
diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c
index 311b638658e66ebb56162ad9fa4392315b2df64e..7eaf63010d79c75ba82a27361bff6fcdbc60dfc6 100644
--- a/src/streaming_replication/pool_worker_child.c
+++ b/src/streaming_replication/pool_worker_child.c
@@ -58,6 +58,7 @@
 #include "utils/pool_ip.h"
 #include "utils/ps_status.h"
 #include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
 
 #include "context/pool_process_context.h"
 #include "context/pool_session_context.h"
@@ -695,6 +696,7 @@ check_replication_time_lag_with_cmd(void)
 	double		delay_ms;
 	uint64		delay;
 	uint64		delay_threshold_by_time;
+	uint64		max_delay_us = 0;	/* Track max delay for mutation map */
 	int			token_count = 0;
 	int			primary_node_id;
 	int			save_errno;
@@ -1003,6 +1005,10 @@
 			bkinfo->standby_delay = delay;
 			bkinfo->standby_delay_by_time = true;
 
+			/* Track maximum delay for table mutation map TTL calculation */
+			if (delay > max_delay_us)
+				max_delay_us = delay;
+
 			/*
 			 * Log delay if necessary. threshold is in milliseconds, convert
 			 * to microseconds.
@@ -1021,6 +1027,12 @@ check_replication_time_lag_with_cmd(void)
 			token = strtok_r(NULL, " \t\n", &saveptr);
 		}
 
+		/* Update table mutation TTL based on max observed delay */
+		if (pool_config->disable_load_balance_on_write ==
+			DLBOW_DML_ADAPTIVE_GLOBAL &&
+			max_delay_us > 0)
+			pool_track_table_mutation_update_ttl(max_delay_us);
+
 	}
 	PG_CATCH();
 	{
diff --git a/src/test/regression/libs.sh b/src/test/regression/libs.sh
index 7c5a0c1821191a572430b658d80ab34554110363..1c8ae392daa10056119c09c7127e839d859d700d 100644
--- a/src/test/regression/libs.sh
+++ b/src/test/regression/libs.sh
@@ -42,6 +42,8 @@ function wait_for_failover_done {
 function clean_all {
     pgrep pgpool | xargs kill -9 > /dev/null 2>&1
     pgrep postgres | xargs kill -9 > /dev/null 2>&1
+    # Clean up leaked SysV IPC resources left behind by kill -9
+    ipcrm --all 2>/dev/null || true
     rm -f $PGSOCKET_DIR/.s.PGSQL.*
     netstat -t -p 2>/dev/null|grep pgpool
 }
diff --git a/src/test/regression/tests/042.track_table_mutation/test.sh b/src/test/regression/tests/042.track_table_mutation/test.sh
new file mode 100755
index 0000000000000000000000000000000000000000..8b4dd17b820d36e3fc48216ac7f0544cbf0f5a9c
--- /dev/null
+++ b/src/test/regression/tests/042.track_table_mutation/test.sh
@@ -0,0 +1,354 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# test script for track table mutation feature (in-memory table tracking).
+# Tests routing of queries based on recently written tables.
+#
+source $TESTLIBS
+TESTDIR=testdir
+PSQL=$PGBIN/psql
+PSQLOPTS="-a -q -X"
+PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin
+export PGDATABASE=test
+
+# Only run in streaming replication mode since that's the target use case
+for mode in s
+do
+    rm -fr $TESTDIR
+    mkdir $TESTDIR
+    cd $TESTDIR
+
+    # Create test environment with 2 nodes
+    echo -n "creating test environment..."
+    $PGPOOL_SETUP -m $mode -n 2 || exit 1
+    echo "done."
+ + source ./bashrc.ports + + # Configure track table mutation feature via dml_adaptive_global + echo "disable_load_balance_on_write = 'dml_adaptive_global'" >> etc/pgpool.conf + echo "track_table_mutation_ttl_factor = 5.0" >> etc/pgpool.conf + echo "track_table_mutation_cold_start_duration = 10000" >> etc/pgpool.conf + + # Enable load balancing explicitly + echo "load_balance_mode = on" >> etc/pgpool.conf + + # Configure weights so we can distinguish routing + # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1 + # This means load balanced queries go to node 1 by default + echo "backend_weight0 = 0" >> etc/pgpool.conf + echo "backend_weight1 = 1" >> etc/pgpool.conf + + # Enable debug logging to see routing decisions + echo "log_min_messages = debug1" >> etc/pgpool.conf + + ./startall + + export PGPORT=$PGPOOL_PORT + export PGHOST=localhost + + wait_for_pgpool_startup + + # Create test tables + $PSQL test < /dev/null 2>&1 + + # Check log for cold start message (use -a to handle binary log files) + if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then + echo "Test 1 PASSED: Cold start routing works" + else + echo "Test 1 FAILED: Cold start routing not detected" + ./shutdownall + exit 1 + fi + + echo "=== Test 2: Wait for cold start to end ===" + # Wait for cold start period to end (10 seconds). + # Use generous margin to avoid flakiness under load (e.g. full regression suite). 
+ sleep 12 + + # Clear the log + > log/pgpool.log + + # Now a clean table query should load balance (go to node 1) + $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1 + + # After cold start, queries to clean tables should load balance + # Check that it did NOT get forced to primary due to track table mutation + if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then + echo "Test 2 FAILED: Still in cold start after waiting" + ./shutdownall + exit 1 + fi + echo "Test 2 PASSED: Cold start ended correctly" + + echo "=== Test 3: Write-then-Read Routing ===" + # Clear the log + > log/pgpool.log + + # Write to t1 and then read - use single connection to ensure same session + $PSQL test < log/pgpool.log + + # Read from t2 (never written to) - should load balance + $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1 + + # Should NOT see track table mutation blocking message for t2 + if grep -a -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then + echo "Test 4 FAILED: Clean table incorrectly marked as stale" + ./shutdownall + exit 1 + fi + echo "Test 4 PASSED: Clean tables still load balance" + + echo "=== Test 5: UPDATE Marks Table as Stale ===" + # Clear the log + > log/pgpool.log + + # Update t2 and then read - use single connection + $PSQL test < log/pgpool.log + + # Delete from t3 and then read - use single connection + $PSQL test < log/pgpool.log + + # Create a fresh table for TRUNCATE test + $PSQL test -c "CREATE TABLE t_truncate(i INTEGER);" > /dev/null 2>&1 + $PSQL test -c "INSERT INTO t_truncate VALUES (1), (2), (3);" > /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log again + > log/pgpool.log + + # Truncate and then read - use single connection + $PSQL test < log/pgpool.log + + # Create a fresh table for WITH test + $PSQL test -c "CREATE TABLE t_cte(i INTEGER);" > /dev/null 2>&1 + $PSQL test -c 
"INSERT INTO t_cte VALUES (1), (2), (3);" > /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log again + > log/pgpool.log + + # Use WITH clause with DELETE, then read from the table + $PSQL test </dev/null; then + echo "=== Test 9: MERGE Marks Table as Stale (PostgreSQL $PG_MAJOR_VERSION) ===" + # Clear the log + > log/pgpool.log + + # Create tables for MERGE test + $PSQL test -c "CREATE TABLE t_merge_target(id INTEGER PRIMARY KEY, val TEXT);" > /dev/null 2>&1 + $PSQL test -c "CREATE TABLE t_merge_source(id INTEGER, val TEXT);" > /dev/null 2>&1 + $PSQL test -c "INSERT INTO t_merge_target VALUES (1, 'old');" > /dev/null 2>&1 + $PSQL test -c "INSERT INTO t_merge_source VALUES (1, 'new'), (2, 'insert');" > /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log again + > log/pgpool.log + + # Use MERGE, then read from the target table + $PSQL test < /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log + > log/pgpool.log + + # Write inside a transaction, then rollback + $PSQL test < /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log + > log/pgpool.log + + # Write inside a transaction, then commit, then read + $PSQL test <> pgpool${i}/etc/pgpool.conf < /dev/null 2>&1 + if [ $? = 0 ]; then + break + fi + sleep 2 +done +echo "done." + +# Test 1: Verify leader came up +echo "=== Test 1: Waiting for the pgpool leader... ===" +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep "I am the cluster leader node" \ + pgpool0/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 1 PASSED: Leader brought up." + break + fi + echo "[check] $i times" + sleep 2 +done + +if [ $success_count -lt 1 ]; then + echo "Test 1 FAILED: Leader did not start" + ./shutdownall + exit 1 +fi + +# Test 2: Verify standby joined cluster +echo "=== Test 2: Waiting for standby to join... 
===" +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep "successfully joined the watchdog cluster" \ + pgpool1/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 2 PASSED: Standby joined." + break + fi + echo "[check] $i times" + sleep 2 +done + +if [ $success_count -lt 2 ]; then + echo "Test 2 FAILED: Standby did not join" + ./shutdownall + exit 1 +fi + +# Test 3: Verify track_table_mutation initialized +echo "=== Test 3: Verify feature initialized ===" +if grep -a "track_table_mutation: initialized" \ + pgpool0/log/pgpool.log > /dev/null 2>&1; then + success_count=$(( success_count + 1 )) + echo "Test 3 PASSED: Feature initialized." +else + echo "Test 3 FAILED: Feature not initialized" + ./shutdownall + exit 1 +fi + +# Test 4: Stop leader (pgpool0) to trigger failover +echo "=== Test 4: Stopping leader... ===" +cd pgpool0 +source ./bashrc.ports +$PGPOOL_INSTALL_DIR/bin/pgpool \ + -f etc/pgpool.conf -m f stop +cd .. + +echo "Checking standby detected shutdown..." +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep -a "is shutting down" \ + pgpool1/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 4 PASSED: Shutdown detected." + break + fi + echo "[check] $i times" + sleep 2 +done + +if [ $success_count -lt 4 ]; then + echo "Test 4 FAILED: Shutdown not detected" + ./shutdownall + exit 1 +fi + +# Test 5: Verify standby became new leader +echo "=== Test 5: Checking standby takes over... ===" +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep -a "I am the cluster leader node" \ + pgpool1/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 5 PASSED: Standby became leader." 
+ break + fi + echo "[check] $i times" + sleep 2 +done + +if [ $success_count -lt 5 ]; then + echo "Test 5 FAILED: Standby did not become leader" + ./shutdownall + exit 1 +fi + +# Test 6: Verify global cold start was triggered +echo "=== Test 6: Checking global cold start... ===" +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep -a "track_table_mutation: global cold start" \ + pgpool1/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 6 PASSED: Global cold start triggered." + break + fi + echo "[check] $i times" + sleep 2 +done + +# Cleanup +./shutdownall + +echo "" +echo "$success_count out of 6 successful" + +if test $success_count -eq 6 +then + echo "=== All Watchdog Tests PASSED ===" + exit 0 +fi + +exit 1 diff --git a/src/utils/pool_track_table_mutation.c b/src/utils/pool_track_table_mutation.c new file mode 100644 index 0000000000000000000000000000000000000000..ee09b3f509ed7919c84d1c2ce6906c9c94e5cb06 --- /dev/null +++ b/src/utils/pool_track_table_mutation.c @@ -0,0 +1,1453 @@ +/* -*-pgsql-c-*- */ +/* + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2026 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. 
+ * + * pool_track_table_mutation.c: In-memory tracking of recently + * written tables to prevent stale reads from replicas. + * + * Based on the "lagless" architecture from Tailor Brands. + */ + +#include +#include +#include +#include + +#include "pool.h" +#include "pool_config.h" +#include "context/pool_session_context.h" +#include "utils/pool_track_table_mutation.h" +#include "utils/elog.h" +#include "utils/pool_ipc.h" +#include "utils/palloc.h" +#include "utils/pool_relcache.h" + +#define DATABASE_TO_OID_QUERY \ + "SELECT oid FROM pg_catalog.pg_database" \ + " WHERE datname = '%s'" + +/* + * Helper macro: true when the feature is not active. + */ +#define TRACK_TABLE_MUTATION_DISABLED() \ + (pool_config->disable_load_balance_on_write != \ + DLBOW_DML_ADAPTIVE_GLOBAL || \ + track_table_mutation_shmem == NULL) + +/* ---------------- + * Local variables + * ---------------- + */ + +/* Pointer to shared memory structure */ +static TrackTableMutationShmem *track_table_mutation_shmem = NULL; + +/* Per-process cold start tracking (not in shared memory) */ +static struct timeval process_start_time; +static bool cold_start_initialized = false; + +/* ---------------- + * Helper macros for flexible arrays in shared memory + * ---------------- + */ + +/* Get pointer to bucket array in table map */ +#define TABLE_MAP_BUCKETS(map) \ + ((int *)((char *)(map) + \ + sizeof(TrackTableMutationHashTable))) + +/* Get pointer to entry array in table map */ +#define TABLE_MAP_ENTRIES(map) \ + ((TrackTableMutationEntry *)((char *)(map) + \ + sizeof(TrackTableMutationHashTable) + \ + (map)->num_buckets * sizeof(int))) + +/* Get pointer to bucket array in parse cache */ +#define PARSE_CACHE_BUCKETS(cache) \ + ((int *)((char *)(cache) + sizeof(QueryParseCache))) + +/* Get pointer to entry array in parse cache */ +#define PARSE_CACHE_ENTRIES(cache) \ + ((QueryParseEntry *)((char *)(cache) + \ + sizeof(QueryParseCache) + \ + (cache)->num_buckets * sizeof(int))) + +/* ---------------- + * 
Semaphore lock helpers + * ---------------- + */ + +static inline void +table_map_lock(void) +{ + pool_semaphore_lock(TRACK_TABLE_MUTATION_TABLE_SEM); +} + +static inline void +table_map_unlock(void) +{ + pool_semaphore_unlock(TRACK_TABLE_MUTATION_TABLE_SEM); +} + +static inline void +parse_cache_lock(void) +{ + pool_semaphore_lock(TRACK_TABLE_MUTATION_QUERY_SEM); +} + +static inline void +parse_cache_unlock(void) +{ + pool_semaphore_unlock(TRACK_TABLE_MUTATION_QUERY_SEM); +} + +/* ---------------- + * Hash functions + * ---------------- + */ + +/* + * FNV-1a hash for table/database oid pair + */ +static uint32 +fnv1a_hash_table_key(int table_oid, int dboid) +{ + uint32 hash = 2166136261u; /* FNV offset basis */ + uint32 data[2]; + const unsigned char *bytes; + size_t i; + + data[0] = (uint32) table_oid; + data[1] = (uint32) dboid; + bytes = (const unsigned char *) data; + + for (i = 0; i < sizeof(data); i++) + { + hash ^= bytes[i]; + hash *= 16777619u; /* FNV prime */ + } + + return hash; +} + +/* + * FNV-1a hash for 64-bit value + */ +static uint64 +fnv1a_hash_64(const char *str, size_t len) +{ + /* FNV offset basis for 64-bit */ + uint64 hash = 14695981039346656037ULL; + size_t i; + + for (i = 0; i < len; i++) + { + hash ^= (uint8)str[i]; + hash *= 1099511628211ULL; /* FNV prime */ + } + + return hash; +} + +/* ---------------- + * Time utilities + * ---------------- + */ + +/* + * Get elapsed time in microseconds between two timevals + */ +static int64 +elapsed_us(struct timeval *start, struct timeval *end) +{ + return ((int64)(end->tv_sec - start->tv_sec) * 1000000) + + (end->tv_usec - start->tv_usec); +} + +/* + * Get current time + */ +static void +get_current_time(struct timeval *tv) +{ + gettimeofday(tv, NULL); +} + +/* ---------------- + * Database oid lookup + * ---------------- + */ + +static int +track_table_mutation_get_database_oid_internal(void) +{ + int oid = 0; + static POOL_RELCACHE *relcache; + POOL_CONNECTION_POOL *backend; + 
POOL_SESSION_CONTEXT *session_context; + + /* Safety check: must have shmem initialized */ + if (track_table_mutation_shmem == NULL) + return oid; + + session_context = pool_get_session_context(false); + if (session_context == NULL) + return oid; + + backend = session_context->backend; + if (backend == NULL || + MAIN_CONNECTION(backend) == NULL || + MAIN_CONNECTION(backend)->sp == NULL) + return oid; + + /* Ensure database name is valid */ + if (MAIN_CONNECTION(backend)->sp->database == NULL) + return oid; + + if (!relcache) + { + relcache = pool_create_relcache( + pool_config->relcache_size, + DATABASE_TO_OID_QUERY, + int_register_func, + int_unregister_func, + false); + if (relcache == NULL) + { + ereport(LOG, + (errmsg("track_table_mutation: " + "error creating relcache"))); + return oid; + } + } + + oid = (int) (intptr_t) pool_search_relcache( + relcache, backend, + MAIN_CONNECTION(backend)->sp->database); + return oid; +} + +int +pool_track_table_mutation_get_database_oid(void) +{ + return track_table_mutation_get_database_oid_internal(); +} + +/* ---------------- + * Table mutation hash table operations + * ---------------- + */ + +/* + * Initialize table mutation hash table + */ +static void +table_map_init(TrackTableMutationHashTable *map, + int num_buckets, int max_entries) +{ + int *buckets; + TrackTableMutationEntry *entries; + int i; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + map->num_buckets = num_buckets; + map->max_entries = max_entries; + map->num_entries = 0; + map->free_list_head = 0; + + buckets = TABLE_MAP_BUCKETS(map); + entries = TABLE_MAP_ENTRIES(map); + + /* Initialize all buckets to empty */ + for (i = 0; i < num_buckets; i++) + buckets[i] = invalid; + + /* Initialize free list - chain all entries */ + for (i = 0; i < max_entries; i++) + { + entries[i].in_use = false; + entries[i].next = (i < max_entries - 1) ? 
+ i + 1 : invalid; + } + + ereport(DEBUG1, + (errmsg("track_table_mutation: " + "table map init %d buckets, " + "%d max entries", + num_buckets, max_entries))); +} + +/* + * Allocate an entry from the free list + */ +static int +table_map_alloc_entry(TrackTableMutationHashTable *map) +{ + TrackTableMutationEntry *entries; + int idx; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + entries = TABLE_MAP_ENTRIES(map); + + if (map->free_list_head == invalid) + return invalid; + + idx = map->free_list_head; + map->free_list_head = entries[idx].next; + entries[idx].in_use = true; + entries[idx].next = invalid; + map->num_entries++; + + return idx; +} + +/* + * Free an entry back to the free list + */ +static void +table_map_free_entry(TrackTableMutationHashTable *map, + int idx) +{ + TrackTableMutationEntry *entries; + + entries = TABLE_MAP_ENTRIES(map); + + entries[idx].in_use = false; + entries[idx].next = map->free_list_head; + map->free_list_head = idx; + map->num_entries--; +} + +/* + * Look up a table in the hash table. + * Returns entry index or INVALID_INDEX if not found. + * Must be called with lock held. + */ +static int +table_map_lookup(TrackTableMutationHashTable *map, + int table_oid, int dboid, + uint32 hash) +{ + int *buckets = TABLE_MAP_BUCKETS(map); + TrackTableMutationEntry *entries; + int bucket = hash % map->num_buckets; + int idx = buckets[bucket]; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + entries = TABLE_MAP_ENTRIES(map); + + while (idx != invalid) + { + if (entries[idx].hash == hash && + entries[idx].table_oid == table_oid && + entries[idx].dboid == dboid) + { + return idx; + } + idx = entries[idx].next; + } + + return invalid; +} + +/* + * Insert or update a table entry. + * Must be called with lock held. 
+ */ +static void +table_map_insert(TrackTableMutationHashTable *map, + int table_oid, int dboid, + uint32 hash, + struct timeval *write_time) +{ + int *buckets = TABLE_MAP_BUCKETS(map); + TrackTableMutationEntry *entries; + int bucket = hash % map->num_buckets; + int idx; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + entries = TABLE_MAP_ENTRIES(map); + + /* Check if entry already exists */ + idx = table_map_lookup(map, table_oid, dboid, hash); + if (idx != invalid) + { + /* Update last write time; keep first_write_time */ + entries[idx].last_write_time = *write_time; + return; + } + + /* Allocate new entry */ + idx = table_map_alloc_entry(map); + if (idx == invalid) + { + int b; + + /* Table is full - evict first non-empty bucket */ + for (b = 0; b < map->num_buckets; b++) + { + if (buckets[b] != invalid) + { + int victim = buckets[b]; + + buckets[b] = entries[victim].next; + table_map_free_entry(map, victim); + idx = table_map_alloc_entry(map); + break; + } + } + + if (idx == invalid) + { + ereport(WARNING, + (errmsg("track_table_mutation: " + "failed to allocate entry " + "for oid %d (dboid %d)", + table_oid, dboid))); + return; + } + } + + /* Initialize new entry */ + entries[idx].table_oid = table_oid; + entries[idx].dboid = dboid; + entries[idx].hash = hash; + entries[idx].first_write_time = *write_time; + entries[idx].last_write_time = *write_time; + + /* Insert at head of bucket chain */ + entries[idx].next = buckets[bucket]; + buckets[bucket] = idx; + + ereport(DEBUG2, + (errmsg("track_table_mutation: " + "marked oid %d (dboid %d) written", + table_oid, dboid))); +} + +/* + * Remove expired entries from the table map. + * Must be called with lock held. 
+ */ +static void +table_map_cleanup_expired( + TrackTableMutationHashTable *map, uint64 ttl_us) +{ + int *buckets = TABLE_MAP_BUCKETS(map); + TrackTableMutationEntry *entries; + struct timeval now; + int64 max_stale_us; + int removed = 0; + int b; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + entries = TABLE_MAP_ENTRIES(map); + get_current_time(&now); + + max_stale_us = (int64)pool_config + ->track_table_mutation_max_staleness * 1000LL; + + for (b = 0; b < map->num_buckets; b++) + { + int *prev_ptr = &buckets[b]; + int idx = buckets[b]; + + while (idx != invalid) + { + int64 age; + int64 total_age; + bool expired; + + age = elapsed_us( + &entries[idx].last_write_time, &now); + expired = (age > (int64)ttl_us); + + /* + * Also evict entries that exceed + * max_staleness from first write. + */ + if (!expired && max_stale_us > 0) + { + total_age = elapsed_us( + &entries[idx].first_write_time, + &now); + expired = (total_age >= max_stale_us); + } + + if (expired) + { + /* Entry has expired - remove it */ + int next = entries[idx].next; + + *prev_ptr = next; + table_map_free_entry(map, idx); + idx = next; + removed++; + } + else + { + prev_ptr = &entries[idx].next; + idx = entries[idx].next; + } + } + } + + if (removed > 0) + { + ereport(DEBUG1, + (errmsg("track_table_mutation: " + "cleaned up %d expired entries", + removed))); + } +} + +/* ---------------- + * Parse cache operations + * ---------------- + */ + +/* + * Initialize parse cache + */ +static void +parse_cache_init(QueryParseCache *cache, + int num_buckets, int max_entries) +{ + int *buckets; + QueryParseEntry *entries; + int i; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + cache->num_buckets = num_buckets; + cache->max_entries = max_entries; + cache->num_entries = 0; + cache->free_list_head = 0; + cache->lru_head = invalid; + cache->lru_tail = invalid; + + buckets = PARSE_CACHE_BUCKETS(cache); + entries = PARSE_CACHE_ENTRIES(cache); + + /* Initialize all buckets to empty */ + for (i = 0; 
i < num_buckets; i++) + buckets[i] = invalid; + + /* Initialize free list */ + for (i = 0; i < max_entries; i++) + { + entries[i].in_use = false; + entries[i].next = (i < max_entries - 1) ? + i + 1 : invalid; + entries[i].lru_prev = invalid; + entries[i].lru_next = invalid; + } + + ereport(DEBUG1, + (errmsg("track_table_mutation: " + "parse cache init %d buckets, " + "%d max entries", + num_buckets, max_entries))); +} + +/* + * Move entry to front of LRU list (most recently used) + */ +static void +parse_cache_lru_touch(QueryParseCache *cache, int idx) +{ + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + /* Already at head? */ + if (cache->lru_head == idx) + return; + + /* Remove from current position */ + if (entries[idx].lru_prev != invalid) + entries[entries[idx].lru_prev].lru_next = + entries[idx].lru_next; + if (entries[idx].lru_next != invalid) + entries[entries[idx].lru_next].lru_prev = + entries[idx].lru_prev; + if (cache->lru_tail == idx) + cache->lru_tail = entries[idx].lru_prev; + + /* Insert at head */ + entries[idx].lru_prev = invalid; + entries[idx].lru_next = cache->lru_head; + if (cache->lru_head != invalid) + entries[cache->lru_head].lru_prev = idx; + cache->lru_head = idx; + if (cache->lru_tail == invalid) + cache->lru_tail = idx; +} + +/* + * Add entry to LRU list (at head) + */ +static void +parse_cache_lru_add(QueryParseCache *cache, int idx) +{ + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + entries[idx].lru_prev = invalid; + entries[idx].lru_next = cache->lru_head; + + if (cache->lru_head != invalid) + entries[cache->lru_head].lru_prev = idx; + + cache->lru_head = idx; + + if (cache->lru_tail == invalid) + cache->lru_tail = idx; +} + +/* + * Remove entry from LRU list + */ +static void +parse_cache_lru_remove(QueryParseCache *cache, int idx) +{ + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int invalid = 
TRACK_TABLE_MUTATION_INVALID_INDEX; + + if (entries[idx].lru_prev != invalid) + entries[entries[idx].lru_prev].lru_next = + entries[idx].lru_next; + else + cache->lru_head = entries[idx].lru_next; + + if (entries[idx].lru_next != invalid) + entries[entries[idx].lru_next].lru_prev = + entries[idx].lru_prev; + else + cache->lru_tail = entries[idx].lru_prev; + + entries[idx].lru_prev = invalid; + entries[idx].lru_next = invalid; +} + +/* + * Allocate entry from free list, evicting LRU if needed + */ +static int +parse_cache_alloc_entry(QueryParseCache *cache) +{ + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int *buckets = PARSE_CACHE_BUCKETS(cache); + int idx; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + if (cache->free_list_head != invalid) + { + idx = cache->free_list_head; + cache->free_list_head = entries[idx].next; + entries[idx].in_use = true; + entries[idx].next = invalid; + cache->num_entries++; + return idx; + } + + /* No free entries - evict LRU */ + if (cache->lru_tail == invalid) + return invalid; + + idx = cache->lru_tail; + + /* Remove from hash bucket */ + { + int bucket; + int *prev_ptr; + int curr; + + bucket = entries[idx].query_hash % + cache->num_buckets; + prev_ptr = &buckets[bucket]; + curr = buckets[bucket]; + + while (curr != invalid) + { + if (curr == idx) + { + *prev_ptr = entries[curr].next; + break; + } + prev_ptr = &entries[curr].next; + curr = entries[curr].next; + } + } + + /* Remove from LRU list */ + parse_cache_lru_remove(cache, idx); + + /* Reinitialize entry */ + entries[idx].in_use = true; + entries[idx].next = invalid; + + return idx; +} + +/* + * Look up a query in the parse cache + */ +static int +parse_cache_lookup(QueryParseCache *cache, uint64 hash) +{ + int *buckets = PARSE_CACHE_BUCKETS(cache); + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int bucket = hash % cache->num_buckets; + int idx = buckets[bucket]; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + while (idx != invalid) + 
{ + if (entries[idx].query_hash == hash) + return idx; + idx = entries[idx].next; + } + + return invalid; +} + +/* ---------------- + * Query normalization + * ---------------- + */ + +/* + * Simple query normalization: + * - Strip comments (-- and C-style block comments) + * - Collapse whitespace + * - Convert to lowercase (except inside strings) + * - Replace literal values with placeholders + */ +static size_t +normalize_query(const char *query, char *output, + size_t output_size) +{ + const char *src = query; + char *dst = output; + char *dst_end = output + output_size - 1; + bool in_string = false; + char string_char = 0; + bool last_was_space = true; + + while (*src && dst < dst_end) + { + /* Handle string literals */ + if (in_string) + { + if (*src == string_char) + { + if (*(src + 1) == string_char) + { + /* Escaped quote */ + src += 2; + continue; + } + in_string = false; + /* Replace string with placeholder */ + *dst++ = '$'; + } + src++; + continue; + } + + /* Check for string start */ + if (*src == '\'' || *src == '"') + { + in_string = true; + string_char = *src; + src++; + continue; + } + + /* Handle single-line comments */ + if (*src == '-' && *(src + 1) == '-') + { + while (*src && *src != '\n') + src++; + continue; + } + + /* Handle multi-line comments */ + if (*src == '/' && *(src + 1) == '*') + { + src += 2; + while (*src && + !(*src == '*' && *(src + 1) == '/')) + src++; + if (*src) + src += 2; + continue; + } + + /* Handle whitespace */ + if (*src == ' ' || *src == '\t' || + *src == '\n' || *src == '\r') + { + if (!last_was_space) + { + *dst++ = ' '; + last_was_space = true; + } + src++; + continue; + } + + /* Handle numbers - replace with placeholder */ + if ((*src >= '0' && *src <= '9') || + (*src == '.' 
&& *(src + 1) >= '0' && + *(src + 1) <= '9')) + { + while (*src && + ((*src >= '0' && *src <= '9') || + *src == '.')) + src++; + if (!last_was_space && + dst > output && *(dst - 1) != '$') + *dst++ = '$'; + last_was_space = false; + continue; + } + + /* Regular character - convert to lowercase */ + if (*src >= 'A' && *src <= 'Z') + *dst++ = *src + 32; + else + *dst++ = *src; + + last_was_space = false; + src++; + } + + /* Remove trailing space */ + if (dst > output && *(dst - 1) == ' ') + dst--; + + *dst = '\0'; + return dst - output; +} + +/* ---------------- + * Public API implementation + * ---------------- + */ + +/* + * Calculate the total shared memory size required + * for the track table mutation feature. + */ +Size +pool_track_table_mutation_shmem_size(void) +{ + Size size = 0; + int tbl_bkt; + int tbl_sz; + int qry_bkt; + int qry_sz; + + tbl_bkt = pool_config->track_table_mutation_table_buckets; + tbl_sz = pool_config->track_table_mutation_table_size; + qry_bkt = pool_config->track_table_mutation_query_buckets; + qry_sz = pool_config->track_table_mutation_query_parse_cache_size; + + /* Main structure */ + size += sizeof(TrackTableMutationShmem); + + /* Table mutation hash table */ + size += sizeof(TrackTableMutationHashTable); + size += tbl_bkt * sizeof(int); + size += tbl_sz * sizeof(TrackTableMutationEntry); + + /* Parse cache */ + size += sizeof(QueryParseCache); + size += qry_bkt * sizeof(int); + size += qry_sz * sizeof(QueryParseEntry); + + return size; +} + +/* + * Initialize shared memory structures for the + * track table mutation feature. Allocates and sets + * up the table map and parse cache in shared memory. + * Called once from pgpool main process at startup. 
 */
void
pool_track_table_mutation_init(void)
{
#ifndef POOL_PRIVATE
	Size		shmem_size;
	char	   *shmem_ptr;
	TrackTableMutationState *st;
	int			tbl_bkt;
	int			tbl_sz;
	int			qry_bkt;
	int			qry_sz;

	/* Only dml_adaptive_global activates the shared-memory tracking */
	if (pool_config->disable_load_balance_on_write !=
		DLBOW_DML_ADAPTIVE_GLOBAL)
	{
		ereport(DEBUG1,
				(errmsg("track_table_mutation: "
						"feature disabled")));
		return;
	}

	tbl_bkt = pool_config->track_table_mutation_table_buckets;
	tbl_sz = pool_config->track_table_mutation_table_size;
	qry_bkt = pool_config->track_table_mutation_query_buckets;
	qry_sz = pool_config->track_table_mutation_query_parse_cache_size;

	shmem_size = pool_track_table_mutation_shmem_size();

	/*
	 * Allocate from the main shared memory segment.
	 * Memory is zeroed by
	 * initialize_shared_memory_main_segment().
	 */
	shmem_ptr = pool_shared_memory_segment_get_chunk(shmem_size);
	if (shmem_ptr == NULL)
	{
		/* ereport(ERROR) does not return; the return below is defensive */
		ereport(ERROR,
				(errmsg("track_table_mutation: "
						"failed to allocate %zu bytes",
						shmem_size)));
		return;
	}

	/*
	 * Set up pointers within shared memory.  The layout (header, table
	 * map header + buckets + entries, then parse cache) must match
	 * pool_track_table_mutation_shmem_size() exactly.
	 *
	 * NOTE(review): the parse cache pointer lands after an int bucket
	 * array and an entry array; this assumes those sizes keep
	 * QueryParseCache suitably aligned - confirm entry/struct sizes are
	 * multiples of the required alignment.
	 */
	track_table_mutation_shmem = (TrackTableMutationShmem *) shmem_ptr;
	shmem_ptr += sizeof(TrackTableMutationShmem);

	track_table_mutation_shmem->table_map =
		(TrackTableMutationHashTable *) shmem_ptr;
	shmem_ptr += sizeof(TrackTableMutationHashTable);
	shmem_ptr += tbl_bkt * sizeof(int);
	shmem_ptr += tbl_sz * sizeof(TrackTableMutationEntry);

	track_table_mutation_shmem->query_cache = (QueryParseCache *) shmem_ptr;

	/* Initialize structures */
	table_map_init(track_table_mutation_shmem->table_map,
				   tbl_bkt, tbl_sz);

	parse_cache_init(track_table_mutation_shmem->query_cache,
					 qry_bkt, qry_sz);

	/* Initialize global state (runs before children fork; no lock needed) */
	st = &track_table_mutation_shmem->state;
	st->initialized = true;
	st->current_ttl_us = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
	get_current_time(&st->ttl_last_updated);
	get_current_time(&st->last_cleanup_time);
	st->global_cold_start_until.tv_sec = 0;
	st->global_cold_start_until.tv_usec = 0;
	st->stats_queries_checked = 0;
	st->stats_forced_primary = 0;
	st->stats_allowed_replica = 0;

	ereport(LOG,
			(errmsg("track_table_mutation: "
					"initialized with %zu bytes shmem",
					shmem_size)));
#endif
}

/*
 * Initialize per-child process state.
 * Records the process start time for cold start
 * period tracking. Called when a child process starts.
 */
void
pool_track_table_mutation_child_init(void)
{
	int			dur;

	if (TRACK_TABLE_MUTATION_DISABLED())
		return;

	/* Remember when this child started; used by in_cold_start() below */
	get_current_time(&process_start_time);
	cold_start_initialized = true;
	dur = pool_config->track_table_mutation_cold_start_duration;

	ereport(DEBUG1,
			(errmsg("track_table_mutation: "
					"child init, cold start %d ms",
					dur)));
}

/*
 * Check if the process is in cold start period.
 * During cold start, all queries are routed to
 * primary to avoid stale reads. Checks both
 * per-process and global (watchdog) cold start.
 * Returns true while either cold start window is active.
 */
bool
pool_track_table_mutation_in_cold_start(void)
{
	struct timeval now;
	int64		elapsed_ms;
	int			dur;
	TrackTableMutationState *st;

	if (TRACK_TABLE_MUTATION_DISABLED())
		return false;

	/* A non-positive duration disables cold start entirely */
	dur = pool_config->track_table_mutation_cold_start_duration;
	if (dur <= 0)
		return false;

	get_current_time(&now);
	st = &track_table_mutation_shmem->state;

	/*
	 * Check watchdog-triggered global cold start.
	 * elapsed_us(&now, &until) > 0 means "until" is still in the future.
	 *
	 * NOTE(review): global_cold_start_until is read here without a
	 * lock; a concurrent writer could expose a torn tv_sec/tv_usec
	 * pair - confirm this is tolerable (worst case a transiently wrong
	 * cold-start verdict).
	 */
	if (st->global_cold_start_until.tv_sec != 0 &&
		elapsed_us(&now,
				   &st->global_cold_start_until) > 0)
	{
		return true;
	}

	/* Check per-process cold start */
	if (!cold_start_initialized)
		return false;

	elapsed_ms = elapsed_us(&process_start_time, &now) / 1000;

	if (elapsed_ms < dur)
	{
		ereport(DEBUG2,
				(errmsg("track_table_mutation: "
						"cold start (%ld/%d ms)",
						(long) elapsed_ms, dur)));
		return true;
	}

	return false;
}

/*
 * Trigger a global cold start for all processes.
 * Sets the cold start end time in shared memory.
+ * Called after watchdog leader change to force all + * queries to primary during the transition. + */ +void +pool_track_table_mutation_trigger_global_cold_start(void) +{ + struct timeval now; + struct timeval *until; + int dur; + + if (TRACK_TABLE_MUTATION_DISABLED()) + return; + + dur = pool_config->track_table_mutation_cold_start_duration; + if (dur <= 0) + return; + + get_current_time(&now); + until = &track_table_mutation_shmem->state + .global_cold_start_until; + *until = now; + until->tv_sec += dur / 1000; + until->tv_usec += (dur % 1000) * 1000; + if (until->tv_usec >= 1000000) + { + until->tv_sec += until->tv_usec / 1000000; + until->tv_usec %= 1000000; + } + + ereport(LOG, + (errmsg("track_table_mutation: " + "global cold start for %d ms", + dur))); +} + +/* + * Check if a table was recently written (is "stale"). + * Returns true if reads should go to primary because + * the table was written within the current TTL window. + */ +bool +pool_track_table_mutation_table_is_stale( + int table_oid, int dboid) +{ + TrackTableMutationHashTable *map; + struct timeval now; + uint64 ttl_us; + uint32 hash; + int idx; + bool is_stale = false; + + if (TRACK_TABLE_MUTATION_DISABLED()) + return false; + + if (table_oid <= 0 || dboid <= 0) + { + is_stale = true; + goto update_stats; + } + + map = track_table_mutation_shmem->table_map; + hash = fnv1a_hash_table_key(table_oid, dboid); + + table_map_lock(); + + idx = table_map_lookup(map, table_oid, dboid, hash); + if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX) + { + TrackTableMutationEntry *entries; + int64 age; + int64 total_age; + int64 max_stale_us; + + entries = TABLE_MAP_ENTRIES(map); + get_current_time(&now); + ttl_us = track_table_mutation_shmem->state + .current_ttl_us; + + age = elapsed_us( + &entries[idx].last_write_time, &now); + is_stale = (age < (int64)ttl_us); + + /* + * Enforce max_staleness hard cap: no entry + * can force primary routing longer than + * max_staleness from its first write. 
+ */ + if (is_stale) + { + max_stale_us = (int64)pool_config + ->track_table_mutation_max_staleness + * 1000LL; + if (max_stale_us > 0) + { + total_age = elapsed_us( + &entries[idx].first_write_time, + &now); + if (total_age >= max_stale_us) + is_stale = false; + } + } + + ereport(DEBUG2, + (errmsg("track_table_mutation: " + "oid %d dboid %d " + "elapsed=%ld ttl=%lu stale=%d", + table_oid, dboid, + (long)age, + (unsigned long)ttl_us, + is_stale))); + } + + table_map_unlock(); + +update_stats: + /* Update statistics using semaphore */ + if (track_table_mutation_shmem != NULL) + { + TrackTableMutationState *st; + + table_map_lock(); + st = &track_table_mutation_shmem->state; + st->stats_queries_checked++; + if (is_stale) + st->stats_forced_primary++; + else + st->stats_allowed_replica++; + table_map_unlock(); + } + + return is_stale; +} + +/* + * Mark multiple tables as recently written. + * Called after DML queries complete to record + * which tables were modified. + */ +void +pool_track_table_mutation_mark_tables_written( + const int *table_oids, int num_tables, int dboid) +{ + TrackTableMutationHashTable *map; + TrackTableMutationState *st; + struct timeval now; + int i; + + if (TRACK_TABLE_MUTATION_DISABLED()) + return; + + if (num_tables <= 0 || table_oids == NULL || + dboid <= 0) + return; + + map = track_table_mutation_shmem->table_map; + st = &track_table_mutation_shmem->state; + get_current_time(&now); + + table_map_lock(); + + /* Periodically clean up expired entries */ + if (map->num_entries > map->max_entries * 3 / 4) + { + int64 since_cleanup; + + since_cleanup = elapsed_us( + &st->last_cleanup_time, &now); + /* 100ms interval */ + if (since_cleanup > 100000) + { + table_map_cleanup_expired( + map, st->current_ttl_us); + st->last_cleanup_time = now; + } + } + + for (i = 0; i < num_tables; i++) + { + uint32 hash; + int table_oid = table_oids[i]; + + if (table_oid > 0) + { + hash = fnv1a_hash_table_key( + table_oid, dboid); + table_map_insert(map, 
table_oid, + dboid, hash, &now); + } + } + + table_map_unlock(); +} + +/* + * Mark a single table as recently written. + */ +void +pool_track_table_mutation_mark_table_written( + int table_oid, int dboid) +{ + if (table_oid > 0 && dboid > 0) + { + const int tables[1] = { table_oid }; + + pool_track_table_mutation_mark_tables_written( + tables, 1, dboid); + } +} + +/* + * Update the staleness TTL based on observed + * replication delay. New TTL = delay * factor, + * clamped to [default_ttl, 1 hour]. + */ +void +pool_track_table_mutation_update_ttl(uint64 delay_us) +{ + uint64 new_ttl; + double factor; + TrackTableMutationState *st; + + if (TRACK_TABLE_MUTATION_DISABLED()) + return; + + factor = pool_config->track_table_mutation_ttl_factor; + new_ttl = (uint64)(delay_us * factor); + if (new_ttl < TRACK_TABLE_MUTATION_DEFAULT_TTL_US) + new_ttl = TRACK_TABLE_MUTATION_DEFAULT_TTL_US; + + /* Maximum TTL of 1 hour */ + if (new_ttl > 3600ULL * 1000000ULL) + new_ttl = 3600ULL * 1000000ULL; + + st = &track_table_mutation_shmem->state; + st->current_ttl_us = new_ttl; + get_current_time(&st->ttl_last_updated); + + ereport(DEBUG1, + (errmsg("track_table_mutation: " + "TTL=%lu us (delay=%lu factor=%.1f)", + (unsigned long)new_ttl, + (unsigned long)delay_us, + factor))); +} + +/* + * Look up a cached parse result by query hash. + * Returns true and fills output parameters if + * the query was found in the parse cache. 
+ */ +bool +pool_track_table_mutation_get_cached_parse( + uint64 hash, bool *is_write, + char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN], + int *num_tables) +{ + QueryParseCache *cache; + int idx; + bool found = false; + int max_tables; + + if (TRACK_TABLE_MUTATION_DISABLED()) + return false; + + max_tables = TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY; + cache = track_table_mutation_shmem->query_cache; + + parse_cache_lock(); + + idx = parse_cache_lookup(cache, hash); + if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX) + { + QueryParseEntry *entries; + int i; + int namelen; + + entries = PARSE_CACHE_ENTRIES(cache); + namelen = TRACK_TABLE_MUTATION_TABLE_NAME_LEN; + *is_write = entries[idx].is_write; + *num_tables = entries[idx].num_tables; + + for (i = 0; + i < entries[idx].num_tables && + i < max_tables; + i++) + { + strlcpy(table_names[i], + entries[idx].table_names[i], + namelen); + } + + /* Move to front of LRU */ + parse_cache_lru_touch(cache, idx); + found = true; + } + + parse_cache_unlock(); + + return found; +} + +/* + * Store a parse result in the shared cache. + * Evicts the LRU entry if the cache is full. 
+ */ +void +pool_track_table_mutation_cache_parse( + uint64 hash, bool is_write, + const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN], + int num_tables) +{ + QueryParseCache *cache; + int *buckets; + QueryParseEntry *entries; + int idx; + int bucket; + int max_tables; + int namelen; + + if (TRACK_TABLE_MUTATION_DISABLED()) + return; + + max_tables = TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY; + namelen = TRACK_TABLE_MUTATION_TABLE_NAME_LEN; + cache = track_table_mutation_shmem->query_cache; + + parse_cache_lock(); + + /* Check if already exists */ + idx = parse_cache_lookup(cache, hash); + if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX) + { + parse_cache_unlock(); + return; + } + + /* Allocate new entry (may evict LRU) */ + idx = parse_cache_alloc_entry(cache); + if (idx == TRACK_TABLE_MUTATION_INVALID_INDEX) + { + parse_cache_unlock(); + ereport(WARNING, + (errmsg("track_table_mutation: " + "parse cache alloc failed"))); + return; + } + + entries = PARSE_CACHE_ENTRIES(cache); + buckets = PARSE_CACHE_BUCKETS(cache); + + /* Fill in entry */ + entries[idx].query_hash = hash; + entries[idx].is_write = is_write; + entries[idx].num_tables = + (num_tables > max_tables) ? + max_tables : num_tables; + + { + int i; + + for (i = 0; i < entries[idx].num_tables; i++) + { + strlcpy(entries[idx].table_names[i], + table_names[i], namelen); + } + } + + /* Insert into hash bucket */ + bucket = hash % cache->num_buckets; + entries[idx].next = buckets[bucket]; + buckets[bucket] = idx; + + /* Add to LRU list */ + parse_cache_lru_add(cache, idx); + + parse_cache_unlock(); +} + +/* + * Normalize a SQL query and compute its 64-bit hash. + * Strips comments, collapses whitespace, lowercases, + * and replaces literals with placeholders. 
+ */ +uint64 +pool_track_table_mutation_normalize_and_hash( + const char *query) +{ + char normalized[8192]; + size_t len; + + if (query == NULL || query[0] == '\0') + return 0; + + len = normalize_query(query, normalized, + sizeof(normalized)); + if (len == 0) + return 0; + + return fnv1a_hash_64(normalized, len); +} -- 2.53.0