Direct IP connect functionality for AppRTC Android demo.

This allows connecting between clients without using external servers, which is useful to OEMs if they are working in a network without internet connection. Implementation uses custom AppRTCClient that replaces WebSocketRTCClient if roomId looks like an IP. Instead of a web socket, this class uses direct TCP connection between peers as a signaling channel.

Review-Url: https://codereview.webrtc.org/1963053002
Cr-Commit-Position: refs/heads/master@{#12789}
This commit is contained in:
sakal 2016-05-18 03:36:41 -07:00 committed by Commit bot
parent 5a216d0489
commit 299ccdee0c
5 changed files with 1038 additions and 3 deletions

View File

@ -139,7 +139,6 @@ public class CallActivity extends Activity
private HudFragment hudFragment;
private CpuMonitor cpuMonitor;
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
@ -240,8 +239,15 @@ public class CallActivity extends Activity
commandLineRun = intent.getBooleanExtra(EXTRA_CMDLINE, false);
runTimeMs = intent.getIntExtra(EXTRA_RUNTIME, 0);
// Create connection client and connection parameters.
appRtcClient = new WebSocketRTCClient(this, new LooperExecutor());
// Create connection client. Use DirectRTCClient if room name is an IP otherwise use the
// standard WebSocketRTCClient.
if (loopback || !DirectRTCClient.IP_PATTERN.matcher(roomId).matches()) {
appRtcClient = new WebSocketRTCClient(this, new LooperExecutor());
} else {
Log.i(TAG, "Using DirectRTCClient because room name looks like an IP.");
appRtcClient = new DirectRTCClient(this);
}
// Create connection parameters.
roomConnectionParameters = new RoomConnectionParameters(
roomUri.toString(), roomId, loopback);

View File

@ -0,0 +1,354 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
package org.appspot.apprtc;
import android.util.Log;
import org.appspot.apprtc.util.LooperExecutor;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.webrtc.IceCandidate;
import org.webrtc.PeerConnection;
import org.webrtc.SessionDescription;
import java.util.LinkedList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Implementation of AppRTCClient that uses direct TCP connection as the signaling channel.
* This eliminates the need for an external server. This class does not support loopback
* connections.
*/
public class DirectRTCClient implements AppRTCClient, TCPChannelClient.TCPChannelEvents {
private static final String TAG = "DirectRTCClient";
private static final int DEFAULT_PORT = 8888;
// Regex pattern used for checking if room id looks like an IP.
static final Pattern IP_PATTERN = Pattern.compile(
"("
// IPv4
+ "((\\d+\\.){3}\\d+)|"
// IPv6
+ "\\[((([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?::"
+ "(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?)\\]|"
+ "\\[(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})\\]|"
// IPv6 without []
+ "((([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?::(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?)|"
+ "(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})|"
// Literals
+ "localhost"
+ ")"
// Optional port number
+ "(:(\\d+))?"
);
private final LooperExecutor executor;
private final SignalingEvents events;
private TCPChannelClient tcpClient;
private RoomConnectionParameters connectionParameters;
private enum ConnectionState {
NEW, CONNECTED, CLOSED, ERROR
};
// All alterations of the room state should be done from inside the looper thread.
private ConnectionState roomState;
public DirectRTCClient(SignalingEvents events) {
this.events = events;
executor = new LooperExecutor();
executor.requestStart();
roomState = ConnectionState.NEW;
}
/**
* Connects to the room, roomId in connectionsParameters is required. roomId must be a valid
* IP address matching IP_PATTERN.
*/
@Override
public void connectToRoom(RoomConnectionParameters connectionParameters) {
this.connectionParameters = connectionParameters;
if (connectionParameters.loopback) {
reportError("Loopback connections aren't supported by DirectRTCClient.");
}
executor.execute(new Runnable() {
@Override
public void run() {
connectToRoomInternal();
}
});
}
@Override
public void disconnectFromRoom() {
executor.execute(new Runnable() {
@Override
public void run() {
disconnectFromRoomInternal();
}
});
executor.requestStop();
}
/**
* Connects to the room.
*
* Runs on the looper thread.
*/
private void connectToRoomInternal() {
this.roomState = ConnectionState.NEW;
String endpoint = connectionParameters.roomId;
Matcher matcher = IP_PATTERN.matcher(endpoint);
if (!matcher.matches()) {
reportError("roomId must match IP_PATTERN for DirectRTCClient.");
return;
}
String ip = matcher.group(1);
String portStr = matcher.group(matcher.groupCount());
int port;
if (portStr != null) {
try {
port = Integer.parseInt(portStr);
} catch (NumberFormatException e) {
reportError("Invalid port number: " + portStr);
return;
}
} else {
port = DEFAULT_PORT;
}
tcpClient = new TCPChannelClient(executor, this, ip, port);
}
/**
* Disconnects from the room.
*
* Runs on the looper thread.
*/
private void disconnectFromRoomInternal() {
roomState = ConnectionState.CLOSED;
if (tcpClient != null) {
tcpClient.disconnect();
tcpClient = null;
}
}
@Override
public void sendOfferSdp(final SessionDescription sdp) {
executor.execute(new Runnable() {
@Override
public void run() {
if (roomState != ConnectionState.CONNECTED) {
reportError("Sending offer SDP in non connected state.");
return;
}
JSONObject json = new JSONObject();
jsonPut(json, "sdp", sdp.description);
jsonPut(json, "type", "offer");
sendMessage(json.toString());
}
});
}
@Override
public void sendAnswerSdp(final SessionDescription sdp) {
executor.execute(new Runnable() {
@Override
public void run() {
JSONObject json = new JSONObject();
jsonPut(json, "sdp", sdp.description);
jsonPut(json, "type", "answer");
sendMessage(json.toString());
}
});
}
@Override
public void sendLocalIceCandidate(final IceCandidate candidate) {
executor.execute(new Runnable() {
@Override
public void run() {
JSONObject json = new JSONObject();
jsonPut(json, "type", "candidate");
jsonPut(json, "label", candidate.sdpMLineIndex);
jsonPut(json, "id", candidate.sdpMid);
jsonPut(json, "candidate", candidate.sdp);
if (roomState != ConnectionState.CONNECTED) {
reportError("Sending ICE candidate in non connected state.");
return;
}
sendMessage(json.toString());
}
});
}
/** Send removed Ice candidates to the other participant. */
@Override
public void sendLocalIceCandidateRemovals(final IceCandidate[] candidates) {
executor.execute(new Runnable() {
@Override
public void run() {
JSONObject json = new JSONObject();
jsonPut(json, "type", "remove-candidates");
JSONArray jsonArray = new JSONArray();
for (final IceCandidate candidate : candidates) {
jsonArray.put(toJsonCandidate(candidate));
}
jsonPut(json, "candidates", jsonArray);
if (roomState != ConnectionState.CONNECTED) {
reportError("Sending ICE candidate removals in non connected state.");
return;
}
sendMessage(json.toString());
}
});
}
// -------------------------------------------------------------------
// TCPChannelClient event handlers
/**
* If the client is the server side, this will trigger onConnectedToRoom.
*/
@Override
public void onTCPConnected(boolean isServer) {
if (isServer) {
roomState = ConnectionState.CONNECTED;
SignalingParameters parameters = new SignalingParameters(
// Ice servers are not needed for direct connections.
new LinkedList<PeerConnection.IceServer>(),
isServer, // Server side acts as the initiator on direct connections.
null, // clientId
null, // wssUrl
null, // wwsPostUrl
null, // offerSdp
null // iceCandidates
);
events.onConnectedToRoom(parameters);
}
}
@Override
public void onTCPMessage(String msg) {
try {
JSONObject json = new JSONObject(msg);
String type = json.optString("type");
if (type.equals("candidate")) {
events.onRemoteIceCandidate(toJavaCandidate(json));
} else if (type.equals("remove-candidates")) {
JSONArray candidateArray = json.getJSONArray("candidates");
IceCandidate[] candidates = new IceCandidate[candidateArray.length()];
for (int i = 0; i < candidateArray.length(); ++i) {
candidates[i] = toJavaCandidate(candidateArray.getJSONObject(i));
}
events.onRemoteIceCandidatesRemoved(candidates);
} else if (type.equals("answer")) {
SessionDescription sdp = new SessionDescription(
SessionDescription.Type.fromCanonicalForm(type),
json.getString("sdp"));
events.onRemoteDescription(sdp);
} else if (type.equals("offer")) {
SessionDescription sdp = new SessionDescription(
SessionDescription.Type.fromCanonicalForm(type),
json.getString("sdp"));
SignalingParameters parameters = new SignalingParameters(
// Ice servers are not needed for direct connections.
new LinkedList<PeerConnection.IceServer>(),
false, // This code will only be run on the client side. So, we are not the initiator.
null, // clientId
null, // wssUrl
null, // wssPostUrl
sdp, // offerSdp
null // iceCandidates
);
roomState = ConnectionState.CONNECTED;
events.onConnectedToRoom(parameters);
} else {
reportError("Unexpected TCP message: " + msg);
}
} catch (JSONException e) {
reportError("TCP message JSON parsing error: " + e.toString());
}
}
@Override
public void onTCPError(String description) {
reportError("TCP connection error: " + description);
}
@Override
public void onTCPClose() {
events.onChannelClose();
}
// --------------------------------------------------------------------
// Helper functions.
private void reportError(final String errorMessage) {
Log.e(TAG, errorMessage);
executor.execute(new Runnable() {
@Override
public void run() {
if (roomState != ConnectionState.ERROR) {
roomState = ConnectionState.ERROR;
events.onChannelError(errorMessage);
}
}
});
}
private void sendMessage(final String message) {
executor.execute(new Runnable() {
@Override
public void run() {
tcpClient.send(message);
}
});
}
// Put a |key|->|value| mapping in |json|.
private static void jsonPut(JSONObject json, String key, Object value) {
try {
json.put(key, value);
} catch (JSONException e) {
throw new RuntimeException(e);
}
}
// Converts a Java candidate to a JSONObject.
private static JSONObject toJsonCandidate(final IceCandidate candidate) {
JSONObject json = new JSONObject();
jsonPut(json, "label", candidate.sdpMLineIndex);
jsonPut(json, "id", candidate.sdpMid);
jsonPut(json, "candidate", candidate.sdp);
return json;
}
// Converts a JSON candidate to a Java object.
private static IceCandidate toJavaCandidate(JSONObject json) throws JSONException {
return new IceCandidate(json.getString("id"),
json.getInt("label"),
json.getString("candidate"));
}
}

View File

@ -0,0 +1,362 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
package org.appspot.apprtc;
import android.util.Log;
import org.appspot.apprtc.util.LooperExecutor;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
/**
* Replacement for WebSocketChannelClient for direct communication between two IP addresses. Handles
* the signaling between the two clients using a TCP connection.
*
* <p>All public methods should be called from a looper executor thread
* passed in a constructor, otherwise exception will be thrown.
* All events are dispatched on the same thread.
*/
public class TCPChannelClient {
private static final String TAG = "TCPChannelClient";
private final LooperExecutor executor;
private final TCPChannelEvents eventListener;
private TCPSocket socket;
/**
* Callback interface for messages delivered on TCP Connection. All callbacks are invoked from the
* looper executor thread.
*/
public interface TCPChannelEvents {
void onTCPConnected(boolean server);
void onTCPMessage(String message);
void onTCPError(String description);
void onTCPClose();
}
/**
* Initializes the TCPChannelClient. If IP is a local IP address, starts a listening server on
* that IP. If not, instead connects to the IP.
*
* @param eventListener Listener that will receive events from the client.
* @param ip IP address to listen on or connect to.
* @param port Port to listen on or connect to.
*/
public TCPChannelClient(
LooperExecutor executor, TCPChannelEvents eventListener, String ip, int port) {
this.executor = executor;
this.eventListener = eventListener;
InetAddress address;
try {
address = InetAddress.getByName(ip);
} catch (UnknownHostException e) {
reportError("Invalid IP address.");
return;
}
if (address.isAnyLocalAddress()) {
socket = new TCPSocketServer(address, port);
} else {
socket = new TCPSocketClient(address, port);
}
socket.start();
}
/**
* Disconnects the client if not already disconnected. This will fire the onTCPClose event.
*/
public void disconnect() {
checkIfCalledOnValidThread();
socket.disconnect();
}
/**
* Sends a message on the socket.
*
* @param message Message to be sent.
*/
public void send(String message) {
checkIfCalledOnValidThread();
socket.send(message);
}
/**
* Helper method for firing onTCPError events. Calls onTCPError on the executor thread.
*/
private void reportError(final String message) {
Log.e(TAG, "TCP Error: " + message);
executor.execute(new Runnable() {
@Override
public void run() {
eventListener.onTCPError(message);
}
});
}
/**
* Helper method for debugging purposes.
* Ensures that TCPChannelClient method is called on a looper thread.
*/
private void checkIfCalledOnValidThread() {
if (!executor.checkOnLooperThread()) {
throw new IllegalStateException(
"TCPChannelClient method is not called on valid thread");
}
}
/**
* Base class for server and client sockets. Contains a listening thread that will call
* eventListener.onTCPMessage on new messages.
*/
private abstract class TCPSocket extends Thread {
// Lock for editing out and rawSocket
protected final Object rawSocketLock;
private PrintWriter out;
private Socket rawSocket;
/**
* Connect to the peer, potentially a slow operation.
*
* @return Socket connection, null if connection failed.
*/
public abstract Socket connect();
/** Returns true if sockets is a server rawSocket. */
public abstract boolean isServer();
TCPSocket() {
rawSocketLock = new Object();
}
/**
* The listening thread.
*/
@Override
public void run() {
Log.d(TAG, "Listening thread started...");
// Receive connection to temporary variable first, so we don't block.
Socket tempSocket = connect();
BufferedReader in;
Log.d(TAG, "TCP connection established.");
synchronized (rawSocketLock) {
if (rawSocket != null) {
Log.e(TAG, "Socket already existed and will be replaced.");
}
rawSocket = tempSocket;
// Connecting failed, error has already been reported, just exit.
if (rawSocket == null) {
return;
}
try {
out = new PrintWriter(rawSocket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(rawSocket.getInputStream()));
} catch (IOException e) {
reportError("Failed to open IO on rawSocket: " + e.getMessage());
return;
}
}
Log.v(TAG, "Execute onTCPConnected");
executor.execute(new Runnable() {
@Override
public void run() {
Log.v(TAG, "Run onTCPConnected");
eventListener.onTCPConnected(isServer());
}
});
while (true) {
final String message;
try {
message = in.readLine();
} catch (IOException e) {
synchronized (rawSocketLock) {
// If socket was closed, this is expected.
if (rawSocket == null) {
break;
}
}
reportError("Failed to read from rawSocket: " + e.getMessage());
break;
}
// No data received, rawSocket probably closed.
if (message == null) {
break;
}
executor.execute(new Runnable() {
@Override
public void run() {
Log.v(TAG, "Receive: " + message);
eventListener.onTCPMessage(message);
}
});
}
Log.d(TAG, "Receiving thread exiting...");
// Close the rawSocket if it is still open.
disconnect();
}
/**
* Closes the rawSocket if it is still open. Also fires the onTCPClose event.
*/
public void disconnect() {
try {
synchronized (rawSocketLock) {
if (rawSocket != null) {
rawSocket.close();
rawSocket = null;
out = null;
executor.execute(new Runnable() {
@Override
public void run() {
eventListener.onTCPClose();
}
});
}
}
} catch (IOException e) {
reportError("Failed to close rawSocket: " + e.getMessage());
}
}
/**
* Sends a message on the socket. Should only be called on the executor thread.
*/
public void send(String message) {
Log.v(TAG, "Send: " + message);
synchronized (rawSocketLock) {
if (out == null) {
reportError("Sending data on closed socket.");
return;
}
out.write(message + "\n");
out.flush();
}
}
}
private class TCPSocketServer extends TCPSocket {
// Server socket is also guarded by rawSocketLock.
private ServerSocket serverSocket;
final private InetAddress address;
final private int port;
public TCPSocketServer(InetAddress address, int port) {
this.address = address;
this.port = port;
}
/** Opens a listening socket and waits for a connection. */
@Override
public Socket connect() {
Log.d(TAG, "Listening on [" + address.getHostAddress() + "]:" + Integer.toString(port));
final ServerSocket tempSocket;
try {
tempSocket = new ServerSocket(port, 0, address);
} catch (IOException e) {
reportError("Failed to create server socket: " + e.getMessage());
return null;
}
synchronized (rawSocketLock) {
if (serverSocket != null) {
Log.e(TAG, "Server rawSocket was already listening and new will be opened.");
}
serverSocket = tempSocket;
}
try {
return tempSocket.accept();
} catch (IOException e) {
reportError("Failed to receive connection: " + e.getMessage());
return null;
}
}
/** Closes the listening socket and calls super. */
@Override
public void disconnect() {
try {
synchronized (rawSocketLock) {
if (serverSocket != null) {
serverSocket.close();
serverSocket = null;
}
}
} catch (IOException e) {
reportError("Failed to close server socket: " + e.getMessage());
}
super.disconnect();
}
@Override
public boolean isServer() {
return true;
}
}
private class TCPSocketClient extends TCPSocket {
final private InetAddress address;
final private int port;
public TCPSocketClient(InetAddress address, int port) {
this.address = address;
this.port = port;
}
/** Connects to the peer. */
@Override
public Socket connect() {
Log.d(TAG, "Connecting to [" + address.getHostAddress() + "]:" + Integer.toString(port));
try {
return new Socket(address, port);
} catch (IOException e) {
reportError("Failed to connect: " + e.getMessage());
return null;
}
}
@Override
public boolean isServer() {
return false;
}
}
}

View File

@ -0,0 +1,195 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
package org.appspot.apprtc;
import org.appspot.apprtc.util.LooperExecutor;
import org.appspot.apprtc.util.RobolectricLooperExecutor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;
import org.robolectric.shadows.ShadowLog;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@RunWith(RobolectricTestRunner.class)
@Config(manifest = Config.NONE)
public class TCPChannelClientTest {
private static final int PORT = 8888;
/**
* How long we wait before trying to connect to the server. Chosen quite arbitrarily and
* could be made smaller if need be.
*/
private static final int SERVER_WAIT = 10;
private static final int CONNECT_TIMEOUT = 100;
private static final int SEND_TIMEOUT = 100;
private static final int DISCONNECT_TIMEOUT = 100;
private static final String TEST_MESSAGE_SERVER = "Hello, Server!";
private static final String TEST_MESSAGE_CLIENT = "Hello, Client!";
@Mock TCPChannelClient.TCPChannelEvents serverEvents;
@Mock TCPChannelClient.TCPChannelEvents clientEvents;
private RobolectricLooperExecutor executor;
private TCPChannelClient server;
private TCPChannelClient client;
@Before
public void setUp() {
ShadowLog.stream = System.out;
MockitoAnnotations.initMocks(this);
executor = new RobolectricLooperExecutor();
executor.requestStart();
}
@After
public void tearDown() {
verifyNoMoreEvents();
executor.executeAndWait(new Runnable() {
@Override
public void run() {
client.disconnect();
server.disconnect();
}
});
// Stop the executor thread
executor.requestStop();
try {
executor.join();
} catch (InterruptedException e) {
fail(e.getMessage());
}
}
@Test
public void testConnectIPv4() {
setUpIPv4Server();
try {
Thread.sleep(SERVER_WAIT);
} catch (InterruptedException e) {
fail(e.getMessage());
}
setUpIPv4Client();
verify(serverEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(true);
verify(clientEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(false);
}
@Test
public void testConnectIPv6() {
setUpIPv6Server();
try {
Thread.sleep(SERVER_WAIT);
} catch (InterruptedException e) {
fail(e.getMessage());
}
setUpIPv6Client();
verify(serverEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(true);
verify(clientEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(false);
}
@Test
public void testSendData() {
testConnectIPv4();
executor.executeAndWait(new Runnable() {
@Override
public void run() {
client.send(TEST_MESSAGE_SERVER);
server.send(TEST_MESSAGE_CLIENT);
}
});
verify(serverEvents, timeout(SEND_TIMEOUT)).onTCPMessage(TEST_MESSAGE_SERVER);
verify(clientEvents, timeout(SEND_TIMEOUT)).onTCPMessage(TEST_MESSAGE_CLIENT);
}
@Test
public void testDisconnectServer() {
testConnectIPv4();
executor.executeAndWait(new Runnable() {
@Override
public void run() {
server.disconnect();
}
});
verify(serverEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose();
verify(clientEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose();
}
@Test
public void testDisconnectClient() {
testConnectIPv4();
executor.executeAndWait(new Runnable() {
@Override
public void run() {
client.disconnect();
}
});
verify(serverEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose();
verify(clientEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose();
}
private void setUpIPv4Server() {
setUpServer("0.0.0.0", PORT);
}
private void setUpIPv4Client() {
setUpClient("127.0.0.1", PORT);
}
private void setUpIPv6Server() {
setUpServer("::", PORT);
}
private void setUpIPv6Client() {
setUpClient("::1", PORT);
}
private void setUpServer(String ip, int port) {
server = new TCPChannelClient(executor, serverEvents, ip, port);
}
private void setUpClient(String ip, int port) {
client = new TCPChannelClient(executor, clientEvents, ip, port);
}
/**
* Verifies no more server or client events have been issued
*/
private void verifyNoMoreEvents() {
verifyNoMoreInteractions(serverEvents);
verifyNoMoreInteractions(clientEvents);
}
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 2016 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
package org.appspot.apprtc.util;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static org.junit.Assert.fail;
/**
* LooperExecutor that doesn't use Looper because its implementation in Robolectric is not suited
* for our needs. Also implements executeAndWait that can be used to wait until the runnable has
* been executed.
*/
public class RobolectricLooperExecutor extends LooperExecutor {
private volatile boolean running = false;
private static final int RUNNABLE_QUEUE_CAPACITY = 256;
private final BlockingQueue<Runnable> runnableQueue
= new ArrayBlockingQueue<>(RUNNABLE_QUEUE_CAPACITY);
private long threadId;
/**
* Executes the runnable passed to the constructor and sets isDone flag afterwards.
*/
private static class ExecuteAndWaitRunnable implements Runnable {
public boolean isDone = false;
private final Runnable runnable;
ExecuteAndWaitRunnable(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
runnable.run();
synchronized (this) {
isDone = true;
notifyAll();
}
}
}
@Override
public void run() {
threadId = Thread.currentThread().getId();
while (running) {
final Runnable runnable;
try {
runnable = runnableQueue.take();
} catch (InterruptedException e) {
if (running) {
fail(e.getMessage());
}
return;
}
runnable.run();
}
}
@Override
public synchronized void requestStart() {
if (running) {
return;
}
running = true;
start();
}
@Override
public synchronized void requestStop() {
running = false;
interrupt();
}
@Override
public synchronized void execute(Runnable runnable) {
try {
runnableQueue.put(runnable);
} catch (InterruptedException e) {
fail(e.getMessage());
}
}
/**
* Queues runnable to be run and waits for it to be executed by the executor thread
*/
public void executeAndWait(Runnable runnable) {
ExecuteAndWaitRunnable executeAndWaitRunnable = new ExecuteAndWaitRunnable(runnable);
execute(executeAndWaitRunnable);
synchronized (executeAndWaitRunnable) {
while (!executeAndWaitRunnable.isDone) {
try {
executeAndWaitRunnable.wait();
} catch (InterruptedException e) {
fail(e.getMessage());
}
}
}
}
@Override
public boolean checkOnLooperThread() {
return (Thread.currentThread().getId() == threadId);
}
}