From 9006612e720a031f8de93bbe7f0d314061dbd28b Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Tue, 7 Apr 2026 18:27:43 +0900 Subject: [PATCH v1] Feature: load balancing control by table tracking. Prevent routing of read only queries to standby if the replication delay of tables used in the query exceeds a certain value collected by the streaming replication process. To enable this feature, set disable_load_balance_on_write to dml_adaptive_global. In this mode, when tables are modified by INSERT/UPDATE/DELETE/TRUNCATE/MERGE/data modification WITH, for a certain period SELECTs using the tables are not load balanced: i.e. routed to the primary PostgreSQL server to avoid data staleness caused by replication delay. Unlike dml_adaptive mode, any table modifications described above are detected even if they happen in other sessions (in dml_adaptive, table modifications are only detected in the same transaction). Note, however, you cannot use dml_adaptive_object_relationship_list to track dependencies among tables and other objects. Besides dml_adaptive_global, there are some tuning knobs for the feature: - track_table_mutation_ttl_factor Parameter to calculate the TTL of each tracking entry. - track_table_mutation_max_staleness Maximum duration in milliseconds that a single table entry can continuously force queries to primary. - track_table_mutation_cold_start_duration Duration in milliseconds to route all queries to primary after a child process starts. - track_table_mutation_table_buckets Number of hash buckets for the track table mutation hash table. - track_table_mutation_table_size Maximum number of tables that can be tracked simultaneously in the track table mutation. - track_table_mutation_query_buckets Number of hash buckets for the query parse cache. - track_table_mutation_query_parse_cache_size Maximum number of query parse results to cache. 
Author: Nadav Shatz Reviewed-by: Tatsuo Ishii Discussion: https://www.postgresql.org/message-id/flat/20260407.181009.1762204033074164841.ishii%40postgresql.org#58c139c1a7f8d5562865921d0733667b --- doc/src/sgml/loadbalance.sgml | 334 ++++ src/Makefile.am | 1 + src/config/pool_config_variables.c | 90 + src/context/pool_query_context.c | 235 ++- src/context/pool_session_context.c | 15 +- src/include/pool.h | 4 +- src/include/pool_config.h | 28 +- src/include/utils/pool_track_table_mutation.h | 247 +++ src/main/pgpool_main.c | 29 +- src/protocol/CommandComplete.c | 28 + src/protocol/child.c | 8 + src/protocol/pool_proto_modules.c | 6 +- src/sample/pgpool.conf.sample-stream | 56 + src/streaming_replication/pool_worker_child.c | 24 + src/test/regression/libs.sh | 2 + .../tests/042.track_table_mutation/test.sh | 354 ++++ .../043.track_table_mutation_watchdog/test.sh | 184 +++ src/tools/pgindent/typedefs.list | 6 + src/utils/pool_track_table_mutation.c | 1450 +++++++++++++++++ 19 files changed, 3080 insertions(+), 21 deletions(-) create mode 100644 src/include/utils/pool_track_table_mutation.h create mode 100755 src/test/regression/tests/042.track_table_mutation/test.sh create mode 100755 src/test/regression/tests/043.track_table_mutation_watchdog/test.sh create mode 100644 src/utils/pool_track_table_mutation.c diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml index 9e1e7b39b..7384ce81a 100644 --- a/doc/src/sgml/loadbalance.sgml +++ b/doc/src/sgml/loadbalance.sgml @@ -1110,6 +1110,18 @@ app_name_redirect_preference_list > database_redirect_preference_list > us Dependent functions, triggers, and views on the tables can be configured using + + + If this parameter is set to dml_adaptive_global, + Pgpool-II behaves like dml_adaptive + (per-transaction write tracking) and additionally uses shared memory to track + recently written tables across all sessions cluster-wide. 
When a table is + written in any session, subsequent reads of that table from any session are + routed to primary until a TTL (based on measured replication delay) expires. + This prevents stale reads after writes even across different connections. + See for the sub-parameters + that control the shared-memory tracking behavior. + @@ -1195,4 +1207,326 @@ dml_adaptive_object_relationship_list = 'table_1:table_2' + + + Table Mutation Map Configuration (Lagless Replica Reads) + + + These parameters configure the track table mutation feature, which is activated by setting + to dml_adaptive_global. + The feature tracks recently written tables to prevent stale reads from replica nodes during + replication lag, implementing the "lagless" architecture pattern for distributed systems + with read replicas. + + + + When a table is modified (INSERT/UPDATE/DELETE), it is marked as "stale" for a TTL period + (replication_delay * track_table_mutation_ttl_factor). Any SELECT queries on stale tables are routed + to the primary node instead of replicas, ensuring read-after-write consistency. + + + + This feature requires time-based replication delay monitoring. This can be provided by either + (external command mode) or by setting + (which uses pg_stat_replication.replay_lag + from PostgreSQL 10+). At least one of these must be configured for the TTL calculation to work. + + + + + Enabling dml_adaptive_global increases shared memory consumption. With default settings, + the feature requires approximately 6.4 MB of shared memory (0.1 MB for table tracking + 6.3 MB for query cache). 
+ Memory usage scales with configuration parameters: + + + + + Table tracking: track_table_mutation_table_size * 40 bytes (default: 2048 * 40 = ~80 KB) + + + + + Query cache: track_table_mutation_query_parse_cache_size * 640 bytes (default: 10000 * 640 = ~6.3 MB) + + + + + For high-traffic systems with large cache sizes (e.g., track_table_mutation_query_parse_cache_size = 100000), + memory usage can reach 64 MB or more. Consider your system's available shared memory when using dml_adaptive_global. + + + + + + + track_table_mutation_ttl_factor (floating point) + + track_table_mutation_ttl_factor configuration parameter + + + + + Multiplier for calculating the TTL: TTL = replication_delay * track_table_mutation_ttl_factor. + Higher values provide more safety margin but may reduce read replica utilization. + + + Valid range: 1.0-100.0. Default is 5.0. + This parameter can be changed by reloading the Pgpool-II configurations. + + + + + + track_table_mutation_max_staleness (integer) + + track_table_mutation_max_staleness configuration parameter + + + + + Maximum duration in milliseconds that a single table entry can continuously force queries to primary, + measured from when the table was first marked stale. When this cap is reached, the entry is expired + regardless of recent writes. If the table is written to again after expiry, a fresh tracking entry + is created. + + + This parameter bounds the cross-session impact of table mutation tracking. Even if a table is written + to in a tight loop, its effect on other sessions' load balancing is limited to this duration. For + legitimately busy tables, the gap between forced expiry and the next write re-marking the table is + negligible (typically milliseconds). + + + Set to 0 to disable the cap (not recommended for production). + Valid range: 0-3600000 ms. Default is 60000 (60 seconds). + This parameter can be changed by reloading the Pgpool-II configurations. 
+ + + + + + track_table_mutation_cold_start_duration (integer) + + track_table_mutation_cold_start_duration configuration parameter + + + + + Duration in milliseconds to route all queries to primary after a child process starts. + This prevents stale reads when a new connection is established before the track table mutation + is populated with recent write history. + + + When watchdog is enabled and the local node becomes the leader, Pgpool-II also triggers a + global cold start for this duration to avoid stale reads after leadership changes. + + + Valid range: 0-60000 ms. Default is 2000 (2 seconds). + Set to 0 to disable cold start behavior. + This parameter can be changed by reloading the Pgpool-II configurations. + + + + + + track_table_mutation_table_buckets (integer) + + track_table_mutation_table_buckets configuration parameter + + + + + Number of hash buckets for the track table mutation hash table. + Higher values reduce hash collisions and improve lookup performance. + + + Valid range: 64-65536. Default is 1024. + This parameter can only be set at server start. + + + + + + track_table_mutation_table_size (integer) + + track_table_mutation_table_size configuration parameter + + + + + Maximum number of tables that can be tracked simultaneously in the track table mutation. + When full, oldest entries are evicted using a simple eviction strategy. + + + Valid range: 128-131072. Default is 2048. + Memory usage: approximately 40 bytes per entry. + This parameter can only be set at server start. + + + + + + track_table_mutation_query_buckets (integer) + + track_table_mutation_query_buckets configuration parameter + + + + + Number of hash buckets for the query parse cache. The cache stores normalized + query strings mapped to their table dependencies to avoid repeated parsing. + + + Valid range: 64-65536. Default is 2048. + This parameter can only be set at server start. 
+ + + + + + track_table_mutation_query_parse_cache_size (integer) + + track_table_mutation_query_parse_cache_size configuration parameter + + + + + Maximum number of query parse results to cache. Uses LRU eviction when full. + Larger caches reduce parsing overhead but consume more shared memory. + + + Valid range: 100-1000000. Default is 10000. + Memory usage: approximately 640 bytes per entry (~6.3 MB for default, ~64 MB for 100000 entries). + This parameter can only be set at server start. + + + + + + + + Track Table Mutation Configuration Example + + To enable track table mutation with replication delay monitoring: + + +# Enable dml_adaptive_global mode (includes track table mutation) +disable_load_balance_on_write = 'dml_adaptive_global' +track_table_mutation_ttl_factor = 5.0 +track_table_mutation_max_staleness = 60000 +track_table_mutation_cold_start_duration = 2000 + +# Option A: Use external command for replication delay +replication_delay_source_cmd = '/path/to/get-replication-delay.sh' +replication_delay_source_timeout = 10 + +# Option B: Use pg_stat_replication replay_lag (PG 10+) +# delay_threshold_by_time = 1000 + +# Adjust cache sizes based on workload (increases memory usage) +track_table_mutation_table_size = 4096 +track_table_mutation_query_parse_cache_size = 50000 + + + Total shared memory required for above configuration: approximately 31.2 MB (31 MB query cache + 0.2 MB table map + overhead). + Default configuration (10000 query cache entries, 2048 tables) requires approximately 6.4 MB. + + + + + Limitations + + The track table mutation feature has the following limitation: + + + + + PREPARE statements are not tracked. When a prepared statement + containing data modification is executed, the table mutation is not recorded. + + + + + If your application uses prepared statements and requires read-after-write consistency, + consider using explicit transaction routing or the /*NO LOAD BALANCE*/ + comment directive for affected queries. 
+ + + The following statement types are tracked and will mark tables as stale: + + + + + INSERT, UPDATE, DELETE + statements (including those with RETURNING clauses). + + + + + TRUNCATE statements (including multiple tables). + + + + + MERGE statements (PostgreSQL 15+). + + + + + WITH clauses containing data modifications (Common Table Expressions + with INSERT, UPDATE, or DELETE). + For example, WITH deleted AS (DELETE FROM t1 RETURNING *) SELECT * FROM deleted + will properly mark table t1 as stale. + + + + + Transaction Rollback Behavior: Within explicit transactions, tables + are only marked as stale in shared memory when the transaction is committed. If the + transaction is rolled back, no tables are marked, since no actual data modification + occurred on replicas. This prevents rolled-back transactions from unnecessarily + disabling load balancing. For autocommit statements (outside explicit transactions), + tables are marked immediately upon command completion. + + + + Cross-Session Impact and Safety Bounds: + Unlike dml_adaptive (which only affects the session that issued the write), + dml_adaptive_global affects all sessions reading the same table in the same database. + The following safety mechanisms bound this cross-session impact: + + + + + Maximum staleness cap: The + parameter (default: 60 seconds) limits how long any single table entry can continuously force primary + routing. Even under sustained writes, the entry expires after this period and is only renewed by + subsequent committed writes. + + + + + Database isolation: Table staleness tracking is scoped by database OID. Writes + in one database never affect load balancing decisions for sessions connected to a different database. + In multi-tenant deployments where tenants use separate databases, one tenant's write activity cannot + influence another tenant's query routing. + + + + + Committed writes only: Only committed transactions mark tables as stale. 
+ Rolled-back transactions have no effect on the shared tracking state. + + + + + Bounded table map size: The shared memory table map has a fixed maximum size + (). At most this many tables can be marked stale + simultaneously, providing a natural ceiling on the feature's impact. + + + + + + + diff --git a/src/Makefile.am b/src/Makefile.am index 4678ab530..39588af58 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \ rewrite/pool_timestamp.c \ rewrite/pool_lobj.c \ utils/pool_select_walker.c \ + utils/pool_track_table_mutation.c \ utils/strlcpy.c \ utils/psprintf.c \ utils/pool_params.c \ diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c index ce13c42f6..d5f4fb605 100644 --- a/src/config/pool_config_variables.c +++ b/src/config/pool_config_variables.c @@ -290,6 +290,7 @@ static const struct config_enum_entry disable_load_balance_on_write_options[] = {"trans_transaction", DLBOW_TRANS_TRANSACTION, false}, {"always", DLBOW_ALWAYS, false}, {"dml_adaptive", DLBOW_DML_ADAPTIVE, false}, + {"dml_adaptive_global", DLBOW_DML_ADAPTIVE_GLOBAL, false}, {NULL, 0, false} }; @@ -1777,6 +1778,19 @@ static struct config_int_array ConfigureNamesIntArray[] = static struct config_double ConfigureNamesDouble[] = { + { + {"track_table_mutation_ttl_factor", + CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "TTL multiplier for track table mutation " + "(TTL = replication_delay * factor)", + CONFIG_VAR_TYPE_DOUBLE, false, 0 + }, + &g_pool_config.track_table_mutation_ttl_factor, + 5.0, /* boot value: 5x replication delay */ + 1.0, 100.0, /* min, max */ + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_DOUBLE }; @@ -2397,6 +2411,81 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"track_table_mutation_max_staleness", + CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "Maximum duration in milliseconds that a " + "table can be marked stale from its first " + "write. 
0 disables the cap.", + CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS + }, + &g_pool_config.track_table_mutation_max_staleness, + 60000, /* 60 seconds */ + 0, 3600000, /* 0 to 1 hour */ + NULL, NULL, NULL + }, + + { + {"track_table_mutation_cold_start_duration", + CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "Duration in milliseconds to force queries " + "to primary after child process starts.", + CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS + }, + &g_pool_config.track_table_mutation_cold_start_duration, + 2000, /* 2 seconds */ + 0, 60000, /* 0 to 60 seconds */ + NULL, NULL, NULL + }, + + { + {"track_table_mutation_table_buckets", + CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Number of hash buckets for track table mutation.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.track_table_mutation_table_buckets, + 1024, + 64, 65536, + NULL, NULL, NULL + }, + + { + {"track_table_mutation_table_size", + CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Maximum number of entries in track table mutation.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.track_table_mutation_table_size, + 2048, + 128, 131072, + NULL, NULL, NULL + }, + + { + {"track_table_mutation_query_buckets", + CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Number of hash buckets for query parse cache.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.track_table_mutation_query_buckets, + 2048, + 64, 65536, + NULL, NULL, NULL + }, + + { + {"track_table_mutation_query_parse_cache_size", + CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Maximum number of entries in query parse cache.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.track_table_mutation_query_parse_cache_size, + 10000, + 100, 1000000, + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_INT }; @@ -4615,6 +4704,7 @@ static const char * BackendFlagsShowFunc(int index) { unsigned short flag = g_pool_config.backend_desc->backend_info[index].flag; + return pool_flag_to_str(flag); } diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c index 
a056ac596..0190d3673 100644 --- a/src/context/pool_query_context.c +++ b/src/context/pool_query_context.c @@ -29,6 +29,7 @@ #include "utils/statistics.h" #include "utils/pool_select_walker.h" #include "utils/pool_stream.h" +#include "utils/pool_track_table_mutation.h" #include "context/pool_session_context.h" #include "context/pool_query_context.h" #include "parser/nodes.h" @@ -1828,15 +1829,23 @@ is_in_list(char *name, List *list) static bool is_select_object_in_temp_write_list(Node *node, void *context) { - if (node == NULL || pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE) + if (node == NULL || + !DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write)) return false; if (IsA(node, RangeVar)) { RangeVar *rgv = (RangeVar *) node; - POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false); + POOL_SESSION_CONTEXT *session_context; + bool is_adaptive; + + session_context = pool_get_session_context(false); + is_adaptive = DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write); - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context->is_in_transaction) + if (is_adaptive && + session_context->is_in_transaction) { ereport(DEBUG1, (errmsg("is_select_object_in_temp_write_list: \"%s\", found relation \"%s\"", (char *) context, rgv->relname))); @@ -1880,7 +1889,13 @@ static char *get_associated_object_from_dml_adaptive_relations void check_object_relationship_list(char *name, bool is_func_name) { - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && pool_config->parsed_dml_adaptive_object_relationship_list) + bool is_adaptive; + + is_adaptive = DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write); + + if (is_adaptive && + pool_config->parsed_dml_adaptive_object_relationship_list) { POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false); @@ -1947,7 +1962,7 @@ add_object_into_temp_write_list(Node *node, void *context) static void 
dml_adaptive(Node *node, char *query) { - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE) + if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write)) { /* Set/Unset transaction status flags */ if (IsA(node, TransactionStmt)) @@ -1966,6 +1981,45 @@ dml_adaptive(Node *node, char *query) } else if (is_commit_or_rollback_query(node)) { + /* + * For dml_adaptive_global: on COMMIT, flush the accumulated + * table writes to shared memory. On ROLLBACK, skip -- the + * writes never committed so no stale-read risk exists. This + * prevents polluting the table map with rolled-back + * transactions. + */ + int dlbow = + pool_config->disable_load_balance_on_write; + List *wlist = + session_context->transaction_temp_write_list; + + if (dlbow == DLBOW_DML_ADAPTIVE_GLOBAL && + is_commit_query(node) && + wlist != NIL) + { + ListCell *cell; + int dboid; + + dboid = + pool_track_table_mutation_get_database_oid(); + if (dboid > 0) + { + foreach(cell, wlist) + { + char *tname; + int toid; + + tname = (char *) lfirst(cell); + toid = + pool_table_name_to_oid(tname); + + if (toid > 0) + pool_track_table_mutation_mark_table_written( + toid, dboid); + } + } + } + session_context->is_in_transaction = false; if (session_context->transaction_temp_write_list != NIL) @@ -2008,7 +2062,7 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node session_context = pool_get_session_context(false); backend = session_context->backend; - /* + /* * Collect/discard information for disable_load_balance_on_write = * dml_adaptive case. */ @@ -2022,6 +2076,20 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node if (dest == POOL_PRIMARY) { pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); + + /* + * Resolve table and database OIDs now to populate relcache. This + * avoids potential hangs in CommandComplete where we shouldn't be + * running new queries against the backend. 
+ */ + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + int *oids; + + pool_extract_table_oids(node, &oids); + pool_track_table_mutation_get_database_oid(); + } } /* Should be sent to both primary and standby? */ else if (dest == POOL_BOTH) @@ -2151,6 +2219,153 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node { pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); } + + /* + * Check track table mutation for recently written tables. If + * in cold start or any table was recently written, route to + * primary to avoid stale reads. + */ + else if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + bool force_primary = false; + int lb_node; + POOL_QUERY_CONTEXT *qctx = + session_context->query_context; + + if (pool_track_table_mutation_in_cold_start()) + { + ereport(DEBUG1, + (errmsg("could not load balance" + " because of track table" + " mutation cold start"), + errdetail("destination = PRIMARY" + " for query= \"%s\"", + query))); + force_primary = true; + } + else + { + SelectContext ctx; + int dboid; + int num_oids; + int i; + + memset(&ctx, 0, sizeof(ctx)); + num_oids = + pool_extract_table_oids_from_select_stmt( + node, &ctx); + if (num_oids > 0) + { + dboid = + pool_track_table_mutation_get_database_oid(); + + if (dboid <= 0) + { + ereport(DEBUG1, + (errmsg("could not load" + " balance because" + " database oid was" + " unavailable"), + errdetail("destination" + " = PRIMARY for" + " query= \"%s\"", + query))); + force_primary = true; + } + else + { + for (i = 0; i < num_oids; i++) + { + bool stale; + + stale = + pool_track_table_mutation_table_is_stale( + ctx.table_oids[i], + dboid); + if (stale) + { + ereport(DEBUG1, + (errmsg("could not load" + " balance because" + " table \"%s\" was" + " recently written", + ctx.table_names[i]), + errdetail("destination" + " = PRIMARY for" + " query= \"%s\"", + query))); + force_primary = true; + break; + } + } + } + } + } 
+ + if (force_primary) + { + pool_set_node_to_be_sent( + query_context, + PRIMARY_NODE_ID); + } + else + { + if (pool_config->statement_level_load_balance) + { + session_context->load_balance_node_id = + select_load_balancing_node(); + } + + /* + * If replication delay is too much, and + * prefer_lower_delay_standby is true then elect the + * lowest-delayed node, otherwise send to primary. + */ + lb_node = + session_context->load_balance_node_id; + if (STREAM && + check_replication_delay(lb_node)) + { + ereport(DEBUG1, + (errmsg("could not load" + " balance because of" + " too much replication" + " delay"), + errdetail("destination" + " = %d for" + " query= \"%s\"", + dest, query))); + + if (pool_config->prefer_lower_delay_standby) + { + lb_node = + select_load_balancing_node(); + session_context->load_balance_node_id = + lb_node; + qctx->load_balance_node_id = + lb_node; + pool_set_node_to_be_sent( + query_context, + lb_node); + } + else + { + pool_set_node_to_be_sent( + query_context, + PRIMARY_NODE_ID); + } + } + else + { + qctx->load_balance_node_id = + session_context->load_balance_node_id; + pool_set_node_to_be_sent( + query_context, + qctx->load_balance_node_id); + } + } + } else { if (pool_config->statement_level_load_balance) @@ -2171,7 +2386,8 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node errdetail("destination = %d for query= \"%s\"", dest, query))); /* - * If prefer_lower_delay_standby is on, choose lower delay standby. + * If prefer_lower_delay_standby is on, choose lower + * delay standby. */ if (pool_config->prefer_lower_delay_standby) { @@ -2181,7 +2397,8 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node session_context->query_context->load_balance_node_id = session_context->load_balance_node_id; pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id); } - else /* delay is too much. prefer to send to primary */ + else /* delay is too much. 
prefer to send to + * primary */ { pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); } @@ -2191,7 +2408,7 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node * Not streaming replication mode, or delay_threshold is 0 * or replication delay is small enough. */ - else + else { session_context->query_context->load_balance_node_id = session_context->load_balance_node_id; pool_set_node_to_be_sent(query_context, diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c index ded41c7fc..05d0b635b 100644 --- a/src/context/pool_session_context.c +++ b/src/context/pool_session_context.c @@ -532,7 +532,7 @@ dump_sent_message(char *caller, POOL_SENT_MESSAGE *m) static void dml_adaptive_init(void) { - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE) + if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write)) { session_context->is_in_transaction = false; session_context->transaction_temp_write_list = NIL; @@ -542,7 +542,9 @@ dml_adaptive_init(void) static void dml_adaptive_destroy(void) { - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context) + if (DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write) && + session_context) { if (session_context->transaction_temp_write_list != NIL) list_free_deep(session_context->transaction_temp_write_list); @@ -738,10 +740,13 @@ void pool_set_writing_transaction(void) { /* - * If disable_transaction_on_write is 'off' or 'dml_adaptive', then never - * turn on writing transaction flag. + * If disable_load_balance_on_write is 'off' or 'dml_adaptive' or + * 'dml_adaptive_global', then never turn on writing transaction flag. 
*/ - if (pool_config->disable_load_balance_on_write != DLBOW_OFF && pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE) + if (pool_config->disable_load_balance_on_write != + DLBOW_OFF && + !DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write)) { pool_get_session_context(false)->writing_transaction = true; ereport(DEBUG5, diff --git a/src/include/pool.h b/src/include/pool.h index 65907dcf1..0e901691a 100644 --- a/src/include/pool.h +++ b/src/include/pool.h @@ -424,7 +424,7 @@ typedef enum #define Min(x, y) ((x) < (y) ? (x) : (y)) -#define MAX_NUM_SEMAPHORES 8 +#define MAX_NUM_SEMAPHORES 10 #define CONN_COUNTER_SEM 0 #define REQUEST_INFO_SEM 1 #define QUERY_CACHE_STATS_SEM 2 @@ -434,6 +434,8 @@ typedef enum #define FOLLOW_PRIMARY_SEM 6 #define MAIN_EXIT_HANDLER_SEM 7 /* used in exit_hander in pgpool main * process */ +#define TRACK_TABLE_MUTATION_TABLE_SEM 8 +#define TRACK_TABLE_MUTATION_QUERY_SEM 9 #define MAX_REQUEST_QUEUE_SIZE 10 #define MAX_SEC_WAIT_FOR_CLUSTER_TRANSACTION 10 /* time in seconds to keep diff --git a/src/include/pool_config.h b/src/include/pool_config.h index 9a397d166..ae507dc60 100644 --- a/src/include/pool_config.h +++ b/src/include/pool_config.h @@ -105,9 +105,13 @@ typedef enum DLBOW_OPTION DLBOW_TRANSACTION, DLBOW_TRANS_TRANSACTION, DLBOW_ALWAYS, - DLBOW_DML_ADAPTIVE + DLBOW_DML_ADAPTIVE, + DLBOW_DML_ADAPTIVE_GLOBAL } DLBOW_OPTION; +#define DLBOW_IS_DML_ADAPTIVE(opt) \ + ((opt) == DLBOW_DML_ADAPTIVE || (opt) == DLBOW_DML_ADAPTIVE_GLOBAL) + typedef enum RELQTARGET_OPTION { RELQTARGET_PRIMARY = 1, @@ -363,8 +367,26 @@ typedef struct char *sr_check_password; /* password for sr_check_user */ char *sr_check_database; /* PostgreSQL database name for streaming * replication check */ - char *replication_delay_source_cmd; /* external command for replication delay */ - int replication_delay_source_timeout; /* timeout for external command in seconds */ + char *replication_delay_source_cmd; /* external command for + * 
replication delay */ + int replication_delay_source_timeout; /* timeout for external + * command in seconds */ + + /* Track table mutation configuration */ + double track_table_mutation_ttl_factor; /* TTL multiplier for + * replication delay */ + int track_table_mutation_max_staleness; /* max staleness duration + * ms */ + int track_table_mutation_cold_start_duration; /* cold start duration + * ms */ + int track_table_mutation_table_buckets; /* hash buckets for table + * map */ + int track_table_mutation_table_size; /* max table map entries */ + int track_table_mutation_query_buckets; /* hash buckets for query + * cache */ + int track_table_mutation_query_parse_cache_size; /* max query cache + * entries */ + char *failover_command; /* execute command when failover happens */ char *follow_primary_command; /* execute command when failover is * ended */ diff --git a/src/include/utils/pool_track_table_mutation.h b/src/include/utils/pool_track_table_mutation.h new file mode 100644 index 000000000..28dec1c8a --- /dev/null +++ b/src/include/utils/pool_track_table_mutation.h @@ -0,0 +1,247 @@ +/* -*-pgsql-c-*- */ +/* + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2026 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. 
+ * + * pool_track_table_mutation.h: In-memory tracking of + * recently written tables to prevent stale reads. + */ + +#ifndef POOL_TRACK_TABLE_MUTATION_H +#define POOL_TRACK_TABLE_MUTATION_H + +#include "pool.h" +#include + +/* + * Maximum table name length including schema: "schema"."table" + * Using NAMEDATALEN * 2 + 4 for quotes and dot + */ +#define TRACK_TABLE_MUTATION_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4) + +/* + * Maximum number of tables we track per query + */ +#define TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY 8 + +/* + * Invalid index marker for linked lists + */ +#define TRACK_TABLE_MUTATION_INVALID_INDEX (-1) + +/* + * Default TTL in microseconds (100ms) used when replication delay is unknown + */ +#define TRACK_TABLE_MUTATION_DEFAULT_TTL_US (100 * 1000) + +/* + * Entry in the table mutation hash table (keyed by table/database oids) + */ +typedef struct TrackTableMutationEntry +{ + int table_oid; /* Table oid */ + int dboid; /* Database oid */ + struct timeval first_write_time; /* When the entry was first created */ + struct timeval last_write_time; /* When the table was last written */ + uint32 hash; /* Pre-computed hash value */ + int next; /* Next in collision chain */ + bool in_use; /* Is this entry in use? 
*/ +} TrackTableMutationEntry; + +/* + * Header for the table mutation hash table in shared memory + */ +typedef struct TrackTableMutationHashTable +{ + int num_buckets; /* Number of hash buckets */ + int max_entries; /* Maximum entries allowed */ + int num_entries; /* Current number of entries */ + int free_list_head; /* Head of free entry list */ + + /* + * Flexible array members follow in shared memory: int + * buckets[num_buckets]; TrackTableMutationEntry entries[max_entries]; + */ +} TrackTableMutationHashTable; + +/* + * Entry in the query parse cache + */ +typedef struct QueryParseEntry +{ + uint64 query_hash; /* Hash of normalized query */ + bool is_write; /* True if INSERT/UPDATE/DELETE */ + int num_tables; /* Number of tables in query */ + char table_names + [ TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY] + [ TRACK_TABLE_MUTATION_TABLE_NAME_LEN]; + int next; /* Next entry in collision chain */ + int lru_prev; /* Previous in LRU list */ + int lru_next; /* Next in LRU list */ + bool in_use; /* Is this entry in use? */ +} QueryParseEntry; + +/* + * Header for the query parse cache in shared memory + */ +typedef struct QueryParseCache +{ + int num_buckets; /* Number of hash buckets */ + int max_entries; /* Maximum entries allowed */ + int num_entries; /* Current number of entries */ + int free_list_head; /* Head of free entry list */ + int lru_head; /* Most recently used */ + int lru_tail; /* Least recently used */ + + /* + * Flexible array members follow in shared memory: int + * buckets[num_buckets]; QueryParseEntry entries[max_entries]; + */ +} QueryParseCache; + +/* + * Global state for track table mutation feature + */ +typedef struct TrackTableMutationState +{ + bool initialized; /* Shmem initialized? 
*/ + uint64 current_ttl_us; /* Current TTL in microseconds */ + struct timeval ttl_last_updated; /* When TTL was last updated */ + struct timeval last_cleanup_time; /* When last expired cleanup ran */ + struct timeval global_cold_start_until; /* Global cold start end time */ + uint32 stats_queries_checked; /* Queries checked */ + uint32 stats_forced_primary; /* Forced to primary */ + uint32 stats_allowed_replica; /* Allowed to replica */ +} TrackTableMutationState; + +/* + * Main shared memory structure containing all components + */ +typedef struct TrackTableMutationShmem +{ + TrackTableMutationState state; + TrackTableMutationHashTable *table_map; + QueryParseCache *query_cache; +} TrackTableMutationShmem; + +/* ---------------- + * Public API functions + * ---------------- + */ + +/* + * Initialize shared memory structures for track table mutation. + * Called from pgpool_main.c after pool_init_pool_info(). + */ +extern void pool_track_table_mutation_init(void); + +/* + * Initialize per-child process state for track table mutation. + * Called from child.c when a new child process starts. + * Sets up cold start tracking. + */ +extern void pool_track_table_mutation_child_init(void); + +/* + * Check if the child process is in cold start period. + * During cold start, all queries are routed to primary. + * Returns true if in cold start, false otherwise. + */ +extern bool pool_track_table_mutation_in_cold_start(void); + +/* + * Trigger a global cold start period for all processes. + * Used after watchdog leader change to avoid stale reads. + */ +extern void pool_track_table_mutation_trigger_global_cold_start(void); + +/* + * Get oid of current database. + */ +extern int pool_track_table_mutation_get_database_oid(void); + +/* + * Check if a table was recently written to (is "stale"). + * If stale, reads from this table should go to primary. + * Returns true if table is stale (recently written), false otherwise. 
+ */ +extern bool pool_track_table_mutation_table_is_stale( + int table_oid, int dboid); + +/* + * Mark tables as recently written. + * Called after INSERT/UPDATE/DELETE queries complete. + * table_oids: array of table oids + * num_tables: number of tables in array + * dboid: database oid + */ +extern void pool_track_table_mutation_mark_tables_written( + const int *table_oids, int num_tables, int dboid); + +/* + * Convenience function to mark a single table as written. + * table_oid: table oid + * dboid: database oid + */ +extern void pool_track_table_mutation_mark_table_written( + int table_oid, int dboid); + +/* + * Update the TTL based on current replication delay. + * Called from pool_worker_child.c when replication delay is updated. + * delay_us: replication delay in microseconds + */ +extern void pool_track_table_mutation_update_ttl(uint64 delay_us); + +/* + * Look up cached parse result for a query. + * hash: hash of normalized query + * is_write: output - true if query is a write + * table_names: output - array to fill with table names + * num_tables: output - number of tables found + * Returns true if found in cache, false otherwise. + */ +extern bool pool_track_table_mutation_get_cached_parse( + uint64 hash, bool *is_write, + char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN], + int *num_tables); + +/* + * Cache a parse result for a query. + * hash: hash of normalized query + * is_write: true if query is a write + * table_names: array of table names + * num_tables: number of tables + */ +extern void pool_track_table_mutation_cache_parse( + uint64 hash, bool is_write, + const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN], + int num_tables); + +/* + * Normalize a query and compute its hash. + * Strips comments, normalizes whitespace and literals. 
+ * query: input SQL query string + * Returns: 64-bit hash of normalized query + */ +extern uint64 pool_track_table_mutation_normalize_and_hash(const char *query); + +/* + * Calculate required shared memory size for track table mutation. + */ +extern Size pool_track_table_mutation_shmem_size(void); + +#endif /* POOL_TRACK_TABLE_MUTATION_H */ diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c index bf7c452e2..d4e274f02 100644 --- a/src/main/pgpool_main.c +++ b/src/main/pgpool_main.c @@ -57,6 +57,7 @@ #include "auth/pool_passwd.h" #include "auth/pool_hba.h" #include "query_cache/pool_memqcache.h" +#include "utils/pool_track_table_mutation.h" #include "watchdog/wd_internal_commands.h" #include "watchdog/wd_lifecheck.h" #include "watchdog/watchdog.h" @@ -1500,11 +1501,14 @@ sigusr1_interrupt_processor(void) if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED]) { + WD_STATES wd_state; + ereport(LOG, (errmsg("Pgpool-II parent process received watchdog state change signal from watchdog"))); user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false; - if (wd_internal_get_watchdog_local_node_state() == WD_STANDBY) + wd_state = wd_internal_get_watchdog_local_node_state(); + if (wd_state == WD_STANDBY) { ereport(LOG, (errmsg("we have joined the watchdog cluster as STANDBY node"), @@ -1518,6 +1522,12 @@ sigusr1_interrupt_processor(void) */ pool_release_follow_primary_lock(true); } + else if (wd_state == WD_COORDINATOR && + pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + pool_track_table_mutation_trigger_global_cold_start(); + } } if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT]) { @@ -3083,6 +3093,16 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps) elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size())); } + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + size += MAXALIGN(pool_track_table_mutation_shmem_size()); + 
elog(DEBUG1, + "track_table_mutation: %zu bytes requested" + " for shared memory", + MAXALIGN(pool_track_table_mutation_shmem_size())); + } + initialize_shared_memory_main_segment(size); /* Move the backend descriptors to shared memory */ @@ -3199,6 +3219,13 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps) wd_ipc_initialize_data(); } + /* Initialize track table mutation for recently written tables */ + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + pool_track_table_mutation_init(); + } + } /* diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c index a3b8f0ea1..f445f268b 100644 --- a/src/protocol/CommandComplete.c +++ b/src/protocol/CommandComplete.c @@ -38,6 +38,8 @@ #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/pool_stream.h" +#include "utils/pool_track_table_mutation.h" +#include "query_cache/pool_memqcache.h" static int extract_ntuples(char *message); static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete); @@ -304,6 +306,32 @@ handle_query_context(POOL_CONNECTION_POOL *backend) node = session_context->query_context->parse_tree; + /* + * Track table writes for dml_adaptive_global feature. For autocommit + * statements (not in explicit transaction), mark tables immediately. For + * explicit transactions, marking is deferred to COMMIT in dml_adaptive() + * so that ROLLBACKed writes don't pollute the shared memory table map. 
+ */ + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL && + node != NULL && + !session_context->is_in_transaction) + { + int *oids; + int num_oids; + + num_oids = pool_extract_table_oids(node, &oids); + if (num_oids > 0) + { + int dboid; + + dboid = pool_track_table_mutation_get_database_oid(); + if (dboid > 0) + pool_track_table_mutation_mark_tables_written( + oids, num_oids, dboid); + } + } + if (IsA(node, PrepareStmt)) { if (session_context->uncompleted_message) diff --git a/src/protocol/child.c b/src/protocol/child.c index c34f05728..316b76239 100644 --- a/src/protocol/child.c +++ b/src/protocol/child.c @@ -57,6 +57,7 @@ #include "utils/elog.h" #include "utils/ps_status.h" #include "utils/timestamp.h" +#include "utils/pool_track_table_mutation.h" #include "context/pool_process_context.h" #include "context/pool_session_context.h" @@ -213,6 +214,13 @@ do_child(int *fds) /* Initialize per process context */ pool_init_process_context(); + /* Initialize track table mutation child state for cold start tracking */ + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL) + { + pool_track_table_mutation_child_init(); + } + /* initialize connection pool */ if (pool_init_cp()) { diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c index f9458bb55..74ee00d16 100644 --- a/src/protocol/pool_proto_modules.c +++ b/src/protocol/pool_proto_modules.c @@ -1461,7 +1461,9 @@ Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, pool_where_to_send(query_context, query_context->original_query, query_context->parse_tree); - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && strlen(name) != 0) + if (DLBOW_IS_DML_ADAPTIVE( + pool_config->disable_load_balance_on_write) + && strlen(name) != 0) pool_setall_node_to_be_sent(query_context); if (REPLICATION) @@ -1804,7 +1806,7 @@ Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, return POOL_END; } - if 
(pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && + if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) && TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T') { pool_where_to_send(query_context, query_context->original_query, diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream index 1ac982907..00132d534 100644 --- a/src/sample/pgpool.conf.sample-stream +++ b/src/sample/pgpool.conf.sample-stream @@ -478,6 +478,14 @@ backend_clustering_mode = streaming_replication # modified within the current explicit transaction will # not be load balanced until the end of the transaction. # + # dml_adaptive_global: + # Superset of dml_adaptive. In addition to per-transaction + # tracking, uses shared memory to track recently written + # tables across all sessions. Reads from recently written + # tables are routed to primary until a TTL (based on + # replication delay) expires. Requires additional shared + # memory. See track_table_mutation_* parameters below. + # # always: # if a write query is issued, read queries will # not be load balanced until the session ends. @@ -499,6 +507,54 @@ backend_clustering_mode = streaming_replication #statement_level_load_balance = off # Enables statement level load balancing +# - Track Table Mutation (used by dml_adaptive_global) - + # WARNING: dml_adaptive_global increases shared memory usage + # Default settings require ~6.4 MB shared memory + # (0.1 MB table tracking + 6.3 MB query cache) + +#track_table_mutation_ttl_factor = 5.0 + # TTL multiplier: TTL = replication_delay * factor + # Higher values provide more safety margin + # Range: 1.0-100.0 (default: 5.0) + # (change requires reload) + +#track_table_mutation_max_staleness = 60000 + # Maximum duration (ms) a table can be marked stale + # from its first write. 
Bounds cross-session impact: + # even under continuous writes, staleness expires + # after this period and is only renewed by new writes. + # 0 disables the cap. Range: 0-3600000 (default: 60000 = 60s) + # (change requires reload) + +#track_table_mutation_cold_start_duration = 2000 + # Duration in milliseconds to route all queries to primary + # after child process starts (cold start period) + # Range: 0-60000 ms (default: 2000 ms = 2 seconds) + # Set to 0 to disable cold start behavior + # (change requires reload) + +#track_table_mutation_table_buckets = 1024 + # Number of hash buckets for track table mutation + # Higher values reduce hash collisions + # Range: 64-65536 (default: 1024) + # (change requires restart) + +#track_table_mutation_table_size = 2048 + # Maximum number of tables to track simultaneously + # Range: 128-131072 (default: 2048) + # (change requires restart) + +#track_table_mutation_query_buckets = 2048 + # Number of hash buckets for query parse cache + # Range: 64-65536 (default: 2048) + # (change requires restart) + +#track_table_mutation_query_parse_cache_size = 10000 + # Maximum number of query parse results to cache + # Range: 100-1000000 (default: 10000) + # Memory usage: ~640 bytes per entry (~6.3 MB default, ~64 MB for 100000) + # (change requires restart) + #------------------------------------------------------------------------------ # STREAMING REPLICATION MODE #------------------------------------------------------------------------------ diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c index 311b63865..cdd570396 100644 --- a/src/streaming_replication/pool_worker_child.c +++ b/src/streaming_replication/pool_worker_child.c @@ -58,6 +58,7 @@ #include "utils/pool_ip.h" #include "utils/ps_status.h" #include "utils/pool_stream.h" +#include "utils/pool_track_table_mutation.h" #include "context/pool_process_context.h" #include "context/pool_session_context.h" @@ -419,6 +420,7 @@ 
check_replication_time_lag(void) BackendInfo *bkinfo; uint64 lag; uint64 delay_threshold_by_time; + uint64 max_delay_us = 0; ErrorContextCallback callback; int active_standby_node; bool replication_delay_by_time; @@ -643,6 +645,10 @@ check_replication_time_lag(void) * seconds to micro * seconds */ + /* Track max delay for mutation TTL */ + if (lag > max_delay_us) + max_delay_us = lag; + /* Log delay if necessary */ if ((pool_config->log_standby_delay == LSD_ALWAYS && lag > 0) || (pool_config->log_standby_delay == LSD_OVER_THRESHOLD && @@ -668,6 +674,13 @@ check_replication_time_lag(void) } } + /* + * Update track table mutation TTL from the max observed time-based + * replication delay. + */ + if (replication_delay_by_time && max_delay_us > 0) + pool_track_table_mutation_update_ttl(max_delay_us); + error_context_stack = callback.previous; } @@ -695,6 +708,7 @@ check_replication_time_lag_with_cmd(void) double delay_ms; uint64 delay; uint64 delay_threshold_by_time; + uint64 max_delay_us = 0; /* Track max delay for mutation map */ int token_count = 0; int primary_node_id; int save_errno; @@ -1003,6 +1017,10 @@ check_replication_time_lag_with_cmd(void) bkinfo->standby_delay = delay; bkinfo->standby_delay_by_time = true; + /* Track maximum delay for table mutation map TTL calculation */ + if (delay > max_delay_us) + max_delay_us = delay; + /* * Log delay if necessary. threshold is in milliseconds, convert * to microseconds. 
@@ -1021,6 +1039,12 @@ check_replication_time_lag_with_cmd(void) token = strtok_r(NULL, " \t\n", &saveptr); } + /* Update table mutation TTL based on max observed delay */ + if (pool_config->disable_load_balance_on_write == + DLBOW_DML_ADAPTIVE_GLOBAL && + max_delay_us > 0) + pool_track_table_mutation_update_ttl(max_delay_us); + } PG_CATCH(); { diff --git a/src/test/regression/libs.sh b/src/test/regression/libs.sh index 7c5a0c182..1c8ae392d 100644 --- a/src/test/regression/libs.sh +++ b/src/test/regression/libs.sh @@ -42,6 +42,8 @@ function wait_for_failover_done { function clean_all { pgrep pgpool | xargs kill -9 > /dev/null 2>&1 pgrep postgres | xargs kill -9 > /dev/null 2>&1 + # Clean up leaked SysV IPC resources left behind by kill -9 + ipcrm --all 2>/dev/null || true rm -f $PGSOCKET_DIR/.s.PGSQL.* netstat -t -p 2>/dev/null|grep pgpool } diff --git a/src/test/regression/tests/042.track_table_mutation/test.sh b/src/test/regression/tests/042.track_table_mutation/test.sh new file mode 100755 index 000000000..8b4dd17b8 --- /dev/null +++ b/src/test/regression/tests/042.track_table_mutation/test.sh @@ -0,0 +1,354 @@ +#!/usr/bin/env bash +#------------------------------------------------------------------- +# test script for track table mutation feature (in-memory table tracking). +# Tests routing of queries based on recently written tables. +# +source $TESTLIBS +TESTDIR=testdir +PSQL=$PGBIN/psql +PSQLOPTS="-a -q -X" +PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin +export PGDATABASE=test + +# Only run in streaming replication mode since that's the target use case +for mode in s +do + rm -fr $TESTDIR + mkdir $TESTDIR + cd $TESTDIR + + # Create test environment with 2 nodes + echo -n "creating test environment..." + $PGPOOL_SETUP -m $mode -n 2 || exit 1 + echo "done." 
+ + source ./bashrc.ports + + # Configure track table mutation feature via dml_adaptive_global + echo "disable_load_balance_on_write = 'dml_adaptive_global'" >> etc/pgpool.conf + echo "track_table_mutation_ttl_factor = 5.0" >> etc/pgpool.conf + echo "track_table_mutation_cold_start_duration = 10000" >> etc/pgpool.conf + + # Enable load balancing explicitly + echo "load_balance_mode = on" >> etc/pgpool.conf + + # Configure weights so we can distinguish routing + # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1 + # This means load balanced queries go to node 1 by default + echo "backend_weight0 = 0" >> etc/pgpool.conf + echo "backend_weight1 = 1" >> etc/pgpool.conf + + # Enable debug logging to see routing decisions + echo "log_min_messages = debug1" >> etc/pgpool.conf + + ./startall + + export PGPORT=$PGPOOL_PORT + export PGHOST=localhost + + wait_for_pgpool_startup + + # Create test tables + $PSQL test < /dev/null 2>&1 + + # Check log for cold start message (use -a to handle binary log files) + if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then + echo "Test 1 PASSED: Cold start routing works" + else + echo "Test 1 FAILED: Cold start routing not detected" + ./shutdownall + exit 1 + fi + + echo "=== Test 2: Wait for cold start to end ===" + # Wait for cold start period to end (10 seconds). + # Use generous margin to avoid flakiness under load (e.g. full regression suite). 
+ sleep 12 + + # Clear the log + > log/pgpool.log + + # Now a clean table query should load balance (go to node 1) + $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1 + + # After cold start, queries to clean tables should load balance + # Check that it did NOT get forced to primary due to track table mutation + if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then + echo "Test 2 FAILED: Still in cold start after waiting" + ./shutdownall + exit 1 + fi + echo "Test 2 PASSED: Cold start ended correctly" + + echo "=== Test 3: Write-then-Read Routing ===" + # Clear the log + > log/pgpool.log + + # Write to t1 and then read - use single connection to ensure same session + $PSQL test < log/pgpool.log + + # Read from t2 (never written to) - should load balance + $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1 + + # Should NOT see track table mutation blocking message for t2 + if grep -a -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then + echo "Test 4 FAILED: Clean table incorrectly marked as stale" + ./shutdownall + exit 1 + fi + echo "Test 4 PASSED: Clean tables still load balance" + + echo "=== Test 5: UPDATE Marks Table as Stale ===" + # Clear the log + > log/pgpool.log + + # Update t2 and then read - use single connection + $PSQL test < log/pgpool.log + + # Delete from t3 and then read - use single connection + $PSQL test < log/pgpool.log + + # Create a fresh table for TRUNCATE test + $PSQL test -c "CREATE TABLE t_truncate(i INTEGER);" > /dev/null 2>&1 + $PSQL test -c "INSERT INTO t_truncate VALUES (1), (2), (3);" > /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log again + > log/pgpool.log + + # Truncate and then read - use single connection + $PSQL test < log/pgpool.log + + # Create a fresh table for WITH test + $PSQL test -c "CREATE TABLE t_cte(i INTEGER);" > /dev/null 2>&1 + $PSQL test -c 
"INSERT INTO t_cte VALUES (1), (2), (3);" > /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log again + > log/pgpool.log + + # Use WITH clause with DELETE, then read from the table + $PSQL test </dev/null; then + echo "=== Test 9: MERGE Marks Table as Stale (PostgreSQL $PG_MAJOR_VERSION) ===" + # Clear the log + > log/pgpool.log + + # Create tables for MERGE test + $PSQL test -c "CREATE TABLE t_merge_target(id INTEGER PRIMARY KEY, val TEXT);" > /dev/null 2>&1 + $PSQL test -c "CREATE TABLE t_merge_source(id INTEGER, val TEXT);" > /dev/null 2>&1 + $PSQL test -c "INSERT INTO t_merge_target VALUES (1, 'old');" > /dev/null 2>&1 + $PSQL test -c "INSERT INTO t_merge_source VALUES (1, 'new'), (2, 'insert');" > /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log again + > log/pgpool.log + + # Use MERGE, then read from the target table + $PSQL test < /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log + > log/pgpool.log + + # Write inside a transaction, then rollback + $PSQL test < /dev/null 2>&1 + + # Wait for any TTL to expire + sleep 3 + + # Clear the log + > log/pgpool.log + + # Write inside a transaction, then commit, then read + $PSQL test <> pgpool${i}/etc/pgpool.conf < /dev/null 2>&1 + if [ $? = 0 ]; then + break + fi + sleep 2 +done +echo "done." + +# Test 1: Verify leader came up +echo "=== Test 1: Waiting for the pgpool leader... ===" +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep "I am the cluster leader node" \ + pgpool0/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 1 PASSED: Leader brought up." + break + fi + echo "[check] $i times" + sleep 2 +done + +if [ $success_count -lt 1 ]; then + echo "Test 1 FAILED: Leader did not start" + ./shutdownall + exit 1 +fi + +# Test 2: Verify standby joined cluster +echo "=== Test 2: Waiting for standby to join... 
===" +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep "successfully joined the watchdog cluster" \ + pgpool1/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 2 PASSED: Standby joined." + break + fi + echo "[check] $i times" + sleep 2 +done + +if [ $success_count -lt 2 ]; then + echo "Test 2 FAILED: Standby did not join" + ./shutdownall + exit 1 +fi + +# Test 3: Verify track_table_mutation initialized +echo "=== Test 3: Verify feature initialized ===" +if grep -a "track_table_mutation: initialized" \ + pgpool0/log/pgpool.log > /dev/null 2>&1; then + success_count=$(( success_count + 1 )) + echo "Test 3 PASSED: Feature initialized." +else + echo "Test 3 FAILED: Feature not initialized" + ./shutdownall + exit 1 +fi + +# Test 4: Stop leader (pgpool0) to trigger failover +echo "=== Test 4: Stopping leader... ===" +cd pgpool0 +source ./bashrc.ports +$PGPOOL_INSTALL_DIR/bin/pgpool \ + -f etc/pgpool.conf -m f stop +cd .. + +echo "Checking standby detected shutdown..." +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep -a "is shutting down" \ + pgpool1/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 4 PASSED: Shutdown detected." + break + fi + echo "[check] $i times" + sleep 2 +done + +if [ $success_count -lt 4 ]; then + echo "Test 4 FAILED: Shutdown not detected" + ./shutdownall + exit 1 +fi + +# Test 5: Verify standby became new leader +echo "=== Test 5: Checking standby takes over... ===" +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep -a "I am the cluster leader node" \ + pgpool1/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 5 PASSED: Standby became leader." 
+ break + fi + echo "[check] $i times" + sleep 2 +done + +if [ $success_count -lt 5 ]; then + echo "Test 5 FAILED: Standby did not become leader" + ./shutdownall + exit 1 +fi + +# Test 6: Verify global cold start was triggered +echo "=== Test 6: Checking global cold start... ===" +for i in 1 2 3 4 5 6 7 8 9 10 +do + grep -a "track_table_mutation: global cold start" \ + pgpool1/log/pgpool.log > /dev/null 2>&1 + if [ $? = 0 ]; then + success_count=$(( success_count + 1 )) + echo "Test 6 PASSED: Global cold start triggered." + break + fi + echo "[check] $i times" + sleep 2 +done + +# Cleanup +./shutdownall + +echo "" +echo "$success_count out of 6 successful" + +if test $success_count -eq 6 +then + echo "=== All Watchdog Tests PASSED ===" + exit 0 +fi + +exit 1 diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 939200965..0f1fa884c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -431,6 +431,8 @@ PublicationObjSpec PublicationObjSpecType PublicationTable Query +QueryParseCache +QueryParseEntry QuerySource RELQTARGET_OPTION RTEKind @@ -519,6 +521,10 @@ TableLikeClause TableSampleClause TargetEntry TokenizedLine +TrackTableMutationEntry +TrackTableMutationHashTable +TrackTableMutationShmem +TrackTableMutationState TransactionId TransactionStmt TransactionStmtKind diff --git a/src/utils/pool_track_table_mutation.c b/src/utils/pool_track_table_mutation.c new file mode 100644 index 000000000..9be46b28f --- /dev/null +++ b/src/utils/pool_track_table_mutation.c @@ -0,0 +1,1450 @@ +/* -*-pgsql-c-*- */ +/* + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2026 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that 
copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. + * + * pool_track_table_mutation.c: In-memory tracking of recently + * written tables to prevent stale reads from replicas. + * + * Based on the "lagless" architecture from Tailor Brands. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/time.h> + +#include "pool.h" +#include "pool_config.h" +#include "context/pool_session_context.h" +#include "utils/pool_track_table_mutation.h" +#include "utils/elog.h" +#include "utils/pool_ipc.h" +#include "utils/palloc.h" +#include "utils/pool_relcache.h" + +#define DATABASE_TO_OID_QUERY \ + "SELECT oid FROM pg_catalog.pg_database" \ + " WHERE datname = '%s'" + +/* + * Helper macro: true when the feature is not active. 
 */
#define TRACK_TABLE_MUTATION_DISABLED() \
	(pool_config->disable_load_balance_on_write != \
	 DLBOW_DML_ADAPTIVE_GLOBAL || \
	 track_table_mutation_shmem == NULL)

/* ----------------
 * Local variables
 * ----------------
 */

/* Pointer to shared memory structure */
static TrackTableMutationShmem *track_table_mutation_shmem = NULL;

/* Per-process cold start tracking (not in shared memory) */
static struct timeval process_start_time;
static bool cold_start_initialized = false;

/* ----------------
 * Helper macros for flexible arrays in shared memory
 *
 * The shared memory layout places the bucket array and then the entry
 * array immediately after each header struct; these macros compute the
 * addresses of those arrays from the header pointer.
 * ----------------
 */

/* Get pointer to bucket array in table map */
#define TABLE_MAP_BUCKETS(map) \
	((int *)((char *)(map) + \
			 sizeof(TrackTableMutationHashTable)))

/* Get pointer to entry array in table map (follows the bucket array) */
#define TABLE_MAP_ENTRIES(map) \
	((TrackTableMutationEntry *)((char *)(map) + \
								 sizeof(TrackTableMutationHashTable) + \
								 (map)->num_buckets * sizeof(int)))

/* Get pointer to bucket array in parse cache */
#define PARSE_CACHE_BUCKETS(cache) \
	((int *)((char *)(cache) + sizeof(QueryParseCache)))

/* Get pointer to entry array in parse cache (follows the bucket array) */
#define PARSE_CACHE_ENTRIES(cache) \
	((QueryParseEntry *)((char *)(cache) + \
						 sizeof(QueryParseCache) + \
						 (cache)->num_buckets * sizeof(int)))

/* ----------------
 * Semaphore lock helpers
 *
 * The table map and the parse cache use two distinct semaphores so that
 * contention on one structure does not block access to the other.
 * ----------------
 */

static inline void
table_map_lock(void)
{
	pool_semaphore_lock(TRACK_TABLE_MUTATION_TABLE_SEM);
}

static inline void
table_map_unlock(void)
{
	pool_semaphore_unlock(TRACK_TABLE_MUTATION_TABLE_SEM);
}

static inline void
parse_cache_lock(void)
{
	pool_semaphore_lock(TRACK_TABLE_MUTATION_QUERY_SEM);
}

static inline void
parse_cache_unlock(void)
{
	pool_semaphore_unlock(TRACK_TABLE_MUTATION_QUERY_SEM);
}

/* ----------------
 * Hash functions
 * ----------------
 */

/*
 * FNV-1a hash for table/database oid pair.
 * The two oids are packed into an 8-byte buffer and hashed byte-wise.
 */
static uint32
fnv1a_hash_table_key(int table_oid,
					 int dboid)
{
	uint32		hash = 2166136261u; /* FNV offset basis */
	uint32		data[2];
	const unsigned char *bytes;
	size_t		i;

	data[0] = (uint32) table_oid;
	data[1] = (uint32) dboid;
	bytes = (const unsigned char *) data;

	for (i = 0; i < sizeof(data); i++)
	{
		hash ^= bytes[i];
		hash *= 16777619u;		/* FNV prime */
	}

	return hash;
}

/*
 * FNV-1a hash for 64-bit value.
 * Used to key normalized query text in the parse cache.
 */
static uint64
fnv1a_hash_64(const char *str, size_t len)
{
	/* FNV offset basis for 64-bit */
	uint64		hash = 14695981039346656037ULL;
	size_t		i;

	for (i = 0; i < len; i++)
	{
		hash ^= (uint8) str[i];
		hash *= 1099511628211ULL;	/* FNV prime */
	}

	return hash;
}

/* ----------------
 * Time utilities
 * ----------------
 */

/*
 * Get elapsed time in microseconds between two timevals.
 * Negative when "end" is earlier than "start" (callers rely on this to
 * test whether a deadline lies in the future).
 */
static int64
elapsed_us(struct timeval *start, struct timeval *end)
{
	return ((int64) (end->tv_sec - start->tv_sec) * 1000000) +
		(end->tv_usec - start->tv_usec);
}

/*
 * Get current time
 */
static void
get_current_time(struct timeval *tv)
{
	gettimeofday(tv, NULL);
}

/* ----------------
 * Database oid lookup
 * ----------------
 */

/*
 * Resolve the oid of the database the current session is connected to.
 * Uses a (per-process, lazily created) relcache to avoid repeated
 * catalog queries. Returns 0 when the oid cannot be determined: no
 * shared memory, no session context, no backend connection, or
 * relcache creation failure.
 */
static int
track_table_mutation_get_database_oid_internal(void)
{
	int			oid = 0;
	static POOL_RELCACHE *relcache;
	POOL_CONNECTION_POOL *backend;
	POOL_SESSION_CONTEXT *session_context;

	/* Safety check: must have shmem initialized */
	if (track_table_mutation_shmem == NULL)
		return oid;

	session_context = pool_get_session_context(false);
	if (session_context == NULL)
		return oid;

	backend = session_context->backend;
	if (backend == NULL ||
		MAIN_CONNECTION(backend) == NULL ||
		MAIN_CONNECTION(backend)->sp == NULL)
		return oid;

	/* Ensure database name is valid */
	if (MAIN_CONNECTION(backend)->sp->database == NULL)
		return oid;

	if (!relcache)
	{
		relcache = pool_create_relcache(
										pool_config->relcache_size,
										DATABASE_TO_OID_QUERY,
										int_register_func,
										int_unregister_func,
										false);
		if (relcache == NULL)
		{
			ereport(LOG,
					(errmsg("track_table_mutation: "
							"error creating relcache")));
			return oid;
		}
	}

	oid = (int) (intptr_t) pool_search_relcache(
												relcache, backend,
												MAIN_CONNECTION(backend)->sp->database);
	return oid;
}

/*
 * Public wrapper for the database oid lookup above.
 */
int
pool_track_table_mutation_get_database_oid(void)
{
	return track_table_mutation_get_database_oid_internal();
}

/* ----------------
 * Table mutation hash table operations
 * ----------------
 */

/*
 * Initialize table mutation hash table: empty all buckets and chain
 * every entry onto the free list. Called once at shmem setup, before
 * any concurrent access, so no locking is needed here.
 */
static void
table_map_init(TrackTableMutationHashTable *map,
			   int num_buckets, int max_entries)
{
	int		   *buckets;
	TrackTableMutationEntry *entries;
	int			i;
	int			invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;

	map->num_buckets = num_buckets;
	map->max_entries = max_entries;
	map->num_entries = 0;
	map->free_list_head = 0;

	buckets = TABLE_MAP_BUCKETS(map);
	entries = TABLE_MAP_ENTRIES(map);

	/* Initialize all buckets to empty */
	for (i = 0; i < num_buckets; i++)
		buckets[i] = invalid;

	/* Initialize free list - chain all entries */
	for (i = 0; i < max_entries; i++)
	{
		entries[i].in_use = false;
		entries[i].next = (i < max_entries - 1) ?
			i + 1 : invalid;
	}

	ereport(DEBUG1,
			(errmsg("track_table_mutation: "
					"table map init %d buckets, "
					"%d max entries",
					num_buckets, max_entries)));
}

/*
 * Allocate an entry from the free list.
 * Returns the entry index, or INVALID_INDEX when no entry is free.
 * Must be called with lock held.
 */
static int
table_map_alloc_entry(TrackTableMutationHashTable *map)
{
	TrackTableMutationEntry *entries;
	int			idx;
	int			invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;

	entries = TABLE_MAP_ENTRIES(map);

	if (map->free_list_head == invalid)
		return invalid;

	idx = map->free_list_head;
	map->free_list_head = entries[idx].next;
	entries[idx].in_use = true;
	entries[idx].next = invalid;
	map->num_entries++;

	return idx;
}

/*
 * Free an entry back to the free list.
 * The caller is responsible for first unlinking it from its bucket
 * chain. Must be called with lock held.
 */
static void
table_map_free_entry(TrackTableMutationHashTable *map,
					 int idx)
{
	TrackTableMutationEntry *entries;

	entries = TABLE_MAP_ENTRIES(map);

	entries[idx].in_use = false;
	entries[idx].next = map->free_list_head;
	map->free_list_head = idx;
	map->num_entries--;
}

/*
 * Look up a table in the hash table.
 * Returns entry index or INVALID_INDEX if not found.
 * Must be called with lock held.
 */
static int
table_map_lookup(TrackTableMutationHashTable *map,
				 int table_oid, int dboid,
				 uint32 hash)
{
	int		   *buckets = TABLE_MAP_BUCKETS(map);
	TrackTableMutationEntry *entries;
	int			bucket = hash % map->num_buckets;
	int			idx = buckets[bucket];
	int			invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;

	entries = TABLE_MAP_ENTRIES(map);

	while (idx != invalid)
	{
		/* compare hash first (cheap) before the full key */
		if (entries[idx].hash == hash &&
			entries[idx].table_oid == table_oid &&
			entries[idx].dboid == dboid)
		{
			return idx;
		}
		idx = entries[idx].next;
	}

	return invalid;
}

/*
 * Insert or update a table entry.
 * Must be called with lock held.
+ */ +static void +table_map_insert(TrackTableMutationHashTable *map, + int table_oid, int dboid, + uint32 hash, + struct timeval *write_time) +{ + int *buckets = TABLE_MAP_BUCKETS(map); + TrackTableMutationEntry *entries; + int bucket = hash % map->num_buckets; + int idx; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + entries = TABLE_MAP_ENTRIES(map); + + /* Check if entry already exists */ + idx = table_map_lookup(map, table_oid, dboid, hash); + if (idx != invalid) + { + /* Update last write time; keep first_write_time */ + entries[idx].last_write_time = *write_time; + return; + } + + /* Allocate new entry */ + idx = table_map_alloc_entry(map); + if (idx == invalid) + { + int b; + + /* Table is full - evict first non-empty bucket */ + for (b = 0; b < map->num_buckets; b++) + { + if (buckets[b] != invalid) + { + int victim = buckets[b]; + + buckets[b] = entries[victim].next; + table_map_free_entry(map, victim); + idx = table_map_alloc_entry(map); + break; + } + } + + if (idx == invalid) + { + ereport(WARNING, + (errmsg("track_table_mutation: " + "failed to allocate entry " + "for oid %d (dboid %d)", + table_oid, dboid))); + return; + } + } + + /* Initialize new entry */ + entries[idx].table_oid = table_oid; + entries[idx].dboid = dboid; + entries[idx].hash = hash; + entries[idx].first_write_time = *write_time; + entries[idx].last_write_time = *write_time; + + /* Insert at head of bucket chain */ + entries[idx].next = buckets[bucket]; + buckets[bucket] = idx; + + ereport(DEBUG2, + (errmsg("track_table_mutation: " + "marked oid %d (dboid %d) written", + table_oid, dboid))); +} + +/* + * Remove expired entries from the table map. + * Must be called with lock held. 
+ */ +static void +table_map_cleanup_expired( + TrackTableMutationHashTable *map, uint64 ttl_us) +{ + int *buckets = TABLE_MAP_BUCKETS(map); + TrackTableMutationEntry *entries; + struct timeval now; + int64 max_stale_us; + int removed = 0; + int b; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + entries = TABLE_MAP_ENTRIES(map); + get_current_time(&now); + + max_stale_us = (int64) pool_config + ->track_table_mutation_max_staleness * 1000LL; + + for (b = 0; b < map->num_buckets; b++) + { + int *prev_ptr = &buckets[b]; + int idx = buckets[b]; + + while (idx != invalid) + { + int64 age; + int64 total_age; + bool expired; + + age = elapsed_us( + &entries[idx].last_write_time, &now); + expired = (age > (int64) ttl_us); + + /* + * Also evict entries that exceed max_staleness from first write. + */ + if (!expired && max_stale_us > 0) + { + total_age = elapsed_us( + &entries[idx].first_write_time, + &now); + expired = (total_age >= max_stale_us); + } + + if (expired) + { + /* Entry has expired - remove it */ + int next = entries[idx].next; + + *prev_ptr = next; + table_map_free_entry(map, idx); + idx = next; + removed++; + } + else + { + prev_ptr = &entries[idx].next; + idx = entries[idx].next; + } + } + } + + if (removed > 0) + { + ereport(DEBUG1, + (errmsg("track_table_mutation: " + "cleaned up %d expired entries", + removed))); + } +} + +/* ---------------- + * Parse cache operations + * ---------------- + */ + +/* + * Initialize parse cache + */ +static void +parse_cache_init(QueryParseCache * cache, + int num_buckets, int max_entries) +{ + int *buckets; + QueryParseEntry *entries; + int i; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + cache->num_buckets = num_buckets; + cache->max_entries = max_entries; + cache->num_entries = 0; + cache->free_list_head = 0; + cache->lru_head = invalid; + cache->lru_tail = invalid; + + buckets = PARSE_CACHE_BUCKETS(cache); + entries = PARSE_CACHE_ENTRIES(cache); + + /* Initialize all buckets to empty */ + for (i = 0; 
i < num_buckets; i++) + buckets[i] = invalid; + + /* Initialize free list */ + for (i = 0; i < max_entries; i++) + { + entries[i].in_use = false; + entries[i].next = (i < max_entries - 1) ? + i + 1 : invalid; + entries[i].lru_prev = invalid; + entries[i].lru_next = invalid; + } + + ereport(DEBUG1, + (errmsg("track_table_mutation: " + "parse cache init %d buckets, " + "%d max entries", + num_buckets, max_entries))); +} + +/* + * Move entry to front of LRU list (most recently used) + */ +static void +parse_cache_lru_touch(QueryParseCache * cache, int idx) +{ + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + /* Already at head? */ + if (cache->lru_head == idx) + return; + + /* Remove from current position */ + if (entries[idx].lru_prev != invalid) + entries[entries[idx].lru_prev].lru_next = + entries[idx].lru_next; + if (entries[idx].lru_next != invalid) + entries[entries[idx].lru_next].lru_prev = + entries[idx].lru_prev; + if (cache->lru_tail == idx) + cache->lru_tail = entries[idx].lru_prev; + + /* Insert at head */ + entries[idx].lru_prev = invalid; + entries[idx].lru_next = cache->lru_head; + if (cache->lru_head != invalid) + entries[cache->lru_head].lru_prev = idx; + cache->lru_head = idx; + if (cache->lru_tail == invalid) + cache->lru_tail = idx; +} + +/* + * Add entry to LRU list (at head) + */ +static void +parse_cache_lru_add(QueryParseCache * cache, int idx) +{ + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + entries[idx].lru_prev = invalid; + entries[idx].lru_next = cache->lru_head; + + if (cache->lru_head != invalid) + entries[cache->lru_head].lru_prev = idx; + + cache->lru_head = idx; + + if (cache->lru_tail == invalid) + cache->lru_tail = idx; +} + +/* + * Remove entry from LRU list + */ +static void +parse_cache_lru_remove(QueryParseCache * cache, int idx) +{ + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int invalid 
= TRACK_TABLE_MUTATION_INVALID_INDEX; + + if (entries[idx].lru_prev != invalid) + entries[entries[idx].lru_prev].lru_next = + entries[idx].lru_next; + else + cache->lru_head = entries[idx].lru_next; + + if (entries[idx].lru_next != invalid) + entries[entries[idx].lru_next].lru_prev = + entries[idx].lru_prev; + else + cache->lru_tail = entries[idx].lru_prev; + + entries[idx].lru_prev = invalid; + entries[idx].lru_next = invalid; +} + +/* + * Allocate entry from free list, evicting LRU if needed + */ +static int +parse_cache_alloc_entry(QueryParseCache * cache) +{ + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int *buckets = PARSE_CACHE_BUCKETS(cache); + int idx; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + if (cache->free_list_head != invalid) + { + idx = cache->free_list_head; + cache->free_list_head = entries[idx].next; + entries[idx].in_use = true; + entries[idx].next = invalid; + cache->num_entries++; + return idx; + } + + /* No free entries - evict LRU */ + if (cache->lru_tail == invalid) + return invalid; + + idx = cache->lru_tail; + + /* Remove from hash bucket */ + { + int bucket; + int *prev_ptr; + int curr; + + bucket = entries[idx].query_hash % + cache->num_buckets; + prev_ptr = &buckets[bucket]; + curr = buckets[bucket]; + + while (curr != invalid) + { + if (curr == idx) + { + *prev_ptr = entries[curr].next; + break; + } + prev_ptr = &entries[curr].next; + curr = entries[curr].next; + } + } + + /* Remove from LRU list */ + parse_cache_lru_remove(cache, idx); + + /* Reinitialize entry */ + entries[idx].in_use = true; + entries[idx].next = invalid; + + return idx; +} + +/* + * Look up a query in the parse cache + */ +static int +parse_cache_lookup(QueryParseCache * cache, uint64 hash) +{ + int *buckets = PARSE_CACHE_BUCKETS(cache); + QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache); + int bucket = hash % cache->num_buckets; + int idx = buckets[bucket]; + int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX; + + while (idx != 
invalid) + { + if (entries[idx].query_hash == hash) + return idx; + idx = entries[idx].next; + } + + return invalid; +} + +/* ---------------- + * Query normalization + * ---------------- + */ + +/* + * Simple query normalization: + * - Strip comments (-- and C-style block comments) + * - Collapse whitespace + * - Convert to lowercase (except inside strings) + * - Replace literal values with placeholders + */ +static size_t +normalize_query(const char *query, char *output, + size_t output_size) +{ + const char *src = query; + char *dst = output; + char *dst_end = output + output_size - 1; + bool in_string = false; + char string_char = 0; + bool last_was_space = true; + + while (*src && dst < dst_end) + { + /* Handle string literals */ + if (in_string) + { + if (*src == string_char) + { + if (*(src + 1) == string_char) + { + /* Escaped quote */ + src += 2; + continue; + } + in_string = false; + /* Replace string with placeholder */ + *dst++ = '$'; + } + src++; + continue; + } + + /* Check for string start */ + if (*src == '\'' || *src == '"') + { + in_string = true; + string_char = *src; + src++; + continue; + } + + /* Handle single-line comments */ + if (*src == '-' && *(src + 1) == '-') + { + while (*src && *src != '\n') + src++; + continue; + } + + /* Handle multi-line comments */ + if (*src == '/' && *(src + 1) == '*') + { + src += 2; + while (*src && + !(*src == '*' && *(src + 1) == '/')) + src++; + if (*src) + src += 2; + continue; + } + + /* Handle whitespace */ + if (*src == ' ' || *src == '\t' || + *src == '\n' || *src == '\r') + { + if (!last_was_space) + { + *dst++ = ' '; + last_was_space = true; + } + src++; + continue; + } + + /* Handle numbers - replace with placeholder */ + if ((*src >= '0' && *src <= '9') || + (*src == '.' 
&& *(src + 1) >= '0' && + *(src + 1) <= '9')) + { + while (*src && + ((*src >= '0' && *src <= '9') || + *src == '.')) + src++; + if (!last_was_space && + dst > output && *(dst - 1) != '$') + *dst++ = '$'; + last_was_space = false; + continue; + } + + /* Regular character - convert to lowercase */ + if (*src >= 'A' && *src <= 'Z') + *dst++ = *src + 32; + else + *dst++ = *src; + + last_was_space = false; + src++; + } + + /* Remove trailing space */ + if (dst > output && *(dst - 1) == ' ') + dst--; + + *dst = '\0'; + return dst - output; +} + +/* ---------------- + * Public API implementation + * ---------------- + */ + +/* + * Calculate the total shared memory size required + * for the track table mutation feature. + */ +Size +pool_track_table_mutation_shmem_size(void) +{ + Size size = 0; + int tbl_bkt; + int tbl_sz; + int qry_bkt; + int qry_sz; + + tbl_bkt = pool_config->track_table_mutation_table_buckets; + tbl_sz = pool_config->track_table_mutation_table_size; + qry_bkt = pool_config->track_table_mutation_query_buckets; + qry_sz = pool_config->track_table_mutation_query_parse_cache_size; + + /* Main structure */ + size += sizeof(TrackTableMutationShmem); + + /* Table mutation hash table */ + size += sizeof(TrackTableMutationHashTable); + size += tbl_bkt * sizeof(int); + size += tbl_sz * sizeof(TrackTableMutationEntry); + + /* Parse cache */ + size += sizeof(QueryParseCache); + size += qry_bkt * sizeof(int); + size += qry_sz * sizeof(QueryParseEntry); + + return size; +} + +/* + * Initialize shared memory structures for the + * track table mutation feature. Allocates and sets + * up the table map and parse cache in shared memory. + * Called once from pgpool main process at startup. 
 */
void
pool_track_table_mutation_init(void)
{
#ifndef POOL_PRIVATE
	Size		shmem_size;
	char	   *shmem_ptr;
	TrackTableMutationState *st;
	int			tbl_bkt;
	int			tbl_sz;
	int			qry_bkt;
	int			qry_sz;

	if (pool_config->disable_load_balance_on_write !=
		DLBOW_DML_ADAPTIVE_GLOBAL)
	{
		ereport(DEBUG1,
				(errmsg("track_table_mutation: "
						"feature disabled")));
		return;
	}

	tbl_bkt = pool_config->track_table_mutation_table_buckets;
	tbl_sz = pool_config->track_table_mutation_table_size;
	qry_bkt = pool_config->track_table_mutation_query_buckets;
	qry_sz = pool_config->track_table_mutation_query_parse_cache_size;

	shmem_size = pool_track_table_mutation_shmem_size();

	/*
	 * Allocate from the main shared memory segment. Memory is zeroed by
	 * initialize_shared_memory_main_segment().
	 */
	shmem_ptr = pool_shared_memory_segment_get_chunk(
													 shmem_size);
	if (shmem_ptr == NULL)
	{
		ereport(ERROR,
				(errmsg("track_table_mutation: "
						"failed to allocate %zu bytes",
						shmem_size)));
		return;
	}

	/*
	 * Set up pointers within shared memory. The layout must mirror
	 * pool_track_table_mutation_shmem_size(): header, table map
	 * (header + buckets + entries), then parse cache.
	 *
	 * NOTE(review): table_map/query_cache are absolute pointers stored
	 * in shared memory; this assumes child processes inherit the
	 * mapping at the same address (fork-based model) - confirm.
	 */
	track_table_mutation_shmem =
		(TrackTableMutationShmem *) shmem_ptr;
	shmem_ptr += sizeof(TrackTableMutationShmem);

	track_table_mutation_shmem->table_map =
		(TrackTableMutationHashTable *) shmem_ptr;
	shmem_ptr += sizeof(TrackTableMutationHashTable);
	shmem_ptr += tbl_bkt * sizeof(int);
	shmem_ptr += tbl_sz * sizeof(TrackTableMutationEntry);

	track_table_mutation_shmem->query_cache =
		(QueryParseCache *) shmem_ptr;

	/* Initialize structures */
	table_map_init(
				   track_table_mutation_shmem->table_map,
				   tbl_bkt, tbl_sz);

	parse_cache_init(
					 track_table_mutation_shmem->query_cache,
					 qry_bkt, qry_sz);

	/* Initialize global state */
	st = &track_table_mutation_shmem->state;
	st->initialized = true;
	st->current_ttl_us = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
	get_current_time(&st->ttl_last_updated);
	get_current_time(&st->last_cleanup_time);
	/* tv_sec == 0 means "no global cold start pending" */
	st->global_cold_start_until.tv_sec = 0;
	st->global_cold_start_until.tv_usec = 0;
	st->stats_queries_checked = 0;
	st->stats_forced_primary = 0;
	st->stats_allowed_replica = 0;

	ereport(LOG,
			(errmsg("track_table_mutation: "
					"initialized with %zu bytes shmem",
					shmem_size)));
#endif
}

/*
 * Initialize per-child process state.
 * Records the process start time for cold start
 * period tracking. Called when a child process starts.
 */
void
pool_track_table_mutation_child_init(void)
{
	int			dur;

	if (TRACK_TABLE_MUTATION_DISABLED())
		return;

	get_current_time(&process_start_time);
	cold_start_initialized = true;
	dur = pool_config->track_table_mutation_cold_start_duration;

	ereport(DEBUG1,
			(errmsg("track_table_mutation: "
					"child init, cold start %d ms",
					dur)));
}

/*
 * Check if the process is in cold start period.
 * During cold start, all queries are routed to
 * primary to avoid stale reads. Checks both
 * per-process and global (watchdog) cold start.
 */
bool
pool_track_table_mutation_in_cold_start(void)
{
	struct timeval now;
	int64		elapsed_ms;
	int			dur;
	TrackTableMutationState *st;

	if (TRACK_TABLE_MUTATION_DISABLED())
		return false;

	dur = pool_config->track_table_mutation_cold_start_duration;
	if (dur <= 0)
		return false;

	get_current_time(&now);
	st = &track_table_mutation_shmem->state;

	/*
	 * Check watchdog-triggered global cold start: elapsed_us(now,
	 * until) > 0 means the deadline still lies in the future.
	 */
	if (st->global_cold_start_until.tv_sec != 0 &&
		elapsed_us(&now,
				   &st->global_cold_start_until) > 0)
	{
		return true;
	}

	/* Check per-process cold start */
	if (!cold_start_initialized)
		return false;

	elapsed_ms = elapsed_us(&process_start_time, &now) / 1000;

	if (elapsed_ms < dur)
	{
		ereport(DEBUG2,
				(errmsg("track_table_mutation: "
						"cold start (%ld/%d ms)",
						(long) elapsed_ms, dur)));
		return true;
	}

	return false;
}

/*
 * Trigger a global cold start for all processes.
 * Sets the cold start end time in shared memory.
 * Called after watchdog leader change to force all
 * queries to primary during the transition.
 */
void
pool_track_table_mutation_trigger_global_cold_start(void)
{
	struct timeval now;
	struct timeval *until;
	int			dur;

	if (TRACK_TABLE_MUTATION_DISABLED())
		return;

	dur = pool_config->track_table_mutation_cold_start_duration;
	if (dur <= 0)
		return;

	/* deadline = now + dur milliseconds, normalizing tv_usec overflow */
	get_current_time(&now);
	until = &track_table_mutation_shmem->state
		.global_cold_start_until;
	*until = now;
	until->tv_sec += dur / 1000;
	until->tv_usec += (dur % 1000) * 1000;
	if (until->tv_usec >= 1000000)
	{
		until->tv_sec += until->tv_usec / 1000000;
		until->tv_usec %= 1000000;
	}

	ereport(LOG,
			(errmsg("track_table_mutation: "
					"global cold start for %d ms",
					dur)));
}

/*
 * Check if a table was recently written (is "stale").
 * Returns true if reads should go to primary because
 * the table was written within the current TTL window.
 */
bool
pool_track_table_mutation_table_is_stale(
										 int table_oid, int dboid)
{
	TrackTableMutationHashTable *map;
	struct timeval now;
	uint64		ttl_us;
	uint32		hash;
	int			idx;
	bool		is_stale = false;

	if (TRACK_TABLE_MUTATION_DISABLED())
		return false;

	if (table_oid <= 0 || dboid <= 0)
	{
		/* Unknown oid: be conservative and route to primary */
		is_stale = true;
		goto update_stats;
	}

	map = track_table_mutation_shmem->table_map;
	hash = fnv1a_hash_table_key(table_oid, dboid);

	table_map_lock();

	idx = table_map_lookup(map, table_oid, dboid, hash);
	if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
	{
		TrackTableMutationEntry *entries;
		int64		age;
		int64		total_age;
		int64		max_stale_us;

		entries = TABLE_MAP_ENTRIES(map);
		get_current_time(&now);
		ttl_us = track_table_mutation_shmem->state
			.current_ttl_us;

		age = elapsed_us(
						 &entries[idx].last_write_time, &now);
		is_stale = (age < (int64) ttl_us);

		/*
		 * Enforce max_staleness hard cap: no entry can force primary routing
		 * longer than max_staleness from its first write.
		 */
		if (is_stale)
		{
			max_stale_us = (int64) pool_config
				->track_table_mutation_max_staleness
				* 1000LL;
			if (max_stale_us > 0)
			{
				total_age = elapsed_us(
									   &entries[idx].first_write_time,
									   &now);
				if (total_age >= max_stale_us)
					is_stale = false;
			}
		}

		ereport(DEBUG2,
				(errmsg("track_table_mutation: "
						"oid %d dboid %d "
						"elapsed=%ld ttl=%lu stale=%d",
						table_oid, dboid,
						(long) age,
						(unsigned long) ttl_us,
						is_stale)));
	}

	table_map_unlock();

update_stats:
	/*
	 * Update statistics using semaphore. Reached both via fallthrough
	 * (lock already released above) and via the goto for invalid oids
	 * (lock never taken), so the lock is re-acquired here.
	 */
	if (track_table_mutation_shmem != NULL)
	{
		TrackTableMutationState *st;

		table_map_lock();
		st = &track_table_mutation_shmem->state;
		st->stats_queries_checked++;
		if (is_stale)
			st->stats_forced_primary++;
		else
			st->stats_allowed_replica++;
		table_map_unlock();
	}

	return is_stale;
}

/*
 * Mark multiple tables as recently written.
 * Called after DML queries complete to record
 * which tables were modified.
 */
void
pool_track_table_mutation_mark_tables_written(
											  const int *table_oids, int num_tables, int dboid)
{
	TrackTableMutationHashTable *map;
	TrackTableMutationState *st;
	struct timeval now;
	int			i;

	if (TRACK_TABLE_MUTATION_DISABLED())
		return;

	if (num_tables <= 0 || table_oids == NULL ||
		dboid <= 0)
		return;

	map = track_table_mutation_shmem->table_map;
	st = &track_table_mutation_shmem->state;
	get_current_time(&now);

	table_map_lock();

	/*
	 * Periodically clean up expired entries, but only when the map is
	 * at least 3/4 full and at most once per 100ms, to bound the time
	 * spent under the lock.
	 */
	if (map->num_entries > map->max_entries * 3 / 4)
	{
		int64		since_cleanup;

		since_cleanup = elapsed_us(
								   &st->last_cleanup_time, &now);
		/* 100ms interval */
		if (since_cleanup > 100000)
		{
			table_map_cleanup_expired(
									  map, st->current_ttl_us);
			st->last_cleanup_time = now;
		}
	}

	for (i = 0; i < num_tables; i++)
	{
		uint32		hash;
		int			table_oid = table_oids[i];

		if (table_oid > 0)
		{
			hash = fnv1a_hash_table_key(
										table_oid, dboid);
			table_map_insert(map, table_oid,
							 dboid, hash, &now);
		}
	}

	table_map_unlock();
}

/*
 * Mark a single table as recently written.
 * Convenience wrapper around the batch variant above.
 */
void
pool_track_table_mutation_mark_table_written(
											 int table_oid, int dboid)
{
	if (table_oid > 0 && dboid > 0)
	{
		const int	tables[1] = {table_oid};

		pool_track_table_mutation_mark_tables_written(
													  tables, 1, dboid);
	}
}

/*
 * Update the staleness TTL based on observed
 * replication delay. New TTL = delay * factor,
 * clamped to [default_ttl, 1 hour].
 *
 * NOTE(review): current_ttl_us is written here without taking
 * table_map_lock while readers fetch it under that lock; a torn
 * read of the 64-bit value is possible on 32-bit platforms -
 * consider taking the lock around this update.
 */
void
pool_track_table_mutation_update_ttl(uint64 delay_us)
{
	uint64		new_ttl;
	double		factor;
	TrackTableMutationState *st;

	if (TRACK_TABLE_MUTATION_DISABLED())
		return;

	factor = pool_config->track_table_mutation_ttl_factor;
	new_ttl = (uint64) (delay_us * factor);
	if (new_ttl < TRACK_TABLE_MUTATION_DEFAULT_TTL_US)
		new_ttl = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;

	/* Maximum TTL of 1 hour */
	if (new_ttl > 3600ULL * 1000000ULL)
		new_ttl = 3600ULL * 1000000ULL;

	st = &track_table_mutation_shmem->state;
	st->current_ttl_us = new_ttl;
	get_current_time(&st->ttl_last_updated);

	ereport(DEBUG1,
			(errmsg("track_table_mutation: "
					"TTL=%lu us (delay=%lu factor=%.1f)",
					(unsigned long) new_ttl,
					(unsigned long) delay_us,
					factor)));
}

/*
 * Look up a cached parse result by query hash.
 * Returns true and fills output parameters if
 * the query was found in the parse cache.
+ */ +bool +pool_track_table_mutation_get_cached_parse( + uint64 hash, bool *is_write, + char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN], + int *num_tables) +{ + QueryParseCache *cache; + int idx; + bool found = false; + int max_tables; + + if (TRACK_TABLE_MUTATION_DISABLED()) + return false; + + max_tables = TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY; + cache = track_table_mutation_shmem->query_cache; + + parse_cache_lock(); + + idx = parse_cache_lookup(cache, hash); + if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX) + { + QueryParseEntry *entries; + int i; + int namelen; + + entries = PARSE_CACHE_ENTRIES(cache); + namelen = TRACK_TABLE_MUTATION_TABLE_NAME_LEN; + *is_write = entries[idx].is_write; + *num_tables = entries[idx].num_tables; + + for (i = 0; + i < entries[idx].num_tables && + i < max_tables; + i++) + { + strlcpy(table_names[i], + entries[idx].table_names[i], + namelen); + } + + /* Move to front of LRU */ + parse_cache_lru_touch(cache, idx); + found = true; + } + + parse_cache_unlock(); + + return found; +} + +/* + * Store a parse result in the shared cache. + * Evicts the LRU entry if the cache is full. 
+ */ +void +pool_track_table_mutation_cache_parse( + uint64 hash, bool is_write, + const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN], + int num_tables) +{ + QueryParseCache *cache; + int *buckets; + QueryParseEntry *entries; + int idx; + int bucket; + int max_tables; + int namelen; + + if (TRACK_TABLE_MUTATION_DISABLED()) + return; + + max_tables = TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY; + namelen = TRACK_TABLE_MUTATION_TABLE_NAME_LEN; + cache = track_table_mutation_shmem->query_cache; + + parse_cache_lock(); + + /* Check if already exists */ + idx = parse_cache_lookup(cache, hash); + if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX) + { + parse_cache_unlock(); + return; + } + + /* Allocate new entry (may evict LRU) */ + idx = parse_cache_alloc_entry(cache); + if (idx == TRACK_TABLE_MUTATION_INVALID_INDEX) + { + parse_cache_unlock(); + ereport(WARNING, + (errmsg("track_table_mutation: " + "parse cache alloc failed"))); + return; + } + + entries = PARSE_CACHE_ENTRIES(cache); + buckets = PARSE_CACHE_BUCKETS(cache); + + /* Fill in entry */ + entries[idx].query_hash = hash; + entries[idx].is_write = is_write; + entries[idx].num_tables = + (num_tables > max_tables) ? + max_tables : num_tables; + + { + int i; + + for (i = 0; i < entries[idx].num_tables; i++) + { + strlcpy(entries[idx].table_names[i], + table_names[i], namelen); + } + } + + /* Insert into hash bucket */ + bucket = hash % cache->num_buckets; + entries[idx].next = buckets[bucket]; + buckets[bucket] = idx; + + /* Add to LRU list */ + parse_cache_lru_add(cache, idx); + + parse_cache_unlock(); +} + +/* + * Normalize a SQL query and compute its 64-bit hash. + * Strips comments, collapses whitespace, lowercases, + * and replaces literals with placeholders. 
+ */ +uint64 +pool_track_table_mutation_normalize_and_hash( + const char *query) +{ + char normalized[8192]; + size_t len; + + if (query == NULL || query[0] == '\0') + return 0; + + len = normalize_query(query, normalized, + sizeof(normalized)); + if (len == 0) + return 0; + + return fnv1a_hash_64(normalized, len); +} -- 2.43.0