davidliu
Committed by GitHub

Permanent local participant to fix NPEs when not connected (#108)

* Permanent local participant to fix npe

* fix tests
@@ -135,12 +135,9 @@ constructor( @@ -135,12 +135,9 @@ constructor(
135 */ 135 */
136 var videoTrackPublishDefaults: VideoTrackPublishDefaults by defaultsManager::videoTrackPublishDefaults 136 var videoTrackPublishDefaults: VideoTrackPublishDefaults by defaultsManager::videoTrackPublishDefaults
137 137
138 - var _localParticipant: LocalParticipant? = null  
139 - val localParticipant: LocalParticipant  
140 - get() {  
141 - return _localParticipant  
142 - ?: throw UninitializedPropertyAccessException("localParticipant has not been initialized yet.")  
143 - } 138 + val localParticipant: LocalParticipant = localParticipantFactory.create(dynacast = dynacast).apply {
  139 + internalListener = this@Room
  140 + }
144 141
145 private var mutableRemoteParticipants by flowDelegate(emptyMap<String, RemoteParticipant>()) 142 private var mutableRemoteParticipants by flowDelegate(emptyMap<String, RemoteParticipant>())
146 143
@@ -174,6 +171,44 @@ constructor( @@ -174,6 +171,44 @@ constructor(
174 coroutineScope.cancel() 171 coroutineScope.cancel()
175 } 172 }
176 coroutineScope = CoroutineScope(defaultDispatcher + SupervisorJob()) 173 coroutineScope = CoroutineScope(defaultDispatcher + SupervisorJob())
  174 +
  175 + // Setup local participant.
  176 + localParticipant.reinitialize()
  177 + coroutineScope.launch {
  178 + localParticipant.events.collect {
  179 + when (it) {
  180 + is ParticipantEvent.TrackPublished -> emitWhenConnected(
  181 + RoomEvent.TrackPublished(
  182 + room = this@Room,
  183 + publication = it.publication,
  184 + participant = it.participant,
  185 + )
  186 + )
  187 + is ParticipantEvent.ParticipantPermissionsChanged -> emitWhenConnected(
  188 + RoomEvent.ParticipantPermissionsChanged(
  189 + room = this@Room,
  190 + participant = it.participant,
  191 + newPermissions = it.newPermissions,
  192 + oldPermissions = it.oldPermissions,
  193 + )
  194 + )
  195 + is ParticipantEvent.MetadataChanged -> {
  196 + listener?.onMetadataChanged(it.participant, it.prevMetadata, this@Room)
  197 + emitWhenConnected(
  198 + RoomEvent.ParticipantMetadataChanged(
  199 + this@Room,
  200 + it.participant,
  201 + it.prevMetadata
  202 + )
  203 + )
  204 + }
  205 + else -> {
  206 + /* do nothing */
  207 + }
  208 + }
  209 + }
  210 + }
  211 +
177 state = State.CONNECTING 212 state = State.CONNECTING
178 connectOptions = options 213 connectOptions = options
179 engine.join(url, token, options, getCurrentRoomOptions()) 214 engine.join(url, token, options, getCurrentRoomOptions())
@@ -214,37 +249,7 @@ constructor( @@ -214,37 +249,7 @@ constructor(
214 return 249 return
215 } 250 }
216 251
217 - if (_localParticipant == null) {  
218 - val lp = localParticipantFactory.create(response.participant, dynacast)  
219 - lp.internalListener = this  
220 - coroutineScope.launch {  
221 - lp.events.collect {  
222 - when (it) {  
223 - is ParticipantEvent.TrackPublished -> eventBus.postEvent(  
224 - RoomEvent.TrackPublished(  
225 - room = this@Room,  
226 - publication = it.publication,  
227 - participant = it.participant,  
228 - )  
229 - )  
230 - is ParticipantEvent.ParticipantPermissionsChanged -> eventBus.postEvent(  
231 - RoomEvent.ParticipantPermissionsChanged(  
232 - room = this@Room,  
233 - participant = it.participant,  
234 - newPermissions = it.newPermissions,  
235 - oldPermissions = it.oldPermissions,  
236 - )  
237 - )  
238 - else -> {  
239 - /* do nothing */  
240 - }  
241 - }  
242 - }  
243 - }  
244 - _localParticipant = lp  
245 - } else {  
246 - localParticipant.updateFromInfo(response.participant)  
247 - } 252 + localParticipant.updateFromInfo(response.participant)
248 253
249 if (response.otherParticipantsList.isNotEmpty()) { 254 if (response.otherParticipantsList.isNotEmpty()) {
250 response.otherParticipantsList.forEach { 255 response.otherParticipantsList.forEach {
@@ -319,6 +324,16 @@ constructor( @@ -319,6 +324,16 @@ constructor(
319 it.subscriptionAllowed 324 it.subscriptionAllowed
320 ) 325 )
321 ) 326 )
  327 + is ParticipantEvent.MetadataChanged -> {
  328 + listener?.onMetadataChanged(it.participant, it.prevMetadata, this@Room)
  329 + emitWhenConnected(
  330 + RoomEvent.ParticipantMetadataChanged(
  331 + this@Room,
  332 + it.participant,
  333 + it.prevMetadata
  334 + )
  335 + )
  336 + }
322 is ParticipantEvent.ParticipantPermissionsChanged -> eventBus.postEvent( 337 is ParticipantEvent.ParticipantPermissionsChanged -> eventBus.postEvent(
323 RoomEvent.ParticipantPermissionsChanged( 338 RoomEvent.ParticipantPermissionsChanged(
324 room = this@Room, 339 room = this@Room,
@@ -413,7 +428,7 @@ constructor( @@ -413,7 +428,7 @@ constructor(
413 * Removes all participants and tracks from the room. 428 * Removes all participants and tracks from the room.
414 */ 429 */
415 private fun cleanupRoom() { 430 private fun cleanupRoom() {
416 - _localParticipant?.cleanup() 431 + localParticipant.cleanup()
417 remoteParticipants.keys.toMutableSet() // copy keys to avoid concurrent modifications. 432 remoteParticipants.keys.toMutableSet() // copy keys to avoid concurrent modifications.
418 .forEach { sid -> handleParticipantDisconnect(sid) } 433 .forEach { sid -> handleParticipantDisconnect(sid) }
419 } 434 }
@@ -436,8 +451,7 @@ constructor( @@ -436,8 +451,7 @@ constructor(
436 451
437 listener?.onDisconnect(this, null) 452 listener?.onDisconnect(this, null)
438 listener = null 453 listener = null
439 - _localParticipant?.dispose()  
440 - _localParticipant = null 454 + localParticipant.dispose()
441 455
442 // Ensure all observers see the disconnected before closing scope. 456 // Ensure all observers see the disconnected before closing scope.
443 runBlocking { 457 runBlocking {
@@ -742,8 +756,6 @@ constructor( @@ -742,8 +756,6 @@ constructor(
742 * @suppress 756 * @suppress
743 */ 757 */
744 override fun onMetadataChanged(participant: Participant, prevMetadata: String?) { 758 override fun onMetadataChanged(participant: Participant, prevMetadata: String?) {
745 - listener?.onMetadataChanged(participant, prevMetadata, this)  
746 - eventBus.postEvent(RoomEvent.ParticipantMetadataChanged(this, participant, prevMetadata), coroutineScope)  
747 } 759 }
748 760
749 /** @suppress */ 761 /** @suppress */
@@ -826,6 +838,11 @@ constructor( @@ -826,6 +838,11 @@ constructor(
826 viewRenderer.setEnableHardwareScaler(false /* enabled */) 838 viewRenderer.setEnableHardwareScaler(false /* enabled */)
827 } 839 }
828 840
  841 + private suspend fun emitWhenConnected(event: RoomEvent) {
  842 + if (state == State.CONNECTED) {
  843 + eventBus.postEvent(event)
  844 + }
  845 + }
829 } 846 }
830 847
831 /** 848 /**
@@ -16,7 +16,6 @@ import io.livekit.android.room.track.* @@ -16,7 +16,6 @@ import io.livekit.android.room.track.*
16 import io.livekit.android.room.util.EncodingUtils 16 import io.livekit.android.room.util.EncodingUtils
17 import io.livekit.android.util.LKLog 17 import io.livekit.android.util.LKLog
18 import kotlinx.coroutines.CoroutineDispatcher 18 import kotlinx.coroutines.CoroutineDispatcher
19 -import kotlinx.coroutines.cancel  
20 import livekit.LivekitModels 19 import livekit.LivekitModels
21 import livekit.LivekitRtc 20 import livekit.LivekitRtc
22 import org.webrtc.* 21 import org.webrtc.*
@@ -27,8 +26,6 @@ class LocalParticipant @@ -27,8 +26,6 @@ class LocalParticipant
27 @AssistedInject 26 @AssistedInject
28 internal constructor( 27 internal constructor(
29 @Assisted 28 @Assisted
30 - info: LivekitModels.ParticipantInfo,  
31 - @Assisted  
32 private val dynacast: Boolean, 29 private val dynacast: Boolean,
33 internal val engine: RTCEngine, 30 internal val engine: RTCEngine,
34 private val peerConnectionFactory: PeerConnectionFactory, 31 private val peerConnectionFactory: PeerConnectionFactory,
@@ -39,17 +36,13 @@ internal constructor( @@ -39,17 +36,13 @@ internal constructor(
39 private val defaultsManager: DefaultsManager, 36 private val defaultsManager: DefaultsManager,
40 @Named(InjectionNames.DISPATCHER_DEFAULT) 37 @Named(InjectionNames.DISPATCHER_DEFAULT)
41 coroutineDispatcher: CoroutineDispatcher, 38 coroutineDispatcher: CoroutineDispatcher,
42 -) : Participant(info.sid, info.identity, coroutineDispatcher) { 39 +) : Participant("", null, coroutineDispatcher) {
43 40
44 var audioTrackCaptureDefaults: LocalAudioTrackOptions by defaultsManager::audioTrackCaptureDefaults 41 var audioTrackCaptureDefaults: LocalAudioTrackOptions by defaultsManager::audioTrackCaptureDefaults
45 var audioTrackPublishDefaults: AudioTrackPublishDefaults by defaultsManager::audioTrackPublishDefaults 42 var audioTrackPublishDefaults: AudioTrackPublishDefaults by defaultsManager::audioTrackPublishDefaults
46 var videoTrackCaptureDefaults: LocalVideoTrackOptions by defaultsManager::videoTrackCaptureDefaults 43 var videoTrackCaptureDefaults: LocalVideoTrackOptions by defaultsManager::videoTrackCaptureDefaults
47 var videoTrackPublishDefaults: VideoTrackPublishDefaults by defaultsManager::videoTrackPublishDefaults 44 var videoTrackPublishDefaults: VideoTrackPublishDefaults by defaultsManager::videoTrackPublishDefaults
48 45
49 - init {  
50 - updateFromInfo(info)  
51 - }  
52 -  
53 private val localTrackPublications 46 private val localTrackPublications
54 get() = tracks.values 47 get() = tracks.values
55 .mapNotNull { it as? LocalTrackPublication } 48 .mapNotNull { it as? LocalTrackPublication }
@@ -563,9 +556,9 @@ internal constructor( @@ -563,9 +556,9 @@ internal constructor(
563 } 556 }
564 } 557 }
565 558
566 - fun dispose() { 559 + override fun dispose() {
567 cleanup() 560 cleanup()
568 - scope.cancel() 561 + super.dispose()
569 } 562 }
570 563
571 interface PublishListener { 564 interface PublishListener {
@@ -575,7 +568,7 @@ internal constructor( @@ -575,7 +568,7 @@ internal constructor(
575 568
576 @AssistedFactory 569 @AssistedFactory
577 interface Factory { 570 interface Factory {
578 - fun create(info: LivekitModels.ParticipantInfo, dynacast: Boolean): LocalParticipant 571 + fun create(dynacast: Boolean): LocalParticipant
579 } 572 }
580 573
581 companion object { 574 companion object {
@@ -11,9 +11,7 @@ import io.livekit.android.room.track.TrackPublication @@ -11,9 +11,7 @@ import io.livekit.android.room.track.TrackPublication
11 import io.livekit.android.util.FlowObservable 11 import io.livekit.android.util.FlowObservable
12 import io.livekit.android.util.flow 12 import io.livekit.android.util.flow
13 import io.livekit.android.util.flowDelegate 13 import io.livekit.android.util.flowDelegate
14 -import kotlinx.coroutines.CoroutineDispatcher  
15 -import kotlinx.coroutines.CoroutineScope  
16 -import kotlinx.coroutines.SupervisorJob 14 +import kotlinx.coroutines.*
17 import kotlinx.coroutines.flow.* 15 import kotlinx.coroutines.flow.*
18 import livekit.LivekitModels 16 import livekit.LivekitModels
19 import javax.inject.Named 17 import javax.inject.Named
@@ -22,9 +20,17 @@ open class Participant( @@ -22,9 +20,17 @@ open class Participant(
22 var sid: String, 20 var sid: String,
23 identity: String? = null, 21 identity: String? = null,
24 @Named(InjectionNames.DISPATCHER_DEFAULT) 22 @Named(InjectionNames.DISPATCHER_DEFAULT)
25 - coroutineDispatcher: CoroutineDispatcher, 23 + private val coroutineDispatcher: CoroutineDispatcher,
26 ) { 24 ) {
27 - protected val scope = CoroutineScope(coroutineDispatcher + SupervisorJob()) 25 +
  26 + /**
  27 + * To only be used for flow delegate scoping, and should not be cancelled.
  28 + **/
  29 + private val delegateScope = createScope()
  30 + protected var scope: CoroutineScope = createScope()
  31 + private set
  32 +
  33 + private fun createScope() = CoroutineScope(coroutineDispatcher + SupervisorJob())
28 34
29 protected val eventBus = BroadcastEventBus<ParticipantEvent>() 35 protected val eventBus = BroadcastEventBus<ParticipantEvent>()
30 val events = eventBus.readOnly() 36 val events = eventBus.readOnly()
@@ -159,7 +165,7 @@ open class Participant( @@ -159,7 +165,7 @@ open class Participant(
159 stateFlow = ::tracks.flow 165 stateFlow = ::tracks.flow
160 .map { it.filterValues { publication -> publication.kind == Track.Kind.AUDIO } } 166 .map { it.filterValues { publication -> publication.kind == Track.Kind.AUDIO } }
161 .trackUpdateFlow() 167 .trackUpdateFlow()
162 - .stateIn(scope, SharingStarted.Eagerly, emptyList()) 168 + .stateIn(delegateScope, SharingStarted.Eagerly, emptyList())
163 ) 169 )
164 170
165 /** 171 /**
@@ -171,7 +177,7 @@ open class Participant( @@ -171,7 +177,7 @@ open class Participant(
171 stateFlow = ::tracks.flow 177 stateFlow = ::tracks.flow
172 .map { it.filterValues { publication -> publication.kind == Track.Kind.VIDEO } } 178 .map { it.filterValues { publication -> publication.kind == Track.Kind.VIDEO } }
173 .trackUpdateFlow() 179 .trackUpdateFlow()
174 - .stateIn(scope, SharingStarted.Eagerly, emptyList()) 180 + .stateIn(delegateScope, SharingStarted.Eagerly, emptyList())
175 ) 181 )
176 182
177 /** 183 /**
@@ -294,6 +300,16 @@ open class Participant( @@ -294,6 +300,16 @@ open class Participant(
294 scope 300 scope
295 ) 301 )
296 } 302 }
  303 +
  304 + internal fun reinitialize() {
  305 + if (!scope.isActive) {
  306 + scope = createScope()
  307 + }
  308 + }
  309 +
  310 + internal open fun dispose() {
  311 + scope.cancel()
  312 + }
297 } 313 }
298 314
299 @Deprecated("Use Participant.events instead.") 315 @Deprecated("Use Participant.events instead.")
@@ -43,7 +43,7 @@ class RoomMockE2ETest : MockE2ETest() { @@ -43,7 +43,7 @@ class RoomMockE2ETest : MockE2ETest() {
43 val collector = EventCollector(room.events, coroutineRule.scope) 43 val collector = EventCollector(room.events, coroutineRule.scope)
44 connect() 44 connect()
45 val events = collector.stopCollecting() 45 val events = collector.stopCollecting()
46 - assertEquals(0, events.size) 46 + assertEquals(emptyList<RoomEvent>(), events)
47 } 47 }
48 48
49 @Test 49 @Test
@@ -59,7 +59,7 @@ class RoomMockE2ETest : MockE2ETest() { @@ -59,7 +59,7 @@ class RoomMockE2ETest : MockE2ETest() {
59 val collector = EventCollector(room.events, coroutineRule.scope) 59 val collector = EventCollector(room.events, coroutineRule.scope)
60 connect(joinResponse) 60 connect(joinResponse)
61 val events = collector.stopCollecting() 61 val events = collector.stopCollecting()
62 - assertEquals(0, events.size) 62 + assertEquals(emptyList<RoomEvent>(), events)
63 } 63 }
64 64
65 @Test 65 @Test
@@ -6,12 +6,15 @@ import androidx.test.core.app.ApplicationProvider @@ -6,12 +6,15 @@ import androidx.test.core.app.ApplicationProvider
6 import io.livekit.android.audio.NoAudioHandler 6 import io.livekit.android.audio.NoAudioHandler
7 import io.livekit.android.coroutines.TestCoroutineRule 7 import io.livekit.android.coroutines.TestCoroutineRule
8 import io.livekit.android.events.EventCollector 8 import io.livekit.android.events.EventCollector
  9 +import io.livekit.android.events.EventListenable
  10 +import io.livekit.android.events.ParticipantEvent
9 import io.livekit.android.events.RoomEvent 11 import io.livekit.android.events.RoomEvent
10 import io.livekit.android.mock.* 12 import io.livekit.android.mock.*
11 import io.livekit.android.room.participant.LocalParticipant 13 import io.livekit.android.room.participant.LocalParticipant
12 import kotlinx.coroutines.ExperimentalCoroutinesApi 14 import kotlinx.coroutines.ExperimentalCoroutinesApi
  15 +import kotlinx.coroutines.flow.MutableSharedFlow
  16 +import kotlinx.coroutines.flow.SharedFlow
13 import kotlinx.coroutines.test.runTest 17 import kotlinx.coroutines.test.runTest
14 -import livekit.LivekitModels  
15 import org.junit.Assert 18 import org.junit.Assert
16 import org.junit.Before 19 import org.junit.Before
17 import org.junit.Rule 20 import org.junit.Rule
@@ -42,8 +45,13 @@ class RoomTest { @@ -42,8 +45,13 @@ class RoomTest {
42 var eglBase: EglBase = MockEglBase() 45 var eglBase: EglBase = MockEglBase()
43 46
44 val localParticipantFactory = object : LocalParticipant.Factory { 47 val localParticipantFactory = object : LocalParticipant.Factory {
45 - override fun create(info: LivekitModels.ParticipantInfo, dynacast: Boolean): LocalParticipant { 48 + override fun create(dynacast: Boolean): LocalParticipant {
46 return Mockito.mock(LocalParticipant::class.java) 49 return Mockito.mock(LocalParticipant::class.java)
  50 + .apply {
  51 + whenever(this.events).thenReturn(object : EventListenable<ParticipantEvent> {
  52 + override val events: SharedFlow<ParticipantEvent> = MutableSharedFlow()
  53 + })
  54 + }
47 } 55 }
48 } 56 }
49 57
@@ -137,14 +145,6 @@ class RoomTest { @@ -137,14 +145,6 @@ class RoomTest {
137 Assert.assertEquals(true, events[1] is RoomEvent.TrackUnpublished) 145 Assert.assertEquals(true, events[1] is RoomEvent.TrackUnpublished)
138 Assert.assertEquals(true, events[2] is RoomEvent.ParticipantDisconnected) 146 Assert.assertEquals(true, events[2] is RoomEvent.ParticipantDisconnected)
139 Assert.assertEquals(true, events[3] is RoomEvent.Disconnected) 147 Assert.assertEquals(true, events[3] is RoomEvent.Disconnected)
140 -  
141 - var localParticipantEmpty = false  
142 - try{  
143 - room.localParticipant // should throw  
144 - } catch (e: Exception) {  
145 - localParticipantEmpty = true  
146 - }  
147 - Assert.assertTrue(localParticipantEmpty)  
148 Assert.assertTrue(room.remoteParticipants.isEmpty()) 148 Assert.assertTrue(room.remoteParticipants.isEmpty())
149 } 149 }
150 } 150 }