Revert "[PeerConnection] Use an OperationsChain in PeerConnection for async ops."

This reverts commit 1dddaa1a84330091ca083c950ef2e24a85a48fc8.

Reason for revert: Breaks downstream projects :(

Original change's description:
> [PeerConnection] Use an OperationsChain in PeerConnection for async ops.
> 
> For background, motivation, requirements and implementation notes, see
> https://docs.google.com/document/d/1XLwNN2kUIGGTwz9LQ0NwJNkcybi9oKnynUEZB1jGA14/edit?usp=sharing
> 
> Using the OperationsChain will unblock future CLs from chaining multiple
> operations together such as implementing parameterless
> setLocalDescription().
> 
> In this CL, the OperationsChain is used in existing signaling operations
> with little intended side-effects. An operation that is chained onto an
> empty OperationsChain will for instance execute immediately, and
> SetLocalDescription() and SetRemoteDescription() are implemented as
> "synchronous operations".
> 
> The lifetime of the PeerConnection is not indended to change as a result
> of this CL: All chained operations use a raw pointer to the PC that is
> ensured not to be used-after-free using an "IsAlive" object.
> 
> There is one notable change though: CreateOffer() and CreateAnswer() will
> asynchronously delay other signaling methods from executing until they
> have completed.
> 
> Drive-by fix: This CL also ensures that early failing
> CreateOffer/CreateAnswer operation's observers are invoked if the
> PeerConnection is destroyed while a PostCreateSessionDescriptionFailure
> is pending.
> 
> Bug: webrtc:11019
> Change-Id: I521333e41d20d9bbfb1e721609f2c9db2a5f93a9
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/157305
> Reviewed-by: Steve Anton <steveanton@webrtc.org>
> Commit-Queue: Henrik Boström <hbos@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#29605}

TBR=steveanton@webrtc.org,hbos@webrtc.org

Change-Id: Ie540dcc8ecdc48ad0c65d23645fbc3ad5f99592b
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:11019
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158405
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29611}
This commit is contained in:
Henrik Boström 2019-10-25 09:54:38 +00:00 committed by Commit Bot
parent 1dac707ecb
commit 49c0880afa
4 changed files with 15 additions and 327 deletions

View File

