Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeGatherMerge.c
4 : * Scan a plan in multiple workers, and do order-preserving merge.
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/nodeGatherMerge.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/relscan.h"
18 : #include "access/xact.h"
19 : #include "executor/execdebug.h"
20 : #include "executor/execParallel.h"
21 : #include "executor/nodeGatherMerge.h"
22 : #include "executor/nodeSubplan.h"
23 : #include "executor/tqueue.h"
24 : #include "lib/binaryheap.h"
25 : #include "miscadmin.h"
26 : #include "utils/memutils.h"
27 : #include "utils/rel.h"
28 :
29 : /*
30 : * Tuple array for each worker
31 : */
32 : typedef struct GMReaderTupleBuffer
33 : {
34 : HeapTuple *tuple;
35 : int readCounter;
36 : int nTuples;
37 : bool done;
38 : } GMReaderTupleBuffer;
39 :
40 : /*
41 : * When we read tuples from workers, it's a good idea to read several at once
42 : * for efficiency when possible: this minimizes context-switching overhead.
43 : * But reading too many at a time wastes memory without improving performance.
44 : */
45 : #define MAX_TUPLE_STORE 10
46 :
47 : static int32 heap_compare_slots(Datum a, Datum b, void *arg);
48 : static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
49 : static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
50 : bool nowait, bool *done);
51 : static void gather_merge_init(GatherMergeState *gm_state);
52 : static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
53 : static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
54 : bool nowait);
55 : static void form_tuple_array(GatherMergeState *gm_state, int reader);
56 :
57 : /* ----------------------------------------------------------------
58 : * ExecInitGather
59 : * ----------------------------------------------------------------
60 : */
61 : GatherMergeState *
62 74 : ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
63 : {
64 : GatherMergeState *gm_state;
65 : Plan *outerNode;
66 : bool hasoid;
67 : TupleDesc tupDesc;
68 :
69 : /* Gather merge node doesn't have innerPlan node. */
70 74 : Assert(innerPlan(node) == NULL);
71 :
72 : /*
73 : * create state structure
74 : */
75 74 : gm_state = makeNode(GatherMergeState);
76 74 : gm_state->ps.plan = (Plan *) node;
77 74 : gm_state->ps.state = estate;
78 :
79 : /*
80 : * Miscellaneous initialization
81 : *
82 : * create expression context for node
83 : */
84 74 : ExecAssignExprContext(estate, &gm_state->ps);
85 :
86 : /*
87 : * initialize child expressions
88 : */
89 74 : gm_state->ps.qual =
90 74 : ExecInitQual(node->plan.qual, &gm_state->ps);
91 :
92 : /*
93 : * tuple table initialization
94 : */
95 74 : ExecInitResultTupleSlot(estate, &gm_state->ps);
96 :
97 : /*
98 : * now initialize outer plan
99 : */
100 74 : outerNode = outerPlan(node);
101 74 : outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);
102 :
103 : /*
104 : * Initialize result tuple type and projection info.
105 : */
106 74 : ExecAssignResultTypeFromTL(&gm_state->ps);
107 74 : ExecAssignProjectionInfo(&gm_state->ps, NULL);
108 :
109 74 : gm_state->gm_initialized = false;
110 :
111 : /*
112 : * initialize sort-key information
113 : */
114 74 : if (node->numCols)
115 : {
116 : int i;
117 :
118 74 : gm_state->gm_nkeys = node->numCols;
119 74 : gm_state->gm_sortkeys =
120 74 : palloc0(sizeof(SortSupportData) * node->numCols);
121 :
122 148 : for (i = 0; i < node->numCols; i++)
123 : {
124 74 : SortSupport sortKey = gm_state->gm_sortkeys + i;
125 :
126 74 : sortKey->ssup_cxt = CurrentMemoryContext;
127 74 : sortKey->ssup_collation = node->collations[i];
128 74 : sortKey->ssup_nulls_first = node->nullsFirst[i];
129 74 : sortKey->ssup_attno = node->sortColIdx[i];
130 :
131 : /*
132 : * We don't perform abbreviated key conversion here, for the same
133 : * reasons that it isn't used in MergeAppend
134 : */
135 74 : sortKey->abbreviate = false;
136 :
137 74 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
138 : }
139 : }
140 :
141 : /*
142 : * store the tuple descriptor into gather merge state, so we can use it
143 : * later while initializing the gather merge slots.
144 : */
145 74 : if (!ExecContextForcesOids(&gm_state->ps, &hasoid))
146 74 : hasoid = false;
147 74 : tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
148 74 : gm_state->tupDesc = tupDesc;
149 :
150 74 : return gm_state;
151 : }
152 :
153 : /* ----------------------------------------------------------------
154 : * ExecGatherMerge(node)
155 : *
156 : * Scans the relation via multiple workers and returns
157 : * the next qualifying tuple.
158 : * ----------------------------------------------------------------
159 : */
160 : TupleTableSlot *
161 4090288 : ExecGatherMerge(GatherMergeState *node)
162 : {
163 : TupleTableSlot *slot;
164 : ExprContext *econtext;
165 : int i;
166 :
167 : /*
168 : * As with Gather, we don't launch workers until this node is actually
169 : * executed.
170 : */
171 4090288 : if (!node->initialized)
172 : {
173 53 : EState *estate = node->ps.state;
174 53 : GatherMerge *gm = (GatherMerge *) node->ps.plan;
175 :
176 : /*
177 : * Sometimes we might have to run without parallelism; but if parallel
178 : * mode is active then we can try to fire up some workers.
179 : */
180 53 : if (gm->num_workers > 0 && IsInParallelMode())
181 : {
182 : ParallelContext *pcxt;
183 :
184 : /* Initialize data structures for workers. */
185 53 : if (!node->pei)
186 53 : node->pei = ExecInitParallelPlan(node->ps.lefttree,
187 : estate,
188 : gm->num_workers);
189 :
190 : /* Try to launch workers. */
191 53 : pcxt = node->pei->pcxt;
192 53 : LaunchParallelWorkers(pcxt);
193 53 : node->nworkers_launched = pcxt->nworkers_launched;
194 :
195 : /* Set up tuple queue readers to read the results. */
196 53 : if (pcxt->nworkers_launched > 0)
197 : {
198 18 : node->nreaders = 0;
199 18 : node->reader = palloc(pcxt->nworkers_launched *
200 : sizeof(TupleQueueReader *));
201 :
202 18 : Assert(gm->numCols);
203 :
204 87 : for (i = 0; i < pcxt->nworkers_launched; ++i)
205 : {
206 69 : shm_mq_set_handle(node->pei->tqueue[i],
207 69 : pcxt->worker[i].bgwhandle);
208 138 : node->reader[node->nreaders++] =
209 69 : CreateTupleQueueReader(node->pei->tqueue[i],
210 : node->tupDesc);
211 : }
212 : }
213 : else
214 : {
215 : /* No workers? Then never mind. */
216 35 : ExecShutdownGatherMergeWorkers(node);
217 : }
218 : }
219 :
220 : /* always allow leader to participate */
221 53 : node->need_to_scan_locally = true;
222 53 : node->initialized = true;
223 : }
224 :
225 : /*
226 : * Reset per-tuple memory context to free any expression evaluation
227 : * storage allocated in the previous tuple cycle.
228 : */
229 4090288 : econtext = node->ps.ps_ExprContext;
230 4090288 : ResetExprContext(econtext);
231 :
232 : /*
233 : * Get next tuple, either from one of our workers, or by running the
234 : * plan ourselves.
235 : */
236 4090288 : slot = gather_merge_getnext(node);
237 4090288 : if (TupIsNull(slot))
238 40 : return NULL;
239 :
240 : /*
241 : * form the result tuple using ExecProject(), and return it --- unless
242 : * the projection produces an empty set, in which case we must loop
243 : * back around for another tuple
244 : */
245 4090248 : econtext->ecxt_outertuple = slot;
246 4090248 : return ExecProject(node->ps.ps_ProjInfo);
247 : }
248 :
249 : /* ----------------------------------------------------------------
250 : * ExecEndGatherMerge
251 : *
252 : * frees any storage allocated through C routines.
253 : * ----------------------------------------------------------------
254 : */
255 : void
256 74 : ExecEndGatherMerge(GatherMergeState *node)
257 : {
258 74 : ExecEndNode(outerPlanState(node)); /* let children clean up first */
259 74 : ExecShutdownGatherMerge(node);
260 74 : ExecFreeExprContext(&node->ps);
261 74 : ExecClearTuple(node->ps.ps_ResultTupleSlot);
262 74 : }
263 :
264 : /* ----------------------------------------------------------------
265 : * ExecShutdownGatherMerge
266 : *
267 : * Destroy the setup for parallel workers including parallel context.
268 : * Collect all the stats after workers are stopped, else some work
269 : * done by workers won't be accounted.
270 : * ----------------------------------------------------------------
271 : */
272 : void
273 127 : ExecShutdownGatherMerge(GatherMergeState *node)
274 : {
275 127 : ExecShutdownGatherMergeWorkers(node);
276 :
277 : /* Now destroy the parallel context. */
278 127 : if (node->pei != NULL)
279 : {
280 53 : ExecParallelCleanup(node->pei);
281 53 : node->pei = NULL;
282 : }
283 127 : }
284 :
285 : /* ----------------------------------------------------------------
286 : * ExecShutdownGatherMergeWorkers
287 : *
288 : * Destroy the parallel workers. Collect all the stats after
289 : * workers are stopped, else some work done by workers won't be
290 : * accounted.
291 : * ----------------------------------------------------------------
292 : */
293 : static void
294 162 : ExecShutdownGatherMergeWorkers(GatherMergeState *node)
295 : {
296 : /* Shut down tuple queue readers before shutting down workers. */
297 162 : if (node->reader != NULL)
298 : {
299 : int i;
300 :
301 87 : for (i = 0; i < node->nreaders; ++i)
302 69 : if (node->reader[i])
303 54 : DestroyTupleQueueReader(node->reader[i]);
304 :
305 18 : pfree(node->reader);
306 18 : node->reader = NULL;
307 : }
308 :
309 : /* Now shut down the workers. */
310 162 : if (node->pei != NULL)
311 88 : ExecParallelFinish(node->pei);
312 162 : }
313 :
314 : /* ----------------------------------------------------------------
315 : * ExecReScanGatherMerge
316 : *
317 : * Re-initialize the workers and rescans a relation via them.
318 : * ----------------------------------------------------------------
319 : */
320 : void
321 0 : ExecReScanGatherMerge(GatherMergeState *node)
322 : {
323 : /*
324 : * Re-initialize the parallel workers to perform rescan of relation. We
325 : * want to gracefully shutdown all the workers so that they should be able
326 : * to propagate any error or other information to master backend before
327 : * dying. Parallel context will be reused for rescan.
328 : */
329 0 : ExecShutdownGatherMergeWorkers(node);
330 :
331 0 : node->initialized = false;
332 :
333 0 : if (node->pei)
334 0 : ExecParallelReinitialize(node->pei);
335 :
336 0 : ExecReScan(node->ps.lefttree);
337 0 : }
338 :
339 : /*
340 : * Initialize the Gather merge tuple read.
341 : *
342 : * Pull at least a single tuple from each worker + leader and set up the heap.
343 : */
344 : static void
345 53 : gather_merge_init(GatherMergeState *gm_state)
346 : {
347 53 : int nreaders = gm_state->nreaders;
348 53 : bool initialize = true;
349 : int i;
350 :
351 : /*
352 : * Allocate gm_slots for the number of worker + one more slot for leader.
353 : * Last slot is always for leader. Leader always calls ExecProcNode() to
354 : * read the tuple which will return the TupleTableSlot. Later it will
355 : * directly get assigned to gm_slot. So just initialize leader gm_slot
356 : * with NULL. For other slots below code will call
357 : * ExecInitExtraTupleSlot() which will do the initialization of worker
358 : * slots.
359 : */
360 53 : gm_state->gm_slots =
361 53 : palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
362 53 : gm_state->gm_slots[gm_state->nreaders] = NULL;
363 :
364 : /* Initialize the tuple slot and tuple array for each worker */
365 53 : gm_state->gm_tuple_buffers =
366 53 : (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
367 53 : (gm_state->nreaders + 1));
368 122 : for (i = 0; i < gm_state->nreaders; i++)
369 : {
370 : /* Allocate the tuple array with MAX_TUPLE_STORE size */
371 138 : gm_state->gm_tuple_buffers[i].tuple =
372 69 : (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
373 :
374 : /* Initialize slot for worker */
375 69 : gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state);
376 69 : ExecSetSlotDescriptor(gm_state->gm_slots[i],
377 : gm_state->tupDesc);
378 : }
379 :
380 : /* Allocate the resources for the merge */
381 53 : gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
382 : heap_compare_slots,
383 : gm_state);
384 :
385 : /*
386 : * First, try to read a tuple from each worker (including leader) in
387 : * nowait mode, so that we initialize read from each worker as well as
388 : * leader. After this, if all active workers are unable to produce a
389 : * tuple, then re-read and this time use wait mode. For workers that were
390 : * able to produce a tuple in the earlier loop and are still active, just
391 : * try to fill the tuple array if more tuples are avaiable.
392 : */
393 : reread:
394 280 : for (i = 0; i < nreaders + 1; i++)
395 : {
396 417 : if (!gm_state->gm_tuple_buffers[i].done &&
397 380 : (TupIsNull(gm_state->gm_slots[i]) ||
398 17 : gm_state->gm_slots[i]->tts_isempty))
399 : {
400 382 : if (gather_merge_readnext(gm_state, i, initialize))
401 : {
402 86 : binaryheap_add_unordered(gm_state->gm_heap,
403 86 : Int32GetDatum(i));
404 : }
405 : }
406 : else
407 18 : form_tuple_array(gm_state, i);
408 : }
409 71 : initialize = false;
410 :
411 140 : for (i = 0; i < nreaders; i++)
412 130 : if (!gm_state->gm_tuple_buffers[i].done &&
413 111 : (TupIsNull(gm_state->gm_slots[i]) ||
414 25 : gm_state->gm_slots[i]->tts_isempty))
415 : goto reread;
416 :
417 53 : binaryheap_build(gm_state->gm_heap);
418 53 : gm_state->gm_initialized = true;
419 53 : }
420 :
421 : /*
422 : * Clear out the tuple table slots for each gather merge input.
423 : */
424 : static void
425 40 : gather_merge_clear_slots(GatherMergeState *gm_state)
426 : {
427 : int i;
428 :
429 77 : for (i = 0; i < gm_state->nreaders; i++)
430 : {
431 37 : pfree(gm_state->gm_tuple_buffers[i].tuple);
432 37 : gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
433 : }
434 :
435 : /* Free tuple array as we don't need it any more */
436 40 : pfree(gm_state->gm_tuple_buffers);
437 : /* Free the binaryheap, which was created for sort */
438 40 : binaryheap_free(gm_state->gm_heap);
439 40 : }
440 :
441 : /*
442 : * Read the next tuple for gather merge.
443 : *
444 : * Fetch the sorted tuple out of the heap.
445 : */
446 : static TupleTableSlot *
447 4090288 : gather_merge_getnext(GatherMergeState *gm_state)
448 : {
449 : int i;
450 :
451 4090288 : if (!gm_state->gm_initialized)
452 : {
453 : /*
454 : * First time through: pull the first tuple from each participant, and
455 : * set up the heap.
456 : */
457 53 : gather_merge_init(gm_state);
458 : }
459 : else
460 : {
461 : /*
462 : * Otherwise, pull the next tuple from whichever participant we
463 : * returned from last time, and reinsert that participant's index into
464 : * the heap, because it might now compare differently against the
465 : * other elements of the heap.
466 : */
467 4090235 : i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
468 :
469 4090235 : if (gather_merge_readnext(gm_state, i, false))
470 4090181 : binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
471 : else
472 54 : (void) binaryheap_remove_first(gm_state->gm_heap);
473 : }
474 :
475 4090288 : if (binaryheap_empty(gm_state->gm_heap))
476 : {
477 : /* All the queues are exhausted, and so is the heap */
478 40 : gather_merge_clear_slots(gm_state);
479 40 : return NULL;
480 : }
481 : else
482 : {
483 : /* Return next tuple from whichever participant has the leading one */
484 4090248 : i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
485 4090248 : return gm_state->gm_slots[i];
486 : }
487 : }
488 :
489 : /*
490 : * Read the tuple for given reader in nowait mode, and form the tuple array.
491 : */
492 : static void
493 12308 : form_tuple_array(GatherMergeState *gm_state, int reader)
494 : {
495 12308 : GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
496 : int i;
497 :
498 : /* Last slot is for leader and we don't build tuple array for leader */
499 12308 : if (reader == gm_state->nreaders)
500 18 : return;
501 :
502 : /*
503 : * We here because we already read all the tuples from the tuple array, so
504 : * initialize the counter to zero.
505 : */
506 12290 : if (tuple_buffer->nTuples == tuple_buffer->readCounter)
507 12290 : tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
508 :
509 : /* Tuple array is already full? */
510 12290 : if (tuple_buffer->nTuples == MAX_TUPLE_STORE)
511 0 : return;
512 :
513 135076 : for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
514 : {
515 122801 : tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state,
516 : reader,
517 : false,
518 : &tuple_buffer->done));
519 122801 : if (!HeapTupleIsValid(tuple_buffer->tuple[i]))
520 15 : break;
521 122786 : tuple_buffer->nTuples++;
522 : }
523 : }
524 :
525 : /*
526 : * Store the next tuple for a given reader into the appropriate slot.
527 : *
528 : * Returns false if the reader is exhausted, and true otherwise.
529 : */
530 : static bool
531 4090426 : gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
532 : {
533 : GMReaderTupleBuffer *tuple_buffer;
534 4090426 : HeapTuple tup = NULL;
535 :
536 : /*
537 : * If we're being asked to generate a tuple from the leader, then we
538 : * just call ExecProcNode as normal to produce one.
539 : */
540 4090426 : if (gm_state->nreaders == reader)
541 : {
542 3955421 : if (gm_state->need_to_scan_locally)
543 : {
544 3955421 : PlanState *outerPlan = outerPlanState(gm_state);
545 : TupleTableSlot *outerTupleSlot;
546 :
547 3955421 : outerTupleSlot = ExecProcNode(outerPlan);
548 :
549 3955421 : if (!TupIsNull(outerTupleSlot))
550 : {
551 3955381 : gm_state->gm_slots[reader] = outerTupleSlot;
552 3955381 : return true;
553 : }
554 40 : gm_state->gm_tuple_buffers[reader].done = true;
555 40 : gm_state->need_to_scan_locally = false;
556 : }
557 40 : return false;
558 : }
559 :
560 : /* Otherwise, check the state of the relevant tuple buffer. */
561 135005 : tuple_buffer = &gm_state->gm_tuple_buffers[reader];
562 :
563 135005 : if (tuple_buffer->nTuples > tuple_buffer->readCounter)
564 : {
565 : /* Return any tuple previously read that is still buffered. */
566 122596 : tuple_buffer = &gm_state->gm_tuple_buffers[reader];
567 122596 : tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
568 : }
569 12409 : else if (tuple_buffer->done)
570 : {
571 : /* Reader is known to be exhausted. */
572 15 : DestroyTupleQueueReader(gm_state->reader[reader]);
573 15 : gm_state->reader[reader] = NULL;
574 15 : return false;
575 : }
576 : else
577 : {
578 : /* Read and buffer next tuple. */
579 12394 : tup = heap_copytuple(gm_readnext_tuple(gm_state,
580 : reader,
581 : nowait,
582 : &tuple_buffer->done));
583 :
584 : /*
585 : * Attempt to read more tuples in nowait mode and store them in
586 : * the tuple array.
587 : */
588 12394 : if (HeapTupleIsValid(tup))
589 12290 : form_tuple_array(gm_state, reader);
590 : else
591 104 : return false;
592 : }
593 :
594 134886 : Assert(HeapTupleIsValid(tup));
595 :
596 : /* Build the TupleTableSlot for the given tuple */
597 134886 : ExecStoreTuple(tup, /* tuple to store */
598 134886 : gm_state->gm_slots[reader], /* slot in which to store the
599 : * tuple */
600 : InvalidBuffer, /* buffer associated with this tuple */
601 : true); /* pfree this pointer if not from heap */
602 :
603 134886 : return true;
604 : }
605 :
606 : /*
607 : * Attempt to read a tuple from given reader.
608 : */
609 : static HeapTuple
610 135195 : gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
611 : bool *done)
612 : {
613 : TupleQueueReader *reader;
614 135195 : HeapTuple tup = NULL;
615 : MemoryContext oldContext;
616 : MemoryContext tupleContext;
617 :
618 135195 : tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
619 :
620 135195 : if (done != NULL)
621 135195 : *done = false;
622 :
623 : /* Check for async events, particularly messages from workers. */
624 135195 : CHECK_FOR_INTERRUPTS();
625 :
626 : /* Attempt to read a tuple. */
627 135195 : reader = gm_state->reader[nreader];
628 :
629 : /* Run TupleQueueReaders in per-tuple context */
630 135195 : oldContext = MemoryContextSwitchTo(tupleContext);
631 135195 : tup = TupleQueueReaderNext(reader, nowait, done);
632 135195 : MemoryContextSwitchTo(oldContext);
633 :
634 135195 : return tup;
635 : }
636 :
637 : /*
638 : * We have one slot for each item in the heap array. We use SlotNumber
639 : * to store slot indexes. This doesn't actually provide any formal
640 : * type-safety, but it makes the code more self-documenting.
641 : */
642 : typedef int32 SlotNumber;
643 :
644 : /*
645 : * Compare the tuples in the two given slots.
646 : */
647 : static int32
648 446713 : heap_compare_slots(Datum a, Datum b, void *arg)
649 : {
650 446713 : GatherMergeState *node = (GatherMergeState *) arg;
651 446713 : SlotNumber slot1 = DatumGetInt32(a);
652 446713 : SlotNumber slot2 = DatumGetInt32(b);
653 :
654 446713 : TupleTableSlot *s1 = node->gm_slots[slot1];
655 446713 : TupleTableSlot *s2 = node->gm_slots[slot2];
656 : int nkey;
657 :
658 446713 : Assert(!TupIsNull(s1));
659 446713 : Assert(!TupIsNull(s2));
660 :
661 465154 : for (nkey = 0; nkey < node->gm_nkeys; nkey++)
662 : {
663 446713 : SortSupport sortKey = node->gm_sortkeys + nkey;
664 446713 : AttrNumber attno = sortKey->ssup_attno;
665 : Datum datum1,
666 : datum2;
667 : bool isNull1,
668 : isNull2;
669 : int compare;
670 :
671 446713 : datum1 = slot_getattr(s1, attno, &isNull1);
672 446713 : datum2 = slot_getattr(s2, attno, &isNull2);
673 :
674 446713 : compare = ApplySortComparator(datum1, isNull1,
675 : datum2, isNull2,
676 : sortKey);
677 446713 : if (compare != 0)
678 428272 : return -compare;
679 : }
680 18441 : return 0;
681 : }
|