Room.kt
22.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
package io.livekit.android.room
import android.content.Context
import android.net.ConnectivityManager
import android.net.Network
import android.net.NetworkCapabilities
import android.net.NetworkRequest
import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import io.livekit.android.ConnectOptions
import io.livekit.android.Version
import io.livekit.android.dagger.InjectionNames
import io.livekit.android.events.BroadcastEventBus
import io.livekit.android.events.RoomEvent
import io.livekit.android.renderer.TextureViewRenderer
import io.livekit.android.room.participant.*
import io.livekit.android.room.track.*
import io.livekit.android.util.LKLog
import kotlinx.coroutines.*
import livekit.LivekitModels
import livekit.LivekitRtc
import org.webrtc.*
import javax.inject.Named
/**
 * A room session.
 *
 * The room joins through [RTCEngine], keeps track of the local and remote participants, and
 * relays engine, participant and connectivity callbacks to the deprecated [listener] and to
 * the [events] bus.
 */
class Room
@AssistedInject
constructor(
@Assisted private val context: Context,
private val engine: RTCEngine,
private val eglBase: EglBase,
private val localParticipantFactory: LocalParticipant.Factory,
private val defaultsManager: DefaultsManager,
@Named(InjectionNames.DISPATCHER_DEFAULT)
private val defaultDispatcher: CoroutineDispatcher,
@Named(InjectionNames.DISPATCHER_IO)
private val ioDispatcher: CoroutineDispatcher,
) : RTCEngine.Listener, ParticipantListener, ConnectivityManager.NetworkCallback() {
// Scope passed to eventBus.postEvent; (re)created by connect() and cancelled by handleDisconnect().
private lateinit var coroutineScope: CoroutineScope
private val eventBus = BroadcastEventBus<RoomEvent>()
/** Read-only view of the room's event bus. */
val events = eventBus.readOnly()
init {
// Route engine callbacks (RTCEngine.Listener) to this room.
engine.listener = this
}
/** Connection state of the room; a new room starts as [DISCONNECTED]. */
enum class State {
CONNECTING,
CONNECTED,
DISCONNECTED,
RECONNECTING;
}
/** Wrapper around a room sid string. */
@JvmInline
value class Sid(val sid: String)
/** Deprecated callback interface; cleared when the room disconnects. */
@Deprecated("Use events instead.")
var listener: RoomListener? = null
/** Room sid taken from the join response in [connect]; null until then. */
var sid: Sid? = null
private set
/** Room name taken from the join response in [connect]; null until then. */
var name: String? = null
private set
/** Current connection state. */
var state: State = State.DISCONNECTED
private set
/** Room metadata, updated from [onRoomUpdate]; null until the first update. */
var metadata: String? = null
private set
/**
 * Forwarded to `RemoteParticipant.addSubscribedMediaTrack` whenever a remote track is added.
 * NOTE(review): the exact effect is defined by that method, not in this file.
 */
var autoManageVideo: Boolean = false
// The four defaults below are delegated to the injected DefaultsManager.
var audioTrackCaptureDefaults: LocalAudioTrackOptions by defaultsManager::audioTrackCaptureDefaults
var audioTrackPublishDefaults: AudioTrackPublishDefaults by defaultsManager::audioTrackPublishDefaults
var videoTrackCaptureDefaults: LocalVideoTrackOptions by defaultsManager::videoTrackCaptureDefaults
var videoTrackPublishDefaults: VideoTrackPublishDefaults by defaultsManager::videoTrackPublishDefaults
/**
 * The local participant, created in [connect] from the join response. Being `lateinit`,
 * reading it before [connect] has assigned it throws.
 */
lateinit var localParticipant: LocalParticipant
private set
// Remote participants keyed by participant sid.
private val mutableRemoteParticipants = mutableMapOf<String, RemoteParticipant>()
/** Remote participants keyed by participant sid (a view of the live backing map). */
val remoteParticipants: Map<String, RemoteParticipant>
get() = mutableRemoteParticipants
private val mutableActiveSpeakers = mutableListOf<Participant>()
/** Participants currently considered to be speaking (a view of the live backing list). */
val activeSpeakers: List<Participant>
get() = mutableActiveSpeakers
// Set by onLost(); cleared by onAvailable() once a reconnect has been requested.
private var hasLostConnectivity: Boolean = false
/**
 * Joins the room at [url] using [token].
 *
 * Any coroutine scope left over from a previous connection is cancelled and replaced. On
 * success [sid] and [name] are populated, [localParticipant] is created, the participants
 * already in the room are registered, and this room is registered as a network callback.
 * [state] moves to [State.CONNECTING] here; it becomes [State.CONNECTED] in [onIceConnected].
 *
 * If the join response carries no local participant, the deprecated [listener] receives
 * `onFailedToConnect` and [state] is reset to [State.DISCONNECTED]. If the engine's join call
 * throws, [state] is reset the same way and the exception is rethrown unchanged.
 *
 * @param url passed through to the engine's join call.
 * @param token passed through to the engine's join call.
 * @param options passed through to the engine's join call; may be null.
 */
suspend fun connect(url: String, token: String, options: ConnectOptions?) {
    if (this::coroutineScope.isInitialized) {
        coroutineScope.cancel()
    }
    coroutineScope = CoroutineScope(defaultDispatcher + SupervisorJob())
    state = State.CONNECTING
    val response = try {
        engine.join(url, token, options)
    } catch (e: Exception) {
        // Joining failed or was cancelled: don't leave the room reporting CONNECTING.
        state = State.DISCONNECTED
        throw e
    }
    LKLog.i { "Connected to server, server version: ${response.serverVersion}, client version: ${Version.CLIENT_VERSION}" }

    sid = Sid(response.room.sid)
    name = response.room.name

    if (!response.hasParticipant()) {
        // Without a local participant the room is unusable; report it as not connected.
        state = State.DISCONNECTED
        listener?.onFailedToConnect(this, RoomException.ConnectException("server didn't return any participants"))
        return
    }

    val lp = localParticipantFactory.create(response.participant)
    lp.internalListener = this
    localParticipant = lp

    // forEach over an empty list is a no-op, so no emptiness check is needed.
    response.otherParticipantsList.forEach {
        getOrCreateRemoteParticipant(it.sid, it)
    }

    val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
    val networkRequest = NetworkRequest.Builder()
        .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
        .build()
    cm.registerNetworkCallback(networkRequest, this)
}
/**
 * Leaves the room: calls `sendLeave()` on the engine's client, then runs the local teardown
 * in [handleDisconnect].
 */
fun disconnect() {
engine.client.sendLeave()
handleDisconnect()
}
/**
 * Removes the remote participant registered under [sid], unpublishes each of its tracks and
 * reports the departure to the deprecated [listener] and the event bus.
 *
 * Does nothing when no remote participant is registered under [sid].
 */
private fun handleParticipantDisconnect(sid: String) {
    val departed = mutableRemoteParticipants.remove(sid)
    if (departed == null) {
        return
    }

    // Work on a snapshot so the loop is unaffected if unpublishTrack changes the track map.
    val publications = departed.tracks.values.toList()
    for (publication in publications) {
        departed.unpublishTrack(publication.sid)
    }

    listener?.onParticipantDisconnected(this, departed)
    val event = RoomEvent.ParticipantDisconnected(this, departed)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Looks up a participant by sid.
 *
 * @param sid the participant sid to look for.
 * @return [localParticipant] when [sid] is its sid, otherwise the matching entry of
 * [remoteParticipants], or null when there is none.
 */
fun getParticipant(sid: String): Participant? = when (sid) {
    localParticipant.sid -> localParticipant
    else -> remoteParticipants[sid]
}
/**
 * Returns the remote participant registered under [sid], creating and registering one when
 * none exists yet.
 *
 * A new participant is built from [info] when it is given, otherwise from the bare [sid];
 * in both cases this room becomes its internal listener before it is stored.
 */
@Synchronized
private fun getOrCreateRemoteParticipant(
    sid: String,
    info: LivekitModels.ParticipantInfo? = null
): RemoteParticipant = mutableRemoteParticipants.getOrPut(sid) {
    val created = if (info == null) {
        RemoteParticipant(sid, null, engine.client, ioDispatcher)
    } else {
        RemoteParticipant(info, engine.client, ioDispatcher)
    }
    created.internalListener = this
    created
}
/**
 * Applies a complete active-speaker list.
 *
 * Every known participant named in [speakerInfos] is marked as speaking with the reported
 * level; the local participant and every remote participant that is not named are reset to
 * silent. [activeSpeakers] is then replaced, in the order given by [speakerInfos], and the
 * change is reported to the deprecated [listener] and the event bus.
 */
private fun handleActiveSpeakersUpdate(speakerInfos: List<LivekitModels.SpeakerInfo>) {
    val local = localParticipant
    val reportedSids = mutableSetOf<String>()
    val nowSpeaking = mutableListOf<Participant>()

    for (info in speakerInfos) {
        val reportedSid = info.sid!!
        reportedSids += reportedSid

        // Speakers we have no participant for are remembered as seen but otherwise ignored.
        val speaker = getParticipant(reportedSid) ?: continue
        speaker.audioLevel = info.level
        speaker.isSpeaking = true
        nowSpeaking += speaker
    }

    if (local.sid !in reportedSids) {
        local.audioLevel = 0.0f
        local.isSpeaking = false
    }

    val silentRemotes = remoteParticipants.values.filter { it.sid !in reportedSids }
    for (remote in silentRemotes) {
        remote.audioLevel = 0.0f
        remote.isSpeaking = false
    }

    mutableActiveSpeakers.clear()
    mutableActiveSpeakers.addAll(nowSpeaking)
    listener?.onActiveSpeakersChanged(nowSpeaking, this)
    val event = RoomEvent.ActiveSpeakersChanged(this, nowSpeaking)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Applies an incremental speaker update.
 *
 * Starting from the current [activeSpeakers], each entry of [speakerInfos] that refers to a
 * known participant updates that participant's level and speaking flag, and adds it to or
 * removes it from the set of speakers. The resulting list is ordered by audio level with the
 * loudest speaker first, as promised by [RoomListener.onActiveSpeakersChanged], and is then
 * published to the deprecated [listener] and the event bus.
 */
private fun handleSpeakersChanged(speakerInfos: List<LivekitModels.SpeakerInfo>) {
    // Current speakers keyed by sid; the map keeps insertion order.
    val updatedSpeakers = mutableMapOf<String, Participant>()
    activeSpeakers.forEach {
        updatedSpeakers[it.sid] = it
    }

    for (speaker in speakerInfos) {
        // Ignore updates about participants this room does not know.
        val participant = getParticipant(speaker.sid) ?: continue

        participant.audioLevel = speaker.level
        participant.isSpeaking = speaker.active

        if (speaker.active) {
            updatedSpeakers[speaker.sid] = participant
        } else {
            updatedSpeakers.remove(speaker.sid)
        }
    }

    // Loudest first. The sort is stable, so equal levels keep their previous relative order.
    val updatedSpeakersList = updatedSpeakers.values.sortedByDescending { it.audioLevel }

    mutableActiveSpeakers.clear()
    mutableActiveSpeakers.addAll(updatedSpeakersList)
    listener?.onActiveSpeakersChanged(updatedSpeakersList, this)
    eventBus.postEvent(RoomEvent.ActiveSpeakersChanged(this, updatedSpeakersList), coroutineScope)
}
/**
 * Asks the engine to reconnect and reports [State.RECONNECTING] to the deprecated [listener]
 * and the event bus. Does nothing when a reconnect is already in progress.
 */
private fun reconnect() {
    val alreadyReconnecting = state == State.RECONNECTING
    if (!alreadyReconnecting) {
        state = State.RECONNECTING
        engine.reconnect()
        listener?.onReconnecting(this)
        val event = RoomEvent.Reconnecting(this)
        eventBus.postEvent(event, coroutineScope)
    }
}
/**
 * Tears the room down: unregisters the connectivity callback, stops every local and remote
 * track, closes the engine, marks the room [State.DISCONNECTED] and notifies observers.
 *
 * Tolerates being called before [connect] has finished (no local participant, no coroutine
 * scope, no registered network callback) and being called again after a previous teardown.
 */
private fun handleDisconnect() {
    try {
        val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
        cm.unregisterNetworkCallback(this)
    } catch (e: IllegalArgumentException) {
        // The callback is not registered: connect() never got that far, or an earlier
        // disconnect already removed it. There is nothing to undo.
        LKLog.v { "network callback was not registered: $e" }
    }

    // localParticipant only exists once connect() has processed the join response.
    if (this::localParticipant.isInitialized) {
        for (pub in localParticipant.tracks.values) {
            pub.track?.stop()
        }
    }
    // stop remote tracks too
    for (p in remoteParticipants.values) {
        for (pub in p.tracks.values) {
            pub.track?.stop()
        }
    }
    engine.close()
    state = State.DISCONNECTED
    listener?.onDisconnect(this, null)
    listener = null

    // The scope is created by connect(); without it there is nothing to post the event on.
    if (this::coroutineScope.isInitialized) {
        // Ensure all observers see the disconnected before closing scope.
        runBlocking {
            eventBus.postEvent(RoomEvent.Disconnected(this@Room, null), coroutineScope).join()
        }
        coroutineScope.cancel()
    }
}
/**
 * Assisted-injection factory for [Room]. The caller supplies the `context`; the remaining
 * constructor parameters are provided by injection.
 *
 * @suppress
 */
@AssistedFactory
interface Factory {
fun create(context: Context): Room
}
//------------------------------------- NetworkCallback -------------------------------------//
/**
 * Network callback: records that connectivity was lost so that the next [onAvailable]
 * triggers a reconnect.
 *
 * @suppress
 */
override fun onLost(network: Network) {
// Only remember the loss here; the reconnect itself is started from onAvailable().
hasLostConnectivity = true
}
/**
 * Network callback: when a network becomes available after a previously recorded loss,
 * starts a reconnect and clears the loss flag. Does nothing if no loss was recorded.
 *
 * @suppress
 */
override fun onAvailable(network: Network) {
    // A network appearing without a prior onLost() needs no action.
    if (hasLostConnectivity) {
        LKLog.i { "network connection available, reconnecting" }
        reconnect()
        hasLostConnectivity = false
    }
}
//----------------------------------- RTCEngine.Listener ------------------------------------//
/** Engine callback: marks the room as [State.CONNECTED]. */
override fun onIceConnected() {
state = State.CONNECTED
}
/**
 * Engine callback: marks the room as [State.CONNECTED] again and reports the successful
 * reconnect to the deprecated [listener] and the event bus.
 */
override fun onIceReconnected() {
    state = State.CONNECTED
    listener?.onReconnected(this)
    val event = RoomEvent.Reconnected(this)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Engine callback: a remote media track was added.
 *
 * The id of the first stream is unpacked into a participant sid and an optional track sid
 * (see [unpackStreamId]); when the stream id carries no track sid, the track's own id is used.
 * The track is then handed to the matching remote participant, which is created if needed.
 *
 * A track that arrives without any stream is logged and ignored, because there is no stream
 * id to recover the participant from.
 *
 * @suppress
 */
override fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>) {
    val stream = streams.firstOrNull()
    if (stream == null) {
        LKLog.i { "add track with empty streams?" }
        return
    }

    val (participantSid, streamTrackSid) = unpackStreamId(stream.id)
    val trackSid = streamTrackSid ?: track.id()

    val participant = getOrCreateRemoteParticipant(participantSid)
    participant.addSubscribedMediaTrack(track, trackSid, autoManageVideo)
}
/**
 * Engine callback: applies a batch of participant updates.
 *
 * An update for the local participant refreshes it in place. For any other sid the remote
 * participant is looked up or created, and then either removed (state DISCONNECTED),
 * announced as newly connected (not known before this update), or refreshed from the info.
 *
 * @suppress
 */
override fun onUpdateParticipants(updates: List<LivekitModels.ParticipantInfo>) {
    updates.forEach { info ->
        val participantSid = info.sid

        if (localParticipant.sid == participantSid) {
            localParticipant.updateFromInfo(info)
            return@forEach
        }

        // Must be checked before the lookup below, which registers unknown participants.
        val alreadyKnown = participantSid in remoteParticipants
        val participant = getOrCreateRemoteParticipant(participantSid, info)

        when {
            info.state == LivekitModels.ParticipantInfo.State.DISCONNECTED ->
                handleParticipantDisconnect(participantSid)

            !alreadyKnown -> {
                listener?.onParticipantConnected(this, participant)
                val event = RoomEvent.ParticipantConnected(this, participant)
                eventBus.postEvent(event, coroutineScope)
            }

            else -> participant.updateFromInfo(info)
        }
    }
}
/**
 * Engine callback: a complete active-speaker list was received; delegates to
 * [handleActiveSpeakersUpdate].
 *
 * @suppress
 */
override fun onActiveSpeakersUpdate(speakers: List<LivekitModels.SpeakerInfo>) =
    handleActiveSpeakersUpdate(speakers)
/**
 * Engine callback: forwards a mute change for the track [trackSid] to the local participant.
 */
override fun onRemoteMuteChanged(trackSid: String, muted: Boolean) {
localParticipant.onRemoteMuteChanged(trackSid, muted)
}
/**
 * Engine callback: stores the room metadata carried by [update] and posts
 * [RoomEvent.RoomMetadataChanged] with the new and the previous value.
 */
override fun onRoomUpdate(update: LivekitModels.Room) {
    val previous = metadata
    metadata = update.metadata
    val event = RoomEvent.RoomMetadataChanged(this, metadata, previous)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Engine callback: applies connection-quality updates.
 *
 * For every entry that refers to a known participant, the participant's quality is updated
 * and the change is reported to the deprecated [listener] and the event bus. Entries for
 * unknown participants are skipped without affecting the rest of the batch.
 */
override fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>) {
    for (info in updates) {
        val quality = ConnectionQuality.fromProto(info.quality)
        // `continue`, not `return`: one unknown sid must not drop the remaining updates.
        val participant = getParticipant(info.participantSid) ?: continue
        participant.connectionQuality = quality
        listener?.onConnectionQualityChanged(participant, quality)
        eventBus.postEvent(RoomEvent.ConnectionQualityChanged(this, participant, quality), coroutineScope)
    }
}
/**
 * Engine callback: an incremental speaker update was received; delegates to
 * [handleSpeakersChanged].
 *
 * @suppress
 */
override fun onSpeakersChanged(speakers: List<LivekitModels.SpeakerInfo>) =
    handleSpeakersChanged(speakers)
/**
 * Engine callback: a data packet was received.
 *
 * When the sender is a known remote participant, the payload is delivered to the deprecated
 * [listener], posted as [RoomEvent.DataReceived], and delivered to the sender's own listener.
 * Packets from unknown senders are dropped.
 *
 * @suppress
 */
override fun onUserPacket(packet: LivekitModels.UserPacket, kind: LivekitModels.DataPacket.Kind) {
    val sender = remoteParticipants[packet.participantSid]
    if (sender != null) {
        val payload = packet.payload.toByteArray()
        listener?.onDataReceived(payload, sender, this)
        val event = RoomEvent.DataReceived(this, payload, sender)
        eventBus.postEvent(event, coroutineScope)
        sender.listener?.onDataReceived(payload, sender)
    }
}
/**
 * Engine callback: the engine disconnected. Logs [reason] and runs the local teardown in
 * [handleDisconnect].
 *
 * @suppress
 */
override fun onDisconnect(reason: String) {
LKLog.v { "engine did disconnect: $reason" }
handleDisconnect()
}
/**
 * Engine callback: connecting failed. Forwards [error] to the deprecated [listener]; no event
 * is posted to the event bus here.
 *
 * @suppress
 */
override fun onFailToConnect(error: Exception) {
listener?.onFailedToConnect(this, error)
}
//------------------------------- ParticipantListener --------------------------------//
/**
 * Participant callback, invoked for both local and remote participants: relays a metadata
 * change to the deprecated [listener] and posts [RoomEvent.ParticipantMetadataChanged].
 *
 * @suppress
 */
override fun onMetadataChanged(participant: Participant, prevMetadata: String?) {
    listener?.onMetadataChanged(participant, prevMetadata, this)
    val event = RoomEvent.ParticipantMetadataChanged(this, participant, prevMetadata)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Participant callback: relays a track mute to the deprecated [listener] and posts
 * [RoomEvent.TrackMuted].
 *
 * @suppress
 */
override fun onTrackMuted(publication: TrackPublication, participant: Participant) {
    listener?.onTrackMuted(publication, participant, this)
    val event = RoomEvent.TrackMuted(this, publication, participant)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Participant callback: relays a track unmute to the deprecated [listener] and posts
 * [RoomEvent.TrackUnmuted].
 *
 * @suppress
 */
override fun onTrackUnmuted(publication: TrackPublication, participant: Participant) {
    listener?.onTrackUnmuted(publication, participant, this)
    val event = RoomEvent.TrackUnmuted(this, publication, participant)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Participant callback: a remote participant published a track. Relays it to the deprecated
 * [listener] and posts [RoomEvent.TrackPublished].
 *
 * @suppress
 */
override fun onTrackPublished(publication: RemoteTrackPublication, participant: RemoteParticipant) {
    listener?.onTrackPublished(publication, participant, this)
    val event = RoomEvent.TrackPublished(this, publication, participant)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Participant callback: a remote participant unpublished a track. Relays it to the deprecated
 * [listener] and posts [RoomEvent.TrackUnpublished].
 *
 * @suppress
 */
override fun onTrackUnpublished(publication: RemoteTrackPublication, participant: RemoteParticipant) {
    listener?.onTrackUnpublished(publication, participant, this)
    val event = RoomEvent.TrackUnpublished(this, publication, participant)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Participant callback: the local participant published a track. Relays it to the deprecated
 * [listener] and posts [RoomEvent.TrackPublished].
 *
 * @suppress
 */
override fun onTrackPublished(publication: LocalTrackPublication, participant: LocalParticipant) {
    listener?.onTrackPublished(publication, participant, this)
    val event = RoomEvent.TrackPublished(this, publication, participant)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Participant callback: the local participant unpublished a track. Relays it to the
 * deprecated [listener] and posts [RoomEvent.TrackUnpublished].
 *
 * @suppress
 */
override fun onTrackUnpublished(publication: LocalTrackPublication, participant: LocalParticipant) {
    listener?.onTrackUnpublished(publication, participant, this)
    val event = RoomEvent.TrackUnpublished(this, publication, participant)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Participant callback: a remote track was subscribed. Relays it to the deprecated [listener]
 * and posts [RoomEvent.TrackSubscribed].
 *
 * @suppress
 */
override fun onTrackSubscribed(track: Track, publication: RemoteTrackPublication, participant: RemoteParticipant) {
    listener?.onTrackSubscribed(track, publication, participant, this)
    val event = RoomEvent.TrackSubscribed(this, track, publication, participant)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Participant callback: subscribing to the track [sid] failed with [exception]. Relays the
 * failure to the deprecated [listener] and posts [RoomEvent.TrackSubscriptionFailed].
 *
 * @suppress
 */
override fun onTrackSubscriptionFailed(
    sid: String,
    exception: Exception,
    participant: RemoteParticipant
) {
    listener?.onTrackSubscriptionFailed(sid, exception, participant, this)
    val event = RoomEvent.TrackSubscriptionFailed(this, sid, exception, participant)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Participant callback: a remote track was unsubscribed. Relays it to the deprecated
 * [listener] and posts [RoomEvent.TrackUnsubscribed].
 *
 * @suppress
 */
override fun onTrackUnsubscribed(
    track: Track,
    publication: RemoteTrackPublication,
    participant: RemoteParticipant
) {
    listener?.onTrackUnsubscribed(track, publication, participant, this)
    val event = RoomEvent.TrackUnsubscribed(this, track, publication, participant)
    eventBus.postEvent(event, coroutineScope)
}
/**
 * Initialises [viewRenderer] with this room's EGL context, aspect-fit scaling and the
 * hardware scaler disabled.
 *
 * @suppress
 * // TODO(@dl): can this be moved out of Room/SDK?
 */
fun initVideoRenderer(viewRenderer: SurfaceViewRenderer) {
    val sharedContext = eglBase.eglBaseContext
    viewRenderer.also { renderer ->
        renderer.init(sharedContext, null)
        renderer.setScalingType(RendererCommon.ScalingType.SCALE_ASPECT_FIT)
        renderer.setEnableHardwareScaler(false /* enabled */)
    }
}
/**
 * Initialises [viewRenderer] with this room's EGL context, aspect-fit scaling and the
 * hardware scaler disabled.
 *
 * @suppress
 * // TODO(@dl): can this be moved out of Room/SDK?
 */
fun initVideoRenderer(viewRenderer: TextureViewRenderer) {
    val sharedContext = eglBase.eglBaseContext
    viewRenderer.also { renderer ->
        renderer.init(sharedContext, null)
        renderer.setScalingType(RendererCommon.ScalingType.SCALE_ASPECT_FIT)
        renderer.setEnableHardwareScaler(false /* enabled */)
    }
}
}
/**
 * Room listener. Every callback has an empty default body, so clients override only the
 * ones they need.
 */
@Deprecated("Use Room.events instead")
interface RoomListener {
/**
 * A network change has been detected and LiveKit attempts to reconnect to the room.
 * When reconnect attempts succeed, the room state will be kept, including tracks that are
 * subscribed/published.
 */
fun onReconnecting(room: Room) {}
/**
 * The reconnect attempt was successful.
 */
fun onReconnected(room: Room) {}
/**
 * Disconnected from the room.
 */
fun onDisconnect(room: Room, error: Exception?) {}
/**
 * A [RemoteParticipant] joined after the local participant. Not emitted for participants
 * that were already in the room.
 */
fun onParticipantConnected(room: Room, participant: RemoteParticipant) {}
/**
 * A [RemoteParticipant] left after the local participant joined.
 */
fun onParticipantDisconnected(room: Room, participant: RemoteParticipant) {}
/**
 * Could not connect to the room.
 */
fun onFailedToConnect(room: Room, error: Exception) {}
// fun onReconnecting(room: Room, error: Exception) {}
// fun onReconnect(room: Room) {}
/**
 * Active speakers changed. The list is ordered by audio level, loudest speakers first, and
 * includes the [LocalParticipant] too.
 */
fun onActiveSpeakersChanged(speakers: List<Participant>, room: Room) {}
// Participant callbacks
/**
 * Participant metadata is a simple way for app-specific state to be pushed to all users.
 * When RoomService.UpdateParticipantMetadata is called to change a participant's state,
 * this event will be fired for all clients in the room.
 */
fun onMetadataChanged(participant: Participant, prevMetadata: String?, room: Room) {}
/**
 * A track was muted.
 *
 * For the local participant, the callback will be called if setMute was called on the
 * [LocalTrackPublication], or if the server has requested the participant to be muted.
 */
fun onTrackMuted(publication: TrackPublication, participant: Participant, room: Room) {}
/**
 * A track was unmuted.
 *
 * For the local participant, the callback will be called if setMute was called on the
 * [LocalTrackPublication], or if the server has requested the participant to be unmuted.
 */
fun onTrackUnmuted(publication: TrackPublication, participant: Participant, room: Room) {}
/**
 * A [RemoteParticipant] published a new track after the local participant joined. Not
 * fired for tracks that were already published.
 */
fun onTrackPublished(publication: TrackPublication, participant: RemoteParticipant, room: Room) {}
/**
 * A [RemoteParticipant] has unpublished a track.
 */
fun onTrackUnpublished(publication: TrackPublication, participant: RemoteParticipant, room: Room) {}
/**
 * The [LocalParticipant] has published a track.
 */
fun onTrackPublished(publication: LocalTrackPublication, participant: LocalParticipant, room: Room) {}
/**
 * The [LocalParticipant] has unpublished a track.
 */
fun onTrackUnpublished(publication: LocalTrackPublication, participant: LocalParticipant, room: Room) {}
/**
 * The [LocalParticipant] has subscribed to a new track. This event will always fire as
 * long as new tracks are ready for use.
 */
fun onTrackSubscribed(track: Track, publication: TrackPublication, participant: RemoteParticipant, room: Room) {}
/**
 * Could not subscribe to a track.
 */
fun onTrackSubscriptionFailed(sid: String, exception: Exception, participant: RemoteParticipant, room: Room) {}
/**
 * A subscribed track is no longer available. Clients should listen to this event and
 * ensure the track removes all renderers.
 */
fun onTrackUnsubscribed(track: Track, publications: TrackPublication, participant: RemoteParticipant, room: Room) {}
/**
 * Received data published by another participant.
 */
fun onDataReceived(data: ByteArray, participant: RemoteParticipant, room: Room) {}
/**
 * The connection quality for a participant has changed.
 *
 * @param participant Either a remote participant or [Room.localParticipant]
 * @param quality the new connection quality
 */
fun onConnectionQualityChanged(participant: Participant, quality: ConnectionQuality) {}
// Helpers that delegate directly to DeviceManager.
companion object {
/** Returns whatever `DeviceManager.getDefaultDevice` reports for [kind]. */
fun getDefaultDevice(kind: DeviceManager.Kind): String? {
return DeviceManager.getDefaultDevice(kind)
}
/** Passes [kind] and [deviceId] on to `DeviceManager.setDefaultDevice`. */
fun setDefaultDevice(kind: DeviceManager.Kind, deviceId: String?) {
DeviceManager.setDefaultDevice(kind, deviceId)
}
}
}
/**
 * Base type for room-level errors.
 *
 * @param message optional description, passed to [Exception].
 * @param cause optional underlying error, passed to [Exception].
 */
sealed class RoomException(message: String? = null, cause: Throwable? = null) :
Exception(message, cause) {
/** Reported through `onFailedToConnect` when the join response contains no participant. */
class ConnectException(message: String? = null, cause: Throwable? = null) :
RoomException(message, cause)
}
/**
 * Splits a packed stream id of the form `first|second` into its two parts.
 *
 * @param packed the stream id to unpack.
 * @return the two parts when [packed] contains exactly one `|`; otherwise [packed] itself
 * paired with null.
 */
internal fun unpackStreamId(packed: String): Pair<String, String?> {
    val segments = packed.split('|')
    return when (segments.size) {
        2 -> segments[0] to segments[1]
        else -> packed to null
    }
}