David Liu

Ensure that rtc engine is setup prior to any other further messages being consumed.

@@ -2,9 +2,24 @@ package io.livekit.android.dagger @@ -2,9 +2,24 @@ package io.livekit.android.dagger
2 2
3 object InjectionNames { 3 object InjectionNames {
4 4
  5 + /**
  6 + * @see [kotlinx.coroutines.Dispatchers.Default]
  7 + */
5 internal const val DISPATCHER_DEFAULT = "dispatcher_default" 8 internal const val DISPATCHER_DEFAULT = "dispatcher_default"
  9 +
  10 + /**
  11 + * @see [kotlinx.coroutines.Dispatchers.IO]
  12 + */
6 internal const val DISPATCHER_IO = "dispatcher_io"; 13 internal const val DISPATCHER_IO = "dispatcher_io";
  14 +
  15 + /**
  16 + * @see [kotlinx.coroutines.Dispatchers.Main]
  17 + */
7 internal const val DISPATCHER_MAIN = "dispatcher_main" 18 internal const val DISPATCHER_MAIN = "dispatcher_main"
  19 +
  20 + /**
  21 + * @see [kotlinx.coroutines.Dispatchers.Unconfined]
  22 + */
8 internal const val DISPATCHER_UNCONFINED = "dispatcher_unconfined" 23 internal const val DISPATCHER_UNCONFINED = "dispatcher_unconfined"
9 24
10 internal const val SIGNAL_JSON_ENABLED = "signal_json_enabled" 25 internal const val SIGNAL_JSON_ENABLED = "signal_json_enabled"
@@ -106,6 +106,7 @@ internal constructor( @@ -106,6 +106,7 @@ internal constructor(
106 if (!this.isSubscriberPrimary) { 106 if (!this.isSubscriberPrimary) {
107 negotiate() 107 negotiate()
108 } 108 }
  109 + client.onReady()
109 return joinResponse 110 return joinResponse
110 } 111 }
111 112
@@ -5,11 +5,14 @@ import io.livekit.android.ConnectOptions @@ -5,11 +5,14 @@ import io.livekit.android.ConnectOptions
5 import io.livekit.android.Version 5 import io.livekit.android.Version
6 import io.livekit.android.dagger.InjectionNames 6 import io.livekit.android.dagger.InjectionNames
7 import io.livekit.android.room.track.Track 7 import io.livekit.android.room.track.Track
  8 +import io.livekit.android.util.CloseableCoroutineScope
8 import io.livekit.android.util.Either 9 import io.livekit.android.util.Either
9 import io.livekit.android.util.LKLog 10 import io.livekit.android.util.LKLog
10 import io.livekit.android.util.safe 11 import io.livekit.android.util.safe
11 -import kotlinx.coroutines.CancellableContinuation  
12 -import kotlinx.coroutines.suspendCancellableCoroutine 12 +import kotlinx.coroutines.*
  13 +import kotlinx.coroutines.flow.MutableSharedFlow
  14 +import kotlinx.coroutines.flow.collect
  15 +import kotlinx.coroutines.flow.collectLatest
13 import kotlinx.serialization.decodeFromString 16 import kotlinx.serialization.decodeFromString
14 import kotlinx.serialization.encodeToString 17 import kotlinx.serialization.encodeToString
15 import kotlinx.serialization.json.Json 18 import kotlinx.serialization.json.Json
@@ -40,6 +43,8 @@ constructor( @@ -40,6 +43,8 @@ constructor(
40 private val okHttpClient: OkHttpClient, 43 private val okHttpClient: OkHttpClient,
41 @Named(InjectionNames.SIGNAL_JSON_ENABLED) 44 @Named(InjectionNames.SIGNAL_JSON_ENABLED)
42 private val useJson: Boolean, 45 private val useJson: Boolean,
  46 + @Named(InjectionNames.DISPATCHER_IO)
  47 + ioDispatcher: CoroutineDispatcher,
43 ) : WebSocketListener() { 48 ) : WebSocketListener() {
44 var isConnected = false 49 var isConnected = false
45 private set 50 private set
@@ -49,7 +54,9 @@ constructor( @@ -49,7 +54,9 @@ constructor(
49 private var lastUrl: String? = null 54 private var lastUrl: String? = null
50 55
51 private var joinContinuation: CancellableContinuation<Either<LivekitRtc.JoinResponse, Unit>>? = null 56 private var joinContinuation: CancellableContinuation<Either<LivekitRtc.JoinResponse, Unit>>? = null
  57 + private val coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher)
52 58
  59 + private val responseFlow = MutableSharedFlow<LivekitRtc.SignalResponse>(Int.MAX_VALUE)
53 suspend fun join( 60 suspend fun join(
54 url: String, 61 url: String,
55 token: String, 62 token: String,
@@ -114,6 +121,15 @@ constructor( @@ -114,6 +121,15 @@ constructor(
114 } 121 }
115 } 122 }
116 123
  124 + @ExperimentalCoroutinesApi
  125 + fun onReady(){
  126 + coroutineScope.launch {
  127 + responseFlow.collect {
  128 + responseFlow.resetReplayCache()
  129 + handleSignalResponseImpl(it)
  130 + }
  131 + }
  132 + }
117 //--------------------------------- WebSocket Listener --------------------------------------// 133 //--------------------------------- WebSocket Listener --------------------------------------//
118 override fun onOpen(webSocket: WebSocket, response: Response) { 134 override fun onOpen(webSocket: WebSocket, response: Response) {
119 if (isReconnecting) { 135 if (isReconnecting) {
@@ -335,7 +351,11 @@ constructor( @@ -335,7 +351,11 @@ constructor(
335 } 351 }
336 return 352 return
337 } 353 }
338 - 354 + coroutineScope.launch {
  355 + responseFlow.tryEmit(response)
  356 + }
  357 + }
  358 + private fun handleSignalResponseImpl(response: LivekitRtc.SignalResponse) {
339 LKLog.v { "response: $response" } 359 LKLog.v { "response: $response" }
340 when (response.messageCase) { 360 when (response.messageCase) {
341 LivekitRtc.SignalResponse.MessageCase.ANSWER -> { 361 LivekitRtc.SignalResponse.MessageCase.ANSWER -> {
@@ -386,6 +406,7 @@ constructor( @@ -386,6 +406,7 @@ constructor(
386 406
387 fun close() { 407 fun close() {
388 isConnected = false 408 isConnected = false
  409 + coroutineScope.close()
389 currentWs?.close(1000, "Normal Closure") 410 currentWs?.close(1000, "Normal Closure")
390 } 411 }
391 412