// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of dart.io;

const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

class _WebSocketMessageType {
  static const int NONE = 0;
  static const int BINARY = 1;
  static const int TEXT = 2;
}


class _WebSocketOpcode {
  static const int CONTINUATION = 0;
  static const int TEXT = 1;
  static const int BINARY = 2;
  static const int RESERVED_3 = 3;
  static const int RESERVED_4 = 4;
  static const int RESERVED_5 = 5;
  static const int RESERVED_6 = 6;
  static const int RESERVED_7 = 7;
  static const int CLOSE = 8;
  static const int PING = 9;
  static const int PONG = 10;
  static const int RESERVED_B = 11;
  static const int RESERVED_C = 12;
  static const int RESERVED_D = 13;
  static const int RESERVED_E = 14;
  static const int RESERVED_F = 15;
}

/**
 * The web socket protocol transformer handles the protocol byte stream
 * which is supplied through the [:handleData:]. As the protocol is processed,
 * it'll output frame data as either a List<int> or String.
 *
 * Important infomation about usage: Be sure you use cancelOnError, so the
 * socket will be closed when the processer encounter an error. Not using it
 * will lead to undefined behaviour.
 */
class _WebSocketProtocolTransformer extends StreamEventTransformer {
  static const int START = 0;
  static const int LEN_FIRST = 1;
  static const int LEN_REST = 2;
  static const int MASK = 3;
  static const int PAYLOAD = 4;
  static const int CLOSED = 5;
  static const int FAILURE = 6;

  bool _serverSide;

  _WebSocketProtocolTransformer([bool this._serverSide = false]) {
    _prepareForNextFrame();
    _currentMessageType = _WebSocketMessageType.NONE;
  }

