diff --git a/contrib/Makefile b/contrib/Makefile index d230451..ce6d461 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -43,6 +43,7 @@ SUBDIRS = \ pgcrypto \ pgrowlocks \ pgstattuple \ + postgresql_fdw \ seg \ spi \ tablefunc \ diff --git a/contrib/postgresql_fdw/.gitignore b/contrib/postgresql_fdw/.gitignore new file mode 100644 index 0000000..0854728 --- /dev/null +++ b/contrib/postgresql_fdw/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/results/ +*.o +*.so diff --git a/contrib/postgresql_fdw/Makefile b/contrib/postgresql_fdw/Makefile new file mode 100644 index 0000000..898036f --- /dev/null +++ b/contrib/postgresql_fdw/Makefile @@ -0,0 +1,22 @@ +# contrib/postgresql_fdw/Makefile + +MODULE_big = postgresql_fdw +OBJS = postgresql_fdw.o option.o deparse.o connection.o +PG_CPPFLAGS = -I$(libpq_srcdir) +SHLIB_LINK = $(libpq) + +EXTENSION = postgresql_fdw +DATA = postgresql_fdw--1.0.sql + +REGRESS = postgresql_fdw + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/postgresql_fdw +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/postgresql_fdw/connection.c b/contrib/postgresql_fdw/connection.c new file mode 100644 index 0000000..b7574c4 --- /dev/null +++ b/contrib/postgresql_fdw/connection.c @@ -0,0 +1,605 @@ +/*------------------------------------------------------------------------- + * + * connection.c + * Connection management for postgresql_fdw + * + * Portions Copyright (c) 2012, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgresql_fdw/connection.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/xact.h" +#include "catalog/pg_type.h" +#include "foreign/foreign.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "utils/array.h" +#include "utils/builtins.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "utils/tuplestore.h" + +#include "postgresql_fdw.h" +#include "connection.h" + +/* ============================================================================ + * Connection management functions + * ==========================================================================*/ + +/* + * Connection cache entry managed with hash table. + */ +typedef struct ConnCacheEntry +{ + /* hash key must be first */ + Oid serverid; /* oid of foreign server */ + Oid userid; /* oid of local user */ + + bool use_tx; /* true when using remote transaction */ + int refs; /* reference counter */ + PGconn *conn; /* foreign server connection */ +} ConnCacheEntry; + +/* + * Hash table which is used to cache connection to PostgreSQL servers, will be + * initialized before first attempt to connect PostgreSQL server by the backend. + */ +static HTAB *ConnectionHash; + +/* ---------------------------------------------------------------------------- + * prototype of private functions + * --------------------------------------------------------------------------*/ +static void +cleanup_connection(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg); +static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); +static void begin_remote_tx(PGconn *conn); +static void abort_remote_tx(PGconn *conn); + +/* + * Get a PGconn which can be used to execute foreign query on the remote + * PostgreSQL server with the user's authorization. If this was the first + * request for the server, new connection is established. + * + * When use_tx is true, remote transaction is started if caller is the only + * user of the connection. Isolation level of the remote transaction is same + * as local transaction, and remote transaction will be aborted when last + * user release. + * + * TODO: Note that caching connections requires a mechanism to detect change of + * FDW object to invalidate already established connections. + */ +PGconn * +GetConnection(ForeignServer *server, UserMapping *user, bool use_tx) +{ + bool found; + ConnCacheEntry *entry; + ConnCacheEntry key; + + /* initialize connection cache if it isn't */ + if (ConnectionHash == NULL) + { + HASHCTL ctl; + + /* hash key is a pair of oids: serverid and userid */ + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid) + sizeof(Oid); + ctl.entrysize = sizeof(ConnCacheEntry); + ctl.hash = tag_hash; + ctl.match = memcmp; + ctl.keycopy = memcpy; + /* allocate ConnectionHash in the cache context */ + ctl.hcxt = CacheMemoryContext; + ConnectionHash = hash_create("postgresql_fdw connections", 32, + &ctl, + HASH_ELEM | HASH_CONTEXT | + HASH_FUNCTION | HASH_COMPARE | + HASH_KEYCOPY); + + /* + * Register postgresql_fdw's own cleanup function for connection + * cleanup. This should be done just once for each backend. + */ + RegisterResourceReleaseCallback(cleanup_connection, ConnectionHash); + } + + /* Create key value for the entry. */ + MemSet(&key, 0, sizeof(key)); + key.serverid = server->serverid; + key.userid = GetOuterUserId(); + + /* + * Find cached entry for requested connection. If we couldn't find, + * callback function of ResourceOwner should be registered to clean the + * connection up on error including user interrupt. + */ + entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); + if (!found) + { + entry->use_tx = false; + entry->refs = 0; + entry->conn = NULL; + } + + /* + * We don't check the health of cached connection here, because it would + * require some overhead. Broken connection and its cache entry will be + * cleaned up when the connection is actually used. + */ + + /* + * If cache entry doesn't have connection, we have to establish new + * connection. + */ + if (entry->conn == NULL) + { + PGconn *volatile conn = NULL; + + /* + * Use PG_TRY block to ensure closing connection on error. + */ + PG_TRY(); + { + /* + * Connect to the foreign PostgreSQL server, and store it in cache + * entry to keep new connection. + * Note: key items of entry has already been initialized in + * hash_search(HASH_ENTER). + */ + conn = connect_pg_server(server, user); + } + PG_CATCH(); + { + /* Clear connection cache entry on error case. */ + PQfinish(entry->conn); + entry->use_tx = false; + entry->refs = 0; + entry->conn = NULL; + PG_RE_THROW(); + } + PG_END_TRY(); + entry->conn = conn; + elog(DEBUG3, "new postgresql_fdw connection %p for server %s", + entry->conn, server->servername); + } + + /* Increase connection reference counter. */ + entry->refs++; + + /* + * If remote transaction is requested but it has not started, start remote + * transaction with the same isolation level as the local transaction we + * are in. We need to remember whether this connection uses remote + * transaction to abort it when this connection is released completely. + */ + if (use_tx && !entry->use_tx) + { + begin_remote_tx(entry->conn); + entry->use_tx = use_tx; + } + + return entry->conn; +} + +/* + * For non-superusers, insist that the connstr specify a password. This + * prevents a password from being picked up from .pgpass, a service file, + * the environment, etc. We don't want the postgres user's passwords + * to be accessible to non-superusers. + */ +static void +check_conn_params(const char **keywords, const char **values) +{ + int i; + + /* no check required if superuser */ + if (superuser()) + return; + + /* ok if params contain a non-empty password */ + for (i = 0; keywords[i] != NULL; i++) + { + if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0') + return; + } + + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superusers must provide a password in the connection string."))); +} + +static PGconn * +connect_pg_server(ForeignServer *server, UserMapping *user) +{ + const char *conname = server->servername; + PGconn *conn; + const char **all_keywords; + const char **all_values; + const char **keywords; + const char **values; + int n; + int i, j; + + /* + * Construct connection params from generic options of ForeignServer and + * UserMapping. Those two object hold only libpq options. + * Extra 3 items are for: + * *) fallback_application_name + * *) client_encoding + * *) NULL termination (end marker) + * + * Note: We don't omit any parameters even target database might be older + * than local, because unexpected parameters are just ignored. + */ + n = list_length(server->options) + list_length(user->options) + 3; + all_keywords = (const char **) palloc(sizeof(char *) * n); + all_values = (const char **) palloc(sizeof(char *) * n); + keywords = (const char **) palloc(sizeof(char *) * n); + values = (const char **) palloc(sizeof(char *) * n); + n = 0; + n += ExtractConnectionOptions(server->options, + all_keywords + n, all_values + n); + n += ExtractConnectionOptions(user->options, + all_keywords + n, all_values + n); + all_keywords[n] = all_values[n] = NULL; + + for (i = 0, j = 0; all_keywords[i]; i++) + { + keywords[j] = all_keywords[i]; + values[j] = all_values[i]; + j++; + } + + /* Use "postgresql_fdw" as fallback_application_name. */ + keywords[j] = "fallback_application_name"; + values[j++] = "postgresql_fdw"; + + /* Set client_encoding so that libpq can convert encoding properly. */ + keywords[j] = "client_encoding"; + values[j++] = GetDatabaseEncodingName(); + + keywords[j] = values[j] = NULL; + pfree(all_keywords); + pfree(all_values); + + /* verify connection parameters and do connect */ + check_conn_params(keywords, values); + conn = PQconnectdbParams(keywords, values, 0); + if (!conn || PQstatus(conn) != CONNECTION_OK) + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not connect to server \"%s\"", conname), + errdetail("%s", PQerrorMessage(conn)))); + pfree(keywords); + pfree(values); + + /* + * Check that non-superuser has used password to establish connection. + * This check logic is based on dblink_security_check() in contrib/dblink. + * + * XXX Should we check this even if we don't provide unsafe version like + * dblink_connect_u()? + */ + if (!superuser() && !PQconnectionUsedPassword(conn)) + { + PQfinish(conn); + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superuser cannot connect if the server does not request a password."), + errhint("Target server's authentication method must be changed."))); + } + + return conn; +} + +/* + * Start remote transaction with proper isolation level. + */ +static void +begin_remote_tx(PGconn *conn) +{ + const char *sql = NULL; /* keep compiler quiet. */ + PGresult *res; + + switch (XactIsoLevel) + { + case XACT_READ_UNCOMMITTED: + case XACT_READ_COMMITTED: + case XACT_REPEATABLE_READ: + sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ"; + break; + case XACT_SERIALIZABLE: + sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; + break; + default: + elog(ERROR, "unexpected isolation level: %d", XactIsoLevel); + break; + } + + elog(DEBUG3, "starting remote transaction with \"%s\"", sql); + + res = PQexec(conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + PQclear(res); + elog(ERROR, "could not start transaction: %s", PQerrorMessage(conn)); + } + PQclear(res); +} + +static void +abort_remote_tx(PGconn *conn) +{ + PGresult *res; + + elog(DEBUG3, "aborting remote transaction"); + + res = PQexec(conn, "ABORT TRANSACTION"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + PQclear(res); + elog(ERROR, "could not abort transaction: %s", PQerrorMessage(conn)); + } + PQclear(res); +} + +/* + * Mark the connection as "unused", and close it if the caller was the last + * user of the connection. + */ +void +ReleaseConnection(PGconn *conn) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + if (conn == NULL) + return; + + /* + * We need to scan sequentially since we use the address to find + * appropriate PGconn from the hash table. + */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + if (entry->conn == conn) + { + hash_seq_term(&scan); + break; + } + } + + /* + * If the given connection is an orphan, it must be a dangling pointer to + * already released connection. Discarding connection due to remote query + * error would produce such situation (see comments below). + */ + if (entry == NULL) + return; + + /* + * If releasing connection is broken or its transaction has failed, + * discard the connection to recover from the error. PQfinish would cause + * dangling pointer of shared PGconn object, but they won't double-free'd + * because their pointer values don't match any of cached entry and ignored + * at the check above. + * + * Subsequent connection request via GetConnection will create new + * connection. + */ + if (PQstatus(conn) != CONNECTION_OK || + (PQtransactionStatus(conn) != PQTRANS_IDLE && + PQtransactionStatus(conn) != PQTRANS_INTRANS)) + { + elog(DEBUG3, "discarding connection: %s %s", + PQstatus(conn) == CONNECTION_OK ? "OK" : "NG", + PQtransactionStatus(conn) == PQTRANS_IDLE ? "IDLE" : + PQtransactionStatus(conn) == PQTRANS_ACTIVE ? "ACTIVE" : + PQtransactionStatus(conn) == PQTRANS_INTRANS ? "INTRANS" : + PQtransactionStatus(conn) == PQTRANS_INERROR ? "INERROR" : + "UNKNOWN"); + PQfinish(conn); + entry->use_tx = false; + entry->refs = 0; + entry->conn = NULL; + return; + } + + /* + * Decrease reference counter of this connection. Even if the caller was + * the last referrer, we don't unregister it from cache. + */ + entry->refs--; + if (entry->refs < 0) + entry->refs = 0; /* just in case */ + + /* + * If this connection uses remote transaction and there is no user other + * than the caller, abort the remote transaction and forget about it. + */ + if (entry->use_tx && entry->refs == 0) + { + abort_remote_tx(conn); + entry->use_tx = false; + } +} + +/* + * Clean the connection up via ResourceOwner. + */ +static void +cleanup_connection(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry = (ConnCacheEntry *) arg; + + /* If the transaction was committed, don't close connections. */ + if (isCommit) + return; + + /* + * We clean the connection up on post-lock because foreign connections are + * backend-internal resource. + */ + if (phase != RESOURCE_RELEASE_AFTER_LOCKS) + return; + + /* + * We ignore cleanup for ResourceOwners other than transaction. At this + * point, such a ResourceOwner is only Portal. + */ + if (CurrentResourceOwner != CurTransactionResourceOwner) + return; + + /* + * We don't need to clean up at end of subtransactions, because they might + * be recovered to consistent state with savepoints. + */ + if (!isTopLevel) + return; + + /* + * Here, it must be after abort of top level transaction. Disconnect all + * cached connections to clear error status out and reset their reference + * counters. + */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + elog(DEBUG3, "discard postgresql_fdw connection %p due to resowner cleanup", + entry->conn); + PQfinish(entry->conn); + entry->use_tx = false; + entry->refs = 0; + entry->conn = NULL; + } +} + +/* + * Get list of connections currently active. + */ +Datum postgresql_fdw_get_connections(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(postgresql_fdw_get_connections); +Datum +postgresql_fdw_get_connections(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + MemoryContext oldcontext = CurrentMemoryContext; + Tuplestorestate *tuplestore; + TupleDesc tupdesc; + + /* We return list of connection with storing them in a Tuplestore. */ + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = NULL; + rsinfo->setDesc = NULL; + + /* Create tuplestore and copy of TupleDesc in per-query context. */ + MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); + + tupdesc = CreateTemplateTupleDesc(2, false); + TupleDescInitEntry(tupdesc, 1, "srvid", OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, 2, "usesysid", OIDOID, -1, 0); + rsinfo->setDesc = tupdesc; + + tuplestore = tuplestore_begin_heap(false, false, work_mem); + rsinfo->setResult = tuplestore; + + MemoryContextSwitchTo(oldcontext); + + /* + * We need to scan sequentially since we use the address to find + * appropriate PGconn from the hash table. + */ + if (ConnectionHash != NULL) + { + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + Datum values[2]; + bool nulls[2]; + HeapTuple tuple; + + /* Ignore inactive connections */ + if (PQstatus(entry->conn) != CONNECTION_OK) + continue; + + /* + * Ignore other users' connections if current user isn't a + * superuser. + */ + if (!superuser() && entry->userid != GetUserId()) + continue; + + values[0] = ObjectIdGetDatum(entry->serverid); + values[1] = ObjectIdGetDatum(entry->userid); + nulls[0] = false; + nulls[1] = false; + + tuple = heap_formtuple(tupdesc, values, nulls); + tuplestore_puttuple(tuplestore, tuple); + } + } + tuplestore_donestoring(tuplestore); + + PG_RETURN_VOID(); +} + +/* + * Discard persistent connection designated by given connection name. + */ +Datum postgresql_fdw_disconnect(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(postgresql_fdw_disconnect); +Datum +postgresql_fdw_disconnect(PG_FUNCTION_ARGS) +{ + Oid serverid = PG_GETARG_OID(0); + Oid userid = PG_GETARG_OID(1); + ConnCacheEntry key; + ConnCacheEntry *entry = NULL; + bool found; + + /* Non-superuser can't discard other users' connection. */ + if (!superuser() && userid != GetOuterUserId()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("only superuser can discard other user's connection"))); + + /* + * If no connection has been established, or no such connections, just + * return "NG" to indicate nothing has done. + */ + if (ConnectionHash == NULL) + PG_RETURN_TEXT_P(cstring_to_text("NG")); + + key.serverid = serverid; + key.userid = userid; + entry = hash_search(ConnectionHash, &key, HASH_FIND, &found); + if (!found) + PG_RETURN_TEXT_P(cstring_to_text("NG")); + + /* Discard cached connection, and clear reference counter. */ + PQfinish(entry->conn); + entry->use_tx = false; + entry->refs = 0; + entry->conn = NULL; + + PG_RETURN_TEXT_P(cstring_to_text("OK")); +} diff --git a/contrib/postgresql_fdw/connection.h b/contrib/postgresql_fdw/connection.h new file mode 100644 index 0000000..17355df --- /dev/null +++ b/contrib/postgresql_fdw/connection.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * connection.h + * Connection management for postgresql_fdw + * + * Portions Copyright (c) 2012, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgresql_fdw/connection.h + * + *------------------------------------------------------------------------- + */ +#ifndef CONNECTION_H +#define CONNECTION_H + +#include "foreign/foreign.h" +#include "libpq-fe.h" + +/* + * Connection management + */ +PGconn *GetConnection(ForeignServer *server, UserMapping *user, bool use_tx); +void ReleaseConnection(PGconn *conn); + +#endif /* CONNECTION_H */ diff --git a/contrib/postgresql_fdw/deparse.c b/contrib/postgresql_fdw/deparse.c new file mode 100644 index 0000000..698dbb8 --- /dev/null +++ b/contrib/postgresql_fdw/deparse.c @@ -0,0 +1,1203 @@ +/*------------------------------------------------------------------------- + * + * deparse.c + * query deparser for PostgreSQL + * + * Copyright (c) 2012, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgresql_fdw/deparse.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/transam.h" +#include "catalog/pg_class.h" +#include "catalog/pg_operator.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "foreign/foreign.h" +#include "lib/stringinfo.h" +#include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" +#include "nodes/makefuncs.h" +#include "optimizer/clauses.h" +#include "optimizer/var.h" +#include "parser/parser.h" +#include "parser/parsetree.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/syscache.h" + +#include "postgresql_fdw.h" + +/* + * Context for walk-through the expression tree. + */ +typedef struct foreign_executable_cxt +{ + PlannerInfo *root; + RelOptInfo *foreignrel; + bool has_param; +} foreign_executable_cxt; + +/* + * Get string representation which can be used in SQL statement from a node. + */ +static void deparseExpr(StringInfo buf, Expr *expr, PlannerInfo *root); +static void deparseRelation(StringInfo buf, RangeTblEntry *rte); +static void deparseVar(StringInfo buf, Var *node, PlannerInfo *root); +static void deparseConst(StringInfo buf, Const *node, PlannerInfo *root); +static void deparseBoolExpr(StringInfo buf, BoolExpr *node, PlannerInfo *root); +static void deparseNullTest(StringInfo buf, NullTest *node, PlannerInfo *root); +static void deparseDistinctExpr(StringInfo buf, DistinctExpr *node, + PlannerInfo *root); +static void deparseRelabelType(StringInfo buf, RelabelType *node, + PlannerInfo *root); +static void deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root); +static void deparseParam(StringInfo buf, Param *node, PlannerInfo *root); +static void deparseScalarArrayOpExpr(StringInfo buf, ScalarArrayOpExpr *node, + PlannerInfo *root); +static void deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root); +static void deparseArrayRef(StringInfo buf, ArrayRef *node, PlannerInfo *root); +static void deparseArrayExpr(StringInfo buf, ArrayExpr *node, PlannerInfo *root); + +/* + * Determine whether an expression can be evaluated on remote side safely. + */ +static bool is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr, + bool *has_param); +static bool foreign_expr_walker(Node *node, foreign_executable_cxt *context); +static bool is_builtin(Oid procid); + +/* + * Deparse query representation into SQL statement which suits for remote + * PostgreSQL server. This function basically creates simple query string + * which consists of only SELECT, FROM clauses. + * + * Remote SELECT clause contains only columns which are used in targetlist or + * local_conds (conditions which can't be pushed down and will be checked on + * local side). + */ +void +deparseSimpleSql(StringInfo buf, + PlannerInfo *root, + RelOptInfo *baserel, + List *local_conds) +{ + RangeTblEntry *rte; + ListCell *lc; + StringInfoData foreign_relname; + bool first; + AttrNumber attr; + List *attr_used = NIL; /* List of AttNumber used in the query */ + + initStringInfo(buf); + initStringInfo(&foreign_relname); + + /* + * First of all, determine which column should be retrieved for this scan. + * + * We do this before deparsing SELECT clause because attributes which are + * not used in neither reltargetlist nor baserel->baserestrictinfo, quals + * evaluated on local, can be replaced with literal "NULL" in the SELECT + * clause to reduce overhead of tuple handling tuple and data transfer. + */ + foreach (lc, local_conds) + { + RestrictInfo *ri = (RestrictInfo *) lfirst(lc); + List *attrs; + + /* + * We need to know which attributes are used in qual evaluated + * on the local server, because they should be listed in the + * SELECT clause of remote query. We can ignore attributes + * which are referenced only in ORDER BY/GROUP BY clause because + * such attributes has already been kept in reltargetlist. + */ + attrs = pull_var_clause((Node *) ri->clause, + PVC_RECURSE_AGGREGATES, + PVC_RECURSE_PLACEHOLDERS); + attr_used = list_union(attr_used, attrs); + } + + /* + * deparse SELECT clause + * + * List attributes which are in either target list or local restriction. + * Unused attributes are replaced with a literal "NULL" for optimization. + * + * Note that nothing is added for dropped columns, though tuple constructor + * function requires entries for dropped columns. Such entries must be + * initialized with NULL before calling tuple constructor. + */ + appendStringInfo(buf, "SELECT "); + rte = root->simple_rte_array[baserel->relid]; + attr_used = list_union(attr_used, baserel->reltargetlist); + first = true; + for (attr = 1; attr <= baserel->max_attr; attr++) + { + Var *var = NULL; + ListCell *lc; + + /* Ignore dropped attributes. */ + if (get_rte_attribute_is_dropped(rte, attr)) + continue; + + if (!first) + appendStringInfo(buf, ", "); + first = false; + + /* + * We use linear search here, but it wouldn't be problem since + * attr_used seems to not become so large. + */ + foreach (lc, attr_used) + { + var = lfirst(lc); + if (var->varattno == attr) + break; + var = NULL; + } + if (var != NULL) + deparseVar(buf, var, root); + else + appendStringInfo(buf, "NULL"); + } + appendStringInfoChar(buf, ' '); + + /* + * deparse FROM clause, including alias if any + */ + appendStringInfo(buf, "FROM "); + deparseRelation(buf, root->simple_rte_array[baserel->relid]); +} + +/* + * Examine each element in the list baserestrictinfo of baserel, and classify + * them into three groups: remote_conds contains conditions which can be + * evaluated + * - remote_conds is push-down safe, and don't contain any Param node + * - param_conds is push-down safe, but contain some Param node + * - local_conds is not push-down safe + * + * Only remote_conds can be used in remote EXPLAIN, and remote_conds and + * param_conds can be used in final remote query. + */ +void +classifyConditions(PlannerInfo *root, + RelOptInfo *baserel, + List **remote_conds, + List **param_conds, + List **local_conds) +{ + ListCell *lc; + bool has_param; + + Assert(remote_conds); + Assert(param_conds); + Assert(local_conds); + + foreach(lc, baserel->baserestrictinfo) + { + RestrictInfo *ri = (RestrictInfo *) lfirst(lc); + + if (is_foreign_expr(root, baserel, ri->clause, &has_param)) + { + if (has_param) + *param_conds = lappend(*param_conds, ri); + else + *remote_conds = lappend(*remote_conds, ri); + } + else + *local_conds = lappend(*local_conds, ri); + } +} + +/* + * Deparse SELECT statement to acquire sample rows of given relation into buf. + */ +void +deparseAnalyzeSql(StringInfo buf, Relation rel) +{ + Oid relid = RelationGetRelid(rel); + TupleDesc tupdesc = RelationGetDescr(rel); + int i; + char *colname; + List *options; + ListCell *lc; + bool first = true; + char *nspname; + char *relname; + ForeignTable *table; + + /* Deparse SELECT clause, use attribute name or colname option. */ + appendStringInfo(buf, "SELECT "); + for (i = 0; i < tupdesc->natts; i++) + { + if (tupdesc->attrs[i]->attisdropped) + continue; + + colname = NameStr(tupdesc->attrs[i]->attname); + options = GetForeignColumnOptions(relid, tupdesc->attrs[i]->attnum); + + foreach(lc, options) + { + DefElem *def= (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "colname") == 0) + { + colname = defGetString(def); + break; + } + } + + if (!first) + appendStringInfo(buf, ", "); + appendStringInfo(buf, "%s", quote_identifier(colname)); + first = false; + } + + /* + * Deparse FROM clause, use namespace and relation name, or use nspname and + * colname options respectively. + */ + nspname = get_namespace_name(get_rel_namespace(relid)); + relname = get_rel_name(relid); + table = GetForeignTable(relid); + foreach(lc, table->options) + { + DefElem *def= (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "nspname") == 0) + nspname = defGetString(def); + else if (strcmp(def->defname, "relname") == 0) + relname = defGetString(def); + } + + appendStringInfo(buf, " FROM %s.%s", quote_identifier(nspname), + quote_identifier(relname)); +} + +/* + * Deparse given expression into buf. Actual string operation is delegated to + * node-type-specific functions. + * + * Note that switch statement of this function MUST match the one in + * foreign_expr_walker to avoid unsupported error.. + */ +static void +deparseExpr(StringInfo buf, Expr *node, PlannerInfo *root) +{ + /* + * This part must be match foreign_expr_walker. + */ + switch (nodeTag(node)) + { + case T_Const: + deparseConst(buf, (Const *) node, root); + break; + case T_BoolExpr: + deparseBoolExpr(buf, (BoolExpr *) node, root); + break; + case T_NullTest: + deparseNullTest(buf, (NullTest *) node, root); + break; + case T_DistinctExpr: + deparseDistinctExpr(buf, (DistinctExpr *) node, root); + break; + case T_RelabelType: + deparseRelabelType(buf, (RelabelType *) node, root); + break; + case T_FuncExpr: + deparseFuncExpr(buf, (FuncExpr *) node, root); + break; + case T_Param: + deparseParam(buf, (Param *) node, root); + break; + case T_ScalarArrayOpExpr: + deparseScalarArrayOpExpr(buf, (ScalarArrayOpExpr *) node, root); + break; + case T_OpExpr: + deparseOpExpr(buf, (OpExpr *) node, root); + break; + case T_Var: + deparseVar(buf, (Var *) node, root); + break; + case T_ArrayRef: + deparseArrayRef(buf, (ArrayRef *) node, root); + break; + case T_ArrayExpr: + deparseArrayExpr(buf, (ArrayExpr *) node, root); + break; + default: + { + ereport(ERROR, + (errmsg("unsupported expression for deparse"), + errdetail("%s", nodeToString(node)))); + } + break; + } +} + +/* + * Deparse given Var node into buf. If the column has colname FDW option, use + * its value instead of attribute name. + */ +static void +deparseVar(StringInfo buf, Var *node, PlannerInfo *root) +{ + RangeTblEntry *rte; + char *colname = NULL; + const char *q_colname = NULL; + List *options; + ListCell *lc; + + /* node must not be any of OUTER_VAR,INNER_VAR and INDEX_VAR. */ + Assert(node->varno >= 1 && node->varno <= root->simple_rel_array_size); + + /* Get RangeTblEntry from array in PlannerInfo. */ + rte = root->simple_rte_array[node->varno]; + + /* + * If the node is a column of a foreign table, and it has colname FDW + * option, use its value. + */ + options = GetForeignColumnOptions(rte->relid, node->varattno); + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "colname") == 0) + { + colname = defGetString(def); + break; + } + } + + /* + * If the node refers a column of a regular table or it doesn't have colname + * FDW option, use attribute name. + */ + if (colname == NULL) + colname = get_attname(rte->relid, node->varattno); + + q_colname = quote_identifier(colname); + appendStringInfo(buf, "%s", q_colname); +} + +/* + * Deparse a RangeTblEntry node into buf. If rte represents a foreign table, + * use value of relname FDW option (if any) instead of relation's name. + * Similarly, nspname FDW option overrides schema name. + */ +static void +deparseRelation(StringInfo buf, RangeTblEntry *rte) +{ + ForeignTable *table; + ListCell *lc; + const char *nspname = NULL; /* plain namespace name */ + const char *relname = NULL; /* plain relation name */ + const char *q_nspname; /* quoted namespace name */ + const char *q_relname; /* quoted relation name */ + + /* obtain additional catalog information. */ + table = GetForeignTable(rte->relid); + + /* + * Use value of FDW options if any, instead of the name of object + * itself. + */ + foreach(lc, table->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "nspname") == 0) + nspname = defGetString(def); + else if (strcmp(def->defname, "relname") == 0) + relname = defGetString(def); + } + + /* Quote each identifier, if necessary. */ + if (nspname == NULL) + nspname = get_namespace_name(get_rel_namespace(rte->relid)); + q_nspname = quote_identifier(nspname); + + if (relname == NULL) + relname = get_rel_name(rte->relid); + q_relname = quote_identifier(relname); + + /* Construct relation reference into the buffer. */ + appendStringInfo(buf, "%s.%s", q_nspname, q_relname); +} + +/* + * Deparse given constant value into buf. This function have to be kept in + * sync with get_const_expr. + */ +static void +deparseConst(StringInfo buf, + Const *node, + PlannerInfo *root) +{ + Oid typoutput; + bool typIsVarlena; + char *extval; + bool isfloat = false; + bool needlabel; + + if (node->constisnull) + { + appendStringInfo(buf, "NULL"); + return; + } + + getTypeOutputInfo(node->consttype, + &typoutput, &typIsVarlena); + extval = OidOutputFunctionCall(typoutput, node->constvalue); + + switch (node->consttype) + { + case ANYARRAYOID: + case ANYNONARRAYOID: + elog(ERROR, "anyarray and anyenum are not supported"); + break; + case INT2OID: + case INT4OID: + case INT8OID: + case OIDOID: + case FLOAT4OID: + case FLOAT8OID: + case NUMERICOID: + { + /* + * No need to quote unless they contain special values such as + * 'Nan'. + */ + if (strspn(extval, "0123456789+-eE.") == strlen(extval)) + { + if (extval[0] == '+' || extval[0] == '-') + appendStringInfo(buf, "(%s)", extval); + else + appendStringInfoString(buf, extval); + if (strcspn(extval, "eE.") != strlen(extval)) + isfloat = true; /* it looks like a float */ + } + else + appendStringInfo(buf, "'%s'", extval); + } + break; + case BITOID: + case VARBITOID: + appendStringInfo(buf, "B'%s'", extval); + break; + case BOOLOID: + if (strcmp(extval, "t") == 0) + appendStringInfoString(buf, "true"); + else + appendStringInfoString(buf, "false"); + break; + + default: + { + const char *valptr; + + appendStringInfoChar(buf, '\''); + for (valptr = extval; *valptr; valptr++) + { + char ch = *valptr; + + /* + * standard_conforming_strings of remote session should be + * set to similar value as local session. + */ + if (SQL_STR_DOUBLE(ch, !standard_conforming_strings)) + appendStringInfoChar(buf, ch); + appendStringInfoChar(buf, ch); + } + appendStringInfoChar(buf, '\''); + } + break; + } + + /* + * Append ::typename unless the constant will be implicitly typed as the + * right type when it is read in. + * + * XXX this code has to be kept in sync with the behavior of the parser, + * especially make_const. + */ + switch (node->consttype) + { + case BOOLOID: + case INT4OID: + case UNKNOWNOID: + needlabel = false; + break; + case NUMERICOID: + needlabel = !isfloat || (node->consttypmod >= 0); + break; + default: + needlabel = true; + break; + } + if (needlabel) + { + appendStringInfo(buf, "::%s", + format_type_with_typemod(node->consttype, + node->consttypmod)); + } +} + +static void +deparseBoolExpr(StringInfo buf, + BoolExpr *node, + PlannerInfo *root) +{ + ListCell *lc; + char *op = NULL; /* keep compiler quiet */ + bool first; + + switch (node->boolop) + { + case AND_EXPR: + op = "AND"; + break; + case OR_EXPR: + op = "OR"; + break; + case NOT_EXPR: + appendStringInfo(buf, "(NOT "); + deparseExpr(buf, list_nth(node->args, 0), root); + appendStringInfo(buf, ")"); + return; + } + + first = true; + appendStringInfo(buf, "("); + foreach(lc, node->args) + { + if (!first) + appendStringInfo(buf, " %s ", op); + deparseExpr(buf, (Expr *) lfirst(lc), root); + first = false; + } + appendStringInfo(buf, ")"); +} + +/* + * Deparse given IS [NOT] NULL test expression into buf. + */ +static void +deparseNullTest(StringInfo buf, + NullTest *node, + PlannerInfo *root) +{ + appendStringInfoChar(buf, '('); + deparseExpr(buf, node->arg, root); + if (node->nulltesttype == IS_NULL) + appendStringInfo(buf, " IS NULL)"); + else + appendStringInfo(buf, " IS NOT NULL)"); +} + +static void +deparseDistinctExpr(StringInfo buf, + DistinctExpr *node, + PlannerInfo *root) +{ + Assert(list_length(node->args) == 2); + + deparseExpr(buf, linitial(node->args), root); + appendStringInfo(buf, " IS DISTINCT FROM "); + deparseExpr(buf, lsecond(node->args), root); +} + +static void +deparseRelabelType(StringInfo buf, + RelabelType *node, + PlannerInfo *root) +{ + char *typname; + + Assert(node->arg); + + /* We don't need to deparse cast when argument has same type as result. */ + if (IsA(node->arg, Const) && + ((Const *) node->arg)->consttype == node->resulttype && + ((Const *) node->arg)->consttypmod == -1) + { + deparseExpr(buf, node->arg, root); + return; + } + + typname = format_type_with_typemod(node->resulttype, node->resulttypmod); + appendStringInfoChar(buf, '('); + deparseExpr(buf, node->arg, root); + appendStringInfo(buf, ")::%s", typname); +} + +/* + * Deparse given node which represents a function call into buf. We treat only + * explicit function call and explicit cast (coerce), because others are + * processed on remote side if necessary. + * + * Function name (and type name) is always qualified by schema name to avoid + * problems caused by different setting of search_path on remote side. + */ +static void +deparseFuncExpr(StringInfo buf, + FuncExpr *node, + PlannerInfo *root) +{ + Oid pronamespace; + const char *schemaname; + const char *funcname; + ListCell *arg; + bool first; + + pronamespace = get_func_namespace(node->funcid); + schemaname = quote_identifier(get_namespace_name(pronamespace)); + funcname = quote_identifier(get_func_name(node->funcid)); + + if (node->funcformat == COERCE_EXPLICIT_CALL) + { + /* Function call, deparse all arguments recursively. */ + appendStringInfo(buf, "%s.%s(", schemaname, funcname); + first = true; + foreach(arg, node->args) + { + if (!first) + appendStringInfo(buf, ", "); + deparseExpr(buf, lfirst(arg), root); + first = false; + } + appendStringInfoChar(buf, ')'); + } + else if (node->funcformat == COERCE_EXPLICIT_CAST) + { + /* Explicit cast, deparse only first argument. */ + appendStringInfoChar(buf, '('); + deparseExpr(buf, linitial(node->args), root); + appendStringInfo(buf, ")::%s", funcname); + } + else + { + /* Implicit cast, deparse only first argument. */ + deparseExpr(buf, linitial(node->args), root); + } +} + +/* + * Deparse given Param node into buf. + * + * We don't renumber parameter id, because skipping $1 is not cause problem + * as far as we pass through all arguments. + */ +static void +deparseParam(StringInfo buf, + Param *node, + PlannerInfo *root) +{ + Assert(node->paramkind == PARAM_EXTERN); + + appendStringInfo(buf, "$%d", node->paramid); +} + +/* + * Deparse given ScalarArrayOpExpr expression into buf. To avoid problems + * around priority of operations, we always parenthesize the arguments. Also we + * use OPERATOR(schema.operator) notation to determine remote operator exactly. + */ +static void +deparseScalarArrayOpExpr(StringInfo buf, + ScalarArrayOpExpr *node, + PlannerInfo *root) +{ + HeapTuple tuple; + Form_pg_operator form; + const char *opnspname; + char *opname; + Expr *arg1; + Expr *arg2; + + /* Retrieve necessary information about the operator from system catalog. */ + tuple = SearchSysCache1(OPEROID, ObjectIdGetDatum(node->opno)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for operator %u", node->opno); + form = (Form_pg_operator) GETSTRUCT(tuple); + /* opname is not a SQL identifier, so we don't need to quote it. */ + opname = NameStr(form->oprname); + opnspname = quote_identifier(get_namespace_name(form->oprnamespace)); + ReleaseSysCache(tuple); + + /* Sanity check. */ + Assert(list_length(node->args) == 2); + + /* Always parenthesize the expression. */ + appendStringInfoChar(buf, '('); + + /* Extract operands. */ + arg1 = linitial(node->args); + arg2 = lsecond(node->args); + + /* Deparse fully qualified operator name. */ + deparseExpr(buf, arg1, root); + appendStringInfo(buf, " OPERATOR(%s.%s) %s (", + opnspname, opname, node->useOr ? "ANY" : "ALL"); + deparseExpr(buf, arg2, root); + appendStringInfoChar(buf, ')'); + + /* Always parenthesize the expression. */ + appendStringInfoChar(buf, ')'); +} + +/* + * Deparse given operator expression into buf. To avoid problems around + * priority of operations, we always parenthesize the arguments. Also we use + * OPERATOR(schema.operator) notation to determine remote operator exactly. + */ +static void +deparseOpExpr(StringInfo buf, + OpExpr *node, + PlannerInfo *root) +{ + HeapTuple tuple; + Form_pg_operator form; + const char *opnspname; + char *opname; + char oprkind; + ListCell *arg; + + /* Retrieve necessary information about the operator from system catalog. */ + tuple = SearchSysCache1(OPEROID, ObjectIdGetDatum(node->opno)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for operator %u", node->opno); + form = (Form_pg_operator) GETSTRUCT(tuple); + opnspname = quote_identifier(get_namespace_name(form->oprnamespace)); + /* opname is not a SQL identifier, so we don't need to quote it. */ + opname = NameStr(form->oprname); + oprkind = form->oprkind; + ReleaseSysCache(tuple); + + /* Sanity check. */ + Assert((oprkind == 'r' && list_length(node->args) == 1) || + (oprkind == 'l' && list_length(node->args) == 1) || + (oprkind == 'b' && list_length(node->args) == 2)); + + /* Always parenthesize the expression. */ + appendStringInfoChar(buf, '('); + + /* Deparse first operand. */ + arg = list_head(node->args); + if (oprkind == 'r' || oprkind == 'b') + { + deparseExpr(buf, lfirst(arg), root); + appendStringInfoChar(buf, ' '); + } + + /* Deparse fully qualified operator name. */ + appendStringInfo(buf, "OPERATOR(%s.%s)", opnspname, opname); + + /* Deparse last operand. */ + arg = list_tail(node->args); + if (oprkind == 'l' || oprkind == 'b') + { + appendStringInfoChar(buf, ' '); + deparseExpr(buf, lfirst(arg), root); + } + + appendStringInfoChar(buf, ')'); +} + +static void +deparseArrayRef(StringInfo buf, + ArrayRef *node, + PlannerInfo *root) +{ + ListCell *lowlist_item; + ListCell *uplist_item; + + /* Always parenthesize the expression. */ + appendStringInfoChar(buf, '('); + + /* Deparse referenced array expression first. */ + appendStringInfoChar(buf, '('); + deparseExpr(buf, node->refexpr, root); + appendStringInfoChar(buf, ')'); + + /* Deparse subscripts expression. */ + lowlist_item = list_head(node->reflowerindexpr); /* could be NULL */ + foreach(uplist_item, node->refupperindexpr) + { + appendStringInfoChar(buf, '['); + if (lowlist_item) + { + deparseExpr(buf, lfirst(lowlist_item), root); + appendStringInfoChar(buf, ':'); + lowlist_item = lnext(lowlist_item); + } + deparseExpr(buf, lfirst(uplist_item), root); + appendStringInfoChar(buf, ']'); + } + + appendStringInfoChar(buf, ')'); +} + + +/* + * Deparse given array of something into buf. + */ +static void +deparseArrayExpr(StringInfo buf, + ArrayExpr *node, + PlannerInfo *root) +{ + ListCell *lc; + bool first = true; + + appendStringInfo(buf, "ARRAY["); + foreach(lc, node->elements) + { + if (!first) + appendStringInfo(buf, ", "); + deparseExpr(buf, lfirst(lc), root); + + first = false; + } + appendStringInfoChar(buf, ']'); + + /* If the array is empty, we need explicit cast to the array type. */ + if (node->elements == NIL) + { + char *typname; + + typname = format_type_with_typemod(node->array_typeid, -1); + appendStringInfo(buf, "::%s", typname); + } +} + +/* + * Returns true if given expr is safe to evaluate on the foreign server. If + * result is true, extra information has_param tells whether given expression + * contains any Param node. This is useful to determine whether the expression + * can be used in remote EXPLAIN. + */ +static bool +is_foreign_expr(PlannerInfo *root, + RelOptInfo *baserel, + Expr *expr, + bool *has_param) +{ + foreign_executable_cxt context; + context.root = root; + context.foreignrel = baserel; + context.has_param = false; + + /* + * An expression which includes any mutable function can't be pushed down + * because it's result is not stable. For example, pushing now() down to + * remote side would cause confusion from the clock offset. + * If we have routine mapping infrastructure in future release, we will be + * able to choose function to be pushed down in finer granularity. + */ + if (contain_mutable_functions((Node *) expr)) + { + elog(DEBUG3, "expr has mutable function"); + return false; + } + + /* + * Check that the expression consists of nodes which are known as safe to + * be pushed down. + */ + if (foreign_expr_walker((Node *) expr, &context)) + return false; + + /* + * Tell caller whether the given expression contains any Param node, which + * can't be used in EXPLAIN statement before executor starts. + */ + *has_param = context.has_param; + + return true; +} + +/* + * Return true if node includes any node which is not known as safe to be + * pushed down. + */ +static bool +foreign_expr_walker(Node *node, foreign_executable_cxt *context) +{ + if (node == NULL) + return false; + + /* + * Special case handling for List; expression_tree_walker handles List as + * well as other Expr nodes. For instance, List is used in RestrictInfo + * for args of FuncExpr node. + * + * Although the comments of expression_tree_walker mention that + * RangeTblRef, FromExpr, JoinExpr, and SetOperationStmt are handled as + * well, but we don't care them because they are not used in RestrictInfo. + * If one of them was passed into, default label catches it and give up + * traversing. + */ + if (IsA(node, List)) + { + ListCell *lc; + + foreach(lc, (List *) node) + { + if (foreign_expr_walker(lfirst(lc), context)) + return true; + } + return false; + } + + /* + * If return type of given expression is not built-in, it can't be pushed + * down because it might has incompatible semantics on remote side. + */ + if (!is_builtin(exprType(node))) + { + elog(DEBUG3, "expr has user-defined type"); + return true; + } + + switch (nodeTag(node)) + { + case T_Const: + /* + * Using anyarray and/or anyenum in remote query is not supported. + */ + if (((Const *) node)->consttype == ANYARRAYOID || + ((Const *) node)->consttype == ANYNONARRAYOID) + { + elog(DEBUG3, "expr has anyarray or anyenum"); + return true; + } + break; + case T_BoolExpr: + case T_NullTest: + case T_DistinctExpr: + case T_RelabelType: + /* + * These type of nodes are known as safe to be pushed down. + * Of course the subtree of the node, if any, should be checked + * continuously at the tail of this function. + */ + break; + /* + * If function used by the expression is not built-in, it can't be + * pushed down because it might has incompatible semantics on remote + * side. + */ + case T_FuncExpr: + { + FuncExpr *fe = (FuncExpr *) node; + if (!is_builtin(fe->funcid)) + { + elog(DEBUG3, "expr has user-defined function"); + return true; + } + } + break; + case T_Param: + /* + * Only external parameters can be pushed down.: + */ + { + if (((Param *) node)->paramkind != PARAM_EXTERN) + { + elog(DEBUG3, "expr has non-external parameter"); + return true; + } + + /* Mark that this expression contains Param node. */ + context->has_param = true; + } + break; + case T_ScalarArrayOpExpr: + /* + * Only built-in operators can be pushed down. In addition, + * underlying function must be built-in and immutable, but we don't + * check volatility here; such check must be done already with + * contain_mutable_functions. + */ + { + ScalarArrayOpExpr *oe = (ScalarArrayOpExpr *) node; + + if (!is_builtin(oe->opno) || !is_builtin(oe->opfuncid)) + { + elog(DEBUG3, "expr has user-defined scalar-array operator"); + return true; + } + + /* + * If the operator takes collatable type as operands, we push + * down only "=" and "<>" which are not affected by collation. + * Other operators might be safe about collation, but these two + * seem enough to cover practical use cases. + */ + if (exprInputCollation(node) != InvalidOid) + { + char *opname = get_opname(oe->opno); + + if (strcmp(opname, "=") != 0 && strcmp(opname, "<>") != 0) + { + elog(DEBUG3, "expr has scalar-array operator which takes collatable as operand"); + return true; + } + } + + /* operands are checked later */ + } + break; + case T_OpExpr: + /* + * Only built-in operators can be pushed down. In addition, + * underlying function must be built-in and immutable, but we don't + * check volatility here; such check must be done already with + * contain_mutable_functions. + */ + { + OpExpr *oe = (OpExpr *) node; + + if (!is_builtin(oe->opno) || !is_builtin(oe->opfuncid)) + { + elog(DEBUG3, "expr has user-defined operator"); + return true; + } + + /* + * If the operator takes collatable type as operands, we push + * down only "=" and "<>" which are not affected by collation. + * Other operators might be safe about collation, but these two + * seem enough to cover practical use cases. + */ + if (exprInputCollation(node) != InvalidOid) + { + char *opname = get_opname(oe->opno); + + if (strcmp(opname, "=") != 0 && strcmp(opname, "<>") != 0) + { + elog(DEBUG3, "expr has operator which takes collatable as operand"); + return true; + } + } + + /* operands are checked later */ + } + break; + case T_Var: + /* + * Var can be pushed down if it is in the foreign table. + * XXX Var of other relation can be here? + */ + { + Var *var = (Var *) node; + foreign_executable_cxt *f_context; + + f_context = (foreign_executable_cxt *) context; + if (var->varno != f_context->foreignrel->relid || + var->varlevelsup != 0) + { + elog(DEBUG3, "expr has var of other relation"); + return true; + } + } + break; + case T_ArrayRef: + /* + * ArrayRef which holds non-built-in typed elements can't be pushed + * down. + */ + { + ArrayRef *ar = (ArrayRef *) node;; + + if (!is_builtin(ar->refelemtype)) + { + elog(DEBUG3, "expr has user-defined type as array element"); + return true; + } + + /* Assignment should not be in restrictions. */ + if (ar->refassgnexpr != NULL) + { + elog(DEBUG3, "expr has assignment"); + return true; + } + } + break; + case T_ArrayExpr: + /* + * ArrayExpr which holds non-built-in typed elements can't be pushed + * down. + */ + { + if (!is_builtin(((ArrayExpr *) node)->element_typeid)) + { + elog(DEBUG3, "expr has user-defined type as array element"); + return true; + } + } + break; + default: + { + elog(DEBUG3, "expression is too complex: %s", + nodeToString(node)); + return true; + } + break; + } + + return expression_tree_walker(node, foreign_expr_walker, context); +} + +/* + * Return true if given object is one of built-in objects. + */ +static bool +is_builtin(Oid oid) +{ + return (oid < FirstNormalObjectId); +} + +/* + * Deparse WHERE clause from given list of RestrictInfo and append them to buf. + * We assume that buf already holds a SQL statement which ends with valid WHERE + * clause. + * + * Only when calling the first time for a statement, is_first should be true. + */ +void +appendWhereClause(StringInfo buf, + bool is_first, + List *exprs, + PlannerInfo *root) +{ + bool first = true; + ListCell *lc; + + foreach(lc, exprs) + { + RestrictInfo *ri = (RestrictInfo *) lfirst(lc); + + /* Connect expressions with "AND" and parenthesize whole condition. */ + if (is_first && first) + appendStringInfo(buf, " WHERE "); + else + appendStringInfo(buf, " AND "); + + appendStringInfoChar(buf, '('); + deparseExpr(buf, ri->clause, root); + appendStringInfoChar(buf, ')'); + + first = false; + } +} diff --git a/contrib/postgresql_fdw/expected/postgresql_fdw.out b/contrib/postgresql_fdw/expected/postgresql_fdw.out new file mode 100644 index 0000000..58e3530 --- /dev/null +++ b/contrib/postgresql_fdw/expected/postgresql_fdw.out @@ -0,0 +1,715 @@ +-- =================================================================== +-- create FDW objects +-- =================================================================== +-- Clean up in case a prior regression run failed +-- Suppress NOTICE messages when roles don't exist +SET client_min_messages TO 'error'; +DROP ROLE IF EXISTS postgresql_fdw_user; +RESET client_min_messages; +CREATE ROLE postgresql_fdw_user LOGIN SUPERUSER; +SET SESSION AUTHORIZATION 'postgresql_fdw_user'; +CREATE EXTENSION postgresql_fdw; +CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgresql_fdw; +CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgresql_fdw + OPTIONS (dbname 'contrib_regression'); +CREATE USER MAPPING FOR public SERVER loopback1 + OPTIONS (user 'value', password 'value'); +CREATE USER MAPPING FOR postgresql_fdw_user SERVER loopback2; +-- =================================================================== +-- create objects used through FDW +-- =================================================================== +CREATE TYPE user_enum AS ENUM ('foo', 'bar', 'buz'); +CREATE SCHEMA "S 1"; +CREATE TABLE "S 1"."T 1" ( + "C 1" int NOT NULL, + c2 int NOT NULL, + c3 text, + c4 timestamptz, + c5 timestamp, + c6 varchar(10), + c7 char(10), + c8 user_enum, + CONSTRAINT t1_pkey PRIMARY KEY ("C 1") +); +CREATE TABLE "S 1"."T 2" ( + c1 int NOT NULL, + c2 text, + CONSTRAINT t2_pkey PRIMARY KEY (c1) +); +BEGIN; +TRUNCATE "S 1"."T 1"; +INSERT INTO "S 1"."T 1" + SELECT id, + id % 10, + to_char(id, 'FM00000'), + '1970-01-01'::timestamptz + ((id % 100) || ' days')::interval, + '1970-01-01'::timestamp + ((id % 100) || ' days')::interval, + id % 10, + id % 10, + 'foo'::user_enum + FROM generate_series(1, 1000) id; +TRUNCATE "S 1"."T 2"; +INSERT INTO "S 1"."T 2" + SELECT id, + 'AAA' || to_char(id, 'FM000') + FROM generate_series(1, 100) id; +COMMIT; +-- =================================================================== +-- create foreign tables +-- =================================================================== +CREATE FOREIGN TABLE ft1 ( + c0 int, + c1 int NOT NULL, + c2 int NOT NULL, + c3 text, + c4 timestamptz, + c5 timestamp, + c6 varchar(10), + c7 char(10), + c8 user_enum +) SERVER loopback2; +ALTER FOREIGN TABLE ft1 DROP COLUMN c0; +CREATE FOREIGN TABLE ft2 ( + c0 int, + c1 int NOT NULL, + c2 int NOT NULL, + c3 text, + c4 timestamptz, + c5 timestamp, + c6 varchar(10), + c7 char(10), + c8 user_enum +) SERVER loopback2; +ALTER FOREIGN TABLE ft2 DROP COLUMN c0; +-- =================================================================== +-- tests for postgresql_fdw_validator +-- =================================================================== +ALTER FOREIGN DATA WRAPPER postgresql_fdw OPTIONS (host 'value'); -- ERROR +ERROR: invalid option "host" +HINT: Valid options in this context are: +-- requiressl, krbsrvname and gsslib are omitted because they depend on +-- configure option +ALTER SERVER loopback1 OPTIONS ( + authtype 'value', + service 'value', + connect_timeout 'value', + dbname 'value', + host 'value', + hostaddr 'value', + port 'value', + --client_encoding 'value', + tty 'value', + options 'value', + application_name 'value', + --fallback_application_name 'value', + keepalives 'value', + keepalives_idle 'value', + keepalives_interval 'value', + -- requiressl 'value', + sslcompression 'value', + sslmode 'value', + sslcert 'value', + sslkey 'value', + sslrootcert 'value', + sslcrl 'value' + --requirepeer 'value', + -- krbsrvname 'value', + -- gsslib 'value', + --replication 'value' +); +ALTER SERVER loopback1 OPTIONS (user 'value'); -- ERROR +ERROR: invalid option "user" +HINT: Valid options in this context are: authtype, service, connect_timeout, dbname, host, hostaddr, port, tty, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, requiressl, sslcompression, sslmode, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, krbsrvname, gsslib +ALTER USER MAPPING FOR public SERVER loopback1 + OPTIONS (DROP user, DROP password); +ALTER USER MAPPING FOR public SERVER loopback1 + OPTIONS (host 'value'); -- ERROR +ERROR: invalid option "host" +HINT: Valid options in this context are: user, password +ALTER FOREIGN TABLE ft1 OPTIONS (nspname 'S 1', relname 'T 1'); +ALTER FOREIGN TABLE ft2 OPTIONS (nspname 'S 1', relname 'T 1'); +ALTER FOREIGN TABLE ft1 OPTIONS (invalid 'value'); -- ERROR +ERROR: invalid option "invalid" +HINT: Valid options in this context are: nspname, relname +ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (invalid 'value'); -- ERROR +ERROR: invalid option "invalid" +HINT: Valid options in this context are: colname +ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (colname 'C 1'); +ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (colname 'C 1'); +\dew+ + List of foreign-data wrappers + Name | Owner | Handler | Validator | Access privileges | FDW Options | Description +----------------+---------------------+------------------------+--------------------------+-------------------+-------------+------------- + postgresql_fdw | postgresql_fdw_user | postgresql_fdw_handler | postgresql_fdw_validator | | | +(1 row) + +\des+ + List of foreign servers + Name | Owner | Foreign-data wrapper | Access privileges | Type | Version | FDW Options | Description +-----------+---------------------+----------------------+-------------------+------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------- + loopback1 | postgresql_fdw_user | postgresql_fdw | | | | (authtype 'value', service 'value', connect_timeout 'value', dbname 'value', host 'value', hostaddr 'value', port 'value', tty 'value', options 'value', application_name 'value', keepalives 'value', keepalives_idle 'value', keepalives_interval 'value', sslcompression 'value', sslmode 'value', sslcert 'value', sslkey 'value', sslrootcert 'value', sslcrl 'value') | + loopback2 | postgresql_fdw_user | postgresql_fdw | | | | (dbname 'contrib_regression') | +(2 rows) + +\deu+ + List of user mappings + Server | User name | FDW Options +-----------+---------------------+------------- + loopback1 | public | + loopback2 | postgresql_fdw_user | +(2 rows) + +\det+ + List of foreign tables + Schema | Table | Server | FDW Options | Description +--------+-------+-----------+--------------------------------+------------- + public | ft1 | loopback2 | (nspname 'S 1', relname 'T 1') | + public | ft2 | loopback2 | (nspname 'S 1', relname 'T 1') | +(2 rows) + +-- Use only Nested loop for stable results. +SET enable_mergejoin TO off; +SET enable_hashjoin TO off; +-- =================================================================== +-- simple queries +-- =================================================================== +-- single table, with/without alias +EXPLAIN (COSTS false) SELECT * FROM ft1 ORDER BY c3, c1 OFFSET 100 LIMIT 10; + QUERY PLAN +------------------------------------------------------------------------------------- + Limit + -> Sort + Sort Key: c3, c1 + -> Foreign Scan on ft1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" +(5 rows) + +SELECT * FROM ft1 ORDER BY c3, c1 OFFSET 100 LIMIT 10; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +-----+----+-------+------------------------------+--------------------------+----+------------+----- + 101 | 1 | 00101 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970 | 1 | 1 | foo + 102 | 2 | 00102 | Sat Jan 03 00:00:00 1970 PST | Sat Jan 03 00:00:00 1970 | 2 | 2 | foo + 103 | 3 | 00103 | Sun Jan 04 00:00:00 1970 PST | Sun Jan 04 00:00:00 1970 | 3 | 3 | foo + 104 | 4 | 00104 | Mon Jan 05 00:00:00 1970 PST | Mon Jan 05 00:00:00 1970 | 4 | 4 | foo + 105 | 5 | 00105 | Tue Jan 06 00:00:00 1970 PST | Tue Jan 06 00:00:00 1970 | 5 | 5 | foo + 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo + 107 | 7 | 00107 | Thu Jan 08 00:00:00 1970 PST | Thu Jan 08 00:00:00 1970 | 7 | 7 | foo + 108 | 8 | 00108 | Fri Jan 09 00:00:00 1970 PST | Fri Jan 09 00:00:00 1970 | 8 | 8 | foo + 109 | 9 | 00109 | Sat Jan 10 00:00:00 1970 PST | Sat Jan 10 00:00:00 1970 | 9 | 9 | foo + 110 | 0 | 00110 | Sun Jan 11 00:00:00 1970 PST | Sun Jan 11 00:00:00 1970 | 0 | 0 | foo +(10 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; + QUERY PLAN +------------------------------------------------------------------------------------- + Limit + -> Sort + Sort Key: c3, c1 + -> Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" +(5 rows) + +SELECT * FROM ft1 t1 ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +-----+----+-------+------------------------------+--------------------------+----+------------+----- + 101 | 1 | 00101 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970 | 1 | 1 | foo + 102 | 2 | 00102 | Sat Jan 03 00:00:00 1970 PST | Sat Jan 03 00:00:00 1970 | 2 | 2 | foo + 103 | 3 | 00103 | Sun Jan 04 00:00:00 1970 PST | Sun Jan 04 00:00:00 1970 | 3 | 3 | foo + 104 | 4 | 00104 | Mon Jan 05 00:00:00 1970 PST | Mon Jan 05 00:00:00 1970 | 4 | 4 | foo + 105 | 5 | 00105 | Tue Jan 06 00:00:00 1970 PST | Tue Jan 06 00:00:00 1970 | 5 | 5 | foo + 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo + 107 | 7 | 00107 | Thu Jan 08 00:00:00 1970 PST | Thu Jan 08 00:00:00 1970 | 7 | 7 | foo + 108 | 8 | 00108 | Fri Jan 09 00:00:00 1970 PST | Fri Jan 09 00:00:00 1970 | 8 | 8 | foo + 109 | 9 | 00109 | Sat Jan 10 00:00:00 1970 PST | Sat Jan 10 00:00:00 1970 | 9 | 9 | foo + 110 | 0 | 00110 | Sun Jan 11 00:00:00 1970 PST | Sun Jan 11 00:00:00 1970 | 0 | 0 | foo +(10 rows) + +-- empty result +SELECT * FROM ft1 WHERE false; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+----+----+----+----+----+---- +(0 rows) + +-- with WHERE clause +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = 101 AND t1.c6 = '1' AND t1.c7 >= '1'; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Foreign Scan on ft1 t1 + Filter: (c7 >= '1'::bpchar) + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 101)) AND (((c6)::text OPERATOR(pg_catalog.=) '1'::text)) +(3 rows) + +SELECT * FROM ft1 t1 WHERE t1.c1 = 101 AND t1.c6 = '1' AND t1.c7 >= '1'; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +-----+----+-------+------------------------------+--------------------------+----+------------+----- + 101 | 1 | 00101 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970 | 1 | 1 | foo +(1 row) + +-- aggregate +SELECT COUNT(*) FROM ft1 t1; + count +------- + 1000 +(1 row) + +-- join two tables +SELECT t1.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; + c1 +----- + 101 + 102 + 103 + 104 + 105 + 106 + 107 + 108 + 109 + 110 +(10 rows) + +-- subquery +SELECT * FROM ft1 t1 WHERE t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 <= 10) ORDER BY c1; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------+------------------------------+--------------------------+----+------------+----- + 1 | 1 | 00001 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970 | 1 | 1 | foo + 2 | 2 | 00002 | Sat Jan 03 00:00:00 1970 PST | Sat Jan 03 00:00:00 1970 | 2 | 2 | foo + 3 | 3 | 00003 | Sun Jan 04 00:00:00 1970 PST | Sun Jan 04 00:00:00 1970 | 3 | 3 | foo + 4 | 4 | 00004 | Mon Jan 05 00:00:00 1970 PST | Mon Jan 05 00:00:00 1970 | 4 | 4 | foo + 5 | 5 | 00005 | Tue Jan 06 00:00:00 1970 PST | Tue Jan 06 00:00:00 1970 | 5 | 5 | foo + 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo + 7 | 7 | 00007 | Thu Jan 08 00:00:00 1970 PST | Thu Jan 08 00:00:00 1970 | 7 | 7 | foo + 8 | 8 | 00008 | Fri Jan 09 00:00:00 1970 PST | Fri Jan 09 00:00:00 1970 | 8 | 8 | foo + 9 | 9 | 00009 | Sat Jan 10 00:00:00 1970 PST | Sat Jan 10 00:00:00 1970 | 9 | 9 | foo + 10 | 0 | 00010 | Sun Jan 11 00:00:00 1970 PST | Sun Jan 11 00:00:00 1970 | 0 | 0 | foo +(10 rows) + +-- subquery+MAX +SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +------+----+-------+------------------------------+--------------------------+----+------------+----- + 1000 | 0 | 01000 | Thu Jan 01 00:00:00 1970 PST | Thu Jan 01 00:00:00 1970 | 0 | 0 | foo +(1 row) + +-- used in CTE +WITH t1 AS (SELECT * FROM ft1 WHERE c1 <= 10) SELECT t2.c1, t2.c2, t2.c3, t2.c4 FROM t1, ft2 t2 WHERE t1.c1 = t2.c1 ORDER BY t1.c1; + c1 | c2 | c3 | c4 +----+----+-------+------------------------------ + 1 | 1 | 00001 | Fri Jan 02 00:00:00 1970 PST + 2 | 2 | 00002 | Sat Jan 03 00:00:00 1970 PST + 3 | 3 | 00003 | Sun Jan 04 00:00:00 1970 PST + 4 | 4 | 00004 | Mon Jan 05 00:00:00 1970 PST + 5 | 5 | 00005 | Tue Jan 06 00:00:00 1970 PST + 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST + 7 | 7 | 00007 | Thu Jan 08 00:00:00 1970 PST + 8 | 8 | 00008 | Fri Jan 09 00:00:00 1970 PST + 9 | 9 | 00009 | Sat Jan 10 00:00:00 1970 PST + 10 | 0 | 00010 | Sun Jan 11 00:00:00 1970 PST +(10 rows) + +-- fixed values +SELECT 'fixed', NULL FROM ft1 t1 WHERE c1 = 1; + ?column? | ?column? +----------+---------- + fixed | +(1 row) + +-- user-defined operator/function +CREATE FUNCTION postgresql_fdw_abs(int) RETURNS int AS $$ +BEGIN +RETURN abs($1); +END +$$ LANGUAGE plpgsql IMMUTABLE; +CREATE OPERATOR === ( + LEFTARG = int, + RIGHTARG = int, + PROCEDURE = int4eq, + COMMUTATOR = ===, + NEGATOR = !== +); +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = postgresql_fdw_abs(t1.c2); + QUERY PLAN +------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Filter: (c1 = postgresql_fdw_abs(c2)) + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" +(3 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 === t1.c2; + QUERY PLAN +------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Filter: (c1 === c2) + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" +(3 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = abs(t1.c2); + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) pg_catalog.abs(c2))) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = t1.c2; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) c2)) +(2 rows) + +-- =================================================================== +-- WHERE push down +-- =================================================================== +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = 1; -- Var, OpExpr(b), Const + QUERY PLAN +------------------------------------------------------------------------------------------------------------------ + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 1)) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = 100 AND t1.c2 = 0; -- BoolExpr + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 100)) AND ((c2 OPERATOR(pg_catalog.=) 0)) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 IS NULL; -- NullTest + QUERY PLAN +------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" IS NULL)) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 IS NOT NULL; -- NullTest + QUERY PLAN +----------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" IS NOT NULL)) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE round(abs(c1), 0) = 1; -- FuncExpr + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE ((pg_catalog.round(pg_catalog.abs("C 1"), 0) OPERATOR(pg_catalog.=) 1::numeric)) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 = -c1; -- OpExpr(l) + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) (OPERATOR(pg_catalog.-) "C 1"))) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE 1 = c1!; -- OpExpr(r) + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE ((1::numeric OPERATOR(pg_catalog.=) ("C 1" OPERATOR(pg_catalog.!)))) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE (c1 IS NOT NULL) IS DISTINCT FROM (c1 IS NOT NULL); -- DistinctExpr + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------ + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" IS NOT NULL) IS DISTINCT FROM ("C 1" IS NOT NULL)) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 = ANY(ARRAY[c2, 1, c1 + 0]); -- ScalarArrayOpExpr + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) ANY (ARRAY[c2, 1, ("C 1" OPERATOR(pg_catalog.+) 0)]))) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 = (ARRAY[c1,c2,3])[1]; -- ArrayRef + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) ((ARRAY["C 1", c2, 3])[1]))) +(2 rows) + +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c8 = 'foo'; -- no push-down + QUERY PLAN +------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Filter: (c8 = 'foo'::user_enum) + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" +(3 rows) + +-- =================================================================== +-- parameterized queries +-- =================================================================== +-- simple join +PREPARE st1(int, int) AS SELECT t1.c3, t2.c3 FROM ft1 t1, ft2 t2 WHERE t1.c1 = $1 AND t2.c1 = $2; +EXPLAIN (COSTS false) EXECUTE st1(1, 2); + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------- + Nested Loop + -> Foreign Scan on ft1 t1 + Remote SQL: SELECT NULL, NULL, c3, NULL, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 1)) + -> Foreign Scan on ft2 t2 + Remote SQL: SELECT NULL, NULL, c3, NULL, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 2)) +(5 rows) + +EXECUTE st1(1, 1); + c3 | c3 +-------+------- + 00001 | 00001 +(1 row) + +EXECUTE st1(101, 101); + c3 | c3 +-------+------- + 00101 | 00101 +(1 row) + +-- subquery using stable function (can't be pushed down) +PREPARE st2(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND EXTRACT(dow FROM c4) = 6) ORDER BY c1; +EXPLAIN (COSTS false) EXECUTE st2(10, 20); + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Sort Key: t1.c1 + -> Nested Loop Semi Join + Join Filter: (t1.c3 = t2.c3) + -> Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.<) 20)) + -> Materialize + -> Foreign Scan on ft2 t2 + Filter: (date_part('dow'::text, c4) = 6::double precision) + Remote SQL: SELECT NULL, NULL, c3, c4, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.>) 10)) +(10 rows) + +EXECUTE st2(10, 20); + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------+------------------------------+--------------------------+----+------------+----- + 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo +(1 row) + +EXECUTE st1(101, 101); + c3 | c3 +-------+------- + 00101 | 00101 +(1 row) + +-- subquery using immutable function (can be pushed down) +PREPARE st3(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND EXTRACT(dow FROM c5) = 6) ORDER BY c1; +EXPLAIN (COSTS false) EXECUTE st3(10, 20); + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Sort + Sort Key: t1.c1 + -> Nested Loop Semi Join + Join Filter: (t1.c3 = t2.c3) + -> Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.<) 20)) + -> Materialize + -> Foreign Scan on ft2 t2 + Remote SQL: SELECT NULL, NULL, c3, NULL, NULL, NULL, NULL, NULL FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.>) 10)) AND ((pg_catalog.date_part('dow'::text, c5) OPERATOR(pg_catalog.=) 6::double precision)) +(9 rows) + +EXECUTE st3(10, 20); + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------+------------------------------+--------------------------+----+------------+----- + 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo +(1 row) + +EXECUTE st3(20, 30); + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------+------------------------------+--------------------------+----+------------+----- + 23 | 3 | 00023 | Sat Jan 24 00:00:00 1970 PST | Sat Jan 24 00:00:00 1970 | 3 | 3 | foo +(1 row) + +-- custom plan should be chosen +PREPARE st4(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 = $1; +EXPLAIN (COSTS false) EXECUTE st4(1); + QUERY PLAN +------------------------------------------------------------------------------------------------------------------ + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 1)) +(2 rows) + +EXPLAIN (COSTS false) EXECUTE st4(1); + QUERY PLAN +------------------------------------------------------------------------------------------------------------------ + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 1)) +(2 rows) + +EXPLAIN (COSTS false) EXECUTE st4(1); + QUERY PLAN +------------------------------------------------------------------------------------------------------------------ + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 1)) +(2 rows) + +EXPLAIN (COSTS false) EXECUTE st4(1); + QUERY PLAN +------------------------------------------------------------------------------------------------------------------ + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 1)) +(2 rows) + +EXPLAIN (COSTS false) EXECUTE st4(1); + QUERY PLAN +------------------------------------------------------------------------------------------------------------------ + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) 1)) +(2 rows) + +EXPLAIN (COSTS false) EXECUTE st4(1); + QUERY PLAN +------------------------------------------------------------------------------------------------------------------- + Foreign Scan on ft1 t1 + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" OPERATOR(pg_catalog.=) $1)) +(2 rows) + +-- cleanup +DEALLOCATE st1; +DEALLOCATE st2; +DEALLOCATE st3; +DEALLOCATE st4; +-- =================================================================== +-- used in pl/pgsql function +-- =================================================================== +CREATE OR REPLACE FUNCTION f_test(p_c1 int) RETURNS int AS $$ +DECLARE + v_c1 int; +BEGIN + SELECT c1 INTO v_c1 FROM ft1 WHERE c1 = p_c1 LIMIT 1; + PERFORM c1 FROM ft1 WHERE c1 = p_c1 AND p_c1 = v_c1 LIMIT 1; + RETURN v_c1; +END; +$$ LANGUAGE plpgsql; +SELECT f_test(100); + f_test +-------- + 100 +(1 row) + +DROP FUNCTION f_test(int); +-- =================================================================== +-- connection management +-- =================================================================== +SELECT srvname, usename FROM postgresql_fdw_connections; + srvname | usename +-----------+--------------------- + loopback2 | postgresql_fdw_user +(1 row) + +SELECT postgresql_fdw_disconnect(srvid, usesysid) FROM postgresql_fdw_get_connections(); + postgresql_fdw_disconnect +--------------------------- + OK +(1 row) + +SELECT srvname, usename FROM postgresql_fdw_connections; + srvname | usename +---------+--------- +(0 rows) + +-- =================================================================== +-- conversion error +-- =================================================================== +ALTER FOREIGN TABLE ft1 ALTER COLUMN c5 TYPE int; +SELECT * FROM ft1 WHERE c1 = 1; -- ERROR +ERROR: invalid input syntax for integer: "1970-01-02 00:00:00" +CONTEXT: column c5 of foreign table ft1 +ALTER FOREIGN TABLE ft1 ALTER COLUMN c5 TYPE timestamp; +-- =================================================================== +-- subtransaction +-- + local/remote error doesn't break cursor +-- + remote error discards connection +-- =================================================================== +BEGIN; +DECLARE c CURSOR FOR SELECT * FROM ft1 ORDER BY c1; +FETCH c; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------+------------------------------+--------------------------+----+------------+----- + 1 | 1 | 00001 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970 | 1 | 1 | foo +(1 row) + +SAVEPOINT s; +ERROR OUT; -- ERROR +ERROR: syntax error at or near "ERROR" +LINE 1: ERROR OUT; + ^ +ROLLBACK TO s; +SELECT srvname FROM postgresql_fdw_connections; + srvname +----------- + loopback2 +(1 row) + +FETCH c; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------+------------------------------+--------------------------+----+------------+----- + 2 | 2 | 00002 | Sat Jan 03 00:00:00 1970 PST | Sat Jan 03 00:00:00 1970 | 2 | 2 | foo +(1 row) + +SAVEPOINT s; +SELECT * FROM ft1 WHERE 1 / (c1 - 1) > 0; -- ERROR +ERROR: could not execute remote query +DETAIL: ERROR: division by zero + +HINT: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (((1 OPERATOR(pg_catalog./) ("C 1" OPERATOR(pg_catalog.-) 1)) OPERATOR(pg_catalog.>) 0)) +ROLLBACK TO s; +SELECT srvname FROM postgresql_fdw_connections; + srvname +--------- +(0 rows) + +FETCH c; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------+------------------------------+--------------------------+----+------------+----- + 3 | 3 | 00003 | Sun Jan 04 00:00:00 1970 PST | Sun Jan 04 00:00:00 1970 | 3 | 3 | foo +(1 row) + +SELECT * FROM ft1 ORDER BY c1 LIMIT 1; + c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 +----+----+-------+------------------------------+--------------------------+----+------------+----- + 1 | 1 | 00001 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970 | 1 | 1 | foo +(1 row) + +COMMIT; +SELECT srvname FROM postgresql_fdw_connections; + srvname +----------- + loopback2 +(1 row) + +ERROR OUT; -- ERROR +ERROR: syntax error at or near "ERROR" +LINE 1: ERROR OUT; + ^ +SELECT srvname FROM postgresql_fdw_connections; + srvname +--------- +(0 rows) + +-- =================================================================== +-- cleanup +-- =================================================================== +DROP OPERATOR === (int, int) CASCADE; +DROP OPERATOR !== (int, int) CASCADE; +DROP FUNCTION postgresql_fdw_abs(int); +DROP SCHEMA "S 1" CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table "S 1"."T 1" +drop cascades to table "S 1"."T 2" +DROP TYPE user_enum CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to foreign table ft1 column c8 +drop cascades to foreign table ft2 column c8 +DROP EXTENSION postgresql_fdw CASCADE; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to server loopback1 +drop cascades to user mapping for public +drop cascades to server loopback2 +drop cascades to user mapping for postgresql_fdw_user +drop cascades to foreign table ft1 +drop cascades to foreign table ft2 +\c +DROP ROLE postgresql_fdw_user; diff --git a/contrib/postgresql_fdw/option.c b/contrib/postgresql_fdw/option.c new file mode 100644 index 0000000..9e1f0e2 --- /dev/null +++ b/contrib/postgresql_fdw/option.c @@ -0,0 +1,222 @@ +/*------------------------------------------------------------------------- + * + * option.c + * FDW option handling + * + * Copyright (c) 2012, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgresql_fdw/option.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/reloptions.h" +#include "catalog/pg_foreign_data_wrapper.h" +#include "catalog/pg_foreign_server.h" +#include "catalog/pg_foreign_table.h" +#include "catalog/pg_user_mapping.h" +#include "commands/defrem.h" +#include "fmgr.h" +#include "foreign/foreign.h" +#include "lib/stringinfo.h" +#include "miscadmin.h" + +#include "postgresql_fdw.h" + +/* + * SQL functions + */ +extern Datum postgresql_fdw_validator(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(postgresql_fdw_validator); + +/* + * Describes the valid options for objects that use this wrapper. + */ +typedef struct PgsqlFdwOption +{ + const char *optname; + Oid optcontext; /* Oid of catalog in which options may appear */ + bool is_libpq_opt; /* true if it's used in libpq */ +} PgsqlFdwOption; + +/* + * Valid options for postgresql_fdw. + */ +static PgsqlFdwOption valid_options[] = { + + /* + * Options for libpq connection. + * Note: This list should be updated along with PQconninfoOptions in + * interfaces/libpq/fe-connect.c, so the order is kept as is. + * + * Some useless libpq connection options are not accepted by postgresql_fdw: + * client_encoding: set to local database encoding automatically + * fallback_application_name: fixed to "postgresql_fdw" + * replication: postgresql_fdw never be replication client + */ + {"authtype", ForeignServerRelationId, true}, + {"service", ForeignServerRelationId, true}, + {"user", UserMappingRelationId, true}, + {"password", UserMappingRelationId, true}, + {"connect_timeout", ForeignServerRelationId, true}, + {"dbname", ForeignServerRelationId, true}, + {"host", ForeignServerRelationId, true}, + {"hostaddr", ForeignServerRelationId, true}, + {"port", ForeignServerRelationId, true}, +#ifdef NOT_USED + {"client_encoding", ForeignServerRelationId, true}, +#endif + {"tty", ForeignServerRelationId, true}, + {"options", ForeignServerRelationId, true}, + {"application_name", ForeignServerRelationId, true}, +#ifdef NOT_USED + {"fallback_application_name", ForeignServerRelationId, true}, +#endif + {"keepalives", ForeignServerRelationId, true}, + {"keepalives_idle", ForeignServerRelationId, true}, + {"keepalives_interval", ForeignServerRelationId, true}, + {"keepalives_count", ForeignServerRelationId, true}, + {"requiressl", ForeignServerRelationId, true}, + {"sslcompression", ForeignServerRelationId, true}, + {"sslmode", ForeignServerRelationId, true}, + {"sslcert", ForeignServerRelationId, true}, + {"sslkey", ForeignServerRelationId, true}, + {"sslrootcert", ForeignServerRelationId, true}, + {"sslcrl", ForeignServerRelationId, true}, + {"requirepeer", ForeignServerRelationId, true}, + {"krbsrvname", ForeignServerRelationId, true}, + {"gsslib", ForeignServerRelationId, true}, +#ifdef NOT_USED + {"replication", ForeignServerRelationId, true}, +#endif + + /* + * Options for translation of object names. + */ + {"nspname", ForeignTableRelationId, false}, + {"relname", ForeignTableRelationId, false}, + {"colname", AttributeRelationId, false}, + + /* Terminating entry --- MUST BE LAST */ + {NULL, InvalidOid, false} +}; + +/* + * Helper functions + */ +static bool is_valid_option(const char *optname, Oid context); + +/* + * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER, + * USER MAPPING or FOREIGN TABLE that uses postgresql_fdw. + * + * Raise an ERROR if the option or its value is considered invalid. + */ +Datum +postgresql_fdw_validator(PG_FUNCTION_ARGS) +{ + List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); + Oid catalog = PG_GETARG_OID(1); + ListCell *cell; + + /* + * Check that only options supported by postgresql_fdw, and allowed for the + * current object type, are given. + */ + foreach(cell, options_list) + { + DefElem *def = (DefElem *) lfirst(cell); + + if (!is_valid_option(def->defname, catalog)) + { + PgsqlFdwOption *opt; + StringInfoData buf; + + /* + * Unknown option specified, complain about it. Provide a hint + * with list of valid options for the object. + */ + initStringInfo(&buf); + for (opt = valid_options; opt->optname; opt++) + { + if (catalog == opt->optcontext) + appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "", + opt->optname); + } + + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("invalid option \"%s\"", def->defname), + errhint("Valid options in this context are: %s", + buf.data))); + } + } + + /* + * We don't care option-specific limitation here; they will be validated at + * the execution time. + */ + + PG_RETURN_VOID(); +} + +/* + * Check whether the given option is one of the valid postgresql_fdw options. + * context is the Oid of the catalog holding the object the option is for. + */ +static bool +is_valid_option(const char *optname, Oid context) +{ + PgsqlFdwOption *opt; + + for (opt = valid_options; opt->optname; opt++) + { + if (context == opt->optcontext && strcmp(opt->optname, optname) == 0) + return true; + } + return false; +} + +/* + * Check whether the given option is one of the valid libpq options. + * context is the Oid of the catalog holding the object the option is for. + */ +static bool +is_libpq_option(const char *optname) +{ + PgsqlFdwOption *opt; + + for (opt = valid_options; opt->optname; opt++) + { + if (strcmp(opt->optname, optname) == 0 && opt->is_libpq_opt) + return true; + } + return false; +} + +/* + * Generate key-value arrays which includes only libpq options from the list + * which contains any kind of options. + */ +int +ExtractConnectionOptions(List *defelems, const char **keywords, const char **values) +{ + ListCell *lc; + int i; + + i = 0; + foreach(lc, defelems) + { + DefElem *d = (DefElem *) lfirst(lc); + if (is_libpq_option(d->defname)) + { + keywords[i] = d->defname; + values[i] = defGetString(d); + i++; + } + } + return i; +} + diff --git a/contrib/postgresql_fdw/postgresql_fdw--1.0.sql b/contrib/postgresql_fdw/postgresql_fdw--1.0.sql new file mode 100644 index 0000000..965cb85 --- /dev/null +++ b/contrib/postgresql_fdw/postgresql_fdw--1.0.sql @@ -0,0 +1,39 @@ +/* contrib/postgresql_fdw/postgresql_fdw--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION postgresql_fdw" to load this file. \quit + +CREATE FUNCTION postgresql_fdw_handler() +RETURNS fdw_handler +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FUNCTION postgresql_fdw_validator(text[], oid) +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FOREIGN DATA WRAPPER postgresql_fdw + HANDLER postgresql_fdw_handler + VALIDATOR postgresql_fdw_validator; + +/* connection management functions and view */ +CREATE FUNCTION postgresql_fdw_get_connections(out srvid oid, out usesysid oid) +RETURNS SETOF record +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FUNCTION postgresql_fdw_disconnect(oid, oid) +RETURNS text +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE VIEW postgresql_fdw_connections AS +SELECT c.srvid srvid, + s.srvname srvname, + c.usesysid usesysid, + pg_get_userbyid(c.usesysid) usename + FROM postgresql_fdw_get_connections() c + JOIN pg_catalog.pg_foreign_server s ON (s.oid = c.srvid); +GRANT SELECT ON postgresql_fdw_connections TO public; + diff --git a/contrib/postgresql_fdw/postgresql_fdw.c b/contrib/postgresql_fdw/postgresql_fdw.c new file mode 100644 index 0000000..a023cb8 --- /dev/null +++ b/contrib/postgresql_fdw/postgresql_fdw.c @@ -0,0 +1,1370 @@ +/*------------------------------------------------------------------------- + * + * postgresql_fdw.c + * foreign-data wrapper for remote PostgreSQL servers. + * + * Copyright (c) 2012, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgresql_fdw/postgresql_fdw.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "fmgr.h" + +#include "access/htup_details.h" +#include "catalog/pg_foreign_server.h" +#include "catalog/pg_foreign_table.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "commands/explain.h" +#include "commands/vacuum.h" +#include "foreign/fdwapi.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/planmain.h" +#include "optimizer/restrictinfo.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rel.h" + +#include "postgresql_fdw.h" +#include "connection.h" + +PG_MODULE_MAGIC; + +/* + * Cost to establish a connection. + * XXX: should be configurable per server? + */ +#define CONNECTION_COSTS 100.0 + +/* + * Cost to transfer 1 byte from remote server. + * XXX: should be configurable per server? + */ +#define TRANSFER_COSTS_PER_BYTE 0.001 + +/* + * FDW-specific information for RelOptInfo.fdw_private. This is used to pass + * information from pgsqlGetForeignRelSize to pgsqlGetForeignPaths. + */ +typedef struct PgsqlFdwPlanState { + /* + * These are generated in GetForeignRelSize, and also used in subsequent + * GetForeignPaths. + */ + StringInfoData sql; + Cost startup_cost; + Cost total_cost; + List *remote_conds; + List *param_conds; + List *local_conds; + + /* Cached catalog information. */ + ForeignTable *table; + ForeignServer *server; +} PgsqlFdwPlanState; + +/* + * Index of FDW-private information stored in fdw_private list. + * + * We store various information in ForeignScan.fdw_private to pass them beyond + * the boundary between planner and executor. Finally FdwPlan holds items + * below: + * + * 1) plain SELECT statement + * + * These items are indexed with the enum FdwPrivateIndex, so an item + * can be accessed directly via list_nth(). For example of SELECT statement: + * sql = list_nth(fdw_private, FdwPrivateSelectSql) + */ +enum FdwPrivateIndex { + /* SQL statements */ + FdwPrivateSelectSql, + + /* # of elements stored in the list fdw_private */ + FdwPrivateNum, +}; + +/* + * Describe the attribute where data conversion fails. + */ +typedef struct ErrorPos { + Oid relid; /* oid of the foreign table */ + AttrNumber cur_attno; /* attribute number under process */ +} ErrorPos; + +/* + * Describes an execution state of a foreign scan against a foreign table + * using postgresql_fdw. + */ +typedef struct PgsqlFdwExecutionState +{ + List *fdw_private; /* FDW-private information */ + + /* for remote query execution */ + PGconn *conn; /* connection for the scan */ + Oid *param_types; /* type array of external parameter */ + const char **param_values; /* value array of external parameter */ + + /* for tuple generation. */ + AttrNumber attnum; /* # of non-dropped attribute */ + Datum *values; /* column value buffer */ + bool *nulls; /* column null indicator buffer */ + AttInMetadata *attinmeta; /* attribute metadata */ + + /* for storing result tuples */ + MemoryContext scan_cxt; /* context for per-scan lifespan data */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ + Tuplestorestate *tuples; /* result of the scan */ + + /* for error handling. */ + ErrorPos errpos; +} PgsqlFdwExecutionState; + +/* + * Describes a state of analyze request for a foreign table. + */ +typedef struct PgsqlAnalyzeState +{ + /* for tuple generation. */ + TupleDesc tupdesc; + AttInMetadata *attinmeta; + Datum *values; + bool *nulls; + + /* for random sampling */ + HeapTuple *rows; /* result buffer */ + int targrows; /* target # of sample rows */ + int numrows; /* # of samples collected */ + double samplerows; /* # of rows fetched */ + double rowstoskip; /* # of rows skipped before next sample */ + double rstate; /* random state */ + + /* for storing result tuples */ + MemoryContext anl_cxt; /* context for per-analyze lifespan data */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ + + /* for error handling. */ + ErrorPos errpos; +} PgsqlAnalyzeState; + +/* + * SQL functions + */ +extern Datum postgresql_fdw_handler(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(postgresql_fdw_handler); + +/* + * FDW callback routines + */ +static void pgsqlGetForeignRelSize(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid); +static void pgsqlGetForeignPaths(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid); +static ForeignScan *pgsqlGetForeignPlan(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid, + ForeignPath *best_path, + List *tlist, + List *scan_clauses); +static void pgsqlExplainForeignScan(ForeignScanState *node, ExplainState *es); +static void pgsqlBeginForeignScan(ForeignScanState *node, int eflags); +static TupleTableSlot *pgsqlIterateForeignScan(ForeignScanState *node); +static void pgsqlReScanForeignScan(ForeignScanState *node); +static void pgsqlEndForeignScan(ForeignScanState *node); +static bool pgsqlAnalyzeForeignTable(Relation relation, + AcquireSampleRowsFunc *func, + BlockNumber *totalpages); + +/* + * Helper functions + */ +static void get_remote_estimate(const char *sql, + PGconn *conn, + double *rows, + int *width, + Cost *startup_cost, + Cost *total_cost); +static void adjust_costs(double rows, int width, + Cost *startup_cost, Cost *total_cost); +static void execute_query(ForeignScanState *node); +static void query_row_processor(PGresult *res, ForeignScanState *node, + bool first); +static void analyze_row_processor(PGresult *res, PgsqlAnalyzeState *astate, + bool first); +static void postgresql_fdw_error_callback(void *arg); +static int pgsqlAcquireSampleRowsFunc(Relation relation, int elevel, + HeapTuple *rows, int targrows, + double *totalrows, + double *totaldeadrows); + +/* Exported functions, but not written in postgresql_fdw.h. */ +void _PG_init(void); +void _PG_fini(void); + +/* + * Module-specific initialization. + */ +void +_PG_init(void) +{ +} + +/* + * Module-specific clean up. + */ +void +_PG_fini(void) +{ +} + +/* + * Foreign-data wrapper handler function: return a struct with pointers + * to my callback routines. + */ +Datum +postgresql_fdw_handler(PG_FUNCTION_ARGS) +{ + FdwRoutine *routine = makeNode(FdwRoutine); + + /* Required handler functions. */ + routine->GetForeignRelSize = pgsqlGetForeignRelSize; + routine->GetForeignPaths = pgsqlGetForeignPaths; + routine->GetForeignPlan = pgsqlGetForeignPlan; + routine->ExplainForeignScan = pgsqlExplainForeignScan; + routine->BeginForeignScan = pgsqlBeginForeignScan; + routine->IterateForeignScan = pgsqlIterateForeignScan; + routine->ReScanForeignScan = pgsqlReScanForeignScan; + routine->EndForeignScan = pgsqlEndForeignScan; + + /* Optional handler functions. */ + routine->AnalyzeForeignTable = pgsqlAnalyzeForeignTable; + + PG_RETURN_POINTER(routine); +} + +/* + * pgsqlGetForeignRelSize + * Estimate # of rows and width of the result of the scan + * + * Here we estimate number of rows returned by the scan in two steps. In the + * first step, we execute remote EXPLAIN command to obtain the number of rows + * returned from remote side. In the second step, we calculate the selectivity + * of the filtering done on local side, and modify first estimate. + * + * We have to get some catalog objects and generate remote query string here, + * so we store such expensive information in FDW private area of RelOptInfo and + * pass them to subsequent functions for reuse. + */ +static void +pgsqlGetForeignRelSize(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid) +{ + PgsqlFdwPlanState *fpstate; + StringInfo sql; + ForeignTable *table; + ForeignServer *server; + UserMapping *user; + PGconn *conn; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + List *remote_conds = NIL; + List *param_conds = NIL; + List *local_conds = NIL; + Selectivity sel; + + /* + * We use PgsqlFdwPlanState to pass various information to subsequent + * functions. + */ + fpstate = palloc0(sizeof(PgsqlFdwPlanState)); + initStringInfo(&fpstate->sql); + sql = &fpstate->sql; + + /* Retrieve catalog objects which are necessary to estimate rows. */ + table = GetForeignTable(foreigntableid); + server = GetForeignServer(table->serverid); + user = GetUserMapping(GetOuterUserId(), server->serverid); + + /* + * Construct remote query which consists of SELECT, FROM, and WHERE + * clauses, but conditions contain any Param node are excluded because + * placeholder can't be used in EXPLAIN statement. Such conditions are + * appended later. + */ + classifyConditions(root, baserel, &remote_conds, ¶m_conds, + &local_conds); + deparseSimpleSql(sql, root, baserel, local_conds); + if (list_length(remote_conds) > 0) + appendWhereClause(sql, true, remote_conds, root); + elog(DEBUG3, "Query SQL: %s", sql->data); + + conn = GetConnection(server, user, false); + get_remote_estimate(sql->data, conn, &rows, &width, + &startup_cost, &total_cost); + ReleaseConnection(conn); + if (list_length(param_conds) > 0) + appendWhereClause(sql, !(list_length(remote_conds) > 0), param_conds, + root); + + /* + * Estimate selectivity of conditions which are not used in remote EXPLAIN + * by calling clauselist_selectivity(). The best we can do for + * parameterized condition is to estimate selectivity on the basis of local + * statistics. When we actually obtain result rows, such conditions are + * deparsed into remote query and reduce rows transferred. + */ + sel = 1.0; + sel *= clauselist_selectivity(root, param_conds, + baserel->relid, JOIN_INNER, NULL); + sel *= clauselist_selectivity(root, local_conds, + baserel->relid, JOIN_INNER, NULL); + baserel->rows = rows * sel; + baserel->width = width; + + /* + * Pack obtained information into a object and store it in FDW-private area + * of RelOptInfo to pass them to subsequent functions. + */ + fpstate->startup_cost = startup_cost; + fpstate->total_cost = total_cost; + fpstate->remote_conds = remote_conds; + fpstate->param_conds = param_conds; + fpstate->local_conds = local_conds; + fpstate->table = table; + fpstate->server = server; + baserel->fdw_private = (void *) fpstate; +} + +/* + * pgsqlGetForeignPaths + * Create possible scan paths for a scan on the foreign table + */ +static void +pgsqlGetForeignPaths(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid) +{ + PgsqlFdwPlanState *fpstate = (PgsqlFdwPlanState *) baserel->fdw_private; + ForeignPath *path; + Cost startup_cost; + Cost total_cost; + List *fdw_private; + + /* + * We have cost values which are estimated on remote side, so adjust them + * for better estimate which respect various stuffs to complete the scan, + * such as sending query, transferring result, and local filtering. + * + * XXX We assume that remote cost factors are same as local, but it might + * be worth to make configurable. + */ + startup_cost = fpstate->startup_cost; + total_cost = fpstate->total_cost; + adjust_costs(baserel->rows, baserel->width, &startup_cost, &total_cost); + + /* Pass SQL statement from planner to executor through FDW private area. */ + fdw_private = list_make1(makeString(fpstate->sql.data)); + + /* + * Create simplest ForeignScan path node and add it to baserel. This path + * corresponds to SeqScan path of regular tables. + */ + path = create_foreignscan_path(root, baserel, + baserel->rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + NULL, /* no outer rel either */ + fdw_private); + add_path(baserel, (Path *) path); + + /* + * XXX We can consider sorted path or parameterized path here if we know + * that foreign table is indexed on remote end. For this purpose, we + * might have to support FOREIGN INDEX to represent possible sets of sort + * keys and/or filtering. + */ +} + +/* + * pgsqlGetForeignPlan + * Create ForeignScan plan node which implements selected best path + */ +static ForeignScan * +pgsqlGetForeignPlan(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid, + ForeignPath *best_path, + List *tlist, + List *scan_clauses) +{ + PgsqlFdwPlanState *fpstate = (PgsqlFdwPlanState *) baserel->fdw_private; + Index scan_relid = baserel->relid; + List *fdw_private = NIL; + List *fdw_exprs = NIL; + List *local_exprs = NIL; + ListCell *lc; + + /* + * We need lists of Expr other than the lists of RestrictInfo. Now we can + * merge remote_conds and param_conds into fdw_exprs, because they are + * evaluated on remote side for actual remote query. + */ + foreach(lc, fpstate->remote_conds) + fdw_exprs = lappend(fdw_exprs, ((RestrictInfo *) lfirst(lc))->clause); + foreach(lc, fpstate->param_conds) + fdw_exprs = lappend(fdw_exprs, ((RestrictInfo *) lfirst(lc))->clause); + foreach(lc, fpstate->local_conds) + local_exprs = lappend(local_exprs, + ((RestrictInfo *) lfirst(lc))->clause); + + /* + * Make a list contains SELECT statement to it to executor with plan node + * for later use. + */ + fdw_private = lappend(fdw_private, makeString(fpstate->sql.data)); + + /* + * Create the ForeignScan node from target list, local filtering + * expressions, remote filtering expressions, and FDW private information. + * + * We remove expressions which are evaluated on remote side from qual of + * the scan node to avoid redundant filtering. Such filter reduction + * can be done only here, done after choosing best path, because + * baserestrictinfo in RelOptInfo is shared by all possible paths until + * best path is chosen. + */ + return make_foreignscan(tlist, + local_exprs, + scan_relid, + fdw_exprs, + fdw_private); +} + +/* + * pgsqlExplainForeignScan + * Produce extra output for EXPLAIN + */ +static void +pgsqlExplainForeignScan(ForeignScanState *node, ExplainState *es) +{ + List *fdw_private; + char *sql; + + fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; + sql = strVal(list_nth(fdw_private, FdwPrivateSelectSql)); + ExplainPropertyText("Remote SQL", sql, es); +} + +/* + * pgsqlBeginForeignScan + * Initiate access to a foreign PostgreSQL table. + */ +static void +pgsqlBeginForeignScan(ForeignScanState *node, int eflags) +{ + PgsqlFdwExecutionState *festate; + PGconn *conn; + Oid relid; + ForeignTable *table; + ForeignServer *server; + UserMapping *user; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* + * Save state in node->fdw_state. + */ + festate = (PgsqlFdwExecutionState *) palloc(sizeof(PgsqlFdwExecutionState)); + festate->fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; + + /* + * Create contexts for per-scan tuplestore under per-query context. + */ + festate->scan_cxt = AllocSetContextCreate(node->ss.ps.state->es_query_cxt, + "postgresql_fdw per-scan data", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + festate->temp_cxt = AllocSetContextCreate(node->ss.ps.state->es_query_cxt, + "postgresql_fdw temporary data", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* + * Get connection to the foreign server. Connection manager would + * establish new connection if necessary. + */ + relid = RelationGetRelid(node->ss.ss_currentRelation); + table = GetForeignTable(relid); + server = GetForeignServer(table->serverid); + user = GetUserMapping(GetOuterUserId(), server->serverid); + conn = GetConnection(server, user, true); + festate->conn = conn; + + /* Result will be filled in first Iterate call. */ + festate->tuples = NULL; + + /* Allocate buffers for column values. */ + { + TupleDesc tupdesc = slot->tts_tupleDescriptor; + festate->values = palloc(sizeof(Datum) * tupdesc->natts); + festate->nulls = palloc(sizeof(bool) * tupdesc->natts); + festate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + } + + /* + * Allocate buffers for query parameters. + * + * ParamListInfo might include entries for pseudo-parameter such as + * PL/pgSQL's FOUND variable, but we don't care that here, because wasted + * area seems not so large. + */ + { + ParamListInfo params = node->ss.ps.state->es_param_list_info; + int numParams = params ? params->numParams : 0; + + if (numParams > 0) + { + festate->param_types = palloc0(sizeof(Oid) * numParams); + festate->param_values = palloc0(sizeof(char *) * numParams); + } + else + { + festate->param_types = NULL; + festate->param_values = NULL; + } + } + + /* Remember which foreign table we are scanning. */ + festate->errpos.relid = relid; + + /* Store FDW-specific state into ForeignScanState */ + node->fdw_state = (void *) festate; + + return; +} + +/* + * pgsqlIterateForeignScan + * Retrieve next row from the result set, or clear tuple slot to indicate + * EOF. + * + * Note that using per-query context when retrieving tuples from + * tuplestore to ensure that returned tuples can survive until next + * iteration because the tuple is released implicitly via ExecClearTuple. + * Retrieving a tuple from tuplestore in CurrentMemoryContext (it's a + * per-tuple context), ExecClearTuple will free dangling pointer. + */ +static TupleTableSlot * +pgsqlIterateForeignScan(ForeignScanState *node) +{ + PgsqlFdwExecutionState *festate; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + MemoryContext oldcontext = CurrentMemoryContext; + + festate = (PgsqlFdwExecutionState *) node->fdw_state; + + /* + * If this is the first call after Begin or ReScan, we need to execute + * remote query and get result set. + */ + if (festate->tuples == NULL) + execute_query(node); + + /* + * If tuples are still left in tuplestore, just return next tuple from it. + * + * It is necessary to switch to per-scan context to make returned tuple + * valid until next IterateForeignScan call, because it will be released + * with ExecClearTuple then. Otherwise, picked tuple is allocated in + * per-tuple context, and double-free of that tuple might happen. + * + * If we don't have any result in tuplestore, clear result slot to tell + * executor that this scan is over. + */ + MemoryContextSwitchTo(festate->scan_cxt); + tuplestore_gettupleslot(festate->tuples, true, false, slot); + MemoryContextSwitchTo(oldcontext); + + return slot; +} + +/* + * pgsqlReScanForeignScan + * - Restart this scan by clearing old results and set re-execute flag. + */ +static void +pgsqlReScanForeignScan(ForeignScanState *node) +{ + PgsqlFdwExecutionState *festate; + + festate = (PgsqlFdwExecutionState *) node->fdw_state; + + /* If we haven't have valid result yet, nothing to do. */ + if (festate->tuples == NULL) + return; + + /* + * Only rewind the current result set is enough. + */ + tuplestore_rescan(festate->tuples); +} + +/* + * pgsqlEndForeignScan + * Finish scanning foreign table and dispose objects used for this scan + */ +static void +pgsqlEndForeignScan(ForeignScanState *node) +{ + PgsqlFdwExecutionState *festate; + + festate = (PgsqlFdwExecutionState *) node->fdw_state; + + /* if festate is NULL, we are in EXPLAIN; nothing to do */ + if (festate == NULL) + return; + + /* + * The connection which was used for this scan should be valid until the + * end of the scan to make the lifespan of remote transaction same as the + * local query. + */ + ReleaseConnection(festate->conn); + festate->conn = NULL; + + /* Discard fetch results */ + if (festate->tuples != NULL) + { + tuplestore_end(festate->tuples); + festate->tuples = NULL; + } + + /* MemoryContext will be deleted automatically. */ +} + +/* + * Estimate costs of executing given SQL statement. + */ +static void +get_remote_estimate(const char *sql, PGconn *conn, + double *rows, int *width, + Cost *startup_cost, Cost *total_cost) +{ + PGresult *volatile res = NULL; + StringInfoData buf; + char *plan; + char *p; + int n; + + /* + * Construct EXPLAIN statement with given SQL statement. + */ + initStringInfo(&buf); + appendStringInfo(&buf, "EXPLAIN %s", sql); + + /* PGresult must be released before leaving this function. */ + PG_TRY(); + { + res = PQexec(conn, buf.data); + if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0) + ereport(ERROR, + (errmsg("could not execute EXPLAIN for cost estimation"), + errdetail("%s", PQerrorMessage(conn)), + errhint("%s", sql))); + + /* + * Find estimation portion from top plan node. Here we search opening + * parentheses from the end of the line to avoid finding unexpected + * parentheses. + */ + plan = PQgetvalue(res, 0, 0); + p = strrchr(plan, '('); + if (p == NULL) + elog(ERROR, "wrong EXPLAIN output: %s", plan); + n = sscanf(p, + "(cost=%lf..%lf rows=%lf width=%d)", + startup_cost, total_cost, rows, width); + if (n != 4) + elog(ERROR, "could not get estimation from EXPLAIN output"); + + PQclear(res); + res = NULL; + } + PG_CATCH(); + { + PQclear(res); + + /* Release connection and let connection manager cleanup. */ + ReleaseConnection(conn); + + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* + * Adjust costs estimated on remote end with some overheads such as connection + * and data transfer. + */ +static void +adjust_costs(double rows, int width, Cost *startup_cost, Cost *total_cost) +{ + /* + * TODO Selectivity of quals which are NOT pushed down should be also + * considered. + */ + + /* add cost to establish connection. */ + *startup_cost += CONNECTION_COSTS; + *total_cost += CONNECTION_COSTS; + + /* add cost to transfer result. */ + *total_cost += TRANSFER_COSTS_PER_BYTE * width * rows; + *total_cost += cpu_tuple_cost * rows; +} + +/* + * Execute remote query with current parameters. + */ +static void +execute_query(ForeignScanState *node) +{ + PgsqlFdwExecutionState *festate; + ParamListInfo params = node->ss.ps.state->es_param_list_info; + int numParams = params ? params->numParams : 0; + Oid *types = NULL; + const char **values = NULL; + char *sql; + PGconn *conn; + PGresult *volatile res = NULL; + + festate = (PgsqlFdwExecutionState *) node->fdw_state; + types = festate->param_types; + values = festate->param_values; + + /* + * Construct parameter array in text format. We don't release memory for + * the arrays explicitly, because the memory usage would not be very large, + * and anyway they will be released in context cleanup. + * + * If this query is invoked from pl/pgsql function, we have extra entry + * for dummy variable FOUND in ParamListInfo, so we need to check type oid + * to exclude it from remote parameters. + */ + if (numParams > 0) + { + int i; + + for (i = 0; i < numParams; i++) + { + ParamExternData *prm = ¶ms->params[i]; + + /* give hook a chance in case parameter is dynamic */ + if (!OidIsValid(prm->ptype) && params->paramFetch != NULL) + params->paramFetch(params, i + 1); + + /* + * Get string representation of each parameter value by invoking + * type-specific output function unless the value is null or it's + * not used in the query. + */ + types[i] = prm->ptype; + if (!prm->isnull && OidIsValid(types[i])) + { + Oid out_func_oid; + bool isvarlena; + FmgrInfo func; + + getTypeOutputInfo(types[i], &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &func); + values[i] = OutputFunctionCall(&func, prm->value); + } + else + values[i] = NULL; + + /* + * We use type "text" (groundless but seems most flexible) for + * unused (and type-unknown) parameters. We can't remove entry for + * unused parameter from the arrays, because parameter references + * in remote query ($n) have been indexed based on full length + * parameter list. + */ + if (!OidIsValid(types[i])) + types[i] = TEXTOID; + } + } + + conn = festate->conn; + + /* PGresult must be released before leaving this function. */ + PG_TRY(); + { + bool first = true; + + /* + * Execute remote query with parameters, and retrieve results with + * single-row-mode which returns results row by row. + */ + sql = strVal(list_nth(festate->fdw_private, FdwPrivateSelectSql)); + if (!PQsendQueryParams(conn, sql, numParams, types, values, NULL, NULL, + 0)) + ereport(ERROR, + (errmsg("could not execute remote query"), + errdetail("%s", PQerrorMessage(conn)), + errhint("%s", sql))); + if (!PQsetSingleRowMode(conn)) + ereport(ERROR, + (errmsg("could not set single-row mode"), + errdetail("%s", PQerrorMessage(conn)), + errhint("%s", sql))); + + /* Retrieve result rows one by one, and store them into tuplestore. */ + for (;;) + { + /* Allow users to cancel long query */ + CHECK_FOR_INTERRUPTS(); + + res = PQgetResult(conn); + if (res == NULL) + break; + + /* Store the result row into tuplestore */ + if (PQresultStatus(res) == PGRES_SINGLE_TUPLE) + { + query_row_processor(res, node, first); + PQclear(res); + res = NULL; + first = false; + } + else if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + /* + * PGresult with PGRES_TUPLES_OK means EOF, so we need to + * initialze tuplestore if we have not retrieved any tuple. + */ + if (first) + query_row_processor(res, node, first); + PQclear(res); + res = NULL; + first = true; + } + else + { + /* Something wrong happend, report the error. */ + ereport(ERROR, + (errmsg("could not execute remote query"), + errdetail("%s", PQerrorMessage(conn)), + errhint("%s", sql))); + } + } + + /* + * We can't know whether the scan is over or not in custom row + * processor, so mark that the result is valid here. + */ + tuplestore_donestoring(festate->tuples); + + /* Discard result of SELECT statement. */ + PQclear(res); + res = NULL; + } + PG_CATCH(); + { + PQclear(res); + + /* Release connection and let connection manager cleanup. */ + ReleaseConnection(conn); + + /* propagate error */ + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* + * Create tuples from PGresult and store them into tuplestore. + * + * Caller must use PG_TRY block to catch exception and release PGresult + * surely. + */ +static void +query_row_processor(PGresult *res, ForeignScanState *node, bool first) +{ + int i; + int j; + int attnum; /* number of non-dropped columns */ + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + TupleDesc tupdesc = slot->tts_tupleDescriptor; + Form_pg_attribute *attrs = tupdesc->attrs; + PgsqlFdwExecutionState *festate = (PgsqlFdwExecutionState *) node->fdw_state; + AttInMetadata *attinmeta = festate->attinmeta; + HeapTuple tuple; + ErrorContextCallback errcontext; + MemoryContext oldcontext; + + if (first) + { + int nfields = PQnfields(res); + + /* count non-dropped columns */ + for (attnum = 0, i = 0; i < tupdesc->natts; i++) + if (!attrs[i]->attisdropped) + attnum++; + + /* check result and tuple descriptor have the same number of columns */ + if (attnum > 0 && attnum != nfields) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"), + errdetail("expected %d, actual %d", attnum, nfields))); + + /* First, ensure that the tuplestore is empty. */ + if (festate->tuples == NULL) + { + + /* + * Create tuplestore to store result of the query in per-query + * context. Note that we use this memory context to avoid memory + * leak in error cases. + */ + oldcontext = MemoryContextSwitchTo(festate->scan_cxt); + festate->tuples = tuplestore_begin_heap(false, false, work_mem); + MemoryContextSwitchTo(oldcontext); + } + else + { + /* Clear old result just in case. */ + tuplestore_clear(festate->tuples); + } + + /* Do nothing for empty result */ + if (PQntuples(res) == 0) + return; + } + + /* Should have a single-row result if we get here */ + Assert(PQntuples(res) == 1); + + /* + * Do the following work in a temp context that we reset after each tuple. + * This cleans up not only the data we have direct access to, but any + * cruft the I/O functions might leak. + */ + oldcontext = MemoryContextSwitchTo(festate->temp_cxt); + + for (i = 0, j = 0; i < tupdesc->natts; i++) + { + /* skip dropped columns. */ + if (attrs[i]->attisdropped) + { + festate->nulls[i] = true; + continue; + } + + /* + * Set NULL indicator, and convert text representation to internal + * representation if any. + */ + if (PQgetisnull(res, 0, j)) + festate->nulls[i] = true; + else + { + Datum value; + + festate->nulls[i] = false; + + /* + * Set up and install callback to report where conversion error + * occurs. + */ + festate->errpos.cur_attno = i + 1; + errcontext.callback = postgresql_fdw_error_callback; + errcontext.arg = (void *) &festate->errpos; + errcontext.previous = error_context_stack; + error_context_stack = &errcontext; + + value = InputFunctionCall(&attinmeta->attinfuncs[i], + PQgetvalue(res, 0, j), + attinmeta->attioparams[i], + attinmeta->atttypmods[i]); + festate->values[i] = value; + + /* Uninstall error context callback. */ + error_context_stack = errcontext.previous; + } + j++; + } + + /* + * Build the tuple and put it into the slot. + * We don't have to free the tuple explicitly because it's been + * allocated in the per-tuple context. + */ + tuple = heap_form_tuple(tupdesc, festate->values, festate->nulls); + tuplestore_puttuple(festate->tuples, tuple); + + /* Clean up */ + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(festate->temp_cxt); + + return; +} + +/* + * Callback function which is called when error occurs during column value + * conversion. Print names of column and relation. + */ +static void +postgresql_fdw_error_callback(void *arg) +{ + ErrorPos *errpos = (ErrorPos *) arg; + const char *relname; + const char *colname; + + relname = get_rel_name(errpos->relid); + colname = get_attname(errpos->relid, errpos->cur_attno); + errcontext("column %s of foreign table %s", + quote_identifier(colname), quote_identifier(relname)); +} + +/* + * pgsqlAnalyzeForeignTable + * Test whether analyzing this foreign table is supported + */ +static bool +pgsqlAnalyzeForeignTable(Relation relation, + AcquireSampleRowsFunc *func, + BlockNumber *totalpages) +{ + *totalpages = 0; + *func = pgsqlAcquireSampleRowsFunc; + + return true; +} + +/* + * Acquire a random sample of rows from foreign table managed by postgresql_fdw. + * + * postgresql_fdw doesn't provide direct access to remote buffer, so we execute + * simple SELECT statement which retrieves whole rows from remote side, and + * pick some samples from them. + */ +static int +pgsqlAcquireSampleRowsFunc(Relation relation, int elevel, + HeapTuple *rows, int targrows, + double *totalrows, + double *totaldeadrows) +{ + PgsqlAnalyzeState astate; + StringInfoData sql; + ForeignTable *table; + ForeignServer *server; + UserMapping *user; + PGconn *conn = NULL; + PGresult *volatile res = NULL; + + /* + * Only few information are necessary as input to row processor. Other + * initialization will be done at the first row processor call. + */ + astate.anl_cxt = CurrentMemoryContext; + astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext, + "postgresql_fdw analyze temporary data", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + astate.rows = rows; + astate.targrows = targrows; + astate.tupdesc = relation->rd_att; + astate.errpos.relid = relation->rd_id; + + /* + * Construct SELECT statement which retrieves whole rows from remote. We + * can't avoid running sequential scan on remote side to get practical + * statistics, so this seems reasonable compromise. + */ + initStringInfo(&sql); + deparseAnalyzeSql(&sql, relation); + elog(DEBUG3, "Analyze SQL: %s", sql.data); + + table = GetForeignTable(relation->rd_id); + server = GetForeignServer(table->serverid); + user = GetUserMapping(GetOuterUserId(), server->serverid); + conn = GetConnection(server, user, true); + + /* + * Acquire sample rows from the result set. + */ + PG_TRY(); + { + bool first = true; + + /* Execute remote query and retrieve results row by row. */ + if (!PQsendQuery(conn, sql.data)) + ereport(ERROR, + (errmsg("could not execute remote query for analyze"), + errdetail("%s", PQerrorMessage(conn)), + errhint("%s", sql.data))); + if (!PQsetSingleRowMode(conn)) + ereport(ERROR, + (errmsg("could not set single-row mode"), + errdetail("%s", PQerrorMessage(conn)), + errhint("%s", sql.data))); + + /* Retrieve result rows one by one, and store them into tuplestore. */ + for (;;) + { + /* Allow users to cancel long query */ + CHECK_FOR_INTERRUPTS(); + + res = PQgetResult(conn); + if (res == NULL) + break; + + /* Store the result row into tuplestore */ + if (PQresultStatus(res) == PGRES_SINGLE_TUPLE) + { + analyze_row_processor(res, &astate, first); + PQclear(res); + res = NULL; + first = false; + } + else if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + /* + * PGresult with PGRES_TUPLES_OK means EOF, so we need to + * initialze tuplestore if we have not retrieved any tuple. + */ + if (first && PQresultStatus(res) == PGRES_TUPLES_OK) + analyze_row_processor(res, &astate, first); + + PQclear(res); + res = NULL; + first = true; + } + else + { + /* Something wrong happend, report the error. */ + ereport(ERROR, + (errmsg("could not execute remote query for analyze"), + errdetail("%s", PQerrorMessage(conn)), + errhint("%s", sql.data))); + } + } + } + PG_CATCH(); + { + PQclear(res); + + /* Release connection and let connection manager cleanup. */ + ReleaseConnection(conn); + + PG_RE_THROW(); + } + PG_END_TRY(); + + ReleaseConnection(conn); + + /* We assume that we have no dead tuple. */ + *totaldeadrows = 0.0; + + /* We've retrieved all living tuples from foreign server. */ + *totalrows = astate.samplerows; + + /* + * We don't update pg_class.relpages because we don't care that in + * planning at all. + */ + + /* + * Emit some interesting relation info + */ + ereport(elevel, + (errmsg("\"%s\": scanned with \"%s\", " + "containing %.0f live rows and %.0f dead rows; " + "%d rows in sample, %.0f estimated total rows", + RelationGetRelationName(relation), sql.data, + astate.samplerows, 0.0, + astate.numrows, astate.samplerows))); + + return astate.numrows; +} + +/* + * Custom row processor for acquire_sample_rows. + * + * Collect sample rows from the result of query. + * - Use all tuples as sample until target rows samples are collected. + * - Once reached the target, skip some tuples and replace already sampled + * tuple randomly. + */ +static void +analyze_row_processor(PGresult *res, PgsqlAnalyzeState *astate, bool first) +{ + int targrows = astate->targrows; + TupleDesc tupdesc = astate->tupdesc; + int i; + int j; + int pos; /* position where next sample should be stored. */ + HeapTuple tuple; + ErrorContextCallback errcontext; + MemoryContext callercontext; + + if (first) + { + /* Prepare for sampling rows */ + astate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + astate->values = (Datum *) palloc(sizeof(Datum) * tupdesc->natts); + astate->nulls = (bool *) palloc(sizeof(bool) * tupdesc->natts); + astate->numrows = 0; + astate->samplerows = 0; + astate->rowstoskip = -1; + astate->numrows = 0; + astate->rstate = anl_init_selection_state(astate->targrows); + + /* Do nothing for empty result */ + if (PQntuples(res) == 0) + return; + } + + /* Should have a single-row result if we get here */ + Assert(PQntuples(res) == 1); + + /* + * Do the following work in a temp context that we reset after each tuple. + * This cleans up not only the data we have direct access to, but any + * cruft the I/O functions might leak. + */ + callercontext = MemoryContextSwitchTo(astate->temp_cxt); + + /* + * First targrows rows are once sampled always. If we have more source + * rows, pick up some of them by skipping and replace already sampled + * tuple randomly. + * + * Here we just determine the slot where next sample should be stored. Set + * pos to negative value to indicates the row should be skipped. + */ + if (astate->numrows < targrows) + pos = astate->numrows++; + else + { + /* + * The first targrows sample rows are simply copied into + * the reservoir. Then we start replacing tuples in the + * sample until we reach the end of the relation. This + * algorithm is from Jeff Vitter's paper, similarly to + * acquire_sample_rows in analyze.c. + * + * We don't have block-wise accessibility, so every row in + * the PGresult is possible to be sample. + */ + if (astate->rowstoskip < 0) + astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows, + &astate->rstate); + + if (astate->rowstoskip <= 0) + { + int k = (int) (targrows * anl_random_fract()); + + Assert(k >= 0 && k < targrows); + + /* + * Create sample tuple from the result, and replace at + * random. + */ + heap_freetuple(astate->rows[k]); + pos = k; + } + else + pos = -1; + + astate->rowstoskip -= 1; + } + + /* Always increment sample row counter. */ + astate->samplerows += 1; + + if (pos >= 0) + { + AttInMetadata *attinmeta = astate->attinmeta; + + /* + * Create sample tuple from current result row, and store it into the + * position determined above. Note that i and j point entries in + * catalog and columns array respectively. + */ + for (i = 0, j = 0; i < tupdesc->natts; i++) + { + if (tupdesc->attrs[i]->attisdropped) + continue; + + if (PQgetisnull(res, 0, j)) + astate->nulls[i] = true; + else + { + Datum value; + + astate->nulls[i] = false; + + /* + * Set up and install callback to report where conversion error + * occurs. + */ + astate->errpos.cur_attno = i + 1; + errcontext.callback = postgresql_fdw_error_callback; + errcontext.arg = (void *) &astate->errpos; + errcontext.previous = error_context_stack; + error_context_stack = &errcontext; + + value = InputFunctionCall(&attinmeta->attinfuncs[i], + PQgetvalue(res, 0, j), + attinmeta->attioparams[i], + attinmeta->atttypmods[i]); + astate->values[i] = value; + + /* Uninstall error callback function. */ + error_context_stack = errcontext.previous; + } + j++; + } + + /* + * Generate tuple from the result row data, and store it into the give + * buffer. Note that we need to allocate the tuple in the analyze + * context to make it valid even after temporary per-tuple context has + * been reset. + */ + MemoryContextSwitchTo(astate->anl_cxt); + tuple = heap_form_tuple(tupdesc, astate->values, astate->nulls); + MemoryContextSwitchTo(astate->temp_cxt); + astate->rows[pos] = tuple; + } + + /* Clean up */ + MemoryContextSwitchTo(callercontext); + MemoryContextReset(astate->temp_cxt); + + return; +} diff --git a/contrib/postgresql_fdw/postgresql_fdw.control b/contrib/postgresql_fdw/postgresql_fdw.control new file mode 100644 index 0000000..a87dc80 --- /dev/null +++ b/contrib/postgresql_fdw/postgresql_fdw.control @@ -0,0 +1,5 @@ +# postgresql_fdw extension +comment = 'foreign-data wrapper for remote PostgreSQL servers' +default_version = '1.0' +module_pathname = '$libdir/postgresql_fdw' +relocatable = true diff --git a/contrib/postgresql_fdw/postgresql_fdw.h b/contrib/postgresql_fdw/postgresql_fdw.h new file mode 100644 index 0000000..691e0ff --- /dev/null +++ b/contrib/postgresql_fdw/postgresql_fdw.h @@ -0,0 +1,44 @@ +/*------------------------------------------------------------------------- + * + * postgresql_fdw.h + * foreign-data wrapper for remote PostgreSQL servers. + * + * Copyright (c) 2012, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgresql_fdw/postgresql_fdw.h + * + *------------------------------------------------------------------------- + */ + +#ifndef POSTGRESQL_FDW_H +#define POSTGRESQL_FDW_H + +#include "postgres.h" +#include "foreign/foreign.h" +#include "nodes/relation.h" +#include "utils/relcache.h" + +/* in option.c */ +int ExtractConnectionOptions(List *defelems, + const char **keywords, + const char **values); +int GetFetchCountOption(ForeignTable *table, ForeignServer *server); + +/* in deparse.c */ +void deparseSimpleSql(StringInfo buf, + PlannerInfo *root, + RelOptInfo *baserel, + List *local_conds); +void appendWhereClause(StringInfo buf, + bool has_where, + List *exprs, + PlannerInfo *root); +void classifyConditions(PlannerInfo *root, + RelOptInfo *baserel, + List **remote_conds, + List **param_conds, + List **local_conds); +void deparseAnalyzeSql(StringInfo buf, Relation rel); + +#endif /* POSTGRESQL_FDW_H */ diff --git a/contrib/postgresql_fdw/sql/postgresql_fdw.sql b/contrib/postgresql_fdw/sql/postgresql_fdw.sql new file mode 100644 index 0000000..9d971c5 --- /dev/null +++ b/contrib/postgresql_fdw/sql/postgresql_fdw.sql @@ -0,0 +1,304 @@ +-- =================================================================== +-- create FDW objects +-- =================================================================== + +-- Clean up in case a prior regression run failed + +-- Suppress NOTICE messages when roles don't exist +SET client_min_messages TO 'error'; + +DROP ROLE IF EXISTS postgresql_fdw_user; + +RESET client_min_messages; + +CREATE ROLE postgresql_fdw_user LOGIN SUPERUSER; +SET SESSION AUTHORIZATION 'postgresql_fdw_user'; + +CREATE EXTENSION postgresql_fdw; + +CREATE SERVER loopback1 FOREIGN DATA WRAPPER postgresql_fdw; +CREATE SERVER loopback2 FOREIGN DATA WRAPPER postgresql_fdw + OPTIONS (dbname 'contrib_regression'); + +CREATE USER MAPPING FOR public SERVER loopback1 + OPTIONS (user 'value', password 'value'); +CREATE USER MAPPING FOR postgresql_fdw_user SERVER loopback2; + +-- =================================================================== +-- create objects used through FDW +-- =================================================================== +CREATE TYPE user_enum AS ENUM ('foo', 'bar', 'buz'); +CREATE SCHEMA "S 1"; +CREATE TABLE "S 1"."T 1" ( + "C 1" int NOT NULL, + c2 int NOT NULL, + c3 text, + c4 timestamptz, + c5 timestamp, + c6 varchar(10), + c7 char(10), + c8 user_enum, + CONSTRAINT t1_pkey PRIMARY KEY ("C 1") +); +CREATE TABLE "S 1"."T 2" ( + c1 int NOT NULL, + c2 text, + CONSTRAINT t2_pkey PRIMARY KEY (c1) +); + +BEGIN; +TRUNCATE "S 1"."T 1"; +INSERT INTO "S 1"."T 1" + SELECT id, + id % 10, + to_char(id, 'FM00000'), + '1970-01-01'::timestamptz + ((id % 100) || ' days')::interval, + '1970-01-01'::timestamp + ((id % 100) || ' days')::interval, + id % 10, + id % 10, + 'foo'::user_enum + FROM generate_series(1, 1000) id; +TRUNCATE "S 1"."T 2"; +INSERT INTO "S 1"."T 2" + SELECT id, + 'AAA' || to_char(id, 'FM000') + FROM generate_series(1, 100) id; +COMMIT; + +-- =================================================================== +-- create foreign tables +-- =================================================================== +CREATE FOREIGN TABLE ft1 ( + c0 int, + c1 int NOT NULL, + c2 int NOT NULL, + c3 text, + c4 timestamptz, + c5 timestamp, + c6 varchar(10), + c7 char(10), + c8 user_enum +) SERVER loopback2; +ALTER FOREIGN TABLE ft1 DROP COLUMN c0; + +CREATE FOREIGN TABLE ft2 ( + c0 int, + c1 int NOT NULL, + c2 int NOT NULL, + c3 text, + c4 timestamptz, + c5 timestamp, + c6 varchar(10), + c7 char(10), + c8 user_enum +) SERVER loopback2; +ALTER FOREIGN TABLE ft2 DROP COLUMN c0; + +-- =================================================================== +-- tests for postgresql_fdw_validator +-- =================================================================== +ALTER FOREIGN DATA WRAPPER postgresql_fdw OPTIONS (host 'value'); -- ERROR +-- requiressl, krbsrvname and gsslib are omitted because they depend on +-- configure option +ALTER SERVER loopback1 OPTIONS ( + authtype 'value', + service 'value', + connect_timeout 'value', + dbname 'value', + host 'value', + hostaddr 'value', + port 'value', + --client_encoding 'value', + tty 'value', + options 'value', + application_name 'value', + --fallback_application_name 'value', + keepalives 'value', + keepalives_idle 'value', + keepalives_interval 'value', + -- requiressl 'value', + sslcompression 'value', + sslmode 'value', + sslcert 'value', + sslkey 'value', + sslrootcert 'value', + sslcrl 'value' + --requirepeer 'value', + -- krbsrvname 'value', + -- gsslib 'value', + --replication 'value' +); +ALTER SERVER loopback1 OPTIONS (user 'value'); -- ERROR +ALTER USER MAPPING FOR public SERVER loopback1 + OPTIONS (DROP user, DROP password); +ALTER USER MAPPING FOR public SERVER loopback1 + OPTIONS (host 'value'); -- ERROR +ALTER FOREIGN TABLE ft1 OPTIONS (nspname 'S 1', relname 'T 1'); +ALTER FOREIGN TABLE ft2 OPTIONS (nspname 'S 1', relname 'T 1'); +ALTER FOREIGN TABLE ft1 OPTIONS (invalid 'value'); -- ERROR +ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (invalid 'value'); -- ERROR +ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (colname 'C 1'); +ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (colname 'C 1'); +\dew+ +\des+ +\deu+ +\det+ + +-- Use only Nested loop for stable results. +SET enable_mergejoin TO off; +SET enable_hashjoin TO off; + +-- =================================================================== +-- simple queries +-- =================================================================== +-- single table, with/without alias +EXPLAIN (COSTS false) SELECT * FROM ft1 ORDER BY c3, c1 OFFSET 100 LIMIT 10; +SELECT * FROM ft1 ORDER BY c3, c1 OFFSET 100 LIMIT 10; +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; +SELECT * FROM ft1 t1 ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; +-- empty result +SELECT * FROM ft1 WHERE false; +-- with WHERE clause +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = 101 AND t1.c6 = '1' AND t1.c7 >= '1'; +SELECT * FROM ft1 t1 WHERE t1.c1 = 101 AND t1.c6 = '1' AND t1.c7 >= '1'; +-- aggregate +SELECT COUNT(*) FROM ft1 t1; +-- join two tables +SELECT t1.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t1.c1 OFFSET 100 LIMIT 10; +-- subquery +SELECT * FROM ft1 t1 WHERE t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 <= 10) ORDER BY c1; +-- subquery+MAX +SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1; +-- used in CTE +WITH t1 AS (SELECT * FROM ft1 WHERE c1 <= 10) SELECT t2.c1, t2.c2, t2.c3, t2.c4 FROM t1, ft2 t2 WHERE t1.c1 = t2.c1 ORDER BY t1.c1; +-- fixed values +SELECT 'fixed', NULL FROM ft1 t1 WHERE c1 = 1; +-- user-defined operator/function +CREATE FUNCTION postgresql_fdw_abs(int) RETURNS int AS $$ +BEGIN +RETURN abs($1); +END +$$ LANGUAGE plpgsql IMMUTABLE; +CREATE OPERATOR === ( + LEFTARG = int, + RIGHTARG = int, + PROCEDURE = int4eq, + COMMUTATOR = ===, + NEGATOR = !== +); +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = postgresql_fdw_abs(t1.c2); +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 === t1.c2; +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = abs(t1.c2); +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = t1.c2; + +-- =================================================================== +-- WHERE push down +-- =================================================================== +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = 1; -- Var, OpExpr(b), Const +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = 100 AND t1.c2 = 0; -- BoolExpr +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 IS NULL; -- NullTest +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 IS NOT NULL; -- NullTest +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE round(abs(c1), 0) = 1; -- FuncExpr +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 = -c1; -- OpExpr(l) +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE 1 = c1!; -- OpExpr(r) +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE (c1 IS NOT NULL) IS DISTINCT FROM (c1 IS NOT NULL); -- DistinctExpr +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 = ANY(ARRAY[c2, 1, c1 + 0]); -- ScalarArrayOpExpr +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c1 = (ARRAY[c1,c2,3])[1]; -- ArrayRef +EXPLAIN (COSTS false) SELECT * FROM ft1 t1 WHERE c8 = 'foo'; -- no push-down + +-- =================================================================== +-- parameterized queries +-- =================================================================== +-- simple join +PREPARE st1(int, int) AS SELECT t1.c3, t2.c3 FROM ft1 t1, ft2 t2 WHERE t1.c1 = $1 AND t2.c1 = $2; +EXPLAIN (COSTS false) EXECUTE st1(1, 2); +EXECUTE st1(1, 1); +EXECUTE st1(101, 101); +-- subquery using stable function (can't be pushed down) +PREPARE st2(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND EXTRACT(dow FROM c4) = 6) ORDER BY c1; +EXPLAIN (COSTS false) EXECUTE st2(10, 20); +EXECUTE st2(10, 20); +EXECUTE st1(101, 101); +-- subquery using immutable function (can be pushed down) +PREPARE st3(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 < $2 AND t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 > $1 AND EXTRACT(dow FROM c5) = 6) ORDER BY c1; +EXPLAIN (COSTS false) EXECUTE st3(10, 20); +EXECUTE st3(10, 20); +EXECUTE st3(20, 30); +-- custom plan should be chosen +PREPARE st4(int) AS SELECT * FROM ft1 t1 WHERE t1.c1 = $1; +EXPLAIN (COSTS false) EXECUTE st4(1); +EXPLAIN (COSTS false) EXECUTE st4(1); +EXPLAIN (COSTS false) EXECUTE st4(1); +EXPLAIN (COSTS false) EXECUTE st4(1); +EXPLAIN (COSTS false) EXECUTE st4(1); +EXPLAIN (COSTS false) EXECUTE st4(1); +-- cleanup +DEALLOCATE st1; +DEALLOCATE st2; +DEALLOCATE st3; +DEALLOCATE st4; + +-- =================================================================== +-- used in pl/pgsql function +-- =================================================================== +CREATE OR REPLACE FUNCTION f_test(p_c1 int) RETURNS int AS $$ +DECLARE + v_c1 int; +BEGIN + SELECT c1 INTO v_c1 FROM ft1 WHERE c1 = p_c1 LIMIT 1; + PERFORM c1 FROM ft1 WHERE c1 = p_c1 AND p_c1 = v_c1 LIMIT 1; + RETURN v_c1; +END; +$$ LANGUAGE plpgsql; +SELECT f_test(100); +DROP FUNCTION f_test(int); + +-- =================================================================== +-- connection management +-- =================================================================== +SELECT srvname, usename FROM postgresql_fdw_connections; +SELECT postgresql_fdw_disconnect(srvid, usesysid) FROM postgresql_fdw_get_connections(); +SELECT srvname, usename FROM postgresql_fdw_connections; + +-- =================================================================== +-- conversion error +-- =================================================================== +ALTER FOREIGN TABLE ft1 ALTER COLUMN c5 TYPE int; +SELECT * FROM ft1 WHERE c1 = 1; -- ERROR +ALTER FOREIGN TABLE ft1 ALTER COLUMN c5 TYPE timestamp; + +-- =================================================================== +-- subtransaction +-- + local/remote error doesn't break cursor +-- + remote error discards connection +-- =================================================================== +BEGIN; +DECLARE c CURSOR FOR SELECT * FROM ft1 ORDER BY c1; +FETCH c; +SAVEPOINT s; +ERROR OUT; -- ERROR +ROLLBACK TO s; +SELECT srvname FROM postgresql_fdw_connections; +FETCH c; +SAVEPOINT s; +SELECT * FROM ft1 WHERE 1 / (c1 - 1) > 0; -- ERROR +ROLLBACK TO s; +SELECT srvname FROM postgresql_fdw_connections; +FETCH c; +SELECT * FROM ft1 ORDER BY c1 LIMIT 1; +COMMIT; +SELECT srvname FROM postgresql_fdw_connections; +ERROR OUT; -- ERROR +SELECT srvname FROM postgresql_fdw_connections; + +-- =================================================================== +-- cleanup +-- =================================================================== +DROP OPERATOR === (int, int) CASCADE; +DROP OPERATOR !== (int, int) CASCADE; +DROP FUNCTION postgresql_fdw_abs(int); +DROP SCHEMA "S 1" CASCADE; +DROP TYPE user_enum CASCADE; +DROP EXTENSION postgresql_fdw CASCADE; +\c +DROP ROLE postgresql_fdw_user; diff --git a/doc/src/sgml/contrib.sgml b/doc/src/sgml/contrib.sgml index 6b13a0a..4ffa2fa 100644 --- a/doc/src/sgml/contrib.sgml +++ b/doc/src/sgml/contrib.sgml @@ -132,6 +132,7 @@ CREATE EXTENSION module_name FROM unpackaged; &pgstatstatements; &pgstattuple; &pgtrgm; + &postgresql-fdw; &seg; &sepgsql; &contrib-spi; diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml index db4cc3a..373582a 100644 --- a/doc/src/sgml/filelist.sgml +++ b/doc/src/sgml/filelist.sgml @@ -133,6 +133,7 @@ + diff --git a/doc/src/sgml/postgresql-fdw.sgml b/doc/src/sgml/postgresql-fdw.sgml new file mode 100644 index 0000000..b1c4e36 --- /dev/null +++ b/doc/src/sgml/postgresql-fdw.sgml @@ -0,0 +1,235 @@ + + + + postgresql_fdw + + + postgresql_fdw + + + + The postgresql_fdw module provides a foreign-data + wrapper for external PostgreSQL servers. + With this module, users can access data stored in external + PostgreSQL via plain SQL statements. + + + + Note that default wrapper postgresql_fdw is created + automatically during CREATE EXTENSION command for + postgresql_fdw. + + + + FDW Options of postgresql_fdw + + + Connection Options + + A foreign server and user mapping created using this wrapper can have + libpq connection options, expect below: + + + client_encoding + fallback_application_name + replication + + + For details of libpq connection options, see + . + + + + user and password can be + specified on user mappings, and others can be specified on foreign servers. + + + + + Object Name Options + + Foreign tables which were created using this wrapper, or its columns can + have object name options. These options can be used to specify the names + used in SQL statement sent to remote PostgreSQL + server. These options are useful when a remote object has different name + from corresponding local one. + + + + + + nspname + + + This option, which can be specified on a foreign table, is used as a + namespace (schema) reference in the SQL statement. If this options is + omitted, pg_class.nspname of the foreign table is + used. + + + + + + relname + + + This option, which can be specified on a foreign table, is used as a + relation (table) reference in the SQL statement. If this options is + omitted, pg_class.relname of the foreign table is + used. + + + + + + colname + + + This option, which can be specified on a column of a foreign table, is + used as a column (attribute) reference in the SQL statement. If this + option is omitted, pg_attribute.attname of the column + of the foreign table is used. + + + + + + + + + + + + Connection Management + + + The postgresql_fdw establishes a connection to a + foreign server in the beginning of the first query which uses a foreign + table associated to the foreign server, and reuses the connection following + queries and even in following foreign scans in same query. + + You can see the list of active connections via + postgresql_fdw_connections view. It shows pair of + oid and name of server and local role for each active connections + established by postgresql_fdw. For security + reason, only superuser can see other role's connections. + + + + Established connections are kept alive until local role changes or the + current transaction aborts or user requests so. + + + + If role has been changed, active connections established as old local role + is kept alive but never be reused until local role has restored to original + role. This kind of situation happens with SET ROLE and + SET SESSION AUTHORIZATION. + + + + If current transaction aborts by error or user request, all active + connections are disconnected automatically. This behavior avoids possible + connection leaks on error. + + + + You can discard persistent connection at arbitrary timing with + postgresql_fdw_disconnect(). It takes server oid and + user oid as arguments. This function can handle only connections + established in current session; connections established by other backends + are not reachable. + + + + You can discard all active and visible connections in current session with + using postgresql_fdw_connections and + postgresql_fdw_disconnect() together: + +postgres=# SELECT postgresql_fdw_disconnect(srvid, usesysid) FROM postgresql_fdw_connections; + postgresql_fdw_disconnect +---------------------- + OK + OK +(2 rows) + + + + + + Transaction Management + + The postgresql_fdw begins remote transaction at + the beginning of a local query, and terminates it with + ABORT at the end of the local query. This means that all + foreign scans on a foreign server in a local query are executed in one + transaction. + If isolation level of local transaction is SERIALIZABLE, + SERIALIZABLE is used for remote transaction. Otherwise, + if isolation level of local transaction is one of + READ UNCOMMITTED, READ COMMITTED or + REPEATABLE READ, then REPEATABLE READ + is used for remote transaction. + READ UNCOMMITTED and READ COMMITTED + are never used for remote transaction, because even + READ COMMITTED transaction might produce inconsistent + results, if remote data have been updated between two remote queries. + + + Note that even if the isolation level of local transaction was + SERIALIZABLE or REPEATABLE READ, + series of one query might produce different result, because foreign scans + in different local queries are executed in different remote transactions. + For instance, when client started a local transaction + explicitly with isolation level SERIALIZABLE, and + executed same local query which contains a foreign table which references + foreign data which is updated frequently, latter result would be different + from former result. + + + This restriction might be relaxed in future release. + + + + + Estimation of Costs and Rows + + The postgresql_fdw estimates the costs of a + foreign scan by adding up some basic costs: connection costs, remote query + costs and data transfer costs. + To get remote query costs, postgresql_fdw executes + EXPLAIN command on remote server for each foreign scan. + + + On the other hand, estimated rows which was returned by + EXPLAIN is used for local estimation as-is. + + + + + EXPLAIN Output + + For a foreign table using postgresql_fdw, EXPLAIN + shows a remote SQL statement which is sent to remote + PostgreSQL server for a ForeignScan plan node. + For example: + + +postgres=# EXPLAIN SELECT aid FROM pgbench_accounts WHERE abalance < 0; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------- + Foreign Scan on pgbench_accounts (cost=100.00..8105.13 rows=302613 width=8) + Filter: (abalance < 0) + Remote SQL: SELECT aid, NULL, abalance, NULL FROM public.pgbench_accounts +(3 rows) + + + + + Author + + Shigeru Hanada shigeru.hanada@gmail.com + + + +