From d00e4a404fc9c6ccc8aec3885210909a21c55005 Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <torikoshia@oss.nttdata.com>
Date: Wed, 17 Jul 2024 21:47:52 +0900
Subject: [PATCH v2] Add new COPY option REJECT_LIMIT number

---
 doc/src/sgml/ref/copy.sgml          | 20 +++++++++++++
 src/backend/commands/copy.c         | 46 +++++++++++++++++++++++++++++
 src/backend/commands/copyfrom.c     |  6 ++++
 src/include/commands/copy.h         | 10 +++++++
 src/test/regress/expected/copy2.out | 14 +++++++++
 src/test/regress/sql/copy2.sql      | 30 +++++++++++++++++++
 6 files changed, 126 insertions(+)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 1518af8a04..ee84d9661f 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     ON_ERROR <replaceable class="parameter">error_action</replaceable>
+    REJECT_LIMIT { <replaceable class="parameter">integer</replaceable> | INFINITY }
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
     LOG_VERBOSITY <replaceable class="parameter">verbosity</replaceable>
 </synopsis>
@@ -411,6 +412,25 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>REJECT_LIMIT</literal></term>
+    <listitem>
+     <para>
+      When a positive integer value is specified, <command>COPY</command> limits
+      the maximum tolerable number of errors while converting a column's input
+      value into its data type.
+      If input data caused more errors than the specified value, entire
+      <command>COPY</command> fails.
+      Otherwise, <command>COPY</command> discards the input row and continues
+      with the next one.
+     </para>
+     <para>
+      When specified <literal>INFINITY</literal>, <command>COPY</command> ignores all
+      the errors. This is a synonym for <literal>ON_ERROR</literal> <literal>ignore</literal>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>ENCODING</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index df7a4a21c9..f298614043 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -415,6 +415,43 @@ defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
 	return COPY_ON_ERROR_STOP;	/* keep compiler quiet */
 }
 
+/*
+ * Extract a CopyRejectLimits values from a DefElem.
+ */
+static CopyRejectLimits
+defGetCopyRejectLimitOptions(DefElem *def)
+{
+	CopyRejectLimits	limits;
+	int64					num_err;
+
+	switch(nodeTag(def->arg))
+	{
+		case T_Integer:
+			num_err = defGetInt64(def);
+			if (num_err <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("number for REJECT_LIMIT must be greater than zero")));
+			break;
+		case T_String:
+			if (pg_strcasecmp(defGetString(def), "INFINITY") == 0)
+				/* when set to 0, it is treated as no limit */
+				num_err = 0;
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("string for REJECT_LIMIT must be 'INFINITY'")));
+			break;
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("value for REJECT_LIMIT must be positive integer or 'INFINITY'")));
+	}
+	limits.num_err = num_err;
+
+	return limits;
+}
+
 /*
  * Extract a CopyLogVerbosityChoice value from a DefElem.
  */
@@ -466,6 +503,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		header_specified = false;
 	bool		on_error_specified = false;
 	bool		log_verbosity_specified = false;
+	bool		reject_limit_specified = false;
 	ListCell   *option;
 
 	/* Support external use for option sanity checking */
@@ -632,6 +670,14 @@ ProcessCopyOptions(ParseState *pstate,
 			log_verbosity_specified = true;
 			opts_out->log_verbosity = defGetCopyLogVerbosityChoice(defel, pstate);
 		}
+		else if (strcmp(defel->defname, "reject_limit") == 0)
+		{
+			if (reject_limit_specified)
+				errorConflictingDefElem(defel, pstate);
+			reject_limit_specified = true;
+			opts_out->on_error = COPY_ON_ERROR_IGNORE;
+			opts_out->reject_limits = defGetCopyRejectLimitOptions(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index ce4d62e707..65d950ef07 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1012,6 +1012,12 @@ CopyFrom(CopyFromState cstate)
 			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
 										 ++skipped);
 
+			if (cstate->opts.reject_limits.num_err &&
+				skipped > cstate->opts.reject_limits.num_err)
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("exceeded the number specified by REJECT_LIMIT \"%lld\"",
+								(long long) cstate->opts.reject_limits.num_err)));
 			continue;
 		}
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 141fd48dc1..86ba1cbe16 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -49,6 +49,15 @@ typedef enum CopyLogVerbosityChoice
 	COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */
 } CopyLogVerbosityChoice;
 
