davidliu
Committed by GitHub

Implement RegionUrlProvider and Room.prepareConnection (#463)

* implement RegionUrlProvider and Room.prepareConnection

* spotless

* fix test

* ensure signal client uses proper scheme when connecting

* more scheme enforcement
@@ -51,7 +51,8 @@ androidx-lifecycle-process = { module = "androidx.lifecycle:lifecycle-process", @@ -51,7 +51,8 @@ androidx-lifecycle-process = { module = "androidx.lifecycle:lifecycle-process",
51 androidx-lifecycle-viewmodel-ktx = { module = "androidx.lifecycle:lifecycle-viewmodel-ktx", version.ref = "androidx-lifecycle" } 51 androidx-lifecycle-viewmodel-ktx = { module = "androidx.lifecycle:lifecycle-viewmodel-ktx", version.ref = "androidx-lifecycle" }
52 kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" } 52 kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" }
53 leakcanary-android = { module = "com.squareup.leakcanary:leakcanary-android", version.ref = "leakcanaryAndroid" } 53 leakcanary-android = { module = "com.squareup.leakcanary:leakcanary-android", version.ref = "leakcanaryAndroid" }
54 -okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" } 54 +okhttp-lib = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" }
  55 +okhttp-coroutines = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" }
55 protobuf-javalite = { module = "com.google.protobuf:protobuf-javalite", version.ref = "protobufJavalite" } 56 protobuf-javalite = { module = "com.google.protobuf:protobuf-javalite", version.ref = "protobufJavalite" }
56 semver4j = { module = "com.vdurmont:semver4j", version.ref = "semver4j" } 57 semver4j = { module = "com.vdurmont:semver4j", version.ref = "semver4j" }
57 webrtc = { module = "io.github.webrtc-sdk:android-prefixed", version.ref = "webrtc" } 58 webrtc = { module = "io.github.webrtc-sdk:android-prefixed", version.ref = "webrtc" }
@@ -79,6 +80,7 @@ espresso = { module = "androidx.test.espresso:espresso-core", version = "3.5.1" @@ -79,6 +80,7 @@ espresso = { module = "androidx.test.espresso:espresso-core", version = "3.5.1"
79 junit = { module = "junit:junit", version.ref = "junit-lib" } 80 junit = { module = "junit:junit", version.ref = "junit-lib" }
80 junitJupiterApi = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-jupiter" } 81 junitJupiterApi = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-jupiter" }
81 junitJupiterEngine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit-jupiter" } 82 junitJupiterEngine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit-jupiter" }
  83 +okhttp-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp" }
82 84
83 # Mockito 5 requires JVM 11. 85 # Mockito 5 requires JVM 11.
84 #noinspection GradleDependency 86 #noinspection GradleDependency
@@ -144,7 +144,8 @@ dependencies { @@ -144,7 +144,8 @@ dependencies {
144 implementation libs.coroutines.lib 144 implementation libs.coroutines.lib
145 implementation libs.kotlinx.serialization.json 145 implementation libs.kotlinx.serialization.json
146 api libs.webrtc 146 api libs.webrtc
147 - api libs.okhttp 147 + api libs.okhttp.lib
  148 + implementation libs.okhttp.coroutines
148 api libs.audioswitch 149 api libs.audioswitch
149 implementation libs.androidx.annotation 150 implementation libs.androidx.annotation
150 implementation libs.androidx.core 151 implementation libs.androidx.core
@@ -27,6 +27,8 @@ import io.livekit.android.room.network.NetworkCallbackManagerFactory @@ -27,6 +27,8 @@ import io.livekit.android.room.network.NetworkCallbackManagerFactory
27 import io.livekit.android.room.network.NetworkCallbackManagerImpl 27 import io.livekit.android.room.network.NetworkCallbackManagerImpl
28 import io.livekit.android.room.network.NetworkCallbackRegistry 28 import io.livekit.android.room.network.NetworkCallbackRegistry
29 import io.livekit.android.room.network.NetworkCallbackRegistryImpl 29 import io.livekit.android.room.network.NetworkCallbackRegistryImpl
  30 +import io.livekit.android.room.util.ConnectionWarmer
  31 +import io.livekit.android.room.util.OkHttpConnectionWarmer
30 import io.livekit.android.stats.AndroidNetworkInfo 32 import io.livekit.android.stats.AndroidNetworkInfo
31 import io.livekit.android.stats.NetworkInfo 33 import io.livekit.android.stats.NetworkInfo
32 import okhttp3.OkHttpClient 34 import okhttp3.OkHttpClient
@@ -47,6 +49,9 @@ internal object WebModule { @@ -47,6 +49,9 @@ internal object WebModule {
47 } 49 }
48 50
49 @Provides 51 @Provides
  52 + fun connectionWarmer(okHttpConnectionWarmer: OkHttpConnectionWarmer): ConnectionWarmer = okHttpConnectionWarmer
  53 +
  54 + @Provides
50 fun websocketFactory(okHttpClient: OkHttpClient): WebSocket.Factory { 55 fun websocketFactory(okHttpClient: OkHttpClient): WebSocket.Factory {
51 return okHttpClient 56 return okHttpClient
52 } 57 }
@@ -135,6 +135,8 @@ internal constructor( @@ -135,6 +135,8 @@ internal constructor(
135 135
136 private val pendingTrackResolvers: MutableMap<String, Continuation<LivekitModels.TrackInfo>> = 136 private val pendingTrackResolvers: MutableMap<String, Continuation<LivekitModels.TrackInfo>> =
137 mutableMapOf() 137 mutableMapOf()
  138 +
  139 + internal var regionUrlProvider: RegionUrlProvider? = null
138 private var sessionUrl: String? = null 140 private var sessionUrl: String? = null
139 private var sessionToken: String? = null 141 private var sessionToken: String? = null
140 private var connectOptions: ConnectOptions? = null 142 private var connectOptions: ConnectOptions? = null
@@ -368,6 +370,7 @@ internal constructor( @@ -368,6 +370,7 @@ internal constructor(
368 connectOptions = null 370 connectOptions = null
369 lastRoomOptions = null 371 lastRoomOptions = null
370 participantSid = null 372 participantSid = null
  373 + regionUrlProvider = null
371 closeResources(reason) 374 closeResources(reason)
372 connectionState = ConnectionState.DISCONNECTED 375 connectionState = ConnectionState.DISCONNECTED
373 } 376 }
@@ -418,7 +421,7 @@ internal constructor( @@ -418,7 +421,7 @@ internal constructor(
418 LKLog.d { "Skip reconnection - engine is closed" } 421 LKLog.d { "Skip reconnection - engine is closed" }
419 return 422 return
420 } 423 }
421 - val url = sessionUrl 424 + var url = sessionUrl
422 val token = sessionToken 425 val token = sessionToken
423 if (url == null || token == null) { 426 if (url == null || token == null) {
424 LKLog.w { "couldn't reconnect, no url or no token" } 427 LKLog.w { "couldn't reconnect, no url or no token" }
@@ -432,6 +435,15 @@ internal constructor( @@ -432,6 +435,15 @@ internal constructor(
432 435
433 val reconnectStartTime = SystemClock.elapsedRealtime() 436 val reconnectStartTime = SystemClock.elapsedRealtime()
434 for (retries in 0 until MAX_RECONNECT_RETRIES) { 437 for (retries in 0 until MAX_RECONNECT_RETRIES) {
  438 + // First try use previously valid url.
  439 + if (retries != 0) {
  440 + try {
  441 + url = regionUrlProvider?.getNextBestRegionUrl() ?: url
  442 + } catch (e: Exception) {
  443 + LKLog.d(e) { "Exception while getting next best region url while reconnecting." }
  444 + }
  445 + }
  446 +
435 ensureActive() 447 ensureActive()
436 if (retries != 0) { 448 if (retries != 0) {
437 yield() 449 yield()
@@ -469,7 +481,7 @@ internal constructor( @@ -469,7 +481,7 @@ internal constructor(
469 try { 481 try {
470 closeResources("Full Reconnecting") 482 closeResources("Full Reconnecting")
471 listener?.onFullReconnecting() 483 listener?.onFullReconnecting()
472 - joinImpl(url, token, connectOptions, lastRoomOptions ?: RoomOptions()) 484 + joinImpl(url!!, token, connectOptions, lastRoomOptions ?: RoomOptions())
473 } catch (e: Exception) { 485 } catch (e: Exception) {
474 LKLog.w(e) { "Error during reconnection." } 486 LKLog.w(e) { "Error during reconnection." }
475 // reconnect failed, retry. 487 // reconnect failed, retry.
@@ -484,7 +496,7 @@ internal constructor( @@ -484,7 +496,7 @@ internal constructor(
484 LKLog.v { "Attempting soft reconnect." } 496 LKLog.v { "Attempting soft reconnect." }
485 subscriber?.prepareForIceRestart() 497 subscriber?.prepareForIceRestart()
486 try { 498 try {
487 - val response = client.reconnect(url, token, participantSid) 499 + val response = client.reconnect(url!!, token, participantSid)
488 if (response is Either.Left) { 500 if (response is Either.Left) {
489 val reconnectResponse = response.value 501 val reconnectResponse = response.value
490 val rtcConfig = makeRTCConfig(Either.Right(reconnectResponse), connectOptions) 502 val rtcConfig = makeRTCConfig(Either.Right(reconnectResponse), connectOptions)
@@ -514,7 +526,7 @@ internal constructor( @@ -514,7 +526,7 @@ internal constructor(
514 break 526 break
515 } 527 }
516 528
517 - // wait until ICE connected 529 + // wait until publisher ICE connected
518 val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS 530 val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS
519 if (hasPublished) { 531 if (hasPublished) {
520 while (SystemClock.elapsedRealtime() < endTime) { 532 while (SystemClock.elapsedRealtime() < endTime) {
@@ -532,6 +544,7 @@ internal constructor( @@ -532,6 +544,7 @@ internal constructor(
532 break 544 break
533 } 545 }
534 546
  547 + // wait until subscriber ICE connected
535 while (SystemClock.elapsedRealtime() < endTime) { 548 while (SystemClock.elapsedRealtime() < endTime) {
536 if (subscriber?.isConnected() == true) { 549 if (subscriber?.isConnected() == true) {
537 LKLog.v { "reconnected to ICE" } 550 LKLog.v { "reconnected to ICE" }
@@ -546,14 +559,18 @@ internal constructor( @@ -546,14 +559,18 @@ internal constructor(
546 LKLog.v { "RTCEngine closed, aborting reconnection" } 559 LKLog.v { "RTCEngine closed, aborting reconnection" }
547 break 560 break
548 } 561 }
  562 +
549 if (connectionState == ConnectionState.CONNECTED && 563 if (connectionState == ConnectionState.CONNECTED &&
550 (!hasPublished || publisher?.isConnected() == true) 564 (!hasPublished || publisher?.isConnected() == true)
551 ) { 565 ) {
  566 + // Is connected, notify and return.
  567 + regionUrlProvider?.clearAttemptedRegions()
552 client.onPCConnected() 568 client.onPCConnected()
553 listener?.onPostReconnect(isFullReconnect) 569 listener?.onPostReconnect(isFullReconnect)
554 return@launch 570 return@launch
555 } 571 }
556 572
  573 + // Didn't manage to reconnect, check if should continue to next attempt.
557 val curReconnectTime = SystemClock.elapsedRealtime() - reconnectStartTime 574 val curReconnectTime = SystemClock.elapsedRealtime() - reconnectStartTime
558 if (curReconnectTime > MAX_RECONNECT_TIMEOUT) { 575 if (curReconnectTime > MAX_RECONNECT_TIMEOUT) {
559 break 576 break
@@ -954,6 +971,7 @@ internal constructor( @@ -954,6 +971,7 @@ internal constructor(
954 971
955 override fun onRefreshToken(token: String) { 972 override fun onRefreshToken(token: String) {
956 sessionToken = token 973 sessionToken = token
  974 + regionUrlProvider?.token = token
957 } 975 }
958 976
959 override fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse) { 977 override fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse) {
  1 +/*
  2 + * Copyright 2024 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.room
  18 +
  19 +import android.os.SystemClock
  20 +import androidx.annotation.VisibleForTesting
  21 +import dagger.assisted.Assisted
  22 +import dagger.assisted.AssistedFactory
  23 +import dagger.assisted.AssistedInject
  24 +import io.livekit.android.util.LKLog
  25 +import io.livekit.android.util.executeAsync
  26 +import kotlinx.coroutines.ExperimentalCoroutinesApi
  27 +import kotlinx.coroutines.coroutineScope
  28 +import kotlinx.serialization.Serializable
  29 +import kotlinx.serialization.decodeFromString
  30 +import kotlinx.serialization.json.Json
  31 +import okhttp3.OkHttpClient
  32 +import okhttp3.Request
  33 +import java.net.URI
  34 +
  35 +/**
  36 + * @suppress
  37 + */
  38 +class RegionUrlProvider
  39 +@AssistedInject
  40 +constructor(
  41 + @Assisted val serverUrl: URI,
  42 + @Assisted var token: String,
  43 + private val okHttpClient: OkHttpClient,
  44 + private val json: Json,
  45 +) {
  46 + private var regionSettings: RegionSettings? = null
  47 + private var lastUpdateAt: Long = 0L
  48 + private var settingsCacheTimeMs = 30000
  49 + private var attemptedRegions = mutableSetOf<RegionInfo>()
  50 +
  51 + fun isLKCloud() = serverUrl.isLKCloud()
  52 +
  53 + @Throws(RoomException.ConnectException::class)
  54 + suspend fun getNextBestRegionUrl() = coroutineScope {
  55 + if (!isLKCloud()) {
  56 + throw IllegalStateException("Region availability is only supported for LiveKit Cloud domains")
  57 + }
  58 + if (regionSettings == null || SystemClock.elapsedRealtime() - lastUpdateAt > settingsCacheTimeMs) {
  59 + fetchRegionSettings()
  60 + }
  61 + val regions = regionSettings?.regions ?: return@coroutineScope null
  62 + val regionsLeft = regions.filter { region ->
  63 + !attemptedRegions.any { attempted -> attempted.url == region.url }
  64 + }
  65 +
  66 + if (regionsLeft.isEmpty()) {
  67 + return@coroutineScope null
  68 + }
  69 +
  70 + val nextRegion = regionsLeft.first()
  71 + attemptedRegions.add(nextRegion)
  72 + LKLog.d { "next region: $nextRegion" }
  73 + return@coroutineScope nextRegion.url
  74 + }
  75 +
  76 + @Throws(RoomException.ConnectException::class)
  77 + @OptIn(ExperimentalCoroutinesApi::class)
  78 + suspend fun fetchRegionSettings(): RegionSettings? {
  79 + val request = Request.Builder()
  80 + .url(serverUrl.getCloudConfigUrl("/regions").toString())
  81 + .header("Authorization", "Bearer $token")
  82 + .build()
  83 + val bodyString = okHttpClient.newCall(request)
  84 + .executeAsync()
  85 + .use { response ->
  86 + if (!response.isSuccessful) {
  87 + throw RoomException.ConnectException("Could not fetch region settings: ${response.code} ${response.message}")
  88 + }
  89 + return@use response.body?.string() ?: return null
  90 + }
  91 +
  92 + LKLog.e { bodyString }
  93 + return json.decodeFromString<RegionSettings>(bodyString).also {
  94 + regionSettings = it
  95 + lastUpdateAt = SystemClock.elapsedRealtime()
  96 + }
  97 + }
  98 +
  99 + fun clearAttemptedRegions() {
  100 + attemptedRegions.clear()
  101 + }
  102 +
  103 + @AssistedFactory
  104 + interface Factory {
  105 + fun create(serverUrl: URI, token: String): RegionUrlProvider
  106 + }
  107 +}
  108 +
  109 +internal fun URI.isLKCloud() = regionUrlProviderTesting || host.endsWith(".livekit.cloud") || host.endsWith(".livekit.run")
  110 +
  111 +internal fun URI.getCloudConfigUrl(appendPath: String = ""): URI {
  112 + val scheme = if (this.scheme.startsWith("ws")) {
  113 + this.scheme.replaceFirst("ws", "http")
  114 + } else {
  115 + this.scheme
  116 + }
  117 + return URI(
  118 + scheme,
  119 + null,
  120 + this.host,
  121 + this.port,
  122 + "/settings$appendPath",
  123 + null,
  124 + null,
  125 + )
  126 +}
  127 +
  128 +private var regionUrlProviderTesting = false
  129 +
  130 +@VisibleForTesting
  131 +fun setRegionUrlProviderTesting(enable: Boolean) {
  132 + regionUrlProviderTesting = enable
  133 +}
  134 +
  135 +/**
  136 + * @suppress
  137 + */
  138 +@Serializable
  139 +data class RegionSettings(val regions: List<RegionInfo>)
  140 +
  141 +/**
  142 + * @suppress
  143 + */
  144 +@Serializable
  145 +data class RegionInfo(val region: String, val url: String, val distance: Long)
@@ -43,6 +43,7 @@ import io.livekit.android.room.participant.* @@ -43,6 +43,7 @@ import io.livekit.android.room.participant.*
43 import io.livekit.android.room.provisions.LKObjects 43 import io.livekit.android.room.provisions.LKObjects
44 import io.livekit.android.room.track.* 44 import io.livekit.android.room.track.*
45 import io.livekit.android.room.types.toSDKType 45 import io.livekit.android.room.types.toSDKType
  46 +import io.livekit.android.room.util.ConnectionWarmer
46 import io.livekit.android.util.FlowObservable 47 import io.livekit.android.util.FlowObservable
47 import io.livekit.android.util.LKLog 48 import io.livekit.android.util.LKLog
48 import io.livekit.android.util.flow 49 import io.livekit.android.util.flow
@@ -59,6 +60,7 @@ import livekit.LivekitModels @@ -59,6 +60,7 @@ import livekit.LivekitModels
59 import livekit.LivekitRtc 60 import livekit.LivekitRtc
60 import livekit.org.webrtc.* 61 import livekit.org.webrtc.*
61 import livekit.org.webrtc.audio.AudioDeviceModule 62 import livekit.org.webrtc.audio.AudioDeviceModule
  63 +import java.net.URI
62 import javax.inject.Named 64 import javax.inject.Named
63 65
64 class Room 66 class Room
@@ -84,6 +86,8 @@ constructor( @@ -84,6 +86,8 @@ constructor(
84 val lkObjects: LKObjects, 86 val lkObjects: LKObjects,
85 networkCallbackManagerFactory: NetworkCallbackManagerFactory, 87 networkCallbackManagerFactory: NetworkCallbackManagerFactory,
86 private val audioDeviceModule: AudioDeviceModule, 88 private val audioDeviceModule: AudioDeviceModule,
  89 + private val regionUrlProviderFactory: RegionUrlProvider.Factory,
  90 + private val connectionWarmer: ConnectionWarmer,
87 ) : RTCEngine.Listener, ParticipantListener { 91 ) : RTCEngine.Listener, ParticipantListener {
88 92
89 private lateinit var coroutineScope: CoroutineScope 93 private lateinit var coroutineScope: CoroutineScope
@@ -263,6 +267,9 @@ constructor( @@ -263,6 +267,9 @@ constructor(
263 267
264 private var stateLock = Mutex() 268 private var stateLock = Mutex()
265 269
  270 + private var regionUrlProvider: RegionUrlProvider? = null
  271 + private var regionUrl: String? = null
  272 +
266 private fun getCurrentRoomOptions(): RoomOptions = 273 private fun getCurrentRoomOptions(): RoomOptions =
267 RoomOptions( 274 RoomOptions(
268 adaptiveStream = adaptiveStream, 275 adaptiveStream = adaptiveStream,
@@ -275,6 +282,44 @@ constructor( @@ -275,6 +282,44 @@ constructor(
275 ) 282 )
276 283
277 /** 284 /**
  285 + * prepareConnection should be called as soon as the page is loaded, in order
  286 + * to speed up the connection attempt. This function will
  287 + * - perform DNS resolution and pre-warm the DNS cache
  288 + * - establish TLS connection and cache TLS keys
  289 + *
  290 + * With LiveKit Cloud, it will also determine the best edge data center for
  291 + * the current client to connect to if a token is provided.
  292 + */
  293 + suspend fun prepareConnection(url: String, token: String? = null) {
  294 + if (state != State.DISCONNECTED) {
  295 + LKLog.i { "Room is not in disconnected state, ignoring prepareConnection call." }
  296 + return
  297 + }
  298 + LKLog.d { "preparing connection to $url" }
  299 +
  300 + try {
  301 + val urlActual = URI(url)
  302 + if (urlActual.isLKCloud() && token != null) {
  303 + val regionUrlProvider = regionUrlProviderFactory.create(urlActual, token)
  304 + this.regionUrlProvider = regionUrlProvider
  305 +
  306 + val regionUrl = regionUrlProvider.getNextBestRegionUrl()
  307 + // we will not replace the regionUrl if an attempt had already started
  308 + // to avoid overriding regionUrl after a new connection attempt had started
  309 + if (regionUrl != null && state == State.DISCONNECTED) {
  310 + this.regionUrl = regionUrl
  311 + connectionWarmer.fetch(regionUrl)
  312 + LKLog.d { "prepared connection to $regionUrl" }
  313 + }
  314 + } else {
  315 + connectionWarmer.fetch(url)
  316 + }
  317 + } catch (e: Exception) {
  318 + LKLog.e(e) { "Error while preparing connection:" }
  319 + }
  320 + }
  321 +
  322 + /**
278 * Connect to a LiveKit Room. 323 * Connect to a LiveKit Room.
279 * 324 *
280 * @param url 325 * @param url
@@ -309,60 +354,7 @@ constructor( @@ -309,60 +354,7 @@ constructor(
309 354
310 // Setup local participant. 355 // Setup local participant.
311 localParticipant.reinitialize() 356 localParticipant.reinitialize()
312 - coroutineScope.launch {  
313 - localParticipant.events.collect {  
314 - when (it) {  
315 - is ParticipantEvent.TrackPublished -> emitWhenConnected(  
316 - RoomEvent.TrackPublished(  
317 - room = this@Room,  
318 - publication = it.publication,  
319 - participant = it.participant,  
320 - ),  
321 - )  
322 -  
323 - is ParticipantEvent.TrackUnpublished -> emitWhenConnected(  
324 - RoomEvent.TrackUnpublished(  
325 - room = this@Room,  
326 - publication = it.publication,  
327 - participant = it.participant,  
328 - ),  
329 - )  
330 -  
331 - is ParticipantEvent.ParticipantPermissionsChanged -> emitWhenConnected(  
332 - RoomEvent.ParticipantPermissionsChanged(  
333 - room = this@Room,  
334 - participant = it.participant,  
335 - newPermissions = it.newPermissions,  
336 - oldPermissions = it.oldPermissions,  
337 - ),  
338 - )  
339 -  
340 - is ParticipantEvent.MetadataChanged -> {  
341 - emitWhenConnected(  
342 - RoomEvent.ParticipantMetadataChanged(  
343 - this@Room,  
344 - it.participant,  
345 - it.prevMetadata,  
346 - ),  
347 - )  
348 - }  
349 -  
350 - is ParticipantEvent.NameChanged -> {  
351 - emitWhenConnected(  
352 - RoomEvent.ParticipantNameChanged(  
353 - this@Room,  
354 - it.participant,  
355 - it.name,  
356 - ),  
357 - )  
358 - }  
359 -  
360 - else -> {  
361 - // do nothing  
362 - }  
363 - }  
364 - }  
365 - } 357 + setupLocalParticipantEventHandling()
366 358
367 if (roomOptions.e2eeOptions != null) { 359 if (roomOptions.e2eeOptions != null) {
368 e2eeManager = e2EEManagerFactory.create(roomOptions.e2eeOptions.keyProvider).apply { 360 e2eeManager = e2EEManagerFactory.create(roomOptions.e2eeOptions.keyProvider).apply {
@@ -384,7 +376,56 @@ constructor( @@ -384,7 +376,56 @@ constructor(
384 if (audioProcessingController is AuthedAudioProcessingController) { 376 if (audioProcessingController is AuthedAudioProcessingController) {
385 audioProcessingController.authenticate(url, token) 377 audioProcessingController.authenticate(url, token)
386 } 378 }
387 - engine.join(url, token, options, roomOptions) 379 +
  380 + // Don't use URL equals.
  381 + if (regionUrlProvider?.serverUrl.toString() != url) {
  382 + regionUrl = null
  383 + regionUrlProvider = null
  384 + }
  385 +
  386 + val urlObj = URI(url)
  387 + if (urlObj.isLKCloud()) {
  388 + if (regionUrlProvider == null) {
  389 + regionUrlProvider = regionUrlProviderFactory.create(urlObj, token)
  390 + } else {
  391 + regionUrlProvider?.token = token
  392 + }
  393 +
  394 + // trigger the first fetch without waiting for a response
  395 + // if initial connection fails, this will speed up picking regional url
  396 + // on subsequent runs
  397 + launch {
  398 + try {
  399 + regionUrlProvider?.fetchRegionSettings()
  400 + } catch (e: Exception) {
  401 + LKLog.w(e) { "could not fetch region settings" }
  402 + }
  403 + }
  404 + }
  405 +
  406 + var nextUrl: String? = regionUrl ?: url
  407 + regionUrl = null
  408 +
  409 + while (nextUrl != null) {
  410 + val connectUrl = nextUrl
  411 + nextUrl = null
  412 + try {
  413 + engine.regionUrlProvider = regionUrlProvider
  414 + engine.join(connectUrl, token, options, roomOptions)
  415 + } catch (e: Exception) {
  416 + if (e is CancellationException) {
  417 + throw e // rethrow to properly cancel.
  418 + }
  419 +
  420 + nextUrl = regionUrlProvider?.getNextBestRegionUrl()
  421 + if (nextUrl != null) {
  422 + LKLog.d(e) { "Connection to $connectUrl failed, retrying with another region: $nextUrl" }
  423 + } else {
  424 + throw e // rethrow since no more regions to try.
  425 + }
  426 + }
  427 + }
  428 +
388 ensureActive() 429 ensureActive()
389 networkCallbackManager.registerCallback() 430 networkCallbackManager.registerCallback()
390 if (options.audio) { 431 if (options.audio) {
@@ -495,6 +536,63 @@ constructor( @@ -495,6 +536,63 @@ constructor(
495 } 536 }
496 } 537 }
497 538
  539 + private fun setupLocalParticipantEventHandling() {
  540 + coroutineScope.launch {
  541 + localParticipant.events.collect {
  542 + when (it) {
  543 + is ParticipantEvent.TrackPublished -> emitWhenConnected(
  544 + RoomEvent.TrackPublished(
  545 + room = this@Room,
  546 + publication = it.publication,
  547 + participant = it.participant,
  548 + ),
  549 + )
  550 +
  551 + is ParticipantEvent.TrackUnpublished -> emitWhenConnected(
  552 + RoomEvent.TrackUnpublished(
  553 + room = this@Room,
  554 + publication = it.publication,
  555 + participant = it.participant,
  556 + ),
  557 + )
  558 +
  559 + is ParticipantEvent.ParticipantPermissionsChanged -> emitWhenConnected(
  560 + RoomEvent.ParticipantPermissionsChanged(
  561 + room = this@Room,
  562 + participant = it.participant,
  563 + newPermissions = it.newPermissions,
  564 + oldPermissions = it.oldPermissions,
  565 + ),
  566 + )
  567 +
  568 + is ParticipantEvent.MetadataChanged -> {
  569 + emitWhenConnected(
  570 + RoomEvent.ParticipantMetadataChanged(
  571 + this@Room,
  572 + it.participant,
  573 + it.prevMetadata,
  574 + ),
  575 + )
  576 + }
  577 +
  578 + is ParticipantEvent.NameChanged -> {
  579 + emitWhenConnected(
  580 + RoomEvent.ParticipantNameChanged(
  581 + this@Room,
  582 + it.participant,
  583 + it.name,
  584 + ),
  585 + )
  586 + }
  587 +
  588 + else -> {
  589 + // do nothing
  590 + }
  591 + }
  592 + }
  593 + }
  594 + }
  595 +
498 private fun handleParticipantDisconnect(identity: Participant.Identity) { 596 private fun handleParticipantDisconnect(identity: Participant.Identity) {
499 val newParticipants = mutableRemoteParticipants.toMutableMap() 597 val newParticipants = mutableRemoteParticipants.toMutableMap()
500 val removedParticipant = newParticipants.remove(identity) ?: return 598 val removedParticipant = newParticipants.remove(identity) ?: return
@@ -28,6 +28,8 @@ import io.livekit.android.stats.getClientInfo @@ -28,6 +28,8 @@ import io.livekit.android.stats.getClientInfo
28 import io.livekit.android.util.CloseableCoroutineScope 28 import io.livekit.android.util.CloseableCoroutineScope
29 import io.livekit.android.util.Either 29 import io.livekit.android.util.Either
30 import io.livekit.android.util.LKLog 30 import io.livekit.android.util.LKLog
  31 +import io.livekit.android.util.toHttpUrl
  32 +import io.livekit.android.util.toWebsocketUrl
31 import io.livekit.android.webrtc.toProtoSessionDescription 33 import io.livekit.android.webrtc.toProtoSessionDescription
32 import kotlinx.coroutines.CancellableContinuation 34 import kotlinx.coroutines.CancellableContinuation
33 import kotlinx.coroutines.CoroutineDispatcher 35 import kotlinx.coroutines.CoroutineDispatcher
@@ -122,6 +124,7 @@ constructor( @@ -122,6 +124,7 @@ constructor(
122 /** 124 /**
123 * @throws Exception if fails to connect. 125 * @throws Exception if fails to connect.
124 */ 126 */
  127 + @Throws(Exception::class)
125 suspend fun join( 128 suspend fun join(
126 url: String, 129 url: String,
127 token: String, 130 token: String,
@@ -135,6 +138,7 @@ constructor( @@ -135,6 +138,7 @@ constructor(
135 /** 138 /**
136 * @throws Exception if fails to connect. 139 * @throws Exception if fails to connect.
137 */ 140 */
  141 + @Throws(Exception::class)
138 @VisibleForTesting 142 @VisibleForTesting
139 suspend fun reconnect(url: String, token: String, participantSid: String?): Either<ReconnectResponse, Unit> { 143 suspend fun reconnect(url: String, token: String, participantSid: String?): Either<ReconnectResponse, Unit> {
140 val reconnectResponse = connect( 144 val reconnectResponse = connect(
@@ -159,7 +163,7 @@ constructor( @@ -159,7 +163,7 @@ constructor(
159 // Clean up any pre-existing connection. 163 // Clean up any pre-existing connection.
160 close(reason = "Starting new connection", shouldClearQueuedRequests = false) 164 close(reason = "Starting new connection", shouldClearQueuedRequests = false)
161 165
162 - val wsUrlString = "$url/rtc" + createConnectionParams(token, getClientInfo(), options, roomOptions) 166 + val wsUrlString = "${url.toWebsocketUrl()}/rtc" + createConnectionParams(token, getClientInfo(), options, roomOptions)
163 isReconnecting = options.reconnect 167 isReconnecting = options.reconnect
164 168
165 LKLog.i { "connecting to $wsUrlString" } 169 LKLog.i { "connecting to $wsUrlString" }
@@ -305,7 +309,7 @@ constructor( @@ -305,7 +309,7 @@ constructor(
305 var reason: String? = null 309 var reason: String? = null
306 try { 310 try {
307 lastUrl?.let { 311 lastUrl?.let {
308 - val validationUrl = "http" + it.substring(2).replaceFirst("/rtc?", "/rtc/validate?") 312 + val validationUrl = it.toHttpUrl().replaceFirst("/rtc?", "/rtc/validate?")
309 val request = Request.Builder().url(validationUrl).build() 313 val request = Request.Builder().url(validationUrl).build()
310 val resp = okHttpClient.newCall(request).execute() 314 val resp = okHttpClient.newCall(request).execute()
311 val body = resp.body 315 val body = resp.body
  1 +/*
  2 + * Copyright 2024 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.room.util
  18 +
  19 +import io.livekit.android.util.executeAsync
  20 +import io.livekit.android.util.toHttpUrl
  21 +import kotlinx.coroutines.ExperimentalCoroutinesApi
  22 +import okhttp3.OkHttpClient
  23 +import okhttp3.Request
  24 +import okhttp3.Response
  25 +import javax.inject.Inject
  26 +
  27 +interface ConnectionWarmer {
  28 + suspend fun fetch(url: String): Response
  29 +}
  30 +
  31 +class OkHttpConnectionWarmer
  32 +@Inject
  33 +constructor(
  34 + private val okHttpClient: OkHttpClient,
  35 +) : ConnectionWarmer {
  36 + @OptIn(ExperimentalCoroutinesApi::class)
  37 + override suspend fun fetch(url: String): Response {
  38 + val request = Request.Builder()
  39 + .url(url.toHttpUrl())
  40 + .method("HEAD", null)
  41 + .build()
  42 + return okHttpClient.newCall(request)
  43 + .executeAsync()
  44 + }
  45 +}
  1 +/*
  2 + * Copyright 2024 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.util
  18 +
  19 +import kotlinx.coroutines.ExperimentalCoroutinesApi
  20 +import kotlinx.coroutines.suspendCancellableCoroutine
  21 +import okhttp3.Call
  22 +import okhttp3.Callback
  23 +import okhttp3.Response
  24 +import okhttp3.internal.closeQuietly
  25 +import okio.IOException
  26 +import kotlin.coroutines.resumeWithException
  27 +
  28 +// TODO: Switch to official executeAsync when released.
  29 +@ExperimentalCoroutinesApi // resume with a resource cleanup.
  30 +suspend fun Call.executeAsync(): Response =
  31 + suspendCancellableCoroutine { continuation ->
  32 + continuation.invokeOnCancellation {
  33 + this.cancel()
  34 + }
  35 + this.enqueue(
  36 + object : Callback {
  37 + override fun onFailure(
  38 + call: Call,
  39 + e: IOException,
  40 + ) {
  41 + continuation.resumeWithException(e)
  42 + }
  43 +
  44 + override fun onResponse(
  45 + call: Call,
  46 + response: Response,
  47 + ) {
  48 + continuation.resume(response) {
  49 + response.closeQuietly()
  50 + }
  51 + }
  52 + },
  53 + )
  54 + }
  1 +/*
  2 + * Copyright 2024 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.util
  18 +
  19 +fun String.toWebsocketUrl(): String {
  20 + if (startsWith("http")) {
  21 + return replaceFirst("http", "ws")
  22 + }
  23 + return this
  24 +}
  25 +
  26 +fun String.toHttpUrl(): String {
  27 + if (startsWith("ws")) {
  28 + return replaceFirst("ws", "http")
  29 + }
  30 + return this
  31 +}
@@ -78,7 +78,7 @@ dependencies { @@ -78,7 +78,7 @@ dependencies {
78 implementation libs.timber 78 implementation libs.timber
79 implementation libs.coroutines.lib 79 implementation libs.coroutines.lib
80 implementation libs.kotlinx.serialization.json 80 implementation libs.kotlinx.serialization.json
81 - api libs.okhttp 81 + api libs.okhttp.lib
82 api libs.audioswitch 82 api libs.audioswitch
83 implementation libs.androidx.annotation 83 implementation libs.androidx.annotation
84 api libs.protobuf.javalite 84 api libs.protobuf.javalite
@@ -95,6 +95,7 @@ dependencies { @@ -95,6 +95,7 @@ dependencies {
95 95
96 testImplementation libs.junit 96 testImplementation libs.junit
97 testImplementation libs.robolectric 97 testImplementation libs.robolectric
  98 + testImplementation libs.okhttp.mockwebserver
98 kaptTest libs.dagger.compiler 99 kaptTest libs.dagger.compiler
99 100
100 androidTestImplementation libs.androidx.test.junit 101 androidTestImplementation libs.androidx.test.junit
@@ -23,6 +23,7 @@ import dagger.Reusable @@ -23,6 +23,7 @@ import dagger.Reusable
23 import io.livekit.android.memory.CloseableManager 23 import io.livekit.android.memory.CloseableManager
24 import io.livekit.android.room.network.NetworkCallbackManagerFactory 24 import io.livekit.android.room.network.NetworkCallbackManagerFactory
25 import io.livekit.android.room.network.NetworkCallbackManagerImpl 25 import io.livekit.android.room.network.NetworkCallbackManagerImpl
  26 +import io.livekit.android.room.util.ConnectionWarmer
26 import io.livekit.android.stats.NetworkInfo 27 import io.livekit.android.stats.NetworkInfo
27 import io.livekit.android.stats.NetworkType 28 import io.livekit.android.stats.NetworkType
28 import io.livekit.android.test.mock.MockNetworkCallbackRegistry 29 import io.livekit.android.test.mock.MockNetworkCallbackRegistry
@@ -49,6 +50,15 @@ object TestWebModule { @@ -49,6 +50,15 @@ object TestWebModule {
49 } 50 }
50 51
51 @Provides 52 @Provides
  53 + fun connectionWarmer(): ConnectionWarmer {
  54 + return object : ConnectionWarmer {
  55 + override suspend fun fetch(url: String): Response {
  56 + return Response.Builder().code(200).build()
  57 + }
  58 + }
  59 + }
  60 +
  61 + @Provides
52 @Singleton 62 @Singleton
53 fun websocketFactory(webSocketFactory: MockWebSocketFactory): WebSocket.Factory { 63 fun websocketFactory(webSocketFactory: MockWebSocketFactory): WebSocket.Factory {
54 return webSocketFactory 64 return webSocketFactory
  1 +/*
  2 + * Copyright 2024 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.test.mock.room.util
  18 +
  19 +import io.livekit.android.room.util.ConnectionWarmer
  20 +import okhttp3.Response
  21 +
  22 +class MockConnectionWarmer : ConnectionWarmer {
  23 + override suspend fun fetch(url: String): Response {
  24 + return Response.Builder().code(200).build()
  25 + }
  26 +}
  1 +/*
  2 + * Copyright 2024 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.room
  18 +
  19 +import io.livekit.android.test.BaseTest
  20 +import kotlinx.coroutines.ExperimentalCoroutinesApi
  21 +import kotlinx.serialization.json.Json
  22 +import okhttp3.OkHttpClient
  23 +import okhttp3.mockwebserver.MockResponse
  24 +import okhttp3.mockwebserver.MockWebServer
  25 +import org.junit.After
  26 +import org.junit.Assert.assertEquals
  27 +import org.junit.Assert.assertNotNull
  28 +import org.junit.Assert.assertNull
  29 +import org.junit.Before
  30 +import org.junit.Test
  31 +import org.junit.runner.RunWith
  32 +import org.robolectric.RobolectricTestRunner
  33 +
  34 +@OptIn(ExperimentalCoroutinesApi::class)
  35 +@RunWith(RobolectricTestRunner::class)
  36 +class RegionUrlProviderTest : BaseTest() {
  37 +
  38 + lateinit var server: MockWebServer
  39 +
  40 + @Before
  41 + fun setup() {
  42 + setRegionUrlProviderTesting(true)
  43 + }
  44 +
  45 + @After
  46 + fun tearDown() {
  47 + setRegionUrlProviderTesting(false)
  48 + }
  49 +
  50 + @Test
  51 + fun fetchRegionSettings() = runTest {
  52 + server = MockWebServer()
  53 + server.enqueue(MockResponse().setBody(regionResponse))
  54 + val regionUrlProvider = RegionUrlProvider(server.url("").toUri(), "token", OkHttpClient.Builder().build(), Json { ignoreUnknownKeys = true })
  55 +
  56 + val settings = regionUrlProvider.fetchRegionSettings()
  57 + assertNotNull(settings)
  58 +
  59 + val regions = settings!!.regions
  60 + assertNotNull(regions)
  61 + assertEquals(3, regions.size)
  62 + val regionA = regions[0]
  63 + assertEquals("a", regionA.region)
  64 + assertEquals("https://regiona.livekit.cloud", regionA.url)
  65 + assertEquals(100L, regionA.distance)
  66 + }
  67 +
  68 + @Test
  69 + fun getNextRegionUrl() = runTest {
  70 + server = MockWebServer()
  71 + server.enqueue(MockResponse().setBody(regionResponse))
  72 + val regionUrlProvider = RegionUrlProvider(server.url("").toUri(), "token", OkHttpClient.Builder().build(), Json { ignoreUnknownKeys = true })
  73 +
  74 + assertEquals("https://regiona.livekit.cloud", regionUrlProvider.getNextBestRegionUrl())
  75 + assertEquals("https://regionb.livekit.cloud", regionUrlProvider.getNextBestRegionUrl())
  76 + assertEquals("https://regionc.livekit.cloud", regionUrlProvider.getNextBestRegionUrl())
  77 + assertNull(regionUrlProvider.getNextBestRegionUrl())
  78 +
  79 + // Check that only one request was needed.
  80 + assertEquals(1, server.requestCount)
  81 + }
  82 +}
  83 +
  84 +private val regionResponse = """{
  85 + "regions": [
  86 + {
  87 + "region": "a",
  88 + "url": "https://regiona.livekit.cloud",
  89 + "distance": "100"
  90 + },
  91 + {
  92 + "region": "b",
  93 + "url": "https://regionb.livekit.cloud",
  94 + "distance": "1000"
  95 + },
  96 + {
  97 + "region": "c",
  98 + "url": "https://regionc.livekit.cloud",
  99 + "distance": "10000"
  100 + }
  101 + ]
  102 +}"""
@@ -39,6 +39,7 @@ import io.livekit.android.test.mock.MockEglBase @@ -39,6 +39,7 @@ import io.livekit.android.test.mock.MockEglBase
39 import io.livekit.android.test.mock.MockLKObjects 39 import io.livekit.android.test.mock.MockLKObjects
40 import io.livekit.android.test.mock.MockNetworkCallbackRegistry 40 import io.livekit.android.test.mock.MockNetworkCallbackRegistry
41 import io.livekit.android.test.mock.TestData 41 import io.livekit.android.test.mock.TestData
  42 +import io.livekit.android.test.mock.room.util.MockConnectionWarmer
42 import kotlinx.coroutines.ExperimentalCoroutinesApi 43 import kotlinx.coroutines.ExperimentalCoroutinesApi
43 import kotlinx.coroutines.async 44 import kotlinx.coroutines.async
44 import kotlinx.coroutines.flow.MutableSharedFlow 45 import kotlinx.coroutines.flow.MutableSharedFlow
@@ -84,6 +85,9 @@ class RoomTest { @@ -84,6 +85,9 @@ class RoomTest {
84 @Mock 85 @Mock
85 lateinit var e2EEManagerFactory: E2EEManager.Factory 86 lateinit var e2EEManagerFactory: E2EEManager.Factory
86 87
  88 + @Mock
  89 + lateinit var regionUrlProviderFactory: RegionUrlProvider.Factory
  90 +
87 lateinit var networkCallbackRegistry: MockNetworkCallbackRegistry 91 lateinit var networkCallbackRegistry: MockNetworkCallbackRegistry
88 92
89 var eglBase: EglBase = MockEglBase() 93 var eglBase: EglBase = MockEglBase()
@@ -125,6 +129,8 @@ class RoomTest { @@ -125,6 +129,8 @@ class RoomTest {
125 NetworkCallbackManagerImpl(networkCallback, networkCallbackRegistry) 129 NetworkCallbackManagerImpl(networkCallback, networkCallbackRegistry)
126 }, 130 },
127 audioDeviceModule = MockAudioDeviceModule(), 131 audioDeviceModule = MockAudioDeviceModule(),
  132 + regionUrlProviderFactory = regionUrlProviderFactory,
  133 + connectionWarmer = MockConnectionWarmer(),
128 ) 134 )
129 } 135 }
130 136