David Liu

RemoteParticipant

1 package io.livekit.android.room.participant 1 package io.livekit.android.room.participant
2 2
3 -import io.livekit.android.room.track.Track  
4 -import io.livekit.android.room.track.TrackPublication 3 +import io.livekit.android.room.track.*
5 4
6 -class Participant(val sid: String, name: String? = null) { 5 +open class Participant(var sid: String, name: String? = null) {
7 inline class Sid(val sid: String) 6 inline class Sid(val sid: String)
8 7
9 var metadata: String? = null 8 var metadata: String? = null
@@ -20,10 +19,28 @@ class Participant(val sid: String, name: String? = null) { @@ -20,10 +19,28 @@ class Participant(val sid: String, name: String? = null) {
20 var dataTracks = mutableMapOf<Track.Sid, TrackPublication>() 19 var dataTracks = mutableMapOf<Track.Sid, TrackPublication>()
21 private set 20 private set
22 21
23 - fun addTrack(publication: TrackPublication){ 22 + fun addTrack(publication: TrackPublication) {
24 tracks[publication.trackSid] = publication 23 tracks[publication.trackSid] = publication
25 - when(publication) { 24 + when (publication) {
  25 + is RemoteAudioTrackPublication -> audioTracks[publication.trackSid] = publication
  26 + is RemoteVideoTrackPublication -> videoTracks[publication.trackSid] = publication
  27 + is RemoteDataTrackPublication -> dataTracks[publication.trackSid] = publication
  28 + }
  29 + }
  30 +
  31 + override fun equals(other: Any?): Boolean {
  32 + if (this === other) return true
  33 + if (javaClass != other?.javaClass) return false
  34 +
  35 + other as Participant
26 36
  37 + if (sid != other.sid) return false
  38 +
  39 + return true
27 } 40 }
  41 +
  42 + override fun hashCode(): Int {
  43 + return sid.hashCode()
28 } 44 }
  45 +
29 } 46 }
1 package io.livekit.android.room.participant 1 package io.livekit.android.room.participant
2 2
3 -class RemoteParticipant { 3 +import com.github.ajalt.timberkt.Timber
  4 +import io.livekit.android.room.track.*
  5 +import io.livekit.android.util.CloseableCoroutineScope
  6 +import kotlinx.coroutines.SupervisorJob
  7 +import kotlinx.coroutines.delay
  8 +import kotlinx.coroutines.launch
  9 +import livekit.Model
  10 +import org.webrtc.AudioTrack
  11 +import org.webrtc.DataChannel
  12 +import org.webrtc.MediaStreamTrack
  13 +import org.webrtc.VideoTrack
  14 +import java.nio.ByteBuffer
  15 +
  16 +class RemoteParticipant(
  17 + sid: String, name: String? = null
  18 +) : Participant(sid, name), RemoteDataTrack.Listener {
  19 +
  20 + constructor(info: Model.ParticipantInfo) : this(info.sid, info.identity) {
  21 + updateFromInfo(info)
  22 + }
  23 +
  24 + val remoteAudioTracks
  25 + get() = audioTracks.values.toList()
  26 + val remoteVideoTracks
  27 + get() = videoTracks.values.toList()
  28 + val remoteDataTracks
  29 + get() = dataTracks.values.toList()
  30 +
  31 + val listener: Listener? = null
  32 +
  33 + var participantInfo: Model.ParticipantInfo? = null
  34 +
  35 + val hasInfo
  36 + get() = participantInfo != null
  37 +
  38 + private val coroutineScope = CloseableCoroutineScope(SupervisorJob())
  39 +
  40 + fun getTrackPublication(sid: Track.Sid): RemoteTrackPublication? =
  41 + tracks[sid] as? RemoteTrackPublication
  42 +
  43 + fun updateFromInfo(info: Model.ParticipantInfo) {
  44 + val hadInfo = hasInfo
  45 + sid = info.sid
  46 + name = info.identity
  47 + participantInfo = info
  48 + metadata = info.metadata
  49 +
  50 + val validTrackPublication = mutableMapOf<Track.Sid, RemoteTrackPublication>()
  51 + val newTrackPublications = mutableMapOf<Track.Sid, RemoteTrackPublication>()
  52 +
  53 + for (trackInfo in info.tracksList) {
  54 + val trackSid = Track.Sid(trackInfo.sid)
  55 + var publication = getTrackPublication(trackSid)
  56 +
  57 + if (publication == null) {
  58 + publication = when (trackInfo.type) {
  59 + Model.TrackType.AUDIO -> RemoteAudioTrackPublication(trackInfo)
  60 + Model.TrackType.VIDEO -> RemoteVideoTrackPublication(trackInfo)
  61 + Model.TrackType.DATA -> RemoteDataTrackPublication(trackInfo)
  62 + Model.TrackType.UNRECOGNIZED -> throw TrackException.InvalidTrackTypeException()
  63 + null -> throw NullPointerException("trackInfo.type is null")
  64 + }
  65 +
  66 + newTrackPublications[trackSid] = publication
  67 + addTrack(publication)
  68 + } else {
  69 + publication.updateFromInfo(trackInfo)
  70 + }
  71 +
  72 + validTrackPublication[trackSid] = publication
  73 + }
  74 +
  75 + if (hadInfo) {
  76 + for (publication in newTrackPublications.values) {
  77 + sendTrackPublishedEvent(publication)
  78 + }
  79 + }
  80 +
  81 + val invalidKeys = tracks.keys - validTrackPublication.keys
  82 + for (invalidKey in invalidKeys) {
  83 + val publication = tracks[invalidKey] ?: continue
  84 + unpublishTrack(publication.trackSid, true)
  85 + }
  86 + }
  87 +
  88 + fun addSubscribedMediaTrack(rtcTrack: MediaStreamTrack, sid: Track.Sid, triesLeft: Int = 20) {
  89 + val publication = getTrackPublication(sid)
  90 + val track: Track = when (val kind = rtcTrack.kind()) {
  91 + KIND_AUDIO -> RemoteAudioTrack(sid = sid, rtcTrack = rtcTrack as AudioTrack, name = "")
  92 + KIND_VIDEO -> RemoteVideoTrack(sid = sid, rtcTrack = rtcTrack as VideoTrack, name = "")
  93 + else -> throw TrackException.InvalidTrackTypeException("invalid track type: $kind")
  94 + }
  95 +
  96 + if (publication == null) {
  97 + if (triesLeft == 0) {
  98 + val message = "Could not find published track with sid: $sid"
  99 + val exception = TrackException.InvalidTrackStateException(message)
  100 + Timber.e { "remote participant ${this.sid} --- $message" }
  101 + when (rtcTrack.kind()) {
  102 + KIND_AUDIO -> {
  103 + listener?.onFailToSubscribe(
  104 + audioTrack = track as RemoteAudioTrack,
  105 + exception = exception,
  106 + participant = this
  107 + )
  108 + }
  109 +
  110 + KIND_VIDEO -> {
  111 + listener?.onFailToSubscribe(
  112 + videoTrack = track as RemoteVideoTrack,
  113 + exception = exception,
  114 + participant = this
  115 + )
  116 + }
  117 + }
  118 + } else {
  119 + coroutineScope.launch {
  120 + delay(150)
  121 + addSubscribedMediaTrack(rtcTrack, sid, triesLeft - 1)
  122 + }
  123 + }
  124 + return
  125 + }
  126 +
  127 + val remoteTrack = track as RemoteTrack
  128 + publication.track = track
  129 + track.name = publication.trackName
  130 + remoteTrack.sid = publication.trackSid
  131 +
  132 + when (publication) {
  133 + is RemoteAudioTrackPublication -> listener?.onSubscribe(publication, this)
  134 + is RemoteVideoTrackPublication -> listener?.onSubscribe(publication, this)
  135 + else -> throw TrackException.InvalidTrackTypeException()
  136 + }
  137 + }
  138 +
  139 + fun addSubscribedDataTrack(rtcTrack: DataChannel, sid: Track.Sid, name: String) {
  140 + val track = RemoteDataTrack(sid, name, rtcTrack)
  141 + var publication = getTrackPublication(sid) as? RemoteDataTrackPublication
  142 +
  143 + if (publication != null) {
  144 + publication.track = track
  145 + } else {
  146 + val trackInfo = Model.TrackInfo.newBuilder()
  147 + .setSid(sid.sid)
  148 + .setName(name)
  149 + .setType(Model.TrackType.DATA)
  150 + .build()
  151 + publication = RemoteDataTrackPublication(info = trackInfo, track = track)
  152 + addTrack(publication)
  153 + if (hasInfo) {
  154 + sendTrackPublishedEvent(publication)
  155 + }
  156 + }
  157 +
  158 +
  159 + rtcTrack.registerObserver(object : DataChannel.Observer {
  160 + override fun onBufferedAmountChange(previousAmount: Long) {}
  161 +
  162 + override fun onStateChange() {
  163 + val newState = rtcTrack.state()
  164 + if (newState == DataChannel.State.CLOSED) {
  165 + listener?.onUnsubscribe(publication, this@RemoteParticipant)
  166 + }
  167 + }
  168 +
  169 + override fun onMessage(buffer: DataChannel.Buffer) {
  170 + listener?.onReceive(buffer.data, publication, this@RemoteParticipant)
  171 + }
  172 + })
  173 + listener?.onSubscribe(dataTrack = publication, participant = this)
  174 + }
  175 +
  176 + private fun unpublishTrack(trackSid: Track.Sid, sendUnpublish: Boolean) {
  177 + val publication = tracks.remove(trackSid) ?: return
  178 +
  179 + when (publication) {
  180 + is RemoteAudioTrackPublication -> audioTracks.remove(trackSid)
  181 + is RemoteVideoTrackPublication -> videoTracks.remove(trackSid)
  182 + is RemoteDataTrackPublication -> {
  183 + dataTracks.remove(trackSid)
  184 + publication.dataTrack?.rtcTrack?.unregisterObserver()
  185 + }
  186 + else -> throw TrackException.InvalidTrackTypeException()
  187 + }
  188 +
  189 + if (publication.track != null) {
  190 + // TODO: need to stop track?
  191 + publication.track
  192 + sendTrackUnsubscribedEvent(publication)
  193 + }
  194 + if (sendUnpublish) {
  195 + sendTrackUnpublishedEvent(publication)
  196 + }
  197 + }
  198 +
  199 + private fun sendTrackUnsubscribedEvent(publication: TrackPublication) {
  200 + when (publication) {
  201 + is RemoteAudioTrackPublication -> listener?.onUnsubscribe(publication, this)
  202 + is RemoteVideoTrackPublication -> listener?.onUnsubscribe(publication, this)
  203 + is RemoteDataTrackPublication -> listener?.onUnsubscribe(publication, this)
  204 + else -> throw TrackException.InvalidTrackTypeException()
  205 + }
  206 + }
  207 +
  208 + private fun sendTrackUnpublishedEvent(publication: TrackPublication) {
  209 + when (publication) {
  210 + is RemoteAudioTrackPublication -> listener?.onUnpublish(publication, this)
  211 + is RemoteVideoTrackPublication -> listener?.onUnpublish(publication, this)
  212 + is RemoteDataTrackPublication -> listener?.onUnpublish(publication, this)
  213 + else -> throw TrackException.InvalidTrackTypeException()
  214 + }
  215 + }
  216 +
  217 + private fun sendTrackPublishedEvent(publication: RemoteTrackPublication) {
  218 + when (publication) {
  219 + is RemoteAudioTrackPublication -> listener?.onPublish(publication, this)
  220 + is RemoteVideoTrackPublication -> listener?.onPublish(publication, this)
  221 + is RemoteDataTrackPublication -> listener?.onPublish(publication, this)
  222 + else -> throw TrackException.InvalidTrackTypeException()
  223 + }
  224 + }
  225 +
  226 + override fun onReceiveString(message: String, dataTrack: DataTrack) {
  227 + TODO("Not yet implemented")
  228 + }
  229 +
  230 + override fun onReceiveData(message: DataChannel.Buffer, dataTrack: DataTrack) {
  231 + TODO("Not yet implemented")
  232 + }
  233 +
  234 + companion object {
  235 + private const val KIND_AUDIO = "audio"
  236 + private const val KIND_VIDEO = "video"
  237 + }
  238 +
  239 + interface Listener {
  240 + fun onPublish(audioTrack: RemoteAudioTrackPublication, participant: RemoteParticipant)
  241 + fun onUnpublish(audioTrack: RemoteAudioTrackPublication, participant: RemoteParticipant)
  242 + fun onPublish(videoTrack: RemoteVideoTrackPublication, participant: RemoteParticipant)
  243 + fun onUnpublish(videoTrack: RemoteVideoTrackPublication, participant: RemoteParticipant)
  244 + fun onPublish(dataTrack: RemoteDataTrackPublication, participant: RemoteParticipant)
  245 + fun onUnpublish(dataTrack: RemoteDataTrackPublication, participant: RemoteParticipant)
  246 +
  247 + fun onEnable(audioTrack: RemoteAudioTrackPublication, participant: RemoteParticipant)
  248 + fun onDisable(audioTrack: RemoteAudioTrackPublication, participant: RemoteParticipant)
  249 + fun onEnable(videoTrack: RemoteVideoTrackPublication, participant: RemoteParticipant)
  250 + fun onDisable(videoTrack: RemoteVideoTrackPublication, participant: RemoteParticipant)
  251 +
  252 + fun onSubscribe(audioTrack: RemoteAudioTrackPublication, participant: RemoteParticipant)
  253 + fun onFailToSubscribe(
  254 + audioTrack: RemoteAudioTrack,
  255 + exception: Exception,
  256 + participant: RemoteParticipant
  257 + )
  258 +
  259 + fun onUnsubscribe(audioTrack: RemoteAudioTrackPublication, participant: RemoteParticipant)
  260 +
  261 + fun onSubscribe(videoTrack: RemoteVideoTrackPublication, participant: RemoteParticipant)
  262 + fun onFailToSubscribe(
  263 + videoTrack: RemoteVideoTrack,
  264 + exception: Exception,
  265 + participant: RemoteParticipant
  266 + )
  267 +
  268 + fun onUnsubscribe(videoTrack: RemoteVideoTrackPublication, participant: RemoteParticipant)
  269 +
  270 + fun onSubscribe(dataTrack: RemoteDataTrackPublication, participant: RemoteParticipant)
  271 + fun onFailToSubscribe(
  272 + dataTrack: RemoteDataTrackPublication,
  273 + exception: Exception,
  274 + participant: RemoteParticipant
  275 + )
  276 +
  277 + fun onUnsubscribe(dataTrack: RemoteDataTrackPublication, participant: RemoteParticipant)
  278 + fun onReceive(
  279 + data: ByteBuffer,
  280 + dataTrack: RemoteDataTrackPublication,
  281 + participant: RemoteParticipant
  282 + )
  283 +
  284 + //fun networkQualityDidChange(networkQualityLevel: NetworkQualityLevel, participant: remoteParticipant)
  285 + fun switchedOffVideo(track: RemoteVideoTrack, participant: RemoteParticipant)
  286 + fun switchedOnVideo(track: RemoteVideoTrack, participant: RemoteParticipant)
  287 +// fun onChangePublishPriority(videoTrack: RemoteVideoTrackPublication, priority: PublishPriority, participant: RemoteParticipant)
  288 +// fun onChangePublishPriority(audioTrack: RemoteAudioTrackPublication, priority: PublishPriority, participant: RemoteParticipant)
  289 +// fun onChangePublishPriority(dataTrack: RemoteDataTrackPublication, priority: PublishPriority, participant: RemoteParticipant)
  290 + }
  291 +
4 } 292 }
  1 +package io.livekit.android.room.track
  2 +
  3 +import org.webrtc.DataChannel
  4 +
  5 +open class DataTrack(
  6 + name: String,
  7 + val rtcTrack: DataChannel
  8 +) : Track(name, stateFromRTCDataChannelState(rtcTrack.state())) {
  9 +
  10 + /**
  11 + * TODO: These values are only available at [DataChannel.Init]
  12 + */
  13 + val ordered: Boolean = TODO()
  14 + val maxPacketLifeTime: Int = TODO()
  15 + val maxRetransmits: Int = TODO()
  16 +
  17 +}
  1 +package io.livekit.android.room.track
  2 +
  3 +interface DataTrackPublication {
  4 + val dataTrack: DataTrack?
  5 +}
  1 +package io.livekit.android.room.track
  2 +
  3 +import org.webrtc.DataChannel
  4 +
  5 +class RemoteDataTrack(
  6 + override var sid: Sid,
  7 + name: String,
  8 + rtcTrack: DataChannel
  9 +) :
  10 + DataTrack(name, rtcTrack),
  11 + RemoteTrack {
  12 +
  13 + var listener: Listener? = null
  14 +
  15 + interface Listener {
  16 + fun onReceiveString(message: String, dataTrack: DataTrack)
  17 + fun onReceiveData(message: DataChannel.Buffer, dataTrack: DataTrack)
  18 + }
  19 +}
  1 +package io.livekit.android.room.track
  2 +
  3 +import livekit.Model
  4 +
  5 +class RemoteDataTrackPublication(
  6 + info: Model.TrackInfo,
  7 + track: Track? = null
  8 +) : RemoteTrackPublication(info, track), DataTrackPublication {
  9 + override val dataTrack: DataTrack?
  10 + get() = track as? DataTrack
  11 +}