Share a body across all versions of a message.

This means that when one message is read, all other messages that were
created from the same original via `change()` will be marked read as
well.

Closes #49

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1327453002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ff57393..a01f6e9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,11 @@
+## 0.6.3
+
+* Messages returned by `Request.change()` and `Response.change()` are marked
+  read whenever the original message is read, and vice-versa. This means that
+  it's possible to read a message on which `change()` has been called and to
+  call `change()` on a message more than once, as long as `read()` is called on
+  only one of those messages.
+
 ## 0.6.2+1
 
 * Support `http_parser` 1.0.0.
diff --git a/lib/src/body.dart b/lib/src/body.dart
new file mode 100644
index 0000000..5d39be5
--- /dev/null
+++ b/lib/src/body.dart
@@ -0,0 +1,60 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library shelf.body;
+
+import 'dart:async';
+import 'dart:convert';
+
+/// The body of a request or response.
+///
+/// This tracks whether the body has been read. It's separate from [Message]
+/// because the message may be changed with [Message.change], but each instance
+/// should share a notion of whether the body was read.
+class Body {
+  /// The contents of the message body.
+  ///
+  /// This will be `null` after [read] is called.
+  Stream<List<int>> _stream;
+
+  Body._(this._stream);
+
+  /// Converts [body] to a byte stream and wraps it in a [Body].
+  ///
+  /// [body] may be either a [Body], a [String], a [Stream<List<int>>], or
+  /// `null`. If it's a [String], [encoding] will be used to convert it to a
+  /// [Stream<List<int>>].
+  factory Body(body, [Encoding encoding]) {
+    if (encoding == null) encoding = UTF8;
+
+    if (body is Body) return body;
+
+    var stream;
+    if (body == null) {
+      stream = new Stream.fromIterable([]);
+    } else if (body is String) {
+      stream = new Stream.fromIterable([encoding.encode(body)]);
+    } else if (body is Stream) {
+      stream = body;
+    } else {
+      throw new ArgumentError('Response body "$body" must be a String or a '
+          'Stream.');
+    }
+
+    return new Body._(stream);
+  }
+
+  /// Returns a [Stream] representing the body.
+  ///
+  /// Can only be called once.
+  Stream<List<int>> read() {
+    if (_stream == null) {
+      throw new StateError("The 'read' method can only be called once on a "
+          "shelf.Request/shelf.Response object.");
+    }
+    var stream = _stream;
+    _stream = null;
+    return stream;
+  }
+}
diff --git a/lib/src/message.dart b/lib/src/message.dart
index 93a0730..0d41b53 100644
--- a/lib/src/message.dart
+++ b/lib/src/message.dart
@@ -9,9 +9,12 @@
 
 import 'package:http_parser/http_parser.dart';
 
+import 'body.dart';
 import 'shelf_unmodifiable_map.dart';
 import 'util.dart';
 
+Body getBody(Message message) => message._body;
+
 /// Represents logic shared between [Request] and [Response].
 abstract class Message {
   /// The HTTP headers.
@@ -35,13 +38,7 @@
   /// The streaming body of the message.
   ///
   /// This can be read via [read] or [readAsString].
-  final Stream<List<int>> _body;
-
-  /// This boolean indicates whether [_body] has been read.
-  ///
-  /// After calling [read], or [readAsString] (which internally calls [read]),
-  /// this will be `true`.
-  bool _bodyWasRead = false;
+  final Body _body;
 
   /// Creates a new [Message].
   ///
@@ -57,7 +54,7 @@
   /// Content-Type header, it will be set to "application/octet-stream".
   Message(body, {Encoding encoding, Map<String, String> headers,
       Map<String, Object> context})
-      : this._body = _bodyToStream(body, encoding),
+      : this._body = new Body(body, encoding),
         this.headers = new ShelfUnmodifiableMap<String>(
             _adjustHeaders(headers, encoding), ignoreKeyCase: true),
         this.context = new ShelfUnmodifiableMap<Object>(context,
@@ -114,14 +111,7 @@
   /// Returns a [Stream] representing the body.
   ///
   /// Can only be called once.
-  Stream<List<int>> read() {
-    if (_bodyWasRead) {
-      throw new StateError("The 'read' method can only be called once on a "
-          "shelf.Request/shelf.Response object.");
-    }
-    _bodyWasRead = true;
-    return _body;
-  }
+  Stream<List<int>> read() => _body.read();
 
   /// Returns a [Future] containing the body as a String.
   ///
@@ -142,20 +132,6 @@
       body});
 }
 
