davidliu
Committed by GitHub

Clear buffered responses on SignalClient closure (#141)

* clear response/request flow in case anything previously unhandled remains

* Add in some verbose logging whenever SignalClient closes

* more comments and disable request buffer clearing
@@ -294,7 +294,7 @@ internal constructor( @@ -294,7 +294,7 @@ internal constructor(
294 client.sendMuteTrack(sid, muted) 294 client.sendMuteTrack(sid, muted)
295 } 295 }
296 296
297 - fun close() { 297 + fun close(reason: String = "Normal Closure") {
298 if (isClosed) { 298 if (isClosed) {
299 return 299 return
300 } 300 }
@@ -307,10 +307,10 @@ internal constructor( @@ -307,10 +307,10 @@ internal constructor(
307 reconnectingJob?.cancel() 307 reconnectingJob?.cancel()
308 reconnectingJob = null 308 reconnectingJob = null
309 coroutineScope.close() 309 coroutineScope.close()
310 - closeResources() 310 + closeResources(reason)
311 } 311 }
312 312
313 - private fun closeResources() { 313 + private fun closeResources(reason: String) {
314 connectionState = ConnectionState.DISCONNECTED 314 connectionState = ConnectionState.DISCONNECTED
315 _publisher?.close() 315 _publisher?.close()
316 _publisher = null 316 _publisher = null
@@ -325,7 +325,7 @@ internal constructor( @@ -325,7 +325,7 @@ internal constructor(
325 lossyDataChannelSub?.close() 325 lossyDataChannelSub?.close()
326 lossyDataChannelSub = null 326 lossyDataChannelSub = null
327 isSubscriberPrimary = false 327 isSubscriberPrimary = false
328 - client.close() 328 + client.close(reason = reason)
329 } 329 }
330 330
331 /** 331 /**
@@ -372,7 +372,7 @@ internal constructor( @@ -372,7 +372,7 @@ internal constructor(
372 if (isFullReconnect) { 372 if (isFullReconnect) {
373 LKLog.v { "Attempting full reconnect." } 373 LKLog.v { "Attempting full reconnect." }
374 try { 374 try {
375 - closeResources() 375 + closeResources("Full Reconnecting")
376 listener?.onFullReconnecting() 376 listener?.onFullReconnecting()
377 joinImpl(url, token, connectOptions ?: ConnectOptions(), lastRoomOptions ?: RoomOptions()) 377 joinImpl(url, token, connectOptions ?: ConnectOptions(), lastRoomOptions ?: RoomOptions())
378 } catch (e: Exception) { 378 } catch (e: Exception) {
@@ -437,7 +437,7 @@ internal constructor( @@ -437,7 +437,7 @@ internal constructor(
437 } 437 }
438 } 438 }
439 439
440 - close() 440 + close("Failed reconnecting")
441 listener?.onEngineDisconnected("failed reconnecting.") 441 listener?.onEngineDisconnected("failed reconnecting.")
442 } 442 }
443 443
@@ -34,6 +34,7 @@ import javax.inject.Singleton @@ -34,6 +34,7 @@ import javax.inject.Singleton
34 * SignalClient to LiveKit WS servers 34 * SignalClient to LiveKit WS servers
35 * @suppress 35 * @suppress
36 */ 36 */
  37 +@OptIn(ExperimentalCoroutinesApi::class)
37 @Singleton 38 @Singleton
38 class SignalClient 39 class SignalClient
39 @Inject 40 @Inject
@@ -99,7 +100,7 @@ constructor( @@ -99,7 +100,7 @@ constructor(
99 roomOptions: RoomOptions 100 roomOptions: RoomOptions
100 ): Either<LivekitRtc.JoinResponse, Unit> { 101 ): Either<LivekitRtc.JoinResponse, Unit> {
101 // Clean up any pre-existing connection. 102 // Clean up any pre-existing connection.
102 - close() 103 + close(reason = "Starting new connection")
103 104
104 val wsUrlString = "$url/rtc" + createConnectionParams(token, getClientInfo(), options, roomOptions) 105 val wsUrlString = "$url/rtc" + createConnectionParams(token, getClientInfo(), options, roomOptions)
105 isReconnecting = options.reconnect 106 isReconnecting = options.reconnect
@@ -163,7 +164,6 @@ constructor( @@ -163,7 +164,6 @@ constructor(
163 * 164 *
164 * Should be called after resolving the join message. 165 * Should be called after resolving the join message.
165 */ 166 */
166 - @OptIn(ExperimentalCoroutinesApi::class)  
167 fun onReadyForResponses() { 167 fun onReadyForResponses() {
168 coroutineScope.launch { 168 coroutineScope.launch {
169 responseFlow.collect { 169 responseFlow.collect {
@@ -561,6 +561,7 @@ constructor( @@ -561,6 +561,7 @@ constructor(
561 * Can be reused afterwards. 561 * Can be reused afterwards.
562 */ 562 */
563 fun close(code: Int = 1000, reason: String = "Normal Closure") { 563 fun close(code: Int = 1000, reason: String = "Normal Closure") {
  564 + LKLog.v(Exception()) { "Closing SignalClient: code = $code, reason = $reason" }
564 isConnected = false 565 isConnected = false
565 isReconnecting = false 566 isReconnecting = false
566 requestFlowJob = null 567 requestFlowJob = null
@@ -571,6 +572,9 @@ constructor( @@ -571,6 +572,9 @@ constructor(
571 currentWs = null 572 currentWs = null
572 joinContinuation?.cancel() 573 joinContinuation?.cancel()
573 joinContinuation = null 574 joinContinuation = null
  575 + // TODO: support calling this from connect without wiping any queued requests.
  576 + //requestFlow.resetReplayCache()
  577 + responseFlow.resetReplayCache()
574 lastUrl = null 578 lastUrl = null
575 lastOptions = null 579 lastOptions = null
576 lastRoomOptions = null 580 lastRoomOptions = null