David Liu

cancel join continuation if websocket fails to connect

@@ -144,4 +144,6 @@ sealed class RoomEvent(val room: Room) : Event() { @@ -144,4 +144,6 @@ sealed class RoomEvent(val room: Room) : Event() {
144 class ConnectionQualityChanged(room: Room, val participant: Participant, val quality: ConnectionQuality) : 144 class ConnectionQualityChanged(room: Room, val participant: Participant, val quality: ConnectionQuality) :
145 RoomEvent(room) 145 RoomEvent(room)
146 146
  147 + class FailedToConnect(room: Room, val error: Throwable) : RoomEvent(room)
  148 +
147 } 149 }
@@ -385,7 +385,7 @@ internal constructor( @@ -385,7 +385,7 @@ internal constructor(
385 fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>) 385 fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>)
386 fun onSpeakersChanged(speakers: List<LivekitModels.SpeakerInfo>) 386 fun onSpeakersChanged(speakers: List<LivekitModels.SpeakerInfo>)
387 fun onDisconnect(reason: String) 387 fun onDisconnect(reason: String)
388 - fun onFailToConnect(error: Exception) 388 + fun onFailToConnect(error: Throwable)
389 fun onUserPacket(packet: LivekitModels.UserPacket, kind: LivekitModels.DataPacket.Kind) 389 fun onUserPacket(packet: LivekitModels.UserPacket, kind: LivekitModels.DataPacket.Kind)
390 fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>) 390 fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>)
391 } 391 }
@@ -521,7 +521,7 @@ internal constructor( @@ -521,7 +521,7 @@ internal constructor(
521 listener?.onDisconnect("") 521 listener?.onDisconnect("")
522 } 522 }
523 523
524 - override fun onError(error: Exception) { 524 + override fun onError(error: Throwable) {
525 listener?.onFailToConnect(error) 525 listener?.onFailToConnect(error)
526 } 526 }
527 527
@@ -20,7 +20,6 @@ import io.livekit.android.room.participant.* @@ -20,7 +20,6 @@ import io.livekit.android.room.participant.*
20 import io.livekit.android.room.track.* 20 import io.livekit.android.room.track.*
21 import io.livekit.android.util.FlowObservable 21 import io.livekit.android.util.FlowObservable
22 import io.livekit.android.util.LKLog 22 import io.livekit.android.util.LKLog
23 -import io.livekit.android.util.flow  
24 import io.livekit.android.util.flowDelegate 23 import io.livekit.android.util.flowDelegate
25 import kotlinx.coroutines.* 24 import kotlinx.coroutines.*
26 import livekit.LivekitModels 25 import livekit.LivekitModels
@@ -465,8 +464,10 @@ constructor( @@ -465,8 +464,10 @@ constructor(
465 /** 464 /**
466 * @suppress 465 * @suppress
467 */ 466 */
468 - override fun onFailToConnect(error: Exception) { 467 + override fun onFailToConnect(error: Throwable) {
469 listener?.onFailedToConnect(this, error) 468 listener?.onFailedToConnect(this, error)
  469 + // scope will likely be closed already here, so force it out of scope.
  470 + eventBus.tryPostEvent(RoomEvent.FailedToConnect(this, error))
470 } 471 }
471 472
472 //------------------------------- ParticipantListener --------------------------------// 473 //------------------------------- ParticipantListener --------------------------------//
@@ -613,7 +614,7 @@ interface RoomListener { @@ -613,7 +614,7 @@ interface RoomListener {
613 /** 614 /**
614 * Could not connect to the room 615 * Could not connect to the room
615 */ 616 */
616 - fun onFailedToConnect(room: Room, error: Exception) {} 617 + fun onFailedToConnect(room: Room, error: Throwable) {}
617 // fun onReconnecting(room: Room, error: Exception) {} 618 // fun onReconnecting(room: Room, error: Exception) {}
618 // fun onReconnect(room: Room) {} 619 // fun onReconnect(room: Room) {}
619 620
@@ -12,7 +12,6 @@ import io.livekit.android.util.safe @@ -12,7 +12,6 @@ import io.livekit.android.util.safe
12 import kotlinx.coroutines.* 12 import kotlinx.coroutines.*
13 import kotlinx.coroutines.flow.MutableSharedFlow 13 import kotlinx.coroutines.flow.MutableSharedFlow
14 import kotlinx.coroutines.flow.collect 14 import kotlinx.coroutines.flow.collect
15 -import kotlinx.coroutines.flow.collectLatest  
16 import kotlinx.serialization.decodeFromString 15 import kotlinx.serialization.decodeFromString
17 import kotlinx.serialization.encodeToString 16 import kotlinx.serialization.encodeToString
18 import kotlinx.serialization.json.Json 17 import kotlinx.serialization.json.Json
@@ -27,8 +26,6 @@ import org.webrtc.SessionDescription @@ -27,8 +26,6 @@ import org.webrtc.SessionDescription
27 import javax.inject.Inject 26 import javax.inject.Inject
28 import javax.inject.Named 27 import javax.inject.Named
29 import javax.inject.Singleton 28 import javax.inject.Singleton
30 -import kotlin.coroutines.Continuation  
31 -import kotlin.coroutines.suspendCoroutine  
32 29
33 /** 30 /**
34 * SignalClient to LiveKit WS servers 31 * SignalClient to LiveKit WS servers
@@ -186,10 +183,13 @@ constructor( @@ -186,10 +183,13 @@ constructor(
186 183
187 if (reason != null) { 184 if (reason != null) {
188 LKLog.e(t) { "websocket failure: $reason" } 185 LKLog.e(t) { "websocket failure: $reason" }
189 - listener?.onError(Exception(reason)) 186 + val error = Exception(reason)
  187 + listener?.onError(error)
  188 + joinContinuation?.cancel(error)
190 } else { 189 } else {
191 LKLog.e(t) { "websocket failure: $response" } 190 LKLog.e(t) { "websocket failure: $response" }
192 - listener?.onError(t as Exception) 191 + listener?.onError(t)
  192 + joinContinuation?.cancel(t)
193 } 193 }
194 } 194 }
195 195
@@ -443,7 +443,7 @@ constructor( @@ -443,7 +443,7 @@ constructor(
443 fun onRoomUpdate(update: LivekitModels.Room) 443 fun onRoomUpdate(update: LivekitModels.Room)
444 fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>) 444 fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>)
445 fun onLeave() 445 fun onLeave()
446 - fun onError(error: Exception) 446 + fun onError(error: Throwable)
447 fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>) 447 fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>)
448 } 448 }
449 449
@@ -69,6 +69,29 @@ class RoomMockE2ETest { @@ -69,6 +69,29 @@ class RoomMockE2ETest {
69 } 69 }
70 70
71 @Test 71 @Test
  72 + fun connectFailureProperlyContinues(){
  73 +
  74 + var didThrowException = false
  75 + val job = coroutineRule.scope.launch {
  76 + try {
  77 + room.connect(
  78 + url = "http://www.example.com",
  79 + token = "",
  80 + )
  81 + } catch (e: Throwable) {
  82 + didThrowException = true
  83 + }
  84 + }
  85 +
  86 + wsFactory.listener.onFailure(wsFactory.ws, Exception(), null)
  87 +
  88 + runBlockingTest {
  89 + job.join()
  90 + }
  91 +
  92 + Assert.assertTrue(didThrowException)
  93 + }
  94 + @Test
72 fun roomUpdateTest() { 95 fun roomUpdateTest() {
73 connect() 96 connect()
74 val eventCollector = EventCollector(room.events, coroutineRule.scope) 97 val eventCollector = EventCollector(room.events, coroutineRule.scope)
@@ -4,9 +4,10 @@ import android.app.Application @@ -4,9 +4,10 @@ import android.app.Application
4 import android.content.Intent 4 import android.content.Intent
5 import androidx.lifecycle.* 5 import androidx.lifecycle.*
6 import com.github.ajalt.timberkt.Timber 6 import com.github.ajalt.timberkt.Timber
7 -import io.livekit.android.ConnectOptions  
8 import io.livekit.android.LiveKit 7 import io.livekit.android.LiveKit
9 import io.livekit.android.RoomOptions 8 import io.livekit.android.RoomOptions
  9 +import io.livekit.android.events.RoomEvent
  10 +import io.livekit.android.events.collect
10 import io.livekit.android.room.Room 11 import io.livekit.android.room.Room
11 import io.livekit.android.room.RoomListener 12 import io.livekit.android.room.RoomListener
12 import io.livekit.android.room.participant.Participant 13 import io.livekit.android.room.participant.Participant
@@ -40,6 +41,9 @@ class CallViewModel( @@ -40,6 +41,9 @@ class CallViewModel(
40 } 41 }
41 } 42 }
42 43
  44 + private val mutableError = MutableStateFlow<Throwable?>(null)
  45 + val error = mutableError.hide()
  46 +
43 private val mutablePrimarySpeaker = MutableStateFlow<Participant?>(null) 47 private val mutablePrimarySpeaker = MutableStateFlow<Participant?>(null)
44 val primarySpeaker: StateFlow<Participant?> = mutablePrimarySpeaker 48 val primarySpeaker: StateFlow<Participant?> = mutablePrimarySpeaker
45 49
@@ -67,24 +71,36 @@ class CallViewModel( @@ -67,24 +71,36 @@ class CallViewModel(
67 71
68 init { 72 init {
69 viewModelScope.launch { 73 viewModelScope.launch {
70 - val room = LiveKit.connect(  
71 - application,  
72 - url,  
73 - token,  
74 - roomOptions = RoomOptions(autoManageVideo = true),  
75 - listener = this@CallViewModel  
76 - )  
77 -  
78 - // Create and publish audio/video tracks  
79 - val localParticipant = room.localParticipant  
80 - localParticipant.setMicrophoneEnabled(true)  
81 - mutableMicEnabled.postValue(localParticipant.isMicrophoneEnabled())  
82 -  
83 - localParticipant.setCameraEnabled(true)  
84 - mutableCameraEnabled.postValue(localParticipant.isCameraEnabled())  
85 - mutableRoom.value = room  
86 -  
87 - mutablePrimarySpeaker.value = room.remoteParticipants.values.firstOrNull() ?: localParticipant 74 + try {
  75 + val room = LiveKit.connect(
  76 + application,
  77 + url,
  78 + token,
  79 + roomOptions = RoomOptions(autoManageVideo = true),
  80 + listener = this@CallViewModel
  81 + )
  82 +
  83 + // Create and publish audio/video tracks
  84 + val localParticipant = room.localParticipant
  85 + localParticipant.setMicrophoneEnabled(true)
  86 + mutableMicEnabled.postValue(localParticipant.isMicrophoneEnabled())
  87 +
  88 + localParticipant.setCameraEnabled(true)
  89 + mutableCameraEnabled.postValue(localParticipant.isCameraEnabled())
  90 + mutableRoom.value = room
  91 +
  92 + mutablePrimarySpeaker.value = room.remoteParticipants.values.firstOrNull() ?: localParticipant
  93 +
  94 + viewModelScope.launch {
  95 + room.events.collect {
  96 + when (it) {
  97 + is RoomEvent.FailedToConnect -> mutableError.value = it.error
  98 + }
  99 + }
  100 + }
  101 + } catch (e: Throwable) {
  102 + mutableError.value = e
  103 + }
88 } 104 }
89 } 105 }
90 106
@@ -166,6 +182,12 @@ class CallViewModel( @@ -166,6 +182,12 @@ class CallViewModel(
166 videoTrack.restartTrack(newOptions) 182 videoTrack.restartTrack(newOptions)
167 } 183 }
168 } 184 }
  185 +
  186 + fun dismissError() {
  187 + mutableError.value = null
  188 + }
169 } 189 }
170 190
171 private fun <T> LiveData<T>.hide(): LiveData<T> = this 191 private fun <T> LiveData<T>.hide(): LiveData<T> = this
  192 +
  193 +private fun <T> MutableStateFlow<T>.hide(): StateFlow<T> = this
@@ -5,6 +5,7 @@ import android.media.AudioManager @@ -5,6 +5,7 @@ import android.media.AudioManager
5 import android.media.projection.MediaProjectionManager 5 import android.media.projection.MediaProjectionManager
6 import android.os.Bundle 6 import android.os.Bundle
7 import android.os.Parcelable 7 import android.os.Parcelable
  8 +import android.widget.Toast
8 import androidx.activity.compose.setContent 9 import androidx.activity.compose.setContent
9 import androidx.activity.result.contract.ActivityResultContracts 10 import androidx.activity.result.contract.ActivityResultContracts
10 import androidx.appcompat.app.AppCompatActivity 11 import androidx.appcompat.app.AppCompatActivity
@@ -23,6 +24,7 @@ import androidx.compose.ui.tooling.preview.Preview @@ -23,6 +24,7 @@ import androidx.compose.ui.tooling.preview.Preview
23 import androidx.compose.ui.unit.dp 24 import androidx.compose.ui.unit.dp
24 import androidx.constraintlayout.compose.ConstraintLayout 25 import androidx.constraintlayout.compose.ConstraintLayout
25 import androidx.constraintlayout.compose.Dimension 26 import androidx.constraintlayout.compose.Dimension
  27 +import androidx.lifecycle.lifecycleScope
26 import com.github.ajalt.timberkt.Timber 28 import com.github.ajalt.timberkt.Timber
27 import com.google.accompanist.pager.ExperimentalPagerApi 29 import com.google.accompanist.pager.ExperimentalPagerApi
28 import io.livekit.android.composesample.ui.theme.AppTheme 30 import io.livekit.android.composesample.ui.theme.AppTheme
@@ -30,6 +32,8 @@ import io.livekit.android.room.Room @@ -30,6 +32,8 @@ import io.livekit.android.room.Room
30 import io.livekit.android.room.participant.Participant 32 import io.livekit.android.room.participant.Participant
31 import io.livekit.android.sample.CallViewModel 33 import io.livekit.android.sample.CallViewModel
32 import kotlinx.coroutines.Dispatchers 34 import kotlinx.coroutines.Dispatchers
  35 +import kotlinx.coroutines.flow.collect
  36 +import kotlinx.coroutines.launch
33 import kotlinx.parcelize.Parcelize 37 import kotlinx.parcelize.Parcelize
34 38
35 @OptIn(ExperimentalPagerApi::class) 39 @OptIn(ExperimentalPagerApi::class)
@@ -82,6 +86,15 @@ class CallActivity : AppCompatActivity() { @@ -82,6 +86,15 @@ class CallActivity : AppCompatActivity() {
82 Timber.v { "Audio focus request failed" } 86 Timber.v { "Audio focus request failed" }
83 } 87 }
84 88
  89 + lifecycleScope.launchWhenStarted {
  90 + viewModel.error.collect {
  91 + if(it != null){
  92 + Toast.makeText(this@CallActivity, "Error: $it", Toast.LENGTH_LONG).show()
  93 + viewModel.dismissError()
  94 + }
  95 + }
  96 + }
  97 +
85 // Setup compose view. 98 // Setup compose view.
86 setContent { 99 setContent {
87 val room by viewModel.room.collectAsState() 100 val room by viewModel.room.collectAsState()
@@ -127,6 +140,8 @@ class CallActivity : AppCompatActivity() { @@ -127,6 +140,8 @@ class CallActivity : AppCompatActivity() {
127 flipButtonEnabled: Boolean = true, 140 flipButtonEnabled: Boolean = true,
128 screencastEnabled: Boolean = false, 141 screencastEnabled: Boolean = false,
129 onExitClick: () -> Unit = {}, 142 onExitClick: () -> Unit = {},
  143 + error: Throwable? = null,
  144 + onSnackbarDismiss: () -> Unit = {}
130 ) { 145 ) {
131 AppTheme(darkTheme = true) { 146 AppTheme(darkTheme = true) {
132 ConstraintLayout( 147 ConstraintLayout(
@@ -271,6 +286,32 @@ class CallActivity : AppCompatActivity() { @@ -271,6 +286,32 @@ class CallActivity : AppCompatActivity() {
271 ) 286 )
272 } 287 }
273 } 288 }
  289 +
  290 + // Snack bar for errors
  291 + val scaffoldState = rememberScaffoldState()
  292 + val scope = rememberCoroutineScope()
  293 + if(error != null) {
  294 + Scaffold(
  295 + scaffoldState = scaffoldState,
  296 + floatingActionButton = {
  297 + ExtendedFloatingActionButton(
  298 + text = { Text("Show snackbar") },
  299 + onClick = {
  300 + // show snackbar as a suspend function
  301 + scope.launch {
  302 + scaffoldState.snackbarHostState.showSnackbar(error?.toString() ?: "")
  303 + }
  304 + }
  305 + )
  306 + },
  307 + content = { innerPadding ->
  308 + Text(
  309 + text = "Body content",
  310 + modifier = Modifier.padding(innerPadding).fillMaxSize().wrapContentSize()
  311 + )
  312 + }
  313 + )
  314 + }
274 } 315 }
275 } 316 }
276 } 317 }
@@ -6,6 +6,7 @@ import android.media.projection.MediaProjectionManager @@ -6,6 +6,7 @@ import android.media.projection.MediaProjectionManager
6 import android.os.Bundle 6 import android.os.Bundle
7 import android.os.Parcelable 7 import android.os.Parcelable
8 import android.view.View 8 import android.view.View
  9 +import android.widget.Toast
9 import androidx.activity.result.contract.ActivityResultContracts 10 import androidx.activity.result.contract.ActivityResultContracts
10 import androidx.appcompat.app.AppCompatActivity 11 import androidx.appcompat.app.AppCompatActivity
11 import androidx.lifecycle.lifecycleScope 12 import androidx.lifecycle.lifecycleScope
@@ -71,6 +72,15 @@ class CallActivity : AppCompatActivity() { @@ -71,6 +72,15 @@ class CallActivity : AppCompatActivity() {
71 } 72 }
72 } 73 }
73 74
  75 + lifecycleScope.launchWhenStarted {
  76 + viewModel.error.collect {
  77 + if(it != null){
  78 + Toast.makeText(this@CallActivity, "Error: $it", Toast.LENGTH_LONG).show()
  79 + viewModel.dismissError()
  80 + }
  81 + }
  82 + }
  83 +
74 // speaker view setup 84 // speaker view setup
75 lifecycleScope.launchWhenCreated { 85 lifecycleScope.launchWhenCreated {
76 viewModel.room.filterNotNull().take(1) 86 viewModel.room.filterNotNull().take(1)