Jean Kruger
Committed by GitHub

Fixes deadlock on publish track (#604)

* Fixes deadlock on publish track

During a publish track, if the room is leaved before the the track is published, we can't publish a track anymore. Let's abord the pending publish track on leave event and when the websocket is disconnected

* Add changeset

* Lint fixes
  1 +---
  2 +"client-sdk-android": patch
  3 +---
  4 +
  5 +Fixes deadlock on publish track
@@ -86,6 +86,7 @@ import javax.inject.Named @@ -86,6 +86,7 @@ import javax.inject.Named
86 import javax.inject.Singleton 86 import javax.inject.Singleton
87 import kotlin.coroutines.Continuation 87 import kotlin.coroutines.Continuation
88 import kotlin.coroutines.resume 88 import kotlin.coroutines.resume
  89 +import kotlin.coroutines.resumeWithException
89 import kotlin.coroutines.suspendCoroutine 90 import kotlin.coroutines.suspendCoroutine
90 91
91 /** 92 /**
@@ -324,12 +325,17 @@ internal constructor( @@ -324,12 +325,17 @@ internal constructor(
324 stream: String?, 325 stream: String?,
325 builder: LivekitRtc.AddTrackRequest.Builder = LivekitRtc.AddTrackRequest.newBuilder(), 326 builder: LivekitRtc.AddTrackRequest.Builder = LivekitRtc.AddTrackRequest.newBuilder(),
326 ): LivekitModels.TrackInfo { 327 ): LivekitModels.TrackInfo {
327 - if (pendingTrackResolvers[cid] != null) {  
328 - throw TrackException.DuplicateTrackException("Track with same ID $cid has already been published!") 328 + synchronized(pendingTrackResolvers) {
  329 + if (pendingTrackResolvers[cid] != null) {
  330 + throw TrackException.DuplicateTrackException("Track with same ID $cid has already been published!")
  331 + }
329 } 332 }
  333 +
330 // Suspend until signal client receives message confirming track publication. 334 // Suspend until signal client receives message confirming track publication.
331 return suspendCoroutine { cont -> 335 return suspendCoroutine { cont ->
332 - pendingTrackResolvers[cid] = cont 336 + synchronized(pendingTrackResolvers) {
  337 + pendingTrackResolvers[cid] = cont
  338 + }
333 client.sendAddTrack( 339 client.sendAddTrack(
334 cid = cid, 340 cid = cid,
335 name = name, 341 name = name,
@@ -380,6 +386,7 @@ internal constructor( @@ -380,6 +386,7 @@ internal constructor(
380 lastRoomOptions = null 386 lastRoomOptions = null
381 participantSid = null 387 participantSid = null
382 regionUrlProvider = null 388 regionUrlProvider = null
  389 + abortPendingPublishTracks()
383 closeResources(reason) 390 closeResources(reason)
384 connectionState = ConnectionState.DISCONNECTED 391 connectionState = ConnectionState.DISCONNECTED
385 } 392 }
@@ -416,6 +423,15 @@ internal constructor( @@ -416,6 +423,15 @@ internal constructor(
416 client.close(reason = reason) 423 client.close(reason = reason)
417 } 424 }
418 425
  426 + private fun abortPendingPublishTracks() {
  427 + synchronized(pendingTrackResolvers) {
  428 + pendingTrackResolvers.values.forEach {
  429 + it.resumeWithException(TrackException.PublishException("pending track aborted"))
  430 + }
  431 + pendingTrackResolvers.clear()
  432 + }
  433 + }
  434 +
419 /** 435 /**
420 * reconnect Signal and PeerConnections 436 * reconnect Signal and PeerConnections
421 */ 437 */
@@ -907,7 +923,9 @@ internal constructor( @@ -907,7 +923,9 @@ internal constructor(
907 } 923 }
908 924
909 LKLog.v { "local track published $cid" } 925 LKLog.v { "local track published $cid" }
910 - val cont = pendingTrackResolvers.remove(cid) 926 + val cont = synchronized(pendingTrackResolvers) {
  927 + pendingTrackResolvers.remove(cid)
  928 + }
911 if (cont == null) { 929 if (cont == null) {
912 LKLog.d { "missing track resolver for: $cid" } 930 LKLog.d { "missing track resolver for: $cid" }
913 return 931 return
@@ -929,6 +947,7 @@ internal constructor( @@ -929,6 +947,7 @@ internal constructor(
929 947
930 override fun onClose(reason: String, code: Int) { 948 override fun onClose(reason: String, code: Int) {
931 LKLog.i { "received close event: $reason, code: $code" } 949 LKLog.i { "received close event: $reason, code: $code" }
  950 + abortPendingPublishTracks()
932 reconnect() 951 reconnect()
933 } 952 }
934 953
@@ -946,6 +965,9 @@ internal constructor( @@ -946,6 +965,9 @@ internal constructor(
946 965
947 override fun onLeave(leave: LeaveRequest) { 966 override fun onLeave(leave: LeaveRequest) {
948 LKLog.d { "leave request received: reason = ${leave.reason.name}" } 967 LKLog.d { "leave request received: reason = ${leave.reason.name}" }
  968 +
  969 + abortPendingPublishTracks()
  970 +
949 if (leave.hasRegions()) { 971 if (leave.hasRegions()) {
950 regionUrlProvider?.let { 972 regionUrlProvider?.let {
951 it.setServerReportedRegions(RegionSettings.fromProto(leave.regions)) 973 it.setServerReportedRegions(RegionSettings.fromProto(leave.regions))
@@ -482,13 +482,19 @@ internal constructor( @@ -482,13 +482,19 @@ internal constructor(
482 val builder = AddTrackRequest.newBuilder().apply { 482 val builder = AddTrackRequest.newBuilder().apply {
483 this.requestConfig() 483 this.requestConfig()
484 } 484 }
485 - val trackInfo = engine.addTrack(  
486 - cid = cid,  
487 - name = options.name ?: track.name,  
488 - kind = track.kind.toProto(),  
489 - stream = options.stream,  
490 - builder = builder,  
491 - ) 485 +
  486 + val trackInfo = try {
  487 + engine.addTrack(
  488 + cid = cid,
  489 + name = options.name ?: track.name,
  490 + kind = track.kind.toProto(),
  491 + stream = options.stream,
  492 + builder = builder,
  493 + )
  494 + } catch (e: Exception) {
  495 + publishListener?.onPublishFailure(TrackException.PublishException("Failed to publish track", e))
  496 + return null
  497 + }
492 498
493 if (options is VideoTrackPublishOptions) { 499 if (options is VideoTrackPublishOptions) {
494 // server might not support the codec the client has requested, in that case, fallback 500 // server might not support the codec the client has requested, in that case, fallback