Skip to content

Commit

Permalink
#1683 Standardize syscache infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
derekjn authored Aug 12, 2017
1 parent e770227 commit 16058f8
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 134 deletions.
5 changes: 3 additions & 2 deletions src/backend/catalog/objectaddress.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
#include "parser/parse_func.h"
#include "parser/parse_oper.h"
#include "parser/parse_type.h"
#include "pipeline/syscache.h"
#include "rewrite/rewriteSupport.h"
#include "storage/lmgr.h"
#include "storage/sinval.h"
Expand Down Expand Up @@ -3159,7 +3160,7 @@ getObjectDescription(const ObjectAddress *object)
{
HeapTuple tup;

tup = SearchSysCache1(PIPELINEQUERYOID,
tup = SearchPipelineSysCache1(PIPELINEQUERYOID,
ObjectIdGetDatum(object->objectId));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for pipeline_query %u",
Expand All @@ -3174,7 +3175,7 @@ getObjectDescription(const ObjectAddress *object)
{
HeapTuple tup;

tup = SearchSysCache1(PIPELINESTREAMOID,
tup = SearchPipelineSysCache1(PIPELINESTREAMOID,
ObjectIdGetDatum(object->objectId));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for pipeline_stream %u",
Expand Down
7 changes: 4 additions & 3 deletions src/backend/catalog/pipeline_combine.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "catalog/pipeline_combine_fn.h"
#include "commands/defrem.h"
#include "miscadmin.h"
#include "pipeline/syscache.h"
#include "utils/acl.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
Expand Down Expand Up @@ -93,7 +94,7 @@ GetCombineInfo(Oid aggfnoid, Oid *combinefn, Oid *transoutfn, Oid *combineinfn,
aggform = (Form_pg_aggregate) GETSTRUCT(aggtup);
ReleaseSysCache(aggtup);

combtup = SearchSysCache2(PIPELINECOMBINETRANSFNOID,
combtup = SearchPipelineSysCache2(PIPELINECOMBINETRANSFNOID,
ObjectIdGetDatum(aggform->aggfinalfn), ObjectIdGetDatum(aggform->aggtransfn));

if (!HeapTupleIsValid(combtup))
Expand Down Expand Up @@ -231,7 +232,7 @@ DefineCombiner(Oid aggoid, List *name, List *args, bool oldstyle, List *paramete

Assert(transouttype != INTERNALOID);

combinetup = SearchSysCache2(PIPELINECOMBINETRANSFNOID,
combinetup = SearchPipelineSysCache2(PIPELINECOMBINETRANSFNOID,
ObjectIdGetDatum(agg->aggfinalfn), ObjectIdGetDatum(agg->aggtransfn));

if (HeapTupleIsValid(combinetup))
Expand Down Expand Up @@ -318,7 +319,7 @@ RemovePipelineCombineById(Oid oid)

pipeline_combine = heap_open(PipelineCombineRelationId, RowExclusiveLock);

tuple = SearchSysCache1(PIPELINECOMBINEOID, ObjectIdGetDatum(oid));
tuple = SearchPipelineSysCache1(PIPELINECOMBINEOID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for pipeline_combine tuple with OID %u", oid);

Expand Down
27 changes: 14 additions & 13 deletions src/backend/catalog/pipeline_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
#include "nodes/makefuncs.h"
#include "parser/analyze.h"
#include "pipeline/analyzer.h"
#include "pipeline/scheduler.h"
#include "pipeline/miscutils.h"
#include "pipeline/scheduler.h"
#include "pipeline/syscache.h"
#include "postmaster/bgworker.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
Expand Down Expand Up @@ -201,7 +202,7 @@ GetPipelineQueryTuple(RangeVar *name)

Assert(OidIsValid(namespace));

tuple = SearchSysCache1(PIPELINEQUERYRELID, ObjectIdGetDatum(get_relname_relid(name->relname, namespace)));
tuple = SearchPipelineSysCache1(PIPELINEQUERYRELID, ObjectIdGetDatum(get_relname_relid(name->relname, namespace)));

return tuple;
}
Expand Down Expand Up @@ -276,7 +277,7 @@ void
UpdateContViewIndexIds(Oid cvid, Oid pkindid, Oid lookupindid)
{
Relation pipeline_query = heap_open(PipelineQueryRelationId, RowExclusiveLock);
HeapTuple tup = SearchSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(cvid));
HeapTuple tup = SearchPipelineSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(cvid));
bool replace[Natts_pipeline_query];
bool nulls[Natts_pipeline_query];
Datum values[Natts_pipeline_query];
Expand Down Expand Up @@ -308,7 +309,7 @@ void
UpdateContViewRelIds(Oid cvid, Oid cvrelid, Oid osrelid)
{
Relation pipeline_query = heap_open(PipelineQueryRelationId, RowExclusiveLock);
HeapTuple tup = SearchSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(cvid));
HeapTuple tup = SearchPipelineSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(cvid));
bool replace[Natts_pipeline_query];
bool nulls[Natts_pipeline_query];
Datum values[Natts_pipeline_query];
Expand Down Expand Up @@ -380,7 +381,7 @@ GetCVNameFromMatRelName(RangeVar *name)

Assert(OidIsValid(namespace));

tup = SearchSysCache1(PIPELINEQUERYMATRELID, ObjectIdGetDatum(get_relname_relid(name->relname, namespace)));
tup = SearchPipelineSysCache1(PIPELINEQUERYMATRELID, ObjectIdGetDatum(get_relname_relid(name->relname, namespace)));

if (!HeapTupleIsValid(tup))
return NULL;
Expand All @@ -406,7 +407,7 @@ OpenCVRelFromMatRel(Relation matrel, LOCKMODE lockmode)

Assert(OidIsValid(namespace));

tup = SearchSysCache1(PIPELINEQUERYMATRELID, RelationGetRelid(matrel));
tup = SearchPipelineSysCache1(PIPELINEQUERYMATRELID, RelationGetRelid(matrel));

if (!HeapTupleIsValid(tup))
return NULL;
Expand Down Expand Up @@ -487,7 +488,7 @@ IsAMatRel(RangeVar *name, RangeVar **cvname)

Assert(OidIsValid(namespace));

tup = SearchSysCache1(PIPELINEQUERYMATRELID, ObjectIdGetDatum(get_relname_relid(name->relname, namespace)));
tup = SearchPipelineSysCache1(PIPELINEQUERYMATRELID, ObjectIdGetDatum(get_relname_relid(name->relname, namespace)));

if (!HeapTupleIsValid(tup))
{
Expand Down Expand Up @@ -516,7 +517,7 @@ RelIdIsForMatRel(Oid relid, Oid *id)
{
HeapTuple tup;

tup = SearchSysCache1(PIPELINEQUERYMATRELID, ObjectIdGetDatum(relid));
tup = SearchPipelineSysCache1(PIPELINEQUERYMATRELID, ObjectIdGetDatum(relid));

if (!HeapTupleIsValid(tup))
{
Expand Down Expand Up @@ -577,7 +578,7 @@ IsTTLContView(RangeVar *name)
ContQuery *
GetContQueryForId(Oid id)
{
HeapTuple tup = SearchSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(id));
HeapTuple tup = SearchPipelineSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(id));
ContQuery *cq;
Form_pipeline_query row;
Datum tmp;
Expand Down Expand Up @@ -616,7 +617,7 @@ GetContQueryForId(Oid id)
else
cq->matrel = NULL;

tmp = SysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
tmp = PipelineSysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
query = (Query *) stringToNode(TextDatumGetCString(tmp));
cq->sql = deparse_query_def(query);
cq->cvdef = query;
Expand Down Expand Up @@ -644,7 +645,7 @@ GetContQueryForId(Oid id)
char *p;
int i;

val = DatumGetByteaP(SysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_tgargs, &isnull));
val = DatumGetByteaP(PipelineSysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_tgargs, &isnull));
Assert(!isnull);

p = (char *) VARDATA(val);
Expand Down Expand Up @@ -774,7 +775,7 @@ RemovePipelineQueryById(Oid oid)

pipeline_query = heap_open(PipelineQueryRelationId, ExclusiveLock);

tuple = SearchSysCache1(PIPELINEQUERYOID, ObjectIdGetDatum(oid));
tuple = SearchPipelineSysCache1(PIPELINEQUERYOID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for continuous view with OID %u", oid);

Expand Down Expand Up @@ -916,7 +917,7 @@ bool
ContQuerySetActive(Oid id, bool active)
{
Relation pipeline_query = heap_open(PipelineQueryRelationId, RowExclusiveLock);
HeapTuple tup = SearchSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(id));
HeapTuple tup = SearchPipelineSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(id));
Form_pipeline_query row;
bool changed = false;

Expand Down
14 changes: 8 additions & 6 deletions src/backend/catalog/pipeline_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "parser/analyze.h"
#include "pipeline/analyzer.h"
#include "pipeline/stream.h"
#include "pipeline/syscache.h"
#include "utils/builtins.h"
#include "tcop/tcopprot.h"
#include "utils/fmgroids.h"
Expand Down Expand Up @@ -99,7 +100,7 @@ streams_to_meta(Relation pipeline_query)
Form_pipeline_query row = (Form_pipeline_query) GETSTRUCT(tup);
ContAnalyzeContext *context;

tmp = SysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
tmp = PipelineSysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
querystring = deparse_query_def((Query *) stringToNode(TextDatumGetCString(tmp)));

parsetree_list = pg_parse_query(querystring);
Expand Down Expand Up @@ -272,7 +273,7 @@ update_pipeline_stream_catalog(Relation pipeline_stream, HTAB *hash)

replaces[Anum_pipeline_stream_queries - 1] = true;

tup = SearchSysCache1(PIPELINESTREAMRELID, ObjectIdGetDatum(entry->relid));
tup = SearchPipelineSysCache1(PIPELINESTREAMRELID, ObjectIdGetDatum(entry->relid));
if (!HeapTupleIsValid(tup))
continue;

Expand Down Expand Up @@ -327,7 +328,7 @@ mark_nonexistent_streams(Relation pipeline_stream, List *keys)
HeapTuple newtup;
bool isnull;

SysCacheGetAttr(PIPELINESTREAMRELID, tup, Anum_pipeline_stream_queries, &isnull);
PipelineSysCacheGetAttr(PIPELINESTREAMRELID, tup, Anum_pipeline_stream_queries, &isnull);

/* If queries is already NULL, this is a noop */
if (isnull)
Expand Down Expand Up @@ -399,10 +400,11 @@ UpdatePipelineStreamCatalog(void)
* Gets a bitmap indexed by continuous query id that represents which
* queries are reading from the given stream.
*/
#include "pipeline/syscache.h"
Bitmapset *
GetAllStreamReaders(Oid relid)
{
HeapTuple tup = SearchSysCache1(PIPELINESTREAMRELID, ObjectIdGetDatum(relid));
HeapTuple tup = SearchPipelineSysCache1(PIPELINESTREAMRELID, ObjectIdGetDatum(relid));
bool isnull;
Datum raw;
bytea *bytes;
Expand All @@ -413,7 +415,7 @@ GetAllStreamReaders(Oid relid)
if (!HeapTupleIsValid(tup))
return NULL;

raw = SysCacheGetAttr(PIPELINESTREAMRELID, tup,
raw = PipelineSysCacheGetAttr(PIPELINESTREAMRELID, tup,
Anum_pipeline_stream_queries, &isnull);

if (isnull)
Expand Down Expand Up @@ -588,7 +590,7 @@ RemovePipelineStreamById(Oid oid)

pipeline_stream = heap_open(PipelineStreamRelationId, RowExclusiveLock);

tuple = SearchSysCache1(PIPELINESTREAMOID, ObjectIdGetDatum(oid));
tuple = SearchPipelineSysCache1(PIPELINESTREAMOID, ObjectIdGetDatum(oid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for stream with OID %u", oid);

Expand Down
3 changes: 2 additions & 1 deletion src/backend/executor/nodeAgg.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "pipeline/syscache.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
Expand Down Expand Up @@ -2377,7 +2378,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
*/
if (AGGKIND_IS_COMBINE(aggref->aggkind))
{
combTuple = SearchSysCache2(PIPELINECOMBINETRANSFNOID,
combTuple = SearchPipelineSysCache2(PIPELINECOMBINETRANSFNOID,
ObjectIdGetDatum(finalfn_oid),
ObjectIdGetDatum(transfn_oid));

Expand Down
3 changes: 2 additions & 1 deletion src/backend/executor/nodeWindowAgg.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "pipeline/scheduler.h"
#include "pipeline/syscache.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
Expand Down Expand Up @@ -2199,7 +2200,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,

if (AGGKIND_IS_COMBINE(wfunc->winaggkind))
{
HeapTuple combTuple = SearchSysCache2(PIPELINECOMBINETRANSFNOID,
HeapTuple combTuple = SearchPipelineSysCache2(PIPELINECOMBINETRANSFNOID,
ObjectIdGetDatum(finalfn_oid),
ObjectIdGetDatum(transfn_oid));

Expand Down
2 changes: 1 addition & 1 deletion src/backend/pipeline/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = combiner_receiver.o planner.o update.o stream.o \
matrel.o tdigest.o miscutils.o bloom.o hll.o cmsketch.o \
analyzer.o scheduler.o worker.o combiner.o fss.o stream_fdw.o executor.o transform_receiver.o \
queue.o reaper.o tuplestore_scan.o physical_group_lookup.o
queue.o reaper.o tuplestore_scan.o physical_group_lookup.o syscache.o

SUBDIRS = ipc

Expand Down
11 changes: 6 additions & 5 deletions src/backend/pipeline/analyzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@
#include "parser/parse_target.h"
#include "parser/parse_type.h"
#include "pipeline/analyzer.h"
#include "pipeline/scheduler.h"
#include "pipeline/matrel.h"
#include "pipeline/scheduler.h"
#include "pipeline/stream.h"
#include "pipeline/syscache.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lock.h"
#include "tcop/tcopprot.h"
Expand Down Expand Up @@ -1249,7 +1250,7 @@ validate_agg(Node *node)
aggform = (Form_pg_aggregate) GETSTRUCT(aggtup);
ReleaseSysCache(aggtup);

combtup = SearchSysCache2(PIPELINECOMBINETRANSFNOID,
combtup = SearchPipelineSysCache2(PIPELINECOMBINETRANSFNOID,
ObjectIdGetDatum(aggform->aggfinalfn), ObjectIdGetDatum(aggform->aggtransfn));

if (!HeapTupleIsValid(combtup))
Expand Down Expand Up @@ -2518,7 +2519,7 @@ get_cont_query_select_stmt(RangeVar *rv)
(errcode(ERRCODE_UNDEFINED_CONTINUOUS_VIEW),
errmsg("continuous view \"%s\" does not exist", rv->relname)));

tmp = SysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
tmp = PipelineSysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
query = (Query *) stringToNode(TextDatumGetCString(tmp));

sql = deparse_query_def(query);
Expand Down Expand Up @@ -2682,14 +2683,14 @@ get_worker_query_for_id(Oid id)
RangeVar *matrel;
Form_pipeline_query row;

tup = SearchSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(id));
tup = SearchPipelineSysCache1(PIPELINEQUERYID, ObjectIdGetDatum(id));

if (!HeapTupleIsValid(tup))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_CONTINUOUS_VIEW),
errmsg("continuous view with id \"%d\" does not exist", id)));

tmp = SysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
tmp = PipelineSysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
query = (Query *) stringToNode(TextDatumGetCString(tmp));

sql = deparse_query_def(query);
Expand Down
3 changes: 2 additions & 1 deletion src/backend/pipeline/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "pipeline/ipc/reader.h"
#include "pipeline/miscutils.h"
#include "pipeline/stream.h"
#include "pipeline/syscache.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
Expand Down Expand Up @@ -205,7 +206,7 @@ get_query_state(ContExecutor *exec)

PushActiveSnapshot(GetTransactionSnapshot());

tup = SearchSysCache1(PIPELINEQUERYID, Int32GetDatum(exec->curr_query_id));
tup = SearchPipelineSysCache1(PIPELINEQUERYID, Int32GetDatum(exec->curr_query_id));

/* Was the continuous view removed? */
if (!HeapTupleIsValid(tup))
Expand Down
Loading

0 comments on commit 16058f8

Please sign in to comment.