davidliu
Committed by GitHub

Fix publish deadlocks (#618)

* Fast fail attempts to publish without permissions

* Fix publish deadlock when no response from server
  1 +---
  2 +"client-sdk-android": patch
  3 +---
  4 +
  5 +Fix publish deadlock when no response from server
  1 +---
  2 +"client-sdk-android": patch
  3 +---
  4 +
  5 +Fast fail attempts to publish without permissions
@@ -54,8 +54,10 @@ import kotlinx.coroutines.ensureActive @@ -54,8 +54,10 @@ import kotlinx.coroutines.ensureActive
54 import kotlinx.coroutines.joinAll 54 import kotlinx.coroutines.joinAll
55 import kotlinx.coroutines.launch 55 import kotlinx.coroutines.launch
56 import kotlinx.coroutines.runBlocking 56 import kotlinx.coroutines.runBlocking
  57 +import kotlinx.coroutines.suspendCancellableCoroutine
57 import kotlinx.coroutines.sync.Mutex 58 import kotlinx.coroutines.sync.Mutex
58 import kotlinx.coroutines.sync.withLock 59 import kotlinx.coroutines.sync.withLock
  60 +import kotlinx.coroutines.withTimeout
59 import kotlinx.coroutines.withTimeoutOrNull 61 import kotlinx.coroutines.withTimeoutOrNull
60 import kotlinx.coroutines.yield 62 import kotlinx.coroutines.yield
61 import livekit.LivekitModels 63 import livekit.LivekitModels
@@ -87,7 +89,7 @@ import javax.inject.Singleton @@ -87,7 +89,7 @@ import javax.inject.Singleton
87 import kotlin.coroutines.Continuation 89 import kotlin.coroutines.Continuation
88 import kotlin.coroutines.resume 90 import kotlin.coroutines.resume
89 import kotlin.coroutines.resumeWithException 91 import kotlin.coroutines.resumeWithException
90 -import kotlin.coroutines.suspendCoroutine 92 +import kotlin.time.Duration.Companion.seconds
91 93
92 /** 94 /**
93 * @suppress 95 * @suppress
@@ -332,17 +334,19 @@ internal constructor( @@ -332,17 +334,19 @@ internal constructor(
332 } 334 }
333 335
334 // Suspend until signal client receives message confirming track publication. 336 // Suspend until signal client receives message confirming track publication.
335 - return suspendCoroutine { cont ->  
336 - synchronized(pendingTrackResolvers) {  
337 - pendingTrackResolvers[cid] = cont 337 + return withTimeout(20.seconds) {
  338 + suspendCancellableCoroutine { cont ->
  339 + synchronized(pendingTrackResolvers) {
  340 + pendingTrackResolvers[cid] = cont
  341 + }
  342 + client.sendAddTrack(
  343 + cid = cid,
  344 + name = name,
  345 + type = kind,
  346 + stream = stream,
  347 + builder = builder,
  348 + )
338 } 349 }
339 - client.sendAddTrack(  
340 - cid = cid,  
341 - name = name,  
342 - type = kind,  
343 - stream = stream,  
344 - builder = builder,  
345 - )  
346 } 350 }
347 } 351 }
348 352
@@ -252,6 +252,7 @@ internal constructor( @@ -252,6 +252,7 @@ internal constructor(
252 * @see Room.videoTrackCaptureDefaults 252 * @see Room.videoTrackCaptureDefaults
253 * @see Room.videoTrackPublishDefaults 253 * @see Room.videoTrackPublishDefaults
254 */ 254 */
  255 + @Throws(TrackException.PublishException::class)
255 suspend fun setCameraEnabled(enabled: Boolean) { 256 suspend fun setCameraEnabled(enabled: Boolean) {
256 setTrackEnabled(Track.Source.CAMERA, enabled) 257 setTrackEnabled(Track.Source.CAMERA, enabled)
257 } 258 }
@@ -266,6 +267,7 @@ internal constructor( @@ -266,6 +267,7 @@ internal constructor(
266 * @see Room.audioTrackCaptureDefaults 267 * @see Room.audioTrackCaptureDefaults
267 * @see Room.audioTrackPublishDefaults 268 * @see Room.audioTrackPublishDefaults
268 */ 269 */
  270 + @Throws(TrackException.PublishException::class)
269 suspend fun setMicrophoneEnabled(enabled: Boolean) { 271 suspend fun setMicrophoneEnabled(enabled: Boolean) {
270 setTrackEnabled(Track.Source.MICROPHONE, enabled) 272 setTrackEnabled(Track.Source.MICROPHONE, enabled)
271 } 273 }
@@ -286,6 +288,7 @@ internal constructor( @@ -286,6 +288,7 @@ internal constructor(
286 * @see Room.screenShareTrackPublishDefaults 288 * @see Room.screenShareTrackPublishDefaults
287 * @see ScreenAudioCapturer 289 * @see ScreenAudioCapturer
288 */ 290 */
  291 + @Throws(TrackException.PublishException::class)
289 suspend fun setScreenShareEnabled( 292 suspend fun setScreenShareEnabled(
290 enabled: Boolean, 293 enabled: Boolean,
291 mediaProjectionPermissionResultData: Intent? = null, 294 mediaProjectionPermissionResultData: Intent? = null,
@@ -481,7 +484,27 @@ internal constructor( @@ -481,7 +484,27 @@ internal constructor(
481 ) 484 )
482 } 485 }
483 486
  487 + private fun hasPermissionsToPublish(source: Track.Source): Boolean {
  488 + val permissions = this.permissions
  489 + if (permissions == null) {
  490 + LKLog.w { "No permissions present for publishing track." }
  491 + return false
  492 + }
  493 + val canPublish = permissions.canPublish
  494 + val canPublishSources = permissions.canPublishSources
  495 +
  496 + val sourceAllowed = canPublishSources.contains(source)
  497 +
  498 + if (canPublish && (canPublishSources.isEmpty() || sourceAllowed)) {
  499 + return true
  500 + }
  501 +
  502 + LKLog.w { "insufficient permissions to publish" }
  503 + return false
  504 + }
  505 +
484 /** 506 /**
  507 + * @throws TrackException.PublishException thrown when the publish fails. see [TrackException.PublishException.message] for details.
485 * @return true if the track publish was successful. 508 * @return true if the track publish was successful.
486 */ 509 */
487 private suspend fun publishTrackImpl( 510 private suspend fun publishTrackImpl(
@@ -491,6 +514,15 @@ internal constructor( @@ -491,6 +514,15 @@ internal constructor(
491 encodings: List<RtpParameters.Encoding> = emptyList(), 514 encodings: List<RtpParameters.Encoding> = emptyList(),
492 publishListener: PublishListener? = null, 515 publishListener: PublishListener? = null,
493 ): LocalTrackPublication? { 516 ): LocalTrackPublication? {
  517 + val addTrackRequestBuilder = AddTrackRequest.newBuilder().apply {
  518 + this.requestConfig()
  519 + }
  520 +
  521 + val trackSource = Track.Source.fromProto(addTrackRequestBuilder.source ?: LivekitModels.TrackSource.UNRECOGNIZED)
  522 + if (!hasPermissionsToPublish(trackSource)) {
  523 + throw TrackException.PublishException("Failed to publish track, insufficient permissions")
  524 + }
  525 +
494 @Suppress("NAME_SHADOWING") var options = options 526 @Suppress("NAME_SHADOWING") var options = options
495 527
496 @Suppress("NAME_SHADOWING") var encodings = encodings 528 @Suppress("NAME_SHADOWING") var encodings = encodings
@@ -564,17 +596,13 @@ internal constructor( @@ -564,17 +596,13 @@ internal constructor(
564 } 596 }
565 597
566 suspend fun requestAddTrack(): TrackInfo { 598 suspend fun requestAddTrack(): TrackInfo {
567 - val builder = AddTrackRequest.newBuilder().apply {  
568 - this.requestConfig()  
569 - }  
570 -  
571 return try { 599 return try {
572 engine.addTrack( 600 engine.addTrack(
573 cid = cid, 601 cid = cid,
574 name = options.name ?: track.name, 602 name = options.name ?: track.name,
575 kind = track.kind.toProto(), 603 kind = track.kind.toProto(),
576 stream = options.stream, 604 stream = options.stream,
577 - builder = builder, 605 + builder = addTrackRequestBuilder,
578 ) 606 )
579 } catch (e: Exception) { 607 } catch (e: Exception) {
580 val exception = TrackException.PublishException("Failed to publish track", e) 608 val exception = TrackException.PublishException("Failed to publish track", e)
@@ -1362,8 +1390,12 @@ internal constructor( @@ -1362,8 +1390,12 @@ internal constructor(
1362 ) 1390 )
1363 } 1391 }
1364 negotiateJob.join() 1392 negotiateJob.join()
1365 - val trackInfo = publishJob.await()  
1366 - LKLog.d { "published $codec for track ${track.sid}, $trackInfo" } 1393 + try {
  1394 + val trackInfo = publishJob.await()
  1395 + LKLog.d { "published $codec for track ${track.sid}, $trackInfo" }
  1396 + } catch (e: Exception) {
  1397 + LKLog.w(e) { "exception when publishing $codec for track ${track.sid}" }
  1398 + }
1367 } 1399 }
1368 } 1400 }
1369 1401
@@ -591,6 +591,9 @@ data class ParticipantPermission( @@ -591,6 +591,9 @@ data class ParticipantPermission(
591 val canPublishData: Boolean, 591 val canPublishData: Boolean,
592 val hidden: Boolean, 592 val hidden: Boolean,
593 val recorder: Boolean, 593 val recorder: Boolean,
  594 + /**
  595 + * The list of allowed sources. If this is empty, then all sources are allowed.
  596 + */
594 val canPublishSources: List<Track.Source>, 597 val canPublishSources: List<Track.Source>,
595 val canUpdateMetadata: Boolean, 598 val canUpdateMetadata: Boolean,
596 val canSubscribeMetrics: Boolean, 599 val canSubscribeMetrics: Boolean,
@@ -56,13 +56,15 @@ object TestData { @@ -56,13 +56,15 @@ object TestData {
56 identity = "local_participant_identity" 56 identity = "local_participant_identity"
57 state = LivekitModels.ParticipantInfo.State.ACTIVE 57 state = LivekitModels.ParticipantInfo.State.ACTIVE
58 metadata = "local_metadata" 58 metadata = "local_metadata"
59 - permission = LivekitModels.ParticipantPermission.newBuilder()  
60 - .setCanPublish(true)  
61 - .setCanSubscribe(true)  
62 - .setCanPublishData(true)  
63 - .setHidden(true)  
64 - .setRecorder(false)  
65 - .build() 59 + permission = with(LivekitModels.ParticipantPermission.newBuilder()) {
  60 + canPublish = true
  61 + canSubscribe = true
  62 + canPublishData = true
  63 +
  64 + hidden = false
  65 + recorder = false
  66 + build()
  67 + }
66 putAttributes("attribute", "value") 68 putAttributes("attribute", "value")
67 build() 69 build()
68 } 70 }
1 /* 1 /*
2 - * Copyright 2023-2024 LiveKit, Inc. 2 + * Copyright 2023-2025 LiveKit, Inc.
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import io.livekit.android.room.DefaultsManager @@ -27,6 +27,7 @@ import io.livekit.android.room.DefaultsManager
27 import io.livekit.android.room.track.LocalVideoTrack 27 import io.livekit.android.room.track.LocalVideoTrack
28 import io.livekit.android.room.track.LocalVideoTrackOptions 28 import io.livekit.android.room.track.LocalVideoTrackOptions
29 import io.livekit.android.room.track.Track 29 import io.livekit.android.room.track.Track
  30 +import io.livekit.android.room.track.TrackException
30 import io.livekit.android.room.track.VideoCaptureParameter 31 import io.livekit.android.room.track.VideoCaptureParameter
31 import io.livekit.android.room.track.VideoCodec 32 import io.livekit.android.room.track.VideoCodec
32 import io.livekit.android.test.MockE2ETest 33 import io.livekit.android.test.MockE2ETest
@@ -46,6 +47,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -46,6 +47,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
46 import kotlinx.coroutines.Job 47 import kotlinx.coroutines.Job
47 import kotlinx.coroutines.cancel 48 import kotlinx.coroutines.cancel
48 import kotlinx.coroutines.launch 49 import kotlinx.coroutines.launch
  50 +import kotlinx.coroutines.test.StandardTestDispatcher
49 import kotlinx.coroutines.test.advanceUntilIdle 51 import kotlinx.coroutines.test.advanceUntilIdle
50 import livekit.LivekitModels 52 import livekit.LivekitModels
51 import livekit.LivekitModels.AudioTrackFeature 53 import livekit.LivekitModels.AudioTrackFeature
@@ -66,6 +68,7 @@ import org.mockito.kotlin.argThat @@ -66,6 +68,7 @@ import org.mockito.kotlin.argThat
66 import org.robolectric.RobolectricTestRunner 68 import org.robolectric.RobolectricTestRunner
67 import org.robolectric.Shadows 69 import org.robolectric.Shadows
68 import java.nio.ByteBuffer 70 import java.nio.ByteBuffer
  71 +import kotlin.time.Duration.Companion.seconds
69 72
70 @ExperimentalCoroutinesApi 73 @ExperimentalCoroutinesApi
71 @RunWith(RobolectricTestRunner::class) 74 @RunWith(RobolectricTestRunner::class)
@@ -122,11 +125,23 @@ class LocalParticipantMockE2ETest : MockE2ETest() { @@ -122,11 +125,23 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
122 wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler) 125 wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)
123 wsFactory.ws.clearRequests() 126 wsFactory.ws.clearRequests()
124 127
125 - val backgroundScope = CoroutineScope(coroutineContext + Job()) 128 + val standardTestDispatcher = StandardTestDispatcher()
  129 + val backgroundScope = CoroutineScope(coroutineContext + Job() + standardTestDispatcher)
126 try { 130 try {
127 - backgroundScope.launch { room.localParticipant.setMicrophoneEnabled(true) }  
128 - backgroundScope.launch { room.localParticipant.setMicrophoneEnabled(true) } 131 + backgroundScope.launch {
  132 + try {
  133 + room.localParticipant.setMicrophoneEnabled(true)
  134 + } catch (_: Exception) {
  135 + }
  136 + }
  137 + backgroundScope.launch {
  138 + try {
  139 + room.localParticipant.setMicrophoneEnabled(true)
  140 + } catch (_: Exception) {
  141 + }
  142 + }
129 143
  144 + standardTestDispatcher.scheduler.advanceTimeBy(1.seconds.inWholeMilliseconds)
130 assertEquals(1, wsFactory.ws.sentRequests.size) 145 assertEquals(1, wsFactory.ws.sentRequests.size)
131 } finally { 146 } finally {
132 backgroundScope.cancel() 147 backgroundScope.cancel()
@@ -144,10 +159,23 @@ class LocalParticipantMockE2ETest : MockE2ETest() { @@ -144,10 +159,23 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
144 wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler) 159 wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)
145 wsFactory.ws.clearRequests() 160 wsFactory.ws.clearRequests()
146 161
147 - val backgroundScope = CoroutineScope(coroutineContext + Job()) 162 + val standardTestDispatcher = StandardTestDispatcher()
  163 + val backgroundScope = CoroutineScope(coroutineContext + Job() + standardTestDispatcher)
148 try { 164 try {
149 - backgroundScope.launch { room.localParticipant.setMicrophoneEnabled(true) }  
150 - backgroundScope.launch { room.localParticipant.setCameraEnabled(true) } 165 + backgroundScope.launch {
  166 + try {
  167 + room.localParticipant.setMicrophoneEnabled(true)
  168 + } catch (_: Exception) {
  169 + }
  170 + }
  171 + backgroundScope.launch {
  172 + try {
  173 + room.localParticipant.setCameraEnabled(true)
  174 + } catch (_: Exception) {
  175 + }
  176 + }
  177 +
  178 + standardTestDispatcher.scheduler.advanceTimeBy(1.seconds.inWholeMilliseconds)
151 179
152 assertEquals(2, wsFactory.ws.sentRequests.size) 180 assertEquals(2, wsFactory.ws.sentRequests.size)
153 } finally { 181 } finally {
@@ -534,4 +562,49 @@ class LocalParticipantMockE2ETest : MockE2ETest() { @@ -534,4 +562,49 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
534 assertTrue(features.contains(AudioTrackFeature.TF_AUTO_GAIN_CONTROL)) 562 assertTrue(features.contains(AudioTrackFeature.TF_AUTO_GAIN_CONTROL))
535 assertFalse(features.contains(AudioTrackFeature.TF_ENHANCED_NOISE_CANCELLATION)) 563 assertFalse(features.contains(AudioTrackFeature.TF_ENHANCED_NOISE_CANCELLATION))
536 } 564 }
  565 +
  566 + @Test
  567 + fun lackOfPublishPermissionCausesException() = runTest {
  568 + val noCanPublishJoin = with(TestData.JOIN.toBuilder()) {
  569 + join = with(join.toBuilder()) {
  570 + participant = with(participant.toBuilder()) {
  571 + permission = with(permission.toBuilder()) {
  572 + canPublish = false
  573 + build()
  574 + }
  575 + build()
  576 + }
  577 + build()
  578 + }
  579 + build()
  580 + }
  581 + connect(noCanPublishJoin)
  582 +
  583 + var didThrow = false
  584 + try {
  585 + room.localParticipant.publishVideoTrack(createLocalTrack())
  586 + } catch (e: TrackException.PublishException) {
  587 + didThrow = true
  588 + }
  589 +
  590 + assertTrue(didThrow)
  591 + }
  592 +
  593 + @Test
  594 + fun publishWithNoResponseCausesException() = runTest {
  595 + connect()
  596 +
  597 + wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)
  598 + var didThrow = false
  599 + launch {
  600 + try {
  601 + room.localParticipant.publishVideoTrack(createLocalTrack())
  602 + } catch (e: TrackException.PublishException) {
  603 + didThrow = true
  604 + }
  605 + }
  606 +
  607 + coroutineRule.dispatcher.scheduler.advanceUntilIdle()
  608 + assertTrue(didThrow)
  609 + }
537 } 610 }