davidliu
Committed by GitHub

Don't emit RoomEvent.Reconnecting for resumes (#371)

* Don't emit RoomEvent.Reconnecting for resumes

* spotless
1 /* 1 /*
2 - * Copyright 2023 LiveKit, Inc. 2 + * Copyright 2023-2024 LiveKit, Inc.
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
@@ -21,4 +21,5 @@ enum class ConnectionState { @@ -21,4 +21,5 @@ enum class ConnectionState {
21 CONNECTED, 21 CONNECTED,
22 DISCONNECTED, 22 DISCONNECTED,
23 RECONNECTING, 23 RECONNECTING,
  24 + RESUMING,
24 } 25 }
@@ -31,7 +31,9 @@ import io.livekit.android.room.util.createAnswer @@ -31,7 +31,9 @@ import io.livekit.android.room.util.createAnswer
31 import io.livekit.android.room.util.setLocalDescription 31 import io.livekit.android.room.util.setLocalDescription
32 import io.livekit.android.util.CloseableCoroutineScope 32 import io.livekit.android.util.CloseableCoroutineScope
33 import io.livekit.android.util.Either 33 import io.livekit.android.util.Either
  34 +import io.livekit.android.util.FlowObservable
34 import io.livekit.android.util.LKLog 35 import io.livekit.android.util.LKLog
  36 +import io.livekit.android.util.flowDelegate
35 import io.livekit.android.util.nullSafe 37 import io.livekit.android.util.nullSafe
36 import io.livekit.android.webrtc.RTCStatsGetter 38 import io.livekit.android.webrtc.RTCStatsGetter
37 import io.livekit.android.webrtc.copy 39 import io.livekit.android.webrtc.copy
@@ -73,14 +75,12 @@ internal constructor( @@ -73,14 +75,12 @@ internal constructor(
73 /** 75 /**
74 * Reflects the combined connection state of SignalClient and primary PeerConnection. 76 * Reflects the combined connection state of SignalClient and primary PeerConnection.
75 */ 77 */
76 - internal var connectionState: ConnectionState = ConnectionState.DISCONNECTED  
77 - set(value) {  
78 - val oldVal = field  
79 - field = value  
80 - if (value == oldVal) {  
81 - return 78 + @FlowObservable
  79 + internal var connectionState: ConnectionState by flowDelegate(ConnectionState.DISCONNECTED) { newVal, oldVal ->
  80 + if (newVal == oldVal) {
  81 + return@flowDelegate
82 } 82 }
83 - when (value) { 83 + when (newVal) {
84 ConnectionState.CONNECTED -> { 84 ConnectionState.CONNECTED -> {
85 if (oldVal == ConnectionState.DISCONNECTED) { 85 if (oldVal == ConnectionState.DISCONNECTED) {
86 LKLog.d { "primary ICE connected" } 86 LKLog.d { "primary ICE connected" }
@@ -88,6 +88,8 @@ internal constructor( @@ -88,6 +88,8 @@ internal constructor(
88 } else if (oldVal == ConnectionState.RECONNECTING) { 88 } else if (oldVal == ConnectionState.RECONNECTING) {
89 LKLog.d { "primary ICE reconnected" } 89 LKLog.d { "primary ICE reconnected" }
90 listener?.onEngineReconnected() 90 listener?.onEngineReconnected()
  91 + } else if (oldVal == ConnectionState.RESUMING) {
  92 + listener?.onEngineResumed()
91 } 93 }
92 } 94 }
93 95
@@ -102,7 +104,6 @@ internal constructor( @@ -102,7 +104,6 @@ internal constructor(
102 } 104 }
103 } 105 }
104 } 106 }
105 -  
106 internal var reconnectType: ReconnectType = ReconnectType.DEFAULT 107 internal var reconnectType: ReconnectType = ReconnectType.DEFAULT
107 private var reconnectingJob: Job? = null 108 private var reconnectingJob: Job? = null
108 private var fullReconnectOnNext = false 109 private var fullReconnectOnNext = false
@@ -373,8 +374,8 @@ internal constructor( @@ -373,8 +374,8 @@ internal constructor(
373 val forceFullReconnect = fullReconnectOnNext 374 val forceFullReconnect = fullReconnectOnNext
374 fullReconnectOnNext = false 375 fullReconnectOnNext = false
375 val job = coroutineScope.launch { 376 val job = coroutineScope.launch {
376 - connectionState = ConnectionState.RECONNECTING  
377 - listener?.onEngineReconnecting() 377 + var hasResumedOnce = false
  378 + var hasReconnectedOnce = false
378 379
379 val reconnectStartTime = SystemClock.elapsedRealtime() 380 val reconnectStartTime = SystemClock.elapsedRealtime()
380 for (retries in 0 until MAX_RECONNECT_RETRIES) { 381 for (retries in 0 until MAX_RECONNECT_RETRIES) {
@@ -406,6 +407,12 @@ internal constructor( @@ -406,6 +407,12 @@ internal constructor(
406 val connectOptions = connectOptions ?: ConnectOptions() 407 val connectOptions = connectOptions ?: ConnectOptions()
407 if (isFullReconnect) { 408 if (isFullReconnect) {
408 LKLog.v { "Attempting full reconnect." } 409 LKLog.v { "Attempting full reconnect." }
  410 +
  411 + if (!hasReconnectedOnce) {
  412 + hasReconnectedOnce = true
  413 + listener?.onEngineReconnecting()
  414 + }
  415 + connectionState = ConnectionState.RECONNECTING
409 try { 416 try {
410 closeResources("Full Reconnecting") 417 closeResources("Full Reconnecting")
411 listener?.onFullReconnecting() 418 listener?.onFullReconnecting()
@@ -416,6 +423,11 @@ internal constructor( @@ -416,6 +423,11 @@ internal constructor(
416 continue 423 continue
417 } 424 }
418 } else { 425 } else {
  426 + if (!hasResumedOnce) {
  427 + hasResumedOnce = true
  428 + listener?.onEngineResuming()
  429 + }
  430 + connectionState = ConnectionState.RESUMING
419 LKLog.v { "Attempting soft reconnect." } 431 LKLog.v { "Attempting soft reconnect." }
420 subscriber?.prepareForIceRestart() 432 subscriber?.prepareForIceRestart()
421 try { 433 try {
@@ -588,7 +600,7 @@ internal constructor( @@ -588,7 +600,7 @@ internal constructor(
588 MediaConstraintKeys.FALSE, 600 MediaConstraintKeys.FALSE,
589 ), 601 ),
590 ) 602 )
591 - if (connectionState == ConnectionState.RECONNECTING) { 603 + if (connectionState == ConnectionState.RECONNECTING || connectionState == ConnectionState.RESUMING) {
592 add( 604 add(
593 MediaConstraints.KeyValuePair( 605 MediaConstraints.KeyValuePair(
594 MediaConstraintKeys.ICE_RESTART, 606 MediaConstraintKeys.ICE_RESTART,
@@ -678,6 +690,8 @@ internal constructor( @@ -678,6 +690,8 @@ internal constructor(
678 fun onEngineConnected() 690 fun onEngineConnected()
679 fun onEngineReconnected() 691 fun onEngineReconnected()
680 fun onEngineReconnecting() 692 fun onEngineReconnecting()
  693 + fun onEngineResuming() {}
  694 + fun onEngineResumed() {}
681 fun onEngineDisconnected(reason: DisconnectReason) 695 fun onEngineDisconnected(reason: DisconnectReason)
682 fun onFailToConnect(error: Throwable) 696 fun onFailToConnect(error: Throwable)
683 fun onJoinResponse(response: JoinResponse) 697 fun onJoinResponse(response: JoinResponse)
@@ -941,7 +941,7 @@ constructor( @@ -941,7 +941,7 @@ constructor(
941 * @suppress 941 * @suppress
942 */ 942 */
943 override fun onSignalConnected(isResume: Boolean) { 943 override fun onSignalConnected(isResume: Boolean) {
944 - if (state == State.RECONNECTING && isResume) { 944 + if (isResume) {
945 // during resume reconnection, need to send sync state upon signal connection. 945 // during resume reconnection, need to send sync state upon signal connection.
946 sendSyncState() 946 sendSyncState()
947 } 947 }
@@ -293,7 +293,8 @@ class RoomMockE2ETest : MockE2ETest() { @@ -293,7 +293,8 @@ class RoomMockE2ETest : MockE2ETest() {
293 @Test 293 @Test
294 fun onConnectionAvailableWillReconnect() = runTest { 294 fun onConnectionAvailableWillReconnect() = runTest {
295 connect() 295 connect()
296 - val eventCollector = EventCollector(room.events, coroutineRule.scope) 296 + val engine = component.rtcEngine()
  297 + val eventCollector = FlowCollector(engine::connectionState.flow, coroutineRule.scope)
297 val network = Mockito.mock(Network::class.java) 298 val network = Mockito.mock(Network::class.java)
298 299
299 val connectivityManager = InstrumentationRegistry.getInstrumentation() 300 val connectivityManager = InstrumentationRegistry.getInstrumentation()
@@ -308,10 +309,16 @@ class RoomMockE2ETest : MockE2ETest() { @@ -308,10 +309,16 @@ class RoomMockE2ETest : MockE2ETest() {
308 callback.onAvailable(network) 309 callback.onAvailable(network)
309 } 310 }
310 311
  312 + coroutineRule.dispatcher.scheduler.advanceUntilIdle()
311 val events = eventCollector.stopCollecting() 313 val events = eventCollector.stopCollecting()
312 314
313 - Assert.assertEquals(1, events.size)  
314 - Assert.assertEquals(true, events[0] is RoomEvent.Reconnecting) 315 + assertEquals(
  316 + listOf(
  317 + ConnectionState.CONNECTED,
  318 + ConnectionState.RESUMING,
  319 + ),
  320 + events,
  321 + )
315 } 322 }
316 323
317 @Test 324 @Test
1 /* 1 /*
2 - * Copyright 2023 LiveKit, Inc. 2 + * Copyright 2023-2024 LiveKit, Inc.
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
@@ -66,6 +66,45 @@ class RoomReconnectionTypesMockE2ETest( @@ -66,6 +66,45 @@ class RoomReconnectionTypesMockE2ETest(
66 room.setReconnectionType(reconnectType) 66 room.setReconnectionType(reconnectType)
67 } 67 }
68 68
  69 + private fun expectedEventsForReconnectType(reconnectType: ReconnectType): List<Class<out RoomEvent>> {
  70 + return when (reconnectType) {
  71 + ReconnectType.FORCE_SOFT_RECONNECT -> {
  72 + emptyList()
  73 + }
  74 +
  75 + ReconnectType.FORCE_FULL_RECONNECT -> {
  76 + listOf(
  77 + RoomEvent.Reconnecting::class.java,
  78 + RoomEvent.Reconnected::class.java,
  79 + )
  80 + }
  81 +
  82 + else -> {
  83 + throw IllegalArgumentException()
  84 + }
  85 + }
  86 + }
  87 +
  88 + private fun expectedStatesForReconnectType(reconnectType: ReconnectType): List<Room.State> {
  89 + return when (reconnectType) {
  90 + ReconnectType.FORCE_SOFT_RECONNECT -> {
  91 + listOf(Room.State.CONNECTED)
  92 + }
  93 +
  94 + ReconnectType.FORCE_FULL_RECONNECT -> {
  95 + listOf(
  96 + Room.State.CONNECTED,
  97 + Room.State.RECONNECTING,
  98 + Room.State.CONNECTED,
  99 + )
  100 + }
  101 +
  102 + else -> {
  103 + throw IllegalArgumentException()
  104 + }
  105 + }
  106 + }
  107 +
69 @Test 108 @Test
70 fun reconnectFromPeerConnectionDisconnect() = runTest { 109 fun reconnectFromPeerConnectionDisconnect() = runTest {
71 connect() 110 connect()
@@ -83,19 +122,12 @@ class RoomReconnectionTypesMockE2ETest( @@ -83,19 +122,12 @@ class RoomReconnectionTypesMockE2ETest(
83 val states = stateCollector.stopCollecting() 122 val states = stateCollector.stopCollecting()
84 123
85 assertIsClassList( 124 assertIsClassList(
86 - listOf(  
87 - RoomEvent.Reconnecting::class.java,  
88 - RoomEvent.Reconnected::class.java,  
89 - ), 125 + expectedEventsForReconnectType(reconnectType),
90 events, 126 events,
91 ) 127 )
92 128
93 assertEquals( 129 assertEquals(
94 - listOf(  
95 - Room.State.CONNECTED,  
96 - Room.State.RECONNECTING,  
97 - Room.State.CONNECTED,  
98 - ), 130 + expectedStatesForReconnectType(reconnectType),
99 states, 131 states,
100 ) 132 )
101 } 133 }
@@ -117,19 +149,12 @@ class RoomReconnectionTypesMockE2ETest( @@ -117,19 +149,12 @@ class RoomReconnectionTypesMockE2ETest(
117 val states = stateCollector.stopCollecting() 149 val states = stateCollector.stopCollecting()
118 150
119 assertIsClassList( 151 assertIsClassList(
120 - listOf(  
121 - RoomEvent.Reconnecting::class.java,  
122 - RoomEvent.Reconnected::class.java,  
123 - ), 152 + expectedEventsForReconnectType(reconnectType),
124 events, 153 events,
125 ) 154 )
126 155
127 assertEquals( 156 assertEquals(
128 - listOf(  
129 - Room.State.CONNECTED,  
130 - Room.State.RECONNECTING,  
131 - Room.State.CONNECTED,  
132 - ), 157 + expectedStatesForReconnectType(reconnectType),
133 states, 158 states,
134 ) 159 )
135 } 160 }