@ -255,9 +255,7 @@ rtc_library("peerconnection") {
"../rtc_base",
"../rtc_base:checks",
"../rtc_base:rtc_base_approved",
"../rtc_base:rtc_operations_chain",
"../rtc_base:safe_minmax",
"../rtc_base:weak_ptr",
"../rtc_base/experiments:field_trial_parser",
"../rtc_base/system:fallthrough",
"../rtc_base/system:file_wrapper",

View File

@ -647,49 +647,6 @@ const ContentInfo* FindTransceiverMSection(
: nullptr;
}
// Wraps a CreateSessionDescriptionObserver and an OperationsChain operation
// complete callback. When the observer is invoked, the wrapped observer is
// invoked followed by invoking the completion callback.
class CreateSessionDescriptionObserverOperationWrapper
: public CreateSessionDescriptionObserver {
public:
CreateSessionDescriptionObserverOperationWrapper(
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer,
std::function<void()> operation_complete_callback)
: observer_(std::move(observer)),
operation_complete_callback_(std::move(operation_complete_callback)) {
RTC_DCHECK(observer_);
}
~CreateSessionDescriptionObserverOperationWrapper() override {
RTC_DCHECK(was_called_);
}
void OnSuccess(SessionDescriptionInterface* desc) override {
RTC_DCHECK(!was_called_);
#ifdef RTC_DCHECK_IS_ON
was_called_ = true;
#endif // RTC_DCHECK_IS_ON
observer_->OnSuccess(desc);
operation_complete_callback_();
}
void OnFailure(RTCError error) override {
RTC_DCHECK(!was_called_);
#ifdef RTC_DCHECK_IS_ON
was_called_ = true;
#endif // RTC_DCHECK_IS_ON
observer_->OnFailure(std::move(error));
operation_complete_callback_();
}
private:
#ifdef RTC_DCHECK_IS_ON
bool was_called_ = false;
#endif // RTC_DCHECK_IS_ON
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer_;
std::function<void()> operation_complete_callback_;
};
} // namespace
class PeerConnection::LocalIceCredentialsToReplace {
@ -935,7 +892,6 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory,
: factory_(factory),
event_log_(std::move(event_log)),
event_log_ptr_(event_log_.get()),
operations_chain_(rtc::OperationsChain::Create()),
datagram_transport_config_(
field_trial::FindFullName(kDatagramTransportFieldTrial)),
datagram_transport_data_channel_config_(
@ -946,15 +902,12 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory,
call_(std::move(call)),
call_ptr_(call_.get()),
data_channel_transport_(nullptr),
local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()),
weak_ptr_factory_(this) {}
local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()) {}
PeerConnection::~PeerConnection() {
TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection");
RTC_DCHECK_RUN_ON(signaling_thread());
weak_ptr_factory_.InvalidateWeakPtrs();
// Need to stop transceivers before destroying the stats collector because
// AudioRtpSender has a reference to the StatsCollector it will update when
// stopping.
@ -991,23 +944,6 @@ PeerConnection::~PeerConnection() {
// The event log must outlive call (and any other object that uses it).
event_log_.reset();
});
// Process all pending notifications in the message queue. If we don't do
// this, requests will linger and not know they succeeded or failed.
rtc::MessageList list;
signaling_thread()->Clear(this, rtc::MQID_ANY, &list);
for (auto& msg : list) {
if (msg.message_id == MSG_CREATE_SESSIONDESCRIPTION_FAILED) {
// Processing CreateOffer() and CreateAnswer() messages ensures their
// observers are invoked even if the PeerConnection is destroyed early.
OnMessage(&msg);
} else {
// TODO(hbos): Consider processing all pending messages. This would mean
// that SetLocalDescription() and SetRemoteDescription() observers are
// informed of successes and failures; this is currently NOT the case.
delete msg.pdata;
}
}
}
void PeerConnection::DestroyAllChannels() {
@ -2114,37 +2050,7 @@ void PeerConnection::RestartIce() {
void PeerConnection::CreateOffer(CreateSessionDescriptionObserver* observer,
const RTCOfferAnswerOptions& options) {
RTC_DCHECK_RUN_ON(signaling_thread());
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
observer_refptr =
rtc::scoped_refptr<CreateSessionDescriptionObserver>(observer),
options](std::function<void()> operations_chain_callback) {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
observer_refptr->OnFailure(
RTCError(RTCErrorType::INTERNAL_ERROR,
"CreateOffer failed because the session was shut down"));
operations_chain_callback();
return;
}
// The operation completes asynchronously when the wrapper is invoked.
rtc::scoped_refptr<CreateSessionDescriptionObserverOperationWrapper>
observer_wrapper(new rtc::RefCountedObject<
CreateSessionDescriptionObserverOperationWrapper>(
std::move(observer_refptr),
std::move(operations_chain_callback)));
this_weak_ptr->DoCreateOffer(options, observer_wrapper);
});
}
void PeerConnection::DoCreateOffer(
const RTCOfferAnswerOptions& options,
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::DoCreateOffer");
TRACE_EVENT0("webrtc", "PeerConnection::CreateOffer");
if (!observer) {
RTC_LOG(LS_ERROR) << "CreateOffer - observer is NULL.";
@ -2270,37 +2176,7 @@ PeerConnection::GetReceivingTransceiversOfType(cricket::MediaType media_type) {
void PeerConnection::CreateAnswer(CreateSessionDescriptionObserver* observer,
const RTCOfferAnswerOptions& options) {
RTC_DCHECK_RUN_ON(signaling_thread());
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
observer_refptr =
rtc::scoped_refptr<CreateSessionDescriptionObserver>(observer),
options](std::function<void()> operations_chain_callback) {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
observer_refptr->OnFailure(RTCError(
RTCErrorType::INTERNAL_ERROR,
"CreateAnswer failed because the session was shut down"));
operations_chain_callback();
return;
}
// The operation completes asynchronously when the wrapper is invoked.
rtc::scoped_refptr<CreateSessionDescriptionObserverOperationWrapper>
observer_wrapper(new rtc::RefCountedObject<
CreateSessionDescriptionObserverOperationWrapper>(
std::move(observer_refptr),
std::move(operations_chain_callback)));
this_weak_ptr->DoCreateAnswer(options, observer_wrapper);
});
}
void PeerConnection::DoCreateAnswer(
const RTCOfferAnswerOptions& options,
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::DoCreateAnswer");
TRACE_EVENT0("webrtc", "PeerConnection::CreateAnswer");
if (!observer) {
RTC_LOG(LS_ERROR) << "CreateAnswer - observer is NULL.";
return;
@ -2354,44 +2230,13 @@ void PeerConnection::SetLocalDescription(
SetSessionDescriptionObserver* observer,
SessionDescriptionInterface* desc_ptr) {
RTC_DCHECK_RUN_ON(signaling_thread());
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
observer_refptr =
rtc::scoped_refptr<SetSessionDescriptionObserver>(observer),
desc = std::unique_ptr<SessionDescriptionInterface>(desc_ptr)](
std::function<void()> operations_chain_callback) mutable {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
// For consistency with DoSetLocalDescription(), we DO NOT inform the
// |observer_refptr| that the operation failed in this case.
// TODO(hbos): If/when we process SLD messages in ~PeerConnection,
// the consistent thing would be to inform the observer here.
operations_chain_callback();
return;
}
this_weak_ptr->DoSetLocalDescription(std::move(desc),
std::move(observer_refptr));
// DoSetLocalDescription() is currently implemented as a synchronous
// operation but where the |observer|'s callbacks are invoked
// asynchronously in a post to OnMessage().
// For backwards-compatability reasons, we declare the operation as
// completed here (rather than in OnMessage()). This ensures that:
// - This operation is not keeping the PeerConnection alive past this
// point.
// - Subsequent offer/answer operations can start immediately (without
// waiting for OnMessage()).
operations_chain_callback();
});
}
TRACE_EVENT0("webrtc", "PeerConnection::SetLocalDescription");
void PeerConnection::DoSetLocalDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetSessionDescriptionObserver> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::DoSetLocalDescription");
// The SetLocalDescription contract is that we take ownership of the session
// description regardless of the outcome, so wrap it in a unique_ptr right
// away. Ideally, SetLocalDescription's signature will be changed to take the
// description as a unique_ptr argument to formalize this agreement.
std::unique_ptr<SessionDescriptionInterface> desc(desc_ptr);
if (!observer) {
RTC_LOG(LS_ERROR) << "SetLocalDescription - observer is NULL.";
@ -2772,83 +2617,18 @@ void PeerConnection::FillInMissingRemoteMids(
void PeerConnection::SetRemoteDescription(
SetSessionDescriptionObserver* observer,
SessionDescriptionInterface* desc_ptr) {
RTC_DCHECK_RUN_ON(signaling_thread());
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
observer_refptr =
rtc::scoped_refptr<SetSessionDescriptionObserver>(observer),
desc = std::unique_ptr<SessionDescriptionInterface>(desc_ptr)](
std::function<void()> operations_chain_callback) mutable {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
// For consistency with SetRemoteDescriptionObserverAdapter, we DO NOT
// inform the |observer_refptr| that the operation failed in this
// case.
// TODO(hbos): If/when we process SRD messages in ~PeerConnection,
// the consistent thing would be to inform the observer here.
operations_chain_callback();
return;
}
this_weak_ptr->DoSetRemoteDescription(
std::move(desc),
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface>(
new SetRemoteDescriptionObserverAdapter(
this_weak_ptr.get(), std::move(observer_refptr))));
// DoSetRemoteDescription() is currently implemented as a synchronous
// operation but where SetRemoteDescriptionObserverAdapter ensures that
// the |observer|'s callbacks are invoked asynchronously in a post to
// OnMessage().
// For backwards-compatability reasons, we declare the operation as
// completed here (rather than in OnMessage()). This ensures that:
// - This operation is not keeping the PeerConnection alive past this
// point.
// - Subsequent offer/answer operations can start immediately (without
// waiting for OnMessage()).
operations_chain_callback();
});
SessionDescriptionInterface* desc) {
SetRemoteDescription(
std::unique_ptr<SessionDescriptionInterface>(desc),
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface>(
new SetRemoteDescriptionObserverAdapter(this, observer)));
}
void PeerConnection::SetRemoteDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
// Chain this operation. If asynchronous operations are pending on the chain,
// this operation will be queued to be invoked, otherwise the contents of the
// lambda will execute immediately.
operations_chain_->ChainOperation(
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), observer,
desc = std::move(desc)](
std::function<void()> operations_chain_callback) mutable {
// Abort early if |this_weak_ptr| is no longer valid.
if (!this_weak_ptr) {
// For consistency with DoSetRemoteDescription(), we DO inform the
// |observer| that the operation failed in this case.
observer->OnSetRemoteDescriptionComplete(RTCError(
RTCErrorType::INVALID_STATE,
"Failed to set remote offer sdp: failed because the session was "
"shut down"));
operations_chain_callback();
return;
}
this_weak_ptr->DoSetRemoteDescription(std::move(desc),
std::move(observer));
// DoSetRemoteDescription() is currently implemented as a synchronous
// operation. The |observer| will already have been informed that it
// completed, and we can mark this operation as complete without any
// loose ends.
operations_chain_callback();
});
}
void PeerConnection::DoSetRemoteDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
TRACE_EVENT0("webrtc", "PeerConnection::DoSetRemoteDescription");
TRACE_EVENT0("webrtc", "PeerConnection::SetRemoteDescription");
if (!observer) {
RTC_LOG(LS_ERROR) << "SetRemoteDescription - observer is NULL.";

View File

@ -34,10 +34,8 @@
#include "pc/stream_collection.h"
#include "pc/webrtc_session_description_factory.h"
#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/operations_chain.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/unique_id_generator.h"
#include "rtc_base/weak_ptr.h"
namespace webrtc {
@ -445,22 +443,6 @@ class PeerConnection : public PeerConnectionInternal,
rtc::scoped_refptr<RtpTransceiverProxyWithInternal<RtpTransceiver>>
GetFirstAudioTransceiver() const RTC_RUN_ON(signaling_thread());
// Implementation of the offer/answer exchange operations. These are chained
// onto the |operations_chain_| when the public CreateOffer(), CreateAnswer(),
// SetLocalDescription() and SetRemoteDescription() methods are invoked.
void DoCreateOffer(
const RTCOfferAnswerOptions& options,
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer);
void DoCreateAnswer(
const RTCOfferAnswerOptions& options,
rtc::scoped_refptr<CreateSessionDescriptionObserver> observer);
void DoSetLocalDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetSessionDescriptionObserver> observer);
void DoSetRemoteDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface> observer);
void CreateAudioReceiver(MediaStreamInterface* stream,
const RtpSenderInfo& remote_sender_info)
RTC_RUN_ON(signaling_thread());
@ -1235,14 +1217,6 @@ class PeerConnection : public PeerConnectionInternal,
// pointer (but not touch the object) from any thread.
RtcEventLog* const event_log_ptr_ RTC_PT_GUARDED_BY(worker_thread());
// The operations chain is used by the offer/answer exchange methods to ensure
// they are executed in the right order. For example, if
// SetRemoteDescription() is invoked while CreateOffer() is still pending, the
// SRD operation will not start until CreateOffer() has completed. See
// https://w3c.github.io/webrtc-pc/#dfn-operations-chain.
rtc::scoped_refptr<rtc::OperationsChain> operations_chain_
RTC_GUARDED_BY(signaling_thread());
SignalingState signaling_state_ RTC_GUARDED_BY(signaling_thread()) = kStable;
IceConnectionState ice_connection_state_ RTC_GUARDED_BY(signaling_thread()) =
kIceConnectionNew;
@ -1472,9 +1446,6 @@ class PeerConnection : public PeerConnectionInternal,
std::unique_ptr<LocalIceCredentialsToReplace>
local_ice_credentials_to_replace_ RTC_GUARDED_BY(signaling_thread());
bool is_negotiation_needed_ RTC_GUARDED_BY(signaling_thread()) = false;
rtc::WeakPtrFactory<PeerConnection> weak_ptr_factory_
RTC_GUARDED_BY(signaling_thread());
};
} // namespace webrtc

