blob: 96337770352e954c740e0997aa76576f4b501b5d [file] [log] [blame]
//
// Copyright 2014 Google Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
//
part of charted.charts;
/// Function callback used to filter items in the input collection;
/// returns true when [item] must be included in the aggregations.
typedef bool AggregationFilterFunc(var item);

/// Comparator used to order dimension values of a given field.
typedef int CompareFunc(Comparable a, Comparable b);

/// Accessor used to read a dimension or fact field [key] from an [item].
typedef dynamic FieldAccessor(dynamic item, dynamic key);
/// Given list of items, dimensions and facts, compute
/// aggregates (COUNT, SUM, MIN, MAX) for facts at each dimension level.
class AggregationModel {
  // Number of aggregations we collect on each fact
  int _aggregationTypesCount = 0;

  // Currently supported list of aggregations.
  static final List<String> supportedAggregationTypes = [
    'sum',
    'min',
    'max',
    'valid'
  ];

  // Computed aggregation types.
  List<String> computedAggregationTypes;

  // Offsets of aggregations that are computed once per fact per dimension.
  // If an offset is null, the corresponding aggregation is not computed.
  int _offsetSum;
  int _offsetMin;
  int _offsetMax;
  int _offsetCnt;

  // Offsets of aggregations that are computed once per dimension.
  int _offsetFilteredCount;
  int _offsetSortedIndex;

  // Number of bits we can use in an integer without it becoming a
  // non-SMI ("medium") boxed integer on the Dart VM; used to pack the
  // filter bitmap into a List<int>.
  static const int SMI_BITS = 30;

  // Computed aggregations, stored in one flat buffer. Each dimension path
  // owns a slice of (facts * aggregation types + 2) doubles; the extra two
  // slots hold the first sorted-row index and the filtered count
  // (see _offsetSortedIndex / _offsetFilteredCount).
  static const int AGGREGATIONS_BUFFER_LENGTH = 1024 * 1024;
  Float64List _aggregations;

  // Cache of fact values (row-major: row * factsCount + factIndex;
  // nulls are stored as NaN).
  Float64List _factsCache;
  // Cache of enumerated dimensions (row-major: row * dimCount + dimIndex).
  List<int> _dimEnumCache;
  // Sorted list of indices (for data in _rows/_dimEnumCache/_factsCache)
  List<int> _sorted;

  // Enumeration map for dimension values
  List<Map<Comparable, int>> _dimToIntMap;
  // Sort orders for dimension values
  List<List<int>> _dimSortOrders;

  // Input
  List _rows;
  List _dimFields;
  List _factFields;

  // When groupBy is called, this value represents the
  // common prefix of the old and new dimensions list.
  int _dimPrefixLength = 0;

  // Dimensions (as ':'-joined enumerated values) mapped to the offset of
  // their computed aggregates in [_aggregations].
  Map<String, int> _dimToAggrMap;

  // null when no filter was applied.
  // Otherwise, store a bitmap indicating if an item was included.
  List<int> _filterResults;

  // Cache of entities created for the facts on this aggregation view.
  Map<String, AggregationItem> _entityCache;

  // List of field names that aggregation items will have.
  List<String> _itemFieldNamesCache;

  // Walk through the map, by splitting key at '.'
  final bool walkThroughMap;

  // Map of fieldName to comparator function.
  final Map<String, CompareFunc> comparators;

  // Timing operations
  static final Logger _logger = new Logger('aggregations');
  Stopwatch _timeItWatch;
  String _timeItName;

  // Accessors used to read dimension and fact fields off an item; both
  // default to [_fetch] (map-like access) when not supplied.
  FieldAccessor dimensionAccessor;
  FieldAccessor factsAccessor;

  /// Create a new [AggregationModel] from a [collection] of items,
  /// list of [dimensions] on which the items are grouped and a list of [facts]
  /// on which aggregations are computed.
  AggregationModel(Iterable collection, List dimensions, List facts,
      {List<String> aggregationTypes,
      this.walkThroughMap: false,
      this.comparators: const <String, CompareFunc>{},
      this.dimensionAccessor,
      this.factsAccessor}) {
    _init(collection, dimensions, facts, aggregationTypes);
  }

