Fix bug in subscription handling in mime package

The parser now parses each received chunk to the end no matter what the
paused state of the part stream is. This will add data to the part
stream controller before it is even listened on, but only the data already
received from the original input stream.

Not trying to stop the parsing on a received chunk when the part stream
is paused makes the code simpler to reason about.

The two added tests modes failed without this change. Either by hanging
or by hitting an exception trying to invoke the getter 'length' on null.

R=kustermann@google.com
BUG=

Review URL: https://codereview.chromium.org//934763004

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/mime@43835 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/lib/src/bound_multipart_stream.dart b/lib/src/bound_multipart_stream.dart
index 7d16b96..ab06dab 100644
--- a/lib/src/bound_multipart_stream.dart
+++ b/lib/src/bound_multipart_stream.dart
@@ -105,7 +105,7 @@
      _controller = new StreamController(
          sync: true,
          onPause: _pauseStream,
-         onResume:_resumeStream,
+         onResume: _resumeStream,
          onCancel: () {
            _controllerState = _CONTROLLER_STATE_CANCELED;
            _tryPropagateControllerState();
@@ -203,9 +203,6 @@
      boundaryPrefix = _boundaryIndex;
 
      while ((_index < _buffer.length) && _state != _FAIL && _state != _DONE) {
-       if (_multipartController != null && _multipartController.isPaused) {
-         return;
-       }
        int byte;
        if (_index < 0) {
          byte = _boundary[boundaryPrefix + _index];
@@ -315,11 +312,11 @@
            _expectByteValue(byte, CharCode.LF);
            _multipartController = new StreamController(
                sync: true,
+               onListen: () {
+                 if (_subscription.isPaused) _subscription.resume();
+               },
                onPause: _subscription.pause,
-               onResume: () {
-                 _subscription.resume();
-                 _parse();
-               });
+               onResume: _subscription.resume);
            _controller.add(
                new _MimeMultipart(_headers, _multipartController.stream));
            _headers = null;
diff --git a/pubspec.yaml b/pubspec.yaml
index e29681c..2de7eca 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: mime
-version: 0.9.2
+version: 0.9.3
 author: Dart Team <misc@dartlang.org>
 description: Helper-package for working with MIME.
 homepage: http://www.dartlang.org
diff --git a/test/mime_multipart_transformer_test.dart b/test/mime_multipart_transformer_test.dart
index 86ab551..5038999 100644
--- a/test/mime_multipart_transformer_test.dart
+++ b/test/mime_multipart_transformer_test.dart
@@ -24,11 +24,18 @@
 }
 
 
-void _testParse(String message,
-               String boundary,
-               [List<Map> expectedHeaders,
-                List expectedParts,
-                bool expectError = false]) {
+enum TestMode {
+  IMMEDIATE_LISTEN,
+  DELAY_LISTEN,
+  PAUSE_RESUME
+}
+
+void _runParseTest(String message,
+                   String boundary,
+                   TestMode mode,
+                   [List<Map> expectedHeaders,
+                    List expectedParts,
+                    bool expectError = false]) {
   Future testWrite(List<int> data, [int chunkSize = -1]) {
     StreamController controller = new StreamController(sync: true);
 
@@ -42,12 +49,46 @@
       if (expectedHeaders != null) {
         expect(multipart.headers, equals(expectedHeaders[part]));
       }
-      futures.add(multipart.fold([], (buffer, data) => buffer..addAll(data))
-          .then((data) {
-            if (expectedParts[part] != null) {
-              expect(data, equals(expectedParts[part].codeUnits));
-            }
+      switch (mode) {
+        case TestMode.IMMEDIATE_LISTEN:
+          futures.add(multipart.fold([], (buffer, data) => buffer..addAll(data))
+              .then((data) {
+                if (expectedParts[part] != null) {
+                  expect(data, equals(expectedParts[part].codeUnits));
+                }
+              }));
+          break;
+
+        case TestMode.DELAY_LISTEN:
+          futures.add(new Future(() {
+            return multipart.fold([], (buffer, data) => buffer..addAll(data))
+                .then((data) {
+                  if (expectedParts[part] != null) {
+                    expect(data, equals(expectedParts[part].codeUnits));
+                  }
+                });
           }));
+          break;
+
+        case TestMode.PAUSE_RESUME:
+          var completer = new Completer();
+          futures.add(completer.future);
+          var buffer = [];
+          var subscription;
+          subscription = multipart.listen(
+              (data) {
+                buffer.addAll(data);
+                subscription.pause();
+                new Future(() => subscription.resume());
+              },
+              onDone: () {
+                if (expectedParts[part] != null) {
+                  expect(buffer, equals(expectedParts[part].codeUnits));
+                }
+                completer.complete();
+              });
+          break;
+      }
     }, onError: (error) {
       if (!expectError) throw error;
     }, onDone: () {
@@ -162,6 +203,22 @@
   }
 }
 
+void _testParse(String message,
+                String boundary,
+               [List<Map> expectedHeaders,
+                List expectedParts,
+                bool expectError = false]) {
+  _runParseTest(
+      message, boundary, TestMode.IMMEDIATE_LISTEN,
+      expectedHeaders, expectedParts, expectError);
+  _runParseTest(
+      message, boundary, TestMode.DELAY_LISTEN,
+      expectedHeaders, expectedParts, expectError);
+  _runParseTest(
+      message, boundary, TestMode.PAUSE_RESUME,
+      expectedHeaders, expectedParts, expectError);
+}
+
 void _testParseValid() {
   // Empty message from Chrome form post.
   var message = '------WebKitFormBoundaryU3FBruSkJKG0Yor1--\r\n';