blob: 0b5ae7a7a160b38d1c2b5de2a4176369a2f8c985 [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:http2/src/async_utils/async_utils.dart';
import 'package:http2/src/flowcontrol/queue_messages.dart';
import 'package:http2/src/flowcontrol/stream_queues.dart';
import 'package:http2/transport.dart';
import 'package:mockito/mockito.dart';
import 'package:test/test.dart';
import 'mocks.mocks.dart';
void main() {
group('flowcontrol', () {
const STREAM_ID = 99;
const BYTES = [1, 2, 3];
group('stream-message-queue-out', () {
test('window-big-enough', () {
var connectionQueueMock = MockConnectionMessageQueueOut();
when(connectionQueueMock.enqueueMessage(any)).thenReturn(null);
var windowMock = MockOutgoingStreamWindowHandler();
when(windowMock.positiveWindow).thenReturn(BufferIndicator());
when(windowMock.decreaseWindow(any)).thenReturn(null);
windowMock.positiveWindow.markUnBuffered();
var queue =
StreamMessageQueueOut(STREAM_ID, windowMock, connectionQueueMock);
expect(queue.bufferIndicator.wouldBuffer, isFalse);
expect(queue.pendingMessages, 0);
when(windowMock.peerWindowSize).thenReturn(BYTES.length);
queue.enqueueMessage(DataMessage(STREAM_ID, BYTES, true));
verify(windowMock.decreaseWindow(BYTES.length)).called(1);
final capturedMessage =
verify(connectionQueueMock.enqueueMessage(captureAny))
.captured
.single;
expect(capturedMessage, const TypeMatcher<DataMessage>());
var capturedDataMessage = capturedMessage as DataMessage;
expect(capturedDataMessage.bytes, BYTES);
expect(capturedDataMessage.endStream, isTrue);
});
test('window-smaller-than-necessary', () {
var connectionQueueMock = MockConnectionMessageQueueOut();
when(connectionQueueMock.enqueueMessage(any)).thenReturn(null);
var windowMock = MockOutgoingStreamWindowHandler();
when(windowMock.positiveWindow).thenReturn(BufferIndicator());
when(windowMock.decreaseWindow(any)).thenReturn(null);
windowMock.positiveWindow.markUnBuffered();
var queue =
StreamMessageQueueOut(STREAM_ID, windowMock, connectionQueueMock);
expect(queue.bufferIndicator.wouldBuffer, isFalse);
expect(queue.pendingMessages, 0);
// We set the window size fixed to 1, which means all the data messages
// will get fragmented to 1 byte.
when(windowMock.peerWindowSize).thenReturn(1);
queue.enqueueMessage(DataMessage(STREAM_ID, BYTES, true));
expect(queue.pendingMessages, 0);
verify(windowMock.decreaseWindow(1)).called(BYTES.length);
final messages =
verify(connectionQueueMock.enqueueMessage(captureAny)).captured;
expect(messages, hasLength(BYTES.length));
for (var counter = 0; counter < messages.length; counter++) {
expect(messages[counter], const TypeMatcher<DataMessage>());
var dataMessage = messages[counter] as DataMessage;
expect(dataMessage.bytes, BYTES.sublist(counter, counter + 1));
expect(dataMessage.endStream, counter == BYTES.length - 1);
}
verify(windowMock.positiveWindow).called(greaterThan(0));
verify(windowMock.peerWindowSize).called(greaterThan(0));
verifyNoMoreInteractions(windowMock);
});
test('window-empty', () {
var connectionQueueMock = MockConnectionMessageQueueOut();
var windowMock = MockOutgoingStreamWindowHandler();
when(windowMock.positiveWindow).thenReturn(BufferIndicator());
windowMock.positiveWindow.markUnBuffered();
var queue =
StreamMessageQueueOut(STREAM_ID, windowMock, connectionQueueMock);
expect(queue.bufferIndicator.wouldBuffer, isFalse);
expect(queue.pendingMessages, 0);
when(windowMock.peerWindowSize).thenReturn(0);
queue.enqueueMessage(DataMessage(STREAM_ID, BYTES, true));
expect(queue.bufferIndicator.wouldBuffer, isTrue);
expect(queue.pendingMessages, 1);
verify(windowMock.positiveWindow).called(greaterThan(0));
verify(windowMock.peerWindowSize).called(greaterThan(0));
verifyNoMoreInteractions(windowMock);
verifyZeroInteractions(connectionQueueMock);
});
});
group('stream-message-queue-in', () {
test('data-end-of-stream', () {
var windowMock = MockIncomingWindowHandler();
when(windowMock.gotData(any)).thenReturn(null);
when(windowMock.dataProcessed(any)).thenReturn(null);
var queue = StreamMessageQueueIn(windowMock);
expect(queue.pendingMessages, 0);
queue.messages.listen(expectAsync1((StreamMessage message) {
expect(message, isA<DataStreamMessage>());
var dataMessage = message as DataStreamMessage;
expect(dataMessage.bytes, BYTES);
}), onDone: expectAsync0(() {}));
queue.enqueueMessage(DataMessage(STREAM_ID, BYTES, true));
expect(queue.bufferIndicator.wouldBuffer, isFalse);
verifyInOrder([
windowMock.gotData(BYTES.length),
windowMock.dataProcessed(BYTES.length)
]);
verifyNoMoreInteractions(windowMock);
});
});
test('data-end-of-stream--paused', () {
const STREAM_ID = 99;
final bytes = [1, 2, 3];
var windowMock = MockIncomingWindowHandler();
when(windowMock.gotData(any)).thenReturn(null);
var queue = StreamMessageQueueIn(windowMock);
var sub = queue.messages.listen(expectAsync1((_) {}, count: 0),
onDone: expectAsync0(() {}, count: 0));
sub.pause();
expect(queue.pendingMessages, 0);
queue.enqueueMessage(DataMessage(STREAM_ID, bytes, true));
expect(queue.pendingMessages, 1);
expect(queue.bufferIndicator.wouldBuffer, isTrue);
// We assert that we got the data, but it wasn't processed.
verify(windowMock.gotData(bytes.length)).called(1);
// verifyNever(windowMock.dataProcessed(any));
});
// TODO: Add tests for Headers/HeadersPush messages.
});
}