David Liu

flesh out room

@@ -69,6 +69,7 @@ dependencies { @@ -69,6 +69,7 @@ dependencies {
69 kapt 'com.google.dagger:dagger-compiler:2.33' 69 kapt 'com.google.dagger:dagger-compiler:2.33'
70 70
71 implementation 'com.github.ajalt:timberkt:1.5.1' 71 implementation 'com.github.ajalt:timberkt:1.5.1'
  72 + implementation 'com.vdurmont:semver4j:3.1.0'
72 73
73 testImplementation 'junit:junit:4.12' 74 testImplementation 'junit:junit:4.12'
74 androidTestImplementation 'androidx.test.ext:junit:1.1.2' 75 androidTestImplementation 'androidx.test.ext:junit:1.1.2'
@@ -237,7 +237,7 @@ constructor( @@ -237,7 +237,7 @@ constructor(
237 fun onParticipantUpdate(updates: List<Model.ParticipantInfo>) 237 fun onParticipantUpdate(updates: List<Model.ParticipantInfo>)
238 fun onActiveSpeakersChanged(speakers: List<Rtc.SpeakerInfo>) 238 fun onActiveSpeakersChanged(speakers: List<Rtc.SpeakerInfo>)
239 fun onClose(reason: String, code: Int) 239 fun onClose(reason: String, code: Int)
240 - fun onError(error: Error) 240 + fun onError(error: Exception)
241 } 241 }
242 242
243 companion object { 243 companion object {
@@ -37,7 +37,8 @@ constructor( @@ -37,7 +37,8 @@ constructor(
37 set(value) { 37 set(value) {
38 field = value 38 field = value
39 if (field) { 39 if (field) {
40 - listener?.onJoin(joinResponse) 40 + // TODO get rid of this assertion
  41 + listener?.onJoin(joinResponse!!)
41 joinResponse = null 42 joinResponse = null
42 } 43 }
43 } 44 }
@@ -127,14 +128,14 @@ constructor( @@ -127,14 +128,14 @@ constructor(
127 } 128 }
128 129
129 interface Listener { 130 interface Listener {
130 - fun onJoin(response: Rtc.JoinResponse?) 131 + fun onJoin(response: Rtc.JoinResponse)
131 fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>) 132 fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>)
132 fun onPublishLocalTrack(cid: String, track: Model.TrackInfo) 133 fun onPublishLocalTrack(cid: String, track: Model.TrackInfo)
133 fun onAddDataChannel(channel: DataChannel) 134 fun onAddDataChannel(channel: DataChannel)
134 fun onUpdateParticipants(updates: List<Model.ParticipantInfo>) 135 fun onUpdateParticipants(updates: List<Model.ParticipantInfo>)
135 fun onUpdateSpeakers(speakers: List<Rtc.SpeakerInfo>) 136 fun onUpdateSpeakers(speakers: List<Rtc.SpeakerInfo>)
136 fun onDisconnect(reason: String) 137 fun onDisconnect(reason: String)
137 - fun onFailToConnect(error: Error) 138 + fun onFailToConnect(error: Exception)
138 } 139 }
139 140
140 companion object { 141 companion object {
@@ -280,7 +281,7 @@ constructor( @@ -280,7 +281,7 @@ constructor(
280 listener?.onDisconnect(reason) 281 listener?.onDisconnect(reason)
281 } 282 }
282 283
283 - override fun onError(error: Error) { 284 + override fun onError(error: Exception) {
284 listener?.onFailToConnect(error) 285 listener?.onFailToConnect(error)
285 } 286 }
286 } 287 }
1 package io.livekit.android.room 1 package io.livekit.android.room
2 2
  3 +import com.github.ajalt.timberkt.Timber
  4 +import com.vdurmont.semver4j.Semver
3 import dagger.assisted.Assisted 5 import dagger.assisted.Assisted
4 import dagger.assisted.AssistedFactory 6 import dagger.assisted.AssistedFactory
5 import dagger.assisted.AssistedInject 7 import dagger.assisted.AssistedInject
6 import io.livekit.android.ConnectOptions 8 import io.livekit.android.ConnectOptions
7 import io.livekit.android.room.participant.LocalParticipant 9 import io.livekit.android.room.participant.LocalParticipant
  10 +import io.livekit.android.room.participant.Participant
8 import io.livekit.android.room.participant.RemoteParticipant 11 import io.livekit.android.room.participant.RemoteParticipant
  12 +import io.livekit.android.room.track.Track
  13 +import io.livekit.android.room.util.unpackedTrackLabel
  14 +import livekit.Model
  15 +import livekit.Rtc
  16 +import org.webrtc.DataChannel
  17 +import org.webrtc.MediaStream
  18 +import org.webrtc.MediaStreamTrack
9 19
10 class Room 20 class Room
11 @AssistedInject 21 @AssistedInject
12 constructor( 22 constructor(
13 @Assisted private val connectOptions: ConnectOptions, 23 @Assisted private val connectOptions: ConnectOptions,
14 private val engine: RTCEngine, 24 private val engine: RTCEngine,
15 -) { 25 +) : RTCEngine.Listener {
  26 + init {
  27 + engine.listener = this
  28 + }
16 29
17 enum class State { 30 enum class State {
18 CONNECTING, 31 CONNECTING,
@@ -31,13 +44,89 @@ constructor( @@ -31,13 +44,89 @@ constructor(
31 private set 44 private set
32 var state: State = State.DISCONNECTED 45 var state: State = State.DISCONNECTED
33 private set 46 private set
34 - var localParticipant: LocalParticipant? = TODO() 47 + var localParticipant: LocalParticipant? = null
  48 + private set
  49 + private val mutableRemoteParticipants = mutableMapOf<Participant.Sid, RemoteParticipant>()
  50 + val remoteParticipants: Map<Participant.Sid, RemoteParticipant>
  51 + get() = mutableRemoteParticipants
35 52
  53 + private val mutableActiveSpeakers = mutableListOf<Participant>()
  54 + val activeSpeakers: List<Participant>
  55 + get() = mutableActiveSpeakers
36 56
37 suspend fun connect(url: String, token: String, isSecure: Boolean) { 57 suspend fun connect(url: String, token: String, isSecure: Boolean) {
  58 + if (localParticipant != null) {
  59 + Timber.d { "Attempting to connect to room when already connected." }
  60 + return
  61 + }
38 engine.join(url, token, isSecure) 62 engine.join(url, token, isSecure)
39 } 63 }
40 64
  65 + fun disconnect() {
  66 + engine.close()
  67 + state = State.DISCONNECTED
  68 + listener?.onDisconnect(this, null)
  69 + }
  70 +
  71 + private fun handleParticipantDisconnect(sid: Participant.Sid, participant: RemoteParticipant) {
  72 + val removedParticipant = mutableRemoteParticipants.remove(sid) ?: return
  73 + removedParticipant.tracks.values.forEach { publication ->
  74 + removedParticipant.unpublishTrack(publication.trackSid)
  75 + }
  76 +
  77 + listener?.onparticipantDisconnected(this, removedParticipant)
  78 + }
  79 +
  80 + private fun getOrCreateRemoteParticipant(
  81 + sid: Participant.Sid,
  82 + info: Model.ParticipantInfo? = null
  83 + ): RemoteParticipant {
  84 + var participant = remoteParticipants[sid]
  85 + if (participant != null) {
  86 + return participant
  87 + }
  88 +
  89 + participant = if (info != null) {
  90 + RemoteParticipant(info)
  91 + } else {
  92 + RemoteParticipant(sid, null)
  93 + }
  94 + mutableRemoteParticipants[sid] = participant
  95 + return participant
  96 + }
  97 +
  98 + private fun handleSpeakerUpdate(speakerInfos: List<Rtc.SpeakerInfo>) {
  99 + val speakers = mutableListOf<Participant>()
  100 + val seenSids = mutableSetOf<Participant.Sid>()
  101 + val localParticipant = localParticipant
  102 + speakerInfos.forEach { speakerInfo ->
  103 + val speakerSid = Participant.Sid(speakerInfo.sid)
  104 + seenSids.add(speakerSid)
  105 +
  106 + if (speakerSid == localParticipant?.sid) {
  107 + localParticipant.audioLevel = speakerInfo.level
  108 + speakers.add(localParticipant)
  109 + } else {
  110 + val participant = remoteParticipants[speakerSid]
  111 + if (participant != null) {
  112 + participant.audioLevel = speakerInfo.level
  113 + speakers.add(participant)
  114 + }
  115 + }
  116 + }
  117 +
  118 + if (localParticipant != null && seenSids.contains(localParticipant.sid)) {
  119 + localParticipant.audioLevel = 0.0f
  120 + }
  121 + remoteParticipants.values
  122 + .filterNot { seenSids.contains(it.sid) }
  123 + .forEach { it.audioLevel = 0.0f }
  124 +
  125 + mutableActiveSpeakers.clear()
  126 + mutableActiveSpeakers.addAll(speakers)
  127 + listener?.onActiveSpeakersChanged(speakers, this)
  128 + }
  129 +
41 @AssistedFactory 130 @AssistedFactory
42 interface Factory { 131 interface Factory {
43 fun create(connectOptions: ConnectOptions): Room 132 fun create(connectOptions: ConnectOptions): Room
@@ -45,17 +134,95 @@ constructor( @@ -45,17 +134,95 @@ constructor(
45 134
46 interface Listener { 135 interface Listener {
47 fun onConnect(room: Room) 136 fun onConnect(room: Room)
48 - fun onDisconnect(room: Room, error: Exception)  
49 - fun onParticipantDidConnect(room: Room, participant: RemoteParticipant)  
50 - }  
51 -// func didConnect(room: Room)  
52 -// func didDisconnect(room: Room, error: Error?)  
53 -// func participantDidConnect(room: Room, participant: RemoteParticipant)  
54 -// func participantDidDisconnect(room: Room, participant: RemoteParticipant)  
55 -// func didFailToConnect(room: Room, error: Error)  
56 -// func isReconnecting(room: Room, error: Error)  
57 -// func didReconnect(room: Room)  
58 -// func didStartRecording(room: Room)  
59 -// func didStopRecording(room: Room)  
60 -// func activeSpeakersDidChange(speakers: [Participant], room: Room) 137 + fun onDisconnect(room: Room, error: Exception?)
  138 + fun onParticipantConnected(room: Room, participant: RemoteParticipant)
  139 + fun onparticipantDisconnected(room: Room, participant: RemoteParticipant)
  140 + fun onFailedToConnect(room: Room, error: Exception)
  141 + fun onReconnecting(room: Room, error: Exception)
  142 + fun onReconnect(room: Room)
  143 + fun onStartRecording(room: Room)
  144 + fun onStopRecording(room: Room)
  145 + fun onActiveSpeakersChanged(speakers: List<Participant>, room: Room)
  146 + }
  147 +
  148 + override fun onJoin(response: Rtc.JoinResponse) {
  149 + Timber.v { "engine did join, version: ${response.serverVersion}" }
  150 +
  151 + try {
  152 + val serverVersion = Semver(response.serverVersion)
  153 + if (serverVersion.major == 0 && serverVersion.minor < 5) {
  154 + Timber.e { "This version of livekit requires server version >= 0.5.x" }
  155 + return
  156 + }
  157 + } catch (e: Exception) {
  158 + Timber.e { "Unable to parse server version!" }
  159 + return
  160 + }
  161 + state = State.CONNECTED
  162 + sid = Sid(response.room.sid)
  163 + name = response.room.name
  164 +
  165 + if (response.hasParticipant()) {
  166 + localParticipant = LocalParticipant(response.participant, engine)
  167 + }
  168 + if (response.otherParticipantsList.isNotEmpty()) {
  169 + response.otherParticipantsList.forEach {
  170 + getOrCreateRemoteParticipant(Participant.Sid(it.sid), it)
  171 + }
  172 + }
  173 +
  174 + listener?.onConnect(this)
  175 + }
  176 +
  177 + override fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>) {
  178 + if (streams.count() < 0) {
  179 + Timber.i { "add track with empty streams?" }
  180 + return
  181 + }
  182 +
  183 + val participantSid = Participant.Sid(streams.first().id)
  184 + val trackSid = Track.Sid(track.id())
  185 + val participant = getOrCreateRemoteParticipant(participantSid)
  186 + participant.addSubscribedMediaTrack(track, trackSid)
  187 + }
  188 +
  189 + override fun onAddDataChannel(channel: DataChannel) {
  190 + val unpackedTrackLabel = channel.unpackedTrackLabel()
  191 + val (participantSid, trackSid, name) = unpackedTrackLabel
  192 + val participant = getOrCreateRemoteParticipant(participantSid)
  193 + participant.addSubscribedDataTrack(channel, trackSid, name)
  194 + }
  195 +
  196 + override fun onPublishLocalTrack(cid: String, track: Model.TrackInfo) {
  197 + }
  198 +
  199 +
  200 + override fun onUpdateParticipants(updates: List<Model.ParticipantInfo>) {
  201 + for (info in updates) {
  202 + val participantSid = Participant.Sid(info.sid)
  203 + val isNewParticipant = remoteParticipants.contains(participantSid)
  204 + val participant = getOrCreateRemoteParticipant(participantSid, info)
  205 +
  206 + if (info.state == Model.ParticipantInfo.State.DISCONNECTED) {
  207 + handleParticipantDisconnect(participantSid, participant)
  208 + } else if (isNewParticipant) {
  209 + listener?.onParticipantConnected(this, participant)
  210 + } else {
  211 + participant.updateFromInfo(info)
  212 + }
  213 + }
  214 + }
  215 +
  216 + override fun onUpdateSpeakers(speakers: List<Rtc.SpeakerInfo>) {
  217 + handleSpeakerUpdate(speakers)
  218 + }
  219 +
  220 + override fun onDisconnect(reason: String) {
  221 + Timber.v { "engine did disconnect: $reason" }
  222 + listener?.onDisconnect(this, null)
  223 + }
  224 +
  225 + override fun onFailToConnect(error: Exception) {
  226 + listener?.onFailedToConnect(this, error)
  227 + }
61 } 228 }
@@ -173,7 +173,7 @@ class RemoteParticipant( @@ -173,7 +173,7 @@ class RemoteParticipant(
173 listener?.onSubscribe(dataTrack = publication, participant = this) 173 listener?.onSubscribe(dataTrack = publication, participant = this)
174 } 174 }
175 175
176 - private fun unpublishTrack(trackSid: Track.Sid, sendUnpublish: Boolean) { 176 + fun unpublishTrack(trackSid: Track.Sid, sendUnpublish: Boolean = false) {
177 val publication = tracks.remove(trackSid) ?: return 177 val publication = tracks.remove(trackSid) ?: return
178 178
179 when (publication) { 179 when (publication) {
  1 +package io.livekit.android.room.util
  2 +
  3 +import io.livekit.android.room.participant.Participant
  4 +import io.livekit.android.room.track.Track
  5 +import org.webrtc.DataChannel
  6 +
  7 +fun DataChannel.unpackedTrackLabel(): Triple<Participant.Sid, Track.Sid, String> {
  8 + val parts = label().split("|")
  9 + val participantSid: Participant.Sid
  10 + val trackSid: Track.Sid
  11 + val name: String
  12 +
  13 + if (parts.count() == 3) {
  14 + participantSid = Participant.Sid(parts[0])
  15 + trackSid = Track.Sid(parts[1])
  16 + name = parts[2]
  17 + } else {
  18 + participantSid = Participant.Sid("")
  19 + trackSid = Track.Sid("")
  20 + name = ""
  21 + }
  22 +
  23 + return Triple(participantSid, trackSid, name)
  24 +}