blob: 5be5edda7d3d1d9577bfbf82770532142bea97dd [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:grpc/grpc.dart' as grpc;
import 'package:stack_trace/stack_trace.dart' show Trace;
import '../grpc_api/dart/google/appengine/logging/v1/request_log.pb.dart'
as gae_log;
import '../grpc_api/logging_api.dart' as api;
import '../logging.dart';
import '../logging_impl.dart';
const List<String> OAuth2Scopes = <String>[
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/logging.write',
];
/// A [appengine.Logging] adapter which groups request-specific logging
/// entries and sends them off via the [SharedLoggingService].
///
/// The `package:appengine` framework is responsible for calling [finish]
/// once the request is done in order to flush all logs and send the combined
/// log entry.
///
/// It uses an embedded AppEngine-compatible protobuf message in order to group
/// logging messages together.
class GrpcRequestLoggingImpl extends LoggingImpl {
static const int logEntryBaseLimit = 150;
static const int logEntrySizeLimit = 40 * 1024;
final SharedLoggingService _sharedLoggingService;
final String _httpMethod;
final String _httpResource;
final String _userAgent;
final String _host;
final String _ip;
final String? _traceId;
final String _referrer;
final int _startTimestamp;
final List<gae_log.LogLine> _gaeLogLines = <gae_log.LogLine>[];
late api.LogSeverity _currentSeverity;
late int _estimatedSize;
late bool _isFirst;
GrpcRequestLoggingImpl(
this._sharedLoggingService,
this._httpMethod,
this._httpResource,
this._userAgent,
this._host,
this._ip,
this._traceId,
this._referrer)
: _startTimestamp = DateTime.now().toUtc().millisecondsSinceEpoch {
_resetState();
_isFirst = true;
}
@override
void log(
LogLevel level,
String message, {
DateTime? timestamp,
}) {
final api.LogSeverity severity = _severityFromLogLevel(level);
// The severity of the combined log entry will be the highest severity
// of the individual log lines.
if (severity.value > _currentSeverity.value) {
_currentSeverity = severity;
}
// We'll send out not-yet-finished entries if they become too big (there
// are limits on how big individual log entries and combined log-write RPCs
// can be, we're using conservative estimates).
_estimatedSize += 25 + message.length;
final timestampInMs =
(timestamp ?? DateTime.now().toUtc()).millisecondsSinceEpoch;
final api.Timestamp startTimestamp =
_protobufTimestampFromMilliseconds(timestampInMs);
final logLine = gae_log.LogLine()
..time = startTimestamp
..severity = _currentSeverity
..logMessage = message;
_gaeLogLines.add(logLine);
// NOTE: We could consider starting a timer here, so we're guaranteed
// to flush the logs. Currently we assume that the framework will call
// [finish] when the http request is done.
if (_estimatedSize > logEntrySizeLimit) {
_enqueue(finish: false);
}
}
/// Flushes the so-far collected loglines to the underlying
/// [SharedLoggingService]. There is no guarantee that it will immediately be
/// sent to the server.
@override
Future flush() async {
if (_gaeLogLines.isNotEmpty) {
_enqueue(finish: false);
}
}
/// Finishes the request-specific logs with the given HTTP [responseStatus]
/// and [responseSize].
@override
void finish(int responseStatus, int responseSize) {
if (_gaeLogLines.isNotEmpty) {
_enqueue(
finish: true,
responseStatus: responseStatus,
responseSize: responseSize);
}
}
@override
void reportError(
LogLevel level,
Object error,
StackTrace stackTrace, {
DateTime? timestamp,
}) {
final api.LogSeverity severity = _severityFromLogLevel(level);
timestamp ??= DateTime.now();
final int now = timestamp.toUtc().millisecondsSinceEpoch;
final api.Timestamp nowTimestamp = _protobufTimestampFromMilliseconds(now);
final gaeResource = api.MonitoredResource()
..type = 'gae_app'
..labels.addAll(_sharedLoggingService.resourceLabels);
final logEntry = api.LogEntry()
..textPayload = _formatStackTrace(error, stackTrace)
..resource = gaeResource
..timestamp = nowTimestamp
..severity = severity
// Write to stderr log, see:
// https://cloud.google.com/error-reporting/docs/setup/app-engine-flexible-environment
..logName = _sharedLoggingService.backgroundLogName;
if (_traceId != null) {
_addLabel(logEntry, 'appengine.googleapis.com/trace_id', _traceId!);
}
_sharedLoggingService.enqueue(logEntry);
}
/// Builds up the combined [api.LogEntry] and enqueues it in the underlying
/// [SharedLoggingService].
void _enqueue({bool finish = false, int? responseStatus, int? responseSize}) {
final api.Timestamp startTimestamp =
_protobufTimestampFromMilliseconds(_startTimestamp);
final int now = DateTime.now().toUtc().millisecondsSinceEpoch;
final api.Timestamp nowTimestamp = _protobufTimestampFromMilliseconds(now);
final protoPayload = api.Any()
..typeUrl = 'type.googleapis.com/google.appengine.logging.v1.RequestLog';
final gaeResource = api.MonitoredResource()
..type = 'gae_app'
..labels.addAll(_sharedLoggingService.resourceLabels);
final logEntry = api.LogEntry()
..protoPayload = protoPayload
..resource = gaeResource
..timestamp = nowTimestamp
..severity = _currentSeverity
..logName = _sharedLoggingService.requestLogName;
final appengineRequestLog = gae_log.RequestLog()
..appId = 's~${_sharedLoggingService.projectId}'
..versionId = _sharedLoggingService.versionId
..method = _httpMethod
..resource = _httpResource
..startTime = startTimestamp
..userAgent = _userAgent
..host = _host
..ip = _ip
..line.addAll(_gaeLogLines)
..first = _isFirst
..finished = finish;
if (_sharedLoggingService.instanceId != null) {
appengineRequestLog.instanceId = _sharedLoggingService.instanceId!;
}
if (_traceId != null) {
appengineRequestLog.traceId = _traceId!;
_addLabel(logEntry, 'appengine.googleapis.com/trace_id', _traceId!);
}
appengineRequestLog.referrer = _referrer;
_resetState();
if (finish) {
final int diff = now - _startTimestamp;
final latency = api.Duration()
..seconds = api.Int64(diff ~/ 1000)
..nanos = 1000 * 1000 * (diff % 1000);
appengineRequestLog
..endTime = nowTimestamp
..latency = latency
..status = responseStatus!;
if (responseSize != null) {
appengineRequestLog.responseSize = api.Int64(responseSize);
}
final httpRequest = api.HttpRequest()..status = responseStatus;
logEntry.httpRequest = httpRequest;
}
protoPayload.value = appengineRequestLog.writeToBuffer();
_sharedLoggingService.enqueue(logEntry);
}
void _resetState() {
_isFirst = false;
_gaeLogLines.clear();
_currentSeverity = api.LogSeverity.DEBUG;
_estimatedSize = logEntryBaseLimit;
}
}
/// A [appengine.Logging] adapter which sends log entries off via the
/// [SharedLoggingService].
class GrpcBackgroundLoggingImpl extends LoggingBase {
final SharedLoggingService _sharedLoggingService;
GrpcBackgroundLoggingImpl(this._sharedLoggingService);
@override
void log(LogLevel level, String message, {DateTime? timestamp}) {
final api.LogSeverity severity = _severityFromLogLevel(level);
final int now = DateTime.now().toUtc().millisecondsSinceEpoch;
final api.Timestamp nowTimestamp = _protobufTimestampFromMilliseconds(now);
final gaeResource = api.MonitoredResource()
..type = 'gae_app'
..labels.addAll(_sharedLoggingService.resourceLabels);
final logEntry = api.LogEntry()
..textPayload = message
..resource = gaeResource
..timestamp = nowTimestamp
..severity = severity
..logName = _sharedLoggingService.backgroundLogName;
_sharedLoggingService.enqueue(logEntry);
}
@override
void reportError(
LogLevel level,
Object error,
StackTrace stackTrace, {
DateTime? timestamp,
}) {
final api.LogSeverity severity = _severityFromLogLevel(level);
timestamp ??= DateTime.now();
final int now = timestamp.toUtc().millisecondsSinceEpoch;
final api.Timestamp nowTimestamp = _protobufTimestampFromMilliseconds(now);
final gaeResource = api.MonitoredResource()
..type = 'gae_app'
..labels.addAll(_sharedLoggingService.resourceLabels);
final logEntry = api.LogEntry()
..textPayload = _formatStackTrace(error, stackTrace)
..resource = gaeResource
..timestamp = nowTimestamp
..severity = severity
// Write to stderr log, see:
// https://cloud.google.com/error-reporting/docs/setup/app-engine-flexible-environment
..logName = _sharedLoggingService.backgroundLogName;
_sharedLoggingService.enqueue(logEntry);
}
@override
Future flush() => Future.value();
}
/// A [appengine.Logging] adapter which uses the gRPC logging API to send
/// logs asynchronously to the Stackdriver logging service.
class SharedLoggingService {
static const Duration FLUSH_DURATION = Duration(seconds: 3);
static const int MAX_LOGENTRIES = 25;
final api.LoggingServiceV2Client _clientStub;
final String projectId;
final String serviceId;
final String versionId;
final String? instanceId;
final String _instanceName;
final Map<String, String> resourceLabels;
final String requestLogName;
final String backgroundLogName;
final List<api.LogEntry> _entries = <api.LogEntry>[];
Timer? _timer;
Completer? _closeCompleter;
int _outstandingRequests = 0;
SharedLoggingService(
grpc.ClientChannel clientChannel,
grpc.HttpBasedAuthenticator authenticator,
this.projectId,
this.serviceId,
this.versionId,
String zoneId,
this._instanceName,
this.instanceId)
: _clientStub = api.LoggingServiceV2Client(clientChannel,
options: authenticator.toCallOptions),
resourceLabels = {
'project_id': projectId,
'version_id': versionId,
'module_id': serviceId,
'zone': zoneId,
},
requestLogName =
'projects/$projectId/logs/appengine.googleapis.com%2Frequest_log',
backgroundLogName =
'projects/$projectId/logs/appengine.googleapis.com%2Fstderr';
void enqueue(api.LogEntry entry) {
_addLabel(entry, 'appengine.googleapis.com/instance_name', _instanceName);
_entries.add(entry);
// If all entries have maximum size we should send them once we have 25 in
// order to avoid hitting the size limit for the RPC request.
if (_entries.length > 25) {
flush();
} else {
_timer ??= Timer(FLUSH_DURATION, flush);
}
}
void flush() {
if (_timer != null) {
_timer!.cancel();
_timer = null;
}
if (_entries.isEmpty) {
return;
}
_outstandingRequests++;
final request = api.WriteLogEntriesRequest()
..entries.addAll(_entries)
..partialSuccess =
false /* for now we want to get notified if something goes wrong */;
_entries.clear();
unawaited(() async {
try {
await _clientStub.writeLogEntries(request);
} catch (error, stack) {
// In case the logging API failed, we'll write the error message to
// stderr. The logging daemon on the VM will make another attempt at
// uploading stderr via the logging API.
stderr.writeln('An error occured while writing log entries:\n'
'Error:$error\n'
'$stack');
} finally {
_outstandingRequests--;
_maybeClose();
}
}());
}
Future close() {
assert(_closeCompleter == null);
_closeCompleter = Completer();
// Trigger a last flush which will write out remaining data (if necessary)
// and try to complete the completer if all work was already done.
flush();
_maybeClose();
return _closeCompleter!.future;
}
void _maybeClose() {
if (_outstandingRequests == 0 &&
_closeCompleter != null &&
!_closeCompleter!.isCompleted) {
_closeCompleter!.complete(null);
}
}
}
void _addLabel(api.LogEntry entry, String key, String value) {
entry.labels[key] = value;
}
api.Timestamp _protobufTimestampFromMilliseconds(int ms) {
return api.Timestamp()
..seconds = api.Int64(ms ~/ 1000)
..nanos = 1000 * 1000 * (ms % 1000);
}
api.LogSeverity _severityFromLogLevel(LogLevel level) {
switch (level) {
case LogLevel.CRITICAL:
return api.LogSeverity.CRITICAL;
case LogLevel.ERROR:
return api.LogSeverity.ERROR;
case LogLevel.WARNING:
return api.LogSeverity.WARNING;
case LogLevel.INFO:
return api.LogSeverity.INFO;
case LogLevel.DEBUG:
return api.LogSeverity.DEBUG;
}
throw ArgumentError('Unknown logevel $level');
}
/// Returns a V8-style formatted stack trace.
///
/// This returns a [String] of the this [Trace] formatted using the stack
/// trace format used by V8 in `Error.stack`, regardless of what platform is
/// being used. The first line of this string will always be `'Error:\n'` as
/// this [Trace] doesn't know which [Exception] was thrown.
///
/// This can be useful for submitting stack traces to error correlation
/// services that can parse V8 stack traces, but no Dart stack traces.
/// Though it might be useful to replace the first line with
/// [Exception.toString()] or [Error.toString()] depending what is being
/// reported.
///
/// See: https://cloud.google.com/error-reporting/docs/formatting-error-messages
String _formatStackTrace(Object error, StackTrace stackTrace) =>
'Error: $error\n${Trace.from(stackTrace).frames.map((f) {
// Find member
String? member = f.member;
if (member == '<fn>') {
member = '<anonymous>';
}
// Find a location
String loc = 'unknown location';
if (f.isCore) {
loc = 'native';
} else if (f.line != null) {
loc = '${f.uri}:${f.line}:${f.column ?? 0}';
}
return ' at $member ($loc)\n';
}).join('')}';