  // Start the stopwatch for the named operation; paired with [_timeItEnd].
  void _timeItStart(String name) {
    _timeItName = name;
    _timeItWatch = new Stopwatch();
    _timeItWatch.start();
  }

  // Stop the stopwatch and log elapsed milliseconds and row count.
  void _timeItEnd() {
    _timeItWatch.stop();
    _logger.info('[aggregations/$_timeItName] '
        '${_timeItWatch.elapsed.inMilliseconds}ms/${_rows.length}r');
  }

  List get factFields => _factFields;
  List get dimensionFields => _dimFields;

  /// Initialize the view: validate input, resolve accessors and aggregation
  /// types, allocate buffers and preprocess the data.
  ///
  /// Throws [ArgumentError] when [collection] is null, [facts] is null or
  /// empty, or [aggregationTypes] contains an unsupported type.
  void _init(Iterable collection, List dimensions, List facts,
      List<String> aggregationTypes) {
    if (collection == null) {
      throw new ArgumentError('Data cannot be empty or null');
    }
    if (facts == null || facts.isEmpty) {
      throw new ArgumentError('Facts cannot be empty or null');
    }
    if (dimensions == null) {
      dimensions = [];
    }
    if (dimensionAccessor == null) {
      dimensionAccessor = _fetch;
    }
    if (factsAccessor == null) {
      factsAccessor = _fetch;
    }
    if (aggregationTypes != null) {
      Iterable unknownAggregationTypes =
          aggregationTypes.where((e) => !supportedAggregationTypes.contains(e));
      if (unknownAggregationTypes.length != 0) {
        throw new ArgumentError(
            'Unknown aggregation types: ${unknownAggregationTypes.join(', ')}');
      }
    } else {
      aggregationTypes = ['sum'];
    }

    // Always add 'valid' (the per-fact count of non-null values); it is
    // required for correct computation of average and count.
    if (!aggregationTypes.contains('valid')) {
      aggregationTypes.add('valid');
    }

    _rows = new List.from(collection, growable: false);
    _dimFields = new List.from(dimensions, growable: false);
    _factFields = new List.from(facts, growable: false);
    _entityCache = new Map<String, AggregationItem>();
    _createBuffers();

    _aggregationTypesCount = aggregationTypes.length;
    for (int i = 0; i < _aggregationTypesCount; i++) {
      switch (aggregationTypes[i]) {
        case 'sum':
          _offsetSum = i;
          break;
        case 'min':
          _offsetMin = i;
          break;
        case 'max':
          _offsetMax = i;
          break;
        case 'valid':
          _offsetCnt = i;
          break;
      }
    }
    computedAggregationTypes = new List.from(aggregationTypes, growable: false);

    // Preprocess the data
    _preprocess();
  }

  /// Re-calculate aggregations based on new dimensions.
  /// [filter], when provided, excludes items from the aggregates.
  void groupBy(List dimensions, [AggregationFilterFunc filter = null]) {
    if (dimensions == null) {
      dimensions = [];
    }

    // Compute the length of the common prefix of old and new dimension
    // lists so enumerations and sort orders can be reused for that prefix.
    List savedDimFields = _dimFields;
    _dimFields = new List.from(dimensions, growable: false);

    _dimPrefixLength = 0;
    while (_dimPrefixLength < _dimFields.length &&
        _dimPrefixLength < savedDimFields.length &&
        savedDimFields[_dimPrefixLength] == _dimFields[_dimPrefixLength]) {
      ++_dimPrefixLength;
    }

    _createBuffers();
    _preprocess(groupBy: true);

    // For groupBy, compute immediately.
    compute(filter);

    // Ensure that cache represents updated dimensions
    _updateCachedEntities();
  }

  /// Create buffers.
  ///
  /// This method is called when the object is being created and when
  /// a groupBy is called to change the dimensions on which
  /// aggregations are computed.
  void _createBuffers() {
    // Create both when object is created and groupBy is called
    _dimEnumCache = new Int32List(_dimFields.length * _rows.length);

    // Create only when the object is created
    if (_factsCache == null) {
      _factsCache = new Float64List((_factFields.length + 1) * _rows.length);
    }

    // Create only when the object is created
    if (_filterResults == null) {
      _filterResults = new List<int>((_rows.length) ~/ SMI_BITS + 1);
    }

    // Create both when object is created and groupBy is called
    // Reuse dimension enumerations if possible.
    var oldDimToInt = _dimToIntMap;
    _dimToIntMap = new List<Map<Comparable, int>>.generate(_dimFields.length,
        (i) => i < _dimPrefixLength ? oldDimToInt[i] : <Comparable, int>{});
  }