  /**
   * Process data received from the underlying communication channel.
   */
  void handleData(Uint8List buffer, EventSink sink) {
    int count = buffer.length;
    int index = 0;
    int lastIndex = count;
    try {
      if (_state == CLOSED) {
        throw new WebSocketException("Data on closed connection");
      }
      if (_state == FAILURE) {
        throw new WebSocketException("Data on failed connection");
      }
      while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
        int byte = buffer[index];
        switch (_state) {
          case START:
            _fin = (byte & 0x80) != 0;
            if ((byte & 0x70) != 0) {
              // The RSV1, RSV2 bits RSV3 most be all zero.
              throw new WebSocketException("Protocol error");
            }
            _opcode = (byte & 0xF);
            switch (_opcode) {
            case _WebSocketOpcode.CONTINUATION:
              if (_currentMessageType == _WebSocketMessageType.NONE) {
                throw new WebSocketException("Protocol error");
              }
              break;

            case _WebSocketOpcode.TEXT:
              if (_currentMessageType != _WebSocketMessageType.NONE) {
                throw new WebSocketException("Protocol error");
              }
              _currentMessageType = _WebSocketMessageType.TEXT;
              _controller = new StreamController(sync: true);
              _controller.stream
                  .transform(new Utf8DecoderTransformer(null))
                  .fold(new StringBuffer(), (buffer, str) => buffer..write(str))
                  .then((buffer) {
                    sink.add(buffer.toString());
                  }, onError: (error) {
                    sink.addError(error);
                  });
              break;

            case _WebSocketOpcode.BINARY:
              if (_currentMessageType != _WebSocketMessageType.NONE) {
                throw new WebSocketException("Protocol error");
              }
              _currentMessageType = _WebSocketMessageType.BINARY;
              _controller = new StreamController(sync: true);
              _controller.stream
                  .fold(new BytesBuilder(), (buffer, data) => buffer..add(data))
                  .then((buffer) {
                    sink.add(buffer.takeBytes());
                  }, onError: (error) {
                    sink.addError(error);
                  });
              break;

            case _WebSocketOpcode.CLOSE:
            case _WebSocketOpcode.PING:
            case _WebSocketOpcode.PONG:
              // Control frames cannot be fragmented.
              if (!_fin) throw new WebSocketException("Protocol error");
              break;

            default:
              throw new WebSocketException("Protocol error");
            }
            _state = LEN_FIRST;
            break;

          case LEN_FIRST:
            _masked = (byte & 0x80) != 0;
            _len = byte & 0x7F;
            if (_isControlFrame() && _len > 125) {
              throw new WebSocketException("Protocol error");
            }
            if (_len < 126) {
              _lengthDone(sink);
            } else if (_len == 126) {
              _len = 0;
              _remainingLenBytes = 2;
              _state = LEN_REST;
            } else if (_len == 127) {
              _len = 0;
              _remainingLenBytes = 8;
              _state = LEN_REST;
            }
            break;

          case LEN_REST:
            _len = _len << 8 | byte;
            _remainingLenBytes--;
            if (_remainingLenBytes == 0) {
              _lengthDone(sink);
            }
            break;

          case MASK:
            _maskingKey = _maskingKey << 8 | byte;
            _remainingMaskingKeyBytes--;
            if (_remainingMaskingKeyBytes == 0) {
              _maskDone(sink);
            }
            break;

          case PAYLOAD:
            // The payload is not handled one byte at a time but in blocks.
            int payload;
            if (lastIndex - index <= _remainingPayloadBytes) {
              payload = lastIndex - index;
            } else {
              payload = _remainingPayloadBytes;
            }
            _remainingPayloadBytes -= payload;

            // Unmask payload if masked.
            if (_masked) {
              for (int i = 0; i < payload; i++) {
                int maskingByte =
                    ((_maskingKey >> ((3 - _unmaskingIndex) * 8)) & 0xFF);
                buffer[index + i] = buffer[index + i] ^ maskingByte;
                _unmaskingIndex = (_unmaskingIndex + 1) % 4;
              }
            }

            if (_isControlFrame()) {
              if (payload > 0) {
                // Allocate a buffer for collecting the control frame
                // payload if any.
                if (_controlPayload == null) {
                  _controlPayload = new List<int>();
                }
                _controlPayload.addAll(buffer.sublist(index, index + payload));
                index += payload;
              }

              if (_remainingPayloadBytes == 0) {
                _controlFrameEnd(sink);
              }
            } else {
              if (_currentMessageType != _WebSocketMessageType.TEXT &&
                  _currentMessageType != _WebSocketMessageType.BINARY) {
                  throw new WebSocketException("Protocol error");
              }
              _controller.add(
                  new Uint8List.view(buffer.buffer, index, payload));
              index += payload;
              if (_remainingPayloadBytes == 0) {
                _messageFrameEnd(sink);
              }
            }

            // Hack - as we always do index++ below.
            index--;
            break;
        }

        // Move to the next byte.
        index++;
      }
    } catch (e) {
      _state = FAILURE;
      sink.addError(e);
    }
  }

  void _lengthDone(EventSink sink) {
    if (_masked) {
      if (!_serverSide) {
        throw new WebSocketException("Received masked frame from server");
      }
      _state = MASK;
      _remainingMaskingKeyBytes = 4;
    } else {
      if (_serverSide) {
        throw new WebSocketException("Received unmasked frame from client");
      }
      _remainingPayloadBytes = _len;
      _startPayload(sink);
    }
  }

  void _maskDone(EventSink sink) {
    _remainingPayloadBytes = _len;
    _startPayload(sink);
  }

  void _startPayload(EventSink sink) {
    // If there is no actual payload perform perform callbacks without
    // going through the PAYLOAD state.
    if (_remainingPayloadBytes == 0) {
      if (_isControlFrame()) {
        switch (_opcode) {
          case _WebSocketOpcode.CLOSE:
            _state = CLOSED;
            sink.close();
            break;
          case _WebSocketOpcode.PING:
            sink.add(new _WebSocketPing());
            break;
          case _WebSocketOpcode.PONG:
            sink.add(new _WebSocketPong());
            break;
        }
        _prepareForNextFrame();
      } else {
        _messageFrameEnd(sink);
      }
    } else {
      _state = PAYLOAD;
    }
  }

  void _messageFrameEnd(EventSink sink) {
    if (_fin) {
      switch (_currentMessageType) {
        case _WebSocketMessageType.TEXT:
          _controller.close();
          break;
        case _WebSocketMessageType.BINARY:
          _controller.close();
          break;
      }
      _controller = null;
      _currentMessageType = _WebSocketMessageType.NONE;
    }
    _prepareForNextFrame();
  }

  void _controlFrameEnd(EventSink sink) {
    switch (_opcode) {
      case _WebSocketOpcode.CLOSE:
        closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
        if (_controlPayload.length > 0) {
          if (_controlPayload.length == 1) {
            throw new WebSocketException("Protocol error");
          }
          closeCode = _controlPayload[0] << 8 | _controlPayload[1];
          if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) {
            throw new WebSocketException("Protocol error");
          }
          if (_controlPayload.length > 2) {
            closeReason = _decodeUtf8Strict(_controlPayload.sublist(2));
          }
        }
        _state = CLOSED;
        sink.close();
        break;

      case _WebSocketOpcode.PING:
        sink.add(new _WebSocketPing(_controlPayload));
        break;

      case _WebSocketOpcode.PONG:
        sink.add(new _WebSocketPong(_controlPayload));
        break;
    }
    _prepareForNextFrame();
  }

  bool _isControlFrame() {
    return _opcode == _WebSocketOpcode.CLOSE ||
           _opcode == _WebSocketOpcode.PING ||
           _opcode == _WebSocketOpcode.PONG;
  }

  void _prepareForNextFrame() {
    if (_state != CLOSED && _state != FAILURE) _state = START;
    _fin = null;
    _opcode = null;
    _len = null;
    _masked = null;
    _maskingKey = 0;
    _remainingLenBytes = null;
    _remainingMaskingKeyBytes = null;
    _remainingPayloadBytes = null;
    _unmaskingIndex = 0;
    _controlPayload = null;
  }

  int _state;
  bool _fin;
  int _opcode;
  int _len;
  bool _masked;
  int _maskingKey;
  int _remainingLenBytes;
  int _remainingMaskingKeyBytes;
  int _remainingPayloadBytes;
  int _unmaskingIndex;

  int _currentMessageType;
  List<int> _controlPayload;
  StreamController _controller;

  int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
  String closeReason = "";
}


