diff --git a/doc/src/sgml/datatype.sgml b/doc/src/sgml/datatype.sgml index a8d0780387..a86b794ce0 100644 --- a/doc/src/sgml/datatype.sgml +++ b/doc/src/sgml/datatype.sgml @@ -4823,7 +4823,13 @@ SELECT * FROM pg_attribute standard comparison operators, like = and >. Two LSNs can be subtracted using the - operator; the result is the number of bytes separating - those write-ahead log locations. + those write-ahead log locations. Also the number of bytes can be + added into and subtracted from LSN using the + +(pg_lsn,numeric) and + -(pg_lsn,numeric) operators, respectively. Note that + the calculated LSN should be in the range of pg_lsn type, + i.e., between 0/0 and + FFFFFFFF/FFFFFFFF. diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c index 9986132b45..94593c7f63 100644 --- a/src/backend/utils/adt/numeric.c +++ b/src/backend/utils/adt/numeric.c @@ -41,6 +41,7 @@ #include "utils/guc.h" #include "utils/int8.h" #include "utils/numeric.h" +#include "utils/pg_lsn.h" #include "utils/sortsupport.h" /* ---------- @@ -472,6 +473,7 @@ static void apply_typmod(NumericVar *var, int32 typmod); static bool numericvar_to_int32(const NumericVar *var, int32 *result); static bool numericvar_to_int64(const NumericVar *var, int64 *result); static void int64_to_numericvar(int64 val, NumericVar *var); +static bool numericvar_to_uint64(const NumericVar *var, uint64 *result); #ifdef HAVE_INT128 static bool numericvar_to_int128(const NumericVar *var, int128 *result); static void int128_to_numericvar(int128 val, NumericVar *var); @@ -3688,6 +3690,30 @@ numeric_float4(PG_FUNCTION_ARGS) } +Datum +numeric_pg_lsn(PG_FUNCTION_ARGS) +{ + Numeric num = PG_GETARG_NUMERIC(0); + NumericVar x; + XLogRecPtr result; + + if (NUMERIC_IS_NAN(num)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot convert NaN to pg_lsn"))); + + /* Convert to variable format and thence to pg_lsn */ + init_var_from_num(num, &x); + + if (!numericvar_to_uint64(&x, (uint64 *) &result)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_lsn out of range"))); + + PG_RETURN_LSN(result); +} + + /* ---------------------------------------------------------------------- * * Aggregate functions @@ -6739,6 +6765,78 @@ int64_to_numericvar(int64 val, NumericVar *var) var->weight = ndigits - 1; } +/* + * Convert numeric to uint64, rounding if needed. + * + * If overflow, return false (no error is raised). Return true if okay. + */ +static bool +numericvar_to_uint64(const NumericVar *var, uint64 *result) +{ + NumericDigit *digits; + int ndigits; + int weight; + int i; + uint64 val; + NumericVar rounded; + + /* Round to nearest integer */ + init_var(&rounded); + set_var_from_var(var, &rounded); + round_var(&rounded, 0); + + /* Check for zero input */ + strip_var(&rounded); + ndigits = rounded.ndigits; + if (ndigits == 0) + { + *result = 0; + free_var(&rounded); + return true; + } + + /* Check for negative input */ + if (rounded.sign == NUMERIC_NEG) + { + free_var(&rounded); + return false; + } + + /* + * For input like 10000000000, we must treat stripped digits as real. So + * the loop assumes there are weight+1 digits before the decimal point. + */ + weight = rounded.weight; + Assert(weight >= 0 && ndigits <= weight + 1); + + /* Construct the result */ + digits = rounded.digits; + val = digits[0]; + for (i = 1; i <= weight; i++) + { + if (unlikely(pg_mul_u64_overflow(val, NBASE, &val))) + { + free_var(&rounded); + return false; + } + + if (i < ndigits) + { + if (unlikely(pg_add_u64_overflow(val, digits[i], &val))) + { + free_var(&rounded); + return false; + } + } + } + + free_var(&rounded); + + *result = val; + + return true; +} + #ifdef HAVE_INT128 /* * Convert numeric to int128, rounding if needed. diff --git a/src/backend/utils/adt/pg_lsn.c b/src/backend/utils/adt/pg_lsn.c index d9754a7778..ad0a7bd869 100644 --- a/src/backend/utils/adt/pg_lsn.c +++ b/src/backend/utils/adt/pg_lsn.c @@ -16,6 +16,7 @@ #include "funcapi.h" #include "libpq/pqformat.h" #include "utils/builtins.h" +#include "utils/numeric.h" #include "utils/pg_lsn.h" #define MAXPG_LSNLEN 17 @@ -248,3 +249,71 @@ pg_lsn_mi(PG_FUNCTION_ARGS) return result; } + +/* + * Add the number of bytes to pg_lsn, giving a new pg_lsn. + * Must handle both positive and negative numbers of bytes. + */ +Datum +pg_lsn_pli(PG_FUNCTION_ARGS) +{ + XLogRecPtr lsn = PG_GETARG_LSN(0); + Numeric nbytes = PG_GETARG_NUMERIC(1); + Datum num; + Datum res; + char buf[32]; + + if (numeric_is_nan(nbytes)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot add NaN to pg_lsn"))); + + /* Convert to numeric */ + snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn); + num = DirectFunctionCall3(numeric_in, + CStringGetDatum(buf), + ObjectIdGetDatum(0), + Int32GetDatum(-1)); + + /* Add two numerics */ + res = DirectFunctionCall2(numeric_add, + NumericGetDatum(num), + NumericGetDatum(nbytes)); + + /* Convert to pg_lsn */ + return DirectFunctionCall1(numeric_pg_lsn, res); +} + +/* + * Subtract the number of bytes from pg_lsn, giving a new pg_lsn. + * Must handle both positive and negative numbers of bytes. + */ +Datum +pg_lsn_mii(PG_FUNCTION_ARGS) +{ + XLogRecPtr lsn = PG_GETARG_LSN(0); + Numeric nbytes = PG_GETARG_NUMERIC(1); + Datum num; + Datum res; + char buf[32]; + + if (numeric_is_nan(nbytes)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot subtract NaN from pg_lsn"))); + + /* Convert to numeric */ + snprintf(buf, sizeof(buf), UINT64_FORMAT, lsn); + num = DirectFunctionCall3(numeric_in, + CStringGetDatum(buf), + ObjectIdGetDatum(0), + Int32GetDatum(-1)); + + /* Subtract two numerics */ + res = DirectFunctionCall2(numeric_sub, + NumericGetDatum(num), + NumericGetDatum(nbytes)); + + /* Convert to pg_lsn */ + return DirectFunctionCall1(numeric_pg_lsn, res); +} diff --git a/src/include/catalog/pg_operator.dat b/src/include/catalog/pg_operator.dat index 00ada7e48f..15dfa1497a 100644 --- a/src/include/catalog/pg_operator.dat +++ b/src/include/catalog/pg_operator.dat @@ -2909,6 +2909,17 @@ { oid => '3228', descr => 'minus', oprname => '-', oprleft => 'pg_lsn', oprright => 'pg_lsn', oprresult => 'numeric', oprcode => 'pg_lsn_mi' }, +{ oid => '5025', descr => 'add', + oprname => '+', oprleft => 'pg_lsn', oprright => 'numeric', + oprresult => 'pg_lsn', oprcom => '+(numeric,pg_lsn)', + oprcode => 'pg_lsn_pli' }, +{ oid => '5026', descr => 'add', + oprname => '+', oprleft => 'numeric', oprright => 'pg_lsn', + oprresult => 'pg_lsn', oprcom => '+(pg_lsn,numeric)', + oprcode => 'numeric_pl_pg_lsn' }, +{ oid => '5027', descr => 'subtract', + oprname => '-', oprleft => 'pg_lsn', oprright => 'numeric', + oprresult => 'pg_lsn', oprcode => 'pg_lsn_mii' }, # enum operators { oid => '3516', descr => 'equal', diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 4bce3ad8de..0eb94d92e3 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -4398,6 +4398,9 @@ { oid => '1783', descr => 'convert numeric to int2', proname => 'int2', prorettype => 'int2', proargtypes => 'numeric', prosrc => 'numeric_int2' }, +{ oid => '6103', descr => 'convert numeric to pg_lsn', + proname => 'pg_lsn', prorettype => 'pg_lsn', proargtypes => 'numeric', + prosrc => 'numeric_pg_lsn' }, { oid => '3556', descr => 'convert jsonb to boolean', proname => 'bool', prorettype => 'bool', proargtypes => 'jsonb', @@ -8578,6 +8581,15 @@ { oid => '4188', descr => 'smaller of two', proname => 'pg_lsn_smaller', prorettype => 'pg_lsn', proargtypes => 'pg_lsn pg_lsn', prosrc => 'pg_lsn_smaller' }, +{ oid => '5022', + proname => 'pg_lsn_pli', prorettype => 'pg_lsn', + proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_pli' }, +{ oid => '5023', + proname => 'numeric_pl_pg_lsn', prolang => 'sql', prorettype => 'pg_lsn', + proargtypes => 'numeric pg_lsn', prosrc => 'select $2 + $1' }, +{ oid => '5024', + proname => 'pg_lsn_mii', prorettype => 'pg_lsn', + proargtypes => 'pg_lsn numeric', prosrc => 'pg_lsn_mii' }, # enum related procs { oid => '3504', descr => 'I/O', diff --git a/src/test/regress/expected/numeric.out b/src/test/regress/expected/numeric.out index c7fe63d037..978d38b729 100644 --- a/src/test/regress/expected/numeric.out +++ b/src/test/regress/expected/numeric.out @@ -2315,3 +2315,30 @@ FROM (VALUES (0::numeric, 0::numeric), SELECT lcm(9999 * (10::numeric)^131068 + (10::numeric^131068 - 1), 2); -- overflow ERROR: value overflows numeric format +-- +-- Tests for pg_lsn() +-- +SELECT pg_lsn(23783416::numeric); + pg_lsn +----------- + 0/16AE7F8 +(1 row) + +SELECT pg_lsn(0::numeric); + pg_lsn +-------- + 0/0 +(1 row) + +SELECT pg_lsn(18446744073709551615::numeric); + pg_lsn +------------------- + FFFFFFFF/FFFFFFFF +(1 row) + +SELECT pg_lsn(-1::numeric); +ERROR: pg_lsn out of range +SELECT pg_lsn(18446744073709551616::numeric); +ERROR: pg_lsn out of range +SELECT pg_lsn('NaN'::numeric); +ERROR: cannot convert NaN to pg_lsn diff --git a/src/test/regress/expected/pg_lsn.out b/src/test/regress/expected/pg_lsn.out index 64d41dfdad..99a748a6a7 100644 --- a/src/test/regress/expected/pg_lsn.out +++ b/src/test/regress/expected/pg_lsn.out @@ -71,6 +71,56 @@ SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn; 1 (1 row) +SELECT '0/16AE7F7'::pg_lsn + 16::numeric; + ?column? +----------- + 0/16AE807 +(1 row) + +SELECT 16::numeric + '0/16AE7F7'::pg_lsn; + ?column? +----------- + 0/16AE807 +(1 row) + +SELECT '0/16AE7F7'::pg_lsn - 16::numeric; + ?column? +----------- + 0/16AE7E7 +(1 row) + +SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric; + ?column? +------------------- + FFFFFFFF/FFFFFFFF +(1 row) + +SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error +ERROR: pg_lsn out of range +SELECT '0/1'::pg_lsn - 1::numeric; + ?column? +---------- + 0/0 +(1 row) + +SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error +ERROR: pg_lsn out of range +SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn); + ?column? +------------------- + FFFFFFFF/FFFFFFFF +(1 row) + +SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn); + ?column? +---------- + 0/0 +(1 row) + +SELECT '0/16AE7F7'::pg_lsn + 'NaN'::numeric; +ERROR: cannot add NaN to pg_lsn +SELECT '0/16AE7F7'::pg_lsn - 'NaN'::numeric; +ERROR: cannot subtract NaN from pg_lsn -- Check btree and hash opclasses EXPLAIN (COSTS OFF) SELECT DISTINCT (i || '/' || j)::pg_lsn f diff --git a/src/test/regress/sql/numeric.sql b/src/test/regress/sql/numeric.sql index 41475a9a24..fdfe6fa3c7 100644 --- a/src/test/regress/sql/numeric.sql +++ b/src/test/regress/sql/numeric.sql @@ -1111,3 +1111,13 @@ FROM (VALUES (0::numeric, 0::numeric), (4232.820::numeric, 132.72000::numeric)) AS v(a, b); SELECT lcm(9999 * (10::numeric)^131068 + (10::numeric^131068 - 1), 2); -- overflow + +-- +-- Tests for pg_lsn() +-- +SELECT pg_lsn(23783416::numeric); +SELECT pg_lsn(0::numeric); +SELECT pg_lsn(18446744073709551615::numeric); +SELECT pg_lsn(-1::numeric); +SELECT pg_lsn(18446744073709551616::numeric); +SELECT pg_lsn('NaN'::numeric); diff --git a/src/test/regress/sql/pg_lsn.sql b/src/test/regress/sql/pg_lsn.sql index 2c143c82ff..615368ba96 100644 --- a/src/test/regress/sql/pg_lsn.sql +++ b/src/test/regress/sql/pg_lsn.sql @@ -27,6 +27,17 @@ SELECT '0/16AE7F7' < '0/16AE7F8'::pg_lsn; SELECT '0/16AE7F8' > pg_lsn '0/16AE7F7'; SELECT '0/16AE7F7'::pg_lsn - '0/16AE7F8'::pg_lsn; SELECT '0/16AE7F8'::pg_lsn - '0/16AE7F7'::pg_lsn; +SELECT '0/16AE7F7'::pg_lsn + 16::numeric; +SELECT 16::numeric + '0/16AE7F7'::pg_lsn; +SELECT '0/16AE7F7'::pg_lsn - 16::numeric; +SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 1::numeric; +SELECT 'FFFFFFFF/FFFFFFFE'::pg_lsn + 2::numeric; -- out of range error +SELECT '0/1'::pg_lsn - 1::numeric; +SELECT '0/1'::pg_lsn - 2::numeric; -- out of range error +SELECT '0/0'::pg_lsn + ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn); +SELECT 'FFFFFFFF/FFFFFFFF'::pg_lsn - ('FFFFFFFF/FFFFFFFF'::pg_lsn - '0/0'::pg_lsn); +SELECT '0/16AE7F7'::pg_lsn + 'NaN'::numeric; +SELECT '0/16AE7F7'::pg_lsn - 'NaN'::numeric; -- Check btree and hash opclasses EXPLAIN (COSTS OFF)