blob: 2f220339064d97e83522f5f258444583ee70b2c4 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:gcloud/common.dart';
import 'package:gcloud/datastore.dart' as raw;
import 'package:grpc/grpc.dart' as grpc;
import '../errors.dart' as errors;
import '../grpc_api/datastore_api.dart';
const List<String> OAuth2Scopes = <String>[
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/datastore',
];
// TODO(kustermann): Currently we only convert service specific errors to
// different exceptions (e.g. transaction aborted).
// Maybe we want to wrap all grpc.BaseException exceptions and convert them to
// appengine specific ones.
class GrpcDatastoreImpl implements raw.Datastore {
final DatastoreClient _clientRPCStub;
final _Codec _codec;
final String _projectId;
GrpcDatastoreImpl(
grpc.ClientChannel clientChannel,
grpc.HttpBasedAuthenticator authenticator,
String project,
) : _clientRPCStub = DatastoreClient(
clientChannel,
options: authenticator.toCallOptions,
),
_codec = _Codec(project),
_projectId = project;
@override
Future<List<raw.Key>> allocateIds(List<raw.Key> keys) async {
final request = AllocateIdsRequest()..projectId = _projectId;
request.keys
.addAll(keys.map((key) => _codec.encodeKey(key, enforceId: false)));
try {
final response = await _clientRPCStub.allocateIds(request);
return response.keys.map(_codec.decodeKey).toList();
} on grpc.GrpcError catch (_) {
rethrow;
}
}
@override
Future<raw.Transaction> beginTransaction(
{bool crossEntityGroup = false}) async {
final request = BeginTransactionRequest()..projectId = _projectId;
try {
final response = await _clientRPCStub.beginTransaction(request);
return _TransactionImpl(response.transaction);
} on grpc.GrpcError catch (_) {
rethrow;
}
}
@override
Future<raw.CommitResult> commit(
{List<raw.Entity>? inserts,
List<raw.Entity>? autoIdInserts,
List<raw.Key>? deletes,
raw.Transaction? transaction}) async {
final request = CommitRequest()..projectId = _projectId;
if (transaction != null) {
request
..transaction = (transaction as _TransactionImpl)._rpcTransaction
..mode = CommitRequest_Mode.TRANSACTIONAL;
} else {
request.mode = CommitRequest_Mode.NON_TRANSACTIONAL;
}
if (autoIdInserts != null) {
for (final raw.Entity insert in autoIdInserts) {
request.mutations.add(
Mutation()..insert = _codec.encodeEntity(insert, enforceId: false));
}
}
if (inserts != null) {
for (final raw.Entity insert in inserts) {
request.mutations.add(Mutation()..upsert = _codec.encodeEntity(insert));
}
}
if (deletes != null) {
for (final raw.Key delete in deletes) {
request.mutations.add(Mutation()..delete = _codec.encodeKey(delete));
}
}
try {
final CommitResponse response = await _clientRPCStub.commit(request);
var allocatedKeys = <raw.Key>[];
if (autoIdInserts?.isNotEmpty ?? false) {
allocatedKeys = List<raw.Key>.generate(autoIdInserts!.length, (i) {
return _codec.decodeKey(response.mutationResults[i].key);
});
}
return raw.CommitResult(allocatedKeys);
} on grpc.GrpcError catch (error) {
if (error.code == grpc.StatusCode.aborted) {
throw raw.TransactionAbortedError();
} else if (error.code == grpc.StatusCode.invalidArgument) {
throw raw.ApplicationError(error.message ?? 'Unknown error');
} else {
rethrow;
}
}
}
@override
Future rollback(raw.Transaction transaction) async {
final request = RollbackRequest()
..projectId = _projectId
..transaction = (transaction as _TransactionImpl)._rpcTransaction;
try {
await _clientRPCStub.rollback(request);
} on grpc.GrpcError catch (_) {
rethrow;
}
}
@override
Future<List<raw.Entity?>> lookup(List<raw.Key> keys,
{raw.Transaction? transaction}) async {
final request = LookupRequest()
..projectId = _projectId
..readOptions =
(ReadOptions()..readConsistency = ReadOptions_ReadConsistency.STRONG);
if (transaction != null) {
request.readOptions.transaction =
(transaction as _TransactionImpl)._rpcTransaction;
}
request.keys
.addAll(keys.map((key) => _codec.encodeKey(key, enforceId: true)));
try {
final LookupResponse response = await _clientRPCStub.lookup(request);
if (response.deferred.isNotEmpty) {
throw raw.DatastoreError(
'Could not successfully look up all keys due to resource '
'constraints.');
}
final Map<Key, raw.Entity?> entityLookupResult = <Key, raw.Entity?>{};
for (final result in response.found) {
entityLookupResult[result.entity.key] =
_codec.decodeEntity(result.entity);
}
for (final result in response.missing) {
entityLookupResult[result.entity.key] = null;
}
final entities = List<raw.Entity?>.filled(request.keys.length, null);
for (int i = 0; i < request.keys.length; i++) {
final key = request.keys[i];
// The key needs to be in the map, but it's value might be `null`.
if (!entityLookupResult.containsKey(key)) {
throw raw.DatastoreError('Invalid server response: '
'Tried to lookup $key but entity was neither in '
'missing nor in found.\n$response');
}
entities[i] = entityLookupResult[key];
}
return entities;
} on grpc.GrpcError catch (_) {
rethrow;
}
}
@override
Future<Page<raw.Entity>> query(raw.Query query,
{raw.Partition? partition, raw.Transaction? transaction}) async {
final request = RunQueryRequest()..projectId = _projectId;
if (transaction != null) {
request.readOptions = ReadOptions()
..transaction = (transaction as _TransactionImpl)._rpcTransaction
..readConsistency = ReadOptions_ReadConsistency.STRONG;
}
if (partition != null && partition.namespace != null) {
request.partitionId = PartitionId()
..projectId = _projectId
..namespaceId = partition.namespace!;
}
final pbQuery = Query();
if (query.kind != null) {
pbQuery.kind.add(KindExpression()..name = query.kind!);
}
if (query.offset != null) pbQuery.offset = query.offset!;
if (query.limit != null) pbQuery.limit = Int32Value()..value = query.limit!;
// Build a list of property and ancestor query filters. The entries in this
// list will be combined with an AND filter.
final List<Filter> filters = <Filter>[];
if (query.filters != null && query.filters!.isNotEmpty) {
for (final filter in query.filters!) {
final operation = _Codec.FILTER_RELATION_MAPPING[filter.relation]!;
final value = filter.value;
final pbPropertyFilter = PropertyFilter()
..property = (PropertyReference()..name = filter.name)
..op = operation
..value = _codec.encodeValue(value);
final pbFilter = Filter()..propertyFilter = pbPropertyFilter;
filters.add(pbFilter);
}
}
if (query.ancestorKey != null) {
final ancestorKey = _codec.encodeKey(query.ancestorKey!, enforceId: true);
final pbPropertyFilter = PropertyFilter()
..property = (PropertyReference()..name = '__key__')
..op = PropertyFilter_Operator.HAS_ANCESTOR
..value = (Value()..keyValue = ancestorKey);
final pbFilter = Filter()..propertyFilter = pbPropertyFilter;
filters.add(pbFilter);
}
if (filters.length == 1) {
pbQuery.filter = filters[0];
} else if (filters.length > 1) {
final compositeFilter = CompositeFilter()
..op = CompositeFilter_Operator.AND
..filters.addAll(filters);
pbQuery.filter = Filter()..compositeFilter = compositeFilter;
}
if (query.orders != null) {
for (final order in query.orders!) {
final pbOrder = PropertyOrder()
..property = (PropertyReference()..name = order.propertyName)
..direction = order.direction == raw.OrderDirection.Ascending
? PropertyOrder_Direction.ASCENDING
: PropertyOrder_Direction.DESCENDING;
pbQuery.order.add(pbOrder);
}
}
request.query = pbQuery;
try {
final RunQueryResponse response = await _clientRPCStub.runQuery(request);
final QueryResultBatch batch = response.batch;
return _QueryPageImpl.fromQueryResult(
request, _clientRPCStub, _codec, query.offset, 0, query.limit, batch);
} on grpc.GrpcError catch (_) {
rethrow;
}
}
}
class _QueryPageImpl implements Page<raw.Entity> {
final RunQueryRequest _request;
final DatastoreClient _clientGRPCStub;
final _Codec _codec;
final List<int> _cursor;
final List<raw.Entity> _entities;
final bool _isLast;
// This is `Query.offset` and will be carried across page walking.
final int? _offset;
// This is always non-`null` and contains the number of entities that were
// skiped so far.
final int _alreadySkipped;
// If not `null` this will hold the remaining number of entities we are
// allowed to receive according to `Query.limit`.
final int? _remainingNumberOfEntities;
_QueryPageImpl(
this._request,
this._clientGRPCStub,
this._codec,
this._cursor,
this._entities,
this._isLast,
this._offset,
this._alreadySkipped,
this._remainingNumberOfEntities);
static _QueryPageImpl fromQueryResult(
RunQueryRequest request,
DatastoreClient clientGRPCStub,
_Codec codec,
int? offset,
int alreadySkipped,
int? remainingNumberOfEntities,
QueryResultBatch batch) {
// If we have an offset: Check that in total we haven't skipped too many.
if (offset != null &&
offset > 0 &&
batch.hasSkippedResults() &&
batch.skippedResults > (offset - alreadySkipped)) {
throw raw.DatastoreError(
'Datastore was supposed to skip $offset entities, '
'but response indicated '
'${batch.skippedResults + alreadySkipped} entities were '
'skipped (which is more).');
}
// If we have a limit: Check that in total we haven't gotten too many.
if (remainingNumberOfEntities != null &&
remainingNumberOfEntities > 0 &&
batch.entityResults.length > remainingNumberOfEntities) {
throw raw.DatastoreError(
'Datastore returned more entitites (${batch.entityResults.length}) '
'then the limit was ($remainingNumberOfEntities).');
}
// If we have a limit: Calculate the remaining limit.
int? remainingEntities;
if (remainingNumberOfEntities != null && remainingNumberOfEntities > 0) {
remainingEntities =
remainingNumberOfEntities - batch.entityResults.length;
}
// Determine if this is the last query batch.
bool isLast;
if (!batch.hasMoreResults()) {
throw raw.DatastoreError(
'Datastore was supposed to specify the "moreResults" field '
'in the query response, but it was missing.');
}
if (batch.moreResults ==
QueryResultBatch_MoreResultsType.MORE_RESULTS_AFTER_LIMIT ||
batch.moreResults ==
QueryResultBatch_MoreResultsType.MORE_RESULTS_AFTER_CURSOR ||
batch.moreResults == QueryResultBatch_MoreResultsType.NO_MORE_RESULTS) {
isLast = true;
} else if (batch.moreResults ==
QueryResultBatch_MoreResultsType.NOT_FINISHED) {
isLast = false;
} else {
throw raw.DatastoreError(
'Datastore returned an unknown "moreResults" field in the query '
'response');
}
// If we have an offset: Calculate the new number of skipped entities.
int skipped = alreadySkipped;
if (offset != null && offset > 0 && batch.hasSkippedResults()) {
skipped += batch.skippedResults;
}
final entities = batch.entityResults.map((EntityResult result) {
return codec.decodeEntity(result.entity);
}).toList();
return _QueryPageImpl(request, clientGRPCStub, codec, batch.endCursor,
entities, isLast, offset, skipped, remainingEntities);
}
@override
bool get isLast => _isLast;
@override
List<raw.Entity> get items => _entities;
@override
Future<Page<raw.Entity>> next({int? pageSize}) async {
if (isLast) {
throw ArgumentError('Cannot call next() on last page.');
}
// We start at the endCursor we got from the last query.
_request.query.startCursor = _cursor;
// We modify the adjusted offset/limits.
if (_offset != null && (_offset! - _alreadySkipped) > 0) {
_request.query.offset = _offset! - _alreadySkipped;
} else {
_request.query.clearOffset();
}
if (_remainingNumberOfEntities != null) {
_request.query.limit = Int32Value()..value = _remainingNumberOfEntities!;
} else {
_request.query.clearLimit();
}
// Maybe lower the number of entities we want to get back in one go.
if (pageSize != null && pageSize > 0) {
if (!_request.query.hasLimit()) {
_request.query.limit = Int32Value()..value = pageSize;
} else if (_request.query.limit.value >= pageSize) {
_request.query.limit.value = pageSize;
}
}
try {
final response = await _clientGRPCStub.runQuery(_request);
return _QueryPageImpl.fromQueryResult(_request, _clientGRPCStub, _codec,
_offset, _alreadySkipped, _remainingNumberOfEntities, response.batch);
} on grpc.GrpcError catch (_) {
rethrow;
}
}
}
class _TransactionImpl extends raw.Transaction {
final List<int> _rpcTransaction;
_TransactionImpl(this._rpcTransaction);
}
class _Codec {
static const Map<raw.FilterRelation, PropertyFilter_Operator>
FILTER_RELATION_MAPPING = {
raw.FilterRelation.LessThan: PropertyFilter_Operator.LESS_THAN,
raw.FilterRelation.LessThanOrEqual:
PropertyFilter_Operator.LESS_THAN_OR_EQUAL,
raw.FilterRelation.Equal: PropertyFilter_Operator.EQUAL,
// ignore: deprecated_member_use
raw.FilterRelation.GreatherThan: PropertyFilter_Operator.GREATER_THAN,
// ignore: deprecated_member_use
raw.FilterRelation.GreatherThanOrEqual:
PropertyFilter_Operator.GREATER_THAN_OR_EQUAL,
};
final String _projectId;
_Codec(this._projectId);
raw.Entity decodeEntity(Entity pb) {
final properties = <String, Object?>{};
final unIndexedProperties = <String>{};
for (final name in pb.properties.keys) {
final value = decodeValue(pb.properties[name]!);
if (pb.properties[name]!.hasExcludeFromIndexes() &&
pb.properties[name]!.excludeFromIndexes) {
unIndexedProperties.add(name);
}
// NOTE: This is a hackisch way of detecting whether to construct a list
// or not. We may be able to use the [propertyPb.multiple] flag, but
// we could run into issues if we get the same name twice where the flag
// is false. (Or the flag is sometimes set and sometimes not).
//
// The higher-level datastore API will let the user specify a list
// annotation, which will take care of converting 0/1/N valued properties
// to lists (no matter whether they are not present, a value or a list
// from this `properties` here).
if (!properties.containsKey(name)) {
properties[name] = decodeValue(pb.properties[name]!);
} else {
final oldValue = properties[name];
if (oldValue is List) {
oldValue.add(value);
} else {
properties[name] = [oldValue, value];
}
}
}
return raw.Entity(decodeKey(pb.key), properties,
unIndexedProperties: unIndexedProperties);
}
Object? decodeValue(Value value) {
if (value.hasBooleanValue()) {
return value.booleanValue;
} else if (value.hasStringValue()) {
return value.stringValue;
} else if (value.hasIntegerValue()) {
return value.integerValue.toInt();
} else if (value.hasBlobValue()) {
return raw.BlobValue(value.blobValue);
} else if (value.hasDoubleValue()) {
return value.doubleValue;
} else if (value.hasKeyValue()) {
return decodeKey(value.keyValue);
} else if (value.hasNullValue()) {
return null;
} else if (value.hasTimestampValue()) {
final ts = value.timestampValue;
int us = ts.seconds.toInt() * 1000 * 1000;
if (ts.hasNanos()) us += ts.nanos ~/ 1000;
return DateTime.fromMicrosecondsSinceEpoch(us, isUtc: true);
} else if (value.hasArrayValue()) {
return value.arrayValue.values.map(decodeValue).toList();
} else if (value.hasGeoPointValue()) {
throw UnimplementedError('GeoPoint values are not supported yet.');
} else if (value.hasEntityValue()) {
throw UnimplementedError('Entity values are not supported yet.');
}
throw Exception('Unreachable');
}
raw.Key decodeKey(Key pb) {
final keyElements = List<raw.KeyElement?>.filled(pb.path.length, null);
for (int i = 0; i < pb.path.length; i++) {
final part = pb.path[i];
Object id;
if (part.hasName()) {
id = part.name;
} else if (part.hasId()) {
id = part.id.toInt();
} else {
throw const errors.ProtocolError('Neither name nor id present in Key.');
}
keyElements[i] = raw.KeyElement(part.kind, id);
}
raw.Partition? partition;
if (pb.hasPartitionId()) {
final partitionId = pb.partitionId;
if (partitionId.hasNamespaceId()) {
partition = raw.Partition(partitionId.namespaceId);
}
}
partition ??= raw.Partition(null);
return raw.Key(keyElements.cast(), partition: partition);
}
Entity encodeEntity(raw.Entity entity, {bool enforceId = true}) {
final pb = Entity()..key = encodeKey(entity.key, enforceId: enforceId);
final Set<String> unIndexedProperties = entity.unIndexedProperties;
entity.properties.forEach((String property, Object? value) {
final bool indexProperty = !unIndexedProperties.contains(property);
pb.properties[property] = encodeValue(value, !indexProperty);
});
return pb;
}
Key encodeKey(raw.Key key, {bool enforceId = true}) {
final pbPartitionId = PartitionId()..projectId = _projectId;
final pb = Key()..partitionId = pbPartitionId;
final partition = key.partition;
if (partition.namespace != null) {
pbPartitionId.namespaceId = partition.namespace!;
}
for (final part in key.elements) {
final partPb = Key_PathElement()..kind = part.kind;
if (part.id != null) {
if (part.id is String) {
partPb.name = part.id;
} else if (part.id is int) {
partPb.id = Int64(part.id);
} else {
throw raw.ApplicationError(
'Only strings and integers are supported as IDs '
'(was: ${part.id.runtimeType}).');
}
} else {
if (enforceId) {
throw raw.ApplicationError(
'Error while encoding entity key: id was null.');
}
}
pb.path.add(partPb);
}
return pb;
}
Value encodeValue(value, [bool excludeFromIndexes = false]) {
final pb = Value();
if (value is bool) {
pb.booleanValue = value;
if (excludeFromIndexes) pb.excludeFromIndexes = true;
} else if (value is String) {
pb.stringValue = value;
if (excludeFromIndexes) pb.excludeFromIndexes = true;
} else if (value is int) {
pb.integerValue = Int64(value);
if (excludeFromIndexes) pb.excludeFromIndexes = true;
} else if (value is raw.BlobValue) {
pb.blobValue = value.bytes;
if (excludeFromIndexes) pb.excludeFromIndexes = true;
} else if (value is double) {
pb.doubleValue = value;
if (excludeFromIndexes) pb.excludeFromIndexes = true;
} else if (value is raw.Key) {
pb.keyValue = encodeKey(value, enforceId: true);
if (excludeFromIndexes) pb.excludeFromIndexes = true;
} else if (value == null) {
pb.nullValue = NullValue.NULL_VALUE;
if (excludeFromIndexes) pb.excludeFromIndexes = true;
} else if (value is DateTime) {
final int ms = value.millisecondsSinceEpoch;
final int seconds = ms ~/ 1000;
final int ns = 1000 * (value.microsecond + (ms % 1000) * 1000);
pb.timestampValue = Timestamp()
..seconds = (Int64(seconds))
..nanos = ns;
if (excludeFromIndexes) pb.excludeFromIndexes = true;
} else if (value is List) {
// For [ArrayValue]s we need to set the indexing-bit on the individual
// values, not on the array!
pb.arrayValue = ArrayValue()
..values.addAll(value.map((value) {
return encodeValue(value, excludeFromIndexes);
}));
} else {
throw raw.ApplicationError(
'Cannot encode unsupported ${value.runtimeType} type.');
}
return pb;
}
}