  /// Check cache entries
  /// When data is grouped by a new dimensions, entities that were
  /// created prior to the groupBy should be cleared and removed from cache
  /// if they aren't valid anymore.
  /// Update the entities that are valid after the groupBy.
  void _updateCachedEntities() {
    var keys = new List<String>.from(_entityCache.keys, growable: false);
    keys.forEach((String key) {
      _AggregationItemImpl entity = _entityCache[key];
      if (entity == null) {
        // NOTE(review): a null value in the cache is dropped defensively;
        // the map is not expected to hold nulls.
        _entityCache.remove(key);
      } else if (entity != null && entity.isValid) {
        // Entities whose dimension path lies within the preserved prefix
        // survive the groupBy and are refreshed; others are evicted.
        if (key.split(':').length <= _dimPrefixLength) {
          entity.update();
        } else {
          _entityCache.remove(key);
          entity.clear();
        }
      }
    });
  }

  // Cache of parsed dotted keys used by [_fetch] / [walk].
  final Map<String, List> _parsedKeys = {};

  /// Get value from a map-like object; when [walkThroughMap] is set and the
  /// key is dotted, delegates to [walk] for nested lookup.
  dynamic _fetch(var item, var _key) {
    var key = _key as String;
    if (walkThroughMap && key.contains('.')) {
      return walk(item, key, _parsedKeys);
    } else {
      return item[key];
    }
  }

  /// Preprocess Data
  ///   - Enumerate dimension values
  ///   - Create sort orders for dimension values
  ///   - Cache facts in lists
  void _preprocess({bool groupBy: false}) {
    _timeItStart('preprocess');

    // Enumerate dimensions...
    // Cache dimensions and facts.
    List<int> dimensionValCount =
        new List<int>.generate(_dimFields.length, (idx) => 0);

    int dimensionsCount = _dimFields.length;
    int factsCount = _factFields.length;
    int rowCount = _rows.length;

    for (int ri = 0, factsDataOffset = 0, dimDataOffset = 0;
        ri < rowCount;
        ++ri, factsDataOffset += factsCount, dimDataOffset += dimensionsCount) {
      var item = _rows[ri];

      // Cache the fact values in the big buffer, but only
      // when we are initializing (not when a groupBy was called
      // after initialization)
      if (!groupBy) {
        for (int fi = 0; fi < factsCount; fi++) {
          var value = factsAccessor(item, _factFields[fi]);
          // Null facts are stored as NaN so they can be skipped cheaply
          // during aggregation.
          _factsCache[factsDataOffset + fi] =
              (value == null) ? double.NAN : (value as num).toDouble();
        }
      }

      // Enumerate the dimension values and cache enumerated rows
      for (int di = 0; di < dimensionsCount; di++) {
        Comparable dimensionVal = dimensionAccessor(item, _dimFields[di]);
        int dimensionValEnum = _dimToIntMap[di][dimensionVal];
        if (dimensionValEnum == null) {
          _dimToIntMap[di][dimensionVal] = dimensionValCount[di];
          dimensionValEnum = dimensionValCount[di]++;
        }
        _dimEnumCache[dimDataOffset + di] = dimensionValEnum;
      }
    }

    // Sort all dimensions internally
    // The resulting arrays would be used to sort the entire data
    List<List<int>> oldSortOrders = _dimSortOrders;
    _dimSortOrders = new List.generate(dimensionsCount, (i) {
      if (groupBy && i < _dimPrefixLength) {
        return oldSortOrders[i];
      }

      var dimensionVals = new List<Comparable>.from(_dimToIntMap[i].keys);
      var retval = new List<int>(_dimToIntMap[i].length);

      // When a comparator is not specified, our implementation of the
      // comparator tries to gracefully handle null values.
      if (comparators.containsKey(_dimFields[i])) {
        dimensionVals.sort(comparators[_dimFields[i]]);
      } else {
        dimensionVals.sort(_defaultDimComparator);
      }

      // Map each value's enumeration to its rank in sorted order.
      for (int si = 0; si < retval.length; ++si) {
        retval[_dimToIntMap[i][dimensionVals[si]]] = si;
      }
      return retval;
    }, growable: false);

    // Create a list of sorted indices - only if we are not having a full
    // overlap of dimensionFields.
    if (!groupBy || _dimPrefixLength != _dimFields.length) {
      _sorted = new List<int>.generate(_rows.length, (i) => i, growable: false);
      _sorted.sort(_comparator);
    }

    // Pre-compute frequently used values
    _offsetSortedIndex = factsCount * _aggregationTypesCount;
    _offsetFilteredCount = factsCount * _aggregationTypesCount + 1;

    _timeItEnd();
  }

