pc: Simplify StreamId class

Before this CL, the StreamId class represented either a valid SCTP
stream ID, or "nothing", which means that it was a wrapped
absl::optional. Since created data channels don't have a SCTP stream ID
until it's known whether this peer will use odd or even numbers, the
"nothing" value was used for that state.

This unfortunately made it a bit hard to work with objects of this type,
as one always had to check if it contained a value. And even if a caller
would check this, and then pass the StreamId to a different function,
that function would have to do the check itself (often as a RTC_DCHECK)
since the passed StreamId always could have that state.

This CL simply extracts the "absl::optional" part of it, forcing holders
to wrap it in an optional type - when it can be "nothing". But allowing
the other code to just pass StreamId that can't be "nothing". That
simplifies the code a bit, potentially removing some bugs.

Bug: chromium:41221056
Change-Id: I93104cdd5d2f5fc1dbeb9d9dfc4cf361f11a9d68
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/342440
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41880}
This commit is contained in:
Victor Boivie 2024-03-09 21:50:42 +01:00 committed by WebRTC LUCI CQ
parent b4913a549f
commit cd3d29b6fb
8 changed files with 84 additions and 123 deletions

View File

@ -13,6 +13,7 @@
#include <utility> #include <utility>
#include "absl/algorithm/container.h" #include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/peer_connection_interface.h" #include "api/peer_connection_interface.h"
#include "api/rtc_error.h" #include "api/rtc_error.h"
#include "pc/peer_connection_internal.h" #include "pc/peer_connection_internal.h"
@ -53,7 +54,6 @@ RTCError DataChannelController::SendData(
void DataChannelController::AddSctpDataStream(StreamId sid) { void DataChannelController::AddSctpDataStream(StreamId sid) {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(sid.HasValue());
if (data_channel_transport_) { if (data_channel_transport_) {
data_channel_transport_->OpenChannel(sid.stream_id_int()); data_channel_transport_->OpenChannel(sid.stream_id_int());
} }
@ -99,7 +99,7 @@ void DataChannelController::OnDataReceived(
return; return;
auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
return c->sid_n().stream_id_int() == channel_id; return c->sid_n().has_value() && c->sid_n()->stream_id_int() == channel_id;
}); });
if (it != sctp_data_channels_n_.end()) if (it != sctp_data_channels_n_.end())
@ -109,7 +109,7 @@ void DataChannelController::OnDataReceived(
void DataChannelController::OnChannelClosing(int channel_id) { void DataChannelController::OnChannelClosing(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) { auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
return c->sid_n().stream_id_int() == channel_id; return c->sid_n().has_value() && c->sid_n()->stream_id_int() == channel_id;
}); });
if (it != sctp_data_channels_n_.end()) if (it != sctp_data_channels_n_.end())
@ -134,7 +134,7 @@ void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
auto copy = sctp_data_channels_n_; auto copy = sctp_data_channels_n_;
for (const auto& channel : copy) { for (const auto& channel : copy) {
if (channel->sid_n().HasValue()) { if (channel->sid_n().has_value()) {
channel->OnTransportReady(); channel->OnTransportReady();
} else { } else {
// This happens for role==SSL_SERVER channels when we get notified by // This happens for role==SSL_SERVER channels when we get notified by
@ -157,7 +157,9 @@ void DataChannelController::OnTransportClosed(RTCError error) {
temp_sctp_dcs.swap(sctp_data_channels_n_); temp_sctp_dcs.swap(sctp_data_channels_n_);
for (const auto& channel : temp_sctp_dcs) { for (const auto& channel : temp_sctp_dcs) {
channel->OnTransportChannelClosed(error); channel->OnTransportChannelClosed(error);
sid_allocator_.ReleaseSid(channel->sid_n()); if (channel->sid_n().has_value()) {
sid_allocator_.ReleaseSid(*channel->sid_n());
}
} }
} }
@ -257,13 +259,12 @@ void DataChannelController::OnDataChannelOpenMessage(
// RTC_RUN_ON(network_thread()) // RTC_RUN_ON(network_thread())
RTCError DataChannelController::ReserveOrAllocateSid( RTCError DataChannelController::ReserveOrAllocateSid(
StreamId& sid, absl::optional<StreamId>& sid,
absl::optional<rtc::SSLRole> fallback_ssl_role) { absl::optional<rtc::SSLRole> fallback_ssl_role) {
if (sid.HasValue()) { if (sid.has_value()) {
return sid_allocator_.ReserveSid(sid) return sid_allocator_.ReserveSid(*sid)
? RTCError::OK() ? RTCError::OK()
: RTCError(RTCErrorType::INVALID_RANGE, : RTCError(RTCErrorType::INVALID_RANGE, "StreamId reserved.");
"StreamId out of range or reserved.");
} }
// Attempt to allocate an ID based on the negotiated role. // Attempt to allocate an ID based on the negotiated role.
@ -272,12 +273,12 @@ RTCError DataChannelController::ReserveOrAllocateSid(
role = fallback_ssl_role; role = fallback_ssl_role;
if (role) { if (role) {
sid = sid_allocator_.AllocateSid(*role); sid = sid_allocator_.AllocateSid(*role);
if (!sid.HasValue()) if (!sid.has_value())
return RTCError(RTCErrorType::RESOURCE_EXHAUSTED); return RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
} }
// When we get here, we may still not have an ID, but that's a supported case // When we get here, we may still not have an ID, but that's a supported case
// whereby an id will be assigned later. // whereby an id will be assigned later.
RTC_DCHECK(sid.HasValue() || !role); RTC_DCHECK(sid.has_value() || !role);
return RTCError::OK(); return RTCError::OK();
} }
@ -285,13 +286,22 @@ RTCError DataChannelController::ReserveOrAllocateSid(
RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>>
DataChannelController::CreateDataChannel(const std::string& label, DataChannelController::CreateDataChannel(const std::string& label,
InternalDataChannelInit& config) { InternalDataChannelInit& config) {
StreamId sid(config.id); absl::optional<StreamId> sid = absl::nullopt;
if (config.id != -1) {
if (config.id < 0 || config.id > cricket::kMaxSctpSid) {
return RTCError(RTCErrorType::INVALID_RANGE, "StreamId out of range.");
}
sid = StreamId(config.id);
}
RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role); RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role);
if (!err.ok()) if (!err.ok())
return err; return err;
// In case `sid` has changed. Update `config` accordingly. // In case `sid` has changed. Update `config` accordingly.
config.id = sid.stream_id_int(); if (sid.has_value()) {
config.id = sid->stream_id_int();
}
rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create( rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create(
weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr, weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr,
@ -300,8 +310,8 @@ DataChannelController::CreateDataChannel(const std::string& label,
sctp_data_channels_n_.push_back(channel); sctp_data_channels_n_.push_back(channel);
// If we have an id already, notify the transport. // If we have an id already, notify the transport.
if (sid.HasValue()) if (sid.has_value())
AddSctpDataStream(sid); AddSctpDataStream(*sid);
return channel; return channel;
} }
@ -319,7 +329,6 @@ DataChannelController::InternalCreateDataChannelWithProxy(
bool ready_to_send = false; bool ready_to_send = false;
InternalDataChannelInit new_config = config; InternalDataChannelInit new_config = config;
StreamId sid(new_config.id);
auto ret = network_thread()->BlockingCall( auto ret = network_thread()->BlockingCall(
[&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> { [&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
@ -361,16 +370,16 @@ void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close; std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
for (auto it = sctp_data_channels_n_.begin(); for (auto it = sctp_data_channels_n_.begin();
it != sctp_data_channels_n_.end();) { it != sctp_data_channels_n_.end();) {
if (!(*it)->sid_n().HasValue()) { if (!(*it)->sid_n().has_value()) {
StreamId sid = sid_allocator_.AllocateSid(role); absl::optional<StreamId> sid = sid_allocator_.AllocateSid(role);
if (sid.HasValue()) { if (sid.has_value()) {
(*it)->SetSctpSid_n(sid); (*it)->SetSctpSid_n(*sid);
AddSctpDataStream(sid); AddSctpDataStream(*sid);
if (ready_to_send) { if (ready_to_send) {
RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send."; RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send.";
(*it)->OnTransportReady(); (*it)->OnTransportReady();
} }
channels_to_update.push_back(std::make_pair((*it).get(), sid)); channels_to_update.push_back(std::make_pair((*it).get(), *sid));
} else { } else {
channels_to_close.push_back(std::move(*it)); channels_to_close.push_back(std::move(*it));
it = sctp_data_channels_n_.erase(it); it = sctp_data_channels_n_.erase(it);
@ -391,8 +400,8 @@ void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
// After the closing procedure is done, it's safe to use this ID for // After the closing procedure is done, it's safe to use this ID for
// another data channel. // another data channel.
if (channel->sid_n().HasValue()) { if (channel->sid_n().has_value()) {
sid_allocator_.ReleaseSid(channel->sid_n()); sid_allocator_.ReleaseSid(*channel->sid_n());
} }
auto it = absl::c_find_if(sctp_data_channels_n_, auto it = absl::c_find_if(sctp_data_channels_n_,
[&](const auto& c) { return c.get() == channel; }); [&](const auto& c) { return c.get() == channel; });
@ -423,8 +432,8 @@ void DataChannelController::NotifyDataChannelsOfTransportCreated() {
RTC_DCHECK(data_channel_transport_); RTC_DCHECK(data_channel_transport_);
for (const auto& channel : sctp_data_channels_n_) { for (const auto& channel : sctp_data_channels_n_) {
if (channel->sid_n().HasValue()) if (channel->sid_n().has_value())
AddSctpDataStream(channel->sid_n()); AddSctpDataStream(*channel->sid_n());
channel->OnTransportChannelCreated(); channel->OnTransportChannelCreated();
} }
} }

View File

@ -125,7 +125,7 @@ class DataChannelController : public SctpDataChannelControllerInterface,
// will still be unassigned upon return, but will be assigned later. // will still be unassigned upon return, but will be assigned later.
// If the pool has been exhausted or a sid has already been reserved, an // If the pool has been exhausted or a sid has already been reserved, an
// error will be returned. // error will be returned.
RTCError ReserveOrAllocateSid(StreamId& sid, RTCError ReserveOrAllocateSid(absl::optional<StreamId>& sid,
absl::optional<rtc::SSLRole> fallback_ssl_role) absl::optional<rtc::SSLRole> fallback_ssl_role)
RTC_RUN_ON(network_thread()); RTC_RUN_ON(network_thread());

View File

@ -98,7 +98,7 @@ class SctpDataChannelTest : public ::testing::Test {
StreamId sid(0); StreamId sid(0);
network_thread_.BlockingCall([&]() { network_thread_.BlockingCall([&]() {
RTC_DCHECK_RUN_ON(&network_thread_); RTC_DCHECK_RUN_ON(&network_thread_);
if (!inner_channel_->sid_n().HasValue()) { if (!inner_channel_->sid_n().has_value()) {
inner_channel_->SetSctpSid_n(sid); inner_channel_->SetSctpSid_n(sid);
controller_->AddSctpDataStream(sid); controller_->AddSctpDataStream(sid);
} }
@ -115,7 +115,6 @@ class SctpDataChannelTest : public ::testing::Test {
// to run on the network thread. // to run on the network thread.
void SetChannelSid(const rtc::scoped_refptr<SctpDataChannel>& channel, void SetChannelSid(const rtc::scoped_refptr<SctpDataChannel>& channel,
StreamId sid) { StreamId sid) {
RTC_DCHECK(sid.HasValue());
network_thread_.BlockingCall([&]() { network_thread_.BlockingCall([&]() {
channel->SetSctpSid_n(sid); channel->SetSctpSid_n(sid);
controller_->AddSctpDataStream(sid); controller_->AddSctpDataStream(sid);
@ -172,7 +171,7 @@ TEST_F(SctpDataChannelTest, VerifyConfigurationGetters) {
// Check the non-const part of the configuration. // Check the non-const part of the configuration.
EXPECT_EQ(channel_->id(), init_.id); EXPECT_EQ(channel_->id(), init_.id);
network_thread_.BlockingCall( network_thread_.BlockingCall(
[&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId()); }); [&]() { EXPECT_EQ(inner_channel_->sid_n(), absl::nullopt); });
SetChannelReady(); SetChannelReady();
EXPECT_EQ(channel_->id(), 0); EXPECT_EQ(channel_->id(), 0);
@ -188,12 +187,14 @@ TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) {
EXPECT_TRUE(controller_->IsConnected(dc.get())); EXPECT_TRUE(controller_->IsConnected(dc.get()));
// The sid is not set yet, so it should not have added the streams. // The sid is not set yet, so it should not have added the streams.
StreamId sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); }); absl::optional<StreamId> sid =
EXPECT_FALSE(controller_->IsStreamAdded(sid)); network_thread_.BlockingCall([&]() { return dc->sid_n(); });
EXPECT_FALSE(sid.has_value());
SetChannelSid(dc, StreamId(0)); SetChannelSid(dc, StreamId(0));
sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); }); sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); });
EXPECT_TRUE(controller_->IsStreamAdded(sid)); ASSERT_TRUE(sid.has_value());
EXPECT_TRUE(controller_->IsStreamAdded(*sid));
} }
// Tests the state of the data channel. // Tests the state of the data channel.
@ -1035,14 +1036,14 @@ TEST_F(SctpSidAllocatorTest, SctpIdAllocationNoReuse) {
StreamId old_id(1); StreamId old_id(1);
EXPECT_TRUE(allocator_.ReserveSid(old_id)); EXPECT_TRUE(allocator_.ReserveSid(old_id));
StreamId new_id = allocator_.AllocateSid(rtc::SSL_SERVER); absl::optional<StreamId> new_id = allocator_.AllocateSid(rtc::SSL_SERVER);
EXPECT_TRUE(new_id.HasValue()); EXPECT_TRUE(new_id.has_value());
EXPECT_NE(old_id, new_id); EXPECT_NE(old_id, new_id);
old_id = StreamId(0); old_id = StreamId(0);
EXPECT_TRUE(allocator_.ReserveSid(old_id)); EXPECT_TRUE(allocator_.ReserveSid(old_id));
new_id = allocator_.AllocateSid(rtc::SSL_CLIENT); new_id = allocator_.AllocateSid(rtc::SSL_CLIENT);
EXPECT_TRUE(new_id.HasValue()); EXPECT_TRUE(new_id.has_value());
EXPECT_NE(old_id, new_id); EXPECT_NE(old_id, new_id);
} }
@ -1053,17 +1054,18 @@ TEST_F(SctpSidAllocatorTest, SctpIdReusedForRemovedDataChannel) {
EXPECT_TRUE(allocator_.ReserveSid(odd_id)); EXPECT_TRUE(allocator_.ReserveSid(odd_id));
EXPECT_TRUE(allocator_.ReserveSid(even_id)); EXPECT_TRUE(allocator_.ReserveSid(even_id));
StreamId allocated_id = allocator_.AllocateSid(rtc::SSL_SERVER); absl::optional<StreamId> allocated_id =
EXPECT_EQ(odd_id.stream_id_int() + 2, allocated_id.stream_id_int()); allocator_.AllocateSid(rtc::SSL_SERVER);
EXPECT_EQ(odd_id.stream_id_int() + 2, allocated_id->stream_id_int());
allocated_id = allocator_.AllocateSid(rtc::SSL_CLIENT); allocated_id = allocator_.AllocateSid(rtc::SSL_CLIENT);
EXPECT_EQ(even_id.stream_id_int() + 2, allocated_id.stream_id_int()); EXPECT_EQ(even_id.stream_id_int() + 2, allocated_id->stream_id_int());
allocated_id = allocator_.AllocateSid(rtc::SSL_SERVER); allocated_id = allocator_.AllocateSid(rtc::SSL_SERVER);
EXPECT_EQ(odd_id.stream_id_int() + 4, allocated_id.stream_id_int()); EXPECT_EQ(odd_id.stream_id_int() + 4, allocated_id->stream_id_int());
allocated_id = allocator_.AllocateSid(rtc::SSL_CLIENT); allocated_id = allocator_.AllocateSid(rtc::SSL_CLIENT);
EXPECT_EQ(even_id.stream_id_int() + 4, allocated_id.stream_id_int()); EXPECT_EQ(even_id.stream_id_int() + 4, allocated_id->stream_id_int());
allocator_.ReleaseSid(odd_id); allocator_.ReleaseSid(odd_id);
allocator_.ReleaseSid(even_id); allocator_.ReleaseSid(even_id);
@ -1077,10 +1079,10 @@ TEST_F(SctpSidAllocatorTest, SctpIdReusedForRemovedDataChannel) {
// Verifies that used higher ids are not reused. // Verifies that used higher ids are not reused.
allocated_id = allocator_.AllocateSid(rtc::SSL_SERVER); allocated_id = allocator_.AllocateSid(rtc::SSL_SERVER);
EXPECT_EQ(odd_id.stream_id_int() + 6, allocated_id.stream_id_int()); EXPECT_EQ(odd_id.stream_id_int() + 6, allocated_id->stream_id_int());
allocated_id = allocator_.AllocateSid(rtc::SSL_CLIENT); allocated_id = allocator_.AllocateSid(rtc::SSL_CLIENT);
EXPECT_EQ(even_id.stream_id_int() + 6, allocated_id.stream_id_int()); EXPECT_EQ(even_id.stream_id_int() + 6, allocated_id->stream_id_int());
} }
// Code coverage tests for default implementations in data_channel_interface.*. // Code coverage tests for default implementations in data_channel_interface.*.

View File

@ -118,7 +118,7 @@ bool InternalDataChannelInit::IsValid() const {
return true; return true;
} }
StreamId SctpSidAllocator::AllocateSid(rtc::SSLRole role) { absl::optional<StreamId> SctpSidAllocator::AllocateSid(rtc::SSLRole role) {
RTC_DCHECK_RUN_ON(&sequence_checker_); RTC_DCHECK_RUN_ON(&sequence_checker_);
int potential_sid = (role == rtc::SSL_CLIENT) ? 0 : 1; int potential_sid = (role == rtc::SSL_CLIENT) ? 0 : 1;
while (potential_sid <= static_cast<int>(cricket::kMaxSctpSid)) { while (potential_sid <= static_cast<int>(cricket::kMaxSctpSid)) {
@ -128,13 +128,11 @@ StreamId SctpSidAllocator::AllocateSid(rtc::SSLRole role) {
potential_sid += 2; potential_sid += 2;
} }
RTC_LOG(LS_ERROR) << "SCTP sid allocation pool exhausted."; RTC_LOG(LS_ERROR) << "SCTP sid allocation pool exhausted.";
return StreamId(); return absl::nullopt;
} }
bool SctpSidAllocator::ReserveSid(StreamId sid) { bool SctpSidAllocator::ReserveSid(StreamId sid) {
RTC_DCHECK_RUN_ON(&sequence_checker_); RTC_DCHECK_RUN_ON(&sequence_checker_);
if (!sid.HasValue() || sid.stream_id_int() > cricket::kMaxSctpSid)
return false;
return used_sids_.insert(sid).second; return used_sids_.insert(sid).second;
} }
@ -316,7 +314,7 @@ SctpDataChannel::SctpDataChannel(
rtc::Thread* network_thread) rtc::Thread* network_thread)
: signaling_thread_(signaling_thread), : signaling_thread_(signaling_thread),
network_thread_(network_thread), network_thread_(network_thread),
id_n_(config.id), id_n_(config.id == -1 ? absl::nullopt : absl::make_optional(config.id)),
internal_id_(GenerateUniqueId()), internal_id_(GenerateUniqueId()),
label_(label), label_(label),
protocol_(config.protocol), protocol_(config.protocol),
@ -478,7 +476,7 @@ bool SctpDataChannel::negotiated() const {
int SctpDataChannel::id() const { int SctpDataChannel::id() const {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
return id_n_.stream_id_int(); return id_n_.has_value() ? id_n_->stream_id_int() : -1;
} }
Priority SctpDataChannel::priority() const { Priority SctpDataChannel::priority() const {
@ -615,8 +613,7 @@ void SctpDataChannel::SendAsync(
void SctpDataChannel::SetSctpSid_n(StreamId sid) { void SctpDataChannel::SetSctpSid_n(StreamId sid) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK(!id_n_.HasValue()); RTC_DCHECK(!id_n_.has_value());
RTC_DCHECK(sid.HasValue());
RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck); RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
RTC_DCHECK_EQ(state_, kConnecting); RTC_DCHECK_EQ(state_, kConnecting);
id_n_ = sid; id_n_ = sid;
@ -672,24 +669,25 @@ DataChannelStats SctpDataChannel::GetStats() const {
void SctpDataChannel::OnDataReceived(DataMessageType type, void SctpDataChannel::OnDataReceived(DataMessageType type,
const rtc::CopyOnWriteBuffer& payload) { const rtc::CopyOnWriteBuffer& payload) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK(id_n_.has_value());
if (type == DataMessageType::kControl) { if (type == DataMessageType::kControl) {
if (handshake_state_ != kHandshakeWaitingForAck) { if (handshake_state_ != kHandshakeWaitingForAck) {
// Ignore it if we are not expecting an ACK message. // Ignore it if we are not expecting an ACK message.
RTC_LOG(LS_WARNING) RTC_LOG(LS_WARNING)
<< "DataChannel received unexpected CONTROL message, sid = " << "DataChannel received unexpected CONTROL message, sid = "
<< id_n_.stream_id_int(); << id_n_->stream_id_int();
return; return;
} }
if (ParseDataChannelOpenAckMessage(payload)) { if (ParseDataChannelOpenAckMessage(payload)) {
// We can send unordered as soon as we receive the ACK message. // We can send unordered as soon as we receive the ACK message.
handshake_state_ = kHandshakeReady; handshake_state_ = kHandshakeReady;
RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = " RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
<< id_n_.stream_id_int(); << id_n_->stream_id_int();
} else { } else {
RTC_LOG(LS_WARNING) RTC_LOG(LS_WARNING)
<< "DataChannel failed to parse OPEN_ACK message, sid = " << "DataChannel failed to parse OPEN_ACK message, sid = "
<< id_n_.stream_id_int(); << id_n_->stream_id_int();
} }
return; return;
} }
@ -698,7 +696,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type,
type == DataMessageType::kText); type == DataMessageType::kText);
RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
<< id_n_.stream_id_int(); << id_n_->stream_id_int();
// We can send unordered as soon as we receive any DATA message since the // We can send unordered as soon as we receive any DATA message since the
// remote side must have received the OPEN (and old clients do not send // remote side must have received the OPEN (and old clients do not send
// OPEN_ACK). // OPEN_ACK).
@ -731,7 +729,7 @@ void SctpDataChannel::OnDataReceived(DataMessageType type,
void SctpDataChannel::OnTransportReady() { void SctpDataChannel::OnTransportReady() {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
RTC_DCHECK(connected_to_transport()); RTC_DCHECK(connected_to_transport());
RTC_DCHECK(id_n_.HasValue()); RTC_DCHECK(id_n_.has_value());
SendQueuedControlMessages(); SendQueuedControlMessages();
SendQueuedDataMessages(); SendQueuedDataMessages();
@ -796,7 +794,7 @@ void SctpDataChannel::UpdateState() {
DeliverQueuedReceivedData(); DeliverQueuedReceivedData();
} }
} else { } else {
RTC_DCHECK(!id_n_.HasValue()); RTC_DCHECK(!id_n_.has_value());
} }
break; break;
} }
@ -812,9 +810,9 @@ void SctpDataChannel::UpdateState() {
// to complete; after calling RemoveSctpDataStream, // to complete; after calling RemoveSctpDataStream,
// OnClosingProcedureComplete will end up called asynchronously // OnClosingProcedureComplete will end up called asynchronously
// afterwards. // afterwards.
if (!started_closing_procedure_ && id_n_.HasValue()) { if (!started_closing_procedure_ && id_n_.has_value()) {
started_closing_procedure_ = true; started_closing_procedure_ = true;
controller_->RemoveSctpDataStream(id_n_); controller_->RemoveSctpDataStream(*id_n_);
} }
} }
} else { } else {
@ -882,7 +880,7 @@ void SctpDataChannel::SendQueuedDataMessages() {
RTCError SctpDataChannel::SendDataMessage(const DataBuffer& buffer, RTCError SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
bool queue_if_blocked) { bool queue_if_blocked) {
SendDataParams send_params; SendDataParams send_params;
if (!controller_) { if (!controller_ || !id_n_.has_value()) {
error_ = RTCError(RTCErrorType::INVALID_STATE); error_ = RTCError(RTCErrorType::INVALID_STATE);
return error_; return error_;
} }
@ -901,7 +899,7 @@ RTCError SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
send_params.type = send_params.type =
buffer.binary ? DataMessageType::kBinary : DataMessageType::kText; buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
error_ = controller_->SendData(id_n_, send_params, buffer.data); error_ = controller_->SendData(*id_n_, send_params, buffer.data);
if (error_.ok()) { if (error_.ok()) {
++messages_sent_; ++messages_sent_;
bytes_sent_ += buffer.size(); bytes_sent_ += buffer.size();
@ -959,7 +957,7 @@ void SctpDataChannel::SendQueuedControlMessages() {
// RTC_RUN_ON(network_thread_). // RTC_RUN_ON(network_thread_).
bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
RTC_DCHECK(connected_to_transport()); RTC_DCHECK(connected_to_transport());
RTC_DCHECK(id_n_.HasValue()); RTC_DCHECK(id_n_.has_value());
RTC_DCHECK(controller_); RTC_DCHECK(controller_);
bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen; bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
@ -972,10 +970,10 @@ bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
send_params.ordered = ordered_ || is_open_message; send_params.ordered = ordered_ || is_open_message;
send_params.type = DataMessageType::kControl; send_params.type = DataMessageType::kControl;
RTCError err = controller_->SendData(id_n_, send_params, buffer); RTCError err = controller_->SendData(*id_n_, send_params, buffer);
if (err.ok()) { if (err.ok()) {
RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel " RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
<< id_n_.stream_id_int(); << id_n_->stream_id_int();
if (handshake_state_ == kHandshakeShouldSendAck) { if (handshake_state_ == kHandshakeShouldSendAck) {
handshake_state_ = kHandshakeReady; handshake_state_ = kHandshakeReady;

View File

@ -85,8 +85,8 @@ class SctpSidAllocator {
// Gets the first unused odd/even id based on the DTLS role. If `role` is // Gets the first unused odd/even id based on the DTLS role. If `role` is
// SSL_CLIENT, the allocated id starts from 0 and takes even numbers; // SSL_CLIENT, the allocated id starts from 0 and takes even numbers;
// otherwise, the id starts from 1 and takes odd numbers. // otherwise, the id starts from 1 and takes odd numbers.
// If a `StreamId` cannot be allocated, `StreamId::HasValue()` will be false. // If a `StreamId` cannot be allocated, `absl::nullopt` is returned.
StreamId AllocateSid(rtc::SSLRole role); absl::optional<StreamId> AllocateSid(rtc::SSLRole role);
// Attempts to reserve a specific sid. Returns false if it's unavailable. // Attempts to reserve a specific sid. Returns false if it's unavailable.
bool ReserveSid(StreamId sid); bool ReserveSid(StreamId sid);
@ -215,7 +215,7 @@ class SctpDataChannel : public DataChannelInterface {
// stats purposes (see also `GetStats()`). // stats purposes (see also `GetStats()`).
int internal_id() const { return internal_id_; } int internal_id() const { return internal_id_; }
StreamId sid_n() const { absl::optional<StreamId> sid_n() const {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
return id_n_; return id_n_;
} }
@ -267,7 +267,8 @@ class SctpDataChannel : public DataChannelInterface {
rtc::Thread* const signaling_thread_; rtc::Thread* const signaling_thread_;
rtc::Thread* const network_thread_; rtc::Thread* const network_thread_;
StreamId id_n_ RTC_GUARDED_BY(network_thread_); absl::optional<StreamId> id_n_ RTC_GUARDED_BY(network_thread_) =
absl::nullopt;
const int internal_id_; const int internal_id_;
const std::string label_; const std::string label_;
const std::string protocol_; const std::string protocol_;

View File

@ -36,36 +36,20 @@ struct DataChannelInit;
class StreamId { class StreamId {
public: public:
StreamId() = default; StreamId() = default;
explicit StreamId(int id) explicit StreamId(uint16_t id) : id_(id) {}
: id_(id >= cricket::kMinSctpSid && id <= cricket::kSpecMaxSctpSid
? absl::optional<uint16_t>(static_cast<uint16_t>(id))
: absl::nullopt) {}
StreamId(const StreamId& sid) = default; StreamId(const StreamId& sid) = default;
StreamId& operator=(const StreamId& sid) = default; StreamId& operator=(const StreamId& sid) = default;
// Returns `true` if a valid stream id is contained, in the range of
// kMinSctpSid - kSpecMaxSctpSid ([0..0xffff]). Note that this
// is different than having `kMaxSctpSid` as the upper bound, which is
// the limit that is internally used by `SctpSidAllocator`. Sid values may
// be assigned to `StreamId` outside of `SctpSidAllocator` and have a higher
// id value than supplied by `SctpSidAllocator`, yet is still valid.
bool HasValue() const { return id_.has_value(); }
// Provided for compatibility with existing code that hasn't been updated // Provided for compatibility with existing code that hasn't been updated
// to use `StreamId` directly. New code should not use 'int' for the stream // to use `StreamId` directly. New code should not use 'int' for the stream
// id but rather `StreamId` directly. // id but rather `StreamId` directly.
int stream_id_int() const { int stream_id_int() const { return static_cast<int>(id_.value()); }
return id_.has_value() ? static_cast<int>(id_.value().value()) : -1;
}
void reset() { id_ = absl::nullopt; }
bool operator==(const StreamId& sid) const { return id_ == sid.id_; } bool operator==(const StreamId& sid) const { return id_ == sid.id_; }
bool operator<(const StreamId& sid) const { return id_ < sid.id_; } bool operator<(const StreamId& sid) const { return id_ < sid.id_; }
bool operator!=(const StreamId& sid) const { return !(operator==(sid)); } bool operator!=(const StreamId& sid) const { return !(operator==(sid)); }
private: private:
absl::optional<dcsctp::StreamID> id_; dcsctp::StreamID id_;
}; };
// Read the message type and return true if it's an OPEN message. // Read the message type and return true if it's an OPEN message.

View File

@ -208,35 +208,4 @@ TEST(SctpSidTest, Basics) {
static_assert( static_assert(
cricket::kSpecMaxSctpSid == std::numeric_limits<uint16_t>::max(), cricket::kSpecMaxSctpSid == std::numeric_limits<uint16_t>::max(),
"Max legal sctp stream value should be 0xffff"); "Max legal sctp stream value should be 0xffff");
// cricket::kMaxSctpSid is a chosen value in the webrtc implementation,
// the highest generated `sid` value chosen for resource reservation reasons.
// It's one less than kMaxSctpStreams (1024) or 1023 since sid values are
// zero based.
EXPECT_TRUE(!StreamId(-1).HasValue());
EXPECT_TRUE(!StreamId(-2).HasValue());
EXPECT_TRUE(StreamId(cricket::kMinSctpSid).HasValue());
EXPECT_TRUE(StreamId(cricket::kMinSctpSid + 1).HasValue());
EXPECT_TRUE(StreamId(cricket::kSpecMaxSctpSid).HasValue());
EXPECT_TRUE(StreamId(cricket::kMaxSctpSid).HasValue());
// Two illegal values are equal (both not valid).
EXPECT_EQ(StreamId(-1), StreamId(-2));
// Two different, but legal, values, are not equal.
EXPECT_NE(StreamId(1), StreamId(2));
// Test operator<() for container compatibility.
EXPECT_LT(StreamId(1), StreamId(2));
// Test assignment, value() and reset().
StreamId sid1;
StreamId sid2(cricket::kMaxSctpSid);
EXPECT_NE(sid1, sid2);
sid1 = sid2;
EXPECT_EQ(sid1, sid2);
EXPECT_EQ(sid1.stream_id_int(), cricket::kMaxSctpSid);
EXPECT_TRUE(sid1.HasValue());
sid1.reset();
EXPECT_FALSE(sid1.HasValue());
} }

View File

@ -59,8 +59,8 @@ class FakeDataChannelController
std::move(my_weak_ptr), std::string(label), std::move(my_weak_ptr), std::string(label),
transport_available_, init, signaling_thread_, transport_available_, init, signaling_thread_,
network_thread_); network_thread_);
if (transport_available_ && channel->sid_n().HasValue()) { if (transport_available_ && channel->sid_n().has_value()) {
AddSctpDataStream(channel->sid_n()); AddSctpDataStream(*channel->sid_n());
} }
if (ready_to_send_) { if (ready_to_send_) {
network_thread_->PostTask([channel = channel] { network_thread_->PostTask([channel = channel] {
@ -97,7 +97,6 @@ class FakeDataChannelController
void AddSctpDataStream(webrtc::StreamId sid) override { void AddSctpDataStream(webrtc::StreamId sid) override {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(sid.HasValue());
if (!transport_available_) { if (!transport_available_) {
return; return;
} }
@ -106,7 +105,6 @@ class FakeDataChannelController
void RemoveSctpDataStream(webrtc::StreamId sid) override { void RemoveSctpDataStream(webrtc::StreamId sid) override {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(sid.HasValue());
known_stream_ids_.erase(sid); known_stream_ids_.erase(sid);
// Unlike the real SCTP transport, act like the closing procedure finished // Unlike the real SCTP transport, act like the closing procedure finished
// instantly. // instantly.