+/*
+ * A struct to hold reject_limit options, in a parsed form.
+ * More values to be added in another patch.
+ */
+typedef struct CopyRejectLimits
+{
+	int64		num_err;	/* maximum tolerable number of errors */
+} CopyRejectLimits;
+
 /*
  * A struct to hold COPY options, in a parsed form. All of these are related
  * to formatting, except for 'freeze', which doesn't really belong here, but
@@ -83,6 +92,7 @@ typedef struct CopyFormatOptions
 	bool		convert_selectively;	/* do selective binary conversion? */
 	CopyOnErrorChoice on_error; /* what to do when error happened */
 	CopyLogVerbosityChoice log_verbosity;	/* verbosity of logged messages */
+	CopyRejectLimits	reject_limits;	/* thresholds of reject_limit */
 	List	   *convert_select; /* list of column names (can be NIL) */
 } CopyFormatOptions;
 
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 931542f268..be04bac6b1 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -85,6 +85,10 @@ COPY x from stdin (log_verbosity default, log_verbosity verbose);
 ERROR:  conflicting or redundant options
 LINE 1: COPY x from stdin (log_verbosity default, log_verbosity verb...
                                                   ^
+COPY x from stdin (reject_limit 'INFINITY', reject_limit 3);
+ERROR:  conflicting or redundant options
+LINE 1: COPY x from stdin (reject_limit 'INFINITY', reject_limit 3);
+                                                    ^
 -- incorrect options
 COPY x to stdin (format BINARY, delimiter ',');
 ERROR:  cannot specify DELIMITER in BINARY mode
@@ -116,6 +120,8 @@ COPY x to stdout (log_verbosity unsupported);
 ERROR:  COPY LOG_VERBOSITY "unsupported" not recognized
 LINE 1: COPY x to stdout (log_verbosity unsupported);
                           ^
+COPY x from stdin with (reject_limit 0);
+ERROR:  number for REJECT_LIMIT must be greater than zero
 -- too many columns in column list: should fail
 COPY x (a, b, c, d, e, d, c) from stdin;
 ERROR:  column "d" specified more than once
@@ -789,6 +795,14 @@ CONTEXT:  COPY check_ign_err, line 1: "1	{1}"
 COPY check_ign_err FROM STDIN WITH (on_error ignore);
 ERROR:  extra data after last expected column
 CONTEXT:  COPY check_ign_err, line 1: "1	{1}	3	abc"
+-- tests for reject_limit option
+COPY check_ign_err FROM STDIN WITH (reject_limit 3);
+ERROR:  exceeded the number specified by REJECT_LIMIT "3"
+CONTEXT:  COPY check_ign_err, line 5, column n: ""
+COPY check_ign_err FROM STDIN WITH (reject_limit 4);
+NOTICE:  4 rows were skipped due to data type incompatibility
+COPY check_ign_err FROM STDIN WITH (reject_limit 'INFINITY');
+NOTICE:  4 rows were skipped due to data type incompatibility
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index 8b14962194..2ddc50ce8d 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -68,6 +68,7 @@ COPY x from stdin (convert_selectively (a), convert_selectively (b));
 COPY x from stdin (encoding 'sql_ascii', encoding 'sql_ascii');
 COPY x from stdin (on_error ignore, on_error ignore);
 COPY x from stdin (log_verbosity default, log_verbosity verbose);
+COPY x from stdin (reject_limit 'INFINITY', reject_limit 3);
 
 -- incorrect options
 COPY x to stdin (format BINARY, delimiter ',');
@@ -82,6 +83,7 @@ COPY x to stdout (format TEXT, force_null(a));
 COPY x to stdin (format CSV, force_null(a));
 COPY x to stdin (format BINARY, on_error unsupported);
 COPY x to stdout (log_verbosity unsupported);
+COPY x from stdin with (reject_limit 0);
 
 -- too many columns in column list: should fail
 COPY x (a, b, c, d, e, d, c) from stdin;
@@ -557,6 +559,34 @@ COPY check_ign_err FROM STDIN WITH (on_error ignore);
 1	{1}	3	abc
 \.
 
+-- tests for reject_limit option
+COPY check_ign_err FROM STDIN WITH (reject_limit 3);
+6	{6}	6
+a	{7}	7
+8	{8}	8888888888
+9	{a, 9}	9
+
+10	{10}	10
+\.
+
+COPY check_ign_err FROM STDIN WITH (reject_limit 4);
+6	{6}	6
+a	{7}	7
+8	{8}	8888888888
+9	{a, 9}	9
+
+10	{10}	10
+\.
+
+COPY check_ign_err FROM STDIN WITH (reject_limit 'INFINITY');
+6	{6}	6
+a	{7}	7
+8	{8}	8888888888
+9	{a, 9}	9
+
+10	{10}	10
+\.
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;

base-commit: 86d33987e8b0364b468c9b40c5f2a0a1aed87ef1
-- 
2.39.2

