From 0e92a90bd9f944b7f14845e942147d11f81e3b6f Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <torikoshia@oss.nttdata.com>
Date: Tue, 24 Sep 2024 14:11:22 +0900
Subject: [PATCH v4] Add new COPY option REJECT_LIMIT number

9e2d870 enabled the COPY FROM command to skip soft errors, but there
was no limitation and skipped all the soft errors.
In some cases, there would be a limitation how many errors are
tolerable. This patch introduces REJECT_LIMIT to specify how many
errors can be skipped.

---
 doc/src/sgml/ref/copy.sgml          | 20 ++++++++++++++
 src/backend/commands/copy.c         | 41 +++++++++++++++++++++++++++++
 src/backend/commands/copyfrom.c     |  5 ++++
 src/include/commands/copy.h         |  1 +
 src/test/regress/expected/copy2.out | 10 +++++++
 src/test/regress/sql/copy2.sql      | 21 +++++++++++++++
 6 files changed, 98 insertions(+)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 1518af8a04..3244384b1a 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     ON_ERROR <replaceable class="parameter">error_action</replaceable>
+    REJECT_LIMIT { <replaceable class="parameter">integer</replaceable> }
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
     LOG_VERBOSITY <replaceable class="parameter">verbosity</replaceable>
 </synopsis>
@@ -411,6 +412,25 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>REJECT_LIMIT</literal></term>
+    <listitem>
+     <para>
+      When a positive integer value is specified, <command>COPY</command> limits
+      the maximum tolerable number of errors while converting a column's input
+      value into its data type.
+      If input data caused more errors than the specified value, entire
+      <command>COPY</command> fails.
+      Otherwise, <command>COPY</command> discards the input row and continues
+      with the next one.
+      This option must be used with <literal>ON_ERROR</literal> to be set to
+      <literal>ignore</literal>.
+      Just setting <literal>ON_ERROR</literal> to <literal>ignore</literal>
+      tolerates unlimited number of errors.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>ENCODING</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3bb579a3a4..91e01dbc9f 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -418,6 +418,30 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
 	return COPY_ON_ERROR_STOP;	/* keep compiler quiet */
 }
 
+/*
+ * Extract REJECT_LIMIT value from a DefElem.
+ */
+static int64
+defGetCopyRejectLimitOptions(DefElem *def)
+{
+	int64					reject_limit;
+
+	if (nodeTag(def->arg) == T_Integer)
+	{
+		reject_limit = defGetInt64(def);
+		if (reject_limit <= 0)
+			ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("number for REJECT_LIMIT must be greater than zero")));
+	}
+	else
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			 errmsg("value for REJECT_LIMIT must be positive integer")));
+
+	return reject_limit;
+}
+
 /*
  * Extract a CopyLogVerbosityChoice value from a DefElem.
  */
@@ -470,6 +494,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		header_specified = false;
 	bool		on_error_specified = false;
 	bool		log_verbosity_specified = false;
+	bool		reject_limit_specified = false;
 	ListCell   *option;
 
 	/* Support external use for option sanity checking */
@@ -636,6 +661,13 @@ ProcessCopyOptions(ParseState *pstate,
 			log_verbosity_specified = true;
 			opts_out->log_verbosity = defGetCopyLogVerbosityChoice(defel, pstate);
 		}
