Skip to content

Commit

Permalink
#1852 Remove fields from Query
Browse files Browse the repository at this point in the history
  • Loading branch information
derekjn authored Aug 16, 2017
1 parent 5babc2e commit a7cf428
Show file tree
Hide file tree
Showing 17 changed files with 167 additions and 49 deletions.
4 changes: 2 additions & 2 deletions src/backend/catalog/pipeline_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ DefineContinuousView(Oid relid, Query *query, Oid matrelid, Oid seqrelid, int tt
values[Anum_pipeline_query_seqrelid - 1] = ObjectIdGetDatum(seqrelid);
values[Anum_pipeline_query_ttl - 1] = Int32GetDatum(ttl);
values[Anum_pipeline_query_ttl_attno - 1] = Int16GetDatum(ttl_attno);
values[Anum_pipeline_query_step_factor - 1] = Int16GetDatum(query->swStepFactor);
values[Anum_pipeline_query_step_factor - 1] = Int16GetDatum(QueryGetSWStepFactor(query));

/* unused */
values[Anum_pipeline_query_tgfn - 1] = ObjectIdGetDatum(InvalidOid);
Expand Down Expand Up @@ -630,7 +630,7 @@ GetContQueryForId(Oid id)

cq->is_sw = true;
cq->sw_attno = row->ttl_attno;
cq->sw_step_factor = query->swStepFactor;
cq->sw_step_factor = row->step_factor;
i = GetSWInterval(cq->name);
cq->sw_interval_ms = 1000 * (int) DatumGetFloat8(
DirectFunctionCall2(interval_part, CStringGetTextDatum("epoch"), (Datum) i));
Expand Down
9 changes: 8 additions & 1 deletion src/backend/commands/pipelinecmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -797,8 +797,15 @@ ExecCreateContViewStmt(CreateContViewStmt *stmt, const char *querystring)
*/
if (AttributeNumberIsValid(ttl_attno))
{
double sf;

has_sw = true;
cont_query->swStepFactor = cont_query->swStepFactor ? cont_query->swStepFactor : sliding_window_step_factor;
sf = QueryGetSWStepFactor(cont_query);

if (!sf)
sf = sliding_window_step_factor;

QuerySetSWStepFactor(cont_query, sf);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/backend/commands/view.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "nodes/nodeFuncs.h"
#include "parser/analyze.h"
#include "parser/parse_relation.h"
#include "pipeline/analyzer.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteManip.h"
#include "rewrite/rewriteHandler.h"
Expand Down Expand Up @@ -610,7 +611,7 @@ DefineView(ViewStmt *stmt, const char *queryString)
* NOTE: if it already exists and replace is false, the xact will be
* aborted.
*/
if (viewParse->isContinuous)
if (QueryIsContinuous(viewParse))
address = DefineContVirtualRelation(view, viewParse->targetList);
else
address = DefineVirtualRelation(view, viewParse->targetList,
Expand Down
4 changes: 0 additions & 4 deletions src/backend/nodes/copyfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2709,10 +2709,6 @@ _copyQuery(const Query *from)
COPY_NODE_FIELD(setOperations);
COPY_NODE_FIELD(constraintDeps);
COPY_NODE_FIELD(withCheckOptions);
COPY_SCALAR_FIELD(isContinuous);
COPY_SCALAR_FIELD(isCombine);
COPY_SCALAR_FIELD(isCombineLookup);
COPY_SCALAR_FIELD(swStepFactor);

return newnode;
}
Expand Down
6 changes: 0 additions & 6 deletions src/backend/nodes/outfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -2201,8 +2201,6 @@ _outSelectStmt(StringInfo str, const SelectStmt *node)
WRITE_NODE_FIELD(withClause);
WRITE_ENUM_FIELD(op, SetOperation);
WRITE_BOOL_FIELD(all);
WRITE_BOOL_FIELD(forContinuousView);
WRITE_FLOAT_FIELD(swStepFactor, "%.2f");
WRITE_NODE_FIELD(larg);
WRITE_NODE_FIELD(rarg);
}
Expand Down Expand Up @@ -2396,10 +2394,6 @@ _outQuery(StringInfo str, const Query *node)
WRITE_NODE_FIELD(rowMarks);
WRITE_NODE_FIELD(setOperations);
WRITE_NODE_FIELD(constraintDeps);
WRITE_BOOL_FIELD(isContinuous);
WRITE_BOOL_FIELD(isCombine);
WRITE_BOOL_FIELD(isCombineLookup);
WRITE_FLOAT_FIELD(swStepFactor, "%.2f");
}

