Jean Kruger
Committed by GitHub

Improve track publish failure handling (#637)

* Avoid exception on request add track failure

* Stop audio and video track on publish failure

* Start the video capture on connect if requested

* Fire a room event when the track publication failed

* Add changeset

* Fixes unit tests

* Update livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt

* Also return true from setDeviceEnabled

---------

Co-authored-by: davidliu <davidliu@deviange.net>
---
"client-sdk-android": patch
---
Improved handling of track publication failures by introducing a new TrackPublicationFailed event and fixing a broken state issue where the track remained active but inaccessible, causing the microphone or camera to stay on without a published track and leading to unreliable republishing.
... ...
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
... ... @@ -23,6 +23,7 @@ import io.livekit.android.room.participant.RemoteParticipant
import io.livekit.android.room.track.LocalTrackPublication
import io.livekit.android.room.track.RemoteTrackPublication
import io.livekit.android.room.track.Track
import io.livekit.android.room.track.TrackException
import io.livekit.android.room.track.TrackPublication
import io.livekit.android.room.types.TranscriptionSegment
... ... @@ -89,6 +90,15 @@ sealed class ParticipantEvent(open val participant: Participant) : Event() {
ParticipantEvent(participant)
/**
* Error had occurred while publishing a track
*/
class LocalTrackPublicationFailed(
override val participant: LocalParticipant,
val track: Track,
val e: TrackException.PublishException,
) : ParticipantEvent(participant)
/**
* A [LocalParticipant] has unpublished a track
*/
class LocalTrackUnpublished(override val participant: LocalParticipant, val publication: LocalTrackPublication) :
... ...
... ... @@ -27,6 +27,7 @@ import io.livekit.android.room.participant.RemoteParticipant
import io.livekit.android.room.track.LocalTrackPublication
import io.livekit.android.room.track.RemoteTrackPublication
import io.livekit.android.room.track.Track
import io.livekit.android.room.track.TrackException
import io.livekit.android.room.track.TrackPublication
import io.livekit.android.room.types.TranscriptionSegment
import livekit.LivekitModels
... ... @@ -139,6 +140,17 @@ sealed class RoomEvent(val room: Room) : Event() {
class TrackPublished(room: Room, val publication: TrackPublication, val participant: Participant) : RoomEvent(room)
/**
* Error had occurred while publishing a track, for LocalParticipant only
* not fire for tracks that are already published
*/
class TrackPublicationFailed(
room: Room,
val track: Track,
val participant: LocalParticipant,
e: TrackException.PublishException,
) : RoomEvent(room)
/**
* A [Participant] has unpublished a track
*/
class TrackUnpublished(room: Room, val publication: TrackPublication, val participant: Participant) :
... ...
... ... @@ -479,14 +479,21 @@ constructor(
ensureActive()
networkCallbackManager.registerCallback()
if (options.audio) {
val audioTrack = localParticipant.createAudioTrack()
val audioTrack = localParticipant.getOrCreateDefaultAudioTrack()
audioTrack.prewarm()
localParticipant.publishAudioTrack(audioTrack)
if (!localParticipant.publishAudioTrack(audioTrack)) {
audioTrack.stop()
audioTrack.stopPrewarm()
}
}
ensureActive()
if (options.video) {
val videoTrack = localParticipant.createVideoTrack()
localParticipant.publishVideoTrack(videoTrack)
val videoTrack = localParticipant.getOrCreateDefaultVideoTrack()
videoTrack.startCapture()
if (!localParticipant.publishVideoTrack(videoTrack)) {
videoTrack.stopCapture()
videoTrack.stop()
}
}
coroutineScope.launch {
... ... @@ -612,6 +619,15 @@ constructor(
),
)
is ParticipantEvent.LocalTrackPublicationFailed -> emitWhenConnected(
RoomEvent.TrackPublicationFailed(
room = this@Room,
track = it.track,
participant = it.participant,
e = it.e,
),
)
is ParticipantEvent.TrackUnpublished -> emitWhenConnected(
RoomEvent.TrackUnpublished(
room = this@Room,
... ...
... ... @@ -137,6 +137,29 @@ internal constructor(
internal val enabledPublishVideoCodecs = Collections.synchronizedList(mutableListOf<Codec>())
private var defaultAudioTrack: LocalAudioTrack? = null
private var defaultVideoTrack: LocalVideoTrack? = null
/**
* Returns the default audio track, or creates one if it doesn't exist.
* @exception SecurityException will be thrown if [Manifest.permission.RECORD_AUDIO] permission is missing.
*/
fun getOrCreateDefaultAudioTrack(): LocalAudioTrack {
return defaultAudioTrack ?: createAudioTrack().also {
defaultAudioTrack = it
}
}
/**
* Returns the default video track, or creates one if it doesn't exist.
* @exception SecurityException will be thrown if [Manifest.permission.CAMERA] permission is missing.
*/
fun getOrCreateDefaultVideoTrack(): LocalVideoTrack {
return defaultVideoTrack ?: createVideoTrack().also {
defaultVideoTrack = it
}
}
/**
* Creates an audio track, recording audio through the microphone with the given [options].
*
... ... @@ -256,10 +279,11 @@ internal constructor(
*
* @see Room.videoTrackCaptureDefaults
* @see Room.videoTrackPublishDefaults
* @return true if the change was successful, or false if it failed.
*/
@Throws(TrackException.PublishException::class)
suspend fun setCameraEnabled(enabled: Boolean) {
setTrackEnabled(Track.Source.CAMERA, enabled)
suspend fun setCameraEnabled(enabled: Boolean): Boolean {
return setTrackEnabled(Track.Source.CAMERA, enabled)
}
/**
... ... @@ -271,10 +295,11 @@ internal constructor(
*
* @see Room.audioTrackCaptureDefaults
* @see Room.audioTrackPublishDefaults
* @return true if the change was successful, or false if it failed.
*/
@Throws(TrackException.PublishException::class)
suspend fun setMicrophoneEnabled(enabled: Boolean) {
setTrackEnabled(Track.Source.MICROPHONE, enabled)
suspend fun setMicrophoneEnabled(enabled: Boolean): Boolean {
return setTrackEnabled(Track.Source.MICROPHONE, enabled)
}
/**
... ... @@ -292,41 +317,58 @@ internal constructor(
* @see Room.screenShareTrackCaptureDefaults
* @see Room.screenShareTrackPublishDefaults
* @see ScreenAudioCapturer
* @return true if the change was successful, or false if it failed.
*/
@Throws(TrackException.PublishException::class)
suspend fun setScreenShareEnabled(
enabled: Boolean,
screenCaptureParams: ScreenCaptureParams? = null,
) {
setTrackEnabled(Track.Source.SCREEN_SHARE, enabled, screenCaptureParams)
): Boolean {
return setTrackEnabled(Track.Source.SCREEN_SHARE, enabled, screenCaptureParams)
}
private suspend fun setTrackEnabled(
source: Track.Source,
enabled: Boolean,
screenCaptureParams: ScreenCaptureParams? = null,
) {
): Boolean {
var success = false
val pubLock = sourcePubLocks[source]!!
pubLock.withLock {
val pub = getTrackPublication(source)
if (enabled) {
if (pub != null) {
// Publication exists, just unmute the existing track.
pub.muted = false
if (source == Track.Source.CAMERA && pub.track is LocalVideoTrack) {
(pub.track as? LocalVideoTrack)?.startCapture()
}
success = true
} else {
// Not published yet, create the default track and publish.
when (source) {
Track.Source.CAMERA -> {
val track = createVideoTrack()
val track = getOrCreateDefaultVideoTrack()
track.start()
track.startCapture()
publishVideoTrack(track)
if (!publishVideoTrack(track)) {
track.stopCapture()
track.stop()
} else {
success = true
}
}
Track.Source.MICROPHONE -> {
val track = createAudioTrack()
val track = getOrCreateDefaultAudioTrack()
track.prewarm()
publishAudioTrack(track)
track.start()
if (!publishAudioTrack(track)) {
track.stop()
track.stopPrewarm()
} else {
success = true
}
}
Track.Source.SCREEN_SHARE -> {
... ... @@ -340,7 +382,16 @@ internal constructor(
}
track.startForegroundService(screenCaptureParams.notificationId, screenCaptureParams.notification)
track.startCapture()
publishVideoTrack(track, options = VideoTrackPublishOptions(null, screenShareTrackPublishDefaults))
if (!publishVideoTrack(track, options = VideoTrackPublishOptions(null, screenShareTrackPublishDefaults))) {
screenCaptureParams.onStop?.invoke()
track.apply {
stopCapture()
stop()
dispose()
}
} else {
success = true
}
}
else -> {
... ... @@ -362,9 +413,12 @@ internal constructor(
}
}
}
success = true
}
return@withLock
}
return success
}
/**
... ... @@ -380,7 +434,7 @@ internal constructor(
audioTrackPublishDefaults,
),
publishListener: PublishListener? = null,
) {
): Boolean {
val encodings = listOf(
RtpParameters.Encoding(null, true, null).apply {
if (options.audioBitrate != null && options.audioBitrate > 0) {
... ... @@ -408,6 +462,8 @@ internal constructor(
}
jobs[publication] = job
}
return publication != null
}
/**
... ... @@ -420,7 +476,7 @@ internal constructor(
track: LocalVideoTrack,
options: VideoTrackPublishOptions = VideoTrackPublishOptions(null, videoTrackPublishDefaults),
publishListener: PublishListener? = null,
) {
): Boolean {
@Suppress("NAME_SHADOWING") var options = options
synchronized(enabledPublishVideoCodecs) {
... ... @@ -456,7 +512,7 @@ internal constructor(
val videoLayers =
EncodingUtils.videoLayersFromEncodings(track.dimensions.width, track.dimensions.height, encodings, isSVC)
publishTrackImpl(
return publishTrackImpl(
track = track,
options = options,
requestConfig = {
... ... @@ -489,7 +545,7 @@ internal constructor(
},
encodings = encodings,
publishListener = publishListener,
)
) != null
}
private fun hasPermissionsToPublish(source: Track.Source): Boolean {
... ... @@ -522,13 +578,22 @@ internal constructor(
encodings: List<RtpParameters.Encoding> = emptyList(),
publishListener: PublishListener? = null,
): LocalTrackPublication? {
fun onPublishFailure(e: TrackException.PublishException, triggerEvent: Boolean = true) {
publishListener?.onPublishFailure(e)
if (triggerEvent) {
eventBus.postEvent(ParticipantEvent.LocalTrackPublicationFailed(this, track, e), scope)
}
}
val addTrackRequestBuilder = AddTrackRequest.newBuilder().apply {
this.requestConfig()
}
val trackSource = Track.Source.fromProto(addTrackRequestBuilder.source ?: LivekitModels.TrackSource.UNRECOGNIZED)
if (!hasPermissionsToPublish(trackSource)) {
throw TrackException.PublishException("Failed to publish track, insufficient permissions")
val exception = TrackException.PublishException("Failed to publish track, insufficient permissions")
onPublishFailure(exception)
throw exception
}
@Suppress("NAME_SHADOWING") var options = options
... ... @@ -536,12 +601,12 @@ internal constructor(
@Suppress("NAME_SHADOWING") var encodings = encodings
if (localTrackPublications.any { it.track == track }) {
publishListener?.onPublishFailure(TrackException.PublishException("Track has already been published"))
onPublishFailure(TrackException.PublishException("Track has already been published"), triggerEvent = false)
return null
}
if (engine.connectionState == ConnectionState.DISCONNECTED) {
publishListener?.onPublishFailure(TrackException.PublishException("Not connected!"))
onPublishFailure(TrackException.PublishException("Not connected!"))
}
val cid = track.rtcTrack.id()
... ... @@ -569,7 +634,7 @@ internal constructor(
if (transceiver == null) {
val exception = TrackException.PublishException("null sender returned from peer connection")
publishListener?.onPublishFailure(exception)
onPublishFailure(exception)
throw exception
}
... ... @@ -603,7 +668,7 @@ internal constructor(
// so no need to call negotiate manually.
}
suspend fun requestAddTrack(): TrackInfo {
suspend fun requestAddTrack(): TrackInfo? {
return try {
engine.addTrack(
cid = cid,
... ... @@ -613,13 +678,12 @@ internal constructor(
builder = addTrackRequestBuilder,
)
} catch (e: Exception) {
val exception = TrackException.PublishException("Failed to publish track", e)
publishListener?.onPublishFailure(exception)
throw exception
onPublishFailure(TrackException.PublishException("Failed to publish track", e))
null
}
}
val trackInfo: TrackInfo
val trackInfo: TrackInfo?
if (enabledPublishVideoCodecs.isNotEmpty()) {
// Can simultaneous publish and negotiate.
// codec is pre-verified in publishVideoTrack
... ... @@ -633,7 +697,7 @@ internal constructor(
} else {
// legacy path.
trackInfo = requestAddTrack()
if (trackInfo != null) {
if (options is VideoTrackPublishOptions) {
// server might not support the codec the client has requested, in that case, fallback
// to a supported codec
... ... @@ -653,7 +717,9 @@ internal constructor(
negotiate()
}
}
return if (trackInfo != null) {
val publication = LocalTrackPublication(
info = trackInfo,
track = track,
... ... @@ -666,8 +732,10 @@ internal constructor(
publishListener?.onPublishSuccess(publication)
internalListener?.onTrackPublished(publication, this)
eventBus.postEvent(ParticipantEvent.LocalTrackPublished(this, publication), scope)
return publication
publication
} else {
null
}
}
private fun computeVideoEncodings(
... ... @@ -1443,11 +1511,14 @@ internal constructor(
unpublishTrack(track, false)
// Cannot publish muted tracks.
if (!pub.muted) {
when (track) {
val success = when (track) {
is LocalAudioTrack -> publishAudioTrack(track, pub.options as AudioTrackPublishOptions, null)
is LocalVideoTrack -> publishVideoTrack(track, pub.options as VideoTrackPublishOptions, null)
else -> throw IllegalStateException("LocalParticipant has a non local track publish?")
}
if (!success) {
track.stop()
}
}
}
}
... ... @@ -1479,6 +1550,10 @@ internal constructor(
* @suppress
*/
fun cleanup() {
defaultAudioTrack?.dispose()
defaultAudioTrack = null
defaultVideoTrack?.dispose()
defaultVideoTrack = null
for (pub in trackPublications.values) {
val track = pub.track
... ...
... ... @@ -591,20 +591,21 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
}
@Test
fun publishWithNoResponseCausesException() = runTest {
fun publishWithNoResponseReturnFalseWithoutException() = runTest {
connect()
wsFactory.unregisterSignalRequestHandler(wsFactory.defaultSignalRequestHandler)
var didThrow = false
var success: Boolean? = null
launch {
try {
room.localParticipant.publishVideoTrack(createLocalTrack())
success = room.localParticipant.publishVideoTrack(createLocalTrack())
} catch (e: TrackException.PublishException) {
didThrow = true
}
}
coroutineRule.dispatcher.scheduler.advanceUntilIdle()
assertTrue(didThrow)
assertTrue(!didThrow && success == false)
}
}
... ...