David Liu

room events

1 package io.livekit.android.events 1 package io.livekit.android.events
2 2
  3 +import kotlinx.coroutines.CoroutineScope
  4 +import kotlinx.coroutines.Job
3 import kotlinx.coroutines.flow.MutableSharedFlow 5 import kotlinx.coroutines.flow.MutableSharedFlow
4 import kotlinx.coroutines.flow.asSharedFlow 6 import kotlinx.coroutines.flow.asSharedFlow
5 -import kotlinx.coroutines.flow.collect 7 +import kotlinx.coroutines.launch
6 8
7 class BroadcastEventBus<T> : EventListenable<T> { 9 class BroadcastEventBus<T> : EventListenable<T> {
8 private val mutableEvents = MutableSharedFlow<T>() 10 private val mutableEvents = MutableSharedFlow<T>()
@@ -12,11 +14,19 @@ class BroadcastEventBus<T> : EventListenable<T> { @@ -12,11 +14,19 @@ class BroadcastEventBus<T> : EventListenable<T> {
12 mutableEvents.emit(event) 14 mutableEvents.emit(event)
13 } 15 }
14 16
  17 + fun postEvent(event: T, scope: CoroutineScope): Job {
  18 + return scope.launch { postEvent(event) }
  19 + }
  20 +
15 suspend fun postEvents(eventsToPost: Collection<T>) { 21 suspend fun postEvents(eventsToPost: Collection<T>) {
16 eventsToPost.forEach { event -> 22 eventsToPost.forEach { event ->
17 mutableEvents.emit(event) 23 mutableEvents.emit(event)
18 } 24 }
19 } 25 }
20 26
  27 + fun postEvents(eventsToPost: Collection<T>, scope: CoroutineScope): Job {
  28 + return scope.launch { postEvents(eventsToPost) }
  29 + }
  30 +
21 fun readOnly(): EventListenable<T> = this 31 fun readOnly(): EventListenable<T> = this
22 } 32 }
  1 +package io.livekit.android.events
  2 +
  3 +import io.livekit.android.room.Room
  4 +import io.livekit.android.room.participant.ConnectionQuality
  5 +import io.livekit.android.room.participant.LocalParticipant
  6 +import io.livekit.android.room.participant.Participant
  7 +import io.livekit.android.room.participant.RemoteParticipant
  8 +import io.livekit.android.room.track.LocalTrackPublication
  9 +import io.livekit.android.room.track.Track
  10 +import io.livekit.android.room.track.TrackPublication
  11 +
  12 +sealed class RoomEvent : Event() {
  13 + /**
  14 + * A network change has been detected and LiveKit attempts to reconnect to the room
  15 + * When reconnect attempts succeed, the room state will be kept, including tracks that are subscribed/published
  16 + */
  17 + class Reconnecting(val room: Room): RoomEvent()
  18 +
  19 + /**
  20 + * The reconnect attempt had been successful
  21 + */
  22 + class Reconnected(val room: Room): RoomEvent()
  23 +
  24 + /**
  25 + * Disconnected from room
  26 + */
  27 + class Disconnected(val room: Room, val error: Exception?): RoomEvent()
  28 +
  29 + /**
  30 + * When a [RemoteParticipant] joins after the local participant. It will not emit events
  31 + * for participants that are already in the room
  32 + */
  33 + class ParticipantConnected(val room: Room, val participant: RemoteParticipant): RoomEvent()
  34 +
  35 + /**
  36 + * When a [RemoteParticipant] leaves after the local participant has joined.
  37 + */
  38 + class ParticipantDisconnected(val room: Room, val participant: RemoteParticipant): RoomEvent()
  39 +
  40 + /**
  41 + * Active speakers changed. List of speakers are ordered by their audio level. loudest
  42 + * speakers first. This will include the [LocalParticipant] too.
  43 + */
  44 + class ActiveSpeakersChanged(val room: Room, val speakers: List<Participant>,): RoomEvent()
  45 +
  46 + // Participant callbacks
  47 + /**
  48 + * Participant metadata is a simple way for app-specific state to be pushed to all users.
  49 + * When RoomService.UpdateParticipantMetadata is called to change a participant's state,
  50 + * this event will be fired for all clients in the room.
  51 + */
  52 + class MetadataChanged(val room: Room, val participant: Participant, val prevMetadata: String?): RoomEvent()
  53 +
  54 + /**
  55 + * The participant was muted.
  56 + *
  57 + * For the local participant, the callback will be called if setMute was called on the
  58 + * [LocalTrackPublication], or if the server has requested the participant to be muted
  59 + */
  60 + class TrackMuted(val room: Room, val publication: TrackPublication, val participant: Participant): RoomEvent()
  61 +
  62 + /**
  63 + * The participant was unmuted.
  64 + *
  65 + * For the local participant, the callback will be called if setMute was called on the
  66 + * [LocalTrackPublication], or if the server has requested the participant to be muted
  67 + */
  68 + class TrackUnmuted(val room: Room, val publication: TrackPublication, val participant: Participant): RoomEvent()
  69 +
  70 + /**
  71 + * When a new track is published to room after the local participant has joined. It will
  72 + * not fire for tracks that are already published
  73 + */
  74 + class TrackPublished(val room: Room, val publication: TrackPublication, val participant: Participant): RoomEvent()
  75 +
  76 + /**
  77 + * A [Participant] has unpublished a track
  78 + */
  79 + class TrackUnpublished(val room: Room, val publication: TrackPublication, val participant: Participant): RoomEvent()
  80 +
  81 + /**
  82 + * The [LocalParticipant] has subscribed to a new track. This event will always fire as
  83 + * long as new tracks are ready for use.
  84 + */
  85 + class TrackSubscribed(val room: Room, val track: Track, val publication: TrackPublication, val participant: RemoteParticipant): RoomEvent()
  86 +
  87 + /**
  88 + * Could not subscribe to a track
  89 + */
  90 + class TrackSubscriptionFailed(val room: Room, val sid: String, val exception: Exception, val participant: RemoteParticipant): RoomEvent()
  91 +
  92 + /**
  93 + * A subscribed track is no longer available. Clients should listen to this event and ensure
  94 + * the track removes all renderers
  95 + */
  96 + class TrackUnsubscribed(val room: Room, val track: Track, val publications: TrackPublication, val participant: RemoteParticipant): RoomEvent()
  97 +
  98 + /**
  99 + * Received data published by another participant
  100 + */
  101 + class DataReceived(val room: Room, val data: ByteArray, val participant: RemoteParticipant): RoomEvent()
  102 +
  103 + /**
  104 + * The connection quality for a participant has changed.
  105 + *
  106 + * @param participant Either a remote participant or [Room.localParticipant]
  107 + * @param quality the new connection quality
  108 + */
  109 + class ConnectionQualityChanged(val room: Room, val participant: Participant, val quality: ConnectionQuality): RoomEvent()
  110 +
  111 +}