static void
Expand Down
4 changes: 0 additions & 4 deletions src/backend/nodes/readfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,6 @@ _readQuery(void)
READ_NODE_FIELD(rowMarks);
READ_NODE_FIELD(setOperations);
READ_NODE_FIELD(constraintDeps);
READ_BOOL_FIELD(isContinuous);
READ_BOOL_FIELD(isCombine);
READ_BOOL_FIELD(isCombineLookup);
READ_INT_FIELD(swStepFactor);

READ_DONE();
}
Expand Down
3 changes: 1 addition & 2 deletions src/backend/optimizer/util/clauses.c
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,7 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
* Continuous queries don't actually do any sorting for ordered-set
* aggs, so only count it if it's not a CQ.
*/
if (context->root->parse->isContinuous == false &&
(aggref->aggorder != NIL || aggref->aggdistinct != NIL))
if (aggref->aggorder != NIL || aggref->aggdistinct != NIL)
costs->numOrderedAggs++;

/* add component function execution costs to appropriate totals */
Expand Down
9 changes: 1 addition & 8 deletions src/backend/optimizer/util/plancat.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,6 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,

indexoidlist = RelationGetIndexList(relation);

/*
* When performing the combiner lookup query, we have already acquired a RowExclusiveLock
* on the materialzation table and any indices that might be used while executing the
* query. See cont_combiner.c:combine.
*/
if (root->parse->isCombineLookup)
lmode = NoLock;
/*
* For each index, we get the same type of lock that the executor will
* need, and do not release it. This saves a couple of trips to the
Expand All @@ -165,7 +158,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
* index while we hold lock on the parent rel, and neither lock type
* blocks any other kind of index operation.
*/
else if (rel->relid == root->parse->resultRelation)
if (rel->relid == root->parse->resultRelation)
lmode = RowExclusiveLock;
else
lmode = AccessShareLock;
Expand Down
6 changes: 3 additions & 3 deletions src/backend/parser/analyze.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ parse_analyze(Node *parseTree, const char *sourceText,
if (IsA(parseTree, SelectStmt))
{
SelectStmt *stmt = (SelectStmt *) parseTree;
query->isContinuous = stmt->forContinuousView;
query->isCombineLookup = stmt->forCombineLookup;
query->swStepFactor = stmt->swStepFactor;

QuerySetIsContinuous(query, stmt->forContinuousView);
QuerySetSWStepFactor(query, stmt->swStepFactor);
}

if (post_parse_analyze_hook)
Expand Down
140 changes: 136 additions & 4 deletions src/backend/pipeline/analyzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/binary_upgrade.h"
#include "catalog/namespace.h"
Expand Down Expand Up @@ -876,6 +877,135 @@ validate_target_list(SelectStmt *stmt)
}
}

/*
 * Transaction-local cache mapping Query.queryId -> QueryState.  This replaces
 * the fields that were previously stored directly on the Query node
 * (isContinuous, swStepFactor, cqId).
 *
 * NOTE(review): these are file-scope but not marked static — presumably they
 * are only used by get_query_state() below; confirm no other translation unit
 * references them before adding static.
 */
HTAB *query_cache;
/* xid the cache was built in; used to detect and discard stale caches */
TransactionId query_cache_xid = InvalidTransactionId;
/* counter used to hand out synthetic queryIds to Queries that have none */
int current_query_id = 0;

/* Per-Query state previously carried as fields on the Query node itself */
typedef struct QueryState
{
	int queryId;         /* hash key: the Query's queryId */
	Oid cqId;            /* continuous query id, InvalidOid if unset */
	double swStepFactor; /* sliding-window step factor, 0 if unset */
	bool isContinuous;   /* is this Query a continuous query? */
} QueryState;

/*
 * get_query_state
 *		Look up (or create) the QueryState entry for the given Query,
 *		lazily (re)building the transaction-scoped hashtable as needed.
 *
 * If the Query has no queryId yet, one is assigned from a per-transaction
 * counter — note this mutates the caller's Query node.
 *
 * Always returns a valid entry; new entries are zero/invalid-initialized.
 */
