diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c index 01e3492..60a9c69 100644 --- a/src/backend/storage/large_object/inv_api.c +++ b/src/backend/storage/large_object/inv_api.c @@ -178,10 +178,14 @@ myLargeObjectExists(Oid loid, Snapshot snapshot) static int32 getbytealen(bytea *data) { - Assert(!VARATT_IS_EXTENDED(data)); - if (VARSIZE(data) < VARHDRSZ) - elog(ERROR, "invalid VARSIZE(data)"); - return (VARSIZE(data) - VARHDRSZ); + Size size; + + size = toast_raw_datum_size(PointerGetDatum(data)); + if (size < VARHDRSZ) + elog(ERROR, "invalid size"); + if (size > VARHDRSZ + LOBLKSIZE) + elog(ERROR, "too large page"); + return (size - VARHDRSZ); } @@ -359,22 +363,12 @@ inv_getsize(LargeObjectDesc *obj_desc) { Form_pg_largeobject data; bytea *datafield; - bool pfreeit; if (HeapTupleHasNulls(tuple)) /* paranoia */ elog(ERROR, "null field found in pg_largeobject"); data = (Form_pg_largeobject) GETSTRUCT(tuple); datafield = &(data->data); /* see note at top of file */ - pfreeit = false; - if (VARATT_IS_EXTENDED(datafield)) - { - datafield = (bytea *) - heap_tuple_untoast_attr((struct varlena *) datafield); - pfreeit = true; - } lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield); - if (pfreeit) - pfree(datafield); } systable_endscan_ordered(sd); @@ -724,8 +718,9 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes) void inv_truncate(LargeObjectDesc *obj_desc, int len) { - int32 pageno = (int32) (len / LOBLKSIZE); - int off; + const int32 pageno = (int32) (len / LOBLKSIZE); + int32 reqpageno; + const int off = len % LOBLKSIZE; /* offset in the page */ ScanKeyData skey[2]; SysScanDesc sd; HeapTuple oldtuple; @@ -741,6 +736,7 @@ inv_truncate(LargeObjectDesc *obj_desc, int len) Datum values[Natts_pg_largeobject]; bool nulls[Natts_pg_largeobject]; bool replace[Natts_pg_largeobject]; + bool prevpagefull; CatalogIndexState indstate; Assert(PointerIsValid(obj_desc)); @@ -770,10 +766,20 @@ inv_truncate(LargeObjectDesc *obj_desc, int len) BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(obj_desc->id)); + /* + * Request the previous page as well, if we are truncating to the non-0 + * multiple of LOBLKSIZE, so that we can check if a 0-sized EOF marker page + * is necessary or not. + */ + if (off == 0 && pageno > 0) + reqpageno = pageno - 1; + else + reqpageno = pageno; + ScanKeyInit(&skey[1], Anum_pg_largeobject_pageno, BTGreaterEqualStrategyNumber, F_INT4GE, - Int32GetDatum(pageno)); + Int32GetDatum(reqpageno)); sd = systable_beginscan_ordered(lo_heap_r, lo_index_r, obj_desc->snapshot, 2, skey); @@ -781,6 +787,9 @@ inv_truncate(LargeObjectDesc *obj_desc, int len) /* * If possible, get the page the truncation point is in. The truncation * point may be beyond the end of the LO or in a hole. + * + * This can be the previous page in case we are truncating to the non-0 + * multiple of LOBLKSIZE. */ olddata = NULL; if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL) @@ -788,57 +797,97 @@ inv_truncate(LargeObjectDesc *obj_desc, int len) if (HeapTupleHasNulls(oldtuple)) /* paranoia */ elog(ERROR, "null field found in pg_largeobject"); olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple); - Assert(olddata->pageno >= pageno); + Assert(olddata->pageno >= reqpageno); } + /* + * Check if the previous page is full. + * In that case, we don't need to create the 0-sized EOF-marker page. + * + * It's safe to consider the page before pageno 0 full for our purpose. + */ + prevpagefull = pageno == 0; + if (olddata != NULL && olddata->pageno + 1 == pageno) + { + bytea *datafield = &(olddata->data); /* see note at top of + * file */ + Assert(pageno > 0); + Assert(off == 0); + if (getbytealen(datafield) == LOBLKSIZE) + { + prevpagefull = true; + } + /* + * Get the next page. + */ + olddata = NULL; + if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL) + { + if (HeapTupleHasNulls(oldtuple)) /* paranoia */ + elog(ERROR, "null field found in pg_largeobject"); + olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple); + Assert(olddata->pageno >= pageno); + } + } + Assert(olddata == NULL || olddata->pageno >= pageno); /* - * If we found the page of the truncation point we need to truncate the - * data in it. Otherwise if we're in a hole, we need to create a page to - * mark the end of data. + * If we are truncating to a page boundary (off == 0) and the previous + * page is full, remove all tuples beyond the truncation point. + * Otherwise, if we found the page of the truncation point we need to + * truncate the data in it. Otherwise if we're in a hole, we need to + * create a page to mark the end of data. */ - if (olddata != NULL && olddata->pageno == pageno) + if (off == 0 && prevpagefull) + { + if (olddata != NULL) { + Assert(olddata->pageno >= pageno); + simple_heap_delete(lo_heap_r, &oldtuple->t_self); + } + } + else if (olddata != NULL && olddata->pageno == pageno) { - /* First, load old data into workbuf */ bytea *datafield = &(olddata->data); /* see note at top of * file */ bool pfreeit = false; int pagelen; - if (VARATT_IS_EXTENDED(datafield)) - { - datafield = (bytea *) - heap_tuple_untoast_attr((struct varlena *) datafield); - pfreeit = true; - } - pagelen = getbytealen(datafield); - Assert(pagelen <= LOBLKSIZE); - memcpy(workb, VARDATA(datafield), pagelen); - if (pfreeit) - pfree(datafield); + if (getbytealen(datafield) != off) { + /* First, load old data into workbuf */ + if (VARATT_IS_EXTENDED(datafield)) + { + datafield = (bytea *) + heap_tuple_untoast_attr((struct varlena *) datafield); + pfreeit = true; + } + pagelen = getbytealen(datafield); + Assert(pagelen <= LOBLKSIZE); + memcpy(workb, VARDATA(datafield), pagelen); + if (pfreeit) + pfree(datafield); - /* - * Fill any hole - */ - off = len % LOBLKSIZE; - if (off > pagelen) - MemSet(workb + pagelen, 0, off - pagelen); + /* + * Fill any hole + */ + if (off > pagelen) + MemSet(workb + pagelen, 0, off - pagelen); - /* compute length of new page */ - SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ); + /* compute length of new page */ + SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ); - /* - * Form and insert updated tuple - */ - memset(values, 0, sizeof(values)); - memset(nulls, false, sizeof(nulls)); - memset(replace, false, sizeof(replace)); - values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf); - replace[Anum_pg_largeobject_data - 1] = true; - newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r), - values, nulls, replace); - simple_heap_update(lo_heap_r, &newtup->t_self, newtup); - CatalogIndexInsert(indstate, newtup); - heap_freetuple(newtup); + /* + * Form and insert updated tuple + */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replace, false, sizeof(replace)); + values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf); + replace[Anum_pg_largeobject_data - 1] = true; + newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r), + values, nulls, replace); + simple_heap_update(lo_heap_r, &newtup->t_self, newtup); + CatalogIndexInsert(indstate, newtup); + heap_freetuple(newtup); + } } else { @@ -858,7 +907,6 @@ inv_truncate(LargeObjectDesc *obj_desc, int len) * * Fill the hole up to the truncation point */ - off = len % LOBLKSIZE; if (off > 0) MemSet(workb, 0, off);