davidliu
Committed by GitHub

PeerConnectionState instead of IceConnectionState (#43)

* connection state enum

* proper cleanup of signal client

* fix tests

* Observe peer connection state instead of ice connection

* fix tests
package io.livekit.android.room
enum class ConnectionState {
CONNECTING,
CONNECTED,
DISCONNECTED,
RECONNECTING;
}
\ No newline at end of file
... ...
... ... @@ -12,8 +12,7 @@ class PublisherTransportObserver(
private val client: SignalClient,
) : PeerConnection.Observer, PeerConnectionTransport.Listener {
var iceConnectionChangeListener: ((newState: PeerConnection.IceConnectionState?) -> Unit)? =
null
var connectionChangeListener: ((newState: PeerConnection.PeerConnectionState?) -> Unit)? = null
override fun onIceCandidate(iceCandidate: IceCandidate?) {
val candidate = iceCandidate ?: return
... ... @@ -27,7 +26,6 @@ class PublisherTransportObserver(
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
LKLog.v { "onIceConnection new state: $newState" }
iceConnectionChangeListener?.invoke(newState)
}
override fun onOffer(sd: SessionDescription) {
... ... @@ -38,6 +36,8 @@ class PublisherTransportObserver(
}
override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
LKLog.v { "onConnection new state: $newState" }
connectionChangeListener?.invoke(newState)
}
override fun onSelectedCandidatePairChanged(event: CandidatePairChangeEvent?) {
... ...
... ... @@ -10,6 +10,7 @@ import io.livekit.android.util.CloseableCoroutineScope
import io.livekit.android.util.Either
import io.livekit.android.util.LKLog
import io.livekit.android.webrtc.isConnected
import io.livekit.android.webrtc.isDisconnected
import io.livekit.android.webrtc.toProtoSessionDescription
import kotlinx.coroutines.*
import livekit.LivekitModels
... ... @@ -36,7 +37,11 @@ internal constructor(
@Named(InjectionNames.DISPATCHER_IO) ioDispatcher: CoroutineDispatcher,
) : SignalClient.Listener, DataChannel.Observer {
internal var listener: Listener? = null
internal var iceState: IceState = IceState.DISCONNECTED
/**
* Reflects the combined connection state of SignalClient and primary PeerConnection.
*/
internal var connectionState: ConnectionState = ConnectionState.DISCONNECTED
set(value) {
val oldVal = field
field = value
... ... @@ -44,18 +49,18 @@ internal constructor(
return
}
when (value) {
IceState.CONNECTED -> {
if (oldVal == IceState.DISCONNECTED) {
ConnectionState.CONNECTED -> {
if (oldVal == ConnectionState.DISCONNECTED) {
LKLog.d { "primary ICE connected" }
listener?.onIceConnected()
} else if (oldVal == IceState.RECONNECTING) {
listener?.onEngineConnected()
} else if (oldVal == ConnectionState.RECONNECTING) {
LKLog.d { "primary ICE reconnected" }
listener?.onIceReconnected()
listener?.onEngineReconnected()
}
}
IceState.DISCONNECTED -> {
ConnectionState.DISCONNECTED -> {
LKLog.d { "primary ICE disconnected" }
if (oldVal == IceState.CONNECTED) {
if (oldVal == ConnectionState.CONNECTED) {
reconnect()
}
}
... ... @@ -167,17 +172,14 @@ internal constructor(
null,
)
val iceConnectionStateListener: (PeerConnection.IceConnectionState?) -> Unit = { newState ->
val connectionStateListener: (PeerConnection.PeerConnectionState?) -> Unit = { newState ->
val state =
newState ?: throw NullPointerException("unexpected null new state, what do?")
LKLog.v { "onIceConnection new state: $newState" }
if (state.isConnected()) {
iceState = IceState.CONNECTED
} else if (state == PeerConnection.IceConnectionState.DISCONNECTED ||
state == PeerConnection.IceConnectionState.FAILED
) {
// when we publish tracks, some WebRTC versions will send out disconnected events periodically
iceState = IceState.DISCONNECTED
connectionState = ConnectionState.CONNECTED
} else if (state.isDisconnected()) {
connectionState = ConnectionState.DISCONNECTED
}
}
... ... @@ -191,9 +193,10 @@ internal constructor(
}
dataChannel.registerObserver(this)
}
subscriberObserver.iceConnectionChangeListener = iceConnectionStateListener
subscriberObserver.connectionChangeListener = connectionStateListener
} else {
publisherObserver.iceConnectionChangeListener = iceConnectionStateListener
publisherObserver.connectionChangeListener = connectionStateListener
}
// data channels
... ... @@ -270,7 +273,7 @@ internal constructor(
}
val job = coroutineScope.launch {
listener?.onReconnecting()
listener?.onEngineReconnecting()
for (wsRetries in 0 until MAX_SIGNAL_RETRIES) {
var startDelay = wsRetries.toLong() * wsRetries * 500
... ... @@ -292,7 +295,7 @@ internal constructor(
listener?.onSignalConnected()
subscriber.prepareForIceRestart()
iceState = IceState.RECONNECTING
connectionState = ConnectionState.RECONNECTING
// trigger publisher reconnect
// only restart publisher if it's needed
if (hasPublished) {
... ... @@ -302,20 +305,20 @@ internal constructor(
// wait until ICE connected
val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS;
while (SystemClock.elapsedRealtime() < endTime) {
if (iceState == IceState.CONNECTED) {
if (connectionState == ConnectionState.CONNECTED) {
LKLog.v { "reconnected to ICE" }
break
}
delay(100)
}
if (iceState == IceState.CONNECTED) {
if (connectionState == ConnectionState.CONNECTED) {
return@launch
}
}
listener?.onDisconnect("failed reconnecting.")
listener?.onEngineDisconnected("failed reconnecting.")
close()
}
... ... @@ -409,7 +412,7 @@ internal constructor(
MediaConstraintKeys.FALSE
)
)
if (iceState == IceState.RECONNECTING) {
if (connectionState == ConnectionState.RECONNECTING) {
add(
MediaConstraints.KeyValuePair(
MediaConstraintKeys.ICE_RESTART,
... ... @@ -422,8 +425,11 @@ internal constructor(
}
internal interface Listener {
fun onIceConnected()
fun onIceReconnected()
fun onEngineConnected()
fun onEngineReconnected()
fun onEngineReconnecting()
fun onEngineDisconnected(reason: String)
fun onFailToConnect(error: Throwable)
fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>)
fun onUpdateParticipants(updates: List<LivekitModels.ParticipantInfo>)
fun onActiveSpeakersUpdate(speakers: List<LivekitModels.SpeakerInfo>)
... ... @@ -431,14 +437,11 @@ internal constructor(
fun onRoomUpdate(update: LivekitModels.Room)
fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>)
fun onSpeakersChanged(speakers: List<LivekitModels.SpeakerInfo>)
fun onDisconnect(reason: String)
fun onFailToConnect(error: Throwable)
fun onUserPacket(packet: LivekitModels.UserPacket, kind: LivekitModels.DataPacket.Kind)
fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>)
fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate)
fun onSubscriptionPermissionUpdate(subscriptionPermissionUpdate: LivekitRtc.SubscriptionPermissionUpdate)
fun onSignalConnected()
fun onReconnecting()
}
companion object {
... ... @@ -564,7 +567,7 @@ internal constructor(
override fun onLeave(leave: LivekitRtc.LeaveRequest) {
close()
listener?.onDisconnect("server leave")
listener?.onEngineDisconnected("server leave")
}
// Signal error
... ... @@ -628,9 +631,3 @@ internal constructor(
client.sendSyncState(syncState)
}
}
\ No newline at end of file
internal enum class IceState {
DISCONNECTED,
RECONNECTING,
CONNECTED,
}
\ No newline at end of file
... ...
... ... @@ -419,17 +419,17 @@ constructor(
//----------------------------------- RTCEngine.Listener ------------------------------------//
override fun onIceConnected() {
override fun onEngineConnected() {
state = State.CONNECTED
}
override fun onIceReconnected() {
override fun onEngineReconnected() {
state = State.CONNECTED
listener?.onReconnected(this)
eventBus.postEvent(RoomEvent.Reconnected(this), coroutineScope)
}
override fun onReconnecting() {
override fun onEngineReconnecting() {
state = State.RECONNECTING
listener?.onReconnecting(this)
eventBus.postEvent(RoomEvent.Reconnecting(this), coroutineScope)
... ... @@ -546,7 +546,7 @@ constructor(
/**
* @suppress
*/
override fun onDisconnect(reason: String) {
override fun onEngineDisconnected(reason: String) {
LKLog.v { "engine did disconnect: $reason" }
handleDisconnect()
}
... ...
... ... @@ -46,7 +46,7 @@ constructor(
@Named(InjectionNames.SIGNAL_JSON_ENABLED)
private val useJson: Boolean,
@Named(InjectionNames.DISPATCHER_IO)
ioDispatcher: CoroutineDispatcher,
private val ioDispatcher: CoroutineDispatcher,
) : WebSocketListener() {
var isConnected = false
private set
... ... @@ -57,7 +57,7 @@ constructor(
private var lastUrl: String? = null
private var joinContinuation: CancellableContinuation<Either<LivekitRtc.JoinResponse, Unit>>? = null
private val coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher)
private lateinit var coroutineScope: CloseableCoroutineScope
private val responseFlow = MutableSharedFlow<LivekitRtc.SignalResponse>(Int.MAX_VALUE)
... ... @@ -109,13 +109,10 @@ constructor(
LKLog.i { "connecting to $wsUrlString" }
isConnected = false
currentWs?.cancel()
currentWs = null
joinContinuation?.cancel()
joinContinuation = null
// Clean up any pre-existing connection.
close()
coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher)
lastUrl = wsUrlString
val request = Request.Builder()
... ... @@ -142,6 +139,7 @@ constructor(
//--------------------------------- WebSocket Listener --------------------------------------//
override fun onOpen(webSocket: WebSocket, response: Response) {
if (isReconnecting) {
// no need to wait for join response on reconnection.
isReconnecting = false
isConnected = true
joinContinuation?.resumeWith(Result.success(Either.Right(Unit)))
... ... @@ -482,10 +480,15 @@ constructor(
}.safe()
}
fun close() {
fun close(code: Int = 1000, reason: String = "Normal Closure") {
isConnected = false
if(::coroutineScope.isInitialized) {
coroutineScope.close()
currentWs?.close(1000, "Normal Closure")
}
currentWs?.close(code, reason)
currentWs = null
joinContinuation?.cancel()
joinContinuation = null
}
interface Listener {
... ...
... ... @@ -13,7 +13,7 @@ class SubscriberTransportObserver(
) : PeerConnection.Observer {
var dataChannelListener: ((DataChannel) -> Unit)? = null
var iceConnectionChangeListener: ((PeerConnection.IceConnectionState?) -> Unit)? = null
var connectionChangeListener: ((PeerConnection.PeerConnectionState?) -> Unit)? = null
override fun onIceCandidate(candidate: IceCandidate) {
LKLog.v { "onIceCandidate: $candidate" }
... ... @@ -43,6 +43,7 @@ class SubscriberTransportObserver(
override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
LKLog.v { "onConnectionChange new state: $newState" }
connectionChangeListener?.invoke(newState)
}
override fun onSelectedCandidatePairChanged(event: CandidatePairChangeEvent?) {
... ... @@ -53,7 +54,6 @@ class SubscriberTransportObserver(
override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
LKLog.v { "onIceConnection new state: $newState" }
iceConnectionChangeListener?.invoke(newState)
}
override fun onIceConnectionReceivingChange(p0: Boolean) {
... ...
... ... @@ -6,13 +6,22 @@ import org.webrtc.PeerConnection
* Completed state is a valid state for a connected connection, so this should be used
* when checking for a connected state
*/
internal fun PeerConnection.isConnected(): Boolean = iceConnectionState().isConnected()
internal fun PeerConnection.isConnected(): Boolean = connectionState().isConnected()
internal fun PeerConnection.IceConnectionState.isConnected(): Boolean {
internal fun PeerConnection.isDisconnected(): Boolean = connectionState().isDisconnected()
internal fun PeerConnection.PeerConnectionState.isConnected(): Boolean {
return this == PeerConnection.PeerConnectionState.CONNECTED
}
internal fun PeerConnection.PeerConnectionState.isDisconnected(): Boolean {
return when (this) {
PeerConnection.IceConnectionState.CONNECTED,
PeerConnection.IceConnectionState.COMPLETED -> true
/**
* [PeerConnection.PeerConnectionState.DISCONNECTED] is explicitly not included here,
* as that is a temporary state and may return to connected state by itself.
*/
PeerConnection.PeerConnectionState.FAILED,
PeerConnection.PeerConnectionState.CLOSED -> true
else -> false
}
}
... ...
... ... @@ -29,7 +29,7 @@ abstract class MockE2ETest {
@get:Rule
var coroutineRule = TestCoroutineRule()
lateinit var component: TestLiveKitComponent
internal lateinit var component: TestLiveKitComponent
lateinit var context: Context
lateinit var room: Room
lateinit var wsFactory: MockWebSocketFactory
... ...
... ... @@ -10,6 +10,7 @@ class MockPeerConnection(
val observer: PeerConnection.Observer?
) : PeerConnection(MockNativePeerConnectionFactory()) {
private var closed = false
var localDesc: SessionDescription? = null
var remoteDesc: SessionDescription? = null
override fun getLocalDescription(): SessionDescription? = localDesc
... ... @@ -140,7 +141,27 @@ class MockPeerConnection(
}
override fun signalingState(): SignalingState {
return super.signalingState()
if (closed) {
return SignalingState.CLOSED
}
if ((localDesc?.type == null && localDesc?.type == null) ||
(localDesc?.type == SessionDescription.Type.OFFER &&
remoteDesc?.type == SessionDescription.Type.ANSWER) ||
(localDesc?.type == SessionDescription.Type.ANSWER &&
remoteDesc?.type == SessionDescription.Type.OFFER)
) {
return SignalingState.STABLE
}
if (localDesc?.type == SessionDescription.Type.OFFER && remoteDesc?.type == null) {
return SignalingState.HAVE_LOCAL_OFFER
}
if (remoteDesc?.type == SessionDescription.Type.OFFER && localDesc?.type == null) {
return SignalingState.HAVE_REMOTE_OFFER
}
throw IllegalStateException("Illegal signalling state? localDesc: $localDesc, remoteDesc: $remoteDesc")
}
private var iceConnectionState = IceConnectionState.NEW
... ... @@ -148,12 +169,34 @@ class MockPeerConnection(
if (field != value) {
field = value
observer?.onIceConnectionChange(field)
connectionState = when (field) {
IceConnectionState.NEW -> PeerConnectionState.NEW
IceConnectionState.CHECKING -> PeerConnectionState.CONNECTING
IceConnectionState.CONNECTED,
IceConnectionState.COMPLETED -> PeerConnectionState.CONNECTED
IceConnectionState.DISCONNECTED -> PeerConnectionState.DISCONNECTED
IceConnectionState.FAILED -> PeerConnectionState.FAILED
IceConnectionState.CLOSED -> PeerConnectionState.CLOSED
}
}
}
private var connectionState = PeerConnectionState.NEW
set(value) {
if (field != value) {
field = value
observer?.onConnectionChange(field)
}
}
override fun iceConnectionState(): IceConnectionState = iceConnectionState
fun moveToIceConnectionState(newState: IceConnectionState) {
if (closed && newState != IceConnectionState.CLOSED) {
throw IllegalArgumentException("peer connection closed, but attempting to move to $newState")
}
when (newState) {
IceConnectionState.NEW,
IceConnectionState.CHECKING,
... ... @@ -189,9 +232,12 @@ class MockPeerConnection(
}
override fun close() {
dispose()
}
override fun dispose() {
iceConnectionState = IceConnectionState.CLOSED
closed = true
}
override fun getNativePeerConnection(): Long = 0L
... ...
... ... @@ -55,7 +55,7 @@ class RTCEngineMockE2ETest : MockE2ETest() {
subPeerConnection.moveToIceConnectionState(PeerConnection.IceConnectionState.CONNECTED)
Assert.assertEquals(IceState.CONNECTED, rtcEngine.iceState)
Assert.assertEquals(ConnectionState.CONNECTED, rtcEngine.connectionState)
}
@Test
... ...
... ... @@ -104,7 +104,7 @@ class RoomTest {
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
room.onDisconnect("")
room.onEngineDisconnected("")
val events = eventCollector.stopCollecting()
Assert.assertEquals(1, events.size)
... ...