David Liu

participant events

package io.livekit.android.events
import io.livekit.android.room.participant.LocalParticipant
import io.livekit.android.room.participant.Participant
import io.livekit.android.room.participant.RemoteParticipant
import io.livekit.android.room.track.LocalTrackPublication
import io.livekit.android.room.track.RemoteTrackPublication
import io.livekit.android.room.track.Track
import io.livekit.android.room.track.TrackPublication
sealed class ParticipantEvent(open val participant: Participant) : Event() {
// all participants
/**
* When a participant's metadata is updated, fired for all participants
*/
class MetadataChanged(participant: Participant, val prevMetadata: String?) : ParticipantEvent(participant)
/**
* Fired when the current participant's isSpeaking property changes. (including LocalParticipant)
*/
class SpeakingChanged(participant: Participant, val isSpeaking: Boolean) : ParticipantEvent(participant)
/**
* The participant was muted.
*
* For the local participant, the callback will be called if setMute was called on the
* [LocalTrackPublication], or if the server has requested the participant to be muted
*/
class TrackMuted(participant: Participant, val publication: TrackPublication) : ParticipantEvent(participant)
/**
* The participant was unmuted.
*
* For the local participant, the callback will be called if setMute was called on the
* [LocalTrackPublication], or if the server has requested the participant to be muted
*/
class TrackUnmuted(participant: Participant, val publication: TrackPublication) : ParticipantEvent(participant)
// local participants
/**
* When a new track is published by the local participant.
*/
class LocalTrackPublished(override val participant: LocalParticipant, val publication: LocalTrackPublication) :
ParticipantEvent(participant)
/**
* A [LocalParticipant] has unpublished a track
*/
class LocalTrackUnpublished(override val participant: LocalParticipant, val publication: LocalTrackPublication) :
ParticipantEvent(participant)
// remote participants
/**
* When a new track is published to room after the local participant has joined.
*
* It will not fire for tracks that are already published
*/
class TrackPublished(override val participant: RemoteParticipant, val publication: RemoteTrackPublication) :
ParticipantEvent(participant)
/**
* A [RemoteParticipant] has unpublished a track
*/
class TrackUnpublished(override val participant: RemoteParticipant, val publication: RemoteTrackPublication) :
ParticipantEvent(participant)
/**
* Subscribed to a new track
*/
class TrackSubscribed(
override val participant: RemoteParticipant,
val track: Track,
val publication: RemoteTrackPublication,
) :
ParticipantEvent(participant)
/**
* Error had occurred while subscribing to a track
*/
class TrackSubscriptionFailed(
override val participant: RemoteParticipant,
val sid: String,
val exception: Exception,
) : ParticipantEvent(participant)
/**
* A subscribed track is no longer available.
* Clients should listen to this event and handle cleanup
*/
class TrackUnsubscribed(
override val participant: RemoteParticipant,
val track: Track,
val publication: RemoteTrackPublication
) : ParticipantEvent(participant)
/**
* Received data published by another participant
*/
class DataReceived(override val participant: RemoteParticipant, val data: ByteArray) : ParticipantEvent(participant)
}
\ No newline at end of file
... ...
... ... @@ -9,45 +9,45 @@ import io.livekit.android.room.track.LocalTrackPublication
import io.livekit.android.room.track.Track
import io.livekit.android.room.track.TrackPublication
sealed class RoomEvent : Event() {
sealed class RoomEvent(val room: Room) : Event() {
/**
* A network change has been detected and LiveKit attempts to reconnect to the room
* When reconnect attempts succeed, the room state will be kept, including tracks that are subscribed/published
*/
class Reconnecting(val room: Room): RoomEvent()
class Reconnecting(room: Room) : RoomEvent(room)
/**
* The reconnect attempt had been successful
*/
class Reconnected(val room: Room): RoomEvent()
class Reconnected(room: Room) : RoomEvent(room)
/**
* Disconnected from room
*/
class Disconnected(val room: Room, val error: Exception?): RoomEvent()
class Disconnected(room: Room, val error: Exception?) : RoomEvent(room)
/**
* When a [RemoteParticipant] joins after the local participant. It will not emit events
* for participants that are already in the room
*/
class ParticipantConnected(val room: Room, val participant: RemoteParticipant): RoomEvent()
class ParticipantConnected(room: Room, val participant: RemoteParticipant) : RoomEvent(room)
/**
* When a [RemoteParticipant] leaves after the local participant has joined.
*/
class ParticipantDisconnected(val room: Room, val participant: RemoteParticipant) : RoomEvent()
class ParticipantDisconnected(room: Room, val participant: RemoteParticipant) : RoomEvent(room)
/**
* Active speakers changed. List of speakers are ordered by their audio level. loudest
* speakers first. This will include the [LocalParticipant] too.
*/
class ActiveSpeakersChanged(val room: Room, val speakers: List<Participant>) : RoomEvent()
class ActiveSpeakersChanged(room: Room, val speakers: List<Participant>) : RoomEvent(room)
class RoomMetadataChanged(
val room: Room,
room: Room,
val newMetadata: String?,
val prevMetadata: String?
) : RoomEvent()
) : RoomEvent(room)
// Participant callbacks
/**
... ... @@ -56,10 +56,10 @@ sealed class RoomEvent : Event() {
* this event will be fired for all clients in the room.
*/
class ParticipantMetadataChanged(
val room: Room,
room: Room,
val participant: Participant,
val prevMetadata: String?
) : RoomEvent()
) : RoomEvent(room)
/**
* The participant was muted.
... ... @@ -67,7 +67,7 @@ sealed class RoomEvent : Event() {
* For the local participant, the callback will be called if setMute was called on the
* [LocalTrackPublication], or if the server has requested the participant to be muted
*/
class TrackMuted(val room: Room, val publication: TrackPublication, val participant: Participant): RoomEvent()
class TrackMuted(room: Room, val publication: TrackPublication, val participant: Participant) : RoomEvent(room)
/**
* The participant was unmuted.
... ... @@ -75,40 +75,56 @@ sealed class RoomEvent : Event() {
* For the local participant, the callback will be called if setMute was called on the
* [LocalTrackPublication], or if the server has requested the participant to be muted
*/
class TrackUnmuted(val room: Room, val publication: TrackPublication, val participant: Participant): RoomEvent()
class TrackUnmuted(room: Room, val publication: TrackPublication, val participant: Participant) : RoomEvent(room)
/**
* When a new track is published to room after the local participant has joined. It will
* not fire for tracks that are already published
*/
class TrackPublished(val room: Room, val publication: TrackPublication, val participant: Participant): RoomEvent()
class TrackPublished(room: Room, val publication: TrackPublication, val participant: Participant) : RoomEvent(room)
/**
* A [Participant] has unpublished a track
*/
class TrackUnpublished(val room: Room, val publication: TrackPublication, val participant: Participant): RoomEvent()
class TrackUnpublished(room: Room, val publication: TrackPublication, val participant: Participant) :
RoomEvent(room)
/**
* The [LocalParticipant] has subscribed to a new track. This event will always fire as
* long as new tracks are ready for use.
*/
class TrackSubscribed(val room: Room, val track: Track, val publication: TrackPublication, val participant: RemoteParticipant): RoomEvent()
class TrackSubscribed(
room: Room,
val track: Track,
val publication: TrackPublication,
val participant: RemoteParticipant
) : RoomEvent(room)
/**
* Could not subscribe to a track
*/
class TrackSubscriptionFailed(val room: Room, val sid: String, val exception: Exception, val participant: RemoteParticipant): RoomEvent()
class TrackSubscriptionFailed(
room: Room,
val sid: String,
val exception: Exception,
val participant: RemoteParticipant
) : RoomEvent(room)
/**
* A subscribed track is no longer available. Clients should listen to this event and ensure
* the track removes all renderers
*/
class TrackUnsubscribed(val room: Room, val track: Track, val publications: TrackPublication, val participant: RemoteParticipant): RoomEvent()
class TrackUnsubscribed(
room: Room,
val track: Track,
val publications: TrackPublication,
val participant: RemoteParticipant
) : RoomEvent(room)
/**
* Received data published by another participant
*/
class DataReceived(val room: Room, val data: ByteArray, val participant: RemoteParticipant): RoomEvent()
class DataReceived(room: Room, val data: ByteArray, val participant: RemoteParticipant) : RoomEvent(room)
/**
* The connection quality for a participant has changed.
... ... @@ -116,6 +132,7 @@ sealed class RoomEvent : Event() {
* @param participant Either a remote participant or [Room.localParticipant]
* @param quality the new connection quality
*/
class ConnectionQualityChanged(val room: Room, val participant: Participant, val quality: ConnectionQuality): RoomEvent()
class ConnectionQualityChanged(room: Room, val participant: Participant, val quality: ConnectionQuality) :
RoomEvent(room)
}
\ No newline at end of file
... ...
... ... @@ -149,9 +149,9 @@ constructor(
}
participant = if (info != null) {
RemoteParticipant(info, engine.client, ioDispatcher)
RemoteParticipant(info, engine.client, ioDispatcher, defaultDispatcher)
} else {
RemoteParticipant(sid, null, engine.client, ioDispatcher)
RemoteParticipant(sid, null, engine.client, ioDispatcher, defaultDispatcher)
}
participant.internalListener = this
mutableRemoteParticipants[sid] = participant
... ... @@ -381,7 +381,7 @@ constructor(
listener?.onDataReceived(data, participant, this)
eventBus.postEvent(RoomEvent.DataReceived(this, data, participant), coroutineScope)
participant.listener?.onDataReceived(data, participant)
participant.onDataReceived(data)
}
/**
... ...
... ... @@ -7,16 +7,20 @@ import com.google.protobuf.ByteString
import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import io.livekit.android.dagger.InjectionNames
import io.livekit.android.events.ParticipantEvent
import io.livekit.android.room.DefaultsManager
import io.livekit.android.room.RTCEngine
import io.livekit.android.room.track.*
import io.livekit.android.util.LKLog
import kotlinx.coroutines.CoroutineDispatcher
import livekit.LivekitModels
import livekit.LivekitRtc
import org.webrtc.EglBase
import org.webrtc.PeerConnectionFactory
import org.webrtc.RtpParameters
import org.webrtc.RtpTransceiver
import javax.inject.Named
import kotlin.math.abs
import kotlin.math.roundToInt
... ... @@ -31,8 +35,10 @@ internal constructor(
private val eglBase: EglBase,
private val screencastVideoTrackFactory: LocalScreencastVideoTrack.Factory,
private val videoTrackFactory: LocalVideoTrack.Factory,
private val defaultsManager: DefaultsManager
) : Participant(info.sid, info.identity) {
private val defaultsManager: DefaultsManager,
@Named(InjectionNames.DISPATCHER_DEFAULT)
coroutineDispatcher: CoroutineDispatcher,
) : Participant(info.sid, info.identity, coroutineDispatcher) {
var audioTrackCaptureDefaults: LocalAudioTrackOptions by defaultsManager::audioTrackCaptureDefaults
var audioTrackPublishDefaults: AudioTrackPublishDefaults by defaultsManager::audioTrackPublishDefaults
... ... @@ -212,6 +218,7 @@ internal constructor(
addTrackPublication(publication)
publishListener?.onPublishSuccess(publication)
internalListener?.onTrackPublished(publication, this)
eventBus.postEvent(ParticipantEvent.LocalTrackPublished(this, publication), scope)
}
suspend fun publishVideoTrack(
... ... @@ -260,6 +267,7 @@ internal constructor(
addTrackPublication(publication)
publishListener?.onPublishSuccess(publication)
internalListener?.onTrackPublished(publication, this)
eventBus.postEvent(ParticipantEvent.LocalTrackPublished(this, publication), scope)
}
private fun computeVideoEncodings(
... ... @@ -374,6 +382,7 @@ internal constructor(
}
track.stop()
internalListener?.onTrackUnpublished(publication, this)
eventBus.postEvent(ParticipantEvent.LocalTrackUnpublished(this, publication), scope)
}
/**
... ... @@ -420,11 +429,15 @@ internal constructor(
}
}
/**
* @suppress
*/
fun onRemoteMuteChanged(trackSid: String, muted: Boolean) {
val pub = tracks[trackSid]
pub?.muted = muted
}
interface PublishListener {
fun onPublishSuccess(publication: TrackPublication) {}
fun onPublishFailure(exception: Exception) {}
... ...
package io.livekit.android.room.participant
import io.livekit.android.dagger.InjectionNames
import io.livekit.android.events.BroadcastEventBus
import io.livekit.android.events.ParticipantEvent
import io.livekit.android.room.track.LocalTrackPublication
import io.livekit.android.room.track.RemoteTrackPublication
import io.livekit.android.room.track.Track
import io.livekit.android.room.track.TrackPublication
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import livekit.LivekitModels
import javax.inject.Named
open class Participant(
var sid: String,
identity: String? = null,
@Named(InjectionNames.DISPATCHER_DEFAULT)
coroutineDispatcher: CoroutineDispatcher,
) {
protected val scope = CoroutineScope(coroutineDispatcher + SupervisorJob())
protected val eventBus = BroadcastEventBus<ParticipantEvent>()
val events = eventBus.readOnly()
open class Participant(var sid: String, identity: String? = null) {
var participantInfo: LivekitModels.ParticipantInfo? = null
private set
var identity: String? = identity
... ... @@ -15,11 +32,12 @@ open class Participant(var sid: String, identity: String? = null) {
internal set
var isSpeaking: Boolean = false
internal set(v) {
val changed = v == field
val changed = v != field
field = v
if (changed) {
listener?.onSpeakingChanged(this)
internalListener?.onSpeakingChanged(this)
eventBus.postEvent(ParticipantEvent.SpeakingChanged(this, v), scope)
}
}
var metadata: String? = null
... ... @@ -29,6 +47,7 @@ open class Participant(var sid: String, identity: String? = null) {
if (prevMetadata != v) {
listener?.onMetadataChanged(this, prevMetadata)
internalListener?.onMetadataChanged(this, prevMetadata)
eventBus.postEvent(ParticipantEvent.MetadataChanged(this, prevMetadata), scope)
}
}
var connectionQuality: ConnectionQuality = ConnectionQuality.UNKNOWN
... ... @@ -37,11 +56,13 @@ open class Participant(var sid: String, identity: String? = null) {
/**
* Listener for when participant properties change
*/
@Deprecated("Use events instead")
var listener: ParticipantListener? = null
/**
* @suppress
*/
@Deprecated("Use events instead")
internal var internalListener: ParticipantListener? = null
val hasInfo
... ... @@ -152,9 +173,24 @@ open class Participant(var sid: String, identity: String? = null) {
override fun hashCode(): Int {
return sid.hashCode()
}
}
// Internal methods just for posting events.
internal fun onTrackMuted(trackPublication: TrackPublication) {
listener?.onTrackMuted(trackPublication, this)
internalListener?.onTrackMuted(trackPublication, this)
eventBus.postEvent(ParticipantEvent.TrackMuted(this, trackPublication), scope)
}
internal fun onTrackUnmuted(trackPublication: TrackPublication) {
listener?.onTrackUnmuted(trackPublication, this)
internalListener?.onTrackUnmuted(trackPublication, this)
eventBus.postEvent(ParticipantEvent.TrackUnmuted(this, trackPublication), scope)
}
}
@Deprecated("Use Participant.events instead.")
interface ParticipantListener {
// all participants
/**
... ...
package io.livekit.android.room.participant
import io.livekit.android.events.ParticipantEvent
import io.livekit.android.room.SignalClient
import io.livekit.android.room.track.*
import io.livekit.android.util.CloseableCoroutineScope
... ... @@ -18,19 +19,22 @@ class RemoteParticipant(
identity: String? = null,
val signalClient: SignalClient,
private val ioDispatcher: CoroutineDispatcher,
) : Participant(sid, identity) {
defaultdispatcher: CoroutineDispatcher,
) : Participant(sid, identity, defaultdispatcher) {
/**
* @suppress
*/
constructor(
info: LivekitModels.ParticipantInfo,
signalClient: SignalClient,
ioDispatcher: CoroutineDispatcher
ioDispatcher: CoroutineDispatcher,
defaultdispatcher: CoroutineDispatcher,
) : this(
info.sid,
info.identity,
signalClient,
ioDispatcher,
defaultdispatcher
) {
updateFromInfo(info)
}
... ... @@ -73,6 +77,7 @@ class RemoteParticipant(
for (publication in newTrackPublications.values) {
internalListener?.onTrackPublished(publication, this)
listener?.onTrackPublished(publication, this)
eventBus.postEvent(ParticipantEvent.TrackPublished(this, publication), scope)
}
}
... ... @@ -112,6 +117,7 @@ class RemoteParticipant(
internalListener?.onTrackSubscriptionFailed(sid, exception, this)
listener?.onTrackSubscriptionFailed(sid, exception, this)
eventBus.postEvent(ParticipantEvent.TrackSubscriptionFailed(this, sid, exception), scope)
} else {
coroutineScope.launch {
delay(150)
... ... @@ -131,6 +137,7 @@ class RemoteParticipant(
internalListener?.onTrackSubscribed(track, publication, this)
listener?.onTrackSubscribed(track, publication, this)
eventBus.postEvent(ParticipantEvent.TrackSubscribed(this, track, publication), scope)
}
fun unpublishTrack(trackSid: String, sendUnpublish: Boolean = false) {
... ... @@ -146,13 +153,21 @@ class RemoteParticipant(
track.stop()
internalListener?.onTrackUnsubscribed(track, publication, this)
listener?.onTrackUnsubscribed(track, publication, this)
eventBus.postEvent(ParticipantEvent.TrackUnsubscribed(this, track, publication), scope)
}
if (sendUnpublish) {
internalListener?.onTrackUnpublished(publication, this)
listener?.onTrackUnpublished(publication, this)
eventBus.postEvent(ParticipantEvent.TrackUnpublished(this, publication), scope)
}
}
// Internal methods just for posting events.
internal fun onDataReceived(data: ByteArray) {
listener?.onDataReceived(data, this)
eventBus.postEvent(ParticipantEvent.DataReceived(this, data), scope)
}
companion object {
private const val KIND_AUDIO = "audio"
private const val KIND_VIDEO = "video"
... ...
... ... @@ -31,11 +31,9 @@ class LocalTrackPublication(
participant.engine.updateMuteStatus(sid, muted)
if (muted) {
participant.listener?.onTrackMuted(this, participant)
participant.internalListener?.onTrackMuted(this, participant)
participant.onTrackMuted(this)
} else {
participant.listener?.onTrackUnmuted(this, participant)
participant.internalListener?.onTrackUnmuted(this, participant)
participant.onTrackUnmuted(this)
}
}
}
... ...
... ... @@ -77,11 +77,9 @@ class RemoteTrackPublication(
field = v
val participant = this.participant.get() as? RemoteParticipant ?: return
if (v) {
participant.listener?.onTrackMuted(this, participant)
participant.internalListener?.onTrackMuted(this, participant)
participant.onTrackMuted(this)
} else {
participant.listener?.onTrackUnmuted(this, participant)
participant.internalListener?.onTrackUnmuted(this, participant)
participant.onTrackUnmuted(this)
}
}
... ...
package io.livekit.android.room.participant
import io.livekit.android.coroutines.TestCoroutineRule
import io.livekit.android.events.EventCollector
import io.livekit.android.events.ParticipantEvent
import io.livekit.android.room.track.TrackPublication
import livekit.LivekitModels
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Rule
import org.junit.Test
class ParticipantTest {
@get:Rule
var coroutineRule = TestCoroutineRule()
lateinit var participant: Participant
@Before
fun setup() {
participant = Participant("", null)
participant = Participant("", null, coroutineRule.dispatcher)
}
@Test
... ... @@ -58,7 +65,41 @@ class ParticipantTest {
checkValues(publicListener)
checkValues(internalListener)
}
@Test
fun setMetadataChangedEvent() {
val eventCollector = EventCollector(participant.events, coroutineRule.scope)
val prevMetadata = participant.metadata
val metadata = "metadata"
participant.metadata = metadata
val events = eventCollector.stopCollectingEvents()
assertEquals(1, events.size)
assertEquals(true, events[0] is ParticipantEvent.MetadataChanged)
val event = events[0] as ParticipantEvent.MetadataChanged
assertEquals(prevMetadata, event.prevMetadata)
assertEquals(participant, event.participant)
}
@Test
fun setIsSpeakingChangedEvent() {
val eventCollector = EventCollector(participant.events, coroutineRule.scope)
val newIsSpeaking = !participant.isSpeaking
participant.isSpeaking = newIsSpeaking
val events = eventCollector.stopCollectingEvents()
assertEquals(1, events.size)
assertEquals(true, events[0] is ParticipantEvent.SpeakingChanged)
val event = events[0] as ParticipantEvent.SpeakingChanged
assertEquals(participant, event.participant)
assertEquals(newIsSpeaking, event.isSpeaking)
}
@Test
... ...
... ... @@ -23,7 +23,8 @@ class RemoteParticipantTest {
participant = RemoteParticipant(
"sid",
signalClient = signalClient,
ioDispatcher = coroutineRule.dispatcher
ioDispatcher = coroutineRule.dispatcher,
defaultdispatcher = coroutineRule.dispatcher,
)
}
... ... @@ -33,7 +34,12 @@ class RemoteParticipantTest {
.addTracks(TRACK_INFO)
.build()
participant = RemoteParticipant(info, signalClient, ioDispatcher = coroutineRule.dispatcher)
participant = RemoteParticipant(
info,
signalClient,
ioDispatcher = coroutineRule.dispatcher,
defaultdispatcher = coroutineRule.dispatcher,
)
assertEquals(1, participant.tracks.values.size)
assertNotNull(participant.getTrackPublication(TRACK_INFO.sid))
... ...