class _WebSocketPing {
  final List<int> payload;
  _WebSocketPing([this.payload = null]);
}


class _WebSocketPong {
  final List<int> payload;
  _WebSocketPong([this.payload = null]);
}


class _WebSocketTransformerImpl implements WebSocketTransformer {
  final StreamController<WebSocket> _controller =
      new StreamController<WebSocket>(sync: true);

  Stream<WebSocket> bind(Stream<HttpRequest> stream) {
    stream.listen((request) {
        _upgrade(request)
            .then((WebSocket webSocket) => _controller.add(webSocket))
            .catchError((error) => _controller.addError(error));
    });

    return _controller.stream;
  }

  static Future<WebSocket> _upgrade(HttpRequest request) {
    var response = request.response;
    if (!_isUpgradeRequest(request)) {
      // Send error response and drain the request.
      request.listen((_) {}, onDone: () {
        response.statusCode = HttpStatus.BAD_REQUEST;
        response.contentLength = 0;
        response.close();
      });
      return new Future.error(
          new WebSocketException("Invalid WebSocket upgrade request"));
    }

    // Send the upgrade response.
    response.statusCode = HttpStatus.SWITCHING_PROTOCOLS;
    response.headers.add(HttpHeaders.CONNECTION, "Upgrade");
    response.headers.add(HttpHeaders.UPGRADE, "websocket");
    String key = request.headers.value("Sec-WebSocket-Key");
    _SHA1 sha1 = new _SHA1();
    sha1.add("$key$_webSocketGUID".codeUnits);
    String accept = _CryptoUtils.bytesToBase64(sha1.close());
    response.headers.add("Sec-WebSocket-Accept", accept);
    response.headers.contentLength = 0;
    return response.detachSocket()
        .then((socket) => new _WebSocketImpl._fromSocket(socket, true));
  }

  static bool _isUpgradeRequest(HttpRequest request) {
    if (request.method != "GET") {
      return false;
    }
    if (request.headers[HttpHeaders.CONNECTION] == null) {
      return false;
    }
    bool isUpgrade = false;
    request.headers[HttpHeaders.CONNECTION].forEach((String value) {
      if (value.toLowerCase() == "upgrade") isUpgrade = true;
    });
    if (!isUpgrade) return false;
    String upgrade = request.headers.value(HttpHeaders.UPGRADE);
    if (upgrade == null || upgrade.toLowerCase() != "websocket") {
      return false;
    }
    String version = request.headers.value("Sec-WebSocket-Version");
    if (version == null || version != "13") {
      return false;
    }
    String key = request.headers.value("Sec-WebSocket-Key");
    if (key == null) {
      return false;
    }
    return true;
  }
}