View File

@ -41,10 +41,6 @@ using ::testing::Bool;
using ::testing::Combine;
using ::testing::Values;
namespace {
const int64_t kWaitTimeout = 10000;
} // namespace
class PeerConnectionWrapperForSignalingTest : public PeerConnectionWrapper {
public:
using PeerConnectionWrapper::PeerConnectionWrapper;
@ -526,63 +522,6 @@ TEST_P(PeerConnectionSignalingTest, CreateOffersAndShutdown) {
}
}
// Similar to the above test, but by closing the PC first the CreateOffer() will
// fail "early", which triggers a codepath where the PeerConnection is
// reponsible for invoking the observer, instead of the normal codepath where
// the WebRtcSessionDescriptionFactory is responsible for it.
TEST_P(PeerConnectionSignalingTest, CloseCreateOfferAndShutdown) {
auto caller = CreatePeerConnection();
rtc::scoped_refptr<MockCreateSessionDescriptionObserver> observer =
new rtc::RefCountedObject<MockCreateSessionDescriptionObserver>();
caller->pc()->Close();
caller->pc()->CreateOffer(observer, RTCOfferAnswerOptions());
caller.reset(nullptr);
EXPECT_TRUE(observer->called());
}
TEST_P(PeerConnectionSignalingTest, SetRemoteDescriptionExecutesImmediately) {
auto caller = CreatePeerConnectionWithAudioVideo();
auto callee = CreatePeerConnection();
// This offer will cause receivers to be created.
auto offer = caller->CreateOffer(RTCOfferAnswerOptions());
// By not waiting for the observer's callback we can verify that the operation
// executed immediately.
callee->pc()->SetRemoteDescription(std::move(offer),
new MockSetRemoteDescriptionObserver());
EXPECT_EQ(2u, callee->pc()->GetReceivers().size());
}
TEST_P(PeerConnectionSignalingTest, CreateOfferBlocksSetRemoteDescription) {
auto caller = CreatePeerConnectionWithAudioVideo();
auto callee = CreatePeerConnection();
// This offer will cause receivers to be created.
auto offer = caller->CreateOffer(RTCOfferAnswerOptions());
EXPECT_EQ(0u, callee->pc()->GetReceivers().size());
rtc::scoped_refptr<MockCreateSessionDescriptionObserver> offer_observer(
new rtc::RefCountedObject<MockCreateSessionDescriptionObserver>());
// Synchronously invoke CreateOffer() and SetRemoteDescription(). The
// SetRemoteDescription() operation should be chained to be executed
// asynchronously, when CreateOffer() completes.
callee->pc()->CreateOffer(offer_observer, RTCOfferAnswerOptions());
callee->pc()->SetRemoteDescription(std::move(offer),
new MockSetRemoteDescriptionObserver());
// CreateOffer() is asynchronous; without message processing this operation
// should not have completed.
EXPECT_FALSE(offer_observer->called());
// Due to chaining, the receivers should not have been created by the offer
// yet.
EXPECT_EQ(0u, callee->pc()->GetReceivers().size());
// EXPECT_EQ_WAIT causes messages to be processed...
EXPECT_EQ_WAIT(true, offer_observer->called(), kWaitTimeout);
// Now that the offer has been completed, SetRemoteDescription() will have
// been executed next in the chain.
EXPECT_EQ(2u, callee->pc()->GetReceivers().size());
}
INSTANTIATE_TEST_SUITE_P(PeerConnectionSignalingTest,
PeerConnectionSignalingTest,
Values(SdpSemantics::kPlanB,