  // Ensures that null dimension values don't cause an issue with sorting:
  // nulls compare equal to each other and sort before any non-null value.
  int _defaultDimComparator(Comparable left, Comparable right) =>
      (left == null && right == null)
          ? 0
          : (left == null) ? -1 : (right == null) ? 1 : left.compareTo(right);

  /// Given item indices in rows, compare them based
  /// on the sort orders created while pre-processing data.
  int _comparator(int one, int two) {
    if (one == two) {
      return 0;
    }

    int offsetOne = _dimFields.length * one;
    int offsetTwo = _dimFields.length * two;

    for (int i = 0; i < _dimFields.length; ++i) {
      int diff = _dimSortOrders[i][_dimEnumCache[offsetOne + i]] -
          _dimSortOrders[i][_dimEnumCache[offsetTwo + i]];
      if (diff != 0) {
        return diff;
      }
    }
    return 0;
  }

  /// Compute aggregations
  /// If [filter] is not null, it would be used to filter out items that
  /// should not be included in the aggregates.
  void compute([AggregationFilterFunc filter = null]) {
    _timeItStart('compute');

    _dimToAggrMap = new Map<String, int>();
    _aggregations = new Float64List(AGGREGATIONS_BUFFER_LENGTH);
    _filterResults = filter == null
        ? null
        : new List<int>.filled((_rows.length ~/ SMI_BITS) + 1, 0);

    int rowCount = _rows.length;
    int dimensionCount = _dimFields.length;
    int factsCount = _factFields.length;

    // Saves current dimension value to which we are aggregating
    // Values of dimensions are in even indices (starting at 0) and
    // location of respective dimension in buffer is in odd indices.
    List<int> currentDim = new List<int>(dimensionCount * 2);

    bool reset = true;
    bool isNewDimension = false;
    int aggregationSizePerDim = factsCount * _aggregationTypesCount;

    // Reserve the 0th position for top-level aggregations.
    int currentBufferPos = (factsCount * _aggregationTypesCount + 2);
    _dimToAggrMap[''] = 0;
    _aggregations[_offsetSortedIndex] = 0.0;

    // Rows are visited in sorted order so that all rows of a dimension
    // path are contiguous; a new buffer slice is opened whenever a
    // dimension value changes.
    for (int ri = 0, index = 0, dimensionDataOffset = 0, factsDataOffset = 0;
        ri < rowCount;
        ++ri, reset = false) {
      // If filter is not null, check if this row must be included in
      // the aggregations and mark it accordingly.
      index = _sorted[ri];
      if (filter != null) {
        if (!filter(_rows[index])) {
          continue;
        } else {
          _filterResults[ri ~/ SMI_BITS] |= (1 << (ri % SMI_BITS));
        }
      }

      dimensionDataOffset = index * dimensionCount;
      factsDataOffset = index * factsCount;

      // Update top-level aggregations.
      _updateAggregationsAt(0, factsDataOffset, ri == 0 ? true : false);

      // See which dimensions get effected by this row.
      // di => the index of the dimension
      // ci => index of the cached value in [currentDim]
      for (int di = 0, ci = 0; di < dimensionCount; ++di, ci += 2) {
        // If a dimension value changed, then all dimensions that are lower
        // in the hierarchy change too.
        if (reset ||
            currentDim[ci] != _dimEnumCache[dimensionDataOffset + di]) {
          currentDim[ci] = _dimEnumCache[dimensionDataOffset + di];
          currentDim[ci + 1] = currentBufferPos;

          // Save location to aggregations position in the buffer
          _dimToAggrMap[new List.generate(di + 1, (i) => currentDim[2 * i])
              .join(':')] = currentBufferPos;

          // Store items start position
          _aggregations[currentBufferPos + _offsetSortedIndex] = ri.toDouble();

          // After the aggregated values, we save the filtered count,
          // index of the first item (in sorted)
          currentBufferPos += (aggregationSizePerDim + 2);
          reset = true;
          isNewDimension = true;
        }

        _updateAggregationsAt(
            currentDim[ci + 1], factsDataOffset, isNewDimension);
        isNewDimension = false;
      }
    }

    _timeItEnd();
  }