class _WebSocketOutgoingTransformer extends StreamEventTransformer {
  final _WebSocketImpl webSocket;

  _WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket);

  void handleData(message, EventSink<List<int>> sink) {
    if (message is _WebSocketPong) {
      addFrame(_WebSocketOpcode.PONG, message.payload, sink);
      return;
    }
    if (message is _WebSocketPing) {
      addFrame(_WebSocketOpcode.PING, message.payload, sink);
      return;
    }
    List<int> data;
    int opcode;
    if (message != null) {
      if (message is String) {
        opcode = _WebSocketOpcode.TEXT;
        data = _encodeString(message);
      } else {
        if (message is !List<int>) {
          throw new ArgumentError(message);
        }
        opcode = _WebSocketOpcode.BINARY;
        data = message;
      }
    } else {
      opcode = _WebSocketOpcode.TEXT;
    }
    addFrame(opcode, data, sink);
  }

  void handleDone(EventSink<List<int>> sink) {
    int code = webSocket._outCloseCode;
    String reason = webSocket._outCloseReason;
    List<int> data;
    if (code != null) {
      data = new List<int>();
      data.add((code >> 8) & 0xFF);
      data.add(code & 0xFF);
      if (reason != null) {
        data.addAll(_encodeString(reason));
      }
    }
    addFrame(_WebSocketOpcode.CLOSE, data, sink);
    sink.close();
  }

  void addFrame(int opcode, List<int> data, EventSink<List<int>> sink) {
    createFrame(opcode, data, webSocket._serverSide).forEach(sink.add);
  }

  static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
    bool mask = !serverSide;  // Masking not implemented for server.
    int dataLength = data == null ? 0 : data.length;
    // Determine the header size.
    int headerSize = (mask) ? 6 : 2;
    if (dataLength > 65535) {
      headerSize += 8;
    } else if (dataLength > 125) {
      headerSize += 2;
    }
    List<int> header = new List<int>(headerSize);
    int index = 0;
    // Set FIN and opcode.
    header[index++] = 0x80 | opcode;
    // Determine size and position of length field.
    int lengthBytes = 1;
    int firstLengthByte = 1;
    if (dataLength > 65535) {
      header[index++] = 127;
      lengthBytes = 8;
    } else if (dataLength > 125) {
      header[index++] = 126;
      lengthBytes = 2;
    }
    // Write the length in network byte order into the header.
    for (int i = 0; i < lengthBytes; i++) {
      header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
    }
    if (mask) {
      header[1] |= 1 << 7;
      var maskBytes = _IOCrypto.getRandomBytes(4);
      header.setRange(index, index + 4, maskBytes);
      index += 4;
      if (data != null) {
        var list;
        // If this is a text message just do the masking inside the
        // encoded data.
        if (opcode == _WebSocketOpcode.TEXT) {
          list = data;
        } else {
          list = new Uint8List(data.length);
        }
        if (data is Uint8List) {
          for (int i = 0; i < data.length; i++) {
            list[i] = data[i] ^ maskBytes[i % 4];
          }
        } else {
          for (int i = 0; i < data.length; i++) {
            if (data[i] < 0 || 255 < data[i]) {
              throw new ArgumentError(
                  "List element is not a byte value "
                  "(value ${data[i]} at index $i)");
            }
            list[i] = data[i] ^ maskBytes[i % 4];
          }
        }
        data = list;
      }
    }
    assert(index == headerSize);
    if (data == null) {
      return [header];
    } else {
      return [header, data];
    }
  }
}


class _WebSocketConsumer implements StreamConsumer {
  final _WebSocketImpl webSocket;
  final Socket socket;
  StreamController _controller;
  StreamSubscription _subscription;
  bool _issuedPause = false;
  // Only report error if the last message was a user-provided message and not a
  // ping or pong message.
  bool _reportError = false;
  Completer _closeCompleter = new Completer();
  Completer _completer;

