blob: cbeb147a3aa9959e8ff3b8b354ce789c4723b3a0 [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:http2/src/async_utils/async_utils.dart';
import 'package:http2/src/flowcontrol/queue_messages.dart';
import 'package:http2/src/flowcontrol/stream_queues.dart';
import 'package:http2/transport.dart';
import 'package:mockito/mockito.dart';
import 'package:test/test.dart';
import 'mocks.mocks.dart';
void main() {
group('flowcontrol', () {
const STREAM_ID = 99;
const BYTES = [1, 2, 3];
group('stream-message-queue-out', () {
test('window-big-enough', () {
var connectionQueueMock = MockConnectionMessageQueueOut();
when(connectionQueueMock.enqueueMessage(any)).thenReturn(null);
var windowMock = MockOutgoingStreamWindowHandler();
when(windowMock.positiveWindow).thenReturn(BufferIndicator());
when(windowMock.decreaseWindow(any)).thenReturn(null);
windowMock.positiveWindow.markUnBuffered();
var queue = StreamMessageQueueOut(
STREAM_ID,
windowMock,
connectionQueueMock,
);
expect(queue.bufferIndicator.wouldBuffer, isFalse);
expect(queue.pendingMessages, 0);
when(windowMock.peerWindowSize).thenReturn(BYTES.length);
queue.enqueueMessage(DataMessage(STREAM_ID, BYTES, true));
verify(windowMock.decreaseWindow(BYTES.length)).called(1);
final capturedMessage =
verify(
connectionQueueMock.enqueueMessage(captureAny),
).captured.single;
expect(capturedMessage, const TypeMatcher<DataMessage>());
var capturedDataMessage = capturedMessage as DataMessage;
expect(capturedDataMessage.bytes, BYTES);
expect(capturedDataMessage.endStream, isTrue);
});
test('window-smaller-than-necessary', () {
var connectionQueueMock = MockConnectionMessageQueueOut();
when(connectionQueueMock.enqueueMessage(any)).thenReturn(null);
var windowMock = MockOutgoingStreamWindowHandler();
when(windowMock.positiveWindow).thenReturn(BufferIndicator());
when(windowMock.decreaseWindow(any)).thenReturn(null);
windowMock.positiveWindow.markUnBuffered();
var queue = StreamMessageQueueOut(
STREAM_ID,
windowMock,
connectionQueueMock,
);
expect(queue.bufferIndicator.wouldBuffer, isFalse);
expect(queue.pendingMessages, 0);
// We set the window size fixed to 1, which means all the data messages
// will get fragmented to 1 byte.
when(windowMock.peerWindowSize).thenReturn(1);
queue.enqueueMessage(DataMessage(STREAM_ID, BYTES, true));
expect(queue.pendingMessages, 0);
verify(windowMock.decreaseWindow(1)).called(BYTES.length);
final messages =
verify(connectionQueueMock.enqueueMessage(captureAny)).captured;
expect(messages, hasLength(BYTES.length));
for (var counter = 0; counter < messages.length; counter++) {
expect(messages[counter], const TypeMatcher<DataMessage>());
var dataMessage = messages[counter] as DataMessage;
expect(dataMessage.bytes, BYTES.sublist(counter, counter + 1));
expect(dataMessage.endStream, counter == BYTES.length - 1);
}
verify(windowMock.positiveWindow).called(greaterThan(0));
verify(windowMock.peerWindowSize).called(greaterThan(0));
verifyNoMoreInteractions(windowMock);
});
test('window-empty', () {
var connectionQueueMock = MockConnectionMessageQueueOut();
var windowMock = MockOutgoingStreamWindowHandler();
when(windowMock.positiveWindow).thenReturn(BufferIndicator());
windowMock.positiveWindow.markUnBuffered();
var queue = StreamMessageQueueOut(
STREAM_ID,
windowMock,
connectionQueueMock,
);
expect(queue.bufferIndicator.wouldBuffer, isFalse);
expect(queue.pendingMessages, 0);
when(windowMock.peerWindowSize).thenReturn(0);
queue.enqueueMessage(DataMessage(STREAM_ID, BYTES, true));
expect(queue.bufferIndicator.wouldBuffer, isTrue);
expect(queue.pendingMessages, 1);
verify(windowMock.positiveWindow).called(greaterThan(0));
verify(windowMock.peerWindowSize).called(greaterThan(0));
verifyNoMoreInteractions(windowMock);
verifyZeroInteractions(connectionQueueMock);
});
});
group('stream-message-queue-in', () {
test('data-end-of-stream', () {
var windowMock = MockIncomingWindowHandler();
when(windowMock.gotData(any)).thenReturn(null);
when(windowMock.dataProcessed(any)).thenReturn(null);
var queue = StreamMessageQueueIn(windowMock);
expect(queue.pendingMessages, 0);
queue.messages.listen(
expectAsync1((StreamMessage message) {
expect(message, isA<DataStreamMessage>());
var dataMessage = message as DataStreamMessage;
expect(dataMessage.bytes, BYTES);
}),
onDone: expectAsync0(() {}),
);
queue.enqueueMessage(DataMessage(STREAM_ID, BYTES, true));
expect(queue.bufferIndicator.wouldBuffer, isFalse);
verifyInOrder([
windowMock.gotData(BYTES.length),
windowMock.dataProcessed(BYTES.length),
]);
verifyNoMoreInteractions(windowMock);
});
});
test('data-end-of-stream--paused', () {
const STREAM_ID = 99;
final bytes = [1, 2, 3];
var windowMock = MockIncomingWindowHandler();
when(windowMock.gotData(any)).thenReturn(null);
var queue = StreamMessageQueueIn(windowMock);
var sub = queue.messages.listen(
expectAsync1((_) {}, count: 0),
onDone: expectAsync0(() {}, count: 0),
);
sub.pause();
expect(queue.pendingMessages, 0);
queue.enqueueMessage(DataMessage(STREAM_ID, bytes, true));
expect(queue.pendingMessages, 1);
expect(queue.bufferIndicator.wouldBuffer, isTrue);
// We assert that we got the data, but it wasn't processed.
verify(windowMock.gotData(bytes.length)).called(1);
// verifyNever(windowMock.dataProcessed(any));
});
// TODO: Add tests for Headers/HeadersPush messages.
});
}