Index: contrib/dblink/dblink.c =================================================================== RCS file: /cvsroot/pgsql/contrib/dblink/dblink.c,v retrieving revision 1.43 diff -c -c -r1.43 dblink.c *** contrib/dblink/dblink.c 30 May 2005 23:09:06 -0000 1.43 --- contrib/dblink/dblink.c 6 Oct 2005 04:18:34 -0000 *************** *** 60,67 **** typedef struct remoteConn { ! PGconn *con; /* Hold the remote connection */ ! bool remoteTrFlag; /* Indicates whether or not a transaction * on remote database is in progress */ } remoteConn; --- 60,67 ---- typedef struct remoteConn { ! PGconn *conn; /* Hold the remote connection */ ! bool remoteXactOpen; /* Indicates whether or not a transaction * on remote database is in progress */ } remoteConn; *************** *** 86,110 **** /* Global */ List *res_id = NIL; int res_id_index = 0; - PGconn *persistent_conn = NULL; static HTAB *remoteConnHash = NULL; /* ! Following is list that holds multiple remote connections. ! Calling convention of each dblink function changes to accept ! connection name as the first parameter. The connection list is ! much like ecpg e.g. a mapping between a name and a PGconn object. */ typedef struct remoteConnHashEnt { char name[NAMEDATALEN]; ! remoteConn *rcon; } remoteConnHashEnt; /* initial number of connection hashes */ #define NUMCONN 16 /* general utility */ #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) --- 86,115 ---- /* Global */ List *res_id = NIL; int res_id_index = 0; static HTAB *remoteConnHash = NULL; /* ! * Following is list that holds multiple remote connections. ! * Calling convention of each dblink function changes to accept ! * connection name as the first parameter. The connection list is ! * much like ecpg e.g. a mapping between a name and a PGconn object. */ typedef struct remoteConnHashEnt { char name[NAMEDATALEN]; ! remoteConn *rconn; /* EMPTY_CONNECTION_NAME also possible */ } remoteConnHashEnt; /* initial number of connection hashes */ #define NUMCONN 16 + /* + * Because the argument protocol is V1, no connection name behaves + * the same as a NULL-passed connection name + */ + #define EMPTY_CONNECTION_NAME "" + /* general utility */ #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) *************** *** 116,131 **** var_ = NULL; \ } \ } while (0) #define DBLINK_RES_INTERNALERROR(p2) \ do { \ ! msg = pstrdup(PQerrorMessage(conn)); \ if (res) \ PQclear(res); \ elog(ERROR, "%s: %s", p2, msg); \ } while (0) #define DBLINK_RES_ERROR(p2) \ do { \ ! msg = pstrdup(PQerrorMessage(conn)); \ if (res) \ PQclear(res); \ ereport(ERROR, \ --- 121,138 ---- var_ = NULL; \ } \ } while (0) + #define DBLINK_RES_INTERNALERROR(p2) \ do { \ ! msg = pstrdup(PQerrorMessage(rconn->conn)); \ if (res) \ PQclear(res); \ elog(ERROR, "%s: %s", p2, msg); \ } while (0) + #define DBLINK_RES_ERROR(p2) \ do { \ ! msg = pstrdup(PQerrorMessage(rconn->conn)); \ if (res) \ PQclear(res); \ ereport(ERROR, \ *************** *** 133,141 **** errmsg("%s", p2), \ errdetail("%s", msg))); \ } while (0) #define DBLINK_RES_ERROR_AS_NOTICE(p2) \ do { \ ! msg = pstrdup(PQerrorMessage(conn)); \ if (res) \ PQclear(res); \ ereport(NOTICE, \ --- 140,149 ---- errmsg("%s", p2), \ errdetail("%s", msg))); \ } while (0) + #define DBLINK_RES_ERROR_AS_NOTICE(p2) \ do { \ ! msg = pstrdup(PQerrorMessage(rconn->conn)); \ if (res) \ PQclear(res); \ ereport(NOTICE, \ *************** *** 143,151 **** errmsg("%s", p2), \ errdetail("%s", msg))); \ } while (0) #define DBLINK_CONN_NOT_AVAIL \ do { \ ! if(conname) \ ereport(ERROR, \ (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \ errmsg("connection \"%s\" not available", conname))); \ --- 151,160 ---- errmsg("%s", p2), \ errdetail("%s", msg))); \ } while (0) + #define DBLINK_CONN_NOT_AVAIL \ do { \ ! if (conname && strlen(conname) != 0) \ ereport(ERROR, \ (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \ errmsg("connection \"%s\" not available", conname))); \ *************** *** 154,181 **** (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \ errmsg("connection not available"))); \ } while (0) #define DBLINK_GET_CONN \ do { \ ! char *conname_or_str = GET_STR(PG_GETARG_TEXT_P(0)); \ ! rcon = getConnectionByName(conname_or_str); \ ! if(rcon) \ ! { \ ! conn = rcon->con; \ ! } \ ! else \ { \ ! connstr = conname_or_str; \ ! conn = PQconnectdb(connstr); \ ! if (PQstatus(conn) == CONNECTION_BAD) \ { \ ! msg = pstrdup(PQerrorMessage(conn)); \ ! PQfinish(conn); \ ereport(ERROR, \ (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \ errmsg("could not establish connection"), \ errdetail("%s", msg))); \ } \ ! freeconn = true; \ } \ } while (0) --- 163,194 ---- (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \ errmsg("connection not available"))); \ } while (0) + #define DBLINK_GET_CONN \ do { \ ! rconn = getConnectionByName(conname); \ ! if(!rconn) \ { \ ! /* \ ! * Does not match connection name, so must be conn string. \ ! * Create an rconn structure that we will free before the \ ! * function completes. Don't bother storing it in the hash. \ ! */ \ ! rconn = (remoteConn *) palloc(sizeof(remoteConn)); \ ! rconn->conn = PQconnectdb(conname); \ ! rconn->remoteXactOpen = false; \ ! conname = EMPTY_CONNECTION_NAME; \ ! if (PQstatus(rconn->conn) == CONNECTION_BAD) \ { \ ! msg = pstrdup(PQerrorMessage(rconn->conn)); \ ! PQfinish(rconn->conn); \ ! pfree(rconn); \ ereport(ERROR, \ (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \ errmsg("could not establish connection"), \ errdetail("%s", msg))); \ } \ ! rconn_is_local = true; \ } \ } while (0) *************** *** 187,221 **** Datum dblink_connect(PG_FUNCTION_ARGS) { ! char *connstr = NULL; ! char *connname = NULL; char *msg; MemoryContext oldcontext; ! PGconn *conn = NULL; ! remoteConn *rcon = NULL; if (PG_NARGS() == 2) { connstr = GET_STR(PG_GETARG_TEXT_P(1)); - connname = GET_STR(PG_GETARG_TEXT_P(0)); } ! else if (PG_NARGS() == 1) connstr = GET_STR(PG_GETARG_TEXT_P(0)); ! oldcontext = MemoryContextSwitchTo(TopMemoryContext); ! if (connname) ! rcon = (remoteConn *) palloc(sizeof(remoteConn)); ! conn = PQconnectdb(connstr); MemoryContextSwitchTo(oldcontext); ! if (PQstatus(conn) == CONNECTION_BAD) { ! msg = pstrdup(PQerrorMessage(conn)); ! PQfinish(conn); ! if (rcon) ! pfree(rcon); ereport(ERROR, (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), --- 200,236 ---- Datum dblink_connect(PG_FUNCTION_ARGS) { ! char *connstr; ! char *conname; char *msg; MemoryContext oldcontext; ! remoteConn *rconn; if (PG_NARGS() == 2) { + conname = GET_STR(PG_GETARG_TEXT_P(0)); connstr = GET_STR(PG_GETARG_TEXT_P(1)); } ! else ! { ! Assert(PG_NARGS() == 1); ! conname = EMPTY_CONNECTION_NAME; connstr = GET_STR(PG_GETARG_TEXT_P(0)); ! } ! oldcontext = MemoryContextSwitchTo(TopMemoryContext); ! rconn = (remoteConn *) palloc(sizeof(remoteConn)); ! ! rconn->conn = PQconnectdb(connstr); MemoryContextSwitchTo(oldcontext); ! if (PQstatus(rconn->conn) == CONNECTION_BAD) { ! msg = pstrdup(PQerrorMessage(rconn->conn)); ! PQfinish(rconn->conn); ! pfree(rconn); ereport(ERROR, (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), *************** *** 223,235 **** errdetail("%s", msg))); } ! if (connname) ! { ! rcon->con = conn; ! createNewConnection(connname, rcon); ! } ! else ! persistent_conn = conn; PG_RETURN_TEXT_P(GET_TEXT("OK")); } --- 238,244 ---- errdetail("%s", msg))); } ! createNewConnection(conname, rconn); PG_RETURN_TEXT_P(GET_TEXT("OK")); } *************** *** 241,271 **** Datum dblink_disconnect(PG_FUNCTION_ARGS) { ! char *conname = NULL; ! remoteConn *rcon = NULL; ! PGconn *conn = NULL; if (PG_NARGS() == 1) - { conname = GET_STR(PG_GETARG_TEXT_P(0)); - rcon = getConnectionByName(conname); - if (rcon) - conn = rcon->con; - } else ! conn = persistent_conn; ! if (!conn) DBLINK_CONN_NOT_AVAIL; ! PQfinish(conn); ! if (rcon) ! { ! deleteConnection(conname); ! pfree(rcon); ! } ! else ! persistent_conn = NULL; PG_RETURN_TEXT_P(GET_TEXT("OK")); } --- 250,275 ---- Datum dblink_disconnect(PG_FUNCTION_ARGS) { ! char *conname; ! remoteConn *rconn; if (PG_NARGS() == 1) conname = GET_STR(PG_GETARG_TEXT_P(0)); else ! { ! Assert(PG_NARGS() == 0); ! conname = EMPTY_CONNECTION_NAME; ! } ! ! rconn = getConnectionByName(conname); ! if (!rconn || !rconn->conn) DBLINK_CONN_NOT_AVAIL; ! PQfinish(rconn->conn); ! ! deleteConnection(conname); ! pfree(rconn); PG_RETURN_TEXT_P(GET_TEXT("OK")); } *************** *** 277,342 **** Datum dblink_open(PG_FUNCTION_ARGS) { char *msg; ! PGresult *res = NULL; ! PGconn *conn = NULL; ! char *curname = NULL; ! char *sql = NULL; ! char *conname = NULL; StringInfo str = makeStringInfo(); - remoteConn *rcon = NULL; bool fail = true; /* default to backward compatible behavior */ ! if (PG_NARGS() == 2) { ! /* text,text */ ! curname = GET_STR(PG_GETARG_TEXT_P(0)); ! sql = GET_STR(PG_GETARG_TEXT_P(1)); ! conn = persistent_conn; } else if (PG_NARGS() == 3) { /* might be text,text,text or text,text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID) { curname = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); fail = PG_GETARG_BOOL(2); - conn = persistent_conn; } else { conname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(1)); sql = GET_STR(PG_GETARG_TEXT_P(2)); - rcon = getConnectionByName(conname); - if (rcon) - conn = rcon->con; } } ! else if (PG_NARGS() == 4) { ! /* text,text,text,bool */ ! conname = GET_STR(PG_GETARG_TEXT_P(0)); ! curname = GET_STR(PG_GETARG_TEXT_P(1)); ! sql = GET_STR(PG_GETARG_TEXT_P(2)); ! fail = PG_GETARG_BOOL(3); ! rcon = getConnectionByName(conname); ! if (rcon) ! conn = rcon->con; } ! if (!conn) DBLINK_CONN_NOT_AVAIL; ! res = PQexec(conn, "BEGIN"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! DBLINK_RES_INTERNALERROR("begin error"); ! PQclear(res); appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql); ! res = PQexec(conn, str->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { if (fail) --- 281,345 ---- Datum dblink_open(PG_FUNCTION_ARGS) { + char *curname; + char *conname; + remoteConn *rconn; char *msg; ! PGresult *res; ! char *sql; StringInfo str = makeStringInfo(); bool fail = true; /* default to backward compatible behavior */ ! if (PG_NARGS() == 4) { ! /* text,text,text,bool */ ! conname = GET_STR(PG_GETARG_TEXT_P(0)); ! curname = GET_STR(PG_GETARG_TEXT_P(1)); ! sql = GET_STR(PG_GETARG_TEXT_P(2)); ! fail = PG_GETARG_BOOL(3); } else if (PG_NARGS() == 3) { /* might be text,text,text or text,text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID) { + conname = EMPTY_CONNECTION_NAME; curname = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); fail = PG_GETARG_BOOL(2); } else { conname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(1)); sql = GET_STR(PG_GETARG_TEXT_P(2)); } } ! else { ! /* text,text */ ! Assert(PG_NARGS() == 2); ! conname = EMPTY_CONNECTION_NAME; ! curname = GET_STR(PG_GETARG_TEXT_P(0)); ! sql = GET_STR(PG_GETARG_TEXT_P(1)); } ! rconn = getConnectionByName(conname); ! if (!rconn || !rconn->conn) DBLINK_CONN_NOT_AVAIL; ! rconn->remoteXactOpen = (PQtransactionStatus(rconn->conn) != PQTRANS_IDLE); ! if (!rconn->remoteXactOpen) ! { ! res = PQexec(rconn->conn, "BEGIN"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! DBLINK_RES_INTERNALERROR("begin error"); ! PQclear(res); ! } appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql); ! res = PQexec(rconn->conn, str->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { if (fail) *************** *** 359,415 **** Datum dblink_close(PG_FUNCTION_ARGS) { ! PGconn *conn = NULL; ! PGresult *res = NULL; ! char *curname = NULL; ! char *conname = NULL; StringInfo str = makeStringInfo(); char *msg; - remoteConn *rcon = NULL; bool fail = true; /* default to backward compatible behavior */ ! if (PG_NARGS() == 1) { ! /* text */ ! curname = GET_STR(PG_GETARG_TEXT_P(0)); ! conn = persistent_conn; } else if (PG_NARGS() == 2) { /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { curname = GET_STR(PG_GETARG_TEXT_P(0)); fail = PG_GETARG_BOOL(1); - conn = persistent_conn; } else { conname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(1)); - rcon = getConnectionByName(conname); - if (rcon) - conn = rcon->con; } } ! if (PG_NARGS() == 3) { ! /* text,text,bool */ ! conname = GET_STR(PG_GETARG_TEXT_P(0)); ! curname = GET_STR(PG_GETARG_TEXT_P(1)); ! fail = PG_GETARG_BOOL(2); ! rcon = getConnectionByName(conname); ! if (rcon) ! conn = rcon->con; } ! if (!conn) DBLINK_CONN_NOT_AVAIL; appendStringInfo(str, "CLOSE %s", curname); /* close the cursor */ ! res = PQexec(conn, str->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { if (fail) --- 362,413 ---- Datum dblink_close(PG_FUNCTION_ARGS) { ! char *curname; ! char *conname; ! remoteConn *rconn; StringInfo str = makeStringInfo(); + PGresult *res; char *msg; bool fail = true; /* default to backward compatible behavior */ ! if (PG_NARGS() == 3) { ! /* text,text,bool */ ! conname = GET_STR(PG_GETARG_TEXT_P(0)); ! curname = GET_STR(PG_GETARG_TEXT_P(1)); ! fail = PG_GETARG_BOOL(2); } else if (PG_NARGS() == 2) { /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { + conname = EMPTY_CONNECTION_NAME; curname = GET_STR(PG_GETARG_TEXT_P(0)); fail = PG_GETARG_BOOL(1); } else { conname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(1)); } } ! else { ! /* text */ ! Assert(PG_NARGS() == 1); ! conname = EMPTY_CONNECTION_NAME; ! curname = GET_STR(PG_GETARG_TEXT_P(0)); } ! rconn = getConnectionByName(conname); ! if (!rconn || !rconn->conn) DBLINK_CONN_NOT_AVAIL; appendStringInfo(str, "CLOSE %s", curname); /* close the cursor */ ! res = PQexec(rconn->conn, str->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { if (fail) *************** *** 423,434 **** PQclear(res); ! /* commit the transaction */ ! res = PQexec(conn, "COMMIT"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! DBLINK_RES_INTERNALERROR("commit error"); ! ! PQclear(res); PG_RETURN_TEXT_P(GET_TEXT("OK")); } --- 421,435 ---- PQclear(res); ! /* We are using the Xact status we recorded during the open */ ! if (!rconn->remoteXactOpen) ! { ! /* commit the transaction */ ! res = PQexec(rconn->conn, "COMMIT"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! DBLINK_RES_INTERNALERROR("commit error"); ! PQclear(res); ! } PG_RETURN_TEXT_P(GET_TEXT("OK")); } *************** *** 440,445 **** --- 441,448 ---- Datum dblink_fetch(PG_FUNCTION_ARGS) { + char *conname = NULL; + remoteConn *rconn = NULL; FuncCallContext *funcctx; TupleDesc tupdesc = NULL; int call_cntr; *************** *** 448,462 **** char *msg; PGresult *res = NULL; MemoryContext oldcontext; - char *conname = NULL; - remoteConn *rcon = NULL; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { ! PGconn *conn = NULL; StringInfo str = makeStringInfo(); - char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ --- 451,462 ---- char *msg; PGresult *res = NULL; MemoryContext oldcontext; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { ! char *curname; StringInfo str = makeStringInfo(); int howmany = 0; bool fail = true; /* default to backward compatible */ *************** *** 467,507 **** curname = GET_STR(PG_GETARG_TEXT_P(1)); howmany = PG_GETARG_INT32(2); fail = PG_GETARG_BOOL(3); - - rcon = getConnectionByName(conname); - if (rcon) - conn = rcon->con; } else if (PG_NARGS() == 3) { /* text,text,int or text,int,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID) { curname = GET_STR(PG_GETARG_TEXT_P(0)); howmany = PG_GETARG_INT32(1); fail = PG_GETARG_BOOL(2); - conn = persistent_conn; } else { conname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(1)); howmany = PG_GETARG_INT32(2); - - rcon = getConnectionByName(conname); - if (rcon) - conn = rcon->con; } } ! else if (PG_NARGS() == 2) { /* text,int */ curname = GET_STR(PG_GETARG_TEXT_P(0)); howmany = PG_GETARG_INT32(1); - conn = persistent_conn; } ! if (!conn) DBLINK_CONN_NOT_AVAIL; /* create a function context for cross-call persistence */ --- 467,501 ---- curname = GET_STR(PG_GETARG_TEXT_P(1)); howmany = PG_GETARG_INT32(2); fail = PG_GETARG_BOOL(3); } else if (PG_NARGS() == 3) { /* text,text,int or text,int,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID) { + conname = EMPTY_CONNECTION_NAME; curname = GET_STR(PG_GETARG_TEXT_P(0)); howmany = PG_GETARG_INT32(1); fail = PG_GETARG_BOOL(2); } else { conname = GET_STR(PG_GETARG_TEXT_P(0)); curname = GET_STR(PG_GETARG_TEXT_P(1)); howmany = PG_GETARG_INT32(2); } } ! else { /* text,int */ + Assert(PG_NARGS() == 2); + conname = EMPTY_CONNECTION_NAME; curname = GET_STR(PG_GETARG_TEXT_P(0)); howmany = PG_GETARG_INT32(1); } ! rconn = getConnectionByName(conname); ! if (!rconn || !rconn->conn) DBLINK_CONN_NOT_AVAIL; /* create a function context for cross-call persistence */ *************** *** 515,521 **** appendStringInfo(str, "FETCH %d FROM %s", howmany, curname); ! res = PQexec(conn, str->data); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) --- 509,515 ---- appendStringInfo(str, "FETCH %d FROM %s", howmany, curname); ! res = PQexec(rconn->conn, str->data); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) *************** *** 638,657 **** int max_calls; AttInMetadata *attinmeta; char *msg; ! PGresult *res = NULL; bool is_sql_cmd = false; char *sql_cmd_status = NULL; MemoryContext oldcontext; ! bool freeconn = false; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { ! PGconn *conn = NULL; ! char *connstr = NULL; char *sql = NULL; - char *conname = NULL; - remoteConn *rcon = NULL; bool fail = true; /* default to backward compatible */ /* create a function context for cross-call persistence */ --- 632,649 ---- int max_calls; AttInMetadata *attinmeta; char *msg; ! PGresult *res; bool is_sql_cmd = false; char *sql_cmd_status = NULL; MemoryContext oldcontext; ! bool rconn_is_local = false; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { ! char *conname; ! remoteConn *rconn; char *sql = NULL; bool fail = true; /* default to backward compatible */ /* create a function context for cross-call persistence */ *************** *** 666,704 **** if (PG_NARGS() == 3) { /* text,text,bool */ ! DBLINK_GET_CONN; sql = GET_STR(PG_GETARG_TEXT_P(1)); fail = PG_GETARG_BOOL(2); } else if (PG_NARGS() == 2) { /* text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { ! conn = persistent_conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); fail = PG_GETARG_BOOL(1); } else { ! DBLINK_GET_CONN; sql = GET_STR(PG_GETARG_TEXT_P(1)); } } ! else if (PG_NARGS() == 1) { /* text */ ! conn = persistent_conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); ! if (!conn) DBLINK_CONN_NOT_AVAIL; ! res = PQexec(conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) --- 658,698 ---- if (PG_NARGS() == 3) { /* text,text,bool */ ! conname = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); fail = PG_GETARG_BOOL(2); + DBLINK_GET_CONN; } else if (PG_NARGS() == 2) { /* text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { ! conname = EMPTY_CONNECTION_NAME; ! rconn = getConnectionByName(conname); sql = GET_STR(PG_GETARG_TEXT_P(0)); fail = PG_GETARG_BOOL(1); } else { ! conname = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); + DBLINK_GET_CONN; } } ! else { /* text */ ! Assert(PG_NARGS() == 1); ! conname = EMPTY_CONNECTION_NAME; ! rconn = getConnectionByName(conname); sql = GET_STR(PG_GETARG_TEXT_P(0)); } ! if (!rconn || !rconn->conn) DBLINK_CONN_NOT_AVAIL; ! res = PQexec(rconn->conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) *************** *** 708,715 **** else { DBLINK_RES_ERROR_AS_NOTICE("sql error"); ! if (freeconn) ! PQfinish(conn); SRF_RETURN_DONE(funcctx); } } --- 702,712 ---- else { DBLINK_RES_ERROR_AS_NOTICE("sql error"); ! if (rconn_is_local) ! { ! PQfinish(rconn->conn); ! pfree(rconn); ! } SRF_RETURN_DONE(funcctx); } } *************** *** 736,744 **** /* got results, keep track of them */ funcctx->user_fctx = res; ! /* if needed, close the connection to the database and cleanup */ ! if (freeconn) ! PQfinish(conn); /* fast track when no results */ if (funcctx->max_calls < 1) --- 733,743 ---- /* got results, keep track of them */ funcctx->user_fctx = res; ! if (rconn_is_local) ! { ! PQfinish(rconn->conn); ! pfree(rconn); ! } /* fast track when no results */ if (funcctx->max_calls < 1) *************** *** 846,895 **** PGresult *res = NULL; text *sql_cmd_status = NULL; TupleDesc tupdesc = NULL; - PGconn *conn = NULL; - char *connstr = NULL; char *sql = NULL; char *conname = NULL; ! remoteConn *rcon = NULL; ! bool freeconn = false; bool fail = true; /* default to backward compatible behavior */ if (PG_NARGS() == 3) { /* must be text,text,bool */ ! DBLINK_GET_CONN; sql = GET_STR(PG_GETARG_TEXT_P(1)); fail = PG_GETARG_BOOL(2); } else if (PG_NARGS() == 2) { /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { ! conn = persistent_conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); fail = PG_GETARG_BOOL(1); } else { ! DBLINK_GET_CONN; sql = GET_STR(PG_GETARG_TEXT_P(1)); } } ! else if (PG_NARGS() == 1) { /* must be single text argument */ ! conn = persistent_conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); ! if (!conn) DBLINK_CONN_NOT_AVAIL; ! res = PQexec(conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) --- 845,894 ---- PGresult *res = NULL; text *sql_cmd_status = NULL; TupleDesc tupdesc = NULL; char *sql = NULL; char *conname = NULL; ! remoteConn *rconn = NULL; ! bool rconn_is_local = false; bool fail = true; /* default to backward compatible behavior */ if (PG_NARGS() == 3) { /* must be text,text,bool */ ! conname = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); fail = PG_GETARG_BOOL(2); + DBLINK_GET_CONN; } else if (PG_NARGS() == 2) { /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { ! conname = EMPTY_CONNECTION_NAME; ! rconn = getConnectionByName(conname); sql = GET_STR(PG_GETARG_TEXT_P(0)); fail = PG_GETARG_BOOL(1); } else { ! conname = GET_STR(PG_GETARG_TEXT_P(0)); sql = GET_STR(PG_GETARG_TEXT_P(1)); + DBLINK_GET_CONN; } } ! else { /* must be single text argument */ ! Assert(PG_NARGS() == 1); ! conname = EMPTY_CONNECTION_NAME; ! rconn = getConnectionByName(conname); sql = GET_STR(PG_GETARG_TEXT_P(0)); } ! if (!rconn || !rconn->conn) DBLINK_CONN_NOT_AVAIL; ! res = PQexec(rconn->conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) *************** *** 933,941 **** errmsg("statement returning results not allowed"))); } ! /* if needed, close the connection to the database and cleanup */ ! if (freeconn) ! PQfinish(conn); PG_RETURN_TEXT_P(sql_cmd_status); } --- 932,942 ---- errmsg("statement returning results not allowed"))); } ! if (rconn_is_local) ! { ! PQfinish(rconn->conn); ! pfree(rconn); ! } PG_RETURN_TEXT_P(sql_cmd_status); } *************** *** 1864,1870 **** char *relname; TupleDesc tupdesc; StringInfo str = makeStringInfo(); ! char *sql = NULL; int ret; HeapTuple tuple; int i; --- 1865,1871 ---- char *relname; TupleDesc tupdesc; StringInfo str = makeStringInfo(); ! char *sql; int ret; HeapTuple tuple; int i; *************** *** 2022,2028 **** key, HASH_FIND, NULL); if (hentry) ! return (hentry->rcon); return (NULL); } --- 2023,2029 ---- key, HASH_FIND, NULL); if (hentry) ! return (hentry->rconn); return (NULL); } *************** *** 2039,2045 **** } static void ! createNewConnection(const char *name, remoteConn * con) { remoteConnHashEnt *hentry; bool found; --- 2040,2046 ---- } static void ! createNewConnection(const char *name, remoteConn *rconn) { remoteConnHashEnt *hentry; bool found; *************** *** 2058,2064 **** (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("duplicate connection name"))); ! hentry->rcon = con; strncpy(hentry->name, name, NAMEDATALEN - 1); } --- 2059,2065 ---- (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("duplicate connection name"))); ! hentry->rconn = rconn; strncpy(hentry->name, name, NAMEDATALEN - 1); }