  /// Helper function that does the actual aggregations.
  /// This function is called once per row per dimension.
  /// [aggrDataOffset] is the start of the dimension's slice in
  /// [_aggregations]; [isNewDimension] seeds min/max on the first row.
  _updateAggregationsAt(
      int aggrDataOffset, int factsDataOffset, bool isNewDimension) {
    // Update count.
    _aggregations[aggrDataOffset + _offsetFilteredCount] += 1;

    // Update aggregation for each of the facts.
    for (int fi = 0, bufferFactOffset = aggrDataOffset;
        fi < _factFields.length;
        bufferFactOffset += _aggregationTypesCount, ++fi) {
      double factValue = _factsCache[factsDataOffset + fi];
      // NaN marks a null fact value; it contributes to no aggregate.
      if (factValue.isNaN) {
        continue;
      }

      // Sum
      if (_offsetSum != null) {
        _aggregations[bufferFactOffset + _offsetSum] += factValue;
      }

      // Min
      if (_offsetMin != null &&
          (isNewDimension ||
              factValue < _aggregations[bufferFactOffset + _offsetMin])) {
        _aggregations[bufferFactOffset + _offsetMin] = factValue;
      }

      // Max
      if (_offsetMax != null &&
          (isNewDimension ||
              factValue > _aggregations[bufferFactOffset + _offsetMax])) {
        _aggregations[bufferFactOffset + _offsetMax] = factValue;
      }

      // Count of valid (non-null) values.
      if (_offsetCnt != null) {
        _aggregations[bufferFactOffset + _offsetCnt]++;
      }
    }
  }

  // TODO(prsd):
  // 1. Implementation of updates and posting updates to entities.
  //    patchEntity and addToEntity must add listeners on AggregationItems
  //    and any changes must be propagated to entities.
  // 2. Updates (add/remove/update) should do their best to update the
  //    aggregations and then maybe do a delayed recomputation (sort etc;)

  /// Update an item.
  /// If aggregates were already computed, they are updated to reflect the
  /// new value and any observers are notified.
  void updateItem(dynamic item, String field) {
    throw new UnimplementedError();
  }

  /// Add a new item.
  /// If aggregates were already computed, they are updated to reflect
  /// values on the new item.
  void addItem(dynamic item) {
    throw new UnimplementedError();
  }

  /// Remove an existing item.
  /// If aggregates were already computed, they are updated to reflect
  /// facts on the removed item.
  void removeItem(dynamic item) {
    throw new UnimplementedError();
  }

  /// Return an [AggregationItem] that represents facts for dimension
  /// represented by [dimension] Only one instance of an entity is created
  /// per dimension (even if this function is called multiple times)
  ///
  /// Callers of this method can observe the returned entity for updates to
  /// aggregations caused by changes to filter or done through add, remove
  /// or modify of items in the collection.
  ///
  /// Returns null when no aggregates exist for the given dimension path.
  AggregationItem facts(List<String> dimension) {
    // Translate the dimension values to their enumerated key form.
    List<int> enumeratedList = new List<int>();
    for (int i = 0; i < dimension.length; ++i) {
      enumeratedList.add(_dimToIntMap[i][dimension[i]]);
    }

    String key = enumeratedList.join(':');
    AggregationItem item = _entityCache[key];

    if (item == null && _dimToAggrMap.containsKey(key)) {
      item = new _AggregationItemImpl(this, dimension, key);
      _entityCache[key] = item;
    }

    return item;
  }

