blob: 1cde81be6ae8f5c8b2bbf4dca65644ee1b9d6b33 [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:http2/src/async_utils/async_utils.dart';
import 'package:http2/src/flowcontrol/connection_queues.dart';
import 'package:http2/src/flowcontrol/queue_messages.dart';
import 'package:http2/src/flowcontrol/window.dart';
import 'package:http2/src/flowcontrol/window_handler.dart';
import 'package:http2/src/frames/frames.dart';
import 'package:http2/transport.dart';
import 'package:mockito/mockito.dart';
import 'package:test/test.dart';
import 'mocks.mocks.dart';
void main() {
group('flowcontrol', () {
test('connection-message-queue-out', () {
var fw = MockFrameWriter();
when(fw.bufferIndicator).thenReturn(BufferIndicator());
when(fw.writeHeadersFrame(any, any, endStream: anyNamed('endStream')))
.thenReturn(null);
when(fw.writeDataFrame(any, any, endStream: anyNamed('endStream')))
.thenReturn(null);
var windowMock = MockOutgoingWindowHandler();
var queue = ConnectionMessageQueueOut(windowMock, fw);
fw.bufferIndicator.markUnBuffered();
expect(queue.pendingMessages, 0);
var headers = [Header.ascii('a', 'b')];
var bytes = [1, 2, 3];
// Send [HeadersMessage].
queue.enqueueMessage(HeadersMessage(99, headers, false));
expect(queue.pendingMessages, 0);
verify(fw.writeHeadersFrame(99, headers, endStream: false)).called(1);
verify(fw.bufferIndicator).called(greaterThan(0));
verifyNoMoreInteractions(fw);
verifyZeroInteractions(windowMock);
clearInteractions(fw);
// Send [DataMessage].
windowMock.peerWindowSize = bytes.length;
windowMock.positiveWindow.markUnBuffered();
queue.enqueueMessage(DataMessage(99, bytes, false));
expect(queue.pendingMessages, 0);
verify(windowMock.decreaseWindow(bytes.length)).called(1);
verify(fw.writeDataFrame(99, bytes, endStream: false)).called(1);
verifyNoMoreInteractions(windowMock);
verify(fw.bufferIndicator).called(greaterThan(0));
verifyNoMoreInteractions(fw);
clearInteractions(fw);
clearInteractions(windowMock);
// Send [DataMessage] if the connection window is too small.
// Should trigger fragmentation and should write 1 byte.
windowMock.peerWindowSize = 1;
// decreaseWindow() marks the window as buffered in this case, so we need
// our mock to do the same (otherwise, the call to markUnBuffered() below
// has no effect).
when(windowMock.decreaseWindow(1)).thenAnswer((_) {
windowMock.positiveWindow.markBuffered();
});
queue.enqueueMessage(DataMessage(99, bytes, true));
expect(queue.pendingMessages, 1);
verify(windowMock.decreaseWindow(1)).called(1);
verify(fw.bufferIndicator).called(greaterThan(0));
verify(fw.writeDataFrame(99, bytes.sublist(0, 1), endStream: false))
.called(1);
verifyNoMoreInteractions(windowMock);
verifyNoMoreInteractions(fw);
clearInteractions(fw);
reset(windowMock);
// Now mark it as unbuffered. This should write the rest of the
// [bytes.length - 1] bytes.
windowMock.peerWindowSize = bytes.length - 1;
windowMock.positiveWindow.markUnBuffered();
verify(windowMock.decreaseWindow(bytes.length - 1)).called(1);
verify(fw.writeDataFrame(99, bytes.sublist(1), endStream: true))
.called(1);
verifyNoMoreInteractions(windowMock);
verify(fw.bufferIndicator).called(greaterThan(0));
verifyNoMoreInteractions(fw);
queue.startClosing();
queue.done.then(expectAsync1((_) {
expect(queue.pendingMessages, 0);
expect(() => queue.enqueueMessage(DataMessage(99, bytes, true)),
throwsA(const TypeMatcher<StateError>()));
}));
});
test('connection-message-queue-in', () {
const STREAM_ID = 99;
final bytes = [1, 2, 3];
var windowMock = MockIncomingWindowHandler();
when(windowMock.gotData(any)).thenReturn(null);
when(windowMock.dataProcessed(any)).thenReturn(null);
var queue = ConnectionMessageQueueIn(windowMock, (f) => f());
expect(queue.pendingMessages, 0);
var streamQueueMock = MockStreamMessageQueueIn();
when(streamQueueMock.bufferIndicator).thenReturn(BufferIndicator());
when(streamQueueMock.enqueueMessage(any)).thenReturn(null);
queue.insertNewStreamMessageQueue(STREAM_ID, streamQueueMock);
// Insert a [DataFrame] and let it be buffered.
var header = FrameHeader(0, 0, 0, STREAM_ID);
queue.processDataFrame(DataFrame(header, 0, bytes));
expect(queue.pendingMessages, 1);
verify(windowMock.gotData(bytes.length)).called(1);
verifyNoMoreInteractions(windowMock);
verify(streamQueueMock.bufferIndicator).called(greaterThan(0));
verifyNoMoreInteractions(streamQueueMock);
clearInteractions(windowMock);
// Indicate that the stream queue has space, and make sure
// the data is propagated from the connection to the stream
// specific queue.
streamQueueMock.bufferIndicator.markUnBuffered();
verify(windowMock.dataProcessed(bytes.length)).called(1);
var capturedMessage = verify(streamQueueMock.enqueueMessage(captureAny))
.captured
.single as DataMessage;
expect(capturedMessage.streamId, STREAM_ID);
expect(capturedMessage.bytes, bytes);
verifyNoMoreInteractions(windowMock);
verify(streamQueueMock.bufferIndicator).called(greaterThan(0));
verifyNoMoreInteractions(streamQueueMock);
// TODO: Write tests for adding HeadersFrame/PushPromiseFrame.
});
test('connection-ignored-message-queue-in', () {
const STREAM_ID = 99;
final bytes = [1, 2, 3];
var windowMock = MockIncomingWindowHandler();
when(windowMock.gotData(any)).thenReturn(null);
var queue = ConnectionMessageQueueIn(windowMock, (f) => f());
// Insert a [DataFrame] and let it be buffered.
var header = FrameHeader(0, 0, 0, STREAM_ID);
queue.processIgnoredDataFrame(DataFrame(header, 0, bytes));
expect(queue.pendingMessages, 0);
verify(windowMock.dataProcessed(bytes.length)).called(1);
verifyNoMoreInteractions(windowMock);
});
});
}
class MockOutgoingWindowHandler extends Mock
implements OutgoingConnectionWindowHandler, OutgoingStreamWindowHandler {
@override
BufferIndicator positiveWindow = BufferIndicator();
@override
int peerWindowSize = Window().size;
}