davidliu
Committed by GitHub

Fast track publication support (#612)

* protocol update

* Fast track publication support
---
"client-sdk-android": minor
---
Fast track publication support
... ...
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="AndroidLintVisibleForTests" enabled="true" level="WARNING" enabled_by_default="true">
<scope name="Library Projects" level="WARNING" enabled="false" />
</inspection_tool>
<inspection_tool class="MemberVisibilityCanBePrivate" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<scope name="Library Projects" level="WEAK WARNING" enabled="false" />
</inspection_tool>
<inspection_tool class="PreviewAnnotationInFunctionWithParameters" enabled="true" level="ERROR" enabled_by_default="true">
<option name="composableFile" value="true" />
<option name="previewFile" value="true" />
... ... @@ -37,5 +43,8 @@
<option name="composableFile" value="true" />
<option name="previewFile" value="true" />
</inspection_tool>
<inspection_tool class="UnusedSymbol" enabled="true" level="WARNING" enabled_by_default="true">
<scope name="Library Projects" level="WARNING" enabled="false" />
</inspection_tool>
</profile>
</component>
\ No newline at end of file
... ...
<component name="DependencyValidationManager">
<scope name="Library Projects" pattern="file[livekit-android.livekit-android-sdk*]:*//*||file[livekit-android.livekit-android-camerax*]:*//*||file[livekit-android.livekit-android-test*]:*//*||file[livekit-android.livekit-lint*]:*//*" />
</component>
\ No newline at end of file
... ...
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
... ... @@ -131,7 +131,7 @@ constructor(
return result
}
val negotiate = debounce<MediaConstraints?, Unit>(100, coroutineScope) {
val negotiate = debounce<MediaConstraints?, Unit>(20, coroutineScope) {
if (it != null) {
createAndSendOffer(it)
} else {
... ...
... ... @@ -156,7 +156,7 @@ internal constructor(
private val publisherObserver = PublisherTransportObserver(this, client)
private val subscriberObserver = SubscriberTransportObserver(this, client)
private var publisher: PeerConnectionTransport? = null
internal var publisher: PeerConnectionTransport? = null
private var subscriber: PeerConnectionTransport? = null
private var reliableDataChannel: DataChannel? = null
... ... @@ -214,7 +214,7 @@ internal constructor(
configure(joinResponse, options)
// create offer
if (!isSubscriberPrimary) {
if (!isSubscriberPrimary || joinResponse.fastPublish) {
negotiatePublisher()
}
client.onReadyForResponses()
... ... @@ -1082,6 +1082,10 @@ internal constructor(
LivekitModels.DataPacket.ValueCase.STREAM_CHUNK -> {
// TODO
}
LivekitModels.DataPacket.ValueCase.STREAM_TRAILER -> {
// TODO
}
}
}
... ...
... ... @@ -585,6 +585,7 @@ constructor(
}
localParticipant.updateFromInfo(response.participant)
localParticipant.setEnabledPublishCodecs(response.enabledPublishCodecsList)
if (response.otherParticipantsList.isNotEmpty()) {
response.otherParticipantsList.forEach { info ->
... ...
... ... @@ -57,6 +57,7 @@ import io.livekit.android.util.flow
import io.livekit.android.webrtc.sortVideoCodecPreferences
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
... ... @@ -64,7 +65,9 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import livekit.LivekitModels
import livekit.LivekitModels.Codec
import livekit.LivekitModels.DataPacket
import livekit.LivekitModels.TrackInfo
import livekit.LivekitRtc
import livekit.LivekitRtc.AddTrackRequest
import livekit.LivekitRtc.SimulcastCodec
... ... @@ -127,8 +130,9 @@ internal constructor(
// For ensuring that only one caller can execute setTrackEnabled at a time.
// Without it, there's a potential to create multiple of the same source,
// Camera has deadlock issues with multiple CameraCapturers trying to activate/stop.
private val sourcePubLocks = Track.Source.values()
.associate { source -> source to Mutex() }
private val sourcePubLocks = Track.Source.entries.associateWith { Mutex() }
internal val enabledPublishVideoCodecs = Collections.synchronizedList(mutableListOf<Codec>())
/**
* Creates an audio track, recording audio through the microphone with the given [options].
... ... @@ -405,9 +409,26 @@ internal constructor(
options: VideoTrackPublishOptions = VideoTrackPublishOptions(null, videoTrackPublishDefaults),
publishListener: PublishListener? = null,
) {
@Suppress("NAME_SHADOWING") var options = options
synchronized(enabledPublishVideoCodecs) {
if (enabledPublishVideoCodecs.isNotEmpty()) {
if (enabledPublishVideoCodecs.none { allowedCodec -> allowedCodec.mime.mimeTypeToVideoCodec() == options.videoCodec }) {
val oldCodec = options.videoCodec
val newCodec = enabledPublishVideoCodecs
.firstOrNull { it.mime.mimeTypeToVideoCodec() != null }
?.mime?.mimeTypeToVideoCodec()
if (newCodec != null) {
LKLog.w { "$oldCodec not enabled on server, falling back to supported codec $newCodec" }
options = options.copy(videoCodec = newCodec)
}
}
}
}
val isSVC = isSVCCodec(options.videoCodec)
@Suppress("NAME_SHADOWING") var options = options
if (isSVC) {
dynacast = true
... ... @@ -478,39 +499,16 @@ internal constructor(
return null
}
val cid = track.rtcTrack.id()
val builder = AddTrackRequest.newBuilder().apply {
this.requestConfig()
if (engine.connectionState == ConnectionState.DISCONNECTED) {
publishListener?.onPublishFailure(TrackException.PublishException("Not connected!"))
}
val trackInfo = try {
engine.addTrack(
cid = cid,
name = options.name ?: track.name,
kind = track.kind.toProto(),
stream = options.stream,
builder = builder,
)
} catch (e: Exception) {
publishListener?.onPublishFailure(TrackException.PublishException("Failed to publish track", e))
return null
}
if (options is VideoTrackPublishOptions) {
// server might not support the codec the client has requested, in that case, fallback
// to a supported codec
val primaryCodecMime = trackInfo.codecsList.firstOrNull()?.mimeType
if (primaryCodecMime != null) {
val updatedCodec = primaryCodecMime.mimeTypeToVideoCodec()
if (updatedCodec != null && updatedCodec != options.videoCodec) {
LKLog.d { "falling back to server selected codec: $updatedCodec" }
options = options.copy(videoCodec = updatedCodec)
val cid = track.rtcTrack.id()
// recompute encodings since bitrates/etc could have changed
encodings = computeVideoEncodings((track as LocalVideoTrack).dimensions, options)
}
}
// For fast publish, we can negotiate PC and request add track at the same time
suspend fun negotiate() {
if (this.engine.publisher == null) {
throw IllegalStateException("publisher is not configured yet!")
}
val transInit = RtpTransceiverInit(
... ... @@ -529,35 +527,96 @@ internal constructor(
}
if (transceiver == null) {
publishListener?.onPublishFailure(TrackException.PublishException("null sender returned from peer connection"))
return null
val exception = TrackException.PublishException("null sender returned from peer connection")
publishListener?.onPublishFailure(exception)
throw exception
}
track.statsGetter = engine.createStatsGetter(transceiver.sender)
val finalOptions = options
// Handle trackBitrates
if (encodings.isNotEmpty()) {
if (options is VideoTrackPublishOptions && isSVCCodec(options.videoCodec) && encodings.firstOrNull()?.maxBitrateBps != null) {
if (finalOptions is VideoTrackPublishOptions && isSVCCodec(finalOptions.videoCodec) && encodings.firstOrNull()?.maxBitrateBps != null) {
engine.registerTrackBitrateInfo(
cid = cid,
TrackBitrateInfo(
codec = options.videoCodec,
codec = finalOptions.videoCodec,
maxBitrate = (encodings.first().maxBitrateBps?.div(1000) ?: 0).toLong(),
),
)
}
}
if (options is VideoTrackPublishOptions) {
if (finalOptions is VideoTrackPublishOptions) {
// Set preferred video codec order
transceiver.sortVideoCodecPreferences(options.videoCodec, capabilitiesGetter)
(track as LocalVideoTrack).codec = options.videoCodec
transceiver.sortVideoCodecPreferences(finalOptions.videoCodec, capabilitiesGetter)
(track as LocalVideoTrack).codec = finalOptions.videoCodec
val rtpParameters = transceiver.sender.parameters
rtpParameters.degradationPreference = options.degradationPreference
rtpParameters.degradationPreference = finalOptions.degradationPreference
transceiver.sender.parameters = rtpParameters
}
// PublisherTransportObserver.onRenegotiationNeeded() gets triggered automatically
// so no need to call negotiate manually.
}
suspend fun requestAddTrack(): TrackInfo {
val builder = AddTrackRequest.newBuilder().apply {
this.requestConfig()
}
return try {
engine.addTrack(
cid = cid,
name = options.name ?: track.name,
kind = track.kind.toProto(),
stream = options.stream,
builder = builder,
)
} catch (e: Exception) {
val exception = TrackException.PublishException("Failed to publish track", e)
publishListener?.onPublishFailure(exception)
throw exception
}
}
val trackInfo: TrackInfo
if (enabledPublishVideoCodecs.isNotEmpty()) {
// Can simultaneous publish and negotiate.
// codec is pre-verified in publishVideoTrack
trackInfo = coroutineScope {
val negotiateJob = launch { negotiate() }
val publishJob = async { requestAddTrack() }
negotiateJob.join()
return@coroutineScope publishJob.await()
}
} else {
// legacy path.
trackInfo = requestAddTrack()
if (options is VideoTrackPublishOptions) {
// server might not support the codec the client has requested, in that case, fallback
// to a supported codec
val primaryCodecMime = trackInfo.codecsList.firstOrNull()?.mimeType
if (primaryCodecMime != null) {
val updatedCodec = primaryCodecMime.mimeTypeToVideoCodec()
if (updatedCodec != null && updatedCodec != options.videoCodec) {
LKLog.d { "falling back to server selected codec: $updatedCodec" }
options = options.copy(videoCodec = updatedCodec)
// recompute encodings since bitrates/etc could have changed
encodings = computeVideoEncodings((track as LocalVideoTrack).dimensions, options)
}
}
}
negotiate()
}
val publication = LocalTrackPublication(
info = trackInfo,
track = track,
... ... @@ -1266,13 +1325,8 @@ internal constructor(
LKLog.w { "couldn't create new transceiver! $codec" }
return@launch
}
transceiver.sortVideoCodecPreferences(newOptions.videoCodec, capabilitiesGetter)
simulcastTrack.sender = transceiver.sender
val trackRequest = AddTrackRequest.newBuilder().apply {
cid = transceiver.sender.id()
sid = existingPublication.sid
type = track.kind.toProto()
muted = !track.enabled
source = existingPublication.source.toProto()
addSimulcastCodecs(
... ... @@ -1291,17 +1345,23 @@ internal constructor(
),
)
}
val negotiateJob = launch {
transceiver.sortVideoCodecPreferences(newOptions.videoCodec, capabilitiesGetter)
simulcastTrack.sender = transceiver.sender
val trackInfo = engine.addTrack(
engine.negotiatePublisher()
}
val publishJob = async {
engine.addTrack(
cid = simulcastTrack.rtcTrack.id(),
name = existingPublication.name,
kind = existingPublication.kind.toProto(),
stream = options.stream,
builder = trackRequest,
)
engine.negotiatePublisher()
}
negotiateJob.join()
val trackInfo = publishJob.await()
LKLog.d { "published $codec for track ${track.sid}, $trackInfo" }
}
}
... ... @@ -1360,6 +1420,20 @@ internal constructor(
eventBus.postEvent(ParticipantEvent.LocalTrackSubscribed(this, publication), scope)
}
internal fun setEnabledPublishCodecs(codecs: List<Codec>) {
synchronized(enabledPublishVideoCodecs) {
enabledPublishVideoCodecs.clear()
enabledPublishVideoCodecs.addAll(
codecs.filter { codec ->
codec.mime.split('/')
.takeIf { it.isNotEmpty() }
?.get(0)
?.lowercase() == "video"
},
)
}
}
/**
* @suppress
*/
... ... @@ -1386,6 +1460,7 @@ internal constructor(
*/
override fun dispose() {
cleanup()
enabledPublishVideoCodecs.clear()
super.dispose()
}
... ...
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
... ... @@ -85,7 +85,7 @@ enum class VideoCodec(val codecName: String) {
companion object {
fun fromCodecName(codecName: String): VideoCodec {
return VideoCodec.values().first { it.codecName.equals(codecName, ignoreCase = true) }
return entries.first { it.codecName.equals(codecName, ignoreCase = true) }
}
}
}
... ...
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
... ... @@ -18,9 +18,12 @@ package io.livekit.android.test.mock
import livekit.org.webrtc.RtpReceiver
import org.mockito.Mockito
import org.mockito.kotlin.whenever
object MockRtpReceiver {
fun create(): RtpReceiver {
return Mockito.mock(RtpReceiver::class.java)
fun create(id: String = "receiver_id"): RtpReceiver {
return Mockito.mock(RtpReceiver::class.java).apply {
whenever(this.id()).thenReturn(id)
}
}
}
... ...
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
... ... @@ -25,7 +25,7 @@ import org.mockito.kotlin.whenever
import java.util.UUID
object MockRtpSender {
fun create(): RtpSender {
fun create(id: String = "sender_id"): RtpSender {
var rtpParameters: RtpParameters = MockRtpParameters(
transactionId = UUID.randomUUID().toString(),
degradationPreference = null,
... ... @@ -36,6 +36,7 @@ object MockRtpSender {
)
return Mockito.mock(RtpSender::class.java).apply {
whenever(this.parameters).thenAnswer { rtpParameters }
whenever(this.id()).thenReturn(id)
whenever(this.setParameters(any())).thenAnswer {
rtpParameters = it.getArgument(0)
true
... ...
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
... ... @@ -20,6 +20,7 @@ import io.livekit.android.test.mock.MockRtpReceiver
import io.livekit.android.test.mock.MockRtpSender
import livekit.org.webrtc.RtpTransceiver.RtpTransceiverDirection
import org.mockito.Mockito
import java.util.UUID
object MockRtpTransceiver {
fun create(
... ... @@ -27,7 +28,7 @@ object MockRtpTransceiver {
init: RtpTransceiver.RtpTransceiverInit = RtpTransceiver.RtpTransceiverInit(),
): RtpTransceiver {
val mock = Mockito.mock(RtpTransceiver::class.java)
val id = UUID.randomUUID().toString()
Mockito.`when`(mock.mediaType).then {
return@then when (track.kind()) {
MediaStreamTrack.AUDIO_TRACK_KIND -> MediaStreamTrack.MediaType.MEDIA_TYPE_AUDIO
... ... @@ -40,7 +41,7 @@ object MockRtpTransceiver {
when (direction) {
RtpTransceiverDirection.SEND_RECV, RtpTransceiverDirection.SEND_ONLY -> {
val sender = MockRtpSender.create()
val sender = MockRtpSender.create(id = id)
Mockito.`when`(mock.sender)
.then { sender }
}
... ... @@ -50,7 +51,7 @@ object MockRtpTransceiver {
when (direction) {
RtpTransceiverDirection.SEND_RECV, RtpTransceiverDirection.RECV_ONLY -> {
val receiver = MockRtpReceiver.create()
val receiver = MockRtpReceiver.create(id = id)
Mockito.`when`(mock.receiver)
.then { receiver }
}
... ...
Subproject commit 9e8d1e37c5eb4434424bc16c657c83e7dc63bc2a
Subproject commit 02ee5e6947593443d0dfc90cae0b27ce03b6c1fe
... ...