static QueryState *
get_query_state(Query *query)
{
	MemoryContext old;
	QueryState *entry;
	bool found;

	/*
	 * We lazily create and destroy the Query hashtable such that it's only valid for
	 * a single transaction. This does mean that a garbage reference to the cache will
	 * persist after a transaction finishes, but we detect this condition by associating
	 * a transaction id with the hashtable. If the hashtable's transaction id is different
	 * from the current transaction's, we immediately destroy and recreate it.
	 */
	if (TransactionIdIsValid(query_cache_xid))
	{
		/*
		 * NOTE(review): GetCurrentTransactionId() forces assignment of an xid
		 * to the current transaction, even for read-only queries — confirm
		 * this side effect is acceptable here (vs. GetCurrentTransactionIdIfAny).
		 */
		if (query_cache_xid != GetCurrentTransactionId())
		{
			if (query_cache)
				hash_destroy(query_cache);
			query_cache = NULL;
			/* restart synthetic id numbering for the new transaction */
			current_query_id = 0;
		}
	}

	if (query_cache == NULL)
	{
		HASHCTL ctl;

		Assert(TopMemoryContext);

		MemSet(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(int);
		ctl.entrysize = sizeof(QueryState);
		/*
		 * Table memory lives in TopMemoryContext so it survives until we
		 * explicitly hash_destroy() it at the next transaction boundary above.
		 */
		ctl.hcxt = TopMemoryContext;
		query_cache = hash_create("query_cache", 16, &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
		query_cache_xid = GetCurrentTransactionId();
	}

	/* Assign a synthetic queryId if the Query doesn't already have one */
	if (query->queryId <= 0)
		query->queryId = ++current_query_id;

	/*
	 * NOTE(review): with HASH_CONTEXT, dynahash allocates entries from
	 * ctl.hcxt (TopMemoryContext), so this context switch appears to be a
	 * no-op for the hash_search allocation — confirm whether it is needed.
	 */
	old = MemoryContextSwitchTo(TopTransactionContext);
	entry = (QueryState *) hash_search(query_cache, &query->queryId, HASH_ENTER, &found);
	if (!found)
	{
		/* fresh entry: initialize to "unset" values */
		entry->isContinuous = false;
		entry->cqId = InvalidOid;
		entry->swStepFactor = 0;
	}
	MemoryContextSwitchTo(old);

	Assert(entry);

	return entry;
}

/*
 * QueryIsContinuous
 *		Report whether the given Query has been marked as a continuous query.
 */
bool
QueryIsContinuous(Query *query)
{
	return get_query_state(query)->isContinuous;
}

/*
 * QueryGetSWStepFactor
 *		Return the sliding-window step factor recorded for the given Query
 *		(0 if none has been set).
 */
double
QueryGetSWStepFactor(Query *query)
{
	return get_query_state(query)->swStepFactor;
}

/*
 * QuerySetSWStepFactor
 *		Record the sliding-window step factor for the given Query.
 */
void
QuerySetSWStepFactor(Query *query, double sf)
{
	get_query_state(query)->swStepFactor = sf;
}

/*
 * QuerySetIsContinuous
 *		Mark (or unmark) the given Query as a continuous query.
 */
void
QuerySetIsContinuous(Query *query, bool continuous)
{
	get_query_state(query)->isContinuous = continuous;
}

/*
 * QueryGetContQueryId
 *		Return the continuous-query id recorded for the given Query
 *		(InvalidOid if none has been set).
 */
Oid
QueryGetContQueryId(Query *query)
{
	return get_query_state(query)->cqId;
}

/*
 * QuerySetContQueryId
 *		Record the continuous-query id for the given Query.
 */
void
QuerySetContQueryId(Query *query, Oid id)
{
	get_query_state(query)->cqId = id;
}

/*
* ValidateParsedContQuery
*/
Expand Down Expand Up @@ -2503,6 +2633,7 @@ get_cont_query_select_stmt(RangeVar *rv)
char *sql;
Query *query;
SelectStmt *select;
Form_pipeline_query row;

tup = GetPipelineQueryTuple(rv);

Expand All @@ -2511,12 +2642,13 @@ get_cont_query_select_stmt(RangeVar *rv)
(errcode(ERRCODE_UNDEFINED_CONTINUOUS_VIEW),
errmsg("continuous view \"%s\" does not exist", rv->relname)));

row = (Form_pipeline_query) GETSTRUCT(tup);
tmp = PipelineSysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
query = (Query *) stringToNode(TextDatumGetCString(tmp));

sql = deparse_query_def(query);
select = (SelectStmt *) linitial(pg_parse_query(sql));
select->swStepFactor = query->swStepFactor;
select->swStepFactor = row->step_factor;

ReleaseSysCache(tup);

Expand Down Expand Up @@ -2682,14 +2814,14 @@ get_worker_query_for_id(Oid id)
(errcode(ERRCODE_UNDEFINED_CONTINUOUS_VIEW),
errmsg("continuous view with id \"%d\" does not exist", id)));

