davidliu
Committed by GitHub

handle StreamStateUpdate (#29)

* Update protocol submodule commit

* StreamStateUpdate

* default track streamstate paused

* fix tests
... ... @@ -7,13 +7,17 @@ import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
class BroadcastEventBus<T> : EventListenable<T> {
private val mutableEvents = MutableSharedFlow<T>()
private val mutableEvents = MutableSharedFlow<T>(extraBufferCapacity = Int.MAX_VALUE)
override val events = mutableEvents.asSharedFlow()
suspend fun postEvent(event: T) {
mutableEvents.emit(event)
}
fun tryPostEvent(event: T) {
mutableEvents.tryEmit(event)
}
fun postEvent(event: T, scope: CoroutineScope): Job {
return scope.launch { postEvent(event) }
}
... ...
... ... @@ -98,4 +98,13 @@ sealed class ParticipantEvent(open val participant: Participant) : Event() {
* Received data published by another participant
*/
class DataReceived(override val participant: RemoteParticipant, val data: ByteArray) : ParticipantEvent(participant)
/**
* A track's stream state has changed.
*/
class TrackStreamStateChanged(
override val participant: Participant,
val trackPublication: TrackPublication,
val streamState: Track.StreamState
) : ParticipantEvent(participant)
}
\ No newline at end of file
... ...
... ... @@ -122,6 +122,15 @@ sealed class RoomEvent(val room: Room) : Event() {
) : RoomEvent(room)
/**
* A track's stream state has changed.
*/
class TrackStreamStateChanged(
room: Room,
val trackPublication: TrackPublication,
val streamState: Track.StreamState
) : RoomEvent(room)
/**
* Received data published by another participant
*/
class DataReceived(room: Room, val data: ByteArray, val participant: RemoteParticipant) : RoomEvent(room)
... ...
... ... @@ -2,7 +2,8 @@ package io.livekit.android.events
import io.livekit.android.room.track.Track
sealed class TrackEvent : Event() {
class VisibilityChanged(val isVisible: Boolean) : TrackEvent()
class VideoDimensionsChanged(val newDimensions: Track.Dimensions) : TrackEvent()
sealed class TrackEvent(val track: Track) : Event() {
class VisibilityChanged(track: Track, val isVisible: Boolean) : TrackEvent(track)
class VideoDimensionsChanged(track: Track, val newDimensions: Track.Dimensions) : TrackEvent(track)
class StreamStateChanged(track: Track, val streamState: Track.StreamState) : TrackEvent(track)
}
\ No newline at end of file
... ...
... ... @@ -387,6 +387,7 @@ internal constructor(
fun onDisconnect(reason: String)
fun onFailToConnect(error: Exception)
fun onUserPacket(packet: LivekitModels.UserPacket, kind: LivekitModels.DataPacket.Kind)
fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>)
}
companion object {
... ... @@ -524,6 +525,10 @@ internal constructor(
listener?.onFailToConnect(error)
}
override fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>) {
listener?.onStreamStateUpdate(streamStates)
}
//--------------------------------- DataChannel.Observer ------------------------------------//
override fun onBufferedAmountChange(previousAmount: Long) {
... ...
... ... @@ -12,7 +12,9 @@ import io.livekit.android.ConnectOptions
import io.livekit.android.Version
import io.livekit.android.dagger.InjectionNames
import io.livekit.android.events.BroadcastEventBus
import io.livekit.android.events.ParticipantEvent
import io.livekit.android.events.RoomEvent
import io.livekit.android.events.collect
import io.livekit.android.renderer.TextureViewRenderer
import io.livekit.android.room.participant.*
import io.livekit.android.room.track.*
... ... @@ -204,6 +206,14 @@ constructor(
}
participant.internalListener = this
coroutineScope.launch {
participant.events.collect {
when(it){
is ParticipantEvent.TrackStreamStateChanged -> eventBus.postEvent(RoomEvent.TrackStreamStateChanged(this@Room, it.trackPublication, it.streamState))
}
}
}
val newRemoteParticipants = mutableRemoteParticipants.toMutableMap()
newRemoteParticipants[sid] = participant
mutableRemoteParticipants = newRemoteParticipants
... ... @@ -435,6 +445,15 @@ constructor(
participant.onDataReceived(data)
}
override fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>) {
for(streamState in streamStates){
val participant = getParticipant(streamState.participantSid) ?: continue
val track = participant.tracks[streamState.trackSid] ?: continue
track.track?.streamState = Track.StreamState.fromProto(streamState.state)
}
}
/**
* @suppress
*/
... ...
... ... @@ -292,7 +292,7 @@ constructor(
sid: String,
disabled: Boolean,
videoDimensions: Track.Dimensions?,
videoQuality: LivekitRtc.VideoQuality?,
videoQuality: LivekitModels.VideoQuality?,
) {
val trackSettings = LivekitRtc.UpdateTrackSettings.newBuilder()
.addTrackSids(sid)
... ... @@ -305,7 +305,7 @@ constructor(
quality = videoQuality
} else {
// default to HIGH
quality = LivekitRtc.VideoQuality.HIGH
quality = LivekitModels.VideoQuality.HIGH
}
}
... ... @@ -415,8 +415,8 @@ constructor(
LivekitRtc.SignalResponse.MessageCase.CONNECTION_QUALITY -> {
listener?.onConnectionQuality(response.connectionQuality.updatesList)
}
LivekitRtc.SignalResponse.MessageCase.STREAMED_TRACKS_UPDATE -> {
// TODO
LivekitRtc.SignalResponse.MessageCase.STREAM_STATE_UPDATE -> {
listener?.onStreamStateUpdate(response.streamStateUpdate.streamStatesList)
}
LivekitRtc.SignalResponse.MessageCase.MESSAGE_NOT_SET,
null -> {
... ... @@ -444,6 +444,7 @@ constructor(
fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>)
fun onLeave()
fun onError(error: Exception)
fun onStreamStateUpdate(streamStates: List<LivekitRtc.StreamStateInfo>)
}
companion object {
... ...
... ... @@ -3,6 +3,7 @@ package io.livekit.android.room.participant
import io.livekit.android.dagger.InjectionNames
import io.livekit.android.events.BroadcastEventBus
import io.livekit.android.events.ParticipantEvent
import io.livekit.android.events.TrackEvent
import io.livekit.android.room.track.LocalTrackPublication
import io.livekit.android.room.track.RemoteTrackPublication
import io.livekit.android.room.track.Track
... ... @@ -234,6 +235,11 @@ open class Participant(
eventBus.postEvent(ParticipantEvent.TrackUnmuted(this, trackPublication), scope)
}
internal fun onTrackStreamStateChanged(trackEvent: TrackEvent.StreamStateChanged) {
val trackPublication = tracks[trackEvent.track.sid] ?: return
eventBus.postEvent(ParticipantEvent.TrackStreamStateChanged(this, trackPublication, trackEvent.streamState), scope)
}
}
@Deprecated("Use Participant.events instead.")
... ...
... ... @@ -8,7 +8,6 @@ import io.livekit.android.util.debounce
import io.livekit.android.util.invoke
import kotlinx.coroutines.*
import livekit.LivekitModels
import livekit.LivekitRtc
import javax.inject.Named
class RemoteTrackPublication(
... ... @@ -36,6 +35,7 @@ class RemoteTrackPublication(
when (it) {
is TrackEvent.VisibilityChanged -> handleVisibilityChanged(it)
is TrackEvent.VideoDimensionsChanged -> handleVideoDimensionsChanged(it)
is TrackEvent.StreamStateChanged -> handleStreamStateChanged(it)
}
}
}
... ... @@ -52,11 +52,15 @@ class RemoteTrackPublication(
sendUpdateTrackSettings.invoke()
}
private fun handleStreamStateChanged(trackEvent: TrackEvent.StreamStateChanged) {
participant.get()?.onTrackStreamStateChanged(trackEvent)
}
private var trackJob: Job? = null
private var unsubscribed: Boolean = false
private var disabled: Boolean = false
private var videoQuality: LivekitRtc.VideoQuality? = LivekitRtc.VideoQuality.HIGH
private var videoQuality: LivekitModels.VideoQuality? = LivekitModels.VideoQuality.HIGH
private var videoDimensions: Track.Dimensions? = null
val isAutoManaged: Boolean
... ... @@ -113,7 +117,7 @@ class RemoteTrackPublication(
* this indicates the highest quality the client can accept. if network bandwidth does not
* allow, server will automatically reduce quality to optimize for uninterrupted video
*/
fun setVideoQuality(quality: LivekitRtc.VideoQuality) {
fun setVideoQuality(quality: LivekitModels.VideoQuality) {
if (isAutoManaged
|| !subscribed
|| quality == videoQuality
... ...
... ... @@ -83,11 +83,11 @@ class RemoteVideoTrack(
val eventsToPost = mutableListOf<TrackEvent>()
if (isVisible != lastVisibility) {
lastVisibility = isVisible
eventsToPost.add(TrackEvent.VisibilityChanged(isVisible))
eventsToPost.add(TrackEvent.VisibilityChanged(this, isVisible))
}
if (newDimensions != lastDimensions) {
lastDimensions = newDimensions
eventsToPost.add(TrackEvent.VideoDimensionsChanged(newDimensions))
eventsToPost.add(TrackEvent.VideoDimensionsChanged(this, newDimensions))
}
if (eventsToPost.any()) {
... ...
... ... @@ -2,7 +2,9 @@ package io.livekit.android.room.track
import io.livekit.android.events.BroadcastEventBus
import io.livekit.android.events.TrackEvent
import io.livekit.android.util.flowDelegate
import livekit.LivekitModels
import livekit.LivekitRtc
import org.webrtc.MediaStreamTrack
open class Track(
... ... @@ -12,17 +14,24 @@ open class Track(
) {
protected val eventBus = BroadcastEventBus<TrackEvent>()
val events = eventBus.readOnly()
var name = name
internal set
var kind = kind
internal set
var sid: String? = null
internal set
var streamState: StreamState by flowDelegate(StreamState.PAUSED) { newValue, oldValue ->
if (newValue != oldValue) {
eventBus.tryPostEvent(TrackEvent.StreamStateChanged(this, newValue))
}
}
internal set
enum class Kind(val value: String) {
AUDIO("audio"),
VIDEO("video"),
// unknown
UNRECOGNIZED("unrecognized");
... ... @@ -77,6 +86,30 @@ open class Track(
}
}
enum class StreamState {
ACTIVE,
PAUSED,
UNKNOWN;
fun toProto(): LivekitRtc.StreamState {
return when (this) {
ACTIVE -> LivekitRtc.StreamState.ACTIVE
PAUSED -> LivekitRtc.StreamState.PAUSED
UNKNOWN -> LivekitRtc.StreamState.UNRECOGNIZED
}
}
companion object {
fun fromProto(state: LivekitRtc.StreamState): StreamState {
return when (state) {
LivekitRtc.StreamState.ACTIVE -> ACTIVE
LivekitRtc.StreamState.PAUSED -> PAUSED
LivekitRtc.StreamState.UNRECOGNIZED -> UNKNOWN
}
}
}
}
data class Dimensions(val width: Int, val height: Int)
open fun start() {
... ...
package io.livekit.android.mock
import org.webrtc.AudioTrack
class MockAudioStreamTrack(
val id: String = "id",
val kind: String = AUDIO_TRACK_KIND,
var enabled: Boolean = true,
var state: State = State.LIVE,
) : AudioTrack(1L) {
override fun id(): String = id
override fun kind(): String = kind
override fun enabled(): Boolean = enabled
override fun setEnabled(enable: Boolean): Boolean {
enabled = enable
return true
}
override fun state(): State {
return state
}
override fun dispose() {
}
override fun setVolume(volume: Double) {
}
}
\ No newline at end of file
... ...
package io.livekit.android.mock
import org.webrtc.AudioTrack
import org.webrtc.MediaStream
import org.webrtc.VideoTrack
class MockMediaStream(private val id: String = "id") : MediaStream(1L) {
override fun addTrack(track: AudioTrack): Boolean {
return audioTracks.add(track)
}
override fun addTrack(track: VideoTrack?): Boolean {
return videoTracks.add(track)
}
override fun addPreservedTrack(track: VideoTrack?): Boolean {
return preservedVideoTracks.add(track)
}
override fun removeTrack(track: AudioTrack?): Boolean {
return audioTracks.remove(track)
}
override fun removeTrack(track: VideoTrack?): Boolean {
return videoTracks.remove(track)
}
override fun dispose() {
// Don't do anything in this stubbed class
}
override fun getId(): String = id
}
\ No newline at end of file
... ...
package io.livekit.android.mock
import org.webrtc.MediaStreamTrack
class MockMediaStreamTrack(
val id: String = "id",
val kind: String = AUDIO_TRACK_KIND,
var enabled: Boolean = true,
var state: State = State.LIVE,
) : MediaStreamTrack(1L) {
override fun id(): String = id
override fun kind(): String = kind
override fun enabled(): Boolean = enabled
override fun setEnabled(enable: Boolean): Boolean {
enabled = enable
return true
}
override fun state(): State {
return state
}
override fun dispose() {
}
}
\ No newline at end of file
... ...
... ... @@ -5,13 +5,15 @@ import androidx.test.core.app.ApplicationProvider
import io.livekit.android.coroutines.TestCoroutineRule
import io.livekit.android.events.EventCollector
import io.livekit.android.events.RoomEvent
import io.livekit.android.mock.MockWebsocketFactory
import io.livekit.android.mock.*
import io.livekit.android.mock.dagger.DaggerTestLiveKitComponent
import io.livekit.android.room.participant.ConnectionQuality
import io.livekit.android.room.track.Track
import io.livekit.android.util.toOkioByteString
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runBlockingTest
import livekit.LivekitRtc
import org.junit.Assert
import org.junit.Before
import org.junit.Rule
... ... @@ -166,6 +168,35 @@ class RoomMockE2ETest {
}
@Test
fun trackStreamStateChanged() {
connect()
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.PARTICIPANT_JOIN.toOkioByteString()
)
// We intentionally don't emit if the track isn't subscribed, so need to
// add track.
room.onAddTrack(
MockAudioStreamTrack(),
arrayOf(MockMediaStream(id = "${TestData.REMOTE_PARTICIPANT.sid}|${TestData.REMOTE_AUDIO_TRACK.sid}"))
)
val eventCollector = EventCollector(room.events, coroutineRule.scope)
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.STREAM_STATE_UPDATE.toOkioByteString()
)
val events = eventCollector.stopCollecting()
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.TrackStreamStateChanged)
val event = events[0] as RoomEvent.TrackStreamStateChanged
Assert.assertEquals(Track.StreamState.ACTIVE, event.streamState)
}
@Test
fun leave() {
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
... ...
... ... @@ -229,6 +229,18 @@ class SignalClientTest {
build()
}
val STREAM_STATE_UPDATE = with(LivekitRtc.SignalResponse.newBuilder()) {
streamStateUpdate = with(LivekitRtc.StreamStateUpdate.newBuilder()) {
addStreamStates(with(LivekitRtc.StreamStateInfo.newBuilder()) {
participantSid = TestData.REMOTE_PARTICIPANT.sid
trackSid = TestData.REMOTE_AUDIO_TRACK.sid
state = LivekitRtc.StreamState.ACTIVE
build()
})
build()
}
build()
}
val LEAVE = with(LivekitRtc.SignalResponse.newBuilder()) {
leave = with(leaveBuilder) {
build()
... ...
Subproject commit 8de3224f9217f50b781e3a7f2b0e635711217fde
Subproject commit 8785fbf5c143612bf002dbbf6ca74db4e22f2f77
... ...