Re: contrib/dblink update

From: Bruce Momjian <pgman(at)candle(dot)pha(dot)pa(dot)us>
To: Joe Conway <mail(at)joeconway(dot)com>
Cc: pgsql-patches(at)postgresql(dot)org
Subject: Re: contrib/dblink update
Date: 2002-04-24 02:28:30
Message-ID: 200204240228.g3O2SUj22558@candle.pha.pa.us
Lists: pgsql-patches


Patch applied. Thanks.

---------------------------------------------------------------------------

Joe Conway wrote:
> Attached is an update to contrib/dblink. Please apply if there are no
> objections.
>
> Major changes:
> - removed cursor wrap around input sql to allow for remote
> execution of INSERT/UPDATE/DELETE
> - dblink now returns a resource id instead of a real pointer
> - added several utility functions
>
> I'm still hoping to add explicit cursor open/fetch/close support before
> 7.3 is released, but I need a bit more time on that.
>
> On a somewhat unrelated topic, I never got any feedback on the
> unknownin/out patch and the mb_substring patch. Is there anything else I
> need to do to get those applied?
>
> Thanks,
>
> Joe
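
The second change listed above replaces a raw pointer with an integer resource id. The patch does this through helpers named append_res_ptr, get_res_ptr and remove_res_ptr, whose bodies are not shown in this part of the message. As an illustration only, the general idea might be sketched as below, assuming a simple linked list and plain malloc rather than backend memory contexts. The names res_node, res_register, res_lookup and res_remove are hypothetical and do not come from the patch.

```c
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical registry node: maps an integer id to an opaque pointer. */
typedef struct res_node
{
    int              id;
    void            *ptr;
    struct res_node *next;
} res_node;

static res_node *registry = NULL;   /* head of the id -> pointer list */
static int       next_id = 0;       /* last id handed out */

/* Register ptr and return a fresh positive id, or 0 if allocation fails. */
static int
res_register(void *ptr)
{
    res_node *n = malloc(sizeof(*n));

    if (n == NULL)
        return 0;
    n->id = ++next_id;
    n->ptr = ptr;
    n->next = registry;
    registry = n;
    return n->id;
}

/* Return the pointer registered under id, or NULL for an unknown id. */
static void *
res_lookup(int id)
{
    res_node *n;

    for (n = registry; n != NULL; n = n->next)
        if (n->id == id)
            return n->ptr;
    return NULL;
}

/* Drop the entry for id; returns 1 if it was found, 0 otherwise. */
static int
res_remove(int id)
{
    res_node **pp;

    for (pp = &registry; *pp != NULL; pp = &(*pp)->next)
    {
        if ((*pp)->id == id)
        {
            res_node *dead = *pp;

            *pp = dead->next;
            free(dead);
            return 1;
        }
    }
    return 0;
}
```

The point of the indirection is that a caller holding a stale or made-up integer gets NULL from the lookup instead of dereferencing an arbitrary address, which is what lets dblink_tok report an invalid resource id as an ordinary error.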

> diff -cNr dblink.orig/README.dblink dblink/README.dblink
> *** dblink.orig/README.dblink Thu Dec 13 02:48:39 2001
> --- dblink/README.dblink Sun Apr 14 20:02:06 2002
> ***************
> *** 3,9 ****
> *
> * Functions returning results from a remote database
> *
> ! * Copyright (c) Joseph Conway <joe(dot)conway(at)mail(dot)com>, 2001;
> *
> * Permission to use, copy, modify, and distribute this software and its
> * documentation for any purpose, without fee, and without a written agreement
> --- 3,10 ----
> *
> * Functions returning results from a remote database
> *
> ! * Copyright (c) Joseph Conway <mail(at)joeconway(dot)com>, 2001, 2002,
> ! * ALL RIGHTS RESERVED;
> *
> * Permission to use, copy, modify, and distribute this software and its
> * documentation for any purpose, without fee, and without a written agreement
> ***************
> *** 25,36 ****
> */
>
>
> ! Version 0.3 (14 June, 2001):
> ! Function to test returning data set from remote database
> ! Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel
>
> Release Notes:
>
> Version 0.3
> - fixed dblink invalid pointer causing corrupt elog message
> - fixed dblink_tok improper handling of null results
> --- 26,44 ----
> */
>
>
> ! Version 0.4 (7 April, 2002):
> ! Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
> ! various utility functions.
> ! Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel
>
> Release Notes:
>
> + Version 0.4
> + - removed cursor wrap around input sql to allow for remote
> + execution of INSERT/UPDATE/DELETE
> + - dblink now returns a resource id instead of a real pointer
> + - added several utility functions -- see below
> +
> Version 0.3
> - fixed dblink invalid pointer causing corrupt elog message
> - fixed dblink_tok improper handling of null results
> ***************
> *** 51,64 ****
>
> installs following functions into database template1:
>
> ! dblink() - returns a pointer to results from remote query
> ! dblink_tok() - extracts and returns individual field results
>
> Documentation
> ==================================================================
> Name
>
> ! dblink -- Returns a pointer to a data set from a remote database
>
> Synopsis
>
> --- 59,94 ----
>
> installs following functions into database template1:
>
> ! dblink(text,text) RETURNS setof int
> ! - returns a resource id for results from remote query
> ! dblink_tok(int,int) RETURNS text
> ! - extracts and returns individual field results
> ! dblink_strtok(text,text,int) RETURNS text
> ! - extracts and returns individual token from delimited text
> ! dblink_get_pkey(name) RETURNS setof text
> ! - returns the field names of a relation's primary key fields
> ! dblink_last_oid(int) RETURNS oid
> ! - returns the last inserted oid
> ! dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text
> ! - builds an insert statement using a local tuple, replacing the
> ! selection key field values with alternate supplied values
> ! dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text
> ! - builds a delete statement using supplied values for selection
> ! key field values
> ! dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text
> ! - builds an update statement using a local tuple, replacing the
> ! selection key field values with alternate supplied values
> ! dblink_current_query() RETURNS text
> ! - returns the current query string
> ! dblink_replace(text,text,text) RETURNS text
> ! - replace all occurrences of substring-a in the input-string
> ! with substring-b
>
> Documentation
> ==================================================================
> Name
>
> ! dblink -- Returns a resource id for a data set from a remote database
>
> Synopsis
>
> ***************
> *** 78,84 ****
>
> Outputs
>
> ! Returns setof int (pointer)
>
> Example usage
>
> --- 108,114 ----
>
> Outputs
>
> ! Returns setof int (res_id)
>
> Example usage
>
> ***************
> *** 94,106 ****
>
> Synopsis
>
> ! dblink_tok(int pointer, int fnumber)
>
> Inputs
>
> ! pointer
>
> ! a pointer returned by a call to dblink()
>
> fnumber
>
> --- 124,136 ----
>
> Synopsis
>
> ! dblink_tok(int res_id, int fnumber)
>
> Inputs
>
> ! res_id
>
> ! a resource id returned by a call to dblink()
>
> fnumber
>
> ***************
> *** 131,136 ****
> --- 161,415 ----
> select f1, f2 from myremotetable where f1 like 'bytea%';
>
> ==================================================================
> + Name
> +
> + dblink_strtok -- Extracts and returns individual token from delimited text
> +
> + Synopsis
> +
> + dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
> +
> + Inputs
> +
> + inputstring
> +
> + any string you want to parse a token out of;
> + e.g. 'f=1&g=3&h=4'
> +
> + delimiter
> +
> + a single character to use as the delimiter;
> + e.g. '&' or '='
> +
> + posn
> +
> + the position of the token of interest, 0 based;
> + e.g. 1
> +
> + Outputs
> +
> + Returns text
> +
> + Example usage
> +
> + test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
> + dblink_strtok
> + ---------------
> + 3
> + (1 row)
> +
> + ==================================================================
> + Name
> +
> + dblink_get_pkey -- returns the field names of a relation's primary
> + key fields
> +
> + Synopsis
> +
> + dblink_get_pkey(name relname) RETURNS setof text
> +
> + Inputs
> +
> + relname
> +
> + any relation name;
> + e.g. 'foobar'
> +
> + Outputs
> +
> + Returns setof text -- one row for each primary key field, in order of
> + precedence
> +
> + Example usage
> +
> + test=# select dblink_get_pkey('foobar');
> + dblink_get_pkey
> + -----------------
> + f1
> + f2
> + f3
> + f4
> + f5
> + (5 rows)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_last_oid -- Returns last inserted oid
> +
> + Synopsis
> +
> + dblink_last_oid(int res_id) RETURNS oid
> +
> + Inputs
> +
> + res_id
> +
> + any resource id returned by dblink function;
> +
> + Outputs
> +
> + Returns oid of last inserted tuple
> +
> + Example usage
> +
> + test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
> + ,'insert into mytable (f1, f2) values (1,2)'));
> +
> + dblink_last_oid
> + ----------------
> + 16553
> + (1 row)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_build_sql_insert -- builds an insert statement using a local
> + tuple, replacing the selection key field
> + values with alternate supplied values
> + dblink_build_sql_delete -- builds a delete statement using supplied
> + values for selection key field values
> + dblink_build_sql_update -- builds an update statement using a local
> + tuple, replacing the selection key field
> + values with alternate supplied values
> +
> +
> + Synopsis
> +
> + dblink_build_sql_insert(name relname
> + ,int2vector primary_key_attnums
> + ,int2 num_primary_key_atts
> + ,_text src_pk_att_vals_array
> + ,_text tgt_pk_att_vals_array) RETURNS text
> + dblink_build_sql_delete(name relname
> + ,int2vector primary_key_attnums
> + ,int2 num_primary_key_atts
> + ,_text tgt_pk_att_vals_array) RETURNS text
> + dblink_build_sql_update(name relname
> + ,int2vector primary_key_attnums
> + ,int2 num_primary_key_atts
> + ,_text src_pk_att_vals_array
> + ,_text tgt_pk_att_vals_array) RETURNS text
> +
> + Inputs
> +
> + relname
> +
> + any relation name;
> + e.g. 'foobar'
> +
> + primary_key_attnums
> +
> + vector of primary key attnums (1 based, see pg_index.indkey);
> + e.g. '1 2'
> +
> + num_primary_key_atts
> +
> + number of primary key attnums in the vector; e.g. 2
> +
> + src_pk_att_vals_array
> +
> + array of primary key values, used to look up the local matching
> + tuple, the values of which are then used to construct the SQL
> + statement
> +
> + tgt_pk_att_vals_array
> +
> + array of primary key values, used to replace the local tuple
> + values in the SQL statement
> +
> + Outputs
> +
> + Returns text -- requested SQL statement
> +
> + Example usage
> +
> + test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
> + dblink_build_sql_insert
> + --------------------------------------------------
> + INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
> + (1 row)
> +
> + test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
> + dblink_build_sql_delete
> + ---------------------------------------------
> + DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
> + (1 row)
> +
> + test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
> + dblink_build_sql_update
> + -------------------------------------------------------------
> + UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
> + (1 row)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_current_query -- returns the current query string
> +
> + Synopsis
> +
> + dblink_current_query () RETURNS text
> +
> + Inputs
> +
> + None
> +
> + Outputs
> +
> + Returns text -- a copy of the currently executing query
> +
> + Example usage
> +
> + test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
> + dblink_current_query
> + -----------------------------------------------------------------------------------------------------------------------------------------------------
> + select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
> + (1 row)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_replace -- replace all occurrences of substring-a in the
> + input-string with substring-b
> +
> + Synopsis
> +
> + dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
> +
> + Inputs
> +
> + input-string
> +
> + the starting string, before replacement of substring-a
> +
> + substring-a
> +
> + the substring to find and replace
> +
> + substring-b
> +
> + the substring to be substituted in place of substring-a
> +
> + Outputs
> +
> + Returns text -- a copy of the starting string, but with all occurrences of
> + substring-a replaced with substring-b
> +
> + Example usage
> +
> + test=# select dblink_replace('12345678901234567890','56','hello');
> + dblink_replace
> + ----------------------------
> + 1234hello78901234hello7890
> + (1 row)
> +
> + ==================================================================
> +
>
> -- Joe Conway
>
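
The README text above describes dblink_strtok as taking a single-character delimiter and a 0-based token position. A minimal sketch of that behaviour in plain C follows. It is not the patch's get_strtok, which is not shown in this part of the message. The name nth_token is hypothetical, it uses malloc rather than palloc, and it assumes that adjacent delimiters yield empty tokens, which the README does not specify.

```c
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper: return a malloc'd copy of the posn-th (0-based)
 * token of input, split on the single character delim. Returns NULL if
 * there is no such token, the arguments are invalid, or allocation fails.
 */
static char *
nth_token(const char *input, char delim, int posn)
{
    const char *start = input;
    const char *end;
    size_t      len;
    char       *out;

    if (input == NULL || delim == '\0' || posn < 0)
        return NULL;

    /* Skip posn delimiters to reach the start of the wanted token. */
    while (posn > 0)
    {
        start = strchr(start, delim);
        if (start == NULL)
            return NULL;
        start++;
        posn--;
    }

    /* The token runs to the next delimiter or to the end of the string. */
    end = strchr(start, delim);
    len = (end != NULL) ? (size_t) (end - start) : strlen(start);

    out = malloc(len + 1);
    if (out == NULL)
        return NULL;
    memcpy(out, start, len);
    out[len] = '\0';
    return out;
}
```

Applied twice, as in the README example, nth_token("f=1&g=3&h=4", '&', 1) gives "g=3", and nth_token on that result with '=' and position 1 gives "3". The caller frees each returned string.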
> diff -cNr dblink.orig/dblink.c dblink/dblink.c
> *** dblink.orig/dblink.c Wed Oct 24 22:49:19 2001
> --- dblink/dblink.c Sun Apr 14 20:03:30 2002
> ***************
> *** 3,9 ****
> *
> * Functions returning results from a remote database
> *
> ! * Copyright (c) Joseph Conway <joe(dot)conway(at)mail(dot)com>, 2001;
> *
> * Permission to use, copy, modify, and distribute this software and its
> * documentation for any purpose, without fee, and without a written agreement
> --- 3,10 ----
> *
> * Functions returning results from a remote database
> *
> ! * Copyright (c) Joseph Conway <mail(at)joeconway(dot)com>, 2001, 2002,
> ! * ALL RIGHTS RESERVED;
> *
> * Permission to use, copy, modify, and distribute this software and its
> * documentation for any purpose, without fee, and without a written agreement
> ***************
> *** 26,48 ****
>
> #include "dblink.h"
>
> PG_FUNCTION_INFO_V1(dblink);
> Datum
> dblink(PG_FUNCTION_ARGS)
> {
> ! PGconn *conn = NULL;
> ! PGresult *res = NULL;
> ! dblink_results *results;
> ! char *optstr;
> ! char *sqlstatement;
> ! char *curstr = "DECLARE mycursor CURSOR FOR ";
> ! char *execstatement;
> ! char *msg;
> ! int ntuples = 0;
> ! ReturnSetInfo *rsi;
> !
> ! if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
> ! elog(ERROR, "dblink: NULL arguments are not permitted");
>
> if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
> elog(ERROR, "dblink: function called in context that does not accept a set result");
> --- 27,49 ----
>
> #include "dblink.h"
>
> + /* Global */
> + List *res_id = NIL;
> + int res_id_index = 0;
> +
> PG_FUNCTION_INFO_V1(dblink);
> Datum
> dblink(PG_FUNCTION_ARGS)
> {
> ! PGconn *conn = NULL;
> ! PGresult *res = NULL;
> ! dblink_results *results;
> ! char *optstr;
> ! char *sqlstatement;
> ! char *execstatement;
> ! char *msg;
> ! int ntuples = 0;
> ! ReturnSetInfo *rsi;
>
> if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
> elog(ERROR, "dblink: function called in context that does not accept a set result");
> ***************
> *** 61,81 ****
> elog(ERROR, "dblink: connection error: %s", msg);
> }
>
> ! res = PQexec(conn, "BEGIN");
> ! if (PQresultStatus(res) != PGRES_COMMAND_OK)
> ! {
> ! msg = pstrdup(PQerrorMessage(conn));
> ! PQclear(res);
> ! PQfinish(conn);
> ! elog(ERROR, "dblink: begin error: %s", msg);
> ! }
> ! PQclear(res);
> !
> ! execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
> if (execstatement != NULL)
> {
> ! strcpy(execstatement, curstr);
> ! strcat(execstatement, sqlstatement);
> strcat(execstatement, "\0");
> }
> else
> --- 62,71 ----
> elog(ERROR, "dblink: connection error: %s", msg);
> }
>
> ! execstatement = (char *) palloc(strlen(sqlstatement) + 1);
> if (execstatement != NULL)
> {
> ! strcpy(execstatement, sqlstatement);
> strcat(execstatement, "\0");
> }
> else
> ***************
> *** 94,163 ****
> /*
> * got results, start fetching them
> */
> - PQclear(res);
> -
> - res = PQexec(conn, "FETCH ALL in mycursor");
> - if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
> - {
> - msg = pstrdup(PQerrorMessage(conn));
> - PQclear(res);
> - PQfinish(conn);
> - elog(ERROR, "dblink: sql error: %s", msg);
> - }
> -
> ntuples = PQntuples(res);
>
> ! if (ntuples > 0)
> ! {
> !
> ! results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
> ! results->tup_num = 0;
> ! results->res = res;
> ! res = NULL;
> !
> ! fcinfo->flinfo->fn_extra = (void *) results;
> !
> ! results = NULL;
> ! results = fcinfo->flinfo->fn_extra;
> !
> ! /* close the cursor */
> ! res = PQexec(conn, "CLOSE mycursor");
> ! PQclear(res);
> !
> ! /* commit the transaction */
> ! res = PQexec(conn, "COMMIT");
> ! PQclear(res);
> !
> ! /* close the connection to the database and cleanup */
> ! PQfinish(conn);
> !
> ! rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> ! rsi->isDone = ExprMultipleResult;
> !
> ! PG_RETURN_POINTER(results);
> !
> ! }
> ! else
> ! {
>
> ! PQclear(res);
>
> ! /* close the cursor */
> ! res = PQexec(conn, "CLOSE mycursor");
> ! PQclear(res);
>
> ! /* commit the transaction */
> ! res = PQexec(conn, "COMMIT");
> ! PQclear(res);
>
> ! /* close the connection to the database and cleanup */
> ! PQfinish(conn);
>
> ! rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> ! rsi->isDone = ExprEndResult;
>
> ! PG_RETURN_NULL();
> ! }
> }
> }
> else
> --- 84,119 ----
> /*
> * got results, start fetching them
> */
> ntuples = PQntuples(res);
>
> ! /*
> ! * increment resource index
> ! */
> ! res_id_index++;
>
> ! results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
> ! results->tup_num = 0;
> ! results->res_id_index = res_id_index;
> ! results->res = res;
>
> ! /*
> ! * Append node to res_id to hold pointer to results.
> ! * Needed by dblink_tok to access the data
> ! */
> ! append_res_ptr(results);
>
> ! /*
> ! * save pointer to results for the next function manager call
> ! */
> ! fcinfo->flinfo->fn_extra = (void *) results;
>
> ! /* close the connection to the database and cleanup */
> ! PQfinish(conn);
>
> ! rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> ! rsi->isDone = ExprMultipleResult;
>
> ! PG_RETURN_INT32(res_id_index);
> }
> }
> else
> ***************
> *** 165,173 ****
> /*
> * check for more results
> */
> -
> results = fcinfo->flinfo->fn_extra;
> results->tup_num++;
> ntuples = PQntuples(results->res);
>
> if (results->tup_num < ntuples)
> --- 121,130 ----
> /*
> * check for more results
> */
> results = fcinfo->flinfo->fn_extra;
> +
> results->tup_num++;
> + res_id_index = results->res_id_index;
> ntuples = PQntuples(results->res);
>
> if (results->tup_num < ntuples)
> ***************
> *** 179,196 ****
> rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> rsi->isDone = ExprMultipleResult;
>
> ! PG_RETURN_POINTER(results);
> !
> }
> else
> {
> /*
> * or if no more, clean things up
> */
> -
> results = fcinfo->flinfo->fn_extra;
>
> PQclear(results->res);
>
> rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> rsi->isDone = ExprEndResult;
> --- 136,154 ----
> rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> rsi->isDone = ExprMultipleResult;
>
> ! PG_RETURN_INT32(res_id_index);
> }
> else
> {
> /*
> * or if no more, clean things up
> */
> results = fcinfo->flinfo->fn_extra;
>
> + remove_res_ptr(results);
> PQclear(results->res);
> + pfree(results);
> + fcinfo->flinfo->fn_extra = NULL;
>
> rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> rsi->isDone = ExprEndResult;
> ***************
> *** 214,249 ****
> dblink_tok(PG_FUNCTION_ARGS)
> {
> dblink_results *results;
> ! int fldnum;
> ! text *result_text;
> ! char *result;
> ! int nfields = 0;
> ! int text_len = 0;
> !
> ! if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
> ! elog(ERROR, "dblink: NULL arguments are not permitted");
>
> ! results = (dblink_results *) PG_GETARG_POINTER(0);
> if (results == NULL)
> ! elog(ERROR, "dblink: function called with invalid result pointer");
>
> fldnum = PG_GETARG_INT32(1);
> if (fldnum < 0)
> ! elog(ERROR, "dblink: field number < 0 not permitted");
>
> nfields = PQnfields(results->res);
> if (fldnum > (nfields - 1))
> ! elog(ERROR, "dblink: field number %d does not exist", fldnum);
>
> if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
> - {
> -
> PG_RETURN_NULL();
> -
> - }
> else
> {
> -
> text_len = PQgetlength(results->res, results->tup_num, fldnum);
>
> result = (char *) palloc(text_len + 1);
> --- 172,208 ----
> dblink_tok(PG_FUNCTION_ARGS)
> {
> dblink_results *results;
> ! int fldnum;
> ! text *result_text;
> ! char *result;
> ! int nfields = 0;
> ! int text_len = 0;
>
> ! results = get_res_ptr(PG_GETARG_INT32(0));
> if (results == NULL)
> ! {
> ! if (res_id != NIL)
> ! {
> ! freeList(res_id);
> ! res_id = NIL;
> ! res_id_index = 0;
> ! }
> !
> ! elog(ERROR, "dblink_tok: function called with invalid resource id");
> ! }
>
> fldnum = PG_GETARG_INT32(1);
> if (fldnum < 0)
> ! elog(ERROR, "dblink_tok: field number < 0 not permitted");
>
> nfields = PQnfields(results->res);
> if (fldnum > (nfields - 1))
> ! elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);
>
> if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
> PG_RETURN_NULL();
> else
> {
> text_len = PQgetlength(results->res, results->tup_num, fldnum);
>
> result = (char *) palloc(text_len + 1);
> ***************
> *** 259,270 ****
> --- 218,838 ----
> result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
>
> PG_RETURN_TEXT_P(result_text);
> + }
> + }
> +
> +
> + /*
> + * dblink_strtok
> + * parse input string
> + * return ord item (0 based)
> + * based on provided field separator
> + */
> + PG_FUNCTION_INFO_V1(dblink_strtok);
> + Datum
> + dblink_strtok(PG_FUNCTION_ARGS)
> + {
> + char *fldtext;
> + char *fldsep;
> + int fldnum;
> + char *buffer;
> + text *result_text;
> +
> + fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
> + fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
> + fldnum = PG_GETARG_INT32(2);
> +
> + if (fldtext[0] == '\0')
> + {
> + elog(ERROR, "get_strtok: blank list not permitted");
> + }
> + if (fldsep[0] == '\0')
> + {
> + elog(ERROR, "get_strtok: blank field separator not permitted");
> + }
>
> + buffer = get_strtok(fldtext, fldsep, fldnum);
> +
> + pfree(fldtext);
> + pfree(fldsep);
> +
> + if (buffer == NULL)
> + {
> + PG_RETURN_NULL();
> + }
> + else
> + {
> + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
> + pfree(buffer);
> +
> + PG_RETURN_TEXT_P(result_text);
> }
> }
>
>
> /*
> + * dblink_get_pkey
> + *
> + * Return the primary key field names
> + * for the supplied relation, one row per field,
> + * or no rows if none exists.
> + */
> + PG_FUNCTION_INFO_V1(dblink_get_pkey);
> + Datum
> + dblink_get_pkey(PG_FUNCTION_ARGS)
> + {
> + char *relname;
> + Oid relid;
> + char **result;
> + text *result_text;
> + int16 numatts;
> + ReturnSetInfo *rsi;
> + dblink_array_results *ret_set;
> +
> + if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
> + elog(ERROR, "dblink: function called in context that does not accept a set result");
> +
> + if (fcinfo->flinfo->fn_extra == NULL)
> + {
> + relname = NameStr(*PG_GETARG_NAME(0));
> +
> + /*
> + * Convert relname to rel OID.
> + */
> + relid = get_relid_from_relname(relname);
> + if (!OidIsValid(relid))
> + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
> + relname);
> +
> + /*
> + * get an array of attnames.
> + */
> + result = get_pkey_attnames(relid, &numatts);
> +
> + if ((result != NULL) && (numatts > 0))
> + {
> + ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
> +
> + ret_set->elem_num = 0;
> + ret_set->num_elems = numatts;
> + ret_set->res = result;
> +
> + fcinfo->flinfo->fn_extra = (void *) ret_set;
> +
> + rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> + rsi->isDone = ExprMultipleResult;
> +
> + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
> +
> + PG_RETURN_TEXT_P(result_text);
> + }
> + else
> + {
> + rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> + rsi->isDone = ExprEndResult;
> +
> + PG_RETURN_NULL();
> + }
> + }
> + else
> + {
> + /*
> + * check for more results
> + */
> + ret_set = fcinfo->flinfo->fn_extra;
> + ret_set->elem_num++;
> + result = ret_set->res;
> +
> + if (ret_set->elem_num < ret_set->num_elems)
> + {
> + /*
> + * fetch next one
> + */
> + rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> + rsi->isDone = ExprMultipleResult;
> +
> + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
> + PG_RETURN_TEXT_P(result_text);
> + }
> + else
> + {
> + int i;
> +
> + /*
> + * or if no more, clean things up
> + */
> + for (i = 0; i < ret_set->num_elems; i++)
> + pfree(result[i]);
> +
> + pfree(ret_set->res);
> + pfree(ret_set);
> +
> + rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> + rsi->isDone = ExprEndResult;
> +
> + PG_RETURN_NULL();
> + }
> + }
> + PG_RETURN_NULL();
> + }
> +
> +
> + /*
> + * dblink_last_oid
> + * return last inserted oid
> + */
> + PG_FUNCTION_INFO_V1(dblink_last_oid);
> + Datum
> + dblink_last_oid(PG_FUNCTION_ARGS)
> + {
> + dblink_results *results;
> +
> + results = get_res_ptr(PG_GETARG_INT32(0));
> + if (results == NULL)
> + {
> + if (res_id != NIL)
> + {
> + freeList(res_id);
> + res_id = NIL;
> + res_id_index = 0;
> + }
> +
> + elog(ERROR, "dblink_last_oid: function called with invalid resource id");
> + }
> +
> + PG_RETURN_OID(PQoidValue(results->res));
> + }
> +
> +
> + /*
> + * dblink_build_sql_insert
> + *
> + * Used to generate an SQL insert statement
> + * based on an existing tuple in a local relation.
> + * This is useful for selectively replicating data
> + * to another server via dblink.
> + *
> + * API:
> + * <relname> - name of local table of interest
> + * <pkattnums> - an int2vector of attnums which will be used
> + * to identify the local tuple of interest
> + * <pknumatts> - number of attnums in pkattnums
> + * <src_pkattvals_arry> - text array of key values which will be used
> + * to identify the local tuple of interest
> + * <tgt_pkattvals_arry> - text array of key values which will be used
> + * to build the string for execution remotely. These are substituted
> + * for their counterparts in src_pkattvals_arry
> + */
> + PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
> + Datum
> + dblink_build_sql_insert(PG_FUNCTION_ARGS)
> + {
> + Oid relid;
> + char *relname;
> + int16 *pkattnums;
> + int16 pknumatts;
> + char **src_pkattvals;
> + char **tgt_pkattvals;
> + ArrayType *src_pkattvals_arry;
> + ArrayType *tgt_pkattvals_arry;
> + int src_ndim;
> + int *src_dim;
> + int src_nitems;
> + int tgt_ndim;
> + int *tgt_dim;
> + int tgt_nitems;
> + int i;
> + char *ptr;
> + char *sql;
> + text *sql_text;
> +
> + relname = NameStr(*PG_GETARG_NAME(0));
> +
> + /*
> + * Convert relname to rel OID.
> + */
> + relid = get_relid_from_relname(relname);
> + if (!OidIsValid(relid))
> + elog(ERROR, "dblink_build_sql_insert: relation \"%s\" does not exist",
> + relname);
> +
> + pkattnums = (int16 *) PG_GETARG_POINTER(1);
> + pknumatts = PG_GETARG_INT16(2);
> + /*
> + * There should be at least one key attribute
> + */
> + if (pknumatts == 0)
> + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
> +
> + src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
> + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
> +
> + /*
> + * Source array is made up of key values that will be used to
> + * locate the tuple of interest from the local system.
> + */
> + src_ndim = ARR_NDIM(src_pkattvals_arry);
> + src_dim = ARR_DIMS(src_pkattvals_arry);
> + src_nitems = ArrayGetNItems(src_ndim, src_dim);
> +
> + /*
> + * There should be one source array key value for each key attnum
> + */
> + if (src_nitems != pknumatts)
> + elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
> +
> + /*
> + * get array of pointers to c-strings from the input source array
> + */
> + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
> + ptr = ARR_DATA_PTR(src_pkattvals_arry);
> + for (i = 0; i < src_nitems; i++)
> + {
> + src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> + ptr += INTALIGN(*(int32 *) ptr);
> + }
> +
> + /*
> + * Target array is made up of key values that will be used to
> + * build the SQL string for use on the remote system.
> + */
> + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
> + tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
> + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
> +
> + /*
> + * There should be one target array key value for each key attnum
> + */
> + if (tgt_nitems != pknumatts)
> + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
> +
> + /*
> + * get array of pointers to c-strings from the input target array
> + */
> + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
> + ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
> + for (i = 0; i < tgt_nitems; i++)
> + {
> + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> + ptr += INTALIGN(*(int32 *) ptr);
> + }
> +
> + /*
> + * Prep work is finally done. Go get the SQL string.
> + */
> + sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
> +
> + /*
> + * Make it into TEXT for return to the client
> + */
> + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
> +
> + /*
> + * And send it
> + */
> + PG_RETURN_TEXT_P(sql_text);
> + }
> +
> +
> + /*
> + * dblink_build_sql_delete
> + *
> + * Used to generate an SQL delete statement.
> + * This is useful for selectively replicating a
> + * delete to another server via dblink.
> + *
> + * API:
> + * <relname> - name of remote table of interest
> + * <pkattnums> - an int2vector of attnums which will be used
> + * to identify the remote tuple of interest
> + * <pknumatts> - number of attnums in pkattnums
> + * <tgt_pkattvals_arry> - text array of key values which will be used
> + * to build the string for execution remotely.
> + */
> + PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
> + Datum
> + dblink_build_sql_delete(PG_FUNCTION_ARGS)
> + {
> + Oid relid;
> + char *relname;
> + int16 *pkattnums;
> + int16 pknumatts;
> + char **tgt_pkattvals;
> + ArrayType *tgt_pkattvals_arry;
> + int tgt_ndim;
> + int *tgt_dim;
> + int tgt_nitems;
> + int i;
> + char *ptr;
> + char *sql;
> + text *sql_text;
> +
> + relname = NameStr(*PG_GETARG_NAME(0));
> +
> + /*
> + * Convert relname to rel OID.
> + */
> + relid = get_relid_from_relname(relname);
> + if (!OidIsValid(relid))
> + elog(ERROR, "dblink_build_sql_delete: relation \"%s\" does not exist",
> + relname);
> +
> + pkattnums = (int16 *) PG_GETARG_POINTER(1);
> + pknumatts = PG_GETARG_INT16(2);
> + /*
> + * There should be at least one key attribute
> + */
> + if (pknumatts == 0)
> + elog(ERROR, "dblink_build_sql_delete: number of key attributes must be > 0.");
> +
> + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
> +
> + /*
> + * Target array is made up of key values that will be used to
> + * build the SQL string for use on the remote system.
> + */
> + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
> + tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
> + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
> +
> + /*
> + * There should be one target array key value for each key attnum
> + */
> + if (tgt_nitems != pknumatts)
> + elog(ERROR, "dblink_build_sql_delete: target key array length does not match number of key attributes.");
> +
> + /*
> + * get array of pointers to c-strings from the input target array
> + */
> + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
> + ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
> + for (i = 0; i < tgt_nitems; i++)
> + {
> + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> + ptr += INTALIGN(*(int32 *) ptr);
> + }
> +
> + /*
> + * Prep work is finally done. Go get the SQL string.
> + */
> + sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
> +
> + /*
> + * Make it into TEXT for return to the client
> + */
> + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
> +
> + /*
> + * And send it
> + */
> + PG_RETURN_TEXT_P(sql_text);
> + }
> +
> +
> + /*
> + * dblink_build_sql_update
> + *
> + * Used to generate an SQL update statement
> + * based on an existing tuple in a local relation.
> + * This is useful for selectively replicating data
> + * to another server via dblink.
> + *
> + * API:
> + * <relname> - name of local table of interest
> + * <pkattnums> - an int2vector of attnums which will be used
> + * to identify the local tuple of interest
> + * <pknumatts> - number of attnums in pkattnums
> + * <src_pkattvals_arry> - text array of key values which will be used
> + * to identify the local tuple of interest
> + * <tgt_pkattvals_arry> - text array of key values which will be used
> + * to build the string for execution remotely. These are substituted
> + * for their counterparts in src_pkattvals_arry
> + */
> + PG_FUNCTION_INFO_V1(dblink_build_sql_update);
> + Datum
> + dblink_build_sql_update(PG_FUNCTION_ARGS)
> + {
> + Oid relid;
> + char *relname;
> + int16 *pkattnums;
> + int16 pknumatts;
> + char **src_pkattvals;
> + char **tgt_pkattvals;
> + ArrayType *src_pkattvals_arry;
> + ArrayType *tgt_pkattvals_arry;
> + int src_ndim;
> + int *src_dim;
> + int src_nitems;
> + int tgt_ndim;
> + int *tgt_dim;
> + int tgt_nitems;
> + int i;
> + char *ptr;
> + char *sql;
> + text *sql_text;
> +
> + relname = NameStr(*PG_GETARG_NAME(0));
> +
> + /*
> + * Convert relname to rel OID.
> + */
> + relid = get_relid_from_relname(relname);
> + if (!OidIsValid(relid))
> + elog(ERROR, "dblink_build_sql_update: relation \"%s\" does not exist",
> + relname);
> +
> + pkattnums = (int16 *) PG_GETARG_POINTER(1);
> + pknumatts = PG_GETARG_INT16(2);
> + /*
> + * There must be at least one key attnum
> + */
> + if (pknumatts == 0)
> + elog(ERROR, "dblink_build_sql_update: number of key attributes must be > 0.");
> +
> + src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
> + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
> +
> + /*
> + * Source array is made up of key values that will be used to
> + * locate the tuple of interest from the local system.
> + */
> + src_ndim = ARR_NDIM(src_pkattvals_arry);
> + src_dim = ARR_DIMS(src_pkattvals_arry);
> + src_nitems = ArrayGetNItems(src_ndim, src_dim);
> +
> + /*
> + * There should be one source array key value for each key attnum
> + */
> + if (src_nitems != pknumatts)
> + elog(ERROR, "dblink_build_sql_update: source key array length does not match number of key attributes.");
> +
> + /*
> + * get array of pointers to c-strings from the input source array
> + */
> + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
> + ptr = ARR_DATA_PTR(src_pkattvals_arry);
> + for (i = 0; i < src_nitems; i++)
> + {
> + src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> + ptr += INTALIGN(*(int32 *) ptr);
> + }
> +
> + /*
> + * Target array is made up of key values that will be used to
> + * build the SQL string for use on the remote system.
> + */
> + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
> + tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
> + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
> +
> + /*
> + * There should be one target array key value for each key attnum
> + */
> + if (tgt_nitems != pknumatts)
> + elog(ERROR, "dblink_build_sql_update: target key array length does not match number of key attributes.");
> +
> + /*
> + * get array of pointers to c-strings from the input target array
> + */
> + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
> + ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
> + for (i = 0; i < tgt_nitems; i++)
> + {
> + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> + ptr += INTALIGN(*(int32 *) ptr);
> + }
> +
> + /*
> + * Prep work is finally done. Go get the SQL string.
> + */
> + sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
> +
> + /*
> + * Make it into TEXT for return to the client
> + */
> + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
> +
> + /*
> + * And send it
> + */
> + PG_RETURN_TEXT_P(sql_text);
> + }
> +
> +
> + /*
> + * dblink_current_query
> + * return the current query string
> + * to allow its use in (among other things)
> + * rewrite rules
> + */
> + PG_FUNCTION_INFO_V1(dblink_current_query);
> + Datum
> + dblink_current_query(PG_FUNCTION_ARGS)
> + {
> + text *result_text;
> +
> + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
> + PG_RETURN_TEXT_P(result_text);
> + }
> +
> +
> + /*
> + * dblink_replace_text
> + * replace all occurrences of 'old_sub_str' in 'orig_str'
> + * with 'new_sub_str' to form 'new_str'
> + *
> + * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
> + * otherwise returns 'new_str'
> + */
> + PG_FUNCTION_INFO_V1(dblink_replace_text);
> + Datum
> + dblink_replace_text(PG_FUNCTION_ARGS)
> + {
> + text *left_text;
> + text *right_text;
> + text *buf_text;
> + text *ret_text;
> + char *ret_str;
> + int curr_posn;
> + text *src_text = PG_GETARG_TEXT_P(0);
> + int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
> + text *from_sub_text = PG_GETARG_TEXT_P(1);
> + int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
> + text *to_sub_text = PG_GETARG_TEXT_P(2);
> + char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
> + StringInfo str = makeStringInfo();
> +
> + if (src_text_len == 0 || from_sub_text_len == 0)
> + PG_RETURN_TEXT_P(src_text);
> +
> + buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
> + curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
> +
> + while (curr_posn > 0)
> + {
> + left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
> + right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1));
> +
> + appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
> + appendStringInfo(str, "%s", to_sub_str);
> +
> + pfree(buf_text);
> + pfree(left_text);
> + buf_text = right_text;
> + curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
> + }
> +
> + appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
> + pfree(buf_text);
> +
> + ret_str = pstrdup(str->data);
> + ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
> +
> + PG_RETURN_TEXT_P(ret_text);
> + }
> +
> +
> + /*************************************************************
> * internal functions
> */
>
> ***************
> *** 285,293 ****
> --- 853,1408 ----
> MemSet(retval, 0, sizeof(dblink_results));
>
> retval->tup_num = -1;
> + retval->res_id_index = -1;
> retval->res = NULL;
>
> MemoryContextSwitchTo(oldcontext);
>
> return retval;
> }
> +
> +
> + /*
> + * init_dblink_array_results
> + * - create an empty dblink_array_results data structure
> + */
> + dblink_array_results *
> + init_dblink_array_results(MemoryContext fn_mcxt)
> + {
> + MemoryContext oldcontext;
> + dblink_array_results *retval;
> +
> + oldcontext = MemoryContextSwitchTo(fn_mcxt);
> +
> + retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
> + MemSet(retval, 0, sizeof(dblink_array_results));
> +
> + retval->elem_num = -1;
> + retval->num_elems = 0;
> + retval->res = NULL;
> +
> + MemoryContextSwitchTo(oldcontext);
> +
> + return retval;
> + }
> +
> + /*
> + * get_pkey_attnames
> + *
> + * Get the primary key attnames for the given relation.
> + * Return NULL, and set numatts = 0, if no primary key exists.
> + */
> + char **
> + get_pkey_attnames(Oid relid, int16 *numatts)
> + {
> + Relation indexRelation;
> + ScanKeyData entry;
> + HeapScanDesc scan;
> + HeapTuple indexTuple;
> + int i;
> + char **result = NULL;
> + Relation rel;
> + TupleDesc tupdesc;
> +
> + /*
> + * Open relation using relid, get tupdesc
> + */
> + rel = relation_open(relid, AccessShareLock);
> + tupdesc = rel->rd_att;
> +
> + /*
> + * Initialize numatts to 0 in case no primary key
> + * exists
> + */
> + *numatts = 0;
> +
> + /*
> + * Use relid to get all related indexes
> + */
> + indexRelation = heap_openr(IndexRelationName, AccessShareLock);
> + ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
> + F_OIDEQ, ObjectIdGetDatum(relid));
> + scan = heap_beginscan(indexRelation, false, SnapshotNow,
> + 1, &entry);
> +
> + while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0)))
> + {
> + Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
> +
> + /*
> + * We're only interested if it is the primary key
> + */
> + if (index->indisprimary == TRUE)
> + {
> + i = 0;
> + while (index->indkey[i++] != 0)
> + (*numatts)++;
> +
> + if (*numatts > 0)
> + {
> + result = (char **) palloc(*numatts * sizeof(char *));
> + for (i = 0; i < *numatts; i++)
> + result[i] = SPI_fname(tupdesc, index->indkey[i]);
> + }
> + break;
> + }
> + }
> + heap_endscan(scan);
> + heap_close(indexRelation, AccessShareLock);
> + relation_close(rel, AccessShareLock);
> +
> + return result;
> + }
> +
> +
> + /*
> + * get_strtok
> + *
> + * parse input string
> + * return the item at position fldnum (0 based)
> + * based on provided field separator
> + */
> + char *
> + get_strtok(char *fldtext, char *fldsep, int fldnum)
> + {
> + int j = 0;
> + char *result;
> +
> + if (fldnum < 0)
> + {
> + elog(ERROR, "get_strtok: field number < 0 not permitted");
> + }
> +
> + if (fldsep[0] == '\0')
> + {
> + elog(ERROR, "get_strtok: blank field separator not permitted");
> + }
> +
> + result = strtok(fldtext, fldsep);
> + for (j = 1; j < fldnum + 1; j++)
> + {
> + result = strtok(NULL, fldsep);
> + if (result == NULL)
> + return NULL;
> + }
> +
> + return (result != NULL) ? pstrdup(result) : NULL;
> + }
> +
> + char *
> + get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
> + {
> + Relation rel;
> + char *relname;
> + HeapTuple tuple;
> + TupleDesc tupdesc;
> + int natts;
> + StringInfo str = makeStringInfo();
> + char *sql = NULL;
> + char *val = NULL;
> + int16 key;
> + unsigned int i;
> +
> + /*
> + * Open relation using relid
> + */
> + rel = relation_open(relid, AccessShareLock);
> + relname = RelationGetRelationName(rel);
> + tupdesc = rel->rd_att;
> + natts = tupdesc->natts;
> +
> + tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
> +
> + appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
> + for (i = 0; i < natts; i++)
> + {
> + if (i > 0)
> + appendStringInfo(str, ",");
> +
> + appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
> + }
> +
> + appendStringInfo(str, ") VALUES(");
> +
> + /*
> + * remember attvals are 1 based
> + */
> + for (i = 0; i < natts; i++)
> + {
> + if (i > 0)
> + appendStringInfo(str, ",");
> +
> + if (tgt_pkattvals != NULL)
> + key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
> + else
> + key = -1;
> +
> + if (key > -1)
> + val = pstrdup(tgt_pkattvals[key]);
> + else
> + val = SPI_getvalue(tuple, tupdesc, i + 1);
> +
> + if (val != NULL)
> + {
> + appendStringInfo(str, "%s", quote_literal_cstr(val));
> + pfree(val);
> + }
> + else
> + appendStringInfo(str, "NULL");
> + }
> + appendStringInfo(str, ")");
> +
> + sql = pstrdup(str->data);
> + pfree(str->data);
> + pfree(str);
> + relation_close(rel, AccessShareLock);
> +
> + return (sql);
> + }
> +
> + char *
> + get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
> + {
> + Relation rel;
> + char *relname;
> + TupleDesc tupdesc;
> + int natts;
> + StringInfo str = makeStringInfo();
> + char *sql = NULL;
> + char *val = NULL;
> + unsigned int i;
> +
> + /*
> + * Open relation using relid
> + */
> + rel = relation_open(relid, AccessShareLock);
> + relname = RelationGetRelationName(rel);
> + tupdesc = rel->rd_att;
> + natts = tupdesc->natts;
> +
> + appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
> + for (i = 0; i < pknumatts; i++)
> + {
> + int16 pkattnum = pkattnums[i];
> +
> + if (i > 0)
> + appendStringInfo(str, " AND ");
> +
> + appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
> +
> + if (tgt_pkattvals != NULL)
> + val = pstrdup(tgt_pkattvals[i]);
> + else
> + elog(ERROR, "Target key array must not be NULL");
> +
> + if (val != NULL)
> + {
> + appendStringInfo(str, "=");
> + appendStringInfo(str, "%s", quote_literal_cstr(val));
> + pfree(val);
> + }
> + else
> + appendStringInfo(str, " IS NULL");
> + }
> +
> + sql = pstrdup(str->data);
> + pfree(str->data);
> + pfree(str);
> + relation_close(rel, AccessShareLock);
> +
> + return (sql);
> + }
> +
> + char *
> + get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
> + {
> + Relation rel;
> + char *relname;
> + HeapTuple tuple;
> + TupleDesc tupdesc;
> + int natts;
> + StringInfo str = makeStringInfo();
> + char *sql = NULL;
> + char *val = NULL;
> + int16 key;
> + int i;
> +
> + /*
> + * Open relation using relid
> + */
> + rel = relation_open(relid, AccessShareLock);
> + relname = RelationGetRelationName(rel);
> + tupdesc = rel->rd_att;
> + natts = tupdesc->natts;
> +
> + tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
> +
> + appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
> +
> + for (i = 0; i < natts; i++)
> + {
> + if (i > 0)
> + appendStringInfo(str, ",");
> +
> + appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
> + appendStringInfo(str, "=");
> +
> + if (tgt_pkattvals != NULL)
> + key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
> + else
> + key = -1;
> +
> + if (key > -1)
> + val = pstrdup(tgt_pkattvals[key]);
> + else
> + val = SPI_getvalue(tuple, tupdesc, i + 1);
> +
> + if (val != NULL)
> + {
> + appendStringInfo(str, "%s", quote_literal_cstr(val));
> + pfree(val);
> + }
> + else
> + appendStringInfo(str, "NULL");
> + }
> +
> + appendStringInfo(str, " WHERE ");
> +
> + for (i = 0; i < pknumatts; i++)
> + {
> + int16 pkattnum = pkattnums[i];
> +
> + if (i > 0)
> + appendStringInfo(str, " AND ");
> +
> + appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
> +
> + if (tgt_pkattvals != NULL)
> + val = pstrdup(tgt_pkattvals[i]);
> + else
> + val = SPI_getvalue(tuple, tupdesc, pkattnum);
> +
> + if (val != NULL)
> + {
> + appendStringInfo(str, "=");
> + appendStringInfo(str, "%s", quote_literal_cstr(val));
> + pfree(val);
> + }
> + else
> + appendStringInfo(str, " IS NULL");
> + }
> +
> + sql = pstrdup(str->data);
> + pfree(str->data);
> + pfree(str);
> + relation_close(rel, AccessShareLock);
> +
> + return (sql);
> + }
> +
> + /*
> + * Return a properly quoted literal value.
> + * Uses quote_literal in quote.c
> + */
> + static char *
> + quote_literal_cstr(char *rawstr)
> + {
> + text *rawstr_text;
> + text *result_text;
> + char *result;
> +
> + rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
> + result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
> + result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
> +
> + return result;
> + }
> +
> + /*
> + * Return a properly quoted identifier.
> + * Uses quote_ident in quote.c
> + */
> + static char *
> + quote_ident_cstr(char *rawstr)
> + {
> + text *rawstr_text;
> + text *result_text;
> + char *result;
> +
> + rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
> + result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
> + result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
> +
> + return result;
> + }
> +
> + int16
> + get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
> + {
> + int i;
> +
> + /*
> + * Not likely a long list anyway, so just scan for
> + * the value
> + */
> + for (i = 0; i < pknumatts; i++)
> + if (key == pkattnums[i])
> + return i;
> +
> + return -1;
> + }
> +
> + HeapTuple
> + get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
> + {
> + Relation rel;
> + char *relname;
> + TupleDesc tupdesc;
> + StringInfo str = makeStringInfo();
> + char *sql = NULL;
> + int ret;
> + HeapTuple tuple;
> + int i;
> + char *val = NULL;
> +
> + /*
> + * Open relation using relid
> + */
> + rel = relation_open(relid, AccessShareLock);
> + relname = RelationGetRelationName(rel);
> + tupdesc = rel->rd_att;
> +
> + /*
> + * Connect to SPI manager
> + */
> + if ((ret = SPI_connect()) < 0)
> + elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret);
> +
> + /*
> + * Build sql statement to look up tuple of interest
> + * Use src_pkattvals as the criteria.
> + */
> + appendStringInfo(str, "SELECT * from %s WHERE ", quote_ident_cstr(relname));
> +
> + for (i = 0; i < pknumatts; i++)
> + {
> + int16 pkattnum = pkattnums[i];
> +
> + if (i > 0)
> + appendStringInfo(str, " AND ");
> +
> + appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
> +
> + val = pstrdup(src_pkattvals[i]);
> + if (val != NULL)
> + {
> + appendStringInfo(str, "=");
> + appendStringInfo(str, "%s", quote_literal_cstr(val));
> + pfree(val);
> + }
> + else
> + appendStringInfo(str, " IS NULL");
> + }
> +
> + sql = pstrdup(str->data);
> + pfree(str->data);
> + pfree(str);
> + /*
> + * Retrieve the desired tuple
> + */
> + ret = SPI_exec(sql, 0);
> + pfree(sql);
> +
> + /*
> + * Only allow one qualifying tuple
> + */
> + if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
> + {
> + elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record.");
> + }
> + else if (ret == SPI_OK_SELECT && SPI_processed == 1)
> + {
> + SPITupleTable *tuptable = SPI_tuptable;
> + tuple = SPI_copytuple(tuptable->vals[0]);
> +
> + return tuple;
> + }
> + else
> + {
> + /*
> + * no qualifying tuples
> + */
> + return NULL;
> + }
> +
> + /*
> + * never reached, but keep compiler quiet
> + */
> + return NULL;
> + }
> +
> + Oid
> + get_relid_from_relname(char *relname)
> + {
> + #ifdef NamespaceRelationName
> + Oid relid;
> +
> + relid = RelnameGetRelid(relname);
> + #else
> + Relation rel;
> + Oid relid;
> +
> + rel = relation_openr(relname, AccessShareLock);
> + relid = RelationGetRelid(rel);
> + relation_close(rel, AccessShareLock);
> + #endif /* NamespaceRelationName */
> +
> + return relid;
> + }
> +
> + dblink_results *
> + get_res_ptr(int32 res_id_index)
> + {
> + List *ptr;
> +
> + /*
> + * short circuit empty list
> + */
> + if (res_id == NIL)
> + return NULL;
> +
> + /*
> + * OK, should be good to go
> + */
> + foreach(ptr, res_id)
> + {
> + dblink_results *this_res_id = (dblink_results *) lfirst(ptr);
> + if (this_res_id->res_id_index == res_id_index)
> + return this_res_id;
> + }
> + return NULL;
> + }
> +
> + /*
> + * Add node to global List res_id
> + */
> + void
> + append_res_ptr(dblink_results *results)
> + {
> + res_id = lappend(res_id, results);
> + }
> +
> + /*
> + * Remove node from global List res_id;
> + * reset res_id_index once the list is empty
> + */
> + void
> + remove_res_ptr(dblink_results *results)
> + {
> + res_id = lremove(results, res_id);
> +
> + if (res_id == NIL)
> + res_id_index = 0;
> + }
> +
> +
> diff -cNr dblink.orig/dblink.h dblink/dblink.h
> *** dblink.orig/dblink.h Mon Nov 5 09:46:22 2001
> --- dblink/dblink.h Sun Apr 14 18:54:39 2002
> ***************
> *** 3,9 ****
> *
> * Functions returning results from a remote database
> *
> ! * Copyright (c) Joseph Conway <joe(dot)conway(at)mail(dot)com>, 2001;
> *
> * Permission to use, copy, modify, and distribute this software and its
> * documentation for any purpose, without fee, and without a written agreement
> --- 3,10 ----
> *
> * Functions returning results from a remote database
> *
> ! * Copyright (c) Joseph Conway <mail(at)joeconway(dot)com>, 2001, 2002,
> ! * ALL RIGHTS RESERVED;
> *
> * Permission to use, copy, modify, and distribute this software and its
> * documentation for any purpose, without fee, and without a written agreement
> ***************
> *** 33,42 ****
> --- 34,64 ----
> #include "libpq-int.h"
> #include "fmgr.h"
> #include "access/tupdesc.h"
> + #include "access/heapam.h"
> + #include "catalog/catname.h"
> + #include "catalog/pg_index.h"
> + #include "catalog/pg_type.h"
> #include "executor/executor.h"
> + #include "executor/spi.h"
> + #include "lib/stringinfo.h"
> #include "nodes/nodes.h"
> #include "nodes/execnodes.h"
> + #include "nodes/pg_list.h"
> + #include "parser/parse_type.h"
> + #include "tcop/tcopprot.h"
> #include "utils/builtins.h"
> + #include "utils/fmgroids.h"
> + #include "utils/array.h"
> + #include "utils/syscache.h"
> +
> + #ifdef NamespaceRelationName
> + #include "catalog/namespace.h"
> + #endif /* NamespaceRelationName */
> +
> + /*
> + * Max SQL statement size
> + */
> + #define DBLINK_MAX_SQLSTATE_SIZE 16384
>
> /*
> * This struct holds the results of the remote query.
> ***************
> *** 50,70 ****
> int tup_num;
>
> /*
> * the actual query results
> */
> PGresult *res;
> -
> } dblink_results;
>
> /*
> * External declarations
> */
> extern Datum dblink(PG_FUNCTION_ARGS);
> extern Datum dblink_tok(PG_FUNCTION_ARGS);
>
> /*
> * Internal declarations
> */
> dblink_results *init_dblink_results(MemoryContext fn_mcxt);
>
> #endif /* DBLINK_H */
> --- 72,145 ----
> int tup_num;
>
> /*
> + * resource index number for this context
> + */
> + int res_id_index;
> +
> + /*
> * the actual query results
> */
> PGresult *res;
> } dblink_results;
>
> +
> + /*
> + * This struct holds results in the form of an array.
> + * Use fn_extra to hold a pointer to it across calls
> + */
> + typedef struct
> + {
> + /*
> + * elem being accessed
> + */
> + int elem_num;
> +
> + /*
> + * number of elems
> + */
> + int num_elems;
> +
> + /*
> + * the actual array
> + */
> + void *res;
> +
> + } dblink_array_results;
> +
> /*
> * External declarations
> */
> extern Datum dblink(PG_FUNCTION_ARGS);
> extern Datum dblink_tok(PG_FUNCTION_ARGS);
> + extern Datum dblink_strtok(PG_FUNCTION_ARGS);
> + extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
> + extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
> + extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
> + extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
> + extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
> + extern Datum dblink_current_query(PG_FUNCTION_ARGS);
> + extern Datum dblink_replace_text(PG_FUNCTION_ARGS);
>
> /*
> * Internal declarations
> */
> dblink_results *init_dblink_results(MemoryContext fn_mcxt);
> + dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
> + char **get_pkey_attnames(Oid relid, int16 *numatts);
> + char *get_strtok(char *fldtext, char *fldsep, int fldnum);
> + char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber);
> + char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
> + char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
> + char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
> + static char *quote_literal_cstr(char *rawstr);
> + static char *quote_ident_cstr(char *rawstr);
> + int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
> + HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
> + Oid get_relid_from_relname(char *relname);
> + dblink_results *get_res_ptr(int32 res_id_index);
> + void append_res_ptr(dblink_results *results);
> + void remove_res_ptr(dblink_results *results);
> +
> + extern char *debug_query_string;
>
> #endif /* DBLINK_H */
> diff -cNr dblink.orig/dblink.sql.in dblink/dblink.sql.in
> *** dblink.orig/dblink.sql.in Thu Jun 14 09:49:03 2001
> --- dblink/dblink.sql.in Fri Apr 12 14:36:49 2002
> ***************
> *** 1,5 ****
> ! CREATE FUNCTION dblink (text,text) RETURNS setof int
> ! AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c';
>
> ! CREATE FUNCTION dblink_tok (int,int) RETURNS text
> ! AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c';
> --- 1,38 ----
> ! CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
> ! AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
> ! WITH (isstrict);
>
> ! CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
> ! AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
> ! WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
> ! AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
> ! WITH (iscachable, isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text
> ! AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
> ! WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
> ! AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
> ! WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text
> ! AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c'
> ! WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text
> ! AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c'
> ! WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text
> ! AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c'
> ! WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
> ! AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
> !
> ! CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
> ! AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
> ! WITH (iscachable, isstrict);
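
For anyone reading dblink_replace_text in the patch above: the loop finds the leftmost match, copies the text to its left, appends the replacement, and then continues with whatever follows the match. Below is a minimal standalone sketch of that same left-to-right scan. It is only an illustration under stated assumptions: it uses plain C strings and malloc instead of the backend's text and StringInfo types, and replace_all is a hypothetical name that does not appear in the patch.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * replace_all (hypothetical helper, not part of the patch)
 *
 * Return a newly malloc'd copy of src in which every non-overlapping
 * occurrence of from, found scanning left to right, is replaced by to.
 * If src or from is empty, a plain copy of src is returned.
 * Returns NULL on allocation failure. The caller frees the result.
 */
char *
replace_all(const char *src, const char *from, const char *to)
{
	size_t		src_len, from_len, to_len, count = 0;
	const char *p;
	const char *hit;
	char	   *out;
	char	   *w;

	assert(src != NULL && from != NULL && to != NULL);
	src_len = strlen(src);
	from_len = strlen(from);
	to_len = strlen(to);

	if (src_len == 0 || from_len == 0)
	{
		out = malloc(src_len + 1);
		if (out != NULL)
			memcpy(out, src, src_len + 1);
		return out;
	}

	/* first pass: count the matches so the result can be sized exactly */
	for (p = src; (hit = strstr(p, from)) != NULL; p = hit + from_len)
		count++;

	out = malloc(src_len - count * from_len + count * to_len + 1);
	if (out == NULL)
		return NULL;

	/* second pass: copy the text left of each match, then the replacement */
	w = out;
	for (p = src; (hit = strstr(p, from)) != NULL; p = hit + from_len)
	{
		memcpy(w, p, (size_t) (hit - p));
		w += hit - p;
		memcpy(w, to, to_len);
		w += to_len;
	}

	/* p now points just past the last match; copy the remainder */
	strcpy(w, p);
	return out;
}
```

Because the scan resumes after each match, the replacement text is never rescanned, so replacing "aa" with "b" in "aaa" consumes the first two characters and leaves the third alone.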

>
> ---------------------------(end of broadcast)---------------------------
> TIP 5: Have you checked our extensive FAQ?
>
> http://www.postgresql.org/users-lounge/docs/faq.html
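
A note on get_strtok in the patch: it is built on the C library's strtok, which treats the separator argument as a set of delimiter characters, skips runs of consecutive delimiters (so empty fields are never returned), and writes into the string it scans. The sketch below shows the same 0-based field lookup outside the backend. It is an illustration only: nth_token is a hypothetical name, it works on a private malloc'd copy so the caller's string stays untouched, and it returns NULL rather than raising an error for a negative field number or an empty separator.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * nth_token (hypothetical helper, not part of the patch)
 *
 * Return a malloc'd copy of token number n (0 based) of text, splitting
 * on any character in sep. Returns NULL if n is negative, sep is empty,
 * there are fewer than n + 1 tokens, or allocation fails.
 * The caller frees the result.
 */
char *
nth_token(const char *text, const char *sep, int n)
{
	char	   *copy;
	char	   *tok;
	char	   *result = NULL;
	size_t		len;
	int			j;

	assert(text != NULL && sep != NULL);
	if (n < 0 || sep[0] == '\0')
		return NULL;

	/* strtok modifies its input, so scan a private copy */
	len = strlen(text);
	copy = malloc(len + 1);
	if (copy == NULL)
		return NULL;
	memcpy(copy, text, len + 1);

	/* the first call yields token 0; each further call advances one token */
	tok = strtok(copy, sep);
	for (j = 0; j < n && tok != NULL; j++)
		tok = strtok(NULL, sep);

	if (tok != NULL)
	{
		size_t		tok_len = strlen(tok);

		result = malloc(tok_len + 1);
		if (result != NULL)
			memcpy(result, tok, tok_len + 1);
	}

	free(copy);
	return result;
}
```

With this behavior, field 1 of "a,,c" split on "," is "c", not an empty string, which may surprise callers who expect positional fields.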

--
Bruce Momjian | http://candle.pha.pa.us
pgman(at)candle(dot)pha(dot)pa(dot)us | (610) 853-3000
+ If your life is a hard drive, | 830 Blythe Avenue
+ Christ can be your backup. | Drexel Hill, Pennsylvania 19026
