| From: | Fujii Masao <masao(dot)fujii(at)gmail(dot)com> |
|---|---|
| To: | Dewei Dai <daidewei1970(at)163(dot)com> |
| Cc: | Mircea Cadariu <cadariu(dot)mircea(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org> |
| Subject: | Re: Re: pg_recvlogical: Prevent flushed data from being re-sent after restarting replication |
| Date: | 2026-01-14 01:14:28 |
| Message-ID: | CAHGQGwF5QU2DEALNOznOxtXWdqADD4NLeVac0hKCWwv7cq3BBw@mail.gmail.com |
| Lists: | pgsql-hackers |
On Sun, Jan 11, 2026 at 6:49 PM Dewei Dai <daidewei1970(at)163(dot)com> wrote:
>
> Hi Fujii,
>
> At 2026-01-11 17:21:19, "Fujii Masao" <masao(dot)fujii(at)gmail(dot)com> wrote:
> >That's possible. But TBH I'm not sure how much effort is justified here.
> >The test uses pg_recvlogical to activate the slot and doesn't really test
> >pg_recvlogical itself. It's unclear how valuable it is to additionally run
> >this test on Windows...
> >
> I applied the V4 patch and tested it on a CentOS 7 x86_64 platform. The test steps are as follows:
>
> 1. Create a table:
> `create table test_id(id integer);`
> 2. Create a function to close the connection:
> `create or replace function test_f(id integer) returns integer as $$
> declare
> var1 integer;
> begin
> SELECT active_pid into var1 FROM pg_replication_slots WHERE slot_name = 'reconnect_test';
> perform pg_terminate_backend(var1);
> return 1;
> end; $$ language plpgsql;`
>
> 3. Execute the command to receive logs:
> `./pg_recvlogical --create-slot --slot reconnect_test --dbname postgres --start --file decoding.out --fsync-interval 200 --status-interval 100 --verbose`
> 4. Execute the following shell script:
> `while true
> do
> ./psql -d postgres<<EOF
> select test_f(1);
> \q
> EOF
> done`
>
> 5. Execute data insertion using psql:
> `insert into test_id values(1);
> insert into test_id values(2);`
> 6. `tail -f decoding.out`
> I found duplicate insert statements in the file.
> I don't know if this is a problem.
> Additionally, I tried moving the two lines involving `StreamLogicalLog` outside the loop
> in the `main` function, and then it worked correctly.
> `output_written_lsn = InvalidXLogRecPtr;`
> `output_fsync_lsn = InvalidXLogRecPtr;`

Thanks for the test and the investigation!

I was able to reproduce the issue as well. It occurs when the pg_recvlogical
connection is terminated before it has received any messages. The problematic
sequence is roughly:

1. The pg_recvlogical connection is terminated after running for some time.
2. StreamLogicalLog() is called again and initializes
   output_written_lsn to InvalidXLogRecPtr.
3. pg_recvlogical reconnects and starts replication from a valid startpos.
4. The connection is terminated again, this time before any messages
   have been received, so output_written_lsn is still InvalidXLogRecPtr.
5. StreamLogicalLog() exits and OutputFsync() sets startpos to
   output_written_lsn (i.e., InvalidXLogRecPtr).

As a result, the next StreamLogicalLog() starts replication with
startpos = InvalidXLogRecPtr, which can cause the server to resend
already-streamed data and lead to duplicate output.

The root cause is that StreamLogicalLog() reinitializes output_written_lsn and
output_fsync_lsn on every call. As you suggested, removing that initialization
fixes the issue.
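
For illustration, the sequence above can be modeled with a small standalone C
sketch. This is not the pg_recvlogical code and not the patch: stream_once(),
simulate(), the reset_on_entry flag, the starting LSN 0x1000 and the 0x10
message size are all hypothetical, and XLogRecPtr / InvalidXLogRecPtr are
redefined locally as stand-ins so that the file compiles on its own. It only
assumes, as described above, that startpos is taken from output_written_lsn
when a streaming call exits.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL definitions, so the sketch compiles alone. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

static XLogRecPtr startpos;
static XLogRecPtr output_written_lsn;
static XLogRecPtr output_fsync_lsn;

/*
 * Hypothetical model of one streaming call: connect at startpos,
 * receive nmsgs messages, then lose the connection.
 */
static void
stream_once(bool reset_on_entry, int nmsgs)
{
    XLogRecPtr  pos = startpos;

    assert(nmsgs >= 0);

    if (reset_on_entry)
    {
        /* Behavior before the fix: forget progress made by earlier calls. */
        output_written_lsn = InvalidXLogRecPtr;
        output_fsync_lsn = InvalidXLogRecPtr;
    }

    for (int i = 0; i < nmsgs; i++)
    {
        pos += 0x10;            /* made-up message size */
        output_written_lsn = pos;
    }

    /* On exit, flush and restart from the last written position. */
    output_fsync_lsn = output_written_lsn;
    startpos = output_written_lsn;
}

/* Returns the startpos that the third connection would use. */
static XLogRecPtr
simulate(bool reset_on_entry)
{
    startpos = 0x1000;          /* made-up valid starting LSN */
    output_written_lsn = InvalidXLogRecPtr;
    output_fsync_lsn = InvalidXLogRecPtr;

    stream_once(reset_on_entry, 3);     /* runs for a while, then terminated */
    stream_once(reset_on_entry, 0);     /* terminated before any message */

    return startpos;
}
```

In this model, simulate(true) returns InvalidXLogRecPtr, which corresponds to
the restart position that leads to duplicate output, while simulate(false)
returns 0x1030, the position reached by the first connection.
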

I’ve updated the 0001 patch accordingly.
Attached is the updated version of the patches.

Regards,

--
Fujii Masao

| Attachment | Content-Type | Size |
|---|---|---|
| v5-0001-pg_recvlogical-Prevent-flushed-data-from-being-re.patch | application/octet-stream | 2.4 KB |
| v5-0002-Add-a-new-helper-function-wait_for_file-to-Utils..patch | application/octet-stream | 3.5 KB |
| v5-0004-pg_recvlogical-remove-unnecessary-OutputFsync-ret.patch | application/octet-stream | 2.6 KB |
| v5-0003-Add-test-for-pg_recvlogical-reconnection-behavior.patch | application/octet-stream | 4.2 KB |