Bug: webrtc:370878648 Change-Id: I4529c17f54c653864cca27097e44c843210b9c52 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/368061 Reviewed-by: Danil Chapovalov <danilchap@webrtc.org> Commit-Queue: Dor Hen <dorhen@meta.com> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/main@{#43429}
292 lines
9.5 KiB
C++
292 lines
9.5 KiB
C++
/*
|
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
|
|
|
|
#include <stddef.h>
|
|
|
|
#include <cstdint>
|
|
#include <functional>
|
|
#include <iterator>
|
|
#include <map>
|
|
#include <numeric>
|
|
#include <unordered_map>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "absl/algorithm/container.h"
|
|
#include "api/array_view.h"
|
|
#include "net/dcsctp/common/sequence_numbers.h"
|
|
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
|
|
#include "net/dcsctp/packet/data.h"
|
|
#include "net/dcsctp/public/types.h"
|
|
#include "rtc_base/logging.h"
|
|
|
|
namespace dcsctp {
|
|
|
|
// Constructs the component.
//
// `log_prefix` is prepended to the debug log lines emitted by the streams.
// `on_assembled_message` is invoked once for every message that has been
// fully assembled.
InterleavedReassemblyStreams::InterleavedReassemblyStreams(
    absl::string_view log_prefix,
    OnAssembledMessage on_assembled_message)
    : log_prefix_(log_prefix),
      // The callback is received by value, so move it into the member rather
      // than copying it a second time.
      on_assembled_message_(std::move(on_assembled_message)) {}
|
|
|
|
// Attempts to assemble and deliver the message identified by `mid`.
//
// The message is assembled only when its first buffered chunk is flagged as
// the beginning, its last buffered chunk is flagged as the end, and the FSNs
// between them form a sequence without gaps. On success the chunks are removed
// from `chunks_by_mid_`.
//
// Returns the payload size reported by AssembleMessage(), or 0 when the
// message could not be assembled.
size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
    UnwrappedMID mid) {
  auto entry = chunks_by_mid_.find(mid);
  if (entry == chunks_by_mid_.end()) {
    RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
                         << *mid.Wrap() << " - no chunks";
    return 0;
  }

  ChunkMap& fragments = entry->second;
  auto first = fragments.begin();
  auto last = fragments.rbegin();

  // Both edges of the message must have been received.
  if (!(first->second.second.is_beginning && last->second.second.is_end)) {
    RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
                         << *mid.Wrap() << "- missing beginning or end";
    return 0;
  }

  // With both edges present, the message is complete exactly when the number
  // of buffered chunks matches the distance between the first and last FSN.
  const int64_t fsn_span = *last->first - *first->first;
  const int64_t expected_chunks = fsn_span + 1;
  if (static_cast<int64_t>(fragments.size()) != expected_chunks) {
    RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
                         << *mid.Wrap() << "- not all chunks exist (have "
                         << fragments.size() << ", expect " << expected_chunks
                         << ")";
    return 0;
  }

  const size_t assembled_bytes = AssembleMessage(fragments);
  RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
                       << *mid.Wrap() << " - succeeded and removed "
                       << assembled_bytes;

  chunks_by_mid_.erase(mid);
  return assembled_bytes;
}
|
|
|
|
// Delivers a message that consists of the single chunk `data`, received as
// `tsn`. The payload is moved into the message rather than copied.
//
// Returns the size of `data` as reported before its payload was moved out.
size_t InterleavedReassemblyStreams::Stream::AssembleMessage(UnwrappedTSN tsn,
                                                             Data data) {
  // Must be read before the payload is moved out of `data` below.
  const size_t delivered_bytes = data.size();

  UnwrappedTSN tsns[1] = {tsn};
  parent_.on_assembled_message_(
      tsns, DcSctpMessage(data.stream_id, data.ppid, std::move(data.payload)));

  return delivered_bytes;
}
|
|
|
|
// Assembles all chunks in `tsn_chunks` into one message and hands it to the
// parent's `on_assembled_message_` callback together with the TSNs of the
// chunks, in map iteration order.
//
// `tsn_chunks` must not be empty. The stream id and ppid of the message are
// taken from the first chunk.
//
// Returns the total payload size of the chunks. The chunks are not erased
// from the map here.
size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
    ChunkMap& tsn_chunks) {
  const size_t count = tsn_chunks.size();
  if (count == 1) {
    // Fast path - zero-copy
    return AssembleMessage(tsn_chunks.begin()->second.first,
                           std::move(tsn_chunks.begin()->second.second));
  }

  // Slow path - will need to concatenate the payload.
  std::vector<UnwrappedTSN> tsns;
  tsns.reserve(count);

  // The initial value decides the accumulator type. A plain `0` would make it
  // `int` and narrow the running `size_t` sum on every step.
  const size_t payload_size = absl::c_accumulate(
      tsn_chunks, size_t{0},
      [](size_t v, const auto& p) { return v + p.second.second.size(); });

  std::vector<uint8_t> payload;
  payload.reserve(payload_size);

  for (const auto& item : tsn_chunks) {
    const Data& chunk = item.second.second;
    tsns.push_back(item.second.first);
    payload.insert(payload.end(), chunk.payload.begin(), chunk.payload.end());
  }

  const Data& first_chunk = tsn_chunks.begin()->second.second;

  DcSctpMessage message(first_chunk.stream_id, first_chunk.ppid,
                        std::move(payload));
  parent_.on_assembled_message_(tsns, std::move(message));
  return payload_size;
}
|
|
|
|
// Discards all buffered chunks that belong to messages up to and including
// `mid`.
//
// For ordered streams, `next_mid_` is advanced past `mid` when it is not
// already beyond it, and any messages that became deliverable as a result are
// assembled.
//
// Returns the number of payload bytes that were discarded plus, for ordered
// streams, the bytes removed by the follow-up assembly.
size_t InterleavedReassemblyStreams::Stream::EraseTo(MID mid) {
  UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(mid);

  size_t removed_bytes = 0;
  auto it = chunks_by_mid_.begin();
  while (it != chunks_by_mid_.end() && it->first <= unwrapped_mid) {
    // The initial value decides the accumulator type. A plain `0` would make
    // it `int` and narrow the running `size_t` sum on every step.
    removed_bytes += absl::c_accumulate(
        it->second, size_t{0},
        [](size_t r2, const auto& q) { return r2 + q.second.second.size(); });
    it = chunks_by_mid_.erase(it);
  }

  if (!stream_id_.unordered) {
    // For ordered streams, erasing a message might suddenly unblock that queue
    // and allow it to deliver any following received messages.
    if (unwrapped_mid >= next_mid_) {
      next_mid_ = unwrapped_mid.next_value();
    }

    removed_bytes += TryToAssembleMessages();
  }

  return removed_bytes;
}
|
|
|
|
// Adds the chunk `data`, received as `tsn`, to this stream and delivers every
// message that becomes complete as a result.
//
// Returns the net change in the number of payload bytes buffered in
// `chunks_by_mid_`: the size of `data` if it was queued, minus the bytes of
// any messages that were assembled and removed. The result is 0 when the chunk
// was delivered directly without unblocking anything, or when a chunk with the
// same FSN was already buffered for the message. It is negative when a
// directly delivered chunk unblocked previously buffered messages.
int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) {
  RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered);
  RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id);
  // Captured up front, as `data` is moved from further down.
  const int chunk_bytes = static_cast<int>(data.size());
  UnwrappedMID mid = mid_unwrapper_.Unwrap(data.mid);
  FSN fsn = data.fsn;

  // Avoid inserting it into any map if it can be delivered directly, which is
  // possible when the chunk alone makes up the whole message.
  if (data.is_beginning && data.is_end) {
    if (stream_id_.unordered) {
      AssembleMessage(tsn, std::move(data));
      return 0;
    }
    if (mid == next_mid_) {
      AssembleMessage(tsn, std::move(data));
      next_mid_.Increment();
      // This might unblock assembling more messages. Convert to a signed type
      // before negating; negating the unsigned value directly would rely on
      // wrap-around.
      return -static_cast<int>(TryToAssembleMessages());
    }
  }

  // Slow path.
  const bool inserted =
      chunks_by_mid_[mid]
          .emplace(fsn, std::make_pair(tsn, std::move(data)))
          .second;
  if (!inserted) {
    // A chunk with this FSN is already buffered for the message, so nothing
    // was queued.
    return 0;
  }

  size_t assembled_bytes = 0;
  if (stream_id_.unordered) {
    assembled_bytes = TryToAssembleMessage(mid);
  } else if (mid == next_mid_) {
    assembled_bytes = TryToAssembleMessages();
  }

  return chunk_bytes - static_cast<int>(assembled_bytes);
}
|
|
|
|
size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessages() {
|
|
size_t removed_bytes = 0;
|
|
|
|
for (;;) {
|
|
size_t removed_bytes_this_iter = TryToAssembleMessage(next_mid_);
|
|
if (removed_bytes_this_iter == 0) {
|
|
break;
|
|
}
|
|
|
|
removed_bytes += removed_bytes_this_iter;
|
|
next_mid_.Increment();
|
|
}
|
|
return removed_bytes;
|
|
}
|
|
|
|
// Appends this stream to `state`. Ordered streams are stored with their id and
// the wrapped value of `next_mid_` (as `next_ssn`); unordered streams are
// stored with their id only.
void InterleavedReassemblyStreams::Stream::AddHandoverState(
    DcSctpSocketHandoverState& state) const {
  if (!stream_id_.unordered) {
    DcSctpSocketHandoverState::OrderedStream ordered;
    ordered.id = stream_id_.stream_id.value();
    ordered.next_ssn = next_mid_.Wrap().value();
    state.rx.ordered_streams.push_back(std::move(ordered));
    return;
  }

  DcSctpSocketHandoverState::UnorderedStream unordered;
  unordered.id = stream_id_.stream_id.value();
  state.rx.unordered_streams.push_back(std::move(unordered));
}
|
|
|
|
// Returns the stream registered for `stream_id`, creating it first if there is
// none yet.
InterleavedReassemblyStreams::Stream&
InterleavedReassemblyStreams::GetOrCreateStream(const FullStreamId& stream_id) {
  // try_emplace() needs a single lookup and constructs a new Stream (from
  // `stream_id` and `this`) only when no entry exists for `stream_id`.
  return streams_.try_emplace(stream_id, stream_id, this).first->second;
}
|
|
|
|
// Routes the chunk to the stream selected by its unordered flag and stream id,
// creating that stream if needed, and returns what Stream::Add() returns.
int InterleavedReassemblyStreams::Add(UnwrappedTSN tsn, Data data) {
  Stream& stream =
      GetOrCreateStream(FullStreamId(data.is_unordered, data.stream_id));
  return stream.Add(tsn, std::move(data));
}
|
|
|
|
// Applies every entry in `skipped_streams` by erasing the corresponding
// stream's chunks up to the skipped MID. The cumulative ack TSN argument is
// not used here.
//
// Returns the total number of bytes reported as removed by the streams.
size_t InterleavedReassemblyStreams::HandleForwardTsn(
    UnwrappedTSN /* new_cumulative_ack_tsn */,
    rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
  size_t total_removed = 0;
  for (const AnyForwardTsnChunk::SkippedStream& skipped : skipped_streams) {
    Stream& stream =
        GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id));
    total_removed += stream.EraseTo(skipped.mid);
  }
  return total_removed;
}
|
|
|
|
// Resets streams. An empty `stream_ids` resets every known stream. Otherwise
// both the unordered and the ordered stream of each listed id are reset, and
// created first if they do not exist yet.
void InterleavedReassemblyStreams::ResetStreams(
    rtc::ArrayView<const StreamID> stream_ids) {
  if (stream_ids.empty()) {
    for (auto& [unused, stream] : streams_) {
      stream.Reset();
    }
    return;
  }

  for (StreamID stream_id : stream_ids) {
    Stream& unordered_stream =
        GetOrCreateStream(FullStreamId(IsUnordered(true), stream_id));
    unordered_stream.Reset();

    Stream& ordered_stream =
        GetOrCreateStream(FullStreamId(IsUnordered(false), stream_id));
    ordered_stream.Reset();
  }
}
|
|
|
|
// Reports whether handover is possible. The status gets one unreadiness
// reason, for the first stream found that still has unassembled chunks; later
// streams are not inspected. If no stream has unassembled chunks, the default
// constructed status is returned.
HandoverReadinessStatus InterleavedReassemblyStreams::GetHandoverReadiness()
    const {
  HandoverReadinessStatus status;
  for (const auto& [stream_id, stream] : streams_) {
    if (!stream.has_unassembled_chunks()) {
      continue;
    }

    if (stream_id.unordered) {
      status.Add(
          HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks);
    } else {
      status.Add(HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks);
    }
    // One reason is enough.
    break;
  }
  return status;
}
|
|
|
|
// Lets every known stream append its own entry to `state`.
void InterleavedReassemblyStreams::AddHandoverState(
    DcSctpSocketHandoverState& state) {
  for (const auto& entry : streams_) {
    const Stream& stream = entry.second;
    stream.AddHandoverState(state);
  }
}
|
|
|
|
void InterleavedReassemblyStreams::RestoreFromState(
|
|
const DcSctpSocketHandoverState& state) {
|
|
// Validate that the component is in pristine state.
|
|
RTC_DCHECK(streams_.empty());
|
|
|
|
for (const DcSctpSocketHandoverState::OrderedStream& state :
|
|
state.rx.ordered_streams) {
|
|
FullStreamId stream_id(IsUnordered(false), StreamID(state.id));
|
|
streams_.emplace(
|
|
std::piecewise_construct, std::forward_as_tuple(stream_id),
|
|
std::forward_as_tuple(stream_id, this, MID(state.next_ssn)));
|
|
}
|
|
for (const DcSctpSocketHandoverState::UnorderedStream& state :
|
|
state.rx.unordered_streams) {
|
|
FullStreamId stream_id(IsUnordered(true), StreamID(state.id));
|
|
streams_.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
|
|
std::forward_as_tuple(stream_id, this));
|
|
}
|
|
}
|
|
|
|
} // namespace dcsctp
|