Bug: webrtc:370878648 Change-Id: I4529c17f54c653864cca27097e44c843210b9c52 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/368061 Reviewed-by: Danil Chapovalov <danilchap@webrtc.org> Commit-Queue: Dor Hen <dorhen@meta.com> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/main@{#43429}
768 lines
32 KiB
C++
768 lines
32 KiB
C++
/*
 *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
|
|
#include "net/dcsctp/tx/stream_scheduler.h"
|
|
|
|
#include <vector>
|
|
|
|
#include "net/dcsctp/packet/sctp_packet.h"
|
|
#include "net/dcsctp/public/types.h"
|
|
#include "test/gmock.h"
|
|
|
|
namespace dcsctp {
|
|
namespace {
|
|
using ::testing::Return;
|
|
using ::testing::StrictMock;
|
|
using ::webrtc::Timestamp;
|
|
|
|
constexpr size_t kMtu = 1000;
|
|
constexpr size_t kPayloadSize = 4;
|
|
constexpr Timestamp kNow = Timestamp::Zero();
|
|
|
|
// Matches an optional produced value (as returned by
// `StreamScheduler::Produce`) that is set and whose `data.mid` equals `mid`.
// On mismatch, the listener is told whether nothing was produced or which MID
// was produced instead.
MATCHER_P(HasDataWithMid, mid, "") {
  const auto& produced = arg;
  if (!produced.has_value()) {
    *result_listener << "There was no produced data";
    return false;
  }

  const bool wrong_mid = produced->data.mid != mid;
  if (wrong_mid) {
    *result_listener << "the produced data had mid " << *produced->data.mid
                     << " and not the expected " << *mid;
  }
  return !wrong_mid;
}
|
|
|
|
std::function<std::optional<SendQueue::DataToSend>(Timestamp, size_t)>
|
|
CreateChunk(OutgoingMessageId message_id,
|
|
StreamID sid,
|
|
MID mid,
|
|
size_t payload_size = kPayloadSize) {
|
|
return [sid, mid, payload_size, message_id](Timestamp /* now */,
|
|
size_t /* max_size */) {
|
|
return SendQueue::DataToSend(
|
|
message_id,
|
|
Data(sid, SSN(0), mid, FSN(0), PPID(42),
|
|
std::vector<uint8_t>(payload_size), Data::IsBeginning(true),
|
|
Data::IsEnd(true), IsUnordered(true)));
|
|
};
|
|
}
|
|
|
|
// Calls `scheduler.Produce(kNow, kMtu)` exactly `packets_to_generate` times and
// returns how many of the produced items belonged to each stream id. Calls
// that produce nothing are not counted.
std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
                                           size_t packets_to_generate) {
  std::map<StreamID, size_t> counts_by_stream;
  for (size_t remaining = packets_to_generate; remaining > 0; --remaining) {
    if (std::optional<SendQueue::DataToSend> produced =
            scheduler.Produce(kNow, kMtu);
        produced.has_value()) {
      counts_by_stream[produced->data.stream_id] += 1;
    }
  }
  return counts_by_stream;
}
|
|
|
|
class MockStreamProducer : public StreamScheduler::StreamProducer {
|
|
public:
|
|
MOCK_METHOD(std::optional<SendQueue::DataToSend>,
|
|
Produce,
|
|
(Timestamp, size_t),
|
|
(override));
|
|
MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override));
|
|
};
|
|
|
|
class TestStream {
|
|
public:
|
|
TestStream(StreamScheduler& scheduler,
|
|
StreamID stream_id,
|
|
StreamPriority priority,
|
|
size_t packet_size = kPayloadSize) {
|
|
EXPECT_CALL(producer_, Produce)
|
|
.WillRepeatedly(
|
|
CreateChunk(OutgoingMessageId(0), stream_id, MID(0), packet_size));
|
|
EXPECT_CALL(producer_, bytes_to_send_in_next_message)
|
|
.WillRepeatedly(Return(packet_size));
|
|
stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
|
|
stream_->MaybeMakeActive();
|
|
}
|
|
|
|
StreamScheduler::Stream& stream() { return *stream_; }
|
|
|
|
private:
|
|
StrictMock<MockStreamProducer> producer_;
|
|
std::unique_ptr<StreamScheduler::Stream> stream_;
|
|
};
|
|
|
|
// A scheduler without active streams doesn't produce data.
|
|
TEST(StreamSchedulerTest, HasNoActiveStreams) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Stream properties can be set and retrieved
|
|
TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer;
|
|
auto stream =
|
|
scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
|
|
|
|
EXPECT_EQ(stream->stream_id(), StreamID(1));
|
|
EXPECT_EQ(stream->priority(), StreamPriority(2));
|
|
|
|
stream->SetPriority(StreamPriority(0));
|
|
EXPECT_EQ(stream->priority(), StreamPriority(0));
|
|
}
|
|
|
|
// A scheduler with a single stream produced packets from it.
|
|
TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer;
|
|
EXPECT_CALL(producer, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0)));
|
|
EXPECT_CALL(producer, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(0));
|
|
auto stream =
|
|
scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
|
|
stream->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Switches between two streams after every packet.
|
|
TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(200)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(201)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(2), MID(202)));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
|
|
stream2->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Switches between two streams after every packet, but keeps producing from the
|
|
// same stream when a packet contains of multiple fragments.
|
|
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
|
|
.WillOnce([](...) {
|
|
return SendQueue::DataToSend(
|
|
OutgoingMessageId(1),
|
|
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
|
|
std::vector<uint8_t>(4), Data::IsBeginning(true),
|
|
Data::IsEnd(false), IsUnordered(true)));
|
|
})
|
|
.WillOnce([](...) {
|
|
return SendQueue::DataToSend(
|
|
OutgoingMessageId(1),
|
|
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
|
|
std::vector<uint8_t>(4), Data::IsBeginning(false),
|
|
Data::IsEnd(false), IsUnordered(true)));
|
|
})
|
|
.WillOnce([](...) {
|
|
return SendQueue::DataToSend(
|
|
OutgoingMessageId(1),
|
|
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
|
|
std::vector<uint8_t>(4), Data::IsBeginning(false),
|
|
Data::IsEnd(true), IsUnordered(true)));
|
|
})
|
|
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202)));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
|
|
stream2->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Deactivates a stream before it has finished producing all packets.
|
|
TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming.
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
|
|
// ... but the stream is made inactive before it can be produced.
|
|
stream1->MakeInactive();
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Resumes a paused stream - makes a stream active after inactivating it.
|
|
TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
// Callbacks are setup so that they hint that there is a MID(2) coming...
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize)) // When making active again
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
|
|
stream1->MakeInactive();
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
stream1->MaybeMakeActive();
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Iterates between streams, where one is suddenly paused and later resumed.
|
|
TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202)));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
|
|
stream2->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
|
|
stream1->MakeInactive();
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
|
|
stream1->MaybeMakeActive();
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Verifies that packet counts are evenly distributed in round robin scheduling.
|
|
TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
|
|
TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
|
|
|
|
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
|
|
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
|
|
EXPECT_EQ(packet_counts[StreamID(2)], 5U);
|
|
}
|
|
|
|
// Verifies that packet counts are evenly distributed among active streams,
|
|
// where a stream is suddenly made inactive, two are added, and then the paused
|
|
// stream is resumed.
|
|
TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
|
|
TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
|
|
|
|
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
|
|
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
|
|
EXPECT_EQ(packet_counts[StreamID(2)], 5U);
|
|
|
|
stream2.stream().MakeInactive();
|
|
|
|
TestStream stream3(scheduler, StreamID(3), StreamPriority(1));
|
|
TestStream stream4(scheduler, StreamID(4), StreamPriority(1));
|
|
|
|
std::map<StreamID, size_t> counts2 = GetPacketCounts(scheduler, 15);
|
|
EXPECT_EQ(counts2[StreamID(1)], 5U);
|
|
EXPECT_EQ(counts2[StreamID(2)], 0U);
|
|
EXPECT_EQ(counts2[StreamID(3)], 5U);
|
|
EXPECT_EQ(counts2[StreamID(4)], 5U);
|
|
|
|
stream2.stream().MaybeMakeActive();
|
|
|
|
std::map<StreamID, size_t> counts3 = GetPacketCounts(scheduler, 20);
|
|
EXPECT_EQ(counts3[StreamID(1)], 5U);
|
|
EXPECT_EQ(counts3[StreamID(2)], 5U);
|
|
EXPECT_EQ(counts3[StreamID(3)], 5U);
|
|
EXPECT_EQ(counts3[StreamID(4)], 5U);
|
|
}
|
|
|
|
// Degrades to fair queuing with streams having identical priority.
|
|
TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
constexpr size_t kSmallPacket = 30;
|
|
constexpr size_t kLargePacket = 70;
|
|
|
|
StrictMock<MockStreamProducer> callback1;
|
|
EXPECT_CALL(callback1, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100),
|
|
kSmallPacket))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101),
|
|
kSmallPacket))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102),
|
|
kSmallPacket));
|
|
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kSmallPacket)) // When making active
|
|
.WillOnce(Return(kSmallPacket))
|
|
.WillOnce(Return(kSmallPacket))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> callback2;
|
|
EXPECT_CALL(callback2, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200),
|
|
kLargePacket))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201),
|
|
kLargePacket))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202),
|
|
kLargePacket));
|
|
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kLargePacket)) // When making active
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
|
|
stream2->MaybeMakeActive();
|
|
|
|
// t = 30
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
|
|
// t = 60
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
// t = 70
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
|
|
// t = 90
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
|
|
// t = 140
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
|
|
// t = 210
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Will do weighted fair queuing with three streams having different priority.
|
|
TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
StrictMock<MockStreamProducer> callback1;
|
|
EXPECT_CALL(callback1, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102)));
|
|
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
// Priority 125 -> allowed to produce every 1000/125 ~= 80 time units.
|
|
auto stream1 =
|
|
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> callback2;
|
|
EXPECT_CALL(callback2, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202)));
|
|
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
// Priority 200 -> allowed to produce every 1000/200 ~= 50 time units.
|
|
auto stream2 =
|
|
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
|
|
stream2->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> callback3;
|
|
EXPECT_CALL(callback3, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(3), MID(300)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(7), StreamID(3), MID(301)))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(8), StreamID(3), MID(302)));
|
|
EXPECT_CALL(callback3, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
// Priority 500 -> allowed to produce every 1000/500 ~= 20 time units.
|
|
auto stream3 =
|
|
scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
|
|
stream3->MaybeMakeActive();
|
|
|
|
// t ~= 20
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300)));
|
|
// t ~= 40
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301)));
|
|
// t ~= 50
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
|
|
// t ~= 60
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302)));
|
|
// t ~= 80
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
|
|
// t ~= 100
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
|
|
// t ~= 150
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
|
|
// t ~= 160
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
// t ~= 240
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Will do weighted fair queuing with three streams having different priority
|
|
// and sending different payload sizes.
|
|
TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
|
|
StreamScheduler scheduler("", kMtu);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
constexpr size_t kSmallPacket = 20;
|
|
constexpr size_t kMediumPacket = 50;
|
|
constexpr size_t kLargePacket = 70;
|
|
|
|
// Stream with priority = 125 -> inverse weight ~=80
|
|
StrictMock<MockStreamProducer> callback1;
|
|
EXPECT_CALL(callback1, Produce)
|
|
// virtual finish time ~ 0 + 50 * 80 = 4000
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(100),
|
|
kMediumPacket))
|
|
// virtual finish time ~ 4000 + 20 * 80 = 5600
|
|
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(101),
|
|
kSmallPacket))
|
|
// virtual finish time ~ 5600 + 70 * 80 = 11200
|
|
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(1), MID(102),
|
|
kLargePacket));
|
|
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kMediumPacket)) // When making active
|
|
.WillOnce(Return(kSmallPacket))
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
|
|
stream1->MaybeMakeActive();
|
|
|
|
// Stream with priority = 200 -> inverse weight ~=50
|
|
StrictMock<MockStreamProducer> callback2;
|
|
EXPECT_CALL(callback2, Produce)
|
|
// virtual finish time ~ 0 + 50 * 50 = 2500
|
|
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(200),
|
|
kMediumPacket))
|
|
// virtual finish time ~ 2500 + 70 * 50 = 6000
|
|
.WillOnce(CreateChunk(OutgoingMessageId(4), StreamID(2), MID(201),
|
|
kLargePacket))
|
|
// virtual finish time ~ 6000 + 20 * 50 = 7000
|
|
.WillOnce(CreateChunk(OutgoingMessageId(5), StreamID(2), MID(202),
|
|
kSmallPacket));
|
|
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kMediumPacket)) // When making active
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(kSmallPacket))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
|
|
stream2->MaybeMakeActive();
|
|
|
|
// Stream with priority = 500 -> inverse weight ~=20
|
|
StrictMock<MockStreamProducer> callback3;
|
|
EXPECT_CALL(callback3, Produce)
|
|
// virtual finish time ~ 0 + 20 * 20 = 400
|
|
.WillOnce(CreateChunk(OutgoingMessageId(6), StreamID(3), MID(300),
|
|
kSmallPacket))
|
|
// virtual finish time ~ 400 + 50 * 20 = 1400
|
|
.WillOnce(CreateChunk(OutgoingMessageId(7), StreamID(3), MID(301),
|
|
kMediumPacket))
|
|
// virtual finish time ~ 1400 + 70 * 20 = 2800
|
|
.WillOnce(CreateChunk(OutgoingMessageId(8), StreamID(3), MID(302),
|
|
kLargePacket));
|
|
EXPECT_CALL(callback3, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kSmallPacket)) // When making active
|
|
.WillOnce(Return(kMediumPacket))
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(0));
|
|
auto stream3 =
|
|
scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
|
|
stream3->MaybeMakeActive();
|
|
|
|
// t ~= 400
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300)));
|
|
// t ~= 1400
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301)));
|
|
// t ~= 2500
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
|
|
// t ~= 2800
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302)));
|
|
// t ~= 4000
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
|
|
// t ~= 5600
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
|
|
// t ~= 6000
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
|
|
// t ~= 7000
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
|
|
// t ~= 11200
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
// Two streams with different priorities send packets of identical size. The
// ratio of packets sent per stream must match the ratio of their priorities.
TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) {
  StreamScheduler scheduler("", kMtu);
  scheduler.EnableMessageInterleaving(true);

  TestStream low(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
  TestStream high(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);

  std::map<StreamID, size_t> counts = GetPacketCounts(scheduler, 15);

  // Priorities 100:200 -> packets 5:10.
  EXPECT_EQ(counts[StreamID(1)], 5U);
  EXPECT_EQ(counts[StreamID(2)], 10U);
}
|
|
|
|
// Same as `WillDistributeWFQPacketsInTwoStreamsByPriority`, but with four
// streams.
TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) {
  StreamScheduler scheduler("", kMtu);
  scheduler.EnableMessageInterleaving(true);

  TestStream prio100(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
  TestStream prio200(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
  TestStream prio300(scheduler, StreamID(3), StreamPriority(300), kPayloadSize);
  TestStream prio400(scheduler, StreamID(4), StreamPriority(400), kPayloadSize);

  std::map<StreamID, size_t> counts = GetPacketCounts(scheduler, 50);

  // Priorities 100:200:300:400 -> packets 5:10:15:20.
  EXPECT_EQ(counts[StreamID(1)], 5U);
  EXPECT_EQ(counts[StreamID(2)], 10U);
  EXPECT_EQ(counts[StreamID(3)], 15U);
  EXPECT_EQ(counts[StreamID(4)], 20U);
}
|
|
|
|
// Two streams with different priorities send packets of different sizes. The
// ratio of total payload bytes per stream must match the ratio of their
// priorities.
//
// In this example,
//   * stream1 has priority 100 and sends packets of size 8
//   * stream2 has priority 400 and sends packets of size 4
// With round robin, stream1 would get twice as many payload bytes on the wire
// as stream2, but with WFQ and a 4x higher priority, stream2 should get 4x as
// many payload bytes on the wire. As its packets are half as large, that
// translates to stream2 getting 8x as many packets on the wire.
TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) {
  StreamScheduler scheduler("", kMtu);
  // Enable WFQ scheduler.
  scheduler.EnableMessageInterleaving(true);

  TestStream big_low_prio(scheduler, StreamID(1), StreamPriority(100),
                          /*packet_size=*/8);
  TestStream small_high_prio(scheduler, StreamID(2), StreamPriority(400),
                             /*packet_size=*/4);

  std::map<StreamID, size_t> counts = GetPacketCounts(scheduler, 90);

  // 10 packets * 8 bytes = 80 bytes at priority 100.
  EXPECT_EQ(counts[StreamID(1)], 10U);
  // 80 packets * 4 bytes = 320 bytes at priority 400.
  EXPECT_EQ(counts[StreamID(2)], 80U);
}
|
|
|
|
// Same as `WillDistributeFromTwoStreamsFairly`, but with four streams and a
// wider mix of priorities and packet sizes.
TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) {
  StreamScheduler scheduler("", kMtu);
  // Enable WFQ scheduler.
  scheduler.EnableMessageInterleaving(true);

  TestStream stream_a(scheduler, StreamID(1), StreamPriority(100),
                      /*packet_size=*/10);
  TestStream stream_b(scheduler, StreamID(2), StreamPriority(200),
                      /*packet_size=*/10);
  TestStream stream_c(scheduler, StreamID(3), StreamPriority(200),
                      /*packet_size=*/20);
  TestStream stream_d(scheduler, StreamID(4), StreamPriority(400),
                      /*packet_size=*/30);

  std::map<StreamID, size_t> counts = GetPacketCounts(scheduler, 80);

  // 15 packets * 10 bytes = 150 bytes at priority 100.
  EXPECT_EQ(counts[StreamID(1)], 15U);
  // 30 packets * 10 bytes = 300 bytes at priority 200.
  EXPECT_EQ(counts[StreamID(2)], 30U);
  // 15 packets * 20 bytes = 300 bytes at priority 200.
  EXPECT_EQ(counts[StreamID(3)], 15U);
  // 20 packets * 30 bytes = 600 bytes at priority 400.
  EXPECT_EQ(counts[StreamID(4)], 20U);
}
|
|
|
|
// Sending large messages with small MTU will fragment the messages and produce
|
|
// a first fragment not larger than the MTU, and will then not first send from
|
|
// the stream with the smallest message, as their first fragment will be equally
|
|
// small for both streams. See `LargeMessageWithLargeMtu` for the same test, but
|
|
// with a larger MTU.
|
|
TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
|
|
StreamScheduler scheduler(
|
|
"", 100 + SctpPacket::kHeaderSize + IDataChunk::kHeaderSize);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0), 100))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(1), MID(0), 100));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(200)) // When making active
|
|
.WillOnce(Return(100))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(2), StreamID(2), MID(1), 100))
|
|
.WillOnce(CreateChunk(OutgoingMessageId(3), StreamID(2), MID(1), 50));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(150)) // When making active
|
|
.WillOnce(Return(50))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
|
|
stream2->MaybeMakeActive();
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
// Sending large messages with large MTU will not fragment messages and will
|
|
// send the message first from the stream that has the smallest message.
|
|
TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
|
|
StreamScheduler scheduler(
|
|
"", 200 + SctpPacket::kHeaderSize + IDataChunk::kHeaderSize);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(0), StreamID(1), MID(0), 200));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(200)) // When making active
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(OutgoingMessageId(1), StreamID(2), MID(1), 150));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(150)) // When making active
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
|
|
stream2->MaybeMakeActive();
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1)));
|
|
EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
|
|
EXPECT_EQ(scheduler.Produce(kNow, kMtu), std::nullopt);
|
|
}
|
|
|
|
} // namespace
|
|
} // namespace dcsctp
|