dcsctp: Add a fastpath for interleaved reassembly

The same as https://webrtc-review.googlesource.com/c/src/+/331340, but
for interleaved messages.

This avoids inserting into maps where possible, and also fixes a bug
when the payload was accidentally copied unintentionally -
crbug.com/365594101.

Bug: chromium:365594101
Change-Id: Iaeaa97b0cf3a26ada9afc61f2545760b7ab4c731
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/363960
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#43099}
This commit is contained in:
Victor Boivie 2024-09-27 12:09:07 +02:00 committed by WebRTC LUCI CQ
parent ac4998de22
commit db54ea73e3
4 changed files with 148 additions and 11 deletions

View File

@ -37,14 +37,13 @@ InterleavedReassemblyStreams::InterleavedReassemblyStreams(
size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage( size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
UnwrappedMID mid) { UnwrappedMID mid) {
std::map<UnwrappedMID, ChunkMap>::const_iterator it = std::map<UnwrappedMID, ChunkMap>::iterator it = chunks_by_mid_.find(mid);
chunks_by_mid_.find(mid);
if (it == chunks_by_mid_.end()) { if (it == chunks_by_mid_.end()) {
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
<< *mid.Wrap() << " - no chunks"; << *mid.Wrap() << " - no chunks";
return 0; return 0;
} }
const ChunkMap& chunks = it->second; ChunkMap& chunks = it->second;
if (!chunks.begin()->second.second.is_beginning || if (!chunks.begin()->second.second.is_beginning ||
!chunks.rbegin()->second.second.is_end) { !chunks.rbegin()->second.second.is_end) {
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
@ -69,17 +68,22 @@ size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
return removed_bytes; return removed_bytes;
} }
size_t InterleavedReassemblyStreams::Stream::AssembleMessage( size_t InterleavedReassemblyStreams::Stream::AssembleMessage(UnwrappedTSN tsn,
const ChunkMap& tsn_chunks) { Data data) {
size_t count = tsn_chunks.size();
if (count == 1) {
// Fast path - zero-copy
const Data& data = tsn_chunks.begin()->second.second;
size_t payload_size = data.size(); size_t payload_size = data.size();
UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first}; UnwrappedTSN tsns[1] = {tsn};
DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload)); DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
parent_.on_assembled_message_(tsns, std::move(message)); parent_.on_assembled_message_(tsns, std::move(message));
return payload_size; return payload_size;
}
size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
ChunkMap& tsn_chunks) {
size_t count = tsn_chunks.size();
if (count == 1) {
// Fast path - zero-copy
return AssembleMessage(tsn_chunks.begin()->second.first,
std::move(tsn_chunks.begin()->second.second));
} }
// Slow path - will need to concatenate the payload. // Slow path - will need to concatenate the payload.
@ -137,6 +141,21 @@ int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) {
int queued_bytes = data.size(); int queued_bytes = data.size();
UnwrappedMID mid = mid_unwrapper_.Unwrap(data.mid); UnwrappedMID mid = mid_unwrapper_.Unwrap(data.mid);
FSN fsn = data.fsn; FSN fsn = data.fsn;
// Avoid inserting it into any map if it can be delivered directly.
if (stream_id_.unordered && data.is_beginning && data.is_end) {
AssembleMessage(tsn, std::move(data));
return 0;
} else if (!stream_id_.unordered && mid == next_mid_ && data.is_beginning &&
data.is_end) {
AssembleMessage(tsn, std::move(data));
next_mid_.Increment();
// This might unblock assembling more messages.
return -TryToAssembleMessages();
}
// Slow path.
auto [unused, inserted] = auto [unused, inserted] =
chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data))); chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data)));
if (!inserted) { if (!inserted) {

View File

@ -81,7 +81,9 @@ class InterleavedReassemblyStreams : public ReassemblyStreams {
// Try to assemble one message identified by `mid`. // Try to assemble one message identified by `mid`.
// Returns the number of bytes assembled if a message was assembled. // Returns the number of bytes assembled if a message was assembled.
size_t TryToAssembleMessage(UnwrappedMID mid); size_t TryToAssembleMessage(UnwrappedMID mid);
size_t AssembleMessage(const ChunkMap& tsn_chunks); size_t AssembleMessage(ChunkMap& tsn_chunks);
size_t AssembleMessage(UnwrappedTSN tsn, Data data);
// Try to assemble one or several messages in order from the stream. // Try to assemble one or several messages in order from the stream.
// Returns the number of bytes assembled if one or more messages were // Returns the number of bytes assembled if one or more messages were
// assembled. // assembled.

View File

@ -24,8 +24,10 @@
namespace dcsctp { namespace dcsctp {
namespace { namespace {
using ::testing::ElementsAre;
using ::testing::MockFunction; using ::testing::MockFunction;
using ::testing::NiceMock; using ::testing::NiceMock;
using ::testing::Property;
class InterleavedReassemblyStreamsTest : public testing::Test { class InterleavedReassemblyStreamsTest : public testing::Test {
protected: protected:
@ -150,5 +152,62 @@ TEST_F(InterleavedReassemblyStreamsTest,
EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u); EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
} }
TEST_F(InterleavedReassemblyStreamsTest, CanReassembleFastPathUnordered) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
{
testing::InSequence s;
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(1)),
Property(&DcSctpMessage::payload, ElementsAre(1))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(3)),
Property(&DcSctpMessage::payload, ElementsAre(3))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(2)),
Property(&DcSctpMessage::payload, ElementsAre(2))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(4)),
Property(&DcSctpMessage::payload, ElementsAre(4))));
}
InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "BE")), 0);
EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({3}, "BE")), 0);
EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2}, "BE")), 0);
EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({4}, "BE")), 0);
}
TEST_F(InterleavedReassemblyStreamsTest, CanReassembleFastPathOrdered) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
{
testing::InSequence s;
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(1)),
Property(&DcSctpMessage::payload, ElementsAre(1))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(2)),
Property(&DcSctpMessage::payload, ElementsAre(2))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(3)),
Property(&DcSctpMessage::payload, ElementsAre(3))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(4)),
Property(&DcSctpMessage::payload, ElementsAre(4))));
}
InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
Data data1 = gen_.Ordered({1}, "BE");
Data data2 = gen_.Ordered({2}, "BE");
Data data3 = gen_.Ordered({3}, "BE");
Data data4 = gen_.Ordered({4}, "BE");
EXPECT_EQ(streams.Add(tsn(1), std::move(data1)), 0);
EXPECT_EQ(streams.Add(tsn(3), std::move(data3)), 1);
EXPECT_EQ(streams.Add(tsn(2), std::move(data2)), -1);
EXPECT_EQ(streams.Add(tsn(4), std::move(data4)), 0);
}
} // namespace } // namespace
} // namespace dcsctp } // namespace dcsctp