-/// Converts [body] to a byte stream.
-///
-/// [body] may be either a [String], a [Stream<List<int>>], or `null`. If it's a
-/// [String], [encoding] will be used to convert it to a [Stream<List<int>>].
-Stream<List<int>> _bodyToStream(body, Encoding encoding) {
-  if (encoding == null) encoding = UTF8;
-  if (body == null) return new Stream.fromIterable([]);
-  if (body is String) return new Stream.fromIterable([encoding.encode(body)]);
-  if (body is Stream) return body;
-
-  throw new ArgumentError('Response body "$body" must be a String or a '
-      'Stream.');
-}
-
 /// Adds information about [encoding] to [headers].
 ///
 /// Returns a new map without modifying [headers].
diff --git a/lib/src/request.dart b/lib/src/request.dart
index b7882fe..5c5f44c 100644
--- a/lib/src/request.dart
+++ b/lib/src/request.dart
@@ -210,7 +210,7 @@
     headers = updateMap(this.headers, headers);
     context = updateMap(this.context, context);
 
-    if (body == null) body = this.read();
+    if (body == null) body = getBody(this);
 
     var handlerPath = this.handlerPath;
     if (path != null) handlerPath += path;
diff --git a/lib/src/response.dart b/lib/src/response.dart
index 55863cb..79f8994 100644
--- a/lib/src/response.dart
+++ b/lib/src/response.dart
@@ -238,7 +238,7 @@
     headers = updateMap(this.headers, headers);
     context = updateMap(this.context, context);
 
-    if (body == null) body = this.read();
+    if (body == null) body = getBody(this);
 
     return new Response(this.statusCode, body: body, headers: headers,
         context: context);
diff --git a/pubspec.yaml b/pubspec.yaml
index 58bebab..28fc35a 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: shelf
-version: 0.6.2+1
+version: 0.6.3
 author: Dart Team <misc@dartlang.org>
 description: Web Server Middleware for Dart
 homepage: https://github.com/dart-lang/shelf
diff --git a/test/request_test.dart b/test/request_test.dart
index b02de54..d7d2031 100644
--- a/test/request_test.dart
+++ b/test/request_test.dart
@@ -278,5 +278,31 @@
         expect(() => request.change(path: 'di'), throwsArgumentError);
       });
     });
+
+    test("allows the original request to be read", () {
+      var request = _request();
+      var changed = request.change();
+
+      expect(request.read().toList(), completion(isEmpty));
+      expect(changed.read, throwsStateError);
+    });
+
+    test("allows the changed request to be read", () {
+      var request = _request();
+      var changed = request.change();
+
+      expect(changed.read().toList(), completion(isEmpty));
+      expect(request.read, throwsStateError);
+    });
+
+    test("allows another changed request to be read", () {
+      var request = _request();
+      var changed1 = request.change();
+      var changed2 = request.change();
+
+      expect(changed2.read().toList(), completion(isEmpty));
+      expect(changed1.read, throwsStateError);
+      expect(request.read, throwsStateError);
+    });
   });
 }
diff --git a/test/response_test.dart b/test/response_test.dart
index 8b829bb..57c5c1c 100644
--- a/test/response_test.dart
+++ b/test/response_test.dart
@@ -112,5 +112,32 @@
           ..close();
       });
     });
+
+
+    test("allows the original response to be read", () {
+      var response = new Response.ok(null);
+      var changed = response.change();
+
+      expect(response.read().toList(), completion(isEmpty));
+      expect(changed.read, throwsStateError);
+    });
+
+    test("allows the changed response to be read", () {
+      var response = new Response.ok(null);
+      var changed = response.change();
+
+      expect(changed.read().toList(), completion(isEmpty));
+      expect(response.read, throwsStateError);
+    });
+
+    test("allows another changed response to be read", () {
+      var response = new Response.ok(null);
+      var changed1 = response.change();
+      var changed2 = response.change();
+
+      expect(changed2.read().toList(), completion(isEmpty));
+      expect(changed1.read, throwsStateError);
+      expect(response.read, throwsStateError);
+    });
   });
 }