  /// Return a list of values that are present for a dimension field,
  /// sorted with the field's comparator (or the null-safe default).
  /// Returns null when [dimensionFieldName] is not a known dimension.
  List valuesForDimension(dynamic dimensionFieldName) {
    int di = _dimFields.indexOf(dimensionFieldName);
    if (di < 0) {
      return null;
    }
    var values = new List<Comparable>.from(_dimToIntMap[di].keys);
    if (comparators.containsKey(dimensionFieldName)) {
      values.sort(comparators[dimensionFieldName]);
    } else {
      values.sort(_defaultDimComparator);
    }
    return values;
  }
}
/// Parse a path for nested map-like objects.
/// Caches the parsed key in the passed map.
///
/// Takes map keys of the format:
///     "list(key=val;val=m).another(state=good).state"
/// and outputs:
///     ["list", {"key": "val", "val": "m"},
///      "another", {"state": "good"}, "state"]
///
/// Throws [ArgumentError] when a '(' has no matching ')' or a criteria
/// segment is not of the form "name=value".
List /* <String|Map<String, String>> */ _parseKey(
    String key, Map parsedKeysCache) {
  List parts = parsedKeysCache == null ? null : parsedKeysCache[key];
  if (parts == null && key != null) {
    parts = [];
    if (key.contains(').')) {
      // Scan the key, alternating between plain dotted segments and
      // parenthesized "k=v;k=v" criteria blocks.
      int start = 0;
      int cursor = 0;
      bool inParams = false;

      while (cursor < key.length) {
        if (!inParams) {
          cursor = key.indexOf('(', start);
          if (cursor == -1) {
            // No more criteria blocks; the rest is plain dotted path.
            parts.addAll(key.substring(start).split('.'));
            break;
          }
          parts.addAll(key.substring(start, cursor).split('.'));
          cursor++;
          start = cursor;
          inParams = true;
        } else {
          cursor = key.indexOf(')', start);
          if (cursor == -1) {
            throw new ArgumentError('Invalid field name: $key');
          }
          // BUG FIX: allocate a fresh map per "(...)" block. The previous
          // code reused one map (clearing it each iteration), so every map
          // already stored in [parts] aliased the same object and ended up
          // holding only the criteria of the last block.
          Map listMatchingMap = {};
          List matchKeyVals = key.substring(start, cursor).split(';');
          matchKeyVals.forEach((value) {
            List keyval = value.split('=');
            if (keyval.length != 2) {
              throw new ArgumentError('Invalid field name: ${key}');
            }
            listMatchingMap[keyval[0]] = keyval[1];
          });
          parts.add(listMatchingMap);
          // Skip past ')' and the '.' that follows it.
          cursor += 2;
          start = cursor;
          inParams = false;
        }
      }
    } else {
      parts = key.split('.');
    }
    if (parsedKeysCache != null) {
      parsedKeysCache[key] = parts;
    }
  }

  return parts;
}
/// Walk a map-like structure that could have list in the path.
///
/// Example:
///     Map testMap = {
///       "first": "firstval",
///       "list": [
///         { "val": "m",
///           "key": "val",
///           "another": [
///             { 'state': 'good' },
///             { 'state': 'bad' }
///           ]
///         },
///         { "val": "m", "key": "invalid" },
///         { "val": "o" }
///       ]
///     };
///
/// For the above map:
///     walk(testMap, "list(key=val;val=m).another(state=good).state");
/// outputs:
///     good
///
/// Returns null when any step of the path cannot be resolved.
dynamic walk(initial, String key, Map parsedKeyCache) {
  List parts = _parseKey(key, parsedKeyCache);
  return parts.fold(initial, (current, part) {
    if (current == null) {
      return null;
    } else if (current is List && part is Map) {
      // A map part holds "field=value" criteria produced by [_parseKey];
      // select the first list element satisfying all of them.
      for (int i = 0; i < current.length; i++) {
        var candidate = current[i];
        bool match = true;
        part.forEach((matchKey, expected) {
          // BUG FIX: criteria were previously compared against [part]
          // itself (always true), so the filter matched every element.
          // Compare against the candidate element instead, walking nested
          // paths when the criterion key is dotted.
          if (matchKey.contains('.')) {
            var nested = walk(candidate, matchKey, parsedKeyCache);
            if (nested == null || nested.toString() != expected) {
              match = false;
            }
          } else if (candidate[matchKey] != expected) {
            match = false;
          }
        });
        if (match) {
          return candidate;
        }
      }
      // No element satisfied the criteria.
      return null;
    } else {
      return current[part];
    }
  });
}