davidliu
Committed by GitHub

reconnect upon publisher failure (#77)

@@ -12,7 +12,7 @@ class PublisherTransportObserver( @@ -12,7 +12,7 @@ class PublisherTransportObserver(
12 private val client: SignalClient, 12 private val client: SignalClient,
13 ) : PeerConnection.Observer, PeerConnectionTransport.Listener { 13 ) : PeerConnection.Observer, PeerConnectionTransport.Listener {
14 14
15 - var connectionChangeListener: ((newState: PeerConnection.PeerConnectionState?) -> Unit)? = null 15 + var connectionChangeListener: ((newState: PeerConnection.PeerConnectionState) -> Unit)? = null
16 16
17 override fun onIceCandidate(iceCandidate: IceCandidate?) { 17 override fun onIceCandidate(iceCandidate: IceCandidate?) {
18 val candidate = iceCandidate ?: return 18 val candidate = iceCandidate ?: return
@@ -35,7 +35,7 @@ class PublisherTransportObserver( @@ -35,7 +35,7 @@ class PublisherTransportObserver(
35 override fun onStandardizedIceConnectionChange(newState: PeerConnection.IceConnectionState?) { 35 override fun onStandardizedIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
36 } 36 }
37 37
38 - override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) { 38 + override fun onConnectionChange(newState: PeerConnection.PeerConnectionState) {
39 LKLog.v { "onConnection new state: $newState" } 39 LKLog.v { "onConnection new state: $newState" }
40 connectionChangeListener?.invoke(newState) 40 connectionChangeListener?.invoke(newState)
41 } 41 }
@@ -210,13 +210,11 @@ internal constructor( @@ -210,13 +210,11 @@ internal constructor(
210 null, 210 null,
211 ) 211 )
212 212
213 - val connectionStateListener: (PeerConnection.PeerConnectionState?) -> Unit = { newState ->  
214 - val state =  
215 - newState ?: throw NullPointerException("unexpected null new state, what do?") 213 + val connectionStateListener: (PeerConnection.PeerConnectionState) -> Unit = { newState ->
216 LKLog.v { "onIceConnection new state: $newState" } 214 LKLog.v { "onIceConnection new state: $newState" }
217 - if (state.isConnected()) { 215 + if (newState.isConnected()) {
218 connectionState = ConnectionState.CONNECTED 216 connectionState = ConnectionState.CONNECTED
219 - } else if (state.isDisconnected()) { 217 + } else if (newState.isDisconnected()) {
220 connectionState = ConnectionState.DISCONNECTED 218 connectionState = ConnectionState.DISCONNECTED
221 } 219 }
222 } 220 }
@@ -233,6 +231,12 @@ internal constructor( @@ -233,6 +231,12 @@ internal constructor(
233 } 231 }
234 232
235 subscriberObserver.connectionChangeListener = connectionStateListener 233 subscriberObserver.connectionChangeListener = connectionStateListener
  234 + // Also reconnect on publisher disconnect
  235 + publisherObserver.connectionChangeListener = { newState ->
  236 + if (newState.isDisconnected()) {
  237 + reconnect()
  238 + }
  239 + }
236 } else { 240 } else {
237 publisherObserver.connectionChangeListener = connectionStateListener 241 publisherObserver.connectionChangeListener = connectionStateListener
238 } 242 }
@@ -387,6 +391,16 @@ internal constructor( @@ -387,6 +391,16 @@ internal constructor(
387 } 391 }
388 // wait until ICE connected 392 // wait until ICE connected
389 val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS 393 val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS
  394 + if (hasPublished) {
  395 + while (SystemClock.elapsedRealtime() < endTime) {
  396 + if (publisher.peerConnection.connectionState().isConnected()) {
  397 + LKLog.v { "publisher reconnected to ICE" }
  398 + break
  399 + }
  400 + delay(100)
  401 + }
  402 + }
  403 +
390 while (SystemClock.elapsedRealtime() < endTime) { 404 while (SystemClock.elapsedRealtime() < endTime) {
391 if (connectionState == ConnectionState.CONNECTED) { 405 if (connectionState == ConnectionState.CONNECTED) {
392 LKLog.v { "reconnected to ICE" } 406 LKLog.v { "reconnected to ICE" }
@@ -395,7 +409,9 @@ internal constructor( @@ -395,7 +409,9 @@ internal constructor(
395 delay(100) 409 delay(100)
396 } 410 }
397 411
398 - if (connectionState == ConnectionState.CONNECTED) { 412 + if (connectionState == ConnectionState.CONNECTED &&
  413 + (!hasPublished || publisher.peerConnection.connectionState().isConnected())
  414 + ) {
399 client.onPCConnected() 415 client.onPCConnected()
400 listener?.onPostReconnect(isFullReconnect) 416 listener?.onPostReconnect(isFullReconnect)
401 return@launch 417 return@launch
@@ -13,7 +13,7 @@ class SubscriberTransportObserver( @@ -13,7 +13,7 @@ class SubscriberTransportObserver(
13 ) : PeerConnection.Observer { 13 ) : PeerConnection.Observer {
14 14
15 var dataChannelListener: ((DataChannel) -> Unit)? = null 15 var dataChannelListener: ((DataChannel) -> Unit)? = null
16 - var connectionChangeListener: ((PeerConnection.PeerConnectionState?) -> Unit)? = null 16 + var connectionChangeListener: ((PeerConnection.PeerConnectionState) -> Unit)? = null
17 17
18 override fun onIceCandidate(candidate: IceCandidate) { 18 override fun onIceCandidate(candidate: IceCandidate) {
19 LKLog.v { "onIceCandidate: $candidate" } 19 LKLog.v { "onIceCandidate: $candidate" }
@@ -41,7 +41,7 @@ class SubscriberTransportObserver( @@ -41,7 +41,7 @@ class SubscriberTransportObserver(
41 override fun onStandardizedIceConnectionChange(newState: PeerConnection.IceConnectionState?) { 41 override fun onStandardizedIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
42 } 42 }
43 43
44 - override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) { 44 + override fun onConnectionChange(newState: PeerConnection.PeerConnectionState) {
45 LKLog.v { "onConnectionChange new state: $newState" } 45 LKLog.v { "onConnectionChange new state: $newState" }
46 connectionChangeListener?.invoke(newState) 46 connectionChangeListener?.invoke(newState)
47 } 47 }
@@ -223,9 +223,7 @@ class MockPeerConnection( @@ -223,9 +223,7 @@ class MockPeerConnection(
223 } 223 }
224 } 224 }
225 225
226 - override fun connectionState(): PeerConnectionState {  
227 - return super.connectionState()  
228 - } 226 + override fun connectionState(): PeerConnectionState = connectionState
229 227
230 override fun iceGatheringState(): IceGatheringState { 228 override fun iceGatheringState(): IceGatheringState {
231 return super.iceGatheringState() 229 return super.iceGatheringState()
@@ -2,7 +2,6 @@ package io.livekit.android.room @@ -2,7 +2,6 @@ package io.livekit.android.room
2 2
3 import io.livekit.android.MockE2ETest 3 import io.livekit.android.MockE2ETest
4 import io.livekit.android.mock.MockPeerConnection 4 import io.livekit.android.mock.MockPeerConnection
5 -import io.livekit.android.mock.MockWebSocket  
6 import io.livekit.android.util.toOkioByteString 5 import io.livekit.android.util.toOkioByteString
7 import io.livekit.android.util.toPBByteString 6 import io.livekit.android.util.toPBByteString
8 import kotlinx.coroutines.ExperimentalCoroutinesApi 7 import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -12,6 +11,7 @@ import org.junit.Before @@ -12,6 +11,7 @@ import org.junit.Before
12 import org.junit.Test 11 import org.junit.Test
13 import org.junit.runner.RunWith 12 import org.junit.runner.RunWith
14 import org.robolectric.RobolectricTestRunner 13 import org.robolectric.RobolectricTestRunner
  14 +import org.webrtc.PeerConnection
15 15
16 16
17 @ExperimentalCoroutinesApi 17 @ExperimentalCoroutinesApi
@@ -57,6 +57,30 @@ class RTCEngineMockE2ETest : MockE2ETest() { @@ -57,6 +57,30 @@ class RTCEngineMockE2ETest : MockE2ETest() {
57 } 57 }
58 58
59 @Test 59 @Test
  60 + fun reconnectOnSubscriberFailure() = runTest {
  61 + connect()
  62 + val oldWs = wsFactory.ws
  63 +
  64 + val subPeerConnection = rtcEngine.subscriber.peerConnection as MockPeerConnection
  65 + subPeerConnection.moveToIceConnectionState(PeerConnection.IceConnectionState.FAILED)
  66 +
  67 + val newWs = wsFactory.ws
  68 + Assert.assertNotEquals(oldWs, newWs)
  69 + }
  70 +
  71 + @Test
  72 + fun reconnectOnPublisherFailure() = runTest {
  73 + connect()
  74 + val oldWs = wsFactory.ws
  75 +
  76 + val pubPeerConnection = rtcEngine.publisher.peerConnection as MockPeerConnection
  77 + pubPeerConnection.moveToIceConnectionState(PeerConnection.IceConnectionState.FAILED)
  78 +
  79 + val newWs = wsFactory.ws
  80 + Assert.assertNotEquals(oldWs, newWs)
  81 + }
  82 +
  83 + @Test
60 fun refreshToken() = runTest { 84 fun refreshToken() = runTest {
61 connect() 85 connect()
62 86