  _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket);

  void _onListen() {
    if (_subscription != null) {
      _subscription.cancel();
    }
  }

  void _onPause() {
    if (_subscription != null) {
      _subscription.pause();
    } else {
      _issuedPause = true;
    }
  }

  void _onResume() {
    if (_subscription != null) {
      _subscription.resume();
    } else {
      _issuedPause = false;
    }
  }

  _ensureController() {
    if (_controller != null) return;
    _controller = new StreamController(sync: true,
                                       onPause: _onPause,
                                       onResume: _onResume,
                                       onCancel: _onListen);
    var stream = _controller.stream.transform(
        new _WebSocketOutgoingTransformer(webSocket));
    socket.addStream(stream)
        .then((_) {
                _done();
                _closeCompleter.complete(webSocket);
              },
              onError: (error) {
                if (_reportError) {
                  if (!_done(error)) {
                    _closeCompleter.completeError(error);
                  }
                } else {
                  _done();
                  _closeCompleter.complete(webSocket);
                }
              });
  }

  bool _done([error]) {
    if (_completer == null) return false;
    if (error != null) {
      _completer.completeError(error);
    } else {
      _completer.complete(webSocket);
    }
    _completer = null;
    return true;
  }

  Future addStream(var stream) {
    _ensureController();
    _completer = new Completer();
    _subscription = stream.listen(
        (data) {
          _reportError = true;
          _controller.add(data);
        },
        onDone: () {
          _done();
        },
        onError: (error) {
          _done(error);
        },
        cancelOnError: true);
    if (_issuedPause) {
      _subscription.pause();
      _issuedPause = false;
    }
    return _completer.future;
  }

  Future close() {
    _ensureController();
    Future closeSocket() {
      return socket.close().then((_) => webSocket);
    }
    _controller.close();
    return _closeCompleter.future.then((_) => closeSocket());
  }

  void add(data) {
    _ensureController();
    _reportError = false;
    _controller.add(data);
  }
}


class _WebSocketImpl extends Stream implements WebSocket {
  StreamController _controller;
  StreamSubscription _subscription;
  StreamSink _sink;

  final Socket _socket;
  final bool _serverSide;
  int _readyState = WebSocket.CONNECTING;
  bool _writeClosed = false;
  int _closeCode;
  String _closeReason;
  Duration _pingInterval;
  Timer _pingTimer;
  _WebSocketConsumer _consumer;

  int _outCloseCode;
  String _outCloseReason;

  static final HttpClient _httpClient = new HttpClient();