@@ -11,11 +11,13 @@ import dagger.assisted.AssistedInject @@ -11,11 +11,13 @@ import dagger.assisted.AssistedInject
11 import io.livekit.android.ConnectOptions 11 import io.livekit.android.ConnectOptions
12 import io.livekit.android.Version 12 import io.livekit.android.Version
13 import io.livekit.android.dagger.InjectionNames 13 import io.livekit.android.dagger.InjectionNames
  14 +import io.livekit.android.events.BroadcastEventBus
  15 +import io.livekit.android.events.RoomEvent
14 import io.livekit.android.renderer.TextureViewRenderer 16 import io.livekit.android.renderer.TextureViewRenderer
15 import io.livekit.android.room.participant.* 17 import io.livekit.android.room.participant.*
16 import io.livekit.android.room.track.* 18 import io.livekit.android.room.track.*
17 import io.livekit.android.util.LKLog 19 import io.livekit.android.util.LKLog
18 -import kotlinx.coroutines.CoroutineDispatcher 20 +import kotlinx.coroutines.*
19 import livekit.LivekitModels 21 import livekit.LivekitModels
20 import livekit.LivekitRtc 22 import livekit.LivekitRtc
21 import org.webrtc.* 23 import org.webrtc.*
@@ -29,9 +31,16 @@ constructor( @@ -29,9 +31,16 @@ constructor(
29 private val eglBase: EglBase, 31 private val eglBase: EglBase,
30 private val localParticipantFactory: LocalParticipant.Factory, 32 private val localParticipantFactory: LocalParticipant.Factory,
31 private val defaultsManager: DefaultsManager, 33 private val defaultsManager: DefaultsManager,
  34 + @Named(InjectionNames.DISPATCHER_DEFAULT)
  35 + private val defaultDispatcher: CoroutineDispatcher,
32 @Named(InjectionNames.DISPATCHER_IO) 36 @Named(InjectionNames.DISPATCHER_IO)
33 private val ioDispatcher: CoroutineDispatcher, 37 private val ioDispatcher: CoroutineDispatcher,
34 ) : RTCEngine.Listener, ParticipantListener, ConnectivityManager.NetworkCallback() { 38 ) : RTCEngine.Listener, ParticipantListener, ConnectivityManager.NetworkCallback() {
  39 +
  40 + private lateinit var coroutineScope: CoroutineScope
  41 + private val eventBus = BroadcastEventBus<RoomEvent>()
  42 + val events = eventBus.readOnly()
  43 +
35 init { 44 init {
36 engine.listener = this 45 engine.listener = this
37 } 46 }
@@ -46,6 +55,7 @@ constructor( @@ -46,6 +55,7 @@ constructor(
46 @JvmInline 55 @JvmInline
47 value class Sid(val sid: String) 56 value class Sid(val sid: String)
48 57
  58 + @Deprecated("Use events instead.")
49 var listener: RoomListener? = null 59 var listener: RoomListener? = null
50 60
51 var sid: Sid? = null 61 var sid: Sid? = null
@@ -75,6 +85,10 @@ constructor( @@ -75,6 +85,10 @@ constructor(
75 85
76 private var hasLostConnectivity: Boolean = false 86 private var hasLostConnectivity: Boolean = false
77 suspend fun connect(url: String, token: String, options: ConnectOptions?) { 87 suspend fun connect(url: String, token: String, options: ConnectOptions?) {
  88 + if(this::coroutineScope.isInitialized) {
  89 + coroutineScope.cancel()
  90 + }
  91 + coroutineScope = CoroutineScope(defaultDispatcher + SupervisorJob())
78 state = State.CONNECTING 92 state = State.CONNECTING
79 val response = engine.join(url, token, options) 93 val response = engine.join(url, token, options)
80 LKLog.i { "Connected to server, server version: ${response.serverVersion}, client version: ${Version.CLIENT_VERSION}" } 94 LKLog.i { "Connected to server, server version: ${response.serverVersion}, client version: ${Version.CLIENT_VERSION}" }
@@ -114,8 +128,16 @@ constructor( @@ -114,8 +128,16 @@ constructor(
114 } 128 }
115 129
116 listener?.onParticipantDisconnected(this, removedParticipant) 130 listener?.onParticipantDisconnected(this, removedParticipant)
  131 + eventBus.postEvent(RoomEvent.ParticipantDisconnected(this, removedParticipant), coroutineScope)
117 } 132 }
118 133
  134 + fun getParticipant(sid: String): Participant? {
  135 + if(sid == localParticipant.sid){
  136 + return localParticipant
  137 + } else {
  138 + return remoteParticipants[sid]
  139 + }
  140 + }
119 @Synchronized 141 @Synchronized
120 private fun getOrCreateRemoteParticipant( 142 private fun getOrCreateRemoteParticipant(
121 sid: String, 143 sid: String,
@@ -144,19 +166,11 @@ constructor( @@ -144,19 +166,11 @@ constructor(
144 val speakerSid = speakerInfo.sid!! 166 val speakerSid = speakerInfo.sid!!
145 seenSids.add(speakerSid) 167 seenSids.add(speakerSid)
146 168
147 - if (speakerSid == localParticipant.sid) {  
148 - localParticipant.audioLevel = speakerInfo.level  
149 - localParticipant.isSpeaking = true  
150 - speakers.add(localParticipant)  
151 - } else {  
152 - val participant = remoteParticipants[speakerSid]  
153 - if (participant != null) { 169 + val participant = getParticipant(speakerSid) ?: return@forEach
154 participant.audioLevel = speakerInfo.level 170 participant.audioLevel = speakerInfo.level
155 participant.isSpeaking = true 171 participant.isSpeaking = true
156 speakers.add(participant) 172 speakers.add(participant)
157 } 173 }
158 - }  
159 - }  
160 174
161 if (!seenSids.contains(localParticipant.sid)) { 175 if (!seenSids.contains(localParticipant.sid)) {
162 localParticipant.audioLevel = 0.0f 176 localParticipant.audioLevel = 0.0f
@@ -172,6 +186,7 @@ constructor( @@ -172,6 +186,7 @@ constructor(
172 mutableActiveSpeakers.clear() 186 mutableActiveSpeakers.clear()
173 mutableActiveSpeakers.addAll(speakers) 187 mutableActiveSpeakers.addAll(speakers)
174 listener?.onActiveSpeakersChanged(speakers, this) 188 listener?.onActiveSpeakersChanged(speakers, this)
  189 + eventBus.postEvent(RoomEvent.ActiveSpeakersChanged(this, speakers), coroutineScope)
175 } 190 }
176 191
177 private fun handleSpeakersChanged(speakerInfos: List<LivekitModels.SpeakerInfo>) { 192 private fun handleSpeakersChanged(speakerInfos: List<LivekitModels.SpeakerInfo>) {
@@ -181,11 +196,7 @@ constructor( @@ -181,11 +196,7 @@ constructor(
181 } 196 }
182 197
183 speakerInfos.forEach { speaker -> 198 speakerInfos.forEach { speaker ->
184 - val participant = if(speaker.sid == localParticipant.sid) {  
185 - localParticipant  
186 - } else {  
187 - remoteParticipants[speaker.sid]  
188 - } ?: return@forEach 199 + val participant = getParticipant(speaker.sid) ?: return@forEach
189 200
190 participant.audioLevel = speaker.level 201 participant.audioLevel = speaker.level
191 participant.isSpeaking = speaker.active 202 participant.isSpeaking = speaker.active
@@ -203,6 +214,7 @@ constructor( @@ -203,6 +214,7 @@ constructor(
203 mutableActiveSpeakers.clear() 214 mutableActiveSpeakers.clear()
204 mutableActiveSpeakers.addAll(updatedSpeakersList) 215 mutableActiveSpeakers.addAll(updatedSpeakersList)
205 listener?.onActiveSpeakersChanged(updatedSpeakersList, this) 216 listener?.onActiveSpeakersChanged(updatedSpeakersList, this)
  217 + eventBus.postEvent(RoomEvent.ActiveSpeakersChanged(this, updatedSpeakersList), coroutineScope)
206 } 218 }
207 219
208 private fun reconnect() { 220 private fun reconnect() {
@@ -212,6 +224,7 @@ constructor( @@ -212,6 +224,7 @@ constructor(
212 state = State.RECONNECTING 224 state = State.RECONNECTING
213 engine.reconnect() 225 engine.reconnect()
214 listener?.onReconnecting(this) 226 listener?.onReconnecting(this)
  227 + eventBus.postEvent(RoomEvent.Reconnecting(this), coroutineScope)
215 } 228 }
216 229
217 private fun handleDisconnect() { 230 private fun handleDisconnect() {
@@ -230,6 +243,12 @@ constructor( @@ -230,6 +243,12 @@ constructor(
230 state = State.DISCONNECTED 243 state = State.DISCONNECTED
231 listener?.onDisconnect(this, null) 244 listener?.onDisconnect(this, null)
232 listener = null 245 listener = null
  246 +
  247 + // Ensure all observers see the disconnected before closing scope.
  248 + runBlocking {
  249 + eventBus.postEvent(RoomEvent.Disconnected(this@Room, null), coroutineScope).join()
  250 + }
  251 + coroutineScope.cancel()
233 } 252 }
234 253
235 /** 254 /**
@@ -272,6 +291,7 @@ constructor( @@ -272,6 +291,7 @@ constructor(
272 override fun onIceReconnected() { 291 override fun onIceReconnected() {
273 state = State.CONNECTED 292 state = State.CONNECTED
274 listener?.onReconnected(this) 293 listener?.onReconnected(this)
  294 + eventBus.postEvent(RoomEvent.Reconnected(this), coroutineScope)
275 } 295 }
276 296
277 /** 297 /**
@@ -310,6 +330,7 @@ constructor( @@ -310,6 +330,7 @@ constructor(
310 handleParticipantDisconnect(participantSid) 330 handleParticipantDisconnect(participantSid)
311 } else if (isNewParticipant) { 331 } else if (isNewParticipant) {
312 listener?.onParticipantConnected(this, participant) 332 listener?.onParticipantConnected(this, participant)
  333 + eventBus.postEvent(RoomEvent.ParticipantConnected(this, participant), coroutineScope)
313 } else { 334 } else {
314 participant.updateFromInfo(info) 335 participant.updateFromInfo(info)
315 } 336 }
@@ -334,14 +355,10 @@ constructor( @@ -334,14 +355,10 @@ constructor(
334 override fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>) { 355 override fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>) {
335 updates.forEach { info -> 356 updates.forEach { info ->
336 val quality = ConnectionQuality.fromProto(info.quality) 357 val quality = ConnectionQuality.fromProto(info.quality)
337 - if (info.participantSid == this.localParticipant.sid) {  
338 - this.localParticipant.connectionQuality = quality  
339 - listener?.onConnectionQualityChanged(localParticipant, quality)  
340 - } else {  
341 - val participant = remoteParticipants[info.participantSid] ?: return@forEach 358 + val participant = getParticipant(info.participantSid) ?: return
342 participant.connectionQuality = quality 359 participant.connectionQuality = quality
343 listener?.onConnectionQualityChanged(participant, quality) 360 listener?.onConnectionQualityChanged(participant, quality)
344 - } 361 + eventBus.postEvent(RoomEvent.ConnectionQualityChanged(this, participant, quality), coroutineScope)
345 } 362 }
346 } 363 }
347 364
@@ -360,6 +377,7 @@ constructor( @@ -360,6 +377,7 @@ constructor(
360 val data = packet.payload.toByteArray() 377 val data = packet.payload.toByteArray()
361 378
362 listener?.onDataReceived(data, participant, this) 379 listener?.onDataReceived(data, participant, this)
  380 + eventBus.postEvent(RoomEvent.DataReceived(this, data, participant), coroutineScope)
363 participant.listener?.onDataReceived(data, participant) 381 participant.listener?.onDataReceived(data, participant)
364 } 382 }
365 383
@@ -385,16 +403,19 @@ constructor( @@ -385,16 +403,19 @@ constructor(
385 */ 403 */
386 override fun onMetadataChanged(participant: Participant, prevMetadata: String?) { 404 override fun onMetadataChanged(participant: Participant, prevMetadata: String?) {
387 listener?.onMetadataChanged(participant, prevMetadata, this) 405 listener?.onMetadataChanged(participant, prevMetadata, this)
  406 + eventBus.postEvent(RoomEvent.MetadataChanged(this, participant, prevMetadata), coroutineScope)
388 } 407 }
389 408
390 /** @suppress */ 409 /** @suppress */
391 override fun onTrackMuted(publication: TrackPublication, participant: Participant) { 410 override fun onTrackMuted(publication: TrackPublication, participant: Participant) {
392 listener?.onTrackMuted(publication, participant, this) 411 listener?.onTrackMuted(publication, participant, this)
  412 + eventBus.postEvent(RoomEvent.TrackMuted(this, publication, participant), coroutineScope)
393 } 413 }
394 414
395 /** @suppress */ 415 /** @suppress */
396 override fun onTrackUnmuted(publication: TrackPublication, participant: Participant) { 416 override fun onTrackUnmuted(publication: TrackPublication, participant: Participant) {
397 listener?.onTrackUnmuted(publication, participant, this) 417 listener?.onTrackUnmuted(publication, participant, this)
  418 + eventBus.postEvent(RoomEvent.TrackUnmuted(this, publication, participant), coroutineScope)
398 } 419 }
399 420
400 /** 421 /**
@@ -402,6 +423,7 @@ constructor( @@ -402,6 +423,7 @@ constructor(
402 */ 423 */
403 override fun onTrackPublished(publication: RemoteTrackPublication, participant: RemoteParticipant) { 424 override fun onTrackPublished(publication: RemoteTrackPublication, participant: RemoteParticipant) {
404 listener?.onTrackPublished(publication, participant, this) 425 listener?.onTrackPublished(publication, participant, this)
  426 + eventBus.postEvent(RoomEvent.TrackPublished(this, publication, participant), coroutineScope)
405 } 427 }
406 428
407 /** 429 /**
@@ -409,6 +431,7 @@ constructor( @@ -409,6 +431,7 @@ constructor(
409 */ 431 */
410 override fun onTrackUnpublished(publication: RemoteTrackPublication, participant: RemoteParticipant) { 432 override fun onTrackUnpublished(publication: RemoteTrackPublication, participant: RemoteParticipant) {
411 listener?.onTrackUnpublished(publication, participant, this) 433 listener?.onTrackUnpublished(publication, participant, this)
  434 + eventBus.postEvent(RoomEvent.TrackUnpublished(this, publication, participant), coroutineScope)
412 } 435 }
413 436
414 /** 437 /**
@@ -416,6 +439,7 @@ constructor( @@ -416,6 +439,7 @@ constructor(
416 */ 439 */
417 override fun onTrackPublished(publication: LocalTrackPublication, participant: LocalParticipant) { 440 override fun onTrackPublished(publication: LocalTrackPublication, participant: LocalParticipant) {
418 listener?.onTrackPublished(publication, participant, this) 441 listener?.onTrackPublished(publication, participant, this)
  442 + eventBus.postEvent(RoomEvent.TrackPublished(this, publication, participant), coroutineScope)
419 } 443 }
420 444
421 /** 445 /**
@@ -423,6 +447,7 @@ constructor( @@ -423,6 +447,7 @@ constructor(
423 */ 447 */
424 override fun onTrackUnpublished(publication: LocalTrackPublication, participant: LocalParticipant) { 448 override fun onTrackUnpublished(publication: LocalTrackPublication, participant: LocalParticipant) {
425 listener?.onTrackUnpublished(publication, participant, this) 449 listener?.onTrackUnpublished(publication, participant, this)
  450 + eventBus.postEvent(RoomEvent.TrackUnpublished(this, publication, participant), coroutineScope)
426 } 451 }
427 452
428 /** 453 /**
@@ -430,6 +455,7 @@ constructor( @@ -430,6 +455,7 @@ constructor(
430 */ 455 */
431 override fun onTrackSubscribed(track: Track, publication: RemoteTrackPublication, participant: RemoteParticipant) { 456 override fun onTrackSubscribed(track: Track, publication: RemoteTrackPublication, participant: RemoteParticipant) {
432 listener?.onTrackSubscribed(track, publication, participant, this) 457 listener?.onTrackSubscribed(track, publication, participant, this)
  458 + eventBus.postEvent(RoomEvent.TrackSubscribed(this, track, publication, participant), coroutineScope)
433 } 459 }
434 460
435 /** 461 /**
@@ -441,6 +467,7 @@ constructor( @@ -441,6 +467,7 @@ constructor(
441 participant: RemoteParticipant 467 participant: RemoteParticipant
442 ) { 468 ) {
443 listener?.onTrackSubscriptionFailed(sid, exception, participant, this) 469 listener?.onTrackSubscriptionFailed(sid, exception, participant, this)
  470 + eventBus.postEvent(RoomEvent.TrackSubscriptionFailed(this, sid, exception, participant), coroutineScope)
444 } 471 }
445 472
446 /** 473 /**
@@ -452,6 +479,7 @@ constructor( @@ -452,6 +479,7 @@ constructor(
452 participant: RemoteParticipant 479 participant: RemoteParticipant
453 ) { 480 ) {
454 listener?.onTrackUnsubscribed(track, publication, participant, this) 481 listener?.onTrackUnsubscribed(track, publication, participant, this)
  482 + eventBus.postEvent(RoomEvent.TrackUnsubscribed(this, track, publication, participant), coroutineScope)
455 } 483 }
456 484
457 /** 485 /**
@@ -480,6 +508,7 @@ constructor( @@ -480,6 +508,7 @@ constructor(
480 * Room Listener, this class provides callbacks that clients should override. 508 * Room Listener, this class provides callbacks that clients should override.
481 * 509 *
482 */ 510 */
  511 +@Deprecated("Use Room.events instead")
483 interface RoomListener { 512 interface RoomListener {
484 /** 513 /**
485 * A network change has been detected and LiveKit attempts to reconnect to the room 514 * A network change has been detected and LiveKit attempts to reconnect to the room