+		else if (strcmp(defel->defname, "reject_limit") == 0)
+		{
+			if (reject_limit_specified)
+				errorConflictingDefElem(defel, pstate);
+			reject_limit_specified = true;
+			opts_out->reject_limit = defGetCopyRejectLimitOptions(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -669,6 +701,15 @@ ProcessCopyOptions(ParseState *pstate,
 				(errcode(ERRCODE_SYNTAX_ERROR),
 				 errmsg("only ON_ERROR STOP is allowed in BINARY mode")));
 
+	if (opts_out->reject_limit && !opts_out->on_error)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+		/*- translator: first and second %s are the names of COPY
+		 * option, e.g. ON_ERROR, thrid is the value of the COPY option,
+		 * e.g. IGNORE */
+				 errmsg("COPY %s requires %s to be set to %s",
+						 "REJECT_LIMIT", "ON_ERROR", "IGNORE")));
+
 	/* Set defaults for omitted options */
 	if (!opts_out->delim)
 		opts_out->delim = opts_out->csv_mode ? "," : "\t";
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 2d3462913e..c17fbf71bc 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1024,6 +1024,11 @@ CopyFrom(CopyFromState cstate)
 			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
 										 ++skipped);
 
+			if (cstate->opts.reject_limit && skipped > cstate->opts.reject_limit)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+						 errmsg("skipped more than REJECT_LIMIT rows: \"%lld\",",
+								(long long) cstate->opts.reject_limit)));
 			continue;
 		}
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 141fd48dc1..c3a7613778 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -83,6 +83,7 @@ typedef struct CopyFormatOptions
 	bool		convert_selectively;	/* do selective binary conversion? */
 	CopyOnErrorChoice on_error; /* what to do when error happened */
 	CopyLogVerbosityChoice log_verbosity;	/* verbosity of logged messages */
+	int64		reject_limit;	/* maximum tolerable number of errors */
 	List	   *convert_select; /* list of column names (can be NIL) */
 } CopyFormatOptions;
 
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 61a19cdc4c..f63228f5bc 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -116,6 +116,10 @@ COPY x to stdout (log_verbosity unsupported);
 ERROR:  COPY LOG_VERBOSITY "unsupported" not recognized
 LINE 1: COPY x to stdout (log_verbosity unsupported);
                           ^
+COPY x from stdin with (reject_limit 1);
+ERROR:  COPY REJECT_LIMIT requires ON_ERROR to be set to IGNORE
+COPY x from stdin with (on_error ignore, reject_limit 0);
+ERROR:  number for REJECT_LIMIT must be greater than zero
 -- too many columns in column list: should fail
 COPY x (a, b, c, d, e, d, c) from stdin;
 ERROR:  column "d" specified more than once
@@ -789,6 +793,12 @@ CONTEXT:  COPY check_ign_err, line 1: "1	{1}"
 COPY check_ign_err FROM STDIN WITH (on_error ignore);
 ERROR:  extra data after last expected column
 CONTEXT:  COPY check_ign_err, line 1: "1	{1}	3	abc"
+-- tests for reject_limit option
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 3);
+ERROR:  skipped more than REJECT_LIMIT rows: "3",
+CONTEXT:  COPY check_ign_err, line 5, column n: ""
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
+NOTICE:  4 rows were skipped due to data type incompatibility
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index 8b14962194..2d775d9c97 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -82,6 +82,8 @@ COPY x to stdout (format TEXT, force_null(a));
 COPY x to stdin (format CSV, force_null(a));
 COPY x to stdin (format BINARY, on_error unsupported);
 COPY x to stdout (log_verbosity unsupported);
+COPY x from stdin with (reject_limit 1);
+COPY x from stdin with (on_error ignore, reject_limit 0);
 
 -- too many columns in column list: should fail
 COPY x (a, b, c, d, e, d, c) from stdin;
@@ -557,6 +559,25 @@ COPY check_ign_err FROM STDIN WITH (on_error ignore);
 1	{1}	3	abc
 \.
 
+-- tests for reject_limit option
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 3);
+6	{6}	6
+a	{7}	7
+8	{8}	8888888888
+9	{a, 9}	9
+
+10	{10}	10
+\.
+
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
+6	{6}	6
+a	{7}	7
+8	{8}	8888888888
+9	{a, 9}	9
+
+10	{10}	10
+\.
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;

base-commit: bbba59e69a56e1622e270f5e47b402c3a904cefc
-- 
2.39.2