  static Future<WebSocket> connect(String url, [protocols]) {
    Uri uri = Uri.parse(url);
    if (uri.scheme != "ws" && uri.scheme != "wss") {
      throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
    }
    if (uri.userInfo != "") {
      throw new WebSocketException("Unsupported user info '${uri.userInfo}'");
    }

    Random random = new Random();
    // Generate 16 random bytes.
    List<int> nonceData = new List<int>(16);
    for (int i = 0; i < 16; i++) {
      nonceData[i] = random.nextInt(256);
    }
    String nonce = _CryptoUtils.bytesToBase64(nonceData);

    uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http",
                  userInfo: uri.userInfo,
                  host: uri.host,
                  port: uri.port,
                  path: uri.path,
                  query: uri.query,
                  fragment: uri.fragment);
    return _httpClient.openUrl("GET", uri)
      .then((request) {
        // Setup the initial handshake.
        request.headers.add(HttpHeaders.CONNECTION, "upgrade");
        request.headers.set(HttpHeaders.UPGRADE, "websocket");
        request.headers.set("Sec-WebSocket-Key", nonce);
        request.headers.set("Sec-WebSocket-Version", "13");
        return request.close();
      })
      .then((response) {
        void error(String message) {
          // Flush data.
          response.detachSocket().then((socket) {
            socket.destroy();
          });
          throw new WebSocketException(message);
        }
        if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
            response.headers[HttpHeaders.CONNECTION] == null ||
            !response.headers[HttpHeaders.CONNECTION].any(
                (value) => value.toLowerCase() == "upgrade") ||
            response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
                "websocket") {
          error("Connection to '$uri' was not upgraded to websocket");
        }
        String accept = response.headers.value("Sec-WebSocket-Accept");
        if (accept == null) {
          error("Response did not contain a 'Sec-WebSocket-Accept' header");
        }
        _SHA1 sha1 = new _SHA1();
        sha1.add("$nonce$_webSocketGUID".codeUnits);
        List<int> expectedAccept = sha1.close();
        List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
        if (expectedAccept.length != receivedAccept.length) {
          error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
        }
        for (int i = 0; i < expectedAccept.length; i++) {
          if (expectedAccept[i] != receivedAccept[i]) {
            error("Bad response 'Sec-WebSocket-Accept' header");
          }
        }
        return response.detachSocket()
            .then((socket) => new _WebSocketImpl._fromSocket(socket));
      });
  }

  _WebSocketImpl._fromSocket(Socket this._socket,
                             [bool this._serverSide = false]) {
    _consumer = new _WebSocketConsumer(this, _socket);
    _sink = new _StreamSinkImpl(_consumer);
    _readyState = WebSocket.OPEN;

    var transformer = new _WebSocketProtocolTransformer(_serverSide);
    _subscription = _socket.transform(transformer).listen(
        (data) {
          if (data is _WebSocketPing) {
            if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
          } else if (data is _WebSocketPong) {
            // Simply set pingInterval, as it'll cancel any timers.
            pingInterval = _pingInterval;
          } else {
            _controller.add(data);
          }
        },
        onError: (error) {
          if (error is ArgumentError) {
            close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
          } else {
            close(WebSocketStatus.PROTOCOL_ERROR);
          }
          _controller.addError(error);
          _controller.close();
        },
        onDone: () {
          if (_readyState == WebSocket.OPEN) {
            _readyState = WebSocket.CLOSING;
            if (!_isReservedStatusCode(transformer.closeCode)) {
              close(transformer.closeCode);
            } else {
              close();
            }
            _readyState = WebSocket.CLOSED;
          }
          _closeCode = transformer.closeCode;
          _closeReason = transformer.closeReason;
          _controller.close();
        },
        cancelOnError: true);
    _subscription.pause();
    _controller = new StreamController(sync: true,
                                       onListen: _subscription.resume,
                                       onPause: _subscription.pause,
                                       onResume: _subscription.resume);
  }

  StreamSubscription listen(void onData(message),
                            {void onError(error),
                             void onDone(),
                             bool cancelOnError}) {
    return _controller.stream.listen(onData,
                                     onError: onError,
                                     onDone: onDone,
                                     cancelOnError: cancelOnError);
  }

  Duration get pingInterval => _pingInterval;

  void set pingInterval(Duration interval) {
    if (_writeClosed) return;
    if (_pingTimer != null) _pingTimer.cancel();
    _pingInterval = interval;

    if (_pingInterval == null) return;

    _pingTimer = new Timer(_pingInterval, () {
      if (_writeClosed) return;
      _consumer.add(new _WebSocketPing());
      _pingTimer = new Timer(_pingInterval, () {
        // No pong received.
        close(WebSocketStatus.GOING_AWAY);
      });
    });
  }

  int get readyState => _readyState;

  String get extensions => null;
  String get protocol => null;
  int get closeCode => _closeCode;
  String get closeReason => _closeReason;

  void add(data) => _sink.add(data);
  void addError(error) => _sink.addError(error);
  Future addStream(Stream stream) => _sink.addStream(stream);
  Future get done => _sink.done;

  Future close([int code, String reason]) {
    if (!_writeClosed) {
      if (_isReservedStatusCode(code)) {
        throw new WebSocketException("Reserved status code $code");
      }
      _outCloseCode = code;
      _outCloseReason = reason;
      _writeClosed = true;
    }
    return _sink.close();
  }

  static bool _isReservedStatusCode(int code) {
    return code != null &&
           (code < WebSocketStatus.NORMAL_CLOSURE ||
            code == WebSocketStatus.RESERVED_1004 ||
            code == WebSocketStatus.NO_STATUS_RECEIVED ||
            code == WebSocketStatus.ABNORMAL_CLOSURE ||
            (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
             code < WebSocketStatus.RESERVED_1015) ||
            (code >= WebSocketStatus.RESERVED_1015 &&
             code < 3000));
  }
}
