davidliu
Committed by GitHub

session migration (#40)

* update protocol repo

* session migration

* Simulate Scenario debug menu

* some tests cleanup

* fix reconnection implementation

* fix not renegotiating publisher after migration

* clean up dev url/token

* bump ice connect timeout
正在显示 32 个修改的文件 包含 730 行增加227 行删除
@@ -11,6 +11,9 @@ import io.livekit.android.util.debounce @@ -11,6 +11,9 @@ import io.livekit.android.util.debounce
11 import kotlinx.coroutines.CoroutineDispatcher 11 import kotlinx.coroutines.CoroutineDispatcher
12 import kotlinx.coroutines.CoroutineScope 12 import kotlinx.coroutines.CoroutineScope
13 import kotlinx.coroutines.SupervisorJob 13 import kotlinx.coroutines.SupervisorJob
  14 +import kotlinx.coroutines.runBlocking
  15 +import kotlinx.coroutines.sync.Mutex
  16 +import kotlinx.coroutines.sync.withLock
14 import org.webrtc.* 17 import org.webrtc.*
15 import javax.inject.Named 18 import javax.inject.Named
16 19
@@ -37,28 +40,36 @@ constructor( @@ -37,28 +40,36 @@ constructor(
37 40
38 private var renegotiate = false 41 private var renegotiate = false
39 42
  43 + private val mutex = Mutex()
  44 +
40 interface Listener { 45 interface Listener {
41 fun onOffer(sd: SessionDescription) 46 fun onOffer(sd: SessionDescription)
42 } 47 }
43 48
44 fun addIceCandidate(candidate: IceCandidate) { 49 fun addIceCandidate(candidate: IceCandidate) {
  50 + runBlocking {
  51 + mutex.withLock {
45 if (peerConnection.remoteDescription != null && !restartingIce) { 52 if (peerConnection.remoteDescription != null && !restartingIce) {
46 peerConnection.addIceCandidate(candidate) 53 peerConnection.addIceCandidate(candidate)
47 } else { 54 } else {
48 pendingCandidates.add(candidate) 55 pendingCandidates.add(candidate)
49 } 56 }
50 } 57 }
  58 + }
  59 + }
51 60
52 suspend fun setRemoteDescription(sd: SessionDescription): Either<Unit, String?> { 61 suspend fun setRemoteDescription(sd: SessionDescription): Either<Unit, String?> {
53 62
54 val result = peerConnection.setRemoteDescription(sd) 63 val result = peerConnection.setRemoteDescription(sd)
55 if (result is Either.Left) { 64 if (result is Either.Left) {
  65 + mutex.withLock {
56 pendingCandidates.forEach { pending -> 66 pendingCandidates.forEach { pending ->
57 peerConnection.addIceCandidate(pending) 67 peerConnection.addIceCandidate(pending)
58 } 68 }
59 pendingCandidates.clear() 69 pendingCandidates.clear()
60 restartingIce = false 70 restartingIce = false
61 } 71 }
  72 + }
62 73
63 if (this.renegotiate) { 74 if (this.renegotiate) {
64 this.renegotiate = false 75 this.renegotiate = false
@@ -10,10 +10,8 @@ import io.livekit.android.util.CloseableCoroutineScope @@ -10,10 +10,8 @@ import io.livekit.android.util.CloseableCoroutineScope
10 import io.livekit.android.util.Either 10 import io.livekit.android.util.Either
11 import io.livekit.android.util.LKLog 11 import io.livekit.android.util.LKLog
12 import io.livekit.android.webrtc.isConnected 12 import io.livekit.android.webrtc.isConnected
13 -import kotlinx.coroutines.CoroutineDispatcher  
14 -import kotlinx.coroutines.SupervisorJob  
15 -import kotlinx.coroutines.delay  
16 -import kotlinx.coroutines.launch 13 +import io.livekit.android.webrtc.toProtoSessionDescription
  14 +import kotlinx.coroutines.*
17 import livekit.LivekitModels 15 import livekit.LivekitModels
18 import livekit.LivekitRtc 16 import livekit.LivekitRtc
19 import org.webrtc.* 17 import org.webrtc.*
@@ -48,22 +46,25 @@ internal constructor( @@ -48,22 +46,25 @@ internal constructor(
48 when (value) { 46 when (value) {
49 IceState.CONNECTED -> { 47 IceState.CONNECTED -> {
50 if (oldVal == IceState.DISCONNECTED) { 48 if (oldVal == IceState.DISCONNECTED) {
51 - LKLog.d { "publisher ICE connected" } 49 + LKLog.d { "primary ICE connected" }
52 listener?.onIceConnected() 50 listener?.onIceConnected()
53 } else if (oldVal == IceState.RECONNECTING) { 51 } else if (oldVal == IceState.RECONNECTING) {
54 - LKLog.d { "publisher ICE reconnected" } 52 + LKLog.d { "primary ICE reconnected" }
55 listener?.onIceReconnected() 53 listener?.onIceReconnected()
56 } 54 }
57 } 55 }
58 IceState.DISCONNECTED -> { 56 IceState.DISCONNECTED -> {
59 - LKLog.d { "publisher ICE disconnected" }  
60 - listener?.onDisconnect("Peer connection disconnected") 57 + LKLog.d { "primary ICE disconnected" }
  58 + if (oldVal == IceState.CONNECTED) {
  59 + reconnect()
  60 + }
61 } 61 }
62 else -> { 62 else -> {
63 } 63 }
64 } 64 }
65 } 65 }
66 - private var wsRetries: Int = 0 66 +
  67 + private var reconnectingJob: Job? = null
67 private val pendingTrackResolvers: MutableMap<String, Continuation<LivekitModels.TrackInfo>> = 68 private val pendingTrackResolvers: MutableMap<String, Continuation<LivekitModels.TrackInfo>> =
68 mutableMapOf() 69 mutableMapOf()
69 private var sessionUrl: String? = null 70 private var sessionUrl: String? = null
@@ -72,7 +73,7 @@ internal constructor( @@ -72,7 +73,7 @@ internal constructor(
72 private val publisherObserver = PublisherTransportObserver(this, client) 73 private val publisherObserver = PublisherTransportObserver(this, client)
73 private val subscriberObserver = SubscriberTransportObserver(this, client) 74 private val subscriberObserver = SubscriberTransportObserver(this, client)
74 internal lateinit var publisher: PeerConnectionTransport 75 internal lateinit var publisher: PeerConnectionTransport
75 - private lateinit var subscriber: PeerConnectionTransport 76 + internal lateinit var subscriber: PeerConnectionTransport
76 private var reliableDataChannel: DataChannel? = null 77 private var reliableDataChannel: DataChannel? = null
77 private var reliableDataChannelSub: DataChannel? = null 78 private var reliableDataChannelSub: DataChannel? = null
78 private var lossyDataChannel: DataChannel? = null 79 private var lossyDataChannel: DataChannel? = null
@@ -94,6 +95,7 @@ internal constructor( @@ -94,6 +95,7 @@ internal constructor(
94 sessionToken = token 95 sessionToken = token
95 val joinResponse = client.join(url, token, options) 96 val joinResponse = client.join(url, token, options)
96 isClosed = false 97 isClosed = false
  98 + listener?.onSignalConnected()
97 99
98 isSubscriberPrimary = joinResponse.subscriberPrimary 100 isSubscriberPrimary = joinResponse.subscriberPrimary
99 101
@@ -169,12 +171,13 @@ internal constructor( @@ -169,12 +171,13 @@ internal constructor(
169 val state = 171 val state =
170 newState ?: throw NullPointerException("unexpected null new state, what do?") 172 newState ?: throw NullPointerException("unexpected null new state, what do?")
171 LKLog.v { "onIceConnection new state: $newState" } 173 LKLog.v { "onIceConnection new state: $newState" }
172 - if (state == PeerConnection.IceConnectionState.CONNECTED) { 174 + if (state.isConnected()) {
173 iceState = IceState.CONNECTED 175 iceState = IceState.CONNECTED
174 - } else if (state == PeerConnection.IceConnectionState.FAILED) { 176 + } else if (state == PeerConnection.IceConnectionState.DISCONNECTED ||
  177 + state == PeerConnection.IceConnectionState.FAILED
  178 + ) {
175 // when we publish tracks, some WebRTC versions will send out disconnected events periodically 179 // when we publish tracks, some WebRTC versions will send out disconnected events periodically
176 iceState = IceState.DISCONNECTED 180 iceState = IceState.DISCONNECTED
177 - listener?.onDisconnect("Peer connection disconnected")  
178 } 181 }
179 } 182 }
180 183
@@ -242,6 +245,7 @@ internal constructor( @@ -242,6 +245,7 @@ internal constructor(
242 } 245 }
243 246
244 fun close() { 247 fun close() {
  248 + isClosed = true
245 coroutineScope.close() 249 coroutineScope.close()
246 publisher.close() 250 publisher.close()
247 subscriber.close() 251 subscriber.close()
@@ -252,53 +256,74 @@ internal constructor( @@ -252,53 +256,74 @@ internal constructor(
252 * reconnect Signal and PeerConnections 256 * reconnect Signal and PeerConnections
253 */ 257 */
254 internal fun reconnect() { 258 internal fun reconnect() {
  259 + if (reconnectingJob != null) {
  260 + return
  261 + }
  262 + if (this.isClosed) {
  263 + return
  264 + }
255 val url = sessionUrl 265 val url = sessionUrl
256 val token = sessionToken 266 val token = sessionToken
257 if (url == null || token == null) { 267 if (url == null || token == null) {
258 LKLog.w { "couldn't reconnect, no url or no token" } 268 LKLog.w { "couldn't reconnect, no url or no token" }
259 return 269 return
260 } 270 }
261 - if (iceState == IceState.DISCONNECTED || wsRetries >= MAX_SIGNAL_RETRIES) {  
262 - LKLog.w { "could not connect to signal after max attempts, giving up" }  
263 - close()  
264 - listener?.onDisconnect("could not reconnect after limit")  
265 - return  
266 - }  
267 271
  272 + val job = coroutineScope.launch {
  273 + listener?.onReconnecting()
  274 +
  275 + for (wsRetries in 0 until MAX_SIGNAL_RETRIES) {
268 var startDelay = wsRetries.toLong() * wsRetries * 500 276 var startDelay = wsRetries.toLong() * wsRetries * 500
269 if (startDelay > 5000) { 277 if (startDelay > 5000) {
270 startDelay = 5000 278 startDelay = 5000
271 } 279 }
272 - coroutineScope.launch {  
273 - delay(startDelay)  
274 - if (iceState == IceState.DISCONNECTED) {  
275 - LKLog.e { "Ice is disconnected" }  
276 - return@launch  
277 - }  
278 280
  281 + LKLog.i { "Reconnecting to signal, attempt ${wsRetries + 1}" }
  282 +
  283 + delay(startDelay)
  284 + try {
279 client.reconnect(url, token) 285 client.reconnect(url, token)
  286 + } catch (e: Exception) {
  287 + // ws reconnect failed, retry.
  288 + continue
  289 + }
280 290
281 - LKLog.v { "reconnected, restarting ICE" }  
282 - wsRetries = 0 291 + LKLog.v { "ws reconnected, restarting ICE" }
  292 + listener?.onSignalConnected()
283 293
284 - // trigger publisher reconnect  
285 subscriber.prepareForIceRestart() 294 subscriber.prepareForIceRestart()
  295 + iceState = IceState.RECONNECTING
  296 + // trigger publisher reconnect
286 // only restart publisher if it's needed 297 // only restart publisher if it's needed
287 if (hasPublished) { 298 if (hasPublished) {
288 - publisher.negotiate(  
289 - getPublisherOfferConstraints().apply {  
290 - with(mandatory) {  
291 - add(  
292 - MediaConstraints.KeyValuePair(  
293 - MediaConstraintKeys.ICE_RESTART,  
294 - MediaConstraintKeys.TRUE  
295 - )  
296 - ) 299 + negotiate()
297 } 300 }
  301 +
  302 + // wait until ICE connected
  303 + val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS;
  304 + while (SystemClock.elapsedRealtime() < endTime) {
  305 + if (iceState == IceState.CONNECTED) {
  306 + LKLog.v { "reconnected to ICE" }
  307 + break
298 } 308 }
299 - ) 309 + delay(100)
  310 + }
  311 +
  312 + if (iceState == IceState.CONNECTED) {
  313 + return@launch
  314 + }
  315 + }
  316 +
  317 +
  318 + listener?.onDisconnect("failed reconnecting.")
  319 + close()
300 } 320 }
301 321
  322 + reconnectingJob = job
  323 + job.invokeOnCompletion {
  324 + if (reconnectingJob == job) {
  325 + reconnectingJob = null
  326 + }
302 } 327 }
303 } 328 }
304 329
@@ -306,6 +331,9 @@ internal constructor( @@ -306,6 +331,9 @@ internal constructor(
306 if (!client.isConnected) { 331 if (!client.isConnected) {
307 return 332 return
308 } 333 }
  334 +
  335 + hasPublished = true
  336 +
309 coroutineScope.launch { 337 coroutineScope.launch {
310 publisher.negotiate(getPublisherOfferConstraints()) 338 publisher.negotiate(getPublisherOfferConstraints())
311 } 339 }
@@ -409,6 +437,8 @@ internal constructor( @@ -409,6 +437,8 @@ internal constructor(
409 fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>) 437 fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>)
410 fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) 438 fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate)
411 fun onSubscriptionPermissionUpdate(subscriptionPermissionUpdate: LivekitRtc.SubscriptionPermissionUpdate) 439 fun onSubscriptionPermissionUpdate(subscriptionPermissionUpdate: LivekitRtc.SubscriptionPermissionUpdate)
  440 + fun onSignalConnected()
  441 + fun onReconnecting()
412 } 442 }
413 443
414 companion object { 444 companion object {
@@ -416,7 +446,7 @@ internal constructor( @@ -416,7 +446,7 @@ internal constructor(
416 private const val LOSSY_DATA_CHANNEL_LABEL = "_lossy" 446 private const val LOSSY_DATA_CHANNEL_LABEL = "_lossy"
417 internal const val MAX_DATA_PACKET_SIZE = 15000 447 internal const val MAX_DATA_PACKET_SIZE = 15000
418 private const val MAX_SIGNAL_RETRIES = 5 448 private const val MAX_SIGNAL_RETRIES = 5
419 - private const val MAX_ICE_CONNECT_TIMEOUT_MS = 5000 449 + private const val MAX_ICE_CONNECT_TIMEOUT_MS = 20000
420 450
421 internal val CONN_CONSTRAINTS = MediaConstraints().apply { 451 internal val CONN_CONSTRAINTS = MediaConstraints().apply {
422 with(optional) { 452 with(optional) {
@@ -433,11 +463,7 @@ internal constructor( @@ -433,11 +463,7 @@ internal constructor(
433 LKLog.i { sessionDescription.toString() } 463 LKLog.i { sessionDescription.toString() }
434 when (val outcome = publisher.setRemoteDescription(sessionDescription)) { 464 when (val outcome = publisher.setRemoteDescription(sessionDescription)) {
435 is Either.Left -> { 465 is Either.Left -> {
436 - // when reconnecting, ICE might not have disconnected and won't trigger  
437 - // our connected callback, so we'll take a shortcut and set it to active  
438 - if (iceState == IceState.RECONNECTING) {  
439 - iceState = IceState.CONNECTED  
440 - } 466 + // do nothing.
441 } 467 }
442 is Either.Right -> { 468 is Either.Right -> {
443 LKLog.e { "error setting remote description for answer: ${outcome.value} " } 469 LKLog.e { "error setting remote description for answer: ${outcome.value} " }
@@ -520,9 +546,8 @@ internal constructor( @@ -520,9 +546,8 @@ internal constructor(
520 } 546 }
521 547
522 override fun onClose(reason: String, code: Int) { 548 override fun onClose(reason: String, code: Int) {
523 - // TODO: reconnect logic  
524 LKLog.i { "received close event: $reason, code: $code" } 549 LKLog.i { "received close event: $reason, code: $code" }
525 - listener?.onDisconnect(reason) 550 + reconnect()
526 } 551 }
527 552
528 override fun onRemoteMuteChanged(trackSid: String, muted: Boolean) { 553 override fun onRemoteMuteChanged(trackSid: String, muted: Boolean) {
@@ -537,14 +562,17 @@ internal constructor( @@ -537,14 +562,17 @@ internal constructor(
537 listener?.onConnectionQuality(updates) 562 listener?.onConnectionQuality(updates)
538 } 563 }
539 564
540 - override fun onLeave() { 565 + override fun onLeave(leave: LivekitRtc.LeaveRequest) {
541 close() 566 close()
542 - listener?.onDisconnect("") 567 + listener?.onDisconnect("server leave")
543 } 568 }
544 569
  570 + // Signal error
545 override fun onError(error: Throwable) { 571 override fun onError(error: Throwable) {
  572 + if (isClosed) {
546 listener?.onFailToConnect(error) 573 listener?.onFailToConnect(error)
547 } 574 }
  575 + }
548 576
549 override fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>) { 577 override fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>) {
550 listener?.onStreamStateUpdate(streamStates) 578 listener?.onStreamStateUpdate(streamStates)
@@ -584,6 +612,21 @@ internal constructor( @@ -584,6 +612,21 @@ internal constructor(
584 } 612 }
585 } 613 }
586 } 614 }
  615 +
  616 + fun sendSyncState(
  617 + subscription: LivekitRtc.UpdateSubscription,
  618 + publishedTracks: List<LivekitRtc.TrackPublishedResponse>
  619 + ) {
  620 + val answer = subscriber.peerConnection.localDescription.toProtoSessionDescription()
  621 +
  622 + val syncState = LivekitRtc.SyncState.newBuilder()
  623 + .setAnswer(answer)
  624 + .setSubscription(subscription)
  625 + .addAllPublishTracks(publishedTracks)
  626 + .build()
  627 +
  628 + client.sendSyncState(syncState)
  629 + }
587 } 630 }
588 631
589 internal enum class IceState { 632 internal enum class IceState {
@@ -133,12 +133,16 @@ constructor( @@ -133,12 +133,16 @@ constructor(
133 get() = mutableActiveSpeakers 133 get() = mutableActiveSpeakers
134 134
135 private var hasLostConnectivity: Boolean = false 135 private var hasLostConnectivity: Boolean = false
  136 + private var connectOptions: ConnectOptions = ConnectOptions()
  137 +
  138 +
136 suspend fun connect(url: String, token: String, options: ConnectOptions = ConnectOptions()) { 139 suspend fun connect(url: String, token: String, options: ConnectOptions = ConnectOptions()) {
137 if (this::coroutineScope.isInitialized) { 140 if (this::coroutineScope.isInitialized) {
138 coroutineScope.cancel() 141 coroutineScope.cancel()
139 } 142 }
140 coroutineScope = CoroutineScope(defaultDispatcher + SupervisorJob()) 143 coroutineScope = CoroutineScope(defaultDispatcher + SupervisorJob())
141 state = State.CONNECTING 144 state = State.CONNECTING
  145 + this.connectOptions = connectOptions
142 val response = engine.join(url, token, options) 146 val response = engine.join(url, token, options)
143 LKLog.i { "Connected to server, server version: ${response.serverVersion}, client version: ${Version.CLIENT_VERSION}" } 147 LKLog.i { "Connected to server, server version: ${response.serverVersion}, client version: ${Version.CLIENT_VERSION}" }
144 148
@@ -174,6 +178,9 @@ constructor( @@ -174,6 +178,9 @@ constructor(
174 } 178 }
175 } 179 }
176 180
  181 + /**
  182 + * Disconnect from the room.
  183 + */
177 fun disconnect() { 184 fun disconnect() {
178 engine.client.sendLeave() 185 engine.client.sendLeave()
179 handleDisconnect() 186 handleDisconnect()
@@ -306,10 +313,7 @@ constructor( @@ -306,10 +313,7 @@ constructor(
306 if (state == State.RECONNECTING) { 313 if (state == State.RECONNECTING) {
307 return 314 return
308 } 315 }
309 - state = State.RECONNECTING  
310 engine.reconnect() 316 engine.reconnect()
311 - listener?.onReconnecting(this)  
312 - eventBus.postEvent(RoomEvent.Reconnecting(this), coroutineScope)  
313 } 317 }
314 318
315 private fun handleDisconnect() { 319 private fun handleDisconnect() {
@@ -345,6 +349,43 @@ constructor( @@ -345,6 +349,43 @@ constructor(
345 coroutineScope.cancel() 349 coroutineScope.cancel()
346 } 350 }
347 351
  352 + fun sendSyncState() {
  353 + // Whether we're sending subscribed tracks or tracks to unsubscribe.
  354 + val sendUnsub = connectOptions.autoSubscribe
  355 + val participantTracksList = mutableListOf<LivekitModels.ParticipantTracks>()
  356 + for (participant in remoteParticipants.values) {
  357 + val builder = LivekitModels.ParticipantTracks.newBuilder()
  358 + builder.participantSid = participant.sid
  359 + for (trackPub in participant.tracks.values) {
  360 + val remoteTrackPub = (trackPub as? RemoteTrackPublication) ?: continue
  361 + if (remoteTrackPub.subscribed != sendUnsub) {
  362 + builder.addTrackSids(remoteTrackPub.sid)
  363 + }
  364 + }
  365 +
  366 + if (builder.trackSidsCount > 0) {
  367 + participantTracksList.add(builder.build())
  368 + }
  369 + }
  370 +
  371 + val subscription = LivekitRtc.UpdateSubscription.newBuilder()
  372 + .setSubscribe(!sendUnsub)
  373 + .addAllParticipantTracks(participantTracksList)
  374 + .build()
  375 + val publishedTracks = localParticipant.publishTracksInfo()
  376 + engine.sendSyncState(subscription, publishedTracks)
  377 + }
  378 +
  379 + /**
  380 + * Sends a simulated scenario for the server to use.
  381 + *
  382 + * To be used for internal testing purposes only.
  383 + * @suppress
  384 + */
  385 + fun sendSimulateScenario(scenario: LivekitRtc.SimulateScenario) {
  386 + engine.client.sendSimulateScenario(scenario)
  387 + }
  388 +
348 /** 389 /**
349 * @suppress 390 * @suppress
350 */ 391 */
@@ -388,6 +429,12 @@ constructor( @@ -388,6 +429,12 @@ constructor(
388 eventBus.postEvent(RoomEvent.Reconnected(this), coroutineScope) 429 eventBus.postEvent(RoomEvent.Reconnected(this), coroutineScope)
389 } 430 }
390 431
  432 + override fun onReconnecting() {
  433 + state = State.RECONNECTING
  434 + listener?.onReconnecting(this)
  435 + eventBus.postEvent(RoomEvent.Reconnecting(this), coroutineScope)
  436 + }
  437 +
391 /** 438 /**
392 * @suppress 439 * @suppress
393 */ 440 */
@@ -513,6 +560,13 @@ constructor( @@ -513,6 +560,13 @@ constructor(
513 eventBus.tryPostEvent(RoomEvent.FailedToConnect(this, error)) 560 eventBus.tryPostEvent(RoomEvent.FailedToConnect(this, error))
514 } 561 }
515 562
  563 + override fun onSignalConnected() {
  564 + if (state == State.RECONNECTING) {
  565 + // during reconnection, need to send sync state upon signal connection.
  566 + sendSyncState()
  567 + }
  568 + }
  569 +
516 //------------------------------- ParticipantListener --------------------------------// 570 //------------------------------- ParticipantListener --------------------------------//
517 /** 571 /**
518 * This is called for both Local and Remote participants 572 * This is called for both Local and Remote participants
@@ -11,6 +11,7 @@ import io.livekit.android.util.CloseableCoroutineScope @@ -11,6 +11,7 @@ import io.livekit.android.util.CloseableCoroutineScope
11 import io.livekit.android.util.Either 11 import io.livekit.android.util.Either
12 import io.livekit.android.util.LKLog 12 import io.livekit.android.util.LKLog
13 import io.livekit.android.util.safe 13 import io.livekit.android.util.safe
  14 +import io.livekit.android.webrtc.toProtoSessionDescription
14 import kotlinx.coroutines.* 15 import kotlinx.coroutines.*
15 import kotlinx.coroutines.flow.MutableSharedFlow 16 import kotlinx.coroutines.flow.MutableSharedFlow
16 import kotlinx.coroutines.flow.collect 17 import kotlinx.coroutines.flow.collect
@@ -59,6 +60,10 @@ constructor( @@ -59,6 +60,10 @@ constructor(
59 private val coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher) 60 private val coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher)
60 61
61 private val responseFlow = MutableSharedFlow<LivekitRtc.SignalResponse>(Int.MAX_VALUE) 62 private val responseFlow = MutableSharedFlow<LivekitRtc.SignalResponse>(Int.MAX_VALUE)
  63 +
  64 + /**
  65 + * @throws Exception if fails to connect.
  66 + */
62 suspend fun join( 67 suspend fun join(
63 url: String, 68 url: String,
64 token: String, 69 token: String,
@@ -68,6 +73,9 @@ constructor( @@ -68,6 +73,9 @@ constructor(
68 return (joinResponse as Either.Left).value 73 return (joinResponse as Either.Left).value
69 } 74 }
70 75
  76 + /**
  77 + * @throws Exception if fails to connect.
  78 + */
71 suspend fun reconnect(url: String, token: String) { 79 suspend fun reconnect(url: String, token: String) {
72 connect( 80 connect(
73 url, 81 url,
@@ -141,7 +149,6 @@ constructor( @@ -141,7 +149,6 @@ constructor(
141 } 149 }
142 150
143 override fun onMessage(webSocket: WebSocket, text: String) { 151 override fun onMessage(webSocket: WebSocket, text: String) {
144 - LKLog.v { text }  
145 val signalResponseBuilder = LivekitRtc.SignalResponse.newBuilder() 152 val signalResponseBuilder = LivekitRtc.SignalResponse.newBuilder()
146 fromJsonProtobuf.merge(text, signalResponseBuilder) 153 fromJsonProtobuf.merge(text, signalResponseBuilder)
147 val response = signalResponseBuilder.build() 154 val response = signalResponseBuilder.build()
@@ -159,9 +166,7 @@ constructor( @@ -159,9 +166,7 @@ constructor(
159 } 166 }
160 167
161 override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { 168 override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
162 - LKLog.v { "websocket closed" }  
163 -  
164 - listener?.onClose(reason, code) 169 + handleWebSocketClose(reason, code)
165 } 170 }
166 171
167 override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { 172 override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
@@ -193,6 +198,21 @@ constructor( @@ -193,6 +198,21 @@ constructor(
193 listener?.onError(t) 198 listener?.onError(t)
194 joinContinuation?.cancel(t) 199 joinContinuation?.cancel(t)
195 } 200 }
  201 +
  202 + val wasConnected = isConnected
  203 + isConnected = false
  204 +
  205 + if (wasConnected) {
  206 + handleWebSocketClose(
  207 + reason = reason ?: response?.toString() ?: t.localizedMessage ?: "websocket failure",
  208 + code = response?.code ?: 500
  209 + )
  210 + }
  211 + }
  212 +
  213 + private fun handleWebSocketClose(reason: String, code: Int) {
  214 + LKLog.v { "websocket closed" }
  215 + listener?.onClose(reason, code)
196 } 216 }
197 217
198 //------------------------------- End WebSocket Listener ------------------------------------// 218 //------------------------------- End WebSocket Listener ------------------------------------//
@@ -207,21 +227,8 @@ constructor( @@ -207,21 +227,8 @@ constructor(
207 return SessionDescription(rtcSdpType, sd.sdp) 227 return SessionDescription(rtcSdpType, sd.sdp)
208 } 228 }
209 229
210 - private fun toProtoSessionDescription(sdp: SessionDescription): LivekitRtc.SessionDescription {  
211 - val sdBuilder = LivekitRtc.SessionDescription.newBuilder()  
212 - sdBuilder.sdp = sdp.description  
213 - sdBuilder.type = when (sdp.type) {  
214 - SessionDescription.Type.ANSWER -> SD_TYPE_ANSWER  
215 - SessionDescription.Type.OFFER -> SD_TYPE_OFFER  
216 - SessionDescription.Type.PRANSWER -> SD_TYPE_PRANSWER  
217 - else -> throw IllegalArgumentException("invalid RTC SdpType: ${sdp.type}")  
218 - }  
219 -  
220 - return sdBuilder.build()  
221 - }  
222 -  
223 fun sendOffer(offer: SessionDescription) { 230 fun sendOffer(offer: SessionDescription) {
224 - val sd = toProtoSessionDescription(offer) 231 + val sd = offer.toProtoSessionDescription()
225 val request = LivekitRtc.SignalRequest.newBuilder() 232 val request = LivekitRtc.SignalRequest.newBuilder()
226 .setOffer(sd) 233 .setOffer(sd)
227 .build() 234 .build()
@@ -230,7 +237,7 @@ constructor( @@ -230,7 +237,7 @@ constructor(
230 } 237 }
231 238
232 fun sendAnswer(answer: SessionDescription) { 239 fun sendAnswer(answer: SessionDescription) {
233 - val sd = toProtoSessionDescription(answer) 240 + val sd = answer.toProtoSessionDescription()
234 val request = LivekitRtc.SignalRequest.newBuilder() 241 val request = LivekitRtc.SignalRequest.newBuilder()
235 .setAnswer(sd) 242 .setAnswer(sd)
236 .build() 243 .build()
@@ -345,6 +352,22 @@ constructor( @@ -345,6 +352,22 @@ constructor(
345 sendRequest(request) 352 sendRequest(request)
346 } 353 }
347 354
  355 + fun sendSyncState(syncState: LivekitRtc.SyncState) {
  356 + val request = LivekitRtc.SignalRequest.newBuilder()
  357 + .setSyncState(syncState)
  358 + .build()
  359 +
  360 + sendRequest(request)
  361 + }
  362 +
  363 + internal fun sendSimulateScenario(scenario: LivekitRtc.SimulateScenario) {
  364 + val request = LivekitRtc.SignalRequest.newBuilder()
  365 + .setSimulate(scenario)
  366 + .build()
  367 +
  368 + sendRequest(request)
  369 + }
  370 +
348 fun sendLeave() { 371 fun sendLeave() {
349 val request = LivekitRtc.SignalRequest.newBuilder() 372 val request = LivekitRtc.SignalRequest.newBuilder()
350 .setLeave(LivekitRtc.LeaveRequest.newBuilder().build()) 373 .setLeave(LivekitRtc.LeaveRequest.newBuilder().build())
@@ -373,6 +396,8 @@ constructor( @@ -373,6 +396,8 @@ constructor(
373 } 396 }
374 397
375 private fun handleSignalResponse(response: LivekitRtc.SignalResponse) { 398 private fun handleSignalResponse(response: LivekitRtc.SignalResponse) {
  399 + LKLog.v { "response: $response" }
  400 +
376 if (!isConnected) { 401 if (!isConnected) {
377 // Only handle joins if not connected. 402 // Only handle joins if not connected.
378 if (response.hasJoin()) { 403 if (response.hasJoin()) {
@@ -394,7 +419,6 @@ constructor( @@ -394,7 +419,6 @@ constructor(
394 } 419 }
395 420
396 private fun handleSignalResponseImpl(response: LivekitRtc.SignalResponse) { 421 private fun handleSignalResponseImpl(response: LivekitRtc.SignalResponse) {
397 - LKLog.v { "response: $response" }  
398 when (response.messageCase) { 422 when (response.messageCase) {
399 LivekitRtc.SignalResponse.MessageCase.ANSWER -> { 423 LivekitRtc.SignalResponse.MessageCase.ANSWER -> {
400 val sd = fromProtoSessionDescription(response.answer) 424 val sd = fromProtoSessionDescription(response.answer)
@@ -427,7 +451,7 @@ constructor( @@ -427,7 +451,7 @@ constructor(
427 LKLog.d { "received unexpected extra join message?" } 451 LKLog.d { "received unexpected extra join message?" }
428 } 452 }
429 LivekitRtc.SignalResponse.MessageCase.LEAVE -> { 453 LivekitRtc.SignalResponse.MessageCase.LEAVE -> {
430 - listener?.onLeave() 454 + listener?.onLeave(response.leave)
431 } 455 }
432 LivekitRtc.SignalResponse.MessageCase.MUTE -> { 456 LivekitRtc.SignalResponse.MessageCase.MUTE -> {
433 listener?.onRemoteMuteChanged(response.mute.sid, response.mute.muted) 457 listener?.onRemoteMuteChanged(response.mute.sid, response.mute.muted)
@@ -475,7 +499,7 @@ constructor( @@ -475,7 +499,7 @@ constructor(
475 fun onRemoteMuteChanged(trackSid: String, muted: Boolean) 499 fun onRemoteMuteChanged(trackSid: String, muted: Boolean)
476 fun onRoomUpdate(update: LivekitModels.Room) 500 fun onRoomUpdate(update: LivekitModels.Room)
477 fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>) 501 fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>)
478 - fun onLeave() 502 + fun onLeave(leave: LivekitRtc.LeaveRequest)
479 fun onError(error: Throwable) 503 fun onError(error: Throwable)
480 fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>) 504 fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>)
481 fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) 505 fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate)
@@ -486,7 +510,7 @@ constructor( @@ -486,7 +510,7 @@ constructor(
486 const val SD_TYPE_ANSWER = "answer" 510 const val SD_TYPE_ANSWER = "answer"
487 const val SD_TYPE_OFFER = "offer" 511 const val SD_TYPE_OFFER = "offer"
488 const val SD_TYPE_PRANSWER = "pranswer" 512 const val SD_TYPE_PRANSWER = "pranswer"
489 - const val PROTOCOL_VERSION = 5 513 + const val PROTOCOL_VERSION = 6
490 const val SDK_TYPE = "android" 514 const val SDK_TYPE = "android"
491 515
492 private fun iceServer(url: String) = 516 private fun iceServer(url: String) =
@@ -382,12 +382,13 @@ internal constructor( @@ -382,12 +382,13 @@ internal constructor(
382 height = trackHeight 382 height = trackHeight
383 quality = LivekitModels.VideoQuality.HIGH 383 quality = LivekitModels.VideoQuality.HIGH
384 bitrate = 0 384 bitrate = 0
  385 + ssrc = 0
385 }.build() 386 }.build()
386 ) 387 )
387 } else { 388 } else {
388 - encodings.map {  
389 - val scaleDownBy = it.scaleResolutionDownBy ?: 1.0  
390 - var videoQuality = videoQualityForRid(it.rid ?: "") 389 + encodings.map { encoding ->
  390 + val scaleDownBy = encoding.scaleResolutionDownBy ?: 1.0
  391 + var videoQuality = videoQualityForRid(encoding.rid ?: "")
391 if (videoQuality == LivekitModels.VideoQuality.UNRECOGNIZED && encodings.size == 1) { 392 if (videoQuality == LivekitModels.VideoQuality.UNRECOGNIZED && encodings.size == 1) {
392 videoQuality = LivekitModels.VideoQuality.HIGH 393 videoQuality = LivekitModels.VideoQuality.HIGH
393 } 394 }
@@ -395,7 +396,8 @@ internal constructor( @@ -395,7 +396,8 @@ internal constructor(
395 width = (trackWidth / scaleDownBy).roundToInt() 396 width = (trackWidth / scaleDownBy).roundToInt()
396 height = (trackHeight / scaleDownBy).roundToInt() 397 height = (trackHeight / scaleDownBy).roundToInt()
397 quality = videoQuality 398 quality = videoQuality
398 - bitrate = it.maxBitrateBps ?: 0 399 + bitrate = encoding.maxBitrateBps ?: 0
  400 + ssrc = 0
399 }.build() 401 }.build()
400 } 402 }
401 } 403 }
@@ -587,6 +589,17 @@ internal constructor( @@ -587,6 +589,17 @@ internal constructor(
587 } 589 }
588 } 590 }
589 591
  592 +internal fun LocalParticipant.publishTracksInfo(): List<LivekitRtc.TrackPublishedResponse> {
  593 + return tracks.values.mapNotNull { trackPub ->
  594 + val track = trackPub.track ?: return@mapNotNull null
  595 +
  596 + LivekitRtc.TrackPublishedResponse.newBuilder()
  597 + .setCid(track.rtcTrack.id())
  598 + .setTrack(trackPub.trackInfo)
  599 + .build()
  600 + }
  601 +}
  602 +
590 interface TrackPublishOptions { 603 interface TrackPublishOptions {
591 val name: String? 604 val name: String?
592 } 605 }
@@ -11,6 +11,7 @@ open class TrackPublication( @@ -11,6 +11,7 @@ open class TrackPublication(
11 track: Track?, 11 track: Track?,
12 participant: Participant 12 participant: Participant
13 ) { 13 ) {
  14 +
14 @get:FlowObservable 15 @get:FlowObservable
15 open var track: Track? by flowDelegate(track) 16 open var track: Track? by flowDelegate(track)
16 internal set 17 internal set
@@ -35,6 +36,8 @@ open class TrackPublication( @@ -35,6 +36,8 @@ open class TrackPublication(
35 var mimeType: String? = null 36 var mimeType: String? = null
36 internal set 37 internal set
37 38
  39 + internal var trackInfo: LivekitModels.TrackInfo? = null
  40 +
38 var participant: WeakReference<Participant> 41 var participant: WeakReference<Participant>
39 42
40 init { 43 init {
@@ -56,5 +59,7 @@ open class TrackPublication( @@ -56,5 +59,7 @@ open class TrackPublication(
56 dimensions = Track.Dimensions(info.width, info.height) 59 dimensions = Track.Dimensions(info.width, info.height)
57 } 60 }
58 mimeType = info.mimeType 61 mimeType = info.mimeType
  62 +
  63 + trackInfo = info
59 } 64 }
60 } 65 }
@@ -6,10 +6,13 @@ import org.webrtc.PeerConnection @@ -6,10 +6,13 @@ import org.webrtc.PeerConnection
6 * Completed state is a valid state for a connected connection, so this should be used 6 * Completed state is a valid state for a connected connection, so this should be used
7 * when checking for a connected state 7 * when checking for a connected state
8 */ 8 */
9 -internal fun PeerConnection.isConnected(): Boolean {  
10 - return when (iceConnectionState()) { 9 +internal fun PeerConnection.isConnected(): Boolean = iceConnectionState().isConnected()
  10 +
  11 +internal fun PeerConnection.IceConnectionState.isConnected(): Boolean {
  12 + return when (this) {
11 PeerConnection.IceConnectionState.CONNECTED, 13 PeerConnection.IceConnectionState.CONNECTED,
12 PeerConnection.IceConnectionState.COMPLETED -> true 14 PeerConnection.IceConnectionState.COMPLETED -> true
13 else -> false 15 else -> false
14 } 16 }
15 } 17 }
  18 +
  1 +package io.livekit.android.webrtc
  2 +
  3 +import livekit.LivekitRtc
  4 +import org.webrtc.SessionDescription
  5 +
  6 +fun SessionDescription.toProtoSessionDescription(): LivekitRtc.SessionDescription {
  7 + val sdBuilder = LivekitRtc.SessionDescription.newBuilder()
  8 + sdBuilder.sdp = description
  9 + sdBuilder.type = type.canonicalForm()
  10 +
  11 + return sdBuilder.build()
  12 +}
1 -package io.livekit.android  
2 -  
3 -import org.junit.Test  
4 -  
5 -import org.junit.Assert.*  
6 -  
7 -/**  
8 - * Example local unit test, which will execute on the development machine (host).  
9 - *  
10 - * See [testing documentation](http://d.android.com/tools/testing).  
11 - */  
12 -class ExampleUnitTest {  
13 - @Test  
14 - fun addition_isCorrect() {  
15 - assertEquals(4, 2 + 2)  
16 - }  
17 -}  
  1 +package io.livekit.android
  2 +
  3 +import android.content.Context
  4 +import androidx.test.core.app.ApplicationProvider
  5 +import io.livekit.android.coroutines.TestCoroutineRule
  6 +import io.livekit.android.mock.MockWebSocketFactory
  7 +import io.livekit.android.mock.dagger.DaggerTestLiveKitComponent
  8 +import io.livekit.android.mock.dagger.TestCoroutinesModule
  9 +import io.livekit.android.mock.dagger.TestLiveKitComponent
  10 +import io.livekit.android.room.Room
  11 +import io.livekit.android.room.SignalClientTest
  12 +import io.livekit.android.util.toOkioByteString
  13 +import kotlinx.coroutines.ExperimentalCoroutinesApi
  14 +import kotlinx.coroutines.launch
  15 +import kotlinx.coroutines.test.runBlockingTest
  16 +import okhttp3.Protocol
  17 +import okhttp3.Request
  18 +import okhttp3.Response
  19 +import org.junit.Before
  20 +import org.junit.Rule
  21 +import org.mockito.junit.MockitoJUnit
  22 +
  23 +@ExperimentalCoroutinesApi
  24 +abstract class MockE2ETest {
  25 +
  26 + @get:Rule
  27 + var mockitoRule = MockitoJUnit.rule()
  28 +
  29 + @get:Rule
  30 + var coroutineRule = TestCoroutineRule()
  31 +
  32 + lateinit var component: TestLiveKitComponent
  33 + lateinit var context: Context
  34 + lateinit var room: Room
  35 + lateinit var wsFactory: MockWebSocketFactory
  36 +
  37 + @Before
  38 + fun setup() {
  39 + context = ApplicationProvider.getApplicationContext()
  40 + component = DaggerTestLiveKitComponent
  41 + .factory()
  42 + .create(context, TestCoroutinesModule(coroutineRule.dispatcher))
  43 +
  44 + room = component.roomFactory()
  45 + .create(context)
  46 + wsFactory = component.websocketFactory()
  47 + }
  48 +
  49 + fun connect() {
  50 + val job = coroutineRule.scope.launch {
  51 + room.connect(
  52 + url = SignalClientTest.EXAMPLE_URL,
  53 + token = "",
  54 + )
  55 + }
  56 + wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
  57 + wsFactory.listener.onMessage(wsFactory.ws, SignalClientTest.JOIN.toOkioByteString())
  58 +
  59 + // PeerTransport negotiation is on a debounce delay.
  60 + coroutineRule.dispatcher.advanceTimeBy(1000L)
  61 + runBlockingTest {
  62 + job.join()
  63 + }
  64 + }
  65 +
  66 + fun createOpenResponse(request: Request): Response {
  67 + return Response.Builder()
  68 + .request(request)
  69 + .code(200)
  70 + .protocol(Protocol.HTTP_2)
  71 + .message("")
  72 + .build()
  73 + }
  74 +}
@@ -7,7 +7,7 @@ private class MockNativePeerConnectionFactory : NativePeerConnectionFactory { @@ -7,7 +7,7 @@ private class MockNativePeerConnectionFactory : NativePeerConnectionFactory {
7 } 7 }
8 8
9 class MockPeerConnection( 9 class MockPeerConnection(
10 - private val observer: PeerConnection.Observer? 10 + val observer: PeerConnection.Observer?
11 ) : PeerConnection(MockNativePeerConnectionFactory()) { 11 ) : PeerConnection(MockNativePeerConnectionFactory()) {
12 12
13 var localDesc: SessionDescription? = null 13 var localDesc: SessionDescription? = null
@@ -33,12 +33,12 @@ class MockPeerConnection( @@ -33,12 +33,12 @@ class MockPeerConnection(
33 } 33 }
34 34
35 override fun createOffer(observer: SdpObserver?, constraints: MediaConstraints?) { 35 override fun createOffer(observer: SdpObserver?, constraints: MediaConstraints?) {
36 - val sdp = SessionDescription(SessionDescription.Type.OFFER, "") 36 + val sdp = SessionDescription(SessionDescription.Type.OFFER, "local_offer")
37 observer?.onCreateSuccess(sdp) 37 observer?.onCreateSuccess(sdp)
38 } 38 }
39 39
40 override fun createAnswer(observer: SdpObserver?, constraints: MediaConstraints?) { 40 override fun createAnswer(observer: SdpObserver?, constraints: MediaConstraints?) {
41 - val sdp = SessionDescription(SessionDescription.Type.ANSWER, "") 41 + val sdp = SessionDescription(SessionDescription.Type.ANSWER, "local_answer")
42 observer?.onCreateSuccess(sdp) 42 observer?.onCreateSuccess(sdp)
43 } 43 }
44 44
@@ -143,8 +143,41 @@ class MockPeerConnection( @@ -143,8 +143,41 @@ class MockPeerConnection(
143 return super.signalingState() 143 return super.signalingState()
144 } 144 }
145 145
146 - override fun iceConnectionState(): IceConnectionState {  
147 - return super.iceConnectionState() 146 + private var iceConnectionState = IceConnectionState.NEW
  147 + set(value) {
  148 + if (field != value) {
  149 + field = value
  150 + observer?.onIceConnectionChange(field)
  151 + }
  152 + }
  153 +
  154 + override fun iceConnectionState(): IceConnectionState = iceConnectionState
  155 +
  156 + fun moveToIceConnectionState(newState: IceConnectionState) {
  157 + when (newState) {
  158 + IceConnectionState.NEW,
  159 + IceConnectionState.CHECKING,
  160 + IceConnectionState.CONNECTED,
  161 + IceConnectionState.COMPLETED -> {
  162 + val currentOrdinal = iceConnectionState.ordinal
  163 + val newOrdinal = newState.ordinal
  164 +
  165 + if (currentOrdinal < newOrdinal) {
  166 + // Ensure that we move through each state.
  167 + for (ordinal in ((currentOrdinal + 1)..newOrdinal)) {
  168 + iceConnectionState = IceConnectionState.values()[ordinal]
  169 + }
  170 + } else {
  171 + iceConnectionState = newState
  172 + }
  173 + }
  174 + IceConnectionState.FAILED,
  175 + IceConnectionState.DISCONNECTED,
  176 + IceConnectionState.CLOSED -> {
  177 + // jump to state directly.
  178 + iceConnectionState = newState
  179 + }
  180 + }
148 } 181 }
149 182
150 override fun connectionState(): PeerConnectionState { 183 override fun connectionState(): PeerConnectionState {
1 -package io.livekit.android.mock  
2 -  
3 -import io.livekit.android.room.PeerConnectionTransport  
4 -import kotlinx.coroutines.CoroutineDispatcher  
5 -import org.webrtc.PeerConnection  
6 -import org.webrtc.PeerConnectionFactory  
7 -  
8 -internal class MockPeerConnectionTransportFactory(  
9 - private val dispatcher: CoroutineDispatcher,  
10 -) : PeerConnectionTransport.Factory {  
11 - override fun create(  
12 - config: PeerConnection.RTCConfiguration,  
13 - pcObserver: PeerConnection.Observer,  
14 - listener: PeerConnectionTransport.Listener?  
15 - ): PeerConnectionTransport {  
16 - return PeerConnectionTransport(  
17 - config,  
18 - pcObserver,  
19 - listener,  
20 - dispatcher,  
21 - PeerConnectionFactory.builder()  
22 - .createPeerConnectionFactory()  
23 - )  
24 - }  
25 -}  
  1 +package io.livekit.android.mock
  2 +
  3 +import okhttp3.Request
  4 +import okhttp3.WebSocket
  5 +import okio.ByteString
  6 +
  7 +class MockWebSocket(private val request: Request) : WebSocket {
  8 +
  9 + var isClosed = false
  10 + private set
  11 +
  12 + private val mutableSentRequests = mutableListOf<ByteString>()
  13 + val sentRequests: List<ByteString>
  14 + get() = mutableSentRequests
  15 +
  16 + override fun cancel() {
  17 + isClosed = true
  18 + }
  19 +
  20 + override fun close(code: Int, reason: String?): Boolean {
  21 + val willClose = !isClosed
  22 + isClosed = true
  23 + return willClose
  24 + }
  25 +
  26 + override fun queueSize(): Long = 0
  27 +
  28 + override fun request(): Request = request
  29 +
  30 + override fun send(text: String): Boolean = !isClosed
  31 +
  32 + override fun send(bytes: ByteString): Boolean {
  33 + mutableSentRequests.add(bytes)
  34 + return !isClosed
  35 + }
  36 +
  37 +
  38 +}
@@ -3,14 +3,25 @@ package io.livekit.android.mock @@ -3,14 +3,25 @@ package io.livekit.android.mock
3 import okhttp3.Request 3 import okhttp3.Request
4 import okhttp3.WebSocket 4 import okhttp3.WebSocket
5 import okhttp3.WebSocketListener 5 import okhttp3.WebSocketListener
6 -import org.mockito.Mockito  
7 6
8 -class MockWebsocketFactory : WebSocket.Factory { 7 +class MockWebSocketFactory : WebSocket.Factory {
  8 + /**
  9 + * The most recently created [WebSocket].
  10 + */
9 lateinit var ws: WebSocket 11 lateinit var ws: WebSocket
  12 +
  13 + /**
  14 + * The request used to create [ws]
  15 + */
10 lateinit var request: Request 16 lateinit var request: Request
  17 +
  18 + /**
  19 + * The listener associated with [ws]
  20 + */
11 lateinit var listener: WebSocketListener 21 lateinit var listener: WebSocketListener
12 override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket { 22 override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
13 - this.ws = Mockito.mock(WebSocket::class.java) 23 + this.ws = MockWebSocket(request)
  24 +
14 this.listener = listener 25 this.listener = listener
15 this.request = request 26 this.request = request
16 return ws 27 return ws
@@ -5,7 +5,8 @@ import dagger.BindsInstance @@ -5,7 +5,8 @@ import dagger.BindsInstance
5 import dagger.Component 5 import dagger.Component
6 import io.livekit.android.dagger.JsonFormatModule 6 import io.livekit.android.dagger.JsonFormatModule
7 import io.livekit.android.dagger.LiveKitComponent 7 import io.livekit.android.dagger.LiveKitComponent
8 -import io.livekit.android.mock.MockWebsocketFactory 8 +import io.livekit.android.mock.MockWebSocketFactory
  9 +import io.livekit.android.room.RTCEngine
9 import javax.inject.Singleton 10 import javax.inject.Singleton
10 11
11 @Singleton 12 @Singleton
@@ -19,10 +20,15 @@ import javax.inject.Singleton @@ -19,10 +20,15 @@ import javax.inject.Singleton
19 ) 20 )
20 internal interface TestLiveKitComponent : LiveKitComponent { 21 internal interface TestLiveKitComponent : LiveKitComponent {
21 22
22 - fun websocketFactory(): MockWebsocketFactory 23 + fun websocketFactory(): MockWebSocketFactory
  24 +
  25 + fun rtcEngine(): RTCEngine
23 26
24 @Component.Factory 27 @Component.Factory
25 interface Factory { 28 interface Factory {
26 - fun create(@BindsInstance appContext: Context, coroutinesModule: TestCoroutinesModule = TestCoroutinesModule()): TestLiveKitComponent 29 + fun create(
  30 + @BindsInstance appContext: Context,
  31 + coroutinesModule: TestCoroutinesModule = TestCoroutinesModule()
  32 + ): TestLiveKitComponent
27 } 33 }
28 } 34 }
@@ -28,12 +28,7 @@ object TestRTCModule { @@ -28,12 +28,7 @@ object TestRTCModule {
28 fun peerConnectionFactory( 28 fun peerConnectionFactory(
29 appContext: Context 29 appContext: Context
30 ): PeerConnectionFactory { 30 ): PeerConnectionFactory {
31 - try {  
32 - ContextUtils.initialize(appContext)  
33 - NativeLibraryLoaderTestHelper.initialize()  
34 - } catch (e: Throwable) {  
35 - // do nothing. this is expected.  
36 - } 31 + WebRTCInitializer.initialize(appContext)
37 32
38 return MockPeerConnectionFactory() 33 return MockPeerConnectionFactory()
39 } 34 }
@@ -2,7 +2,7 @@ package io.livekit.android.mock.dagger @@ -2,7 +2,7 @@ package io.livekit.android.mock.dagger
2 2
3 import dagger.Module 3 import dagger.Module
4 import dagger.Provides 4 import dagger.Provides
5 -import io.livekit.android.mock.MockWebsocketFactory 5 +import io.livekit.android.mock.MockWebSocketFactory
6 import okhttp3.OkHttpClient 6 import okhttp3.OkHttpClient
7 import okhttp3.Response 7 import okhttp3.Response
8 import okhttp3.WebSocket 8 import okhttp3.WebSocket
@@ -26,13 +26,13 @@ object TestWebModule { @@ -26,13 +26,13 @@ object TestWebModule {
26 26
27 @Provides 27 @Provides
28 @Singleton 28 @Singleton
29 - fun websocketFactory(websocketFactory: MockWebsocketFactory): WebSocket.Factory {  
30 - return websocketFactory 29 + fun websocketFactory(webSocketFactory: MockWebSocketFactory): WebSocket.Factory {
  30 + return webSocketFactory
31 } 31 }
32 32
33 @Provides 33 @Provides
34 @Singleton 34 @Singleton
35 - fun mockWebsocketFactory(): MockWebsocketFactory {  
36 - return MockWebsocketFactory() 35 + fun mockWebsocketFactory(): MockWebSocketFactory {
  36 + return MockWebSocketFactory()
37 } 37 }
38 } 38 }
  1 +package io.livekit.android.room
  2 +
  3 +import io.livekit.android.MockE2ETest
  4 +import io.livekit.android.mock.MockPeerConnection
  5 +import io.livekit.android.mock.MockWebSocket
  6 +import io.livekit.android.util.LoggingRule
  7 +import io.livekit.android.util.toPBByteString
  8 +import kotlinx.coroutines.ExperimentalCoroutinesApi
  9 +import kotlinx.coroutines.test.runBlockingTest
  10 +import livekit.LivekitRtc
  11 +import org.junit.Assert
  12 +import org.junit.Before
  13 +import org.junit.Rule
  14 +import org.junit.Test
  15 +import org.junit.runner.RunWith
  16 +import org.robolectric.RobolectricTestRunner
  17 +import org.webrtc.PeerConnection
  18 +import org.webrtc.SessionDescription
  19 +
  20 +
  21 +@ExperimentalCoroutinesApi
  22 +@RunWith(RobolectricTestRunner::class)
  23 +class RTCEngineMockE2ETest : MockE2ETest() {
  24 +
  25 +
  26 + @get:Rule
  27 + var loggingRule = LoggingRule()
  28 +
  29 + lateinit var rtcEngine: RTCEngine
  30 +
  31 + @Before
  32 + fun setupRTCEngine() {
  33 + rtcEngine = component.rtcEngine()
  34 + }
  35 +
  36 + @Test
  37 + fun iceSubscriberConnect() = runBlockingTest {
  38 + connect()
  39 +
  40 + val remoteOffer = SessionDescription(SessionDescription.Type.OFFER, "remote_offer")
  41 + rtcEngine.onOffer(remoteOffer)
  42 +
  43 + Assert.assertEquals(remoteOffer, rtcEngine.subscriber.peerConnection.remoteDescription)
  44 +
  45 + val ws = wsFactory.ws as MockWebSocket
  46 + val sentRequest = LivekitRtc.SignalRequest.newBuilder()
  47 + .mergeFrom(ws.sentRequests[0].toPBByteString())
  48 + .build()
  49 +
  50 + val subPeerConnection = rtcEngine.subscriber.peerConnection as MockPeerConnection
  51 + val localAnswer = subPeerConnection.localDescription ?: throw IllegalStateException("no answer was created.")
  52 + Assert.assertTrue(sentRequest.hasAnswer())
  53 + Assert.assertEquals(localAnswer.description, sentRequest.answer.sdp)
  54 + Assert.assertEquals(localAnswer.type.canonicalForm(), sentRequest.answer.type)
  55 +
  56 + subPeerConnection.moveToIceConnectionState(PeerConnection.IceConnectionState.CONNECTED)
  57 +
  58 + Assert.assertEquals(IceState.CONNECTED, rtcEngine.iceState)
  59 + }
  60 +
  61 + @Test
  62 + fun reconnectOnFailure() = runBlockingTest {
  63 + connect()
  64 + val oldWs = wsFactory.ws
  65 + wsFactory.listener.onFailure(oldWs, Exception(), null)
  66 +
  67 + val newWs = wsFactory.ws
  68 + Assert.assertNotEquals(oldWs, newWs)
  69 + }
  70 +}
1 package io.livekit.android.room 1 package io.livekit.android.room
2 2
3 -import android.content.Context  
4 -import androidx.test.core.app.ApplicationProvider  
5 -import io.livekit.android.coroutines.TestCoroutineRule 3 +import android.net.Network
  4 +import io.livekit.android.MockE2ETest
6 import io.livekit.android.events.EventCollector 5 import io.livekit.android.events.EventCollector
7 -import io.livekit.android.events.ParticipantEvent  
8 import io.livekit.android.events.RoomEvent 6 import io.livekit.android.events.RoomEvent
9 -import io.livekit.android.mock.*  
10 -import io.livekit.android.mock.dagger.DaggerTestLiveKitComponent  
11 -import io.livekit.android.mock.dagger.TestCoroutinesModule 7 +import io.livekit.android.mock.MockAudioStreamTrack
  8 +import io.livekit.android.mock.MockMediaStream
  9 +import io.livekit.android.mock.TestData
  10 +import io.livekit.android.mock.createMediaStreamId
12 import io.livekit.android.room.participant.ConnectionQuality 11 import io.livekit.android.room.participant.ConnectionQuality
13 import io.livekit.android.room.track.Track 12 import io.livekit.android.room.track.Track
14 import io.livekit.android.util.toOkioByteString 13 import io.livekit.android.util.toOkioByteString
@@ -16,54 +15,14 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -16,54 +15,14 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
16 import kotlinx.coroutines.launch 15 import kotlinx.coroutines.launch
17 import kotlinx.coroutines.test.runBlockingTest 16 import kotlinx.coroutines.test.runBlockingTest
18 import org.junit.Assert 17 import org.junit.Assert
19 -import org.junit.Before  
20 -import org.junit.Rule  
21 import org.junit.Test 18 import org.junit.Test
22 import org.junit.runner.RunWith 19 import org.junit.runner.RunWith
23 -import org.mockito.junit.MockitoJUnit 20 +import org.mockito.Mockito
24 import org.robolectric.RobolectricTestRunner 21 import org.robolectric.RobolectricTestRunner
25 22
26 @ExperimentalCoroutinesApi 23 @ExperimentalCoroutinesApi
27 @RunWith(RobolectricTestRunner::class) 24 @RunWith(RobolectricTestRunner::class)
28 -class RoomMockE2ETest {  
29 -  
30 - @get:Rule  
31 - var mockitoRule = MockitoJUnit.rule()  
32 -  
33 - @get:Rule  
34 - var coroutineRule = TestCoroutineRule()  
35 -  
36 - lateinit var context: Context  
37 - lateinit var room: Room  
38 - lateinit var wsFactory: MockWebsocketFactory  
39 -  
40 - @Before  
41 - fun setup() {  
42 - context = ApplicationProvider.getApplicationContext()  
43 - val component = DaggerTestLiveKitComponent  
44 - .factory()  
45 - .create(context, TestCoroutinesModule(coroutineRule.dispatcher))  
46 -  
47 - room = component.roomFactory()  
48 - .create(context)  
49 - wsFactory = component.websocketFactory()  
50 - }  
51 -  
52 - fun connect() {  
53 - val job = coroutineRule.scope.launch {  
54 - room.connect(  
55 - url = "http://www.example.com",  
56 - token = "",  
57 - )  
58 - }  
59 - wsFactory.listener.onMessage(wsFactory.ws, SignalClientTest.JOIN.toOkioByteString())  
60 -  
61 - // PeerTransport negotiation is on a debounce delay.  
62 - coroutineRule.dispatcher.advanceTimeBy(1000L)  
63 - runBlockingTest {  
64 - job.join()  
65 - }  
66 - } 25 +class RoomMockE2ETest : MockE2ETest() {
67 26
68 @Test 27 @Test
69 fun connectTest() { 28 fun connectTest() {
@@ -77,7 +36,7 @@ class RoomMockE2ETest { @@ -77,7 +36,7 @@ class RoomMockE2ETest {
77 val job = coroutineRule.scope.launch { 36 val job = coroutineRule.scope.launch {
78 try { 37 try {
79 room.connect( 38 room.connect(
80 - url = "http://www.example.com", 39 + url = SignalClientTest.EXAMPLE_URL,
81 token = "", 40 token = "",
82 ) 41 )
83 } catch (e: Throwable) { 42 } catch (e: Throwable) {
@@ -265,6 +224,19 @@ class RoomMockE2ETest { @@ -265,6 +224,19 @@ class RoomMockE2ETest {
265 } 224 }
266 225
267 @Test 226 @Test
  227 + fun onConnectionAvailableWillReconnect() {
  228 + connect()
  229 + val eventCollector = EventCollector(room.events, coroutineRule.scope)
  230 + val network = Mockito.mock(Network::class.java)
  231 + room.onLost(network)
  232 + room.onAvailable(network)
  233 + val events = eventCollector.stopCollecting()
  234 +
  235 + Assert.assertEquals(1, events.size)
  236 + Assert.assertEquals(true, events[0] is RoomEvent.Reconnecting)
  237 + }
  238 +
  239 + @Test
268 fun leave() { 240 fun leave() {
269 connect() 241 connect()
270 val eventCollector = EventCollector(room.events, coroutineRule.scope) 242 val eventCollector = EventCollector(room.events, coroutineRule.scope)
@@ -75,7 +75,7 @@ class RoomTest { @@ -75,7 +75,7 @@ class RoomTest {
75 } 75 }
76 val job = coroutineRule.scope.launch { 76 val job = coroutineRule.scope.launch {
77 room.connect( 77 room.connect(
78 - url = "http://www.example.com", 78 + url = SignalClientTest.EXAMPLE_URL,
79 token = "", 79 token = "",
80 ) 80 )
81 } 81 }
@@ -93,15 +93,10 @@ class RoomTest { @@ -93,15 +93,10 @@ class RoomTest {
93 fun onConnectionAvailableWillReconnect() { 93 fun onConnectionAvailableWillReconnect() {
94 connect() 94 connect()
95 95
96 - val eventCollector = EventCollector(room.events, coroutineRule.scope)  
97 val network = Mockito.mock(Network::class.java) 96 val network = Mockito.mock(Network::class.java)
98 room.onLost(network) 97 room.onLost(network)
99 room.onAvailable(network) 98 room.onAvailable(network)
100 -  
101 - val events = eventCollector.stopCollecting()  
102 -  
103 - Assert.assertEquals(1, events.size)  
104 - Assert.assertEquals(true, events[0] is RoomEvent.Reconnecting) 99 + Mockito.verify(rtcEngine).reconnect()
105 } 100 }
106 101
107 @Test 102 @Test
1 package io.livekit.android.room 1 package io.livekit.android.room
2 2
3 import com.google.protobuf.util.JsonFormat 3 import com.google.protobuf.util.JsonFormat
4 -import io.livekit.android.mock.MockWebsocketFactory 4 +import io.livekit.android.mock.MockWebSocketFactory
5 import io.livekit.android.mock.TestData 5 import io.livekit.android.mock.TestData
6 import io.livekit.android.util.toOkioByteString 6 import io.livekit.android.util.toOkioByteString
7 import kotlinx.coroutines.ExperimentalCoroutinesApi 7 import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -12,22 +12,20 @@ import kotlinx.coroutines.test.runBlockingTest @@ -12,22 +12,20 @@ import kotlinx.coroutines.test.runBlockingTest
12 import kotlinx.serialization.json.Json 12 import kotlinx.serialization.json.Json
13 import livekit.LivekitModels 13 import livekit.LivekitModels
14 import livekit.LivekitRtc 14 import livekit.LivekitRtc
15 -import okhttp3.OkHttpClient  
16 -import okhttp3.Protocol  
17 -import okhttp3.Request  
18 -import okhttp3.Response 15 +import okhttp3.*
19 import org.junit.After 16 import org.junit.After
20 import org.junit.Assert 17 import org.junit.Assert
21 import org.junit.Before 18 import org.junit.Before
22 import org.junit.Test 19 import org.junit.Test
23 import org.mockito.Mockito 20 import org.mockito.Mockito
  21 +import org.mockito.kotlin.any
24 import org.mockito.kotlin.argThat 22 import org.mockito.kotlin.argThat
25 import org.webrtc.SessionDescription 23 import org.webrtc.SessionDescription
26 24
27 @ExperimentalCoroutinesApi 25 @ExperimentalCoroutinesApi
28 class SignalClientTest { 26 class SignalClientTest {
29 27
30 - lateinit var wsFactory: MockWebsocketFactory 28 + lateinit var wsFactory: MockWebSocketFactory
31 lateinit var client: SignalClient 29 lateinit var client: SignalClient
32 lateinit var listener: SignalClient.Listener 30 lateinit var listener: SignalClient.Listener
33 lateinit var okHttpClient: OkHttpClient 31 lateinit var okHttpClient: OkHttpClient
@@ -39,7 +37,7 @@ class SignalClientTest { @@ -39,7 +37,7 @@ class SignalClientTest {
39 fun setup() { 37 fun setup() {
40 coroutineDispatcher = TestCoroutineDispatcher() 38 coroutineDispatcher = TestCoroutineDispatcher()
41 coroutineScope = TestCoroutineScope(coroutineDispatcher) 39 coroutineScope = TestCoroutineScope(coroutineDispatcher)
42 - wsFactory = MockWebsocketFactory() 40 + wsFactory = MockWebSocketFactory()
43 okHttpClient = Mockito.mock(OkHttpClient::class.java) 41 okHttpClient = Mockito.mock(OkHttpClient::class.java)
44 client = SignalClient( 42 client = SignalClient(
45 wsFactory, 43 wsFactory,
@@ -68,14 +66,21 @@ class SignalClientTest { @@ -68,14 +66,21 @@ class SignalClientTest {
68 .build() 66 .build()
69 } 67 }
70 68
  69 + /**
  70 + * Supply the needed websocket messages to finish a join call.
  71 + */
  72 + private fun connectWebsocketAndJoin() {
  73 + client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
  74 + client.onMessage(wsFactory.ws, JOIN.toOkioByteString())
  75 + }
  76 +
71 @Test 77 @Test
72 fun joinAndResponse() { 78 fun joinAndResponse() {
73 val job = coroutineScope.async { 79 val job = coroutineScope.async {
74 client.join(EXAMPLE_URL, "") 80 client.join(EXAMPLE_URL, "")
75 } 81 }
76 82
77 - client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))  
78 - client.onMessage(wsFactory.ws, JOIN.toOkioByteString()) 83 + connectWebsocketAndJoin()
79 84
80 runBlockingTest { 85 runBlockingTest {
81 val response = job.await() 86 val response = job.await()
@@ -97,15 +102,29 @@ class SignalClientTest { @@ -97,15 +102,29 @@ class SignalClientTest {
97 } 102 }
98 103
99 @Test 104 @Test
100 - fun listenerNotCalledUntilOnReady() {  
101 - val listener = Mockito.mock(SignalClient.Listener::class.java)  
102 - client.listener = listener 105 + fun joinFailure() {
  106 + var failed = false
  107 + val job = coroutineScope.async {
  108 + try {
  109 + client.join(EXAMPLE_URL, "")
  110 + } catch (e: Exception) {
  111 + failed = true
  112 + }
  113 + }
  114 +
  115 + client.onFailure(wsFactory.ws, Exception(), null)
  116 + runBlockingTest { job.await() }
103 117
  118 + Assert.assertTrue(failed)
  119 + }
  120 +
  121 + @Test
  122 + fun listenerNotCalledUntilOnReady() {
104 val job = coroutineScope.async { 123 val job = coroutineScope.async {
105 client.join(EXAMPLE_URL, "") 124 client.join(EXAMPLE_URL, "")
106 } 125 }
107 - client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))  
108 - client.onMessage(wsFactory.ws, JOIN.toOkioByteString()) 126 +
  127 + connectWebsocketAndJoin()
109 client.onMessage(wsFactory.ws, OFFER.toOkioByteString()) 128 client.onMessage(wsFactory.ws, OFFER.toOkioByteString())
110 129
111 runBlockingTest { job.await() } 130 runBlockingTest { job.await() }
@@ -115,14 +134,10 @@ class SignalClientTest { @@ -115,14 +134,10 @@ class SignalClientTest {
115 134
116 @Test 135 @Test
117 fun listenerCalledAfterOnReady() { 136 fun listenerCalledAfterOnReady() {
118 - val listener = Mockito.mock(SignalClient.Listener::class.java)  
119 - client.listener = listener  
120 -  
121 val job = coroutineScope.async { 137 val job = coroutineScope.async {
122 client.join(EXAMPLE_URL, "") 138 client.join(EXAMPLE_URL, "")
123 } 139 }
124 - client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))  
125 - client.onMessage(wsFactory.ws, JOIN.toOkioByteString()) 140 + connectWebsocketAndJoin()
126 client.onMessage(wsFactory.ws, OFFER.toOkioByteString()) 141 client.onMessage(wsFactory.ws, OFFER.toOkioByteString())
127 142
128 runBlockingTest { job.await() } 143 runBlockingTest { job.await() }
@@ -131,9 +146,27 @@ class SignalClientTest { @@ -131,9 +146,27 @@ class SignalClientTest {
131 .onOffer(argThat { type == SessionDescription.Type.OFFER && description == OFFER.offer.sdp }) 146 .onOffer(argThat { type == SessionDescription.Type.OFFER && description == OFFER.offer.sdp })
132 } 147 }
133 148
  149 + /**
  150 + * [WebSocketListener.onFailure] does not call through to
  151 + * [WebSocketListener.onClosed]. Ensure that listener is called properly.
  152 + */
  153 + @Test
  154 + fun listenerNotifiedAfterFailure() {
  155 + val job = coroutineScope.async {
  156 + client.join(EXAMPLE_URL, "")
  157 + }
  158 + connectWebsocketAndJoin()
  159 + runBlockingTest { job.await() }
  160 +
  161 + client.onFailure(wsFactory.ws, Exception(), null)
  162 +
  163 + Mockito.verify(listener)
  164 + .onClose(any(), any())
  165 + }
  166 +
134 // mock data 167 // mock data
135 companion object { 168 companion object {
136 - private const val EXAMPLE_URL = "http://www.example.com" 169 + const val EXAMPLE_URL = "ws://www.example.com"
137 170
138 val JOIN = with(LivekitRtc.SignalResponse.newBuilder()) { 171 val JOIN = with(LivekitRtc.SignalResponse.newBuilder()) {
139 join = with(joinBuilder) { 172 join = with(joinBuilder) {
@@ -143,6 +176,8 @@ class SignalClientTest { @@ -143,6 +176,8 @@ class SignalClientTest {
143 build() 176 build()
144 } 177 }
145 participant = TestData.LOCAL_PARTICIPANT 178 participant = TestData.LOCAL_PARTICIPANT
  179 + subscriberPrimary = true
  180 + serverVersion = "0.15.2"
146 build() 181 build()
147 } 182 }
148 build() 183 build()
  1 +package io.livekit.android.util
  2 +
  3 +import com.google.protobuf.ByteString
  4 +import okio.ByteString.Companion.toByteString
  5 +
  6 +fun com.google.protobuf.ByteString.toOkioByteString() = toByteArray().toByteString()
  7 +
  8 +fun okio.ByteString.toPBByteString() = ByteString.copyFrom(toByteArray())
  1 +package io.livekit.android.util
  2 +
  3 +import android.util.Log
  4 +import io.livekit.android.LiveKit
  5 +import org.junit.rules.TestRule
  6 +import org.junit.runner.Description
  7 +import org.junit.runners.model.Statement
  8 +import timber.log.Timber
  9 +
  10 +class LoggingRule : TestRule {
  11 +
  12 + val logTree = object : Timber.Tree() {
  13 + override fun log(priority: Int, tag: String?, message: String, t: Throwable?) {
  14 + val priorityChar = when (priority) {
  15 + Log.VERBOSE -> "v"
  16 + Log.DEBUG -> "d"
  17 + Log.INFO -> "i"
  18 + Log.WARN -> "w"
  19 + Log.ERROR -> "e"
  20 + Log.ASSERT -> "a"
  21 + else -> "?"
  22 + }
  23 +
  24 + println("$priorityChar: $tag: $message")
  25 + if (t != null) {
  26 + println(t.toString())
  27 + }
  28 + }
  29 +
  30 + }
  31 +
  32 + override fun apply(base: Statement, description: Description?) = object : Statement() {
  33 + override fun evaluate() {
  34 + val oldLoggingLevel = LiveKit.loggingLevel
  35 + LiveKit.loggingLevel = LoggingLevel.VERBOSE
  36 + Timber.plant(logTree)
  37 + base.evaluate()
  38 + Timber.uproot(logTree)
  39 + LiveKit.loggingLevel = oldLoggingLevel
  40 + }
  41 + }
  42 +}
@@ -2,6 +2,8 @@ package org.webrtc @@ -2,6 +2,8 @@ package org.webrtc
2 2
3 object NativeLibraryLoaderTestHelper { 3 object NativeLibraryLoaderTestHelper {
4 fun initialize() { 4 fun initialize() {
  5 + if (!NativeLibrary.isLoaded()) {
5 NativeLibrary.initialize({ true }, "") 6 NativeLibrary.initialize({ true }, "")
6 } 7 }
  8 + }
7 } 9 }
  1 +package org.webrtc
  2 +
  3 +import android.content.Context
  4 +import org.mockito.Mockito
  5 +
  6 +object WebRTCInitializer {
  7 + fun initialize(context: Context = Mockito.mock(Context::class.java)) {
  8 + try {
  9 + ContextUtils.initialize(context)
  10 + NativeLibraryLoaderTestHelper.initialize()
  11 + } catch (e: Throwable) {
  12 + // do nothing. this is expected.
  13 + }
  14 + }
  15 +}
1 -Subproject commit 2c4c8d7764edf02818d1af686acc89165a5128bc 1 +Subproject commit a4208afda1fd87c5e57efa14242c1fa94e34ec07
@@ -40,6 +40,7 @@ dependencies { @@ -40,6 +40,7 @@ dependencies {
40 api "androidx.lifecycle:lifecycle-runtime-ktx:${versions.androidx_lifecycle}" 40 api "androidx.lifecycle:lifecycle-runtime-ktx:${versions.androidx_lifecycle}"
41 api "androidx.lifecycle:lifecycle-viewmodel-ktx:${versions.androidx_lifecycle}" 41 api "androidx.lifecycle:lifecycle-viewmodel-ktx:${versions.androidx_lifecycle}"
42 api "androidx.lifecycle:lifecycle-common-java8:${versions.androidx_lifecycle}" 42 api "androidx.lifecycle:lifecycle-common-java8:${versions.androidx_lifecycle}"
  43 + api "com.google.protobuf:protobuf-java:${versions.protobuf}"
43 api project(":livekit-android-sdk") 44 api project(":livekit-android-sdk")
44 testImplementation 'junit:junit:4.+' 45 testImplementation 'junit:junit:4.+'
45 androidTestImplementation 'androidx.test.ext:junit:1.1.3' 46 androidTestImplementation 'androidx.test.ext:junit:1.1.3'
@@ -17,6 +17,7 @@ import io.livekit.android.util.flow @@ -17,6 +17,7 @@ import io.livekit.android.util.flow
17 import kotlinx.coroutines.ExperimentalCoroutinesApi 17 import kotlinx.coroutines.ExperimentalCoroutinesApi
18 import kotlinx.coroutines.flow.* 18 import kotlinx.coroutines.flow.*
19 import kotlinx.coroutines.launch 19 import kotlinx.coroutines.launch
  20 +import livekit.LivekitRtc
20 21
21 @OptIn(ExperimentalCoroutinesApi::class) 22 @OptIn(ExperimentalCoroutinesApi::class)
22 class CallViewModel( 23 class CallViewModel(
@@ -246,6 +247,14 @@ class CallViewModel( @@ -246,6 +247,14 @@ class CallViewModel(
246 mutablePermissionAllowed.value = !mutablePermissionAllowed.value 247 mutablePermissionAllowed.value = !mutablePermissionAllowed.value
247 room.value?.localParticipant?.setTrackSubscriptionPermissions(mutablePermissionAllowed.value) 248 room.value?.localParticipant?.setTrackSubscriptionPermissions(mutablePermissionAllowed.value)
248 } 249 }
  250 +
  251 + fun simulateMigration(){
  252 + room.value?.sendSimulateScenario(
  253 + LivekitRtc.SimulateScenario.newBuilder()
  254 + .setMigration(true)
  255 + .build()
  256 + )
  257 + }
249 } 258 }
250 259
251 private fun <T> LiveData<T>.hide(): LiveData<T> = this 260 private fun <T> LiveData<T>.hide(): LiveData<T> = this
  1 +<!-- drawable/dots_horizontal_circle_outline.xml -->
  2 +<vector xmlns:android="http://schemas.android.com/apk/res/android"
  3 + android:height="24dp"
  4 + android:width="24dp"
  5 + android:viewportWidth="24"
  6 + android:viewportHeight="24">
  7 + <path android:fillColor="#000" android:pathData="M12,2A10,10 0 0,1 22,12A10,10 0 0,1 12,22A10,10 0 0,1 2,12A10,10 0 0,1 12,2M12,4A8,8 0 0,0 4,12A8,8 0 0,0 12,20A8,8 0 0,0 20,12A8,8 0 0,0 12,4M12,10.5A1.5,1.5 0 0,1 13.5,12A1.5,1.5 0 0,1 12,13.5A1.5,1.5 0 0,1 10.5,12A1.5,1.5 0 0,1 12,10.5M7.5,10.5A1.5,1.5 0 0,1 9,12A1.5,1.5 0 0,1 7.5,13.5A1.5,1.5 0 0,1 6,12A1.5,1.5 0 0,1 7.5,10.5M16.5,10.5A1.5,1.5 0 0,1 18,12A1.5,1.5 0 0,1 16.5,13.5A1.5,1.5 0 0,1 15,12A1.5,1.5 0 0,1 16.5,10.5Z" />
  8 +</vector>
@@ -27,6 +27,7 @@ import androidx.constraintlayout.compose.Dimension @@ -27,6 +27,7 @@ import androidx.constraintlayout.compose.Dimension
27 import androidx.lifecycle.lifecycleScope 27 import androidx.lifecycle.lifecycleScope
28 import com.github.ajalt.timberkt.Timber 28 import com.github.ajalt.timberkt.Timber
29 import com.google.accompanist.pager.ExperimentalPagerApi 29 import com.google.accompanist.pager.ExperimentalPagerApi
  30 +import io.livekit.android.composesample.ui.DebugMenuDialog
30 import io.livekit.android.composesample.ui.theme.AppTheme 31 import io.livekit.android.composesample.ui.theme.AppTheme
31 import io.livekit.android.room.Room 32 import io.livekit.android.room.Room
32 import io.livekit.android.room.participant.Participant 33 import io.livekit.android.room.participant.Participant
@@ -108,7 +109,8 @@ class CallActivity : AppCompatActivity() { @@ -108,7 +109,8 @@ class CallActivity : AppCompatActivity() {
108 screencastEnabled, 109 screencastEnabled,
109 permissionAllowed = permissionAllowed, 110 permissionAllowed = permissionAllowed,
110 onExitClick = { finish() }, 111 onExitClick = { finish() },
111 - onSendMessage = { viewModel.sendData(it) } 112 + onSendMessage = { viewModel.sendData(it) },
  113 + onSimulateMigration = { viewModel.simulateMigration() },
112 ) 114 )
113 } 115 }
114 } 116 }
@@ -156,6 +158,7 @@ class CallActivity : AppCompatActivity() { @@ -156,6 +158,7 @@ class CallActivity : AppCompatActivity() {
156 error: Throwable? = null, 158 error: Throwable? = null,
157 onSnackbarDismiss: () -> Unit = {}, 159 onSnackbarDismiss: () -> Unit = {},
158 onSendMessage: (String) -> Unit = {}, 160 onSendMessage: (String) -> Unit = {},
  161 + onSimulateMigration: () -> Unit = {},
159 ) { 162 ) {
160 AppTheme(darkTheme = true) { 163 AppTheme(darkTheme = true) {
161 ConstraintLayout( 164 ConstraintLayout(
@@ -229,6 +232,7 @@ class CallActivity : AppCompatActivity() { @@ -229,6 +232,7 @@ class CallActivity : AppCompatActivity() {
229 val controlSize = 40.dp 232 val controlSize = 40.dp
230 val controlPadding = 4.dp 233 val controlPadding = 4.dp
231 Row( 234 Row(
  235 + modifier = Modifier.fillMaxWidth(),
232 horizontalArrangement = Arrangement.SpaceEvenly, 236 horizontalArrangement = Arrangement.SpaceEvenly,
233 verticalAlignment = Alignment.Bottom, 237 verticalAlignment = Alignment.Bottom,
234 ) { 238 ) {
@@ -368,6 +372,7 @@ class CallActivity : AppCompatActivity() { @@ -368,6 +372,7 @@ class CallActivity : AppCompatActivity() {
368 Spacer(modifier = Modifier.height(10.dp)) 372 Spacer(modifier = Modifier.height(10.dp))
369 373
370 Row( 374 Row(
  375 + modifier = Modifier.fillMaxWidth(),
371 horizontalArrangement = Arrangement.SpaceEvenly, 376 horizontalArrangement = Arrangement.SpaceEvenly,
372 verticalAlignment = Alignment.Bottom, 377 verticalAlignment = Alignment.Bottom,
373 ) { 378 ) {
@@ -386,6 +391,28 @@ class CallActivity : AppCompatActivity() { @@ -386,6 +391,28 @@ class CallActivity : AppCompatActivity() {
386 tint = Color.White, 391 tint = Color.White,
387 ) 392 )
388 } 393 }
  394 +
  395 + var showDebugDialog by remember { mutableStateOf(false) }
  396 + Surface(
  397 + onClick = { showDebugDialog = true },
  398 + indication = rememberRipple(false),
  399 + modifier = Modifier
  400 + .size(controlSize)
  401 + .padding(controlPadding)
  402 + ) {
  403 + val resource = R.drawable.dots_horizontal_circle_outline
  404 + Icon(
  405 + painterResource(id = resource),
  406 + contentDescription = "Permissions",
  407 + tint = Color.White,
  408 + )
  409 + }
  410 + if (showDebugDialog) {
  411 + DebugMenuDialog(
  412 + onDismissRequest = { showDebugDialog = false },
  413 + simulateMigration = { onSimulateMigration() }
  414 + )
  415 + }
389 } 416 }
390 } 417 }
391 418
@@ -4,7 +4,6 @@ import android.Manifest @@ -4,7 +4,6 @@ import android.Manifest
4 import android.content.Intent 4 import android.content.Intent
5 import android.content.pm.PackageManager 5 import android.content.pm.PackageManager
6 import android.os.Bundle 6 import android.os.Bundle
7 -import android.widget.Space  
8 import android.widget.Toast 7 import android.widget.Toast
9 import androidx.activity.ComponentActivity 8 import androidx.activity.ComponentActivity
10 import androidx.activity.compose.setContent 9 import androidx.activity.compose.setContent
@@ -175,8 +174,7 @@ class MainActivity : ComponentActivity() { @@ -175,8 +174,7 @@ class MainActivity : ComponentActivity() {
175 const val PREFERENCES_KEY_URL = "url" 174 const val PREFERENCES_KEY_URL = "url"
176 const val PREFERENCES_KEY_TOKEN = "token" 175 const val PREFERENCES_KEY_TOKEN = "token"
177 176
178 - const val URL = "wss://livekit.watercooler.fm"  
179 - const val TOKEN =  
180 - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE5ODQyMzE0OTgsImlzcyI6IkFQSU1teGlMOHJxdUt6dFpFb1pKVjlGYiIsImp0aSI6ImZvcnRoIiwibmJmIjoxNjI0MjMxNDk4LCJ2aWRlbyI6eyJyb29tIjoibXlyb29tIiwicm9vbUpvaW4iOnRydWV9fQ.PVx_lXAIGxcD2VRslosrbkigc777GXbu-DQME8hjJKI" 177 + const val URL = "wss://www.example.com"
  178 + const val TOKEN = ""
181 } 179 }
182 } 180 }
  1 +package io.livekit.android.composesample.ui
  2 +
  3 +import androidx.compose.foundation.background
  4 +import androidx.compose.foundation.layout.*
  5 +import androidx.compose.foundation.shape.RoundedCornerShape
  6 +import androidx.compose.material.Button
  7 +import androidx.compose.material.Text
  8 +import androidx.compose.runtime.Composable
  9 +import androidx.compose.ui.Alignment
  10 +import androidx.compose.ui.Modifier
  11 +import androidx.compose.ui.graphics.Color
  12 +import androidx.compose.ui.tooling.preview.Preview
  13 +import androidx.compose.ui.unit.dp
  14 +import androidx.compose.ui.window.Dialog
  15 +
  16 +@Preview
  17 +@Composable
  18 +fun DebugMenuDialog(
  19 + onDismissRequest: () -> Unit = {},
  20 + simulateMigration: () -> Unit = {}
  21 +) {
  22 + Dialog(onDismissRequest = onDismissRequest) {
  23 + Column(
  24 + horizontalAlignment = Alignment.CenterHorizontally,
  25 + modifier = Modifier
  26 + .background(Color.DarkGray, shape = RoundedCornerShape(3.dp))
  27 + .fillMaxWidth()
  28 + .padding(10.dp)
  29 + ) {
  30 + Text("Debug Menu", color = Color.White)
  31 + Spacer(modifier = Modifier.height(10.dp))
  32 +
  33 + Button(onClick = {
  34 + simulateMigration()
  35 + onDismissRequest()
  36 + }) {
  37 + Text("Simulate Migration")
  38 + }
  39 + }
  40 + }
  41 +}