David Zhao

Moved callbacks to Room, removed Remote*Track classes

... ... @@ -137,7 +137,7 @@ constructor(
interface Listener {
fun onJoin(response: LivekitRtc.JoinResponse)
fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>)
fun onPublishLocalTrack(cid: String, track: LivekitModels.TrackInfo)
// fun onPublishLocalTrack(cid: String, track: LivekitModels.TrackInfo)
fun onAddDataChannel(channel: DataChannel)
fun onUpdateParticipants(updates: List<LivekitModels.ParticipantInfo>)
fun onUpdateSpeakers(speakers: List<LivekitRtc.SpeakerInfo>)
... ... @@ -267,8 +267,7 @@ constructor(
return
}
cont.resume(response.track)
listener?.onPublishLocalTrack(cid, track)
// listener?.onPublishLocalTrack(cid, track)
}
override fun onParticipantUpdate(updates: List<LivekitModels.ParticipantInfo>) {
... ...
... ... @@ -9,10 +9,14 @@ import io.livekit.android.ConnectOptions
import io.livekit.android.room.participant.LocalParticipant
import io.livekit.android.room.participant.Participant
import io.livekit.android.room.participant.RemoteParticipant
import io.livekit.android.room.track.DataTrack
import io.livekit.android.room.track.Track
import io.livekit.android.room.track.TrackPublication
import io.livekit.android.room.util.unpackedTrackLabel
import livekit.LivekitModels
import livekit.LivekitRtc
import org.webrtc.*
import java.nio.ByteBuffer
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
... ... @@ -45,7 +49,7 @@ constructor(
private set
var state: State = State.DISCONNECTED
private set
var localParticipant: LocalParticipant? = null
lateinit var localParticipant: LocalParticipant
private set
private val mutableRemoteParticipants = mutableMapOf<String, RemoteParticipant>()
val remoteParticipants: Map<String, RemoteParticipant>
... ... @@ -57,10 +61,6 @@ constructor(
private var connectContinuation: Continuation<Unit>? = null
suspend fun connect(url: String, token: String) {
if (localParticipant != null) {
Timber.d { "Attempting to connect to room when already connected." }
return
}
engine.join(url, token)
return suspendCoroutine { connectContinuation = it }
... ... @@ -120,7 +120,7 @@ constructor(
}
}
if (localParticipant != null && !seenSids.contains(localParticipant.sid)) {
if (!seenSids.contains(localParticipant.sid)) {
localParticipant.audioLevel = 0.0f
}
remoteParticipants.values
... ... @@ -140,22 +140,6 @@ constructor(
fun create(connectOptions: ConnectOptions): Room
}
/**
* Room Listener, this class provides callbacks that clients should override.
*
*/
interface Listener {
fun onConnect(room: Room) {}
fun onDisconnect(room: Room, error: Exception?) {}
fun onParticipantConnected(room: Room, participant: RemoteParticipant) {}
fun onParticipantDisconnected(room: Room, participant: RemoteParticipant) {}
fun onFailedToConnect(room: Room, error: Exception) {}
fun onReconnecting(room: Room, error: Exception) {}
fun onReconnect(room: Room) {}
fun onMetadataChanged(room: Room, Participant: Participant, prevMetadata: String?) {}
fun onActiveSpeakersChanged(speakers: List<Participant>, room: Room) {}
}
//----------------------------------- RTCEngine.Listener ------------------------------------//
/**
* @suppress
... ... @@ -163,25 +147,18 @@ constructor(
override fun onJoin(response: LivekitRtc.JoinResponse) {
Timber.v { "engine did join, version: ${response.serverVersion}" }
try {
val serverVersion = Semver(response.serverVersion)
if (serverVersion.major == 0 && serverVersion.minor < 5) {
Timber.e { "This version of livekit requires server version >= 0.5.x" }
return
}
} catch (e: Exception) {
Timber.e { "Unable to parse server version!" }
return
}
state = State.CONNECTED
sid = Sid(response.room.sid)
name = response.room.name
if (response.hasParticipant()) {
if (!response.hasParticipant()) {
listener?.onFailedToConnect(this, RoomException.ConnectException("server didn't return any participants"))
return
}
val lp = LocalParticipant(response.participant, engine)
lp.listener = this
localParticipant = lp
}
if (response.otherParticipantsList.isNotEmpty()) {
response.otherParticipantsList.forEach {
getOrCreateRemoteParticipant(it.sid, it)
... ... @@ -190,7 +167,6 @@ constructor(
connectContinuation?.resume(Unit)
connectContinuation = null
listener?.onConnect(this)
}
/**
... ... @@ -218,11 +194,6 @@ constructor(
participant.addSubscribedDataTrack(channel, trackSid, name)
}
/**
* @suppress
*/
override fun onPublishLocalTrack(cid: String, track: LivekitModels.TrackInfo) {
}
/**
* @suppress
... ... @@ -231,8 +202,9 @@ constructor(
for (info in updates) {
val participantSid = info.sid
if(localParticipant?.sid == participantSid) {
localParticipant?.updateFromInfo(info)
if(localParticipant.sid == participantSid) {
localParticipant.updateFromInfo(info)
continue
}
val isNewParticipant = remoteParticipants.contains(participantSid)
... ... @@ -272,12 +244,48 @@ constructor(
//------------------------------- RemoteParticipant.Listener --------------------------------//
/**
* This is called for both Local and Remote participants
* @suppress
*/
override fun onMetadataChanged(participant: Participant, prevMetadata: String?) {
listener?.onMetadataChanged(this, participant, prevMetadata)
listener?.onMetadataChanged(participant, prevMetadata, this)
}
override fun onTrackPublished(publication: TrackPublication, participant: RemoteParticipant) {
listener?.onTrackPublished(publication, participant, this)
}
override fun onTrackUnpublished(publication: TrackPublication, participant: RemoteParticipant) {
listener?.onTrackUnpublished(publication, participant, this)
}
override fun onTrackSubscribed(track: Track, publication: TrackPublication, participant: RemoteParticipant) {
listener?.onTrackSubscribed(track, publication, participant, this)
}
override fun onTrackSubscriptionFailed(
sid: String,
exception: Exception,
participant: RemoteParticipant
) {
listener?.onTrackSubscriptionFailed(sid, exception, participant, this)
}
override fun onTrackUnsubscribed(
track: Track,
publication: TrackPublication,
participant: RemoteParticipant
) {
listener?.onTrackUnsubscribed(track, publication, participant, this)
}
override fun onDataReceived(
data: ByteBuffer,
dataTrack: DataTrack,
participant: RemoteParticipant
) {
listener?.onDataReceived(data, dataTrack, participant, this)
}
/**
* @suppress
... ... @@ -288,4 +296,86 @@ constructor(
viewRenderer.setScalingType(RendererCommon.ScalingType.SCALE_ASPECT_FIT)
viewRenderer.setEnableHardwareScaler(false /* enabled */);
}
/**
* Room Listener, this class provides callbacks that clients should override.
*
*/
interface Listener {
/**
* Disconnected from room
*/
fun onDisconnect(room: Room, error: Exception?) {}
/**
* When a [RemoteParticipant] joins after the local participant. It will not emit events
* for participants that are already in the room
*/
fun onParticipantConnected(room: Room, participant: RemoteParticipant) {}
/**
* When a [RemoteParticipant] leaves after the local participant has joined.
*/
fun onParticipantDisconnected(room: Room, participant: RemoteParticipant) {}
/**
* Could not connect to the room
*/
fun onFailedToConnect(room: Room, error: Exception) {}
// fun onReconnecting(room: Room, error: Exception) {}
// fun onReconnect(room: Room) {}
/**
* Active speakers changed. List of speakers are ordered by their audio level. loudest
* speakers first. This will include the [LocalParticipant] too.
*/
fun onActiveSpeakersChanged(speakers: List<Participant>, room: Room) {}
// Participant callbacks
/**
* Participant metadata is a simple way for app-specific state to be pushed to all users.
* When RoomService.UpdateParticipantMetadata is called to change a participant's state,
* this event will be fired for all clients in the room.
*/
fun onMetadataChanged(Participant: Participant, prevMetadata: String?, room: Room) {}
/**
* When a new track is published to room after the local participant has joined. It will
* not fire for tracks that are already published
*/
fun onTrackPublished(publication: TrackPublication, participant: RemoteParticipant, room: Room) {}
/**
* A [RemoteParticipant] has unpublished a track
*/
fun onTrackUnpublished(publication: TrackPublication, participant: RemoteParticipant, room: Room) {}
/**
* The [LocalParticipant] has subscribed to a new track. This event will always fire as
* long as new tracks are ready for use.
*/
fun onTrackSubscribed(track: Track, publication: TrackPublication, participant: RemoteParticipant, room: Room) {}
/**
* Could not subscribe to a track
*/
fun onTrackSubscriptionFailed(sid: String, exception: Exception, participant: RemoteParticipant, room: Room) {}
/**
* A subscribed track is no longer available. Clients should listen to this event and ensure
* the track removes all renderers
*/
fun onTrackUnsubscribed(track: Track, publications: TrackPublication, participant: RemoteParticipant, room: Room) {}
/**
* Message received over a [DataTrack]
*/
fun onDataReceived(data: ByteBuffer, dataTrack: DataTrack, participant: RemoteParticipant, room: Room) {}
}
}
sealed class RoomException(message: String? = null, cause: Throwable? = null) :
Exception(message, cause) {
class ConnectException(message: String? = null, cause: Throwable? = null) :
RoomException(message, cause)
}
\ No newline at end of file
... ...
... ... @@ -27,6 +27,8 @@ open class Participant(var sid: String, identity: String? = null) {
* @suppress
*/
fun addTrackPublication(publication: TrackPublication) {
val track = publication.track
track?.sid = publication.sid
tracks[publication.sid] = publication
when (publication.kind) {
LivekitModels.TrackType.AUDIO -> audioTracks[publication.sid] = publication
... ...
... ... @@ -15,7 +15,7 @@ import java.nio.ByteBuffer
class RemoteParticipant(
sid: String, name: String? = null
) : Participant(sid, name), RemoteDataTrack.Listener {
) : Participant(sid, name) {
/**
* @suppress
*/
... ... @@ -31,8 +31,7 @@ class RemoteParticipant(
private val coroutineScope = CloseableCoroutineScope(SupervisorJob())
fun getTrackPublication(sid: String): TrackPublication? =
tracks[sid]
fun getTrackPublication(sid: String): TrackPublication? = tracks[sid]
/**
* @suppress
... ... @@ -62,7 +61,7 @@ class RemoteParticipant(
if (hadInfo) {
for (publication in newTrackPublications.values) {
listener?.onPublish(publication, this)
listener?.onTrackPublished(publication, this)
}
}
... ... @@ -79,8 +78,8 @@ class RemoteParticipant(
fun addSubscribedMediaTrack(mediaTrack: MediaStreamTrack, sid: String, triesLeft: Int = 20) {
val publication = getTrackPublication(sid)
val track: Track = when (val kind = mediaTrack.kind()) {
KIND_AUDIO -> RemoteAudioTrack(sid = sid, mediaTrack = mediaTrack as AudioTrack, name = "")
KIND_VIDEO -> RemoteVideoTrack(sid = sid, mediaTrack = mediaTrack as VideoTrack, name = "")
KIND_AUDIO -> AudioTrack(rtcTrack = mediaTrack as AudioTrack, name = "")
KIND_VIDEO -> VideoTrack(rtcTrack = mediaTrack as VideoTrack, name = "")
else -> throw TrackException.InvalidTrackTypeException("invalid track type: $kind")
}
... ... @@ -90,7 +89,7 @@ class RemoteParticipant(
val exception = TrackException.InvalidTrackStateException(message)
Timber.e { "remote participant ${this.sid} --- $message" }
listener?.onFailToSubscribe(sid, exception, this)
listener?.onTrackSubscriptionFailed(sid, exception, this)
} else {
coroutineScope.launch {
delay(150)
... ... @@ -100,14 +99,14 @@ class RemoteParticipant(
return
}
val remoteTrack = track as RemoteTrack
publication.track = track
track.name = publication.name
remoteTrack.sid = publication.sid
track.sid = publication.sid
addTrackPublication(publication)
// TODO: how does mediatrack send ended event?
listener?.onSubscribe(track, publication, this)
listener?.onTrackSubscribed(track, publication, this)
}
/**
... ... @@ -117,20 +116,19 @@ class RemoteParticipant(
val track = DataTrack(name, dataChannel)
var publication = getTrackPublication(sid)
if (publication != null) {
publication.track = track
} else {
if (publication == null) {
val trackInfo = LivekitModels.TrackInfo.newBuilder()
.setSid(sid)
.setName(name)
.setType(LivekitModels.TrackType.DATA)
.build()
publication = TrackPublication(info = trackInfo, track = track)
publication = TrackPublication(info = trackInfo)
addTrackPublication(publication)
if (hasInfo) {
listener?.onPublish(publication, this)
listener?.onTrackPublished(publication, this)
}
}
publication.track = track
dataChannel.registerObserver(object : DataChannel.Observer {
override fun onBufferedAmountChange(previousAmount: Long) {}
... ... @@ -139,15 +137,15 @@ class RemoteParticipant(
val newState = dataChannel.state()
if (newState == DataChannel.State.CLOSED) {
publication.track = null
listener?.onUnsubscribe(track, publication, this@RemoteParticipant)
listener?.onTrackUnsubscribed(track, publication, this@RemoteParticipant)
}
}
override fun onMessage(buffer: DataChannel.Buffer) {
listener?.onReceive(buffer.data, track, this@RemoteParticipant)
listener?.onDataReceived(buffer.data, track, this@RemoteParticipant)
}
})
listener?.onSubscribe(track, publication, participant = this)
listener?.onTrackSubscribed(track, publication, participant = this)
}
fun unpublishTrack(trackSid: String, sendUnpublish: Boolean = false) {
... ... @@ -162,57 +160,49 @@ class RemoteParticipant(
val track = publication.track
if (track != null) {
track.stop()
listener?.onUnsubscribe(track, publication, this)
listener?.onTrackUnsubscribed(track, publication, this)
}
if (sendUnpublish) {
listener?.onUnpublish(publication, this)
listener?.onTrackUnpublished(publication, this)
}
}
override fun onReceiveString(message: String, dataTrack: DataTrack) {
TODO("Not yet implemented")
}
override fun onReceiveData(message: DataChannel.Buffer, dataTrack: DataTrack) {
TODO("Not yet implemented")
}
companion object {
private const val KIND_AUDIO = "audio"
private const val KIND_VIDEO = "video"
}
interface Listener: Participant.Listener {
fun onPublish(publication: TrackPublication, participant: RemoteParticipant) {}
fun onUnpublish(publication: TrackPublication, participant: RemoteParticipant) {}
fun onTrackPublished(publication: TrackPublication, participant: RemoteParticipant) {}
fun onTrackUnpublished(publication: TrackPublication, participant: RemoteParticipant) {}
fun onEnable(publication: TrackPublication, participant: RemoteParticipant) {}
fun onDisable(publication: TrackPublication, participant: RemoteParticipant) {}
fun onSubscribe(track: Track, publication: TrackPublication, participant: RemoteParticipant) {}
fun onFailToSubscribe(
fun onTrackSubscribed(track: Track, publication: TrackPublication, participant: RemoteParticipant) {}
fun onTrackSubscriptionFailed(
sid: String,
exception: Exception,
participant: RemoteParticipant
) {
}
fun onUnsubscribe(
fun onTrackUnsubscribed(
track: Track,
publications: TrackPublication,
publication: TrackPublication,
participant: RemoteParticipant
) {
}
fun onReceive(
fun onDataReceived(
data: ByteBuffer,
dataTrack: DataTrack,
participant: RemoteParticipant
) {
}
fun switchedOffVideo(track: RemoteVideoTrack, participant: RemoteParticipant) {}
fun switchedOnVideo(track: RemoteVideoTrack, participant: RemoteParticipant) {}
fun switchedOffVideo(track: VideoTrack, publication: TrackPublication, participant: RemoteParticipant) {}
fun switchedOnVideo(track: VideoTrack, publication: TrackPublication, participant: RemoteParticipant) {}
}
}
\ No newline at end of file
... ...
... ... @@ -7,8 +7,6 @@ import java.util.*
class LocalDataTrack(
val options: DataTrackOptions
) : DataTrack(options.name) {
var sid: String? = null
internal set
var cid: String = UUID.randomUUID().toString()
fun sendString(message: String) {
... ...
package io.livekit.android.room.track
class RemoteAudioTrack(
sid: String,
playbackEnabled: Boolean = true,
name: String,
mediaTrack: org.webrtc.AudioTrack
) : AudioTrack(name, mediaTrack), RemoteTrack {
override var sid: String = sid
var playbackEnabled = playbackEnabled
internal set
}
\ No newline at end of file
package io.livekit.android.room.track
import org.webrtc.DataChannel
class RemoteDataTrack(
override var sid: String,
name: String,
rtcTrack: DataChannel
) :
DataTrack(name, rtcTrack),
RemoteTrack {
var listener: Listener? = null
interface Listener {
fun onReceiveString(message: String, dataTrack: DataTrack)
fun onReceiveData(message: DataChannel.Buffer, dataTrack: DataTrack)
}
}
\ No newline at end of file
package io.livekit.android.room.track
interface RemoteTrack {
var sid: String
}
\ No newline at end of file
package io.livekit.android.room.track
class RemoteVideoTrack(
override var sid: String,
var switchedOff: Boolean = false,
name: String,
mediaTrack: org.webrtc.VideoTrack
) : VideoTrack(name, mediaTrack), RemoteTrack
... ... @@ -10,6 +10,8 @@ open class Track(name: String, kind: LivekitModels.TrackType) {
var kind = kind
internal set
var state: State = State.NONE
var sid: String? = null
internal set
enum class State {
ENDED, LIVE, NONE;
... ...
... ... @@ -34,10 +34,6 @@ class CallViewModel(
token,
ConnectOptions(false),
object : Room.Listener {
override fun onConnect(room: Room) {
updateParticipants(room)
}
override fun onDisconnect(room: Room, error: Exception?) {
}
... ... @@ -58,25 +54,15 @@ class CallViewModel(
override fun onFailedToConnect(room: Room, error: Exception) {
}
override fun onReconnecting(room: Room, error: Exception) {
}
override fun onReconnect(room: Room) {
updateParticipants(room)
}
override fun onActiveSpeakersChanged(speakers: List<Participant>, room: Room) {
}
override fun onMetadataChanged(
room: Room,
Participant: Participant,
prevMetadata: String?
) {
override fun onMetadataChanged(Participant: Participant, prevMetadata: String?, room: Room) {
}
}
)
updateParticipants(mutableRoom.value!!)
}
}
... ...
... ... @@ -28,7 +28,7 @@ class ParticipantItem(
viewBinding.run {
remoteParticipant.listener = object : RemoteParticipant.Listener {
override fun onSubscribe(
override fun onTrackSubscribed(
track: Track,
publication: TrackPublication,
participant: RemoteParticipant
... ...