Skip to content

Commit

Permalink
#1781 Clean combine handling on delta streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Derek Nelson committed Jul 16, 2017
1 parent 005bc3e commit 5dbac43
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 12 deletions.
44 changes: 32 additions & 12 deletions src/backend/pipeline/analyzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2989,11 +2989,16 @@ find_cv_attr(ParseState *pstate, RangeVar *cvrv, RangeTblEntry *joinrte, Var *va
* combine_target_for_osrel
*/
static bool
combine_target_for_osrel(Node *node, List *rtable, FieldSelect **fsp, Oid *cqid)
combine_target_for_osrel(Node *node, List *rtable, FieldSelect **fsp, Oid *cqid, Expr **expr)
{
Var *v;
RangeTblEntry *rte;
FieldSelect *fs;
TypeCacheEntry *typ;
ContQuery *cq;
Query *q;
ListCell *lc;
char *target;

/*
* The FieldSelect may be wrapped in a final function
Expand All @@ -3019,8 +3024,30 @@ combine_target_for_osrel(Node *node, List *rtable, FieldSelect **fsp, Oid *cqid)
if (rte->relkind != RELKIND_STREAM)
return false;

if (!RelIdIsForOutputStream(rte->relid, cqid))
return false;

*fsp = fs;
return RelIdIsForOutputStream(rte->relid, cqid);
cq = GetContQueryForId(*cqid);
q = GetContViewQuery(cq->name);
typ = lookup_type_cache(v->vartype, 0);

target = NameStr(typ->tupDesc->attrs[fs->fieldnum - 1]->attname);

foreach(lc, q->targetList)
{
TargetEntry *te = (TargetEntry *) lfirst(lc);

if (te->resname && pg_strcasecmp(te->resname, target) == 0)
{
*expr = te->expr;
return true;
}
}

elog(ERROR, "could not find column \"%s\" in continuous view's output stream", target);

return false;
}

/*
Expand All @@ -3047,6 +3074,7 @@ ParseCombineFuncCall(ParseState *pstate, List *fargs,
AttrNumber cvatt = InvalidAttrNumber;
Oid cqid;
FieldSelect *fs;
Expr *expr;

if (list_length(fargs) != 1)
ereport(ERROR,
Expand All @@ -3067,22 +3095,14 @@ ParseCombineFuncCall(ParseState *pstate, List *fargs,
/*
* 1) Is it a combine call on an output stream?
*/
if (combine_target_for_osrel(arg, pstate->p_rtable, &fs, &cqid))
if (combine_target_for_osrel(arg, pstate->p_rtable, &fs, &cqid, &expr))
{
AttrNumber attno = fs->fieldnum;
ContQuery *cq = GetContQueryForId(cqid);
Query *q = GetContViewQuery(cq->name);
Expr *expr;

target = (TargetEntry *) list_nth(q->targetList, attno - 1);
expr = target->expr;

/*
* We may need to pull off a final function to get to the aggregate
*/
if (IsA(expr, FuncExpr))
{
FuncExpr *f = (FuncExpr *) target->expr;
FuncExpr *f = (FuncExpr *) expr;
if (list_length(f->args) == 1)
expr = linitial(f->args);
}
Expand Down
42 changes: 42 additions & 0 deletions src/test/regress/expected/output_streams.out
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,45 @@ DROP STREAM os_stream CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to continuous transform os_xform
drop cascades to continuous view os4
CREATE STREAM os_stream (x integer, y numeric);
CREATE CONTINUOUS VIEW os5 AS
SELECT x,
abs(sum(y) - sum(y)) AS abs,
count(*),
avg(y) AS avg
FROM os_stream GROUP BY x;
CREATE CONTINUOUS VIEW os6 AS
SELECT
(new).x % 2 AS g,
combine((delta).avg) AS avg
FROM output_of('os5') GROUP BY g;
INSERT INTO os_stream (x, y) SELECT x, x AS y FROM generate_series(1, 100) AS x;
SELECT count(*) FROM os5;
count
-------
100
(1 row)

SELECT combine(avg) FROM os5;
combine
---------------------
50.5000000000000000
(1 row)

SELECT * FROM os6 ORDER BY g;
g | avg
---+---------------------
0 | 51.0000000000000000
1 | 50.0000000000000000
(2 rows)

SELECT combine(avg) FROM os6;
combine
---------------------
50.5000000000000000
(1 row)

DROP STREAM os_stream CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to continuous view os5
drop cascades to continuous view os6
24 changes: 24 additions & 0 deletions src/test/regress/sql/output_streams.sql
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,27 @@ SELECT * FROM os4 ORDER BY x;

DROP STREAM os_stream CASCADE;

CREATE STREAM os_stream (x integer, y numeric);

CREATE CONTINUOUS VIEW os5 AS
SELECT x,
abs(sum(y) - sum(y)) AS abs,
count(*),
avg(y) AS avg
FROM os_stream GROUP BY x;

CREATE CONTINUOUS VIEW os6 AS
SELECT
(new).x % 2 AS g,
combine((delta).avg) AS avg
FROM output_of('os5') GROUP BY g;

INSERT INTO os_stream (x, y) SELECT x, x AS y FROM generate_series(1, 100) AS x;

SELECT count(*) FROM os5;
SELECT combine(avg) FROM os5;

SELECT * FROM os6 ORDER BY g;
SELECT combine(avg) FROM os6;

DROP STREAM os_stream CASCADE;

0 comments on commit 5dbac43

Please sign in to comment.