Add an unhandledError callback (#37)

Allows clients to handle or rethrow errors instead of silently swallowing them.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c5f9542..878e1f0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.1.0
+
+* `Server` and related classes can now take an `onUnhandledError` callback to
+  notify callers of unhandled exceptions.
+
 ## 2.0.10
 
 * Allow `stream_channel` version 2.x
diff --git a/lib/src/peer.dart b/lib/src/peer.dart
index 18cac07..7176763 100644
--- a/lib/src/peer.dart
+++ b/lib/src/peer.dart
@@ -39,13 +39,20 @@
   Future get done => _manager.done;
   bool get isClosed => _manager.isClosed;
 
+  @override
+  ErrorCallback get onUnhandledError => _server?.onUnhandledError;
+
   /// Creates a [Peer] that communicates over [channel].
   ///
   /// Note that the peer won't begin listening to [channel] until [Peer.listen]
   /// is called.
-  Peer(StreamChannel<String> channel)
+  ///
+  /// Unhandled exceptions in callbacks will be forwarded to [onUnhandledError].
+  /// If this is not provided, unhandled exceptions will be swallowed.
+  Peer(StreamChannel<String> channel, {ErrorCallback onUnhandledError})
       : this.withoutJson(
-            jsonDocument.bind(channel).transform(respondToFormatExceptions));
+            jsonDocument.bind(channel).transform(respondToFormatExceptions),
+            onUnhandledError: onUnhandledError);
 
   /// Creates a [Peer] that communicates using decoded messages over [channel].
   ///
@@ -54,10 +61,14 @@
   ///
   /// Note that the peer won't begin listening to [channel] until
   /// [Peer.listen] is called.
-  Peer.withoutJson(StreamChannel channel)
+  ///
+  /// Unhandled exceptions in callbacks will be forwarded to [onUnhandledError].
+  /// If this is not provided, unhandled exceptions will be swallowed.
+  Peer.withoutJson(StreamChannel channel, {ErrorCallback onUnhandledError})
       : _manager = new ChannelManager("Peer", channel) {
     _server = new Server.withoutJson(
-        new StreamChannel(_serverIncomingForwarder.stream, channel.sink));
+        new StreamChannel(_serverIncomingForwarder.stream, channel.sink),
+        onUnhandledError: onUnhandledError);
     _client = new Client.withoutJson(
         new StreamChannel(_clientIncomingForwarder.stream, channel.sink));
   }
diff --git a/lib/src/server.dart b/lib/src/server.dart
index 0aa4c2f..5c3b132 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -15,6 +15,9 @@
 import 'parameters.dart';
 import 'utils.dart';
 
+/// A callback for unhandled exceptions.
+typedef ErrorCallback = void Function(dynamic error, dynamic stackTrace);
+
 /// A JSON-RPC 2.0 server.
 ///
 /// A server exposes methods that are called by requests, to which it provides
@@ -51,13 +54,24 @@
   /// endpoint closes the connection.
   bool get isClosed => _manager.isClosed;
 
+  /// A callback that is fired on unhandled exceptions.
+  ///
+  /// In the case where a user provided callback results in an exception that
+  /// cannot be properly routed back to the client, this handler will be
+  /// invoked. If it is not set, the exception will be swallowed.
+  final ErrorCallback onUnhandledError;
+
   /// Creates a [Server] that communicates over [channel].
   ///
   /// Note that the server won't begin listening to [requests] until
   /// [Server.listen] is called.
-  Server(StreamChannel<String> channel)
+  ///
+  /// Unhandled exceptions in callbacks will be forwarded to [onUnhandledError].
+  /// If this is not provided, unhandled exceptions will be swallowed.
+  Server(StreamChannel<String> channel, {ErrorCallback onUnhandledError})
       : this.withoutJson(
-            jsonDocument.bind(channel).transform(respondToFormatExceptions));
+            jsonDocument.bind(channel).transform(respondToFormatExceptions),
+            onUnhandledError: onUnhandledError);
 
   /// Creates a [Server] that communicates using decoded messages over
   /// [channel].
@@ -67,7 +81,10 @@
   ///
   /// Note that the server won't begin listening to [requests] until
   /// [Server.listen] is called.
-  Server.withoutJson(StreamChannel channel)
+  ///
+  /// Unhandled exceptions in callbacks will be forwarded to [onUnhandledError].
+  /// If this is not provided, unhandled exceptions will be swallowed.
+  Server.withoutJson(StreamChannel channel, {this.onUnhandledError})
       : _manager = new ChannelManager("Server", channel);
 
   /// Starts listening to the underlying stream.
@@ -175,9 +192,11 @@
             request.containsKey('id')) {
           return error.serialize(request);
         } else {
+          onUnhandledError?.call(error, stackTrace);
           return null;
         }
       } else if (!request.containsKey('id')) {
+        onUnhandledError?.call(error, stackTrace);
         return null;
       }
       final chain = new Chain.forTrace(stackTrace);
diff --git a/pubspec.yaml b/pubspec.yaml
index 799fb88..40c0293 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: json_rpc_2
-version: 2.0.10
+version: 2.1.0
 author: Dart Team <misc@dartlang.org>
 description: >-
   Utilities to write a client or server using the JSON-RPC 2.0 spec.
diff --git a/test/peer_test.dart b/test/peer_test.dart
index 94c49e6..491526f 100644
--- a/test/peer_test.dart
+++ b/test/peer_test.dart
@@ -184,4 +184,24 @@
       incoming.add({"completely": "wrong"});
     });
   });
+
+  test("can notify on unhandled errors for if the method throws", () async {
+    Exception exception = Exception('test exception');
+    var incomingController = new StreamController();
+    var outgoingController = new StreamController();
+    final Completer<Exception> completer = Completer<Exception>();
+    peer = new json_rpc.Peer.withoutJson(
+      new StreamChannel(incomingController.stream, outgoingController),
+      onUnhandledError: (error, stack) {
+        completer.complete(error);
+      },
+    );
+    peer
+      ..registerMethod('foo', () => throw exception)
+      ..listen();
+
+    incomingController.add({'jsonrpc': '2.0', 'method': 'foo'});
+    Exception receivedException = await completer.future;
+    expect(receivedException, equals(exception));
+  });
 }
diff --git a/test/server/utils.dart b/test/server/utils.dart
index 079201a..ef089ad 100644
--- a/test/server/utils.dart
+++ b/test/server/utils.dart
@@ -23,9 +23,10 @@
   json_rpc.Server get server => _server;
   json_rpc.Server _server;
 
-  ServerController() {
+  ServerController({json_rpc.ErrorCallback onUnhandledError}) {
     _server = new json_rpc.Server(
-        new StreamChannel(_requestController.stream, _responseController.sink));
+        new StreamChannel(_requestController.stream, _responseController.sink),
+        onUnhandledError: onUnhandledError);
     _server.listen();
   }