row = (Form_pipeline_query) GETSTRUCT(tup);
tmp = PipelineSysCacheGetAttr(PIPELINEQUERYRELID, tup, Anum_pipeline_query_query, &isnull);
query = (Query *) stringToNode(TextDatumGetCString(tmp));

sql = deparse_query_def(query);
sel = (SelectStmt *) linitial(pg_parse_query(sql));
sel->swStepFactor = query->swStepFactor;
sel->swStepFactor = row->step_factor;

row = (Form_pipeline_query) GETSTRUCT(tup);
matrel = makeRangeVar(get_namespace_name(get_rel_namespace(row->matrelid)), get_rel_name(row->matrelid), -1);

ReleaseSysCache(tup);
Expand Down Expand Up @@ -3904,7 +4036,7 @@ ApplySlidingWindow(SelectStmt *stmt, DefElem *sw, int *ttl)
if (sw_cv)
{
Interval *sw_interval = GetSWInterval(sw_cv);
int step_factor = GetContWorkerQuery(sw_cv)->swStepFactor;
int step_factor = QueryGetSWStepFactor(GetContWorkerQuery(sw_cv));
Interval *view_interval = parse_node_to_interval((Node *) interval);
Interval *step_interval;
Interval *min_interval;
Expand Down
5 changes: 2 additions & 3 deletions src/backend/pipeline/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ get_plan_from_stmt(Oid id, Node *node, const char *sql, bool is_combine)

query = linitial(pg_analyze_and_rewrite(node, sql, NULL, 0));

query->isContinuous = true;
query->isCombine = is_combine;
query->cqId = id;
QuerySetIsContinuous(query, true);
QuerySetContQueryId(query, id);

plan = pg_plan_query(query, 0, NULL);

Expand Down
3 changes: 2 additions & 1 deletion src/backend/pipeline/stream_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "parser/parse_expr.h"
#include "parser/parsetree.h"
#include "pgstat.h"
#include "pipeline/analyzer.h"
#include "pipeline/executor.h"
#include "pipeline/scheduler.h"
#include "pipeline/ipc/pzmq.h"
Expand Down Expand Up @@ -137,7 +138,7 @@ GetStreamPaths(PlannerInfo *root, RelOptInfo *baserel, Oid relid)
while (parent->parent_root != NULL)
parent = parent->parent_root;

if (!parent->parse->isContinuous)
if (!QueryIsContinuous(parent->parse))
{
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
Expand Down
2 changes: 1 addition & 1 deletion src/backend/pipeline/tuplestore_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ create_tuplestore_scan_plan(PlannerInfo *root, RelOptInfo *rel, struct CustomPat
{
CustomScan *scan = makeNode(CustomScan);
Plan *plan = &scan->scan.plan;
ContQuery *cv = GetContQueryForViewId(root->parse->cqId);
ContQuery *cv = GetContQueryForViewId(QueryGetContQueryId(root->parse));
Relation matrel = heap_openrv(cv->matrel, NoLock);
TupleDesc desc = RelationGetDescr(matrel);
AttrNumber attrno;
Expand Down
1 change: 1 addition & 0 deletions src/backend/utils/init/pipelineinit.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "miscadmin.h"
#include "pipeline/ipc/microbatch.h"
#include "pipeline/analyzer.h"
#include "pipeline/planner.h"
#include "pipeline/scheduler.h"
#include "pipeline/syscache.h"
Expand Down
2 changes: 1 addition & 1 deletion src/include/catalog/catversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@
*/

/* yyyymmddN */
#define CATALOG_VERSION_NO 201703160
#define CATALOG_VERSION_NO 201708140

#endif
Loading

0 comments on commit a7cf428

Please sign in to comment.