View File

@ -253,5 +253,62 @@ TEST_F(TraditionalReassemblyStreamsTest, CanDeleteFirstOrderedMessage) {
EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4}, "BE")), 0); EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4}, "BE")), 0);
} }
TEST_F(TraditionalReassemblyStreamsTest, CanReassembleFastPathUnordered) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
{
testing::InSequence s;
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(1)),
Property(&DcSctpMessage::payload, ElementsAre(1))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(3)),
Property(&DcSctpMessage::payload, ElementsAre(3))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(2)),
Property(&DcSctpMessage::payload, ElementsAre(2))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(4)),
Property(&DcSctpMessage::payload, ElementsAre(4))));
}
TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "BE")), 0);
EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({3}, "BE")), 0);
EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2}, "BE")), 0);
EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({4}, "BE")), 0);
}
TEST_F(TraditionalReassemblyStreamsTest, CanReassembleFastPathOrdered) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
{
testing::InSequence s;
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(1)),
Property(&DcSctpMessage::payload, ElementsAre(1))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(2)),
Property(&DcSctpMessage::payload, ElementsAre(2))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(3)),
Property(&DcSctpMessage::payload, ElementsAre(3))));
EXPECT_CALL(on_assembled,
Call(ElementsAre(tsn(4)),
Property(&DcSctpMessage::payload, ElementsAre(4))));
}
TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
Data data1 = gen_.Ordered({1}, "BE");
Data data2 = gen_.Ordered({2}, "BE");
Data data3 = gen_.Ordered({3}, "BE");
Data data4 = gen_.Ordered({4}, "BE");
EXPECT_EQ(streams.Add(tsn(1), std::move(data1)), 0);
EXPECT_EQ(streams.Add(tsn(3), std::move(data3)), 1);
EXPECT_EQ(streams.Add(tsn(2), std::move(data2)), -1);
EXPECT_EQ(streams.Add(tsn(4), std::move(data4)), 0);
}
} // namespace } // namespace
} // namespace dcsctp } // namespace dcsctp