Re: Introducing coarse grain parallelism by postgres_fdw.

From: Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>
To: Kyotaro HORIGUCHI <horiguchi(dot)kyotaro(at)lab(dot)ntt(dot)co(dot)jp>
Cc: pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Introducing coarse grain parallelism by postgres_fdw.
Date: 2014-07-25 10:07:01
Message-ID: CAFjFpRfjwgESmLELcuvSz9jBdAueUROn7OqX6oMrNObXbm7Owg@mail.gmail.com
Lists: pgsql-hackers

Hi Kyotaro,

fetch_more_rows() always runs "FETCH 100 FROM <cursor_name>" on the foreign
server to get the next set of rows. The changes you have made seem to run
only the first FETCH on each node in parallel, but not the subsequent ones.
The optimization will therefore help only when the query returns fewer than
100 rows per postgres connection. If a single foreign server returns more
than 100 rows, the second and later FETCHes will be serialized.
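
To make the concern concrete, here is a rough timing model in Python. It is
only a sketch: the function names, the startup and per-FETCH costs, and the
rule that a scan stops once a FETCH returns fewer than 100 rows are all
assumptions made for illustration, not taken from the patch.

```python
# Toy timing model; every name and number below is illustrative, not taken
# from postgres_fdw or from the patch.
FETCH_SIZE = 100  # rows per FETCH, as in "FETCH 100 FROM <cursor_name>"


def fetches_needed(total_rows, fetch_size=FETCH_SIZE):
    # Assumption: a scan keeps fetching until a FETCH returns fewer than
    # fetch_size rows, so 10 rows need 1 FETCH and 250 rows need 3.
    return total_rows // fetch_size + 1


def serial_time(scans, per_fetch):
    # scans: list of (startup_ms, total_rows), one entry per foreign server.
    # Every FETCH on every server runs one after another.
    return sum(startup + fetches_needed(rows) * per_fetch
               for startup, rows in scans)


def first_fetch_overlapped_time(scans, per_fetch):
    # Only the first FETCH (which pays the remote startup cost) overlaps
    # across servers; all later FETCHes are still issued one at a time.
    first = max(startup + per_fetch for startup, _ in scans)
    rest = sum((fetches_needed(rows) - 1) * per_fetch for _, rows in scans)
    return first + rest


small = [(6000, 10), (6000, 10)]      # fewer than 100 rows per server
large = [(6000, 1000), (6000, 1000)]  # many batches per server

print(serial_time(small, 1), first_fetch_overlapped_time(small, 1))
print(serial_time(large, 500), first_fetch_overlapped_time(large, 500))
```

With these made-up numbers the small case drops from 12002 to 6001, roughly
the halving reported in the quoted message, while the large case only drops
from 23000 to 16500 because the later FETCHes do not overlap.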

Is my understanding correct?

On Fri, Jul 25, 2014 at 2:05 PM, Kyotaro HORIGUCHI <
horiguchi(dot)kyotaro(at)lab(dot)ntt(dot)co(dot)jp> wrote:

> Hello,
>
> I noticed that postgres_fdw can run in parallel with a very small
> change. The attached patch lets scans by postgres_fdw on
> different foreign servers run simultaneously. This seems a
> convenient entry point to parallel execution.
>
> For the testing configuration which the attached sql script makes,
> it almost halves the response time because the remote queries
> take far longer to start up than to run. The three foreign
> tables fvs1, fvs1_2 and fvs2 are defined on the same remote relation
> (the view v), but fvs1 and fvs1_2 are on the same foreign server pgs1
> and fvs2 is on another foreign server, pgs2.
>
> =# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join fvs1_2 b on (a.a = b.a);
>                                QUERY PLAN
> -----------------------------------------------------------------------
>  Hash Join (actual time=12083.640..12083.657 rows=16 loops=1)
>    Hash Cond: (a.a = b.a)
>    ->  Foreign Scan on fvs1 a (actual time=6091.405..6091.407 rows=10 loops=1)
>    ->  Hash (actual time=5992.212..5992.212 rows=10 loops=1)
>          Buckets: 1024 Batches: 1 Memory Usage: 7kB
>          ->  Foreign Scan on fvs1_2 b (actual time=5992.191..5992.198 rows=10 loops=1)
>  Execution time: 12085.330 ms
> (7 rows)
>
> =# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM fvs1 a join fvs2 b on (a.a = b.a);
>                                QUERY PLAN
> -----------------------------------------------------------------------
>  Hash Join (actual time=6325.004..6325.019 rows=16 loops=1)
>    Hash Cond: (a.a = b.a)
>    ->  Foreign Scan on fvs1 a (actual time=6324.910..6324.913 rows=10 loops=1)
>    ->  Hash (actual time=0.073..0.073 rows=10 loops=1)
>          Buckets: 1024 Batches: 1 Memory Usage: 7kB
>          ->  Foreign Scan on fvs2 b (actual time=0.048..0.052 rows=10 loops=1)
>  Execution time: 6327.708 ms
> (7 rows)
>
> For comparison, a purely local query is executed as below.
>
> =# EXPLAIN (ANALYZE on, COSTS off) SELECT a.a, a.b, b.c FROM v a join v b on (a.a = b.a);
>                                   QUERY PLAN
> ------------------------------------------------------------------------------
>  Hash Join (actual time=15757.915..15757.925 rows=16 loops=1)
>    Hash Cond: (a.a = b.a)
>    ->  Limit (actual time=7795.919..7795.922 rows=10 loops=1)
>          ->  Sort (actual time=7795.915..7795.915 rows=10 loops=1)
>                ->  Nested Loop (actual time=54.769..7795.618 rows=252 loops=1)
>                      ->  Seq Scan on t a (actual time=0.010..2.117 rows=5000 loops=1)
>                      ->  Materialize (actual time=0.000..0.358 rows=5000 loops=5000)
>                            ->  Seq Scan on t b_1 (actual time=0.004..2.829 rows=5000
>  ...
>    ->  Hash (actual time=7961.969..7961.969 rows=10 loops=1)
>          ->  Subquery Scan on b (actual time=7961.948..7961.952 rows=10 loops=1)
>                ->  Limit (actual time=7961.946..7961.950 rows=10 loops=1)
>                      ->  Sort (actual time=7961.946..7961.948 rows=10 loops=1)
>                            ->  Nested Loop (actual time=53.518..7961.611 rows=252 loops=1)
>                                  ->  Seq Scan on t a_1 (actual time=0.004..2.247 rows=5000...
>                                  ->  Materialize (actual time=0.000..0.357 rows=5000...
>                                        ->  Seq Scan on t b_2 (actual time=0.001..1.565 rows=500..
>  Execution time: 15758.629 ms
> (26 rows)
>
>
> I will try this way for the present.
>
> Any opinions or suggestions?
>
> - Is this a correct entry point?
>
> - Parallel postgres_fdw is of course an intermediate shape. It
> should move toward a more intrinsic form.
>
> - The planner should be aware of parallelism. The first step seems to
> be doable since postgres_fdw can get correct startup and running
> costs. But eventually they should probably be calculated locally for
> loopback connections. A dedicated node would be needed.
>
> - A far more effective means of intercommunication between backends,
> including backend workers (which seems to be discussed in
> another thread), is needed, and this could be the test bench for
> it.
>
> - This patch is the minimal implementation to make parallel scan
> available. A facility for exporting/importing execution trees may
> promise far more flexible parallelism. Is deparsing usable to
> reconstruct a partial query?
>
> - A means of resource management, especially for the number of
> backends, is required. This could be done on the foreign server in a
> simple form for the present. Will this finally be moved into an
> intrinsic loopback connection manager?
>
> - Any other points to consider?
>
>
> regards,
>
> --
> Kyotaro Horiguchi
> NTT Open Source Software Center
>
> DROP SERVER IF EXISTS pgs1 CASCADE;
> DROP SERVER IF EXISTS pgs2 CASCADE;
> DROP VIEW IF EXISTS v CASCADE;
> DROP TABLE IF EXISTS t CASCADE;
>
> CREATE SERVER pgs1 FOREIGN DATA WRAPPER postgres_fdw
>   OPTIONS (host '/tmp', dbname 'postgres', use_remote_estimate 'true');
> CREATE SERVER pgs2 FOREIGN DATA WRAPPER postgres_fdw
>   OPTIONS (host '/tmp', dbname 'postgres', use_remote_estimate 'true');
>
> CREATE USER MAPPING FOR CURRENT_USER SERVER pgs1;
> CREATE USER MAPPING FOR CURRENT_USER SERVER pgs2;
>
> CREATE TABLE t (a int, b int, c text);
> ALTER TABLE t ALTER COLUMN c SET STORAGE PLAIN;
> INSERT INTO t
>   (SELECT random() * 10000, random() * 10000,
>           repeat('X', (random() * 1000)::int)
>      FROM generate_series(0, 4999));
> -- EXPLAIN ANALYZE SELECT * FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;
> CREATE VIEW v AS
>   SELECT a.a, a.b, a.c, b.a AS a2, b.b AS b2, b.c AS c2
>     FROM t a, t b WHERE a.b + b.b = 1000 ORDER BY a.b LIMIT 10;
>
> CREATE FOREIGN TABLE fvs1 (a int, b int, c text, a2 int, b2 int, c2 text)
>   SERVER pgs1 OPTIONS (table_name 'v');
> CREATE FOREIGN TABLE fvs1_2 (a int, b int, c text, a2 int, b2 int, c2 text)
>   SERVER pgs1 OPTIONS (table_name 'v');
> CREATE FOREIGN TABLE fvs2 (a int, b int, c text, a2 int, b2 int, c2 text)
>   SERVER pgs2 OPTIONS (table_name 'v');
>
>
> EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs2 b on (a.a = b.a);
> EXPLAIN ANALYZE SELECT a.a, a.b, b.c FROM fvs1 a join fvs1_2 b on (a.a = b.a);

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
