blob: 4354d1b750827774c5c84d505217e3cf0fe7f43f [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
/**
* File, socket, HTTP, and other I/O support for server applications.
*
* The I/O library is used for Dart server applications,
* which run on a stand-alone Dart VM from the command line.
* *This library does not work in browser-based applications.*
*
* This library allows you to work with files, directories,
* sockets, processes, HTTP servers and clients, and more.
*
* To use this library in your code:
*
* import 'dart:io';
*
* *Note:* Many operations related to input and output are asynchronous
* and are handled using [Future]s or [Stream]s, both of which
* are defined in the `dart:async` library.
*
* ## File, Directory, and Link
*
* An instance of [File], [Directory], or [Link] represents a file,
* directory, or link, respectively, in the native file system.
*
* You can manipulate the file system through objects of these types.
* For example, you can rename a file or directory:
*
* File myFile = new File('myFile.txt');
* myFile.rename('yourFile.txt').then((_) => print('file renamed'));
*
* Many methods provided by the File, Directory, and Link classes
* run asynchronously and return a Future.
*
* ## FileSystemEntity
*
* File, Directory, and Link all extend [FileSystemEntity].
* In addition to being the superclass for these classes,
* FileSystemEntity has a number of static methods for working with paths.
*
* To get information about a path,
* you can use the FileSystemEntity static methods
* such as 'isDirectory', 'isFile', and 'exists'.
* Because file system access involves I/O, these methods
* are asynchronous and return a Future.
*
* FileSystemEntity.isDirectory(myPath).then((isDir) {
* if (isDir) {
* print('$myPath is a directory');
* } else {
* print('$myPath is not a directory');
* }
* });
*
* ## HttpServer and HttpClient
*
* The classes [HttpServer] and [HttpClient]
* provide HTTP server and HTTP client functionality.
*
* The [HttpServer] class provides the basic functionality for
* implementing an HTTP server.
* For some higher-level building-blocks, we recommend that you try
* the [shelf](https://pub.dartlang.org/packages/shelf)
* pub package, which contains
* a set of high-level classes that, together with the [HttpServer] class
* in this library, make it easier to implement HTTP servers.
*
* ## Process
*
* The [Process] class provides a way to run a process on
* the native machine.
* For example, the following code spawns a process that recursively lists
* the files under `web`.
*
* Process.start('ls', ['-R', 'web']).then((process) {
* stdout.addStream(process.stdout);
* stderr.addStream(process.stderr);
* process.exitCode.then(print);
* });
*
* Using `start()` returns a Future, which completes with a [Process] object
* when the process has started. This [Process] object allows you to interact
* with the process while it is running. Using `run()` returns a Future, which
* completes with a [ProcessResult] object when the spawned process has
* terminated. This [ProcessResult] object collects the output and exit code
* from the process.
*
* When using `start()`,
* you need to read all data coming on the stdout and stderr streams otherwise
* the system resources will not be freed.
*
* ## WebSocket
*
* The [WebSocket] class provides support for the web socket protocol. This
* allows full-duplex communications between client and server applications.
* Use the WebSocket class in the `dart:html` library for web clients.
*
* A web socket server uses a normal HTTP server for accepting web socket
* connections. The initial handshake is a HTTP request which is then upgraded to a
* web socket connection.
* The server upgrades the request using [WebSocketTransformer]
* and listens for the data on the returned web socket.
* For example, here's a mini server that listens for 'ws' data
* on a WebSocket:
*
* runZoned(() {
* HttpServer.bind('127.0.0.1', 4040).then((server) {
* server.listen((HttpRequest req) {
* if (req.uri.path == '/ws') {
* WebSocketTransformer.upgrade(req).then((socket) {
* socket.listen(handleMsg);
* });
* }
* });
* });
* },
* onError: (e) => print("An error occurred."));
*
* The client connects to the WebSocket using the `connect()` method
* and a URI that uses the Web Socket protocol.
* The client can write to the WebSocket with the `add()` method.
* For example,
*
* WebSocket.connect('ws://127.0.0.1:4040/ws').then((socket) {
* socket.add('Hello, World!');
* });
*
* Check out the
* [dartiverse_search](https://github.com/dart-lang/sample-dartiverse-search)
* sample for a client/server pair that uses
* WebSockets to communicate.
*
* ## Socket and ServerSocket
*
* Clients and servers use [Socket]s to communicate using the TCP protocol.
* Use [ServerSocket] on the server side and [Socket] on the client.
* The server creates a listening socket using the `bind()` method and
* then listens for incoming connections on the socket. For example:
*
* ServerSocket.bind('127.0.0.1', 4041)
* .then((serverSocket) {
* serverSocket.listen((socket) {
* socket.transform(UTF8.decoder).listen(print);
* });
* });
*
* A client connects a Socket using the `connect()` method,
* which returns a Future.
* Using `write()`, `writeln()`, or `writeAll()` are the easiest ways to
* send data over the socket.
* For example:
*
* Socket.connect('127.0.0.1', 4041).then((socket) {
* socket.write('Hello, World!');
* });
*
* Besides [Socket] and [ServerSocket], the [RawSocket] and
* [RawServerSocket] classes are available for lower-level access
* to async socket IO.
*
* ## Standard output, error, and input streams
*
* This library provides the standard output, error, and input
* streams, named 'stdout', 'stderr', and 'stdin', respectively.
*
* The stdout and stderr streams are both [IOSink]s and have the same set
* of methods and properties.
*
* To write a string to 'stdout':
*
* stdout.writeln('Hello, World!');
*
* To write a list of objects to 'stderr':
*
* stderr.writeAll([ 'That ', 'is ', 'an ', 'error.', '\n']);
*
* The standard input stream is a true [Stream], so it inherits
* properties and methods from the Stream class.
*
* To read text synchronously from the command line
* (the program blocks waiting for user to type information):
*
* String inputText = stdin.readLineSync();
*
* ## Other resources
*
* For an introduction to I/O in Dart, see the [dart:io section of the library
* tour](https://www.dartlang.org/docs/dart-up-and-running/ch03.html#dartio---io-for-command-line-apps).
*
* To learn more about I/O in Dart, refer to the [tutorial about writing
* command-line apps](https://www.dartlang.org/docs/tutorials/cmdline/).
*/
library dart.io;
import 'dart:async';
import 'dart:_internal' hide Symbol;
import 'dart:collection'
show
HashMap,
HashSet,
Queue,
ListQueue,
LinkedList,
LinkedListEntry,
UnmodifiableMapView;
import 'dart:convert';
import 'dart:developer' hide log;
import 'dart:isolate';
import 'dart:math';
import 'dart:typed_data';
import 'dart:nativewrappers';
import 'dart:nativewrappers';
part 'bytes_builder.dart';
part 'common.dart';
part 'crypto.dart';
part 'data_transformer.dart';
part 'directory.dart';
part 'directory_impl.dart';
part 'eventhandler.dart';
part 'file.dart';
part 'file_impl.dart';
part 'file_system_entity.dart';
part 'http.dart';
part 'http_date.dart';
part 'http_headers.dart';
part 'http_impl.dart';
part 'http_parser.dart';
part 'http_session.dart';
part 'io_resource_info.dart';
part 'io_sink.dart';
part 'io_service.dart';
part 'link.dart';
part 'platform.dart';
part 'platform_impl.dart';
part 'process.dart';
part 'secure_server_socket.dart';
part 'secure_socket.dart';
part 'security_context.dart';
part 'service_object.dart';
part 'socket.dart';
part 'stdio.dart';
part 'string_transformer.dart';
part 'websocket.dart';
part 'websocket_impl.dart';
List<_SignalController> _signalControllers = new List(32);
class _AsyncDirectoryListerOpsImpl extends NativeFieldWrapperClass1
implements _AsyncDirectoryListerOps {
_AsyncDirectoryListerOpsImpl._();
factory _AsyncDirectoryListerOpsImpl(int pointer) =>
new _AsyncDirectoryListerOpsImpl._().._setPointer(pointer);
void _setPointer(int pointer)
native "Directory_SetAsyncDirectoryListerPointer";
int getPointer() native "Directory_GetAsyncDirectoryListerPointer";
}
class _RandomAccessFileOpsImpl extends NativeFieldWrapperClass1
implements _RandomAccessFileOps {
_RandomAccessFileOpsImpl._();
factory _RandomAccessFileOpsImpl(int pointer) =>
new _RandomAccessFileOpsImpl._().._setPointer(pointer);
void _setPointer(int pointer) native "File_SetPointer";
int getPointer() native "File_GetPointer";
int close() native "File_Close";
readByte() native "File_ReadByte";
read(int bytes) native "File_Read";
readInto(List<int> buffer, int start, int end) native "File_ReadInto";
writeByte(int value) native "File_WriteByte";
writeFrom(List<int> buffer, int start, int end) native "File_WriteFrom";
position() native "File_Position";
setPosition(int position) native "File_SetPosition";
truncate(int length) native "File_Truncate";
length() native "File_Length";
flush() native "File_Flush";
lock(int lock, int start, int end) native "File_Lock";
}
class _WatcherPath {
final int pathId;
final String path;
final int events;
int count = 0;
_WatcherPath(this.pathId, this.path, this.events);
}
class _InotifyFileSystemWatcher extends _FileSystemWatcher {
static final Map<int, StreamController> _idMap = {};
static StreamSubscription _subscription;
_InotifyFileSystemWatcher(path, events, recursive)
: super._(path, events, recursive);
void _newWatcher() {
int id = _FileSystemWatcher._id;
_subscription =
_FileSystemWatcher._listenOnSocket(id, id, 0).listen((event) {
if (_idMap.containsKey(event[0])) {
if (event[1] != null) {
_idMap[event[0]].add(event[1]);
} else {
_idMap[event[0]].close();
}
}
});
}
void _doneWatcher() {
_subscription.cancel();
}
Stream _pathWatched() {
var pathId = _watcherPath.pathId;
if (!_idMap.containsKey(pathId)) {
_idMap[pathId] = new StreamController.broadcast();
}
return _idMap[pathId].stream;
}
void _pathWatchedEnd() {
var pathId = _watcherPath.pathId;
if (!_idMap.containsKey(pathId)) return;
_idMap[pathId].close();
_idMap.remove(pathId);
}
}
class _Win32FileSystemWatcher extends _FileSystemWatcher {
StreamSubscription _subscription;
StreamController _controller;
_Win32FileSystemWatcher(path, events, recursive)
: super._(path, events, recursive);
Stream _pathWatched() {
var pathId = _watcherPath.pathId;
_controller = new StreamController();
_subscription =
_FileSystemWatcher._listenOnSocket(pathId, 0, pathId).listen((event) {
assert(event[0] == pathId);
if (event[1] != null) {
_controller.add(event[1]);
} else {
_controller.close();
}
});
return _controller.stream;
}
void _pathWatchedEnd() {
_subscription.cancel();
_controller.close();
}
}
class _FSEventStreamFileSystemWatcher extends _FileSystemWatcher {
StreamSubscription _subscription;
StreamController _controller;
_FSEventStreamFileSystemWatcher(path, events, recursive)
: super._(path, events, recursive);
Stream _pathWatched() {
var pathId = _watcherPath.pathId;
var socketId = _FileSystemWatcher._getSocketId(0, pathId);
_controller = new StreamController();
_subscription =
_FileSystemWatcher._listenOnSocket(socketId, 0, pathId).listen((event) {
if (event[1] != null) {
_controller.add(event[1]);
} else {
_controller.close();
}
});
return _controller.stream;
}
void _pathWatchedEnd() {
_subscription.cancel();
_controller.close();
}
}
Uint8List _makeUint8ListView(Uint8List source, int offsetInBytes, int length) {
return new Uint8List.view(source.buffer, offsetInBytes, length);
}
class _FilterImpl extends NativeFieldWrapperClass1 implements _Filter {
void process(List<int> data, int start, int end) native "Filter_Process";
List<int> processed({bool flush: true, bool end: false})
native "Filter_Processed";
}
class _ZLibInflateFilter extends _FilterImpl {
_ZLibInflateFilter(int windowBits, List<int> dictionary, bool raw) {
_init(windowBits, dictionary, raw);
}
void _init(int windowBits, List<int> dictionary, bool raw)
native "Filter_CreateZLibInflate";
}
class _ZLibDeflateFilter extends _FilterImpl {
_ZLibDeflateFilter(bool gzip, int level, int windowBits, int memLevel,
int strategy, List<int> dictionary, bool raw) {
_init(gzip, level, windowBits, memLevel, strategy, dictionary, raw);
}
void _init(bool gzip, int level, int windowBits, int memLevel, int strategy,
List<int> dictionary, bool raw) native "Filter_CreateZLibDeflate";
}
_setupHooks() {
VMLibraryHooks.eventHandlerSendData = _EventHandler._sendData;
VMLibraryHooks.timerMillisecondClock = _EventHandler._timerMillisecondClock;
}
class _SignalController {
final ProcessSignal signal;
StreamController _controller;
var _id;
_SignalController(this.signal) {
_controller =
new StreamController.broadcast(onListen: _listen, onCancel: _cancel);
}
Stream<ProcessSignal> get stream => _controller.stream;
void _listen() {
var id = _setSignalHandler(signal._signalNumber);
if (id is! int) {
_controller
.addError(new SignalException("Failed to listen for $signal", id));
return;
}
_id = id;
var socket = new _RawSocket(new _NativeSocket.watch(id));
socket.listen((event) {
if (event == RawSocketEvent.READ) {
var bytes = socket.read();
for (int i = 0; i < bytes.length; i++) {
_controller.add(signal);
}
}
});
}
void _cancel() {
if (_id != null) {
_clearSignalHandler(signal._signalNumber);
_id = null;
}
}
static _setSignalHandler(int signal) native "Process_SetSignalHandler";
static int _clearSignalHandler(int signal)
native "Process_ClearSignalHandler";
}
Function _getWatchSignalInternal() => _ProcessUtils._watchSignalInternal;
class _ProcessStartStatus {
int _errorCode; // Set to OS error code if process start failed.
String _errorMessage; // Set to OS error message if process start failed.
}
class _ProcessImplNativeWrapper extends NativeFieldWrapperClass1 {}
class _ProcessImpl extends _ProcessImplNativeWrapper implements Process {
_ProcessResourceInfo _resourceInfo;
static bool connectedResourceHandler = false;
_ProcessImpl(
String path,
List<String> arguments,
this._workingDirectory,
Map<String, String> environment,
bool includeParentEnvironment,
bool runInShell,
ProcessStartMode mode)
: super() {
if (!connectedResourceHandler) {
registerExtension(
'ext.dart.io.getProcesses', _ProcessResourceInfo.getStartedProcesses);
registerExtension('ext.dart.io.getProcessById',
_ProcessResourceInfo.getProcessInfoMapById);
connectedResourceHandler = true;
}
if (runInShell) {
arguments = _getShellArguments(path, arguments);
path = _getShellCommand();
}
if (path is! String) {
throw new ArgumentError("Path is not a String: $path");
}
_path = path;
if (arguments is! List) {
throw new ArgumentError("Arguments is not a List: $arguments");
}
int len = arguments.length;
_arguments = new List<String>(len);
for (int i = 0; i < len; i++) {
var arg = arguments[i];
if (arg is! String) {
throw new ArgumentError("Non-string argument: $arg");
}
_arguments[i] = arguments[i];
if (Platform.isWindows) {
_arguments[i] = _windowsArgumentEscape(_arguments[i]);
}
}
if (_workingDirectory != null && _workingDirectory is! String) {
throw new ArgumentError(
"WorkingDirectory is not a String: $_workingDirectory");
}
_environment = [];
// Ensure that we have a non-null environment.
environment = (environment == null) ? (const {}) : environment;
if (environment is! Map) {
throw new ArgumentError("Environment is not a map: $environment");
}
environment.forEach((key, value) {
if (key is! String || value is! String) {
throw new ArgumentError(
"Environment key or value is not a string: ($key, $value)");
}
_environment.add('$key=$value');
});
if (includeParentEnvironment) {
Platform.environment.forEach((key, value) {
assert(key is String);
assert(value is String);
// Do not override keys already set as part of environment.
if (!environment.containsKey(key)) {
_environment.add('$key=$value');
}
});
}
if (mode is! ProcessStartMode) {
throw new ArgumentError("Mode is not a ProcessStartMode: $mode");
}
_mode = mode;
if (mode != ProcessStartMode.DETACHED) {
// stdin going to process.
_stdin = new _StdSink(new _Socket._writePipe());
_stdin._sink._owner = this;
// stdout coming from process.
_stdout = new _StdStream(new _Socket._readPipe());
_stdout._stream._owner = this;
// stderr coming from process.
_stderr = new _StdStream(new _Socket._readPipe());
_stderr._stream._owner = this;
}
if (mode == ProcessStartMode.NORMAL) {
_exitHandler = new _Socket._readPipe();
}
_ended = false;
_started = false;
}
static String _getShellCommand() {
if (Platform.isWindows) {
return 'cmd.exe';
}
return '/bin/sh';
}
static List<String> _getShellArguments(
String executable, List<String> arguments) {
List<String> shellArguments = [];
if (Platform.isWindows) {
shellArguments.add('/c');
shellArguments.add(executable);
for (var arg in arguments) {
shellArguments.add(arg);
}
} else {
var commandLine = new StringBuffer();
executable = executable.replaceAll("'", "'\"'\"'");
commandLine.write("'$executable'");
shellArguments.add("-c");
for (var arg in arguments) {
arg = arg.replaceAll("'", "'\"'\"'");
commandLine.write(" '$arg'");
}
shellArguments.add(commandLine.toString());
}
return shellArguments;
}
String _windowsArgumentEscape(String argument) {
var result = argument;
if (argument.contains('\t') ||
argument.contains(' ') ||
argument.contains('"')) {
// Produce something that the C runtime on Windows will parse
// back as this string.
// Replace any number of '\' followed by '"' with
// twice as many '\' followed by '\"'.
var backslash = '\\'.codeUnitAt(0);
var sb = new StringBuffer();
var nextPos = 0;
var quotePos = argument.indexOf('"', nextPos);
while (quotePos != -1) {
var numBackslash = 0;
var pos = quotePos - 1;
while (pos >= 0 && argument.codeUnitAt(pos) == backslash) {
numBackslash++;
pos--;
}
sb.write(argument.substring(nextPos, quotePos - numBackslash));
for (var i = 0; i < numBackslash; i++) {
sb.write(r'\\');
}
sb.write(r'\"');
nextPos = quotePos + 1;
quotePos = argument.indexOf('"', nextPos);
}
sb.write(argument.substring(nextPos, argument.length));
result = sb.toString();
// Add '"' at the beginning and end and replace all '\' at
// the end with two '\'.
sb = new StringBuffer('"');
sb.write(result);
nextPos = argument.length - 1;
while (argument.codeUnitAt(nextPos) == backslash) {
sb.write('\\');
nextPos--;
}
sb.write('"');
result = sb.toString();
}
return result;
}
int _intFromBytes(List<int> bytes, int offset) {
return (bytes[offset] +
(bytes[offset + 1] << 8) +
(bytes[offset + 2] << 16) +
(bytes[offset + 3] << 24));
}
Future<Process> _start() {
var completer = new Completer();
if (_mode == ProcessStartMode.NORMAL) {
_exitCode = new Completer<int>();
}
// TODO(ager): Make the actual process starting really async instead of
// simulating it with a timer.
Timer.run(() {
var status = new _ProcessStartStatus();
bool success = _startNative(
_path,
_arguments,
_workingDirectory,
_environment,
_mode.index,
_mode == ProcessStartMode.DETACHED
? null
: _stdin._sink._nativeSocket,
_mode == ProcessStartMode.DETACHED
? null
: _stdout._stream._nativeSocket,
_mode == ProcessStartMode.DETACHED
? null
: _stderr._stream._nativeSocket,
_mode != ProcessStartMode.NORMAL ? null : _exitHandler._nativeSocket,
status);
if (!success) {
completer.completeError(new ProcessException(
_path, _arguments, status._errorMessage, status._errorCode));
return;
}
_started = true;
_resourceInfo = new _ProcessResourceInfo(this);
// Setup an exit handler to handle internal cleanup and possible
// callback when a process terminates.
if (_mode == ProcessStartMode.NORMAL) {
int exitDataRead = 0;
final int EXIT_DATA_SIZE = 8;
List<int> exitDataBuffer = new List<int>(EXIT_DATA_SIZE);
_exitHandler.listen((data) {
int exitCode(List<int> ints) {
var code = _intFromBytes(ints, 0);
var negative = _intFromBytes(ints, 4);
assert(negative == 0 || negative == 1);
return (negative == 0) ? code : -code;
}
void handleExit() {
_ended = true;
_exitCode.complete(exitCode(exitDataBuffer));
// Kill stdin, helping hand if the user forgot to do it.
_stdin._sink.destroy();
_resourceInfo.stopped();
}
exitDataBuffer.setRange(
exitDataRead, exitDataRead + data.length, data);
exitDataRead += data.length;
if (exitDataRead == EXIT_DATA_SIZE) {
handleExit();
}
});
}
completer.complete(this);
});
return completer.future;
}
ProcessResult _runAndWait(Encoding stdoutEncoding, Encoding stderrEncoding) {
var status = new _ProcessStartStatus();
_exitCode = new Completer<int>();
bool success = _startNative(
_path,
_arguments,
_workingDirectory,
_environment,
ProcessStartMode.NORMAL.index,
_stdin._sink._nativeSocket,
_stdout._stream._nativeSocket,
_stderr._stream._nativeSocket,
_exitHandler._nativeSocket,
status);
if (!success) {
throw new ProcessException(
_path, _arguments, status._errorMessage, status._errorCode);
}
_resourceInfo = new _ProcessResourceInfo(this);
var result = _wait(
_stdin._sink._nativeSocket,
_stdout._stream._nativeSocket,
_stderr._stream._nativeSocket,
_exitHandler._nativeSocket);
getOutput(output, encoding) {
if (encoding == null) return output;
return encoding.decode(output);
}
_resourceInfo.stopped();
return new ProcessResult(
result[0],
result[1],
getOutput(result[2], stdoutEncoding),
getOutput(result[3], stderrEncoding));
}
bool _startNative(
String path,
List<String> arguments,
String workingDirectory,
List<String> environment,
int mode,
_NativeSocket stdin,
_NativeSocket stdout,
_NativeSocket stderr,
_NativeSocket exitHandler,
_ProcessStartStatus status) native "Process_Start";
_wait(_NativeSocket stdin, _NativeSocket stdout, _NativeSocket stderr,
_NativeSocket exitHandler) native "Process_Wait";
Stream<List<int>> get stdout {
return _stdout;
}
Stream<List<int>> get stderr {
return _stderr;
}
IOSink get stdin {
return _stdin;
}
Future<int> get exitCode => _exitCode != null ? _exitCode.future : null;
bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) {
if (signal is! ProcessSignal) {
throw new ArgumentError("Argument 'signal' must be a ProcessSignal");
}
assert(_started);
if (_ended) return false;
return _ProcessUtils._killPid(pid, signal._signalNumber);
}
int get pid => _ProcessUtils._pid(this);
String _path;
List<String> _arguments;
String _workingDirectory;
List<String> _environment;
ProcessStartMode _mode;
// Private methods of Socket are used by _in, _out, and _err.
_StdSink _stdin;
_StdStream _stdout;
_StdStream _stderr;
Socket _exitHandler;
bool _ended;
bool _started;
Completer<int> _exitCode;
}
Future<ProcessResult> _runNonInteractiveProcess(
String path,
List<String> arguments,
String workingDirectory,
Map<String, String> environment,
bool includeParentEnvironment,
bool runInShell,
Encoding stdoutEncoding,
Encoding stderrEncoding) {
// Start the underlying process.
return Process
.start(path, arguments,
workingDirectory: workingDirectory,
environment: environment,
includeParentEnvironment: includeParentEnvironment,
runInShell: runInShell)
.then((Process p) {
int pid = p.pid;
// Make sure the process stdin is closed.
p.stdin.close();
// Setup stdout and stderr handling.
Future foldStream(Stream<List<int>> stream, Encoding encoding) {
if (encoding == null) {
return stream
.fold(new BytesBuilder(), (builder, data) => builder..add(data))
.then((builder) => builder.takeBytes());
} else {
return stream.transform(encoding.decoder).fold(new StringBuffer(),
(buf, data) {
buf.write(data);
return buf;
}).then((sb) => sb.toString());
}
}
Future stdout = foldStream(p.stdout, stdoutEncoding);
Future stderr = foldStream(p.stderr, stderrEncoding);
return Future.wait([p.exitCode, stdout, stderr]).then((result) {
return new ProcessResult(pid, result[0], result[1], result[2]);
});
});
}
ProcessResult _runNonInteractiveProcessSync(
String executable,
List<String> arguments,
String workingDirectory,
Map<String, String> environment,
bool includeParentEnvironment,
bool runInShell,
Encoding stdoutEncoding,
Encoding stderrEncoding) {
var process = new _ProcessImpl(
executable,
arguments,
workingDirectory,
environment,
includeParentEnvironment,
runInShell,
ProcessStartMode.NORMAL);
return process._runAndWait(stdoutEncoding, stderrEncoding);
}
void _throwOnBadPort(int port) {
if ((port == null) || (port < 0) || (port > 0xFFFF)) {
throw new ArgumentError("Invalid port $port");
}
}
class _InternetAddress implements InternetAddress {
static const int _ADDRESS_LOOPBACK_IP_V4 = 0;
static const int _ADDRESS_LOOPBACK_IP_V6 = 1;
static const int _ADDRESS_ANY_IP_V4 = 2;
static const int _ADDRESS_ANY_IP_V6 = 3;
static const int _IPV4_ADDR_LENGTH = 4;
static const int _IPV6_ADDR_LENGTH = 16;
static _InternetAddress LOOPBACK_IP_V4 =
new _InternetAddress.fixed(_ADDRESS_LOOPBACK_IP_V4);
static _InternetAddress LOOPBACK_IP_V6 =
new _InternetAddress.fixed(_ADDRESS_LOOPBACK_IP_V6);
static _InternetAddress ANY_IP_V4 =
new _InternetAddress.fixed(_ADDRESS_ANY_IP_V4);
static _InternetAddress ANY_IP_V6 =
new _InternetAddress.fixed(_ADDRESS_ANY_IP_V6);
final String address;
final String _host;
final Uint8List _in_addr;
InternetAddressType get type => _in_addr.length == _IPV4_ADDR_LENGTH
? InternetAddressType.IP_V4
: InternetAddressType.IP_V6;
String get host => _host != null ? _host : address;
List<int> get rawAddress => new Uint8List.fromList(_in_addr);
bool get isLoopback {
switch (type) {
case InternetAddressType.IP_V4:
return _in_addr[0] == 127;
case InternetAddressType.IP_V6:
for (int i = 0; i < _IPV6_ADDR_LENGTH - 1; i++) {
if (_in_addr[i] != 0) return false;
}
return _in_addr[_IPV6_ADDR_LENGTH - 1] == 1;
}
}
bool get isLinkLocal {
switch (type) {
case InternetAddressType.IP_V4:
// Checking for 169.254.0.0/16.
return _in_addr[0] == 169 && _in_addr[1] == 254;
case InternetAddressType.IP_V6:
// Checking for fe80::/10.
return _in_addr[0] == 0xFE && (_in_addr[1] & 0xB0) == 0x80;
}
}
bool get isMulticast {
switch (type) {
case InternetAddressType.IP_V4:
// Checking for 224.0.0.0 through 239.255.255.255.
return _in_addr[0] >= 224 && _in_addr[0] < 240;
case InternetAddressType.IP_V6:
// Checking for ff00::/8.
return _in_addr[0] == 0xFF;
}
}
Future<InternetAddress> reverse() => _NativeSocket.reverseLookup(this);
_InternetAddress(this.address, this._host, this._in_addr);
factory _InternetAddress.parse(String address) {
if (address is! String) {
throw new ArgumentError("Invalid internet address $address");
}
var in_addr = _parse(address);
if (in_addr == null) {
throw new ArgumentError("Invalid internet address $address");
}
return new _InternetAddress(address, null, in_addr);
}
factory _InternetAddress.fixed(int id) {
switch (id) {
case _ADDRESS_LOOPBACK_IP_V4:
var in_addr = new Uint8List(_IPV4_ADDR_LENGTH);
in_addr[0] = 127;
in_addr[_IPV4_ADDR_LENGTH - 1] = 1;
return new _InternetAddress("127.0.0.1", null, in_addr);
case _ADDRESS_LOOPBACK_IP_V6:
var in_addr = new Uint8List(_IPV6_ADDR_LENGTH);
in_addr[_IPV6_ADDR_LENGTH - 1] = 1;
return new _InternetAddress("::1", null, in_addr);
case _ADDRESS_ANY_IP_V4:
var in_addr = new Uint8List(_IPV4_ADDR_LENGTH);
return new _InternetAddress("0.0.0.0", "0.0.0.0", in_addr);
case _ADDRESS_ANY_IP_V6:
var in_addr = new Uint8List(_IPV6_ADDR_LENGTH);
return new _InternetAddress("::", "::", in_addr);
default:
assert(false);
throw new ArgumentError();
}
}
// Create a clone of this _InternetAddress replacing the host.
_InternetAddress _cloneWithNewHost(String host) {
return new _InternetAddress(
address, host, new Uint8List.fromList(_in_addr));
}
bool operator ==(other) {
if (!(other is _InternetAddress)) return false;
if (other.type != type) return false;
bool equals = true;
for (int i = 0; i < _in_addr.length && equals; i++) {
equals = other._in_addr[i] == _in_addr[i];
}
return equals;
}
int get hashCode {
int result = 1;
for (int i = 0; i < _in_addr.length; i++) {
result = (result * 31 + _in_addr[i]) & 0x3FFFFFFF;
}
return result;
}
String toString() {
return "InternetAddress('$address', ${type.name})";
}
static Uint8List _parse(String address) native "InternetAddress_Parse";
}
class _NetworkInterface implements NetworkInterface {
final String name;
final int index;
final List<InternetAddress> addresses = [];
_NetworkInterface(this.name, this.index);
String toString() {
return "NetworkInterface('$name', $addresses)";
}
}
class _NativeSocketNativeWrapper extends NativeFieldWrapperClass1 {}
/**
* _X509CertificateImpl wraps an X509 certificate object held by the BoringSSL
* library. It exposes the fields of the certificate object.
*/
class _X509CertificateImpl extends NativeFieldWrapperClass1
implements X509Certificate {
// The native field must be set manually on a new object, in native code.
// This is done by WrappedX509 in secure_socket.cc.
_X509CertificateImpl();
String get subject native "X509_Subject";
String get issuer native "X509_Issuer";
DateTime get startValidity {
return new DateTime.fromMillisecondsSinceEpoch(_startValidity(),
isUtc: true);
}
DateTime get endValidity {
return new DateTime.fromMillisecondsSinceEpoch(_endValidity(), isUtc: true);
}
int _startValidity() native "X509_StartValidity";
int _endValidity() native "X509_EndValidity";
}
class _RawServerSocket extends Stream<RawSocket> implements RawServerSocket {
final _NativeSocket _socket;
StreamController<RawSocket> _controller;
ReceivePort _referencePort;
bool _v6Only;
static Future<_RawServerSocket> bind(
address, int port, int backlog, bool v6Only, bool shared) {
_throwOnBadPort(port);
if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
return _NativeSocket
.bind(address, port, backlog, v6Only, shared)
.then((socket) => new _RawServerSocket(socket, v6Only));
}
_RawServerSocket(this._socket, this._v6Only);
StreamSubscription<RawSocket> listen(void onData(RawSocket event),
{Function onError, void onDone(), bool cancelOnError}) {
if (_controller != null) {
throw new StateError("Stream was already listened to");
}
var zone = Zone.current;
_controller = new StreamController(
sync: true,
onListen: _onSubscriptionStateChange,
onCancel: _onSubscriptionStateChange,
onPause: _onPauseStateChange,
onResume: _onPauseStateChange);
_socket.setHandlers(read: zone.bindCallback(() {
while (_socket.available > 0) {
var socket = _socket.accept();
if (socket == null) return;
_controller.add(new _RawSocket(socket));
if (_controller.isPaused) return;
}
}), error: zone.bindUnaryCallback((e) {
_controller.addError(e);
_controller.close();
}), destroyed: () {
_controller.close();
if (_referencePort != null) {
_referencePort.close();
_referencePort = null;
}
});
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
int get port => _socket.port;
InternetAddress get address => _socket.address;
Future close() {
return _socket.close().then((_) {
if (_referencePort != null) {
_referencePort.close();
_referencePort = null;
}
return this;
});
}
void _pause() {
_socket.setListening(read: false, write: false);
}
void _resume() {
_socket.setListening(read: true, write: false);
}
void _onSubscriptionStateChange() {
if (_controller.hasListener) {
_resume();
} else {
_socket.close();
}
}
void _onPauseStateChange() {
if (_controller.isPaused) {
_pause();
} else {
_resume();
}
}
void set _owner(owner) {
_socket.owner = owner;
}
}
class _RawSocket extends Stream<RawSocketEvent> implements RawSocket {
final _NativeSocket _socket;
StreamController<RawSocketEvent> _controller;
bool _readEventsEnabled = true;
bool _writeEventsEnabled = true;
// Flag to handle Ctrl-D closing of stdio on Mac OS.
bool _isMacOSTerminalInput = false;
static Future<RawSocket> connect(host, int port, sourceAddress) {
return _NativeSocket
.connect(host, port, sourceAddress)
.then((socket) => new _RawSocket(socket));
}
_RawSocket(this._socket) {
var zone = Zone.current;
_controller = new StreamController(
sync: true,
onListen: _onSubscriptionStateChange,
onCancel: _onSubscriptionStateChange,
onPause: _onPauseStateChange,
onResume: _onPauseStateChange);
_socket.setHandlers(
read: () => _controller.add(RawSocketEvent.READ),
write: () {
// The write event handler is automatically disabled by the
// event handler when it fires.
_writeEventsEnabled = false;
_controller.add(RawSocketEvent.WRITE);
},
closed: () => _controller.add(RawSocketEvent.READ_CLOSED),
destroyed: () {
_controller.add(RawSocketEvent.CLOSED);
_controller.close();
},
error: zone.bindUnaryCallback((e) {
_controller.addError(e);
_socket.close();
}));
}
factory _RawSocket._writePipe() {
var native = new _NativeSocket.pipe();
native.isClosedRead = true;
native.closedReadEventSent = true;
return new _RawSocket(native);
}
factory _RawSocket._readPipe(int fd) {
var native = new _NativeSocket.pipe();
native.isClosedWrite = true;
if (fd != null) _getStdioHandle(native, fd);
var result = new _RawSocket(native);
if (fd != null) {
var socketType = _StdIOUtils._nativeSocketType(result._socket);
result._isMacOSTerminalInput =
Platform.isMacOS && socketType == _STDIO_HANDLE_TYPE_TERMINAL;
}
return result;
}
StreamSubscription<RawSocketEvent> listen(void onData(RawSocketEvent event),
{Function onError, void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
int available() => _socket.available;
List<int> read([int len]) {
if (_isMacOSTerminalInput) {
var available = this.available();
if (available == 0) return null;
var data = _socket.read(len);
if (data == null || data.length < available) {
// Reading less than available from a Mac OS terminal indicate Ctrl-D.
// This is interpreted as read closed.
scheduleMicrotask(() => _controller.add(RawSocketEvent.READ_CLOSED));
}
return data;
} else {
return _socket.read(len);
}
}
int write(List<int> buffer, [int offset, int count]) =>
_socket.write(buffer, offset, count);
Future close() => _socket.close().then((_) => this);
void shutdown(SocketDirection direction) => _socket.shutdown(direction);
int get port => _socket.port;
int get remotePort => _socket.remotePort;
InternetAddress get address => _socket.address;
InternetAddress get remoteAddress => _socket.remoteAddress;
bool get readEventsEnabled => _readEventsEnabled;
void set readEventsEnabled(bool value) {
if (value != _readEventsEnabled) {
_readEventsEnabled = value;
if (!_controller.isPaused) _resume();
}
}
bool get writeEventsEnabled => _writeEventsEnabled;
void set writeEventsEnabled(bool value) {
if (value != _writeEventsEnabled) {
_writeEventsEnabled = value;
if (!_controller.isPaused) _resume();
}
}
bool setOption(SocketOption option, bool enabled) =>
_socket.setOption(option, enabled);
_pause() {
_socket.setListening(read: false, write: false);
}
void _resume() {
_socket.setListening(read: _readEventsEnabled, write: _writeEventsEnabled);
}
void _onPauseStateChange() {
if (_controller.isPaused) {
_pause();
} else {
_resume();
}
}
void _onSubscriptionStateChange() {
if (_controller.hasListener) {
_resume();
} else {
_socket.close();
}
}
void set _owner(owner) {
_socket.owner = owner;
}
}
class _ServerSocket extends Stream<Socket> implements ServerSocket {
final _socket;
static Future<_ServerSocket> bind(
address, int port, int backlog, bool v6Only, bool shared) {
return _RawServerSocket
.bind(address, port, backlog, v6Only, shared)
.then((socket) => new _ServerSocket(socket));
}
_ServerSocket(this._socket);
StreamSubscription<Socket> listen(void onData(Socket event),
{Function onError, void onDone(), bool cancelOnError}) {
return _socket.map((rawSocket) => new _Socket(rawSocket)).listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
int get port => _socket.port;
InternetAddress get address => _socket.address;
Future close() => _socket.close().then((_) => this);
void set _owner(owner) {
_socket._owner = owner;
}
}
class _SocketStreamConsumer extends StreamConsumer<List<int>> {
StreamSubscription subscription;
final _Socket socket;
int offset;
List<int> buffer;
bool paused = false;
Completer streamCompleter;
_SocketStreamConsumer(this.socket);
Future<Socket> addStream(Stream<List<int>> stream) {
socket._ensureRawSocketSubscription();
streamCompleter = new Completer<Socket>();
if (socket._raw != null) {
subscription = stream.listen((data) {
assert(!paused);
assert(buffer == null);
buffer = data;
offset = 0;
try {
write();
} catch (e) {
socket.destroy();
stop();
done(e);
}
}, onError: (error, [stackTrace]) {
socket.destroy();
done(error, stackTrace);
}, onDone: () {
done();
}, cancelOnError: true);
}
return streamCompleter.future;
}
Future<Socket> close() {
socket._consumerDone();
return new Future.value(socket);
}
void write() {
if (subscription == null) return;
assert(buffer != null);
// Write as much as possible.
offset += socket._write(buffer, offset, buffer.length - offset);
if (offset < buffer.length) {
if (!paused) {
paused = true;
subscription.pause();
}
socket._enableWriteEvent();
} else {
buffer = null;
if (paused) {
paused = false;
subscription.resume();
}
}
}
void done([error, stackTrace]) {
if (streamCompleter != null) {
if (error != null) {
streamCompleter.completeError(error, stackTrace);
} else {
streamCompleter.complete(socket);
}
streamCompleter = null;
}
}
void stop() {
if (subscription == null) return;
subscription.cancel();
subscription = null;
paused = false;
socket._disableWriteEvent();
}
}
class _Socket extends Stream<List<int>> implements Socket {
RawSocket _raw; // Set to null when the raw socket is closed.
bool _closed = false; // Set to true when the raw socket is closed.
StreamController _controller;
bool _controllerClosed = false;
_SocketStreamConsumer _consumer;
IOSink _sink;
var _subscription;
var _detachReady;
_Socket(this._raw) {
_controller = new StreamController<List<int>>(
sync: true,
onListen: _onSubscriptionStateChange,
onCancel: _onSubscriptionStateChange,
onPause: _onPauseStateChange,
onResume: _onPauseStateChange);
_consumer = new _SocketStreamConsumer(this);
_sink = new IOSink(_consumer);
// Disable read events until there is a subscription.
_raw.readEventsEnabled = false;
// Disable write events until the consumer needs it for pending writes.
_raw.writeEventsEnabled = false;
}
factory _Socket._writePipe() {
return new _Socket(new _RawSocket._writePipe());
}
factory _Socket._readPipe([int fd]) {
return new _Socket(new _RawSocket._readPipe(fd));
}
_NativeSocket get _nativeSocket => _raw._socket;
StreamSubscription<List<int>> listen(void onData(List<int> event),
{Function onError, void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
Encoding get encoding => _sink.encoding;
void set encoding(Encoding value) {
_sink.encoding = value;
}
void write(Object obj) => _sink.write(obj);
void writeln([Object obj = ""]) => _sink.writeln(obj);
void writeCharCode(int charCode) => _sink.writeCharCode(charCode);
void writeAll(Iterable objects, [sep = ""]) => _sink.writeAll(objects, sep);
void add(List<int> bytes) => _sink.add(bytes);
Future<Socket> addStream(Stream<List<int>> stream) {
return _sink.addStream(stream);
}
Future<Socket> flush() => _sink.flush();
Future<Socket> close() => _sink.close();
Future<Socket> get done => _sink.done;
void destroy() {
// Destroy can always be called to get rid of a socket.
if (_raw == null) return;
_consumer.stop();
_closeRawSocket();
_controllerClosed = true;
_controller.close();
}
bool setOption(SocketOption option, bool enabled) {
if (_raw == null) return false;
return _raw.setOption(option, enabled);
}
int get port {
if (_raw == null) throw const SocketException.closed();
;
return _raw.port;
}
InternetAddress get address {
if (_raw == null) throw const SocketException.closed();
;
return _raw.address;
}
int get remotePort {
if (_raw == null) throw const SocketException.closed();
;
return _raw.remotePort;
}
InternetAddress get remoteAddress {
if (_raw == null) throw const SocketException.closed();
;
return _raw.remoteAddress;
}
Future _detachRaw() {
_detachReady = new Completer();
_sink.close();
return _detachReady.future.then((_) {
assert(_consumer.buffer == null);
var raw = _raw;
_raw = null;
return [raw, _subscription];
});
}
// Ensure a subscription on the raw socket. Both the stream and the
// consumer needs a subscription as they share the error and done
// events from the raw socket.
void _ensureRawSocketSubscription() {
if (_subscription == null && _raw != null) {
_subscription = _raw.listen(_onData,
onError: _onError, onDone: _onDone, cancelOnError: true);
}
}
_closeRawSocket() {
var tmp = _raw;
_raw = null;
_closed = true;
tmp.close();
}
void _onSubscriptionStateChange() {
if (_controller.hasListener) {
_ensureRawSocketSubscription();
// Enable read events for providing data to subscription.
if (_raw != null) {
_raw.readEventsEnabled = true;
}
} else {
_controllerClosed = true;
if (_raw != null) {
_raw.shutdown(SocketDirection.RECEIVE);
}
}
}
void _onPauseStateChange() {
if (_raw != null) {
_raw.readEventsEnabled = !_controller.isPaused;
}
}
void _onData(event) {
switch (event) {
case RawSocketEvent.READ:
var buffer = _raw.read();
if (buffer != null) _controller.add(buffer);
break;
case RawSocketEvent.WRITE:
_consumer.write();
break;
case RawSocketEvent.READ_CLOSED:
_controllerClosed = true;
_controller.close();
break;
}
}
void _onDone() {
if (!_controllerClosed) {
_controllerClosed = true;
_controller.close();
}
_consumer.done();
}
void _onError(error, stackTrace) {
if (!_controllerClosed) {
_controllerClosed = true;
_controller.addError(error, stackTrace);
_controller.close();
}
_consumer.done(error, stackTrace);
}
int _write(List<int> data, int offset, int length) =>
_raw.write(data, offset, length);
void _enableWriteEvent() {
_raw.writeEventsEnabled = true;
}
void _disableWriteEvent() {
if (_raw != null) {
_raw.writeEventsEnabled = false;
}
}
void _consumerDone() {
if (_detachReady != null) {
_detachReady.complete(null);
} else {
if (_raw != null) {
_raw.shutdown(SocketDirection.SEND);
_disableWriteEvent();
}
}
}
void set _owner(owner) {
_raw._owner = owner;
}
}
class _RawDatagramSocket extends Stream implements RawDatagramSocket {
_NativeSocket _socket;
StreamController<RawSocketEvent> _controller;
bool _readEventsEnabled = true;
bool _writeEventsEnabled = true;
_RawDatagramSocket(this._socket) {
var zone = Zone.current;
_controller = new StreamController(
sync: true,
onListen: _onSubscriptionStateChange,
onCancel: _onSubscriptionStateChange,
onPause: _onPauseStateChange,
onResume: _onPauseStateChange);
_socket.setHandlers(
read: () => _controller.add(RawSocketEvent.READ),
write: () {
// The write event handler is automatically disabled by the
// event handler when it fires.
_writeEventsEnabled = false;
_controller.add(RawSocketEvent.WRITE);
},
closed: () => _controller.add(RawSocketEvent.READ_CLOSED),
destroyed: () {
_controller.add(RawSocketEvent.CLOSED);
_controller.close();
},
error: zone.bindUnaryCallback((e) {
_controller.addError(e);
_socket.close();
}));
}
static Future<RawDatagramSocket> bind(host, int port, bool reuseAddress) {
_throwOnBadPort(port);
return _NativeSocket
.bindDatagram(host, port, reuseAddress)
.then((socket) => new _RawDatagramSocket(socket));
}
StreamSubscription<RawSocketEvent> listen(void onData(RawSocketEvent event),
{Function onError, void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
Future close() => _socket.close().then((_) => this);
int send(List<int> buffer, InternetAddress address, int port) =>
_socket.send(buffer, 0, buffer.length, address, port);
Datagram receive() {
return _socket.receive();
}
void joinMulticast(InternetAddress group, [NetworkInterface interface]) {
_socket.joinMulticast(group, interface);
}
void leaveMulticast(InternetAddress group, [NetworkInterface interface]) {
_socket.leaveMulticast(group, interface);
}
bool get readEventsEnabled => _readEventsEnabled;
void set readEventsEnabled(bool value) {
if (value != _readEventsEnabled) {
_readEventsEnabled = value;
if (!_controller.isPaused) _resume();
}
}
bool get writeEventsEnabled => _writeEventsEnabled;
void set writeEventsEnabled(bool value) {
if (value != _writeEventsEnabled) {
_writeEventsEnabled = value;
if (!_controller.isPaused) _resume();
}
}
bool get multicastLoopback =>
_socket.getOption(SocketOption._IP_MULTICAST_LOOP);
void set multicastLoopback(bool value) =>
_socket.setOption(SocketOption._IP_MULTICAST_LOOP, value);
int get multicastHops => _socket.getOption(SocketOption._IP_MULTICAST_HOPS);
void set multicastHops(int value) =>
_socket.setOption(SocketOption._IP_MULTICAST_HOPS, value);
NetworkInterface get multicastInterface => throw "Not implemented";
void set multicastInterface(NetworkInterface value) =>
throw "Not implemented";
bool get broadcastEnabled => _socket.getOption(SocketOption._IP_BROADCAST);
void set broadcastEnabled(bool value) =>
_socket.setOption(SocketOption._IP_BROADCAST, value);
int get port => _socket.port;
InternetAddress get address => _socket.address;
_pause() {
_socket.setListening(read: false, write: false);
}
void _resume() {
_socket.setListening(read: _readEventsEnabled, write: _writeEventsEnabled);
}
void _onPauseStateChange() {
if (_controller.isPaused) {
_pause();
} else {
_resume();
}
}
void _onSubscriptionStateChange() {
if (_controller.hasListener) {
_resume();
} else {
_socket.close();
}
}
}
Datagram _makeDatagram(
List<int> data, String address, List<int> in_addr, int port) {
return new Datagram(data, new _InternetAddress(address, null, in_addr), port);
}
_getStdioHandle(_NativeSocket socket, int num) native "Socket_GetStdioHandle";
_getSocketType(_NativeSocket nativeSocket) native "Socket_GetType";
class _SecureSocket extends _Socket implements SecureSocket {
_SecureSocket(RawSecureSocket raw) : super(raw);
void set onBadCertificate(bool callback(X509Certificate certificate)) {
if (_raw == null) {
throw new StateError("onBadCertificate called on destroyed SecureSocket");
}
_raw.onBadCertificate = callback;
}
void renegotiate(
{bool useSessionCache: true,
bool requestClientCertificate: false,
bool requireClientCertificate: false}) {
_raw.renegotiate(
useSessionCache: useSessionCache,
requestClientCertificate: requestClientCertificate,
requireClientCertificate: requireClientCertificate);
}
X509Certificate get peerCertificate {
if (_raw == null) {
throw new StateError("peerCertificate called on destroyed SecureSocket");
}
return _raw.peerCertificate;
}
String get selectedProtocol {
if (_raw == null) {
throw new StateError("selectedProtocol called on destroyed SecureSocket");
}
return _raw.selectedProtocol;
}
}
/**
* _SecureFilterImpl wraps a filter that encrypts and decrypts data travelling
* over an encrypted socket. The filter also handles the handshaking
* and certificate verification.
*
* The filter exposes its input and output buffers as Dart objects that
* are backed by an external C array of bytes, so that both Dart code and
* native code can access the same data.
*/
class _SecureFilterImpl extends NativeFieldWrapperClass1
implements _SecureFilter {
// Performance is improved if a full buffer of plaintext fits
// in the encrypted buffer, when encrypted.
static final int SIZE = 8 * 1024;
static final int ENCRYPTED_SIZE = 10 * 1024;
_SecureFilterImpl() {
buffers = new List<_ExternalBuffer>(_RawSecureSocket.NUM_BUFFERS);
for (int i = 0; i < _RawSecureSocket.NUM_BUFFERS; ++i) {
buffers[i] = new _ExternalBuffer(
_RawSecureSocket._isBufferEncrypted(i) ? ENCRYPTED_SIZE : SIZE);
}
}
void connect(
String hostName,
SecurityContext context,
bool is_server,
bool requestClientCertificate,
bool requireClientCertificate,
Uint8List protocols) native "SecureSocket_Connect";
void destroy() {
buffers = null;
_destroy();
}
void _destroy() native "SecureSocket_Destroy";
void handshake() native "SecureSocket_Handshake";
String selectedProtocol() native "SecureSocket_GetSelectedProtocol";
void renegotiate(bool useSessionCache, bool requestClientCertificate,
bool requireClientCertificate) native "SecureSocket_Renegotiate";
void init() native "SecureSocket_Init";
X509Certificate get peerCertificate native "SecureSocket_PeerCertificate";
void registerBadCertificateCallback(Function callback)
native "SecureSocket_RegisterBadCertificateCallback";
void registerHandshakeCompleteCallback(Function handshakeCompleteHandler)
native "SecureSocket_RegisterHandshakeCompleteCallback";
// This is a security issue, as it exposes a raw pointer to Dart code.
int _pointer() native "SecureSocket_FilterPointer";
List<_ExternalBuffer> buffers;
}
class _SecurityContext extends NativeFieldWrapperClass1
implements SecurityContext {
_SecurityContext() {
_createNativeContext();
}
void _createNativeContext() native "SecurityContext_Allocate";
static final SecurityContext defaultContext = new _SecurityContext()
.._trustBuiltinRoots();
void usePrivateKey(String file, {String password}) {
List<int> bytes = (new File(file)).readAsBytesSync();
usePrivateKeyBytes(bytes, password: password);
}
void usePrivateKeyBytes(List<int> keyBytes, {String password})
native "SecurityContext_UsePrivateKeyBytes";
void setTrustedCertificates(String file, {String password}) {
List<int> bytes = (new File(file)).readAsBytesSync();
setTrustedCertificatesBytes(bytes, password: password);
}
void setTrustedCertificatesBytes(List<int> certBytes, {String password})
native "SecurityContext_SetTrustedCertificatesBytes";
void useCertificateChain(String file, {String password}) {
List<int> bytes = (new File(file)).readAsBytesSync();
useCertificateChainBytes(bytes, password: password);
}
void useCertificateChainBytes(List<int> chainBytes, {String password})
native "SecurityContext_UseCertificateChainBytes";
void setClientAuthorities(String file, {String password}) {
List<int> bytes = (new File(file)).readAsBytesSync();
setClientAuthoritiesBytes(bytes, password: password);
}
void setClientAuthoritiesBytes(List<int> authCertBytes, {String password})
native "SecurityContext_SetClientAuthoritiesBytes";
static bool get alpnSupported => _alpnSupported();
static bool _alpnSupported() native "SecurityContext_AlpnSupported";
void setAlpnProtocols(List<String> protocols, bool isServer) {
Uint8List encodedProtocols =
SecurityContext._protocolsToLengthEncoding(protocols);
_setAlpnProtocols(encodedProtocols, isServer);
}
void _setAlpnProtocols(Uint8List protocols, bool isServer)
native "SecurityContext_SetAlpnProtocols";
void _trustBuiltinRoots() native "SecurityContext_TrustBuiltinRoots";
}
class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
// Bit flags used when communicating between the eventhandler and
// dart code. The EVENT flags are used to indicate events of
// interest when sending a message from dart code to the
// eventhandler. When receiving a message from the eventhandler the
// EVENT flags indicate the events that actually happened. The
// COMMAND flags are used to send commands from dart to the
// eventhandler. COMMAND flags are never received from the
// eventhandler. Additional flags are used to communicate other
// information.
static const int READ_EVENT = 0;
static const int WRITE_EVENT = 1;
static const int ERROR_EVENT = 2;
static const int CLOSED_EVENT = 3;
static const int DESTROYED_EVENT = 4;
static const int FIRST_EVENT = READ_EVENT;
static const int LAST_EVENT = DESTROYED_EVENT;
static const int EVENT_COUNT = LAST_EVENT - FIRST_EVENT + 1;
static const int CLOSE_COMMAND = 8;
static const int SHUTDOWN_READ_COMMAND = 9;
static const int SHUTDOWN_WRITE_COMMAND = 10;
// The lower bits of RETURN_TOKEN_COMMAND messages contains the number
// of tokens returned.
static const int RETURN_TOKEN_COMMAND = 11;
static const int SET_EVENT_MASK_COMMAND = 12;
static const int FIRST_COMMAND = CLOSE_COMMAND;
static const int LAST_COMMAND = SET_EVENT_MASK_COMMAND;
// Type flag send to the eventhandler providing additional
// information on the type of the file descriptor.
static const int LISTENING_SOCKET = 16;
static const int PIPE_SOCKET = 17;
static const int TYPE_NORMAL_SOCKET = 0;
static const int TYPE_LISTENING_SOCKET = 1 << LISTENING_SOCKET;
static const int TYPE_PIPE = 1 << PIPE_SOCKET;
static const int TYPE_TYPE_MASK = TYPE_LISTENING_SOCKET | PIPE_SOCKET;
// Protocol flags.
static const int TCP_SOCKET = 18;
static const int UDP_SOCKET = 19;
static const int INTERNAL_SOCKET = 20;
static const int TYPE_TCP_SOCKET = 1 << TCP_SOCKET;
static const int TYPE_UDP_SOCKET = 1 << UDP_SOCKET;
static const int TYPE_INTERNAL_SOCKET = 1 << INTERNAL_SOCKET;
static const int TYPE_PROTOCOL_MASK =
TYPE_TCP_SOCKET | TYPE_UDP_SOCKET | TYPE_INTERNAL_SOCKET;
// Native port messages.
static const HOST_NAME_LOOKUP = 0;
static const LIST_INTERFACES = 1;
static const REVERSE_LOOKUP = 2;
// Protocol flags.
static const int PROTOCOL_IPV4 = 1 << 0;
static const int PROTOCOL_IPV6 = 1 << 1;
static const int NORMAL_TOKEN_BATCH_SIZE = 8;
static const int LISTENING_TOKEN_BATCH_SIZE = 2;
static const Duration _RETRY_DURATION = const Duration(milliseconds: 250);
static const Duration _RETRY_DURATION_LOOPBACK =
const Duration(milliseconds: 25);
// Socket close state
bool isClosed = false;
bool isClosing = false;
bool isClosedRead = false;
bool closedReadEventSent = false;
bool isClosedWrite = false;
Completer closeCompleter = new Completer.sync();
// Handlers and receive port for socket events from the event handler.
final List eventHandlers = new List(EVENT_COUNT + 1);
RawReceivePort eventPort;
bool flagsSent = false;
// The type flags for this socket.
final int typeFlags;
// Holds the port of the socket, 0 if not known.
int localPort = 0;
// Holds the address used to connect or bind the socket.
InternetAddress localAddress;
int available = 0;
int tokens = 0;
bool sendReadEvents = false;
bool readEventIssued = false;
bool sendWriteEvents = false;
bool writeEventIssued = false;
bool writeAvailable = false;
static bool connectedResourceHandler = false;
_ReadWriteResourceInfo resourceInfo;
// The owner object is the object that the Socket is being used by, e.g.
// a HttpServer, a WebSocket connection, a process pipe, etc.
Object owner;
static Future<List<InternetAddress>> lookup(String host,
{InternetAddressType type: InternetAddressType.ANY}) {
return _IOService
._dispatch(_SOCKET_LOOKUP, [host, type._value]).then((response) {
if (isErrorResponse(response)) {
throw createError(response, "Failed host lookup: '$host'");
} else {
return response.skip(1).map((result) {
var type = new InternetAddressType._from(result[0]);
return new _InternetAddress(result[1], host, result[2]);
}).toList();
}
});
}
static Future<InternetAddress> reverseLookup(InternetAddress addr) {
return _IOService
._dispatch(_SOCKET_REVERSE_LOOKUP, [addr._in_addr]).then((response) {
if (isErrorResponse(response)) {
throw createError(response, "Failed reverse host lookup", addr);
} else {
return addr._cloneWithNewHost(response);
}
});
}
static Future<List<NetworkInterface>> listInterfaces(
{bool includeLoopback: false,
bool includeLinkLocal: false,
InternetAddressType type: InternetAddressType.ANY}) {
return _IOService
._dispatch(_SOCKET_LIST_INTERFACES, [type._value]).then((response) {
if (isErrorResponse(response)) {
throw createError(response, "Failed listing interfaces");
} else {
var map = response.skip(1).fold(new Map<String, NetworkInterface>(),
(map, result) {
var type = new InternetAddressType._from(result[0]);
var name = result[3];
var index = result[4];
var address = new _InternetAddress(result[1], "", result[2]);
if (!includeLinkLocal && address.isLinkLocal) return map;
if (!includeLoopback && address.isLoopback) return map;
map.putIfAbsent(name, () => new _NetworkInterface(name, index));
map[name].addresses.add(address);
return map;
});
return map.values.toList();
}
});
}
static Future<_NativeSocket> connect(host, int port, sourceAddress) {
_throwOnBadPort(port);
if (sourceAddress != null && sourceAddress is! _InternetAddress) {
if (sourceAddress is String) {
sourceAddress = new InternetAddress(sourceAddress);
}
}
return new Future.value(host).then((host) {
if (host is _InternetAddress) return [host];
return lookup(host).then((addresses) {
if (addresses.isEmpty) {
throw createError(null, "Failed host lookup: '$host'");
}
return addresses;
});
}).then((addresses) {
assert(addresses is List);
var completer = new Completer();
var it = addresses.iterator;
var error = null;
var connecting = new HashMap();
void connectNext() {
if (!it.moveNext()) {
if (connecting.isEmpty) {
assert(error != null);
completer.completeError(error);
}
return;
}
var address = it.current;
var socket = new _NativeSocket.normal();
socket.localAddress = address;
var result;
if (sourceAddress == null) {
result = socket.nativeCreateConnect(address._in_addr, port);
} else {
assert(sourceAddress is _InternetAddress);
result = socket.nativeCreateBindConnect(
address._in_addr, port, sourceAddress._in_addr);
}
if (result is OSError) {
// Keep first error, if present.
if (error == null) {
int errorCode = result.errorCode;
if (errorCode != null && socket.isBindError(errorCode)) {
error = createError(result, "Bind failed", sourceAddress);
} else {
error = createError(result, "Connection failed", address, port);
}
}
connectNext();
} else {
// Query the local port, for error messages.
try {
socket.port;
} catch (e) {
error = createError(e, "Connection failed", address, port);
connectNext();
}
// Set up timer for when we should retry the next address
// (if any).
var duration =
address.isLoopback ? _RETRY_DURATION_LOOPBACK : _RETRY_DURATION;
var timer = new Timer(duration, connectNext);
setupResourceInfo(socket);
connecting[socket] = timer;
// Setup handlers for receiving the first write event which
// indicate that the socket is fully connected.
socket.setHandlers(write: () {
timer.cancel();
socket.setListening(read: false, write: false);
completer.complete(socket);
connecting.remove(socket);
connecting.forEach((s, t) {
t.cancel();
s.close();
s.setHandlers();
s.setListening(read: false, write: false);
});
}, error: (e) {
timer.cancel();
socket.close();
// Keep first error, if present.
if (error == null) error = e;
connecting.remove(socket);
if (connecting.isEmpty) connectNext();
});
socket.setListening(read: false, write: true);
}
}
connectNext();
return completer.future;
});
}
static Future<_NativeSocket> bind(
host, int port, int backlog, bool v6Only, bool shared) {
_throwOnBadPort(port);
return new Future.value(host).then((host) {
if (host is _InternetAddress) return host;
return lookup(host).then((list) {
if (list.length == 0) {
throw createError(null, "Failed host lookup: '$host'");
}
return list[0];
});
}).then((address) {
var socket = new _NativeSocket.listen();
socket.localAddress = address;
var result = socket.nativeCreateBindListen(
address._in_addr, port, backlog, v6Only, shared);
if (result is OSError) {
throw new SocketException("Failed to create server socket",
osError: result, address: address, port: port);
}
if (port != 0) socket.localPort = port;
setupResourceInfo(socket);
socket.connectToEventHandler();
return socket;
});
}
static void setupResourceInfo(_NativeSocket socket) {
socket.resourceInfo = new _SocketResourceInfo(socket);
}
static Future<_NativeSocket> bindDatagram(host, int port, bool reuseAddress) {
_throwOnBadPort(port);
return new Future.value(host).then((host) {
if (host is _InternetAddress) return host;
return lookup(host).then((list) {
if (list.length == 0) {
throw createError(null, "Failed host lookup: '$host'");
}
return list[0];
});
}).then((address) {
var socket = new _NativeSocket.datagram(address);
var result =
socket.nativeCreateBindDatagram(address._in_addr, port, reuseAddress);
if (result is OSError) {
throw new SocketException("Failed to create datagram socket",
osError: result, address: address, port: port);
}
if (port != 0) socket.localPort = port;
setupResourceInfo(socket);
return socket;
});
}
_NativeSocket.datagram(this.localAddress)
: typeFlags = TYPE_NORMAL_SOCKET | TYPE_UDP_SOCKET;
_NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET | TYPE_TCP_SOCKET;
_NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET | TYPE_TCP_SOCKET {
isClosedWrite = true;
}
_NativeSocket.pipe() : typeFlags = TYPE_PIPE;
_NativeSocket.watch(int id)
: typeFlags = TYPE_NORMAL_SOCKET | TYPE_INTERNAL_SOCKET {
isClosedWrite = true;
nativeSetSocketId(id);
}
bool get isListening => (typeFlags & TYPE_LISTENING_SOCKET) != 0;
bool get isPipe => (typeFlags & TYPE_PIPE) != 0;
bool get isInternal => (typeFlags & TYPE_INTERNAL_SOCKET) != 0;
bool get isTcp => (typeFlags & TYPE_TCP_SOCKET) != 0;
bool get isUdp => (typeFlags & TYPE_UDP_SOCKET) != 0;
List<int> read(int len) {
if (len != null && len <= 0) {
throw new ArgumentError("Illegal length $len");
}
if (isClosing || isClosed) return null;
len = min(available, len == null ? available : len);
if (len == 0) return null;
var result = nativeRead(len);
if (result is OSError) {
reportError(result, "Read failed");
return null;
}
if (result != null) {
available -= result.length;
// TODO(ricow): Remove when we track internal and pipe uses.
assert(resourceInfo != null || isPipe || isInternal);
if (resourceInfo != null) {
resourceInfo.totalRead += result.length;
}
}
// TODO(ricow): Remove when we track internal and pipe uses.
assert(resourceInfo != null || isPipe || isInternal);
if (resourceInfo != null) {
resourceInfo.didRead();
}
return result;
}
Datagram receive() {
if (isClosing || isClosed) return null;
var result = nativeRecvFrom();
if (result is OSError) {
reportError(result, "Receive failed");
return null;
}
if (result != null) {
// Read the next available. Available is only for the next datagram, not
// the sum of all datagrams pending, so we need to call after each
// receive. If available becomes > 0, the _NativeSocket will continue to
// emit read events.
available = nativeAvailable();
// TODO(ricow): Remove when we track internal and pipe uses.
assert(resourceInfo != null || isPipe || isInternal);
if (resourceInfo != null) {
resourceInfo.totalRead += result.data.length;
}
}
// TODO(ricow): Remove when we track internal and pipe uses.
assert(resourceInfo != null || isPipe || isInternal);
if (resourceInfo != null) {
resourceInfo.didRead();
}
return result;
}
int write(List<int> buffer, int offset, int bytes) {
if (buffer is! List) throw new ArgumentError();
if (offset == null) offset = 0;
if (bytes == null) {
if (offset > buffer.length) {
throw new RangeError.value(offset);
}
bytes = buffer.length - offset;
}
if (offset < 0) throw new RangeError.value(offset);
if (bytes < 0) throw new RangeError.value(bytes);
if ((offset + bytes) > buffer.length) {
throw new RangeError.value(offset + bytes);
}
if (offset is! int || bytes is! int) {
throw new ArgumentError("Invalid arguments to write on Socket");
}
if (isClosing || isClosed) return 0;
if (bytes == 0) return 0;
_BufferAndStart bufferAndStart =
_ensureFastAndSerializableByteData(buffer, offset, offset + bytes);
var result =
nativeWrite(bufferAndStart.buffer, bufferAndStart.start, bytes);
if (result is OSError) {
OSError osError = result;
scheduleMicrotask(() => reportError(osError, "Write failed"));
result = 0;
}
// The result may be negative, if we forced a short write for testing
// purpose. In such case, don't mark writeAvailable as false, as we don't
// know if we'll receive an event. It's better to just retry.
if (result >= 0 && result < bytes) {
writeAvailable = false;
}
// Negate the result, as stated above.
if (result < 0) result = -result;
// TODO(ricow): Remove when we track internal and pipe uses.
assert(resourceInfo != null || isPipe || isInternal);
if (resourceInfo != null) {
resourceInfo.addWrite(result);
}
return result;
}
int send(List<int> buffer, int offset, int bytes, InternetAddress address,
int port) {
_throwOnBadPort(port);
if (isClosing || isClosed) return 0;
_BufferAndStart bufferAndStart =
_ensureFastAndSerializableByteData(buffer, offset, bytes);
var result = nativeSendTo(bufferAndStart.buffer, bufferAndStart.start,
bytes, address._in_addr, port);
if (result is OSError) {
OSError osError = result;
scheduleMicrotask(() => reportError(osError, "Send failed"));
result = 0;
}
// TODO(ricow): Remove when we track internal and pipe uses.
assert(resourceInfo != null || isPipe || isInternal);
if (resourceInfo != null) {
resourceInfo.addWrite(result);
}
return result;
}
_NativeSocket accept() {
// Don't issue accept if we're closing.
if (isClosing || isClosed) return null;
assert(available > 0);
available--;
tokens++;
returnTokens(LISTENING_TOKEN_BATCH_SIZE);
var socket = new _NativeSocket.normal();
if (nativeAccept(socket) != true) return null;
socket.localPort = localPort;
socket.localAddress = address;
setupResourceInfo(socket);
// TODO(ricow): Remove when we track internal and pipe uses.
assert(resourceInfo != null || isPipe || isInternal);
if (resourceInfo != null) {
// We track this as read one byte.
resourceInfo.addRead(1);
}
return socket;
}
int get port {
if (localPort != 0) return localPort;
if (isClosing || isClosed) throw const SocketException.closed();
var result = nativeGetPort();
if (result is OSError) throw result;
return localPort = result;
}
int get remotePort {
if (isClosing || isClosed) throw const SocketException.closed();
var result = nativeGetRemotePeer();
if (result is OSError) throw result;
return result[1];
}
InternetAddress get address => localAddress;
InternetAddress get remoteAddress {
if (isClosing || isClosed) throw const SocketException.closed();
var result = nativeGetRemotePeer();
if (result is OSError) throw result;
var addr = result[0];
var type = new InternetAddressType._from(addr[0]);
return new _InternetAddress(addr[1], null, addr[2]);
}
void issueReadEvent() {
if (closedReadEventSent) return;
if (readEventIssued) return;
readEventIssued = true;
void issue() {
readEventIssued = false;
if (isClosing) return;
if (!sendReadEvents) return;
if (available == 0) {
if (isClosedRead && !closedReadEventSent) {
if (isClosedWrite) close();
var handler = eventHandlers[CLOSED_EVENT];
if (handler == null) return;
closedReadEventSent = true;
handler();
}
return;
}
var handler = eventHandlers[READ_EVENT];
if (handler == null) return;
readEventIssued = true;
handler();
scheduleMicrotask(issue);
}
scheduleMicrotask(issue);
}
void issueWriteEvent({bool delayed: true}) {
if (writeEventIssued) return;
if (!writeAvailable) return;
void issue() {
writeEventIssued = false;
if (!writeAvailable) return;
if (isClosing) return;
if (!sendWriteEvents) return;
sendWriteEvents = false;
var handler = eventHandlers[WRITE_EVENT];
if (handler == null) return;
handler();
}
if (delayed) {
writeEventIssued = true;
scheduleMicrotask(issue);
} else {
issue();
}
}
// Multiplexes socket events to the socket handlers.
void multiplex(int events) {
for (int i = FIRST_EVENT; i <= LAST_EVENT; i++) {
if (((events & (1 << i)) != 0)) {
if ((i == CLOSED_EVENT || i == READ_EVENT) && isClosedRead) continue;
if (isClosing && i != DESTROYED_EVENT) continue;
if (i == CLOSED_EVENT && !isListening && !isClosing && !isClosed) {
isClosedRead = true;
issueReadEvent();
continue;
}
if (i == WRITE_EVENT) {
writeAvailable = true;
issueWriteEvent(delayed: false);
continue;
}
if (i == READ_EVENT) {
if (isListening) {
available++;
} else {
available = nativeAvailable();
issueReadEvent();
continue;
}
}
var handler = eventHandlers[i];
if (i == DESTROYED_EVENT) {
assert(isClosing);
assert(!isClosed);
// TODO(ricow): Remove/update when we track internal and pipe uses.
assert(resourceInfo != null || isPipe || isInternal);
if (resourceInfo != null) {
_SocketResourceInfo.SocketClosed(resourceInfo);
}
isClosed = true;
closeCompleter.complete();
disconnectFromEventHandler();
if (handler != null) handler();
continue;
}
if (i == ERROR_EVENT) {
if (!isClosing) {
reportError(nativeGetError(), "");
}
} else if (!isClosed) {
// If the connection is closed right after it's accepted, there's a
// chance the close-handler is not set.
if (handler != null) handler();
}
}
}
if (!isListening) {
tokens++;
returnTokens(NORMAL_TOKEN_BATCH_SIZE);
}
}
void returnTokens(int tokenBatchSize) {
if (!isClosing && !isClosed) {
assert(eventPort != null);
// Return in batches.
if (tokens == tokenBatchSize) {
assert(tokens < (1 << FIRST_COMMAND));
sendToEventHandler((1 << RETURN_TOKEN_COMMAND) | tokens);
tokens = 0;
}
}
}
void setHandlers({read, write, error, closed, destroyed}) {
eventHandlers[READ_EVENT] = read;
eventHandlers[WRITE_EVENT] = write;
eventHandlers[ERROR_EVENT] = error;
eventHandlers[CLOSED_EVENT] = closed;
eventHandlers[DESTROYED_EVENT] = destroyed;
}
void setListening({read: true, write: true}) {
sendReadEvents = read;
sendWriteEvents = write;
if (read) issueReadEvent();
if (write) issueWriteEvent();
if (!flagsSent && !isClosing) {
flagsSent = true;
int flags = 1 << SET_EVENT_MASK_COMMAND;
if (!isClosedRead) flags |= 1 << READ_EVENT;
if (!isClosedWrite) flags |= 1 << WRITE_EVENT;
sendToEventHandler(flags);
}
}
Future close() {
if (!isClosing && !isClosed) {
sendToEventHandler(1 << CLOSE_COMMAND);
isClosing = true;
}
return closeCompleter.future;
}
void shutdown(SocketDirection direction) {
if (!isClosing && !isClosed) {
switch (direction) {
case SocketDirection.RECEIVE:
shutdownRead();
break;
case SocketDirection.SEND:
shutdownWrite();
break;
case SocketDirection.BOTH:
close();
break;
default:
throw new ArgumentError(direction);
}
}
}
void shutdownWrite() {
if (!isClosing && !isClosed) {
if (closedReadEventSent) {
close();
} else {
sendToEventHandler(1 << SHUTDOWN_WRITE_COMMAND);
}
isClosedWrite = true;
}
}
void shutdownRead() {
if (!isClosing && !isClosed) {
if (isClosedWrite) {
close();
} else {
sendToEventHandler(1 << SHUTDOWN_READ_COMMAND);
}
isClosedRead = true;
}
}
void sendToEventHandler(int data) {
int fullData = (typeFlags & TYPE_TYPE_MASK) | data;
assert(!isClosing);
connectToEventHandler();
_EventHandler._sendData(this, eventPort.sendPort, fullData);
}
void connectToEventHandler() {
assert(!isClosed);
if (eventPort == null) {
eventPort = new RawReceivePort(multiplex);
}
if (!connectedResourceHandler) {
registerExtension(
'ext.dart.io.getOpenSockets', _SocketResourceInfo.getOpenSockets);
registerExtension('ext.dart.io.getSocketByID',
_SocketResourceInfo.getSocketInfoMapByID);
connectedResourceHandler = true;
}
}
void disconnectFromEventHandler() {
assert(eventPort != null);
eventPort.close();
eventPort = null;
// Now that we don't track this Socket anymore, we can clear the owner
// field.
owner = null;
}
// Check whether this is an error response from a native port call.
static bool isErrorResponse(response) {
return response is List && response[0] != _SUCCESS_RESPONSE;
}
// Create the appropriate error/exception from different returned
// error objects.
static createError(error, String message,