diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 254d3ab8eb..5ee70d62bf 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -380,6 +380,28 @@ WHERE condition
+
+ on_conflict_log
+
+
+ Specifies to log error record up to specified amount.
+ Instead write the record to log file and
+ precede to the next record
+
+
+
+
+
+ log_file_name
+
+
+ The path name of the log file. It must be an absolute
+ path. Windows users might need to use an E'' string and
+ double any backslashes used in the path name.
+
+
+
+
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index dbb06397e6..3c6afec5b3 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -46,6 +46,7 @@
#include "port/pg_bswap.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
+#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -123,6 +124,7 @@ typedef struct CopyStateData
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
+ FILE *failed_rec_file; /* used if ignore_conflict is true */
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
@@ -152,6 +154,9 @@ typedef struct CopyStateData
List *convert_select; /* list of column names (can be NIL) */
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
Node *whereClause; /* WHERE condition (or NULL) */
+ char *failed_rec_filename; /* failed record filename */
+ bool ignore_conflict;
+ int error_limit; /* total # of error to log */
/* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
@@ -773,6 +778,21 @@ CopyLoadRawBuf(CopyState cstate)
return (inbytes > 0);
}
+/*
+ * LogCopyError log error in to failed record file
+ */
+static void
+LogCopyError(CopyState cstate, const char *str)
+{
+ appendBinaryStringInfo(&cstate->line_buf, str, strlen(str));
+#ifndef WIN32
+ appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+ appendBinaryStringInfo(&cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+ fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file);
+ cstate->error_limit--;
+}
/*
* DoCopy executes the SQL COPY statement
@@ -1249,6 +1269,32 @@ ProcessCopyOptions(ParseState *pstate,
defel->defname),
parser_errposition(pstate, defel->location)));
}
+ else if (strcmp(defel->defname, "on_conflict_log") == 0)
+ {
+ if (cstate->ignore_conflict)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+
+ cstate->ignore_conflict = true;
+ cstate->error_limit =defGetInt64(defel);
+ if (cstate->error_limit < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("argument to option \"%s\" must be positive number",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+ }
+ else if (strcmp(defel->defname, "log_file_name") == 0)
+ {
+ if (cstate->failed_rec_filename)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ cstate->failed_rec_filename =defGetString(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1271,6 +1317,21 @@ ProcessCopyOptions(ParseState *pstate,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify NULL in BINARY mode")));
+ if (!cstate->error_limit && cstate->failed_rec_filename)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot specify log file name without on conflict log option")));
+
+ if (cstate->error_limit && !cstate->failed_rec_filename)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot specify on conflict log without log file name option")));
+
+ if (cstate->error_limit && !cstate->is_copy_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot specify on conflict log on COPY TO")));
+
/* Set defaults for omitted options */
if (!cstate->delim)
cstate->delim = cstate->csv_mode ? "," : "\t";
@@ -1771,6 +1832,11 @@ EndCopy(CopyState cstate)
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m",
cstate->filename)));
+ if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m",
+ cstate->failed_rec_filename)));
}
MemoryContextDelete(cstate->copycontext);
@@ -2492,6 +2558,8 @@ CopyFrom(CopyState cstate)
hi_options |= HEAP_INSERT_FROZEN;
}
+ if (!cstate->ignore_conflict)
+ cstate->error_limit = 0;
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of code
@@ -2619,6 +2687,10 @@ CopyFrom(CopyState cstate)
*/
insertMethod = CIM_SINGLE;
}
+ else if (cstate->ignore_conflict)
+ {
+ insertMethod = CIM_SINGLE;
+ }
else
{
/*
@@ -3000,12 +3072,59 @@ CopyFrom(CopyState cstate)
*/
tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
+ else if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ bool specConflict;
+ uint32 specToken;
+ specConflict = false;
+
+ specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+ HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
+
+ /* insert the tuple, with the speculative token */
+ heap_insert(resultRelInfo->ri_RelationDesc, tuple,
+ estate->es_output_cid,
+ HEAP_INSERT_SPECULATIVE,
+ NULL);
+
+ /* insert index entries for tuple */
+ recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+ estate, true, &specConflict,
+ NIL);
+
+ /* adjust the tuple's state accordingly */
+ if (!specConflict)
+ {
+ heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple);
+ processed++;
+ }
+ else
+ {
+ heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple);
+#ifndef WIN32
+ appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+ appendBinaryStringInfo(&cstate->cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+ fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file);
+ cstate->error_limit--;
+
+ }
+
+ /*
+ * Wake up anyone waiting for our decision. They will re-check
+ * the tuple, see that it's no longer speculative, and wait on our
+ * XID as if this was a regularly inserted tuple all along.
+ */
+ SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+ }
else
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
mycid, hi_options, bistate);
/* And create index entries for it */
- if (resultRelInfo->ri_NumIndices > 0)
+ if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0)
recheckIndexes = ExecInsertIndexTuples(slot,
&(tuple->t_self),
estate,
@@ -3026,7 +3145,8 @@ CopyFrom(CopyState cstate)
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command.
*/
- processed++;
+ if(!cstate->ignore_conflict)
+ processed++;
}
}
@@ -3316,6 +3436,48 @@ BeginCopyFrom(ParseState *pstate,
cstate->num_defaults = num_defaults;
cstate->is_program = is_program;
+ if (cstate->failed_rec_filename)
+ {
+ mode_t oumask; /* Pre-existing umask value */
+ struct stat st;
+ /*
+ * Prevent write to relative path ... too easy to shoot oneself in
+ * the foot by overwriting a database file ...
+ */
+ if (!is_absolute_path(cstate->failed_rec_filename))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("relative path not allowed for failed record file")));
+ oumask = umask(S_IWGRP | S_IWOTH);
+ PG_TRY();
+ {
+ cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W);
+ }
+ PG_CATCH();
+ {
+ umask(oumask);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ umask(oumask);
+ if (cstate->failed_rec_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for writing: %m",
+ cstate->failed_rec_filename)));
+
+ if (fstat(fileno(cstate->failed_rec_file), &st))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ cstate->failed_rec_filename)));
+
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->failed_rec_filename)));
+ }
+
if (data_source_cb)
{
cstate->copy_dest = COPY_CALLBACK;
@@ -3514,7 +3676,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
if (!cstate->binary)
{
char **field_strings;
@@ -3529,9 +3691,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* check for overflowing fields */
if (attr_count > 0 && fldct > attr_count)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("extra data after last expected column")));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " extra data after last expected column");
+ goto next_line;
+ }else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("extra data after last expected column")));
+ }
fieldno = 0;
@@ -3543,10 +3712,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
if (fieldno >= fldct)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("missing data for column \"%s\"",
- NameStr(att->attname))));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ appendStringInfo(&cstate->line_buf, " missing data for column %s",
+ NameStr(att->attname));
+ LogCopyError(cstate, " ");
+ goto next_line;
+ }else
+
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("missing data for column \"%s\"",
+ NameStr(att->attname))));
+ }
string = field_strings[fieldno++];
if (cstate->convert_select_flags &&
@@ -3633,10 +3812,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
}
if (fld_count != attr_count)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("row field count is %d, expected %d",
- (int) fld_count, attr_count)));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ appendStringInfo(&cstate->line_buf, "row field count is %d, expected %d",
+ (int) fld_count, attr_count);
+ LogCopyError(cstate, " ");
+ goto next_line;
+ }else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("row field count is %d, expected %d",
+ (int) fld_count, attr_count)));
+ }
i = 0;
foreach(cur, cstate->attnumlist)
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a68f78e0e0..74d5737d7a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -631,7 +631,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
EXTENSION EXTERNAL EXTRACT
- FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+ FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR
FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -649,7 +649,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -3047,6 +3047,14 @@ copy_opt_item:
{
$$ = makeDefElem("encoding", (Node *)makeString($2), @1);
}
+ | ON CONFLICT LOG_P Iconst
+ {
+ $$ = makeDefElem("on_conflict_log", (Node *)makeInteger($4), @1);
+ }
+ | LOG_P FILE_P NAME_P Sconst
+ {
+ $$ = makeDefElem("log_file_name", (Node *)makeString($4), @1);
+ }
;
/* The following exist for backward compatibility with very old versions */
@@ -15033,6 +15041,7 @@ unreserved_keyword:
| EXTENSION
| EXTERNAL
| FAMILY
+ | FILE_P
| FILTER
| FIRST_P
| FOLLOWING
@@ -15081,6 +15090,7 @@ unreserved_keyword:
| LOCATION
| LOCK_P
| LOCKED
+ | LOG_P
| LOGGED
| MAPPING
| MATCH
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index f05444008c..c161b4cd7a 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -161,6 +161,7 @@ PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
+PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD)
PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD)
@@ -241,6 +242,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD)
PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
+PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)