-
-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathPreAggregator.js
More file actions
410 lines (372 loc) · 14.2 KB
/
PreAggregator.js
File metadata and controls
410 lines (372 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
import { Query, and, asNode, ceil, collectColumns, createTable, float64, floor, isBetween, int32, mul, round, scaleTransform, sub, isSelectQuery, ExprNode, SelectQuery } from '@uwdata/mosaic-sql';
import { preaggColumns } from './preagg-columns.js';
import { fnv_hash } from '../util/hash.js';
// Shared sentinel entry for clients that should be skipped: `request` records
// and returns this object when `selection.skip(client, activeClause)` is
// truthy. It has no materialized view, hence the null `result`.
const Skip = { skip: true, result: null };
/**
* @typedef {object} PreAggregateOptions
* @property {string} [schema] Database schema (namespace) in which to write
* pre-aggregated materialized views (default 'mosaic').
* @property {boolean} [enabled=true] Flag to enable or disable the
* pre-aggregation. This flag can be updated later via the `enabled` property.
*/
/**
 * Build and query optimized pre-aggregated materialized views, for fast
 * computation of groupby aggregate queries over compatible client queries
 * and selections. Each materialized view holds pre-aggregated data for one
 * Mosaic client, subdivided by the possible query values of an active
 * selection clause. The views are ordinary database tables that can be
 * queried for rapid updates.
 *
 * Compatible client queries must consist of only groupby dimensions and
 * supported aggregate functions. Compatible selections must contain an active
 * clause that exposes metadata for an interval or point value predicate.
 *
 * The *schema* constructor option names a dedicated schema (namespace) for
 * the views; it is meant to act as a persistent cache that can be reused
 * across sessions. The `dropSchema` method issues a query to remove *all*
 * tables within this schema. This may be needed if the original tables have
 * updated data, but should be used with care.
 */
export class PreAggregator {
  /**
   * Create a new manager of materialized views of pre-aggregated data.
   * @param {import('../Coordinator.js').Coordinator} coordinator A Mosaic coordinator.
   * @param {PreAggregateOptions} [options] Pre-aggregation options.
   */
  constructor(coordinator, options = {}) {
    const { schema = 'mosaic', enabled = true } = options;
    /**
     * Per-client cache of pre-aggregation results for the current active
     * clause source.
     * @type {Map<import('../MosaicClient.js').MosaicClient, PreAggregateInfo | Skip | null>}
     */
    this.entries = new Map();
    // analysis of the current active clause (see `activeColumns`), or null
    this.active = null;
    this.mc = coordinator;
    this._schema = schema;
    this._enabled = enabled;
  }

  /**
   * Set the enabled state of this manager. When switched to false, local
   * state is cleared and subsequent `request` calls return null until the
   * manager is re-enabled. Pre-aggregated tables already in the database
   * are not affected.
   * @param {boolean} [state] The enabled state to set.
   */
  set enabled(state) {
    if (this._enabled === state) return;
    if (!state) this.clear();
    this._enabled = state;
  }

  /**
   * Get the enabled state of this manager.
   * @returns {boolean} The current enabled state.
   */
  get enabled() {
    return this._enabled;
  }

  /**
   * Set the database schema used for pre-aggregated materialized view tables.
   * When the name changes, local state is cleared. This method does _not_
   * drop any existing materialized views; call `dropSchema` before changing
   * the schema to also remove existing materialized views in the database.
   * @param {string} [schema] The schema name to set.
   */
  set schema(schema) {
    if (this._schema === schema) return;
    this.clear();
    this._schema = schema;
  }

  /**
   * Get the database schema used for pre-aggregated materialized view tables.
   * @returns {string} The current schema name.
   */
  get schema() {
    return this._schema;
  }

  /**
   * Clears local state, then issues a query through the coordinator to drop
   * the current schema for pre-aggregated materialized views (with CASCADE).
   * Call this method if the underlying base tables have been updated, causing
   * materialized views to become stale and inaccurate. Use with care! The
   * `request` method issues CREATE SCHEMA IF NOT EXISTS before creating a
   * view, so later requests recreate the schema.
   * @returns A query result promise.
   */
  dropSchema() {
    this.clear();
    const statement = `DROP SCHEMA IF EXISTS "${this.schema}" CASCADE`;
    return this.mc.exec(statement);
  }

  /**
   * Clear the cache of pre-aggregation entries and the cached analysis of
   * the current active selection clause. This method does _not_ drop any
   * existing materialized views. Use `dropSchema` to remove existing
   * materialized view tables from the database.
   */
  clear() {
    this.active = null;
    this.entries.clear();
  }

  /**
   * Return pre-aggregation information for the active state of a
   * client-selection pair, or null if pre-aggregation does not apply.
   * This method has multiple possible side effects, including materialized
   * view creation and updating internal caches.
   * @param {import('../MosaicClient.js').MosaicClient} client A Mosaic client.
   * @param {import('../Selection.js').Selection} selection A Mosaic selection
   *  to filter the client by.
   * @param {import('../util/selection-types.js').SelectionClause} activeClause
   *  A representative active selection clause for which to (possibly) generate
   *  materialized views of pre-aggregates.
   * @returns {PreAggregateInfo | Skip | null} Information and query generator
   *  for pre-aggregated tables; the `Skip` sentinel if the selection reports
   *  the client as skippable; or null if this manager is disabled, the clause
   *  has no source, the clause could not be analyzed, or the client has no
   *  pre-aggregation columns.
   */
  request(client, selection, activeClause) {
    // a disabled manager never produces pre-aggregation info
    if (!this.enabled) return null;

    const { entries, mc: coordinator, schema } = this;

    // without a clause source there is nothing to key the cache on
    const { source } = activeClause;
    if (!source) return null;

    if (this.active) {
      // a different clause source invalidates all cached state;
      // clear() also resets this.active to null
      if (this.active.source !== source) this.clear();
      // NOTE(review): an analysis that failed is cached with a null source,
      // which never equals the truthy `source` checked above, so such a
      // clause is cleared and re-analyzed on each request.
      if (this.active?.source === null) return null;
    }

    // reuse the cached clause analysis, or compute and cache a fresh one
    let active = this.active;
    if (!active) {
      active = activeColumns(activeClause);
      this.active = active;
      // a null source marks a clause that cannot be pre-aggregated
      if (active.source === null) return null;
    }

    // a previously computed answer for this client wins
    if (entries.has(client)) {
      return entries.get(client);
    }

    // columns of the view that do not depend on the active clause
    const preaggCols = preaggColumns(client);

    // default: the client cannot be pre-aggregated
    let info = null;
    if (preaggCols) {
      if (selection.skip(client, activeClause)) {
        // the selection reports this client as skippable for the clause
        info = Skip;
      } else {
        // filter by every clause except the active one, then materialize
        const filter = selection.remove(source).predicate(client);
        info = preaggregateInfo(client.query(filter), active, preaggCols, schema);
        const statements = [
          `CREATE SCHEMA IF NOT EXISTS ${schema}`,
          createTable(info.table, info.create, { temp: false })
        ];
        info.result = coordinator.exec(statements);
        // log creation failures; the promise stored on info.result is the
        // original one, so consumers still observe the rejection
        info.result.catch(e => coordinator.logger().error(e));
      }
    }

    entries.set(client, info);
    return info;
  }
}
/**
 * Determines the active dimension columns to select over. Returns an object
 * with the clause source, column definitions, and a predicate generator
 * function for the active dimensions of a pre-aggregated materialized view.
 * If the active clause is not indexable or is missing metadata, this method
 * returns an object with a null source property.
 *
 * A clause is treated as not indexable when:
 * - it has no `meta`;
 * - it is a point clause whose predicate references no columns (there would
 *   be no active dimension to group by);
 * - it is an interval clause with no `scales`, no predicate, or a scale for
 *   which `binInterval` returns nothing;
 * - its `meta.type` is neither 'point' nor 'interval'.
 * @param {import('../util/selection-types.js').SelectionClause} clause
 *  The active selection clause to analyze.
 */
function activeColumns(clause) {
  const { source, meta } = clause;
  const clausePred = clause.predicate;

  let predicate;
  let columns;

  // without metadata there is no way to tell how the clause selects
  if (!meta) {
    return { source: null, columns, predicate };
  }

  // @ts-ignore
  const { type, scales, bin, pixelSize = 1 } = meta;

  if (type === 'point') {
    const clauseCols = collectColumns(clausePred).map(c => c.column);
    // An empty column list must bail: an empty `columns` object is truthy and
    // would otherwise be reported as indexable with zero active dimensions.
    if (clauseCols.length > 0) {
      // point predicates are applied to the view unchanged
      predicate = x => x;
      columns = Object.fromEntries(
        clauseCols.map(col => [`${col}`, asNode(col)])
      );
    }
  } else if (type === 'interval' && scales && clausePred) {
    // The predicate is dereferenced below (`.expr` / `.clauses`), so an
    // interval clause without one is treated as not indexable.

    // determine pixel-level binning
    const bins = scales.map(s => binInterval(s, pixelSize, bin));

    if (bins.some(b => !b)) {
      // bail if a scale type is unsupported
    } else if (bins.length === 1) {
      // selection clause predicate has type BetweenOpNode
      // single interval selection
      predicate = p => p ? isBetween('active0', p.extent.map(bins[0])) : [];
      // @ts-ignore
      columns = { active0: bins[0](clausePred.expr) };
    } else {
      // selection clause predicate has type AndNode<BetweenOpNode>
      // multiple interval selection
      predicate = p => p
        ? and(p.clauses.map(
            (c, i) => isBetween(`active${i}`, c.extent.map(bins[i]))
          ))
        : [];
      columns = Object.fromEntries(
        // @ts-ignore
        clausePred.clauses.map((p, i) => [`active${i}`, bins[i](p.expr)])
      );
    }
  }

  // columns stay undefined on every bail-out path above
  return { source: columns ? source : null, columns, predicate };
}
// Bin functions selectable by (case-insensitive) name; any other method
// name falls back to `floor` in binInterval.
const BIN = { ceil, round };

/**
 * Returns a bin function generator to discretize a selection interval domain.
 * @param {import('../util/selection-types.js').Scale} scale A scale that maps
 *  domain values to the output range (typically pixels).
 * @param {number} pixelSize The interactive pixel size. This value indicates
 *  the bin step size and may be greater than an actual screen pixel.
 * @param {import('../util/selection-types.js').BinMethod} bin The binning
 *  method to apply, one of `floor`, `ceil`, or `round`.
 * @returns {((value: any) => ExprNode) | undefined} A bin function generator,
 *  or undefined if the scale type is unsupported.
 */
function binInterval(scale, pixelSize, bin) {
  const transform = scaleTransform(scale);
  // Check before destructuring, so that a nullish transform is reported as
  // an unsupported scale type instead of raising a TypeError.
  if (!transform?.apply) return; // unsupported scale type
  const { type, domain, range, apply, sqlApply } = transform;

  const binFn = BIN[`${bin}`.toLowerCase()] || floor;

  // transformed domain extent
  const lo = apply(Math.min(...domain));
  const hi = apply(Math.max(...domain));

  // bins per transformed domain unit: range span over domain span,
  // divided by the interactive pixel size
  const s = (type === 'identity'
    ? 1
    : Math.abs(range[1] - range[0]) / (hi - lo)) / pixelSize;

  // skip the no-op multiply / subtract in the generated SQL expression
  const scalar = s === 1
    ? x => x
    : x => mul(float64(s), x);
  const diff = lo === 0
    ? x => x
    : x => sub(x, float64(lo));

  return value => int32(binFn(scalar(diff(sqlApply(value)))));
}
/**
 * Generate pre-aggregate query information: the query that creates the
 * materialized view, the view's table name, and the base query used to
 * read from it.
 * @param {SelectQuery} clientQuery The original client query. It is
 *  modified by this function (select list, groupby, having, orderby).
 * @param {ReturnType<activeColumns>} active Active (selected) columns.
 * @param {ReturnType<preaggColumns>} preaggCols Pre-aggregation columns.
 * @param {string} [schema] The pre-aggregation schema name. Currently unused:
 *  the table name is deliberately left unqualified (see below).
 * @returns {PreAggregateInfo}
 */
function preaggregateInfo(clientQuery, active, preaggCols, schema) {
  const { group, output, preagg } = preaggCols;
  const { columns } = active;

  // the view selects the pre-aggregates plus the active columns,
  // additionally grouped by the active columns
  const selected = clientQuery.setSelect({ ...preagg, ...columns });
  const query = selected.groupby(Object.keys(columns));

  // the columns behind the active expressions must also be selected by
  // subqueries, reached through the first subquery
  const [firstSubquery] = query.subqueries;
  if (firstSubquery) {
    const refs = [];
    for (const expr of Object.values(columns)) {
      for (const ref of collectColumns(expr)) {
        refs.push(ref.column);
      }
    }
    subqueryPushdown(firstSubquery, refs);
  }

  // having / orderby criteria are detached from the creation query and
  // re-attached to the output query instead
  const { _having: having, _orderby: order } = query;
  query._having = [];
  query._orderby = [];

  // the table id is a hash of the creation query text
  const create = query.toString();
  const id = (fnv_hash(create) >>> 0).toString(16);

  // Simplification of Ibis experiment by ensuring all tables land in the
  // default schema: the schema-qualified name `${schema}.preagg_${id}` is
  // intentionally not used.
  const table = `preagg_${id}`;

  // base query over the materialized view
  const select = Query
    .select(group, output)
    .from(table)
    .groupby(group)
    .having(having)
    .orderby(order);

  return new PreAggregateInfo({ table, create, active, select });
}
/**
 * Push column selections down to subqueries. Starting at the given query,
 * every reachable query is visited once; each select query with a non-empty
 * `_from` list has the columns added to its selection.
 * @param {*} query The query at which to start the traversal.
 * @param {*} cols The columns to select (passed as-is to `select`).
 */
function subqueryPushdown(query, cols) {
  // queries already handled, so a shared subquery is processed only once
  const visited = new Set();

  function visit(node) {
    if (visited.has(node)) return;
    visited.add(node);

    const selectsFromSource = isSelectQuery(node) && node._from.length;
    if (selectsFromSource) {
      node.select(cols);
    }

    for (const child of node.subqueries) {
      visit(child);
    }
  }

  visit(query);
}
/**
 * Metadata and query generator for a materialized view of pre-aggregated
 * data. An instance bundles what is needed to create the view and to query
 * it for a client-selection pair, relative to a specific active clause and
 * selection state.
 */
export class PreAggregateInfo {
  /**
   * Create a new pre-aggregation information instance.
   * @param {object} options Options object.
   * @param {string} options.table The materialized view table name.
   * @param {string} options.create The table creation query.
   * @param {*} options.active Active column information.
   * @param {SelectQuery} options.select Base query for requesting updates
   *  using a pre-aggregated materialized view.
   */
  constructor(options) {
    const { table, create, active, select } = options;

    /**
     * The name of the materialized view.
     * @type {string}
     */
    this.table = table;

    /**
     * The SQL query used to generate the materialized view.
     * @type {string}
     */
    this.create = create;

    /**
     * A result promise for the materialized view creation query.
     * Starts out null; it is assigned from outside this class.
     * @type {Promise | null}
     */
    this.result = null;

    /**
     * Definitions and predicate function for the active columns,
     * which are dynamically filtered by the active clause.
     */
    this.active = active;

    /**
     * Select query (sans where clause) for materialized views.
     * @type {SelectQuery}
     */
    this.select = select;

    /**
     * Boolean flag indicating a client that should be skipped.
     * This value is always false for a created materialized view.
     * @type {boolean}
     */
    this.skip = false;
  }

  /**
   * Generate a materialized view query for the given predicate. The base
   * select query is cloned, so it is never modified by this call.
   * @param {import('@uwdata/mosaic-sql').ExprNode} predicate The current
   *  active clause predicate.
   * @returns {SelectQuery} A materialized view query.
   */
  query(predicate) {
    const viewQuery = this.select.clone();
    // translate the clause predicate into a filter over the active columns
    const filter = this.active.predicate(predicate);
    return viewQuery.where(filter);
  }
}