Re: libpq compression

From: Konstantin Knizhnik <k(dot)knizhnik(at)postgrespro(dot)ru>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, "Andrey M(dot) Borodin" <x4mmm(at)yandex-team(dot)ru>, Даниил Захлыстов <usernamedt(at)yandex-team(dot)ru>
Subject: Re: libpq compression
Date: 2020-10-29 14:45:28
Message-ID: eb8e63d3-bfe5-97f4-d852-13d1be39f040@postgrespro.ru
Lists: pgsql-hackers

A new version of the patch with fixes is attached.

On 28.10.2020 22:27, Andres Freund wrote:
> Hi,
>
> On 2020-10-26 19:20:46 +0300, Konstantin Knizhnik wrote:
>> diff --git a/configure b/configure
>> index ace4ed5..deba608 100755
>> --- a/configure
>> +++ b/configure
>> @@ -700,6 +700,7 @@ LD
>> LDFLAGS_SL
>> LDFLAGS_EX
>> with_zlib
>> +with_zstd
>> with_system_tzdata
>> with_libxslt
>> XML2_LIBS
>
> I don't see a corresponding configure.ac change?
>
>
>> + <varlistentry id="libpq-connect-compression" xreflabel="compression">
>> + <term><literal>compression</literal></term>
>> + <listitem>
>> + <para>
>> + Request compression of libpq traffic. Client sends to the server list of compression algorithms, supported by client library.
>> + If server supports one of this algorithms, then it acknowledges use of this algorithm and then all libpq messages send both from client to server and
>> + visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
>> + message and it is up to the client whether to continue work without compression or report error.
>> + Supported compression algorithms are chosen at configure time. Right now two libraries are supported: zlib (default) and zstd (if Postgres was
>> + configured with --with-zstd option). In both cases streaming mode is used.
>> + </para>
>> + </listitem>
>> + </varlistentry>
>> +
>
> - there should be a reference to potential security impacts when used in
> combination with encrypted connections
> - What does " and it is up to the client whether to continue work
> without compression or report error" actually mean for a libpq parameter?
> - What is the point of the "streaming mode" reference?
>
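For illustration, here is how the option is meant to be used from a client; a
minimal sketch, assuming only what the patch above describes (the "compression"
keyword and boolean value are from this patch, not from any released libpq):

    #include <stdio.h>
    #include <libpq-fe.h>

    int
    main(void)
    {
        /* "compression=on" makes libpq offer every algorithm it was built with */
        PGconn *conn = PQconnectdb("host=localhost dbname=postgres compression=on");

        if (PQstatus(conn) != CONNECTION_OK)
            fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));

        PQfinish(conn);
        return 0;
    }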
>
>
>> @@ -263,6 +272,21 @@
>> </varlistentry>
>>
>> <varlistentry>
>> + <term>CompressionAck</term>
>> + <listitem>
>> + <para>
>> + Server acknowledges using compression for client-server communication protocol.
>> + Compression can be requested by client by including "compression" option in connection string.
>> + Client sends to the server list of compression algorithms, supported by client library
>> + (compression algorithm is identified by one letter: <literal>'f'</literal> - Facebook zstd, <literal>'z'</literal> - zlib,...).
>> + If server supports one of this algorithms, then it acknowledges use of this algorithm and all subsequent libpq messages send both from client to server and
>> + visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
>> + algorithm identifier and it is up to the client whether to continue work without compression or report error.
>> + </para>
>> + </listitem>
>> + </varlistentry>
> Why are compression methods identified by one byte identifiers? That
> seems unnecessarily small, given this is commonly a once-per-connection
> action?
>
>
> The protocol sounds to me like there's no way to enable/disable
> compression in an existing connection. To me it seems better to have an
> explicit, client initiated, request to use a specific method of
> compression (including none). That allows enabling compression for bulk
> work, and disabling it in other cases (e.g. for security sensitive
> content, or for unlikely to compress well content).
>
> I think that would also make cross-version handling easier, because a
> newer client driver can send the compression request and handle the
> error, without needing to reconnect or such.
>
> Most importantly, I think such a design is basically a necessity to make
> connection poolers work in a sensible way.
>
> And lastly, wouldn't it be reasonable to allow to specify things like
> compression levels? All that doesn't have to be supported now, but I
> think the protocol should take that into account.
>
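To make that concrete, an in-band switch could be a new frontend message along
these lines; this is purely hypothetical, nothing like it exists in the patch,
and the message type byte, layout and level field are all invented here:

    #include <stdint.h>
    #include <string.h>
    #include <arpa/inet.h>

    /*
     * Hypothetical 'SetCompression' frontend message:
     *   byte1('k')  int32(length)  byte1(algorithm, 'n' = none)  byte1(level)
     * The server would acknowledge (or reject an unsupported algorithm) and
     * both directions would switch only after that acknowledgement.
     */
    static size_t
    build_set_compression(char *buf, char algorithm, uint8_t level)
    {
        uint32_t    len = htonl(4 + 1 + 1);     /* length word plus payload */

        buf[0] = 'k';
        memcpy(buf + 1, &len, 4);
        buf[5] = algorithm;
        buf[6] = (char) level;
        return 7;
    }

That shape would also leave room for per-algorithm options such as the
compression level mentioned above.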
>
>> +<para>
>> + Used compression algorithm. Right now the following streaming compression algorithms are supported: 'f' - Facebook zstd, 'z' - zlib, 'n' - no compression.
>> +</para>
> I would prefer this just be referenced as zstd or zstandard, not
> facebook zstd. There's an RFC (albeit only "informational"), and it
> doesn't name facebook, except as an employer:
> https://tools.ietf.org/html/rfc8478
>
>
>> +int
>> +pq_configure(Port* port)
>> +{
>> + char* client_compression_algorithms = port->compression_algorithms;
>> + /*
>> + * If client request compression, it sends list of supported compression algorithms.
>> + * Each compression algorirthm is idetified by one letter ('f' - Facebook zsts, 'z' - xlib)
>> + */
> s/algorirthm/algorithm/
> s/idetified/identified/
> s/zsts/zstd/
> s/xlib/zlib/
>
> That's, uh, quite the typo density.
>
>
>> + if (client_compression_algorithms)
>> + {
>> + char server_compression_algorithms[ZPQ_MAX_ALGORITHMS];
>> + char compression_algorithm = ZPQ_NO_COMPRESSION;
>> + char compression[6] = {'z',0,0,0,5,0}; /* message length = 5 */
>> + int rc;
> Why is this hand-rolling protocol messages?
>
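For comparison, the same one-byte CompressionAck could probably be built with
the ordinary pqformat helpers rather than a hand-rolled byte array; a sketch,
with the caveat that it still has to be verified that pq_putmessage/pq_flush
may be used at this point of the startup sequence, before compression is
switched on:

    #include "libpq/libpq.h"
    #include "libpq/pqformat.h"

    static void
    send_compression_ack(char algorithm)
    {
        StringInfoData buf;

        pq_beginmessage(&buf, 'z');      /* CompressionAck */
        pq_sendbyte(&buf, algorithm);    /* selected algorithm, or 'n' */
        pq_endmessage(&buf);
        pq_flush();
    }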
>
>> + /* Intersect lists */
>> + while (*client_compression_algorithms != '\0')
>> + {
>> + if (strchr(server_compression_algorithms, *client_compression_algorithms))
>> + {
>> + compression_algorithm = *client_compression_algorithms;
>> + break;
>> + }
>> + client_compression_algorithms += 1;
>> + }
> Why isn't this handled within zpq?
>
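If the negotiation moved into zpq, it could look roughly like this; the helper
name is invented here, the rest reuses what the patch already defines
(zpq_get_supported_algorithms, ZPQ_MAX_ALGORITHMS, ZPQ_NO_COMPRESSION):

    #include <string.h>

    /*
     * Return the first client-offered algorithm that the server also
     * supports, or ZPQ_NO_COMPRESSION ('n') if there is no overlap.
     */
    char
    zpq_choose_algorithm(const char *client_algorithms)
    {
        char    server_algorithms[ZPQ_MAX_ALGORITHMS];

        zpq_get_supported_algorithms(server_algorithms);
        for (; *client_algorithms != '\0'; client_algorithms++)
        {
            if (strchr(server_algorithms, *client_algorithms))
                return *client_algorithms;
        }
        return ZPQ_NO_COMPRESSION;
    }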
>
>> + /* Send 'z' message to the client with selectde comression algorithm ('n' if match is ont found) */
> s/selectde/selected/
> s/comression/compression/
> s/ont/not/
>
>
>> + socket_set_nonblocking(false);
>> + while ((rc = secure_write(MyProcPort, compression, sizeof(compression))) < 0
>> + && errno == EINTR);
>> + if ((size_t)rc != sizeof(compression))
>> + return -1;
> Huh? This all seems like an abstraction violation.
>
>
>> + /* initialize compression */
>> + if (zpq_set_algorithm(compression_algorithm))
>> + PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort);
>> + }
>> + return 0;
>> +}
> Why is zpq a wrapper around secure_write/read? I'm a bit worried this
> will reduce the other places we could use zpq.
>
>
>> static int
>> -pq_recvbuf(void)
>> +pq_recvbuf(bool nowait)
>> {
>> + /* If srteaming compression is enabled then use correpondent comression read function. */
> s/srteaming/streaming/
> s/correpondent/corresponding/
> s/comression/compression/
>
> Could you please try to proof-read the patch a bit? The typo density
> is quite high.
>
>
>> + r = PqStream
>> + ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
>> + PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed)
>> + : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
>> + PQ_RECV_BUFFER_SIZE - PqRecvLength);
>> + PqRecvLength += processed;
> ? : doesn't make sense to me in this case. This should be an if/else.
>
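The mechanical if/else form of that fragment would be (a sketch that keeps the
patch's behaviour unchanged, using the same variables as above):

        ssize_t     r;
        size_t      processed = 0;

        if (PqStream)
            r = zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
                         PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed);
        else
            r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
                            PQ_RECV_BUFFER_SIZE - PqRecvLength);
        PqRecvLength += processed;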
>
>> if (r < 0)
>> {
>> + if (r == ZPQ_DECOMPRESS_ERROR)
>> + {
>> + char const* msg = zpq_error(PqStream);
>> + if (msg == NULL)
>> + msg = "end of stream";
>> + ereport(COMMERROR,
>> + (errcode_for_socket_access(),
>> + errmsg("failed to decompress data: %s", msg)));
>> + return EOF;
>> + }
> I don't think we should error out with "failed to decompress data:"
> e.g. when the client closed the connection.
>
>
>
>> @@ -1413,13 +1457,18 @@ internal_flush(void)
>> char *bufptr = PqSendBuffer + PqSendStart;
>> char *bufend = PqSendBuffer + PqSendPointer;
>>
>> - while (bufptr < bufend)
>> + while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */
>> {
> Overly long line.
>
>
>> - int r;
>> -
>> - r = secure_write(MyProcPort, bufptr, bufend - bufptr);
>> -
>> - if (r <= 0)
>> + int r;
>> + size_t processed = 0;
>> + size_t available = bufend - bufptr;
>> + r = PqStream
>> + ? zpq_write(PqStream, bufptr, available, &processed)
>> + : secure_write(MyProcPort, bufptr, available);
> Same comment as above, re ternary expression.
>
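The write path in internal_flush() could be split the same way (again only the
shape changes, not the behaviour):

        ssize_t     r;
        size_t      processed = 0;
        size_t      available = bufend - bufptr;

        if (PqStream)
            r = zpq_write(PqStream, bufptr, available, &processed);
        else
            r = secure_write(MyProcPort, bufptr, available);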
>
>
>> +/*
>> + * Functions implementing streaming compression algorithm
>> + */
>> +typedef struct
>> +{
>> + /*
>> + * Returns letter identifying compression algorithm.
>> + */
>> + char (*name)(void);
>> +
>> + /*
>> + * Create compression stream with using rx/tx function for fetching/sending compressed data
>> + */
>> + ZpqStream* (*create)(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg);
>> +
>> + /*
>> + * Read up to "size" raw (decompressed) bytes.
>> + * Returns number of decompressed bytes or error code.
>> + * Error code is either ZPQ_DECOMPRESS_ERROR either error code returned by the rx function.
>> + */
>> + ssize_t (*read)(ZpqStream *zs, void *buf, size_t size);
>> +
>> + /*
>> + * Write up to "size" raw (decompressed) bytes.
>> + * Returns number of written raw bytes or error code returned by tx function.
>> + * In the last case amount of written raw bytes is stored in *processed.
>> + */
>> + ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed);
> This should at least specify how these functions are supposed to handle
> blocking/nonblocking sockets.
>
>
>> +
>> +#define ZSTD_BUFFER_SIZE (8*1024)
>> +#define ZSTD_COMPRESSION_LEVEL 1
> Add some arguments for choosing these parameters.
>
>
>> +
>> +/*
>> + * Array with all supported compression algorithms.
>> + */
>> +static ZpqAlgorithm const zpq_algorithms[] =
>> +{
>> +#if HAVE_LIBZSTD
>> + {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered},
>> +#endif
>> +#if HAVE_LIBZ
>> + {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered},
>> +#endif
>> + {NULL}
>> +};
> I think it's preferable to use designated initializers.
>
> Do we really need zero terminated lists? Works fine, but brrr.
>
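With designated initializers the table would read as below; a sketch which
assumes the remaining ZpqAlgorithm fields are named free, error and buffered,
matching the positional order above:

    static ZpqAlgorithm const zpq_algorithms[] =
    {
    #if HAVE_LIBZSTD
        {
            .name = zstd_name,
            .create = zstd_create,
            .read = zstd_read,
            .write = zstd_write,
            .free = zstd_free,
            .error = zstd_error,
            .buffered = zstd_buffered
        },
    #endif
    #if HAVE_LIBZ
        {
            .name = zlib_name,
            .create = zlib_create,
            .read = zlib_read,
            .write = zlib_write,
            .free = zlib_free,
            .error = zlib_error,
            .buffered = zlib_buffered
        },
    #endif
        {.name = NULL}          /* terminator, if the list stays zero terminated */
    };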
>> +/*
>> + * Index of used compression algorithm in zpq_algorithms array.
>> + */
>> +static int zpq_algorithm_impl;
> This is just odd API design imo. Why doesn't the dispatch work based on
> an argument for zpq_create() and the ZpqStream * for the rest?
>
> What if there's two libpq connections in one process? To servers
> supporting different compression algorithms? This isn't going to fly.
>
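One way to drop the global would be to pick the algorithm in zpq_create() and
keep it in the stream, so every later call dispatches through the stream
itself; a rough sketch with invented field names, per-algorithm state omitted:

    struct ZpqStream
    {
        ZpqAlgorithm const *algorithm;  /* chosen when the stream is created */
        void               *state;      /* per-algorithm private data */
    };

    ZpqStream *
    zpq_create(ZpqAlgorithm const *algorithm,
               zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg);

    ssize_t
    zpq_read(ZpqStream *zs, void *buf, size_t size)
    {
        return zs->algorithm->read(zs, buf, size);
    }

That also keeps two connections in one process, negotiated with different
servers, independent of each other.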
>
>> +/*
>> + * Get list of the supported algorithms.
>> + * Each algorithm is identified by one letter: 'f' - Facebook zstd, 'z' - zlib.
>> + * Algorithm identifies are appended to the provided buffer and terminated by '\0'.
>> + */
>> +void
>> +zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS])
>> +{
>> + int i;
>> + for (i = 0; zpq_algorithms[i].name != NULL; i++)
>> + {
>> + Assert(i < ZPQ_MAX_ALGORITHMS);
>> + algorithms[i] = zpq_algorithms[i].name();
>> + }
>> + Assert(i < ZPQ_MAX_ALGORITHMS);
>> + algorithms[i] = '\0';
>> +}
> Uh, doesn't this bake ZPQ_MAX_ALGORITHMS into the ABI? That seems
> entirely unnecessary?
>
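Passing the buffer size explicitly would keep ZPQ_MAX_ALGORITHMS out of the
ABI; a sketch:

    /*
     * Append the identifiers of all built-in algorithms to "algorithms",
     * NUL terminated.  "size" is the size of the caller-provided buffer,
     * so changing the number of algorithms no longer changes the ABI.
     */
    void
    zpq_get_supported_algorithms(char *algorithms, size_t size)
    {
        size_t      i;

        if (size == 0)
            return;
        for (i = 0; zpq_algorithms[i].name != NULL && i + 1 < size; i++)
            algorithms[i] = zpq_algorithms[i].name();
        algorithms[i] = '\0';
    }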
>
>
>> @@ -2180,6 +2257,20 @@ build_startup_packet(const PGconn *conn, char *packet,
>> ADD_STARTUP_OPTION("replication", conn->replication);
>> if (conn->pgoptions && conn->pgoptions[0])
>> ADD_STARTUP_OPTION("options", conn->pgoptions);
>> + if (conn->compression && conn->compression[0])
>> + {
>> + bool enabled;
>> + /*
>> + * If compressoin is enabled, then send to the server list of compression algorithms
>> + * supported by client
>> + */
> s/compressoin/compression/
>
>> + if (parse_bool(conn->compression, &enabled))
>> + {
>> + char compression_algorithms[ZPQ_MAX_ALGORITHMS];
>> + zpq_get_supported_algorithms(compression_algorithms);
>> + ADD_STARTUP_OPTION("compression", compression_algorithms);
>> + }
>> + }
> I think this needs to work in a graceful manner across server
> versions. You can make that work with an argument, using the _pq_
> parameter stuff, but as I said earlier, I think it's a mistake to deal
> with this in the startup packet anyway.
>
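For the cross-version point: startup options prefixed with "_pq_." are the
existing protocol-extension mechanism, and a server that does not recognise
one answers with NegotiateProtocolVersion instead of failing the connection.
A sketch of the client side, simplified from the patch's version:

    if (conn->compression && conn->compression[0])
    {
        char    compression_algorithms[ZPQ_MAX_ALGORITHMS];

        zpq_get_supported_algorithms(compression_algorithms);
        /*
         * "_pq_." marks a protocol extension: an old server reports it as
         * unsupported via NegotiateProtocolVersion rather than erroring out.
         */
        ADD_STARTUP_OPTION("_pq_.compression", compression_algorithms);
    }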
> Greetings,
>
> Andres Freund

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

Attachment: libpq-compression-21.patch (text/x-patch, 50.9 KB)
