davidliu
Committed by GitHub

handle subscribedqualityupdate (#35)

* update protocol

* OnSubscribedQualityUpdate

* fix addTransceiver

* fix bug

* ignore subscribed quality updates from 0.15.1 and below

* fix transceivers not getting hooked up

* add dynacast option and rename autoManageVideo to adaptiveStream
... ... @@ -50,7 +50,7 @@ class LiveKit {
options.videoTrackPublishDefaults?.let {
room.videoTrackPublishDefaults = it
}
room.autoManageVideo = options.autoManageVideo
room.adaptiveStream = options.adaptiveStream
return room
}
... ...
package io.livekit.android
import io.livekit.android.room.Room
import io.livekit.android.room.participant.AudioTrackPublishDefaults
import io.livekit.android.room.participant.VideoTrackPublishDefaults
import io.livekit.android.room.track.LocalAudioTrackOptions
... ... @@ -7,14 +8,14 @@ import io.livekit.android.room.track.LocalVideoTrackOptions
data class RoomOptions(
/**
* Automatically manage quality of subscribed video tracks, subscribe to the
* an appropriate resolution based on the size of the video elements that tracks
* are attached to.
*
* Also observes the visibility of attached tracks and pauses receiving data
* if they are not visible.
* @see [Room.adaptiveStream]
*/
val autoManageVideo: Boolean = false,
val adaptiveStream: Boolean = false,
/**
* @see [Room.dynacast]
*/
val dynacast: Boolean = false,
val audioTrackCaptureDefaults: LocalAudioTrackOptions? = null,
val videoTrackCaptureDefaults: LocalVideoTrackOptions? = null,
... ...
... ... @@ -385,6 +385,7 @@ internal constructor(
fun onFailToConnect(error: Throwable)
fun onUserPacket(packet: LivekitModels.UserPacket, kind: LivekitModels.DataPacket.Kind)
fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>)
fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate)
}
companion object {
... ... @@ -526,6 +527,10 @@ internal constructor(
listener?.onStreamStateUpdate(streamStates)
}
override fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) {
listener?.onSubscribedQualityUpdate(subscribedQualityUpdate)
}
//--------------------------------- DataChannel.Observer ------------------------------------//
override fun onBufferedAmountChange(previousAmount: Long) {
... ...
... ... @@ -84,8 +84,18 @@ constructor(
*
* Also observes the visibility of attached tracks and pauses receiving data
* if they are not visible.
*
* Defaults to false.
*/
var adaptiveStream: Boolean = false
/**
* Dynamically pauses video layers that are not being consumed by any subscribers,
* significantly reducing publishing CPU and bandwidth usage.
*
* Defaults to false.
*/
var autoManageVideo: Boolean = false
var dynacast: Boolean = false
/**
* Default options to use when creating an audio track.
... ... @@ -140,7 +150,7 @@ constructor(
return
}
val lp = localParticipantFactory.create(response.participant)
val lp = localParticipantFactory.create(response.participant, dynacast)
lp.internalListener = this
localParticipant = lp
if (response.otherParticipantsList.isNotEmpty()) {
... ... @@ -182,12 +192,13 @@ constructor(
}
fun getParticipant(sid: String): Participant? {
if(sid == localParticipant.sid){
if (sid == localParticipant.sid) {
return localParticipant
} else {
return remoteParticipants[sid]
}
}
@Synchronized
private fun getOrCreateRemoteParticipant(
sid: String,
... ... @@ -207,8 +218,14 @@ constructor(
coroutineScope.launch {
participant.events.collect {
when(it){
is ParticipantEvent.TrackStreamStateChanged -> eventBus.postEvent(RoomEvent.TrackStreamStateChanged(this@Room, it.trackPublication, it.streamState))
when (it) {
is ParticipantEvent.TrackStreamStateChanged -> eventBus.postEvent(
RoomEvent.TrackStreamStateChanged(
this@Room,
it.trackPublication,
it.streamState
)
)
}
}
}
... ... @@ -262,7 +279,7 @@ constructor(
participant.audioLevel = speaker.level
participant.isSpeaking = speaker.active
if(speaker.active) {
if (speaker.active) {
updatedSpeakers[speaker.sid] = participant
} else {
updatedSpeakers.remove(speaker.sid)
... ... @@ -288,14 +305,14 @@ constructor(
}
private fun handleDisconnect() {
if(state == State.DISCONNECTED) {
if (state == State.DISCONNECTED) {
return
}
try {
val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
cm.unregisterNetworkCallback(this)
} catch (e : IllegalArgumentException) {
} catch (e: IllegalArgumentException) {
// do nothing, may happen on older versions if attempting to unregister twice.
}
... ... @@ -377,7 +394,7 @@ constructor(
trackSid = track.id()
}
val participant = getOrCreateRemoteParticipant(participantSid)
participant.addSubscribedMediaTrack(track, trackSid!!, autoManageVideo)
participant.addSubscribedMediaTrack(track, trackSid!!, adaptiveStream)
}
/**
... ... @@ -387,7 +404,7 @@ constructor(
for (info in updates) {
val participantSid = info.sid
if(localParticipant.sid == participantSid) {
if (localParticipant.sid == participantSid) {
localParticipant.updateFromInfo(info)
continue
}
... ... @@ -454,7 +471,7 @@ constructor(
}
override fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>) {
for(streamState in streamStates){
for (streamState in streamStates) {
val participant = getParticipant(streamState.participantSid) ?: continue
val track = participant.tracks[streamState.trackSid] ?: continue
... ... @@ -462,6 +479,10 @@ constructor(
}
}
override fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) {
localParticipant.handleSubscribedQualityUpdate(subscribedQualityUpdate)
}
/**
* @suppress
*/
... ... @@ -505,7 +526,7 @@ constructor(
* @suppress
*/
override fun onTrackPublished(publication: RemoteTrackPublication, participant: RemoteParticipant) {
listener?.onTrackPublished(publication, participant, this)
listener?.onTrackPublished(publication, participant, this)
eventBus.postEvent(RoomEvent.TrackPublished(this, publication, participant), coroutineScope)
}
... ... @@ -513,7 +534,7 @@ constructor(
* @suppress
*/
override fun onTrackUnpublished(publication: RemoteTrackPublication, participant: RemoteParticipant) {
listener?.onTrackUnpublished(publication, participant, this)
listener?.onTrackUnpublished(publication, participant, this)
eventBus.postEvent(RoomEvent.TrackUnpublished(this, publication, participant), coroutineScope)
}
... ... @@ -521,7 +542,7 @@ constructor(
* @suppress
*/
override fun onTrackPublished(publication: LocalTrackPublication, participant: LocalParticipant) {
listener?.onTrackPublished(publication, participant, this)
listener?.onTrackPublished(publication, participant, this)
eventBus.postEvent(RoomEvent.TrackPublished(this, publication, participant), coroutineScope)
}
... ... @@ -529,7 +550,7 @@ constructor(
* @suppress
*/
override fun onTrackUnpublished(publication: LocalTrackPublication, participant: LocalParticipant) {
listener?.onTrackUnpublished(publication, participant, this)
listener?.onTrackUnpublished(publication, participant, this)
eventBus.postEvent(RoomEvent.TrackUnpublished(this, publication, participant), coroutineScope)
}
... ...
package io.livekit.android.room
import com.google.protobuf.util.JsonFormat
import com.vdurmont.semver4j.Semver
import io.livekit.android.ConnectOptions
import io.livekit.android.Version
import io.livekit.android.dagger.InjectionNames
... ... @@ -50,6 +51,7 @@ constructor(
private var currentWs: WebSocket? = null
private var isReconnecting: Boolean = false
var listener: Listener? = null
private var serverVersion: Semver? = null
private var lastUrl: String? = null
private var joinContinuation: CancellableContinuation<Either<LivekitRtc.JoinResponse, Unit>>? = null
... ... @@ -359,6 +361,11 @@ constructor(
// Only handle joins if not connected.
if (response.hasJoin()) {
isConnected = true
try {
serverVersion = Semver(response.join.serverVersion)
} catch (t: Throwable) {
LKLog.w(t) { "Thrown while trying to parse server version." }
}
joinContinuation?.resumeWith(Result.success(Either.Left(response.join)))
} else {
LKLog.e { "Received response while not connected. ${toJsonProtobuf.print(response)}" }
... ... @@ -418,6 +425,16 @@ constructor(
LivekitRtc.SignalResponse.MessageCase.STREAM_STATE_UPDATE -> {
listener?.onStreamStateUpdate(response.streamStateUpdate.streamStatesList)
}
LivekitRtc.SignalResponse.MessageCase.SUBSCRIBED_QUALITY_UPDATE -> {
val versionToIgnoreUpTo = Semver("0.15.1")
if (serverVersion?.compareTo(versionToIgnoreUpTo) ?: 1 <= 0) {
return
}
listener?.onSubscribedQualityUpdate(response.subscribedQualityUpdate)
}
LivekitRtc.SignalResponse.MessageCase.SUBSCRIPTION_PERMISSION_UPDATE -> {
// TODO
}
LivekitRtc.SignalResponse.MessageCase.MESSAGE_NOT_SET,
null -> {
LKLog.v { "empty messageCase!" }
... ... @@ -445,6 +462,7 @@ constructor(
fun onLeave()
fun onError(error: Throwable)
fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>)
fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate)
}
companion object {
... ...
... ... @@ -31,6 +31,8 @@ class LocalParticipant
internal constructor(
@Assisted
info: LivekitModels.ParticipantInfo,
@Assisted
private val dynacast: Boolean,
internal val engine: RTCEngine,
private val peerConnectionFactory: PeerConnectionFactory,
private val context: Context,
... ... @@ -253,8 +255,8 @@ internal constructor(
val transceiver = engine.publisher.peerConnection.addTransceiver(track.rtcTrack, transInit)
when (track) {
is LocalVideoTrack -> track.transceiver
is LocalAudioTrack -> track.transceiver
is LocalVideoTrack -> track.transceiver = transceiver
is LocalAudioTrack -> track.transceiver = transceiver
else -> {
throw IllegalArgumentException("Trying to publish a non local track of type ${track.javaClass}")
}
... ... @@ -314,6 +316,7 @@ internal constructor(
if (encodings.size >= VIDEO_RIDS.size) {
throw IllegalStateException("Attempting to add more encodings than we have rids for!")
}
// encodings is mutable, so this will grab next available rid
val rid = VIDEO_RIDS[encodings.size]
encodings.add(videoEncoding.toRtpEncoding(rid, scale))
}
... ... @@ -334,11 +337,13 @@ internal constructor(
val lowScale = if (hasEvenDimensions) calculateScale(lowPreset.capture) else 2.0
addEncoding(lowPreset.encoding, lowScale)
}
addEncoding(encoding, 1.0)
} else {
encodings.add(encoding.toRtpEncoding())
}
// Make largest size at front. addTransceiver seems to fail if ordered from smallest to largest.
encodings.reverse()
return encodings
}
... ... @@ -405,6 +410,15 @@ internal constructor(
}
}
private fun ridForVideoQuality(quality: LivekitModels.VideoQuality): String? {
return when (quality) {
LivekitModels.VideoQuality.HIGH -> "f"
LivekitModels.VideoQuality.MEDIUM -> "h"
LivekitModels.VideoQuality.LOW -> "q"
else -> null
}
}
fun unpublishTrack(track: Track) {
val publication = localTrackPublications.firstOrNull { it.track == track }
if (publication === null) {
... ... @@ -482,6 +496,32 @@ internal constructor(
pub?.muted = muted
}
fun handleSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) {
val trackSid = subscribedQualityUpdate.trackSid
val qualities = subscribedQualityUpdate.subscribedQualitiesList
val pub = tracks[trackSid] ?: return
val track = pub.track as? LocalVideoTrack ?: return
val sender = track.transceiver?.sender ?: return
val parameters = sender.parameters ?: return
val encodings = parameters.encodings ?: return
var hasChanged = false
for (quality in qualities) {
val rid = ridForVideoQuality(quality.quality) ?: continue
val encoding = encodings.firstOrNull { it.rid == rid } ?: continue
if (encoding.active != quality.enabled) {
hasChanged = true
encoding.active = quality.enabled
LKLog.v { "setting layer ${quality.quality} to ${quality.enabled}" }
}
}
if (hasChanged) {
sender.parameters = parameters
}
}
interface PublishListener {
fun onPublishSuccess(publication: TrackPublication) {}
... ... @@ -490,7 +530,7 @@ internal constructor(
@AssistedFactory
interface Factory {
fun create(info: LivekitModels.ParticipantInfo): LocalParticipant
fun create(info: LivekitModels.ParticipantInfo, dynacast: Boolean): LocalParticipant
}
companion object {
... ...
... ... @@ -43,7 +43,7 @@ class RoomTest {
var eglBase: EglBase = MockEglBase()
val localParticantFactory = object : LocalParticipant.Factory {
override fun create(info: LivekitModels.ParticipantInfo): LocalParticipant {
override fun create(info: LivekitModels.ParticipantInfo, dynacast: Boolean): LocalParticipant {
return Mockito.mock(LocalParticipant::class.java)
}
}
... ...
Subproject commit 8785fbf5c143612bf002dbbf6ca74db4e22f2f77
Subproject commit 88ab66e0e761ff304042b286c41de3c803b45576
... ...
... ... @@ -17,8 +17,6 @@ import io.livekit.android.util.flow
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import okio.Utf8
import java.nio.charset.Charset
@OptIn(ExperimentalCoroutinesApi::class)
class CallViewModel(
... ... @@ -76,12 +74,17 @@ class CallViewModel(
init {
viewModelScope.launch {
launch {
error.collect { Timber.e(it) }
}
try {
val room = LiveKit.connect(
application,
url,
token,
roomOptions = RoomOptions(autoManageVideo = true),
roomOptions = RoomOptions(adaptiveStream = true),
listener = this@CallViewModel
)
... ...