diff -r 3f065ec72ab8 pgbulkload/lib/pg_btree.c --- a/pgbulkload/lib/pg_btree.c Fri Jan 20 09:26:20 2012 -0600 +++ b/pgbulkload/lib/pg_btree.c Wed Jan 25 13:37:43 2012 -0600 @@ -398,6 +398,8 @@ BTReaderTerm(&reader); } +void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src); + /* * _bt_mergeload - Merge two streams of index tuples into new index files. */ @@ -462,7 +464,6 @@ } else { - // TODO -- BSJ if (on_duplicate == ON_DUPLICATE_KEEP_NEW) { self->dup_old++; @@ -470,7 +471,21 @@ RelationGetRelationName(wstate->index)); itup2 = BTReaderGetNextItem(btspool2); } - else + else if (on_duplicate == ON_DUPLICATE_MERGE) + { + self->dup_old++; + + // merge from itup into itup2 where itup's col[i] is not null + // but itup2's col[i] IS null + merge_tuples(heapRel, itup2, itup); + + ItemPointerCopy(&t_tid2, &itup2->t_tid); + self->dup_new++; + remove_duplicate(self, heapRel, itup, + RelationGetRelationName(wstate->index)); + itup = BTSpoolGetNextItem(btspool, itup, &should_free); + } + else { ItemPointerCopy(&t_tid2, &itup2->t_tid); self->dup_new++; @@ -950,6 +965,113 @@ self->dup_old + self->dup_new, relname); } +// returns Buffer after locking it (BUFFER_LOCK_SHARE then BUFFER_LOCK_UNLOCK) +Buffer load_buffer(Relation heap, IndexTuple itup, HeapTupleData *tuple /*OUT */, ItemId *itemid /*OUT */ ) +{ + BlockNumber blknum; + BlockNumber offnum; + Buffer buffer; + Page page; + + blknum = ItemPointerGetBlockNumber(&itup->t_tid); + offnum = ItemPointerGetOffsetNumber(&itup->t_tid); + buffer = ReadBuffer(heap, blknum); + + LockBuffer(buffer, BUFFER_LOCK_SHARE); + page = BufferGetPage(buffer); + *itemid = PageGetItemId(page, offnum); + tuple->t_data = ItemIdIsNormal(*itemid) + ? (HeapTupleHeader) PageGetItem(page, *itemid) + : NULL; + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + return buffer; +} + +void load_tuple(Relation heap, + HeapTuple tuple, + IndexTuple itup, + ItemId itemid, + TupleDesc * tupdesc /* OUT */, + int * ncolumns /* OUT */, + Datum ** values /* OUT */, + bool ** nulls /* OUT */) +{ + *tupdesc = RelationGetDescr(heap); + + tuple->t_len = ItemIdGetLength(itemid); + tuple->t_self = itup->t_tid; + + *ncolumns = (*tupdesc)->natts; + *values = (Datum *) palloc(*ncolumns * sizeof(Datum)); + *nulls = (bool *) palloc(*ncolumns * sizeof(bool)); + + /* Break down the tuple into fields */ + heap_deform_tuple(tuple, *tupdesc, *values, *nulls); +} + +void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src) +{ + HeapTupleData tuple_src; + HeapTupleData tuple_dst; + Buffer buffer_src; + Buffer buffer_dst; + ItemId itemid_src; + ItemId itemid_dst; + + // load buffers + buffer_src = load_buffer(heap, itup_src, &tuple_src, &itemid_src); + buffer_dst = load_buffer(heap, itup_dst, &tuple_dst, &itemid_dst); + + if (tuple_src.t_data != NULL) + { + int ncolumns_src, ncolumns_dst; + int i; + Datum *values_src = NULL, *values_dst = NULL; + TupleDesc tupdesc_dst, tupdesc_src; + bool *nulls_src = NULL, *nulls_dst = NULL; + bool * do_replace = NULL; + bool tuple_updated = false; + + // load source + load_tuple(heap, &tuple_src, itup_src, itemid_src, &tupdesc_src, &ncolumns_src, &values_src, &nulls_src); + + // load destination + load_tuple(heap, &tuple_dst, itup_dst, itemid_dst, &tupdesc_dst, &ncolumns_dst, &values_dst, &nulls_dst); + + do_replace = (bool *) palloc(ncolumns_dst * sizeof(bool)); + + for (i = 0; i < ncolumns_dst && i < ncolumns_src; ++i) + { + do_replace[i] = false; + + if (nulls_dst[i] && !nulls_src[i]) + { + values_dst[i] = values_src[i]; + nulls_dst[i] = nulls_src[i]; + do_replace[i] = true; + + // update new row + tuple_updated = true; + } + } + + if (tuple_updated) + { + HeapTuple new_tuple = heap_modify_tuple(&tuple_dst, tupdesc_dst, values_dst, nulls_dst, do_replace); + simple_heap_update(heap, &(tuple_dst.t_self), new_tuple); + } + + pfree(do_replace); + pfree(values_src); + pfree(nulls_src); + pfree(values_dst); + pfree(nulls_dst); + } + + ReleaseBuffer(buffer_src); + ReleaseBuffer(buffer_dst); +} + char * tuple_to_cstring(TupleDesc tupdesc, HeapTuple tuple) {