davidliu
Committed by GitHub

Protocol 7 support: server unpublish local track and permission change updates (#72)

1 package io.livekit.android.events 1 package io.livekit.android.events
2 2
3 -import io.livekit.android.room.Room  
4 import io.livekit.android.room.participant.LocalParticipant 3 import io.livekit.android.room.participant.LocalParticipant
5 import io.livekit.android.room.participant.Participant 4 import io.livekit.android.room.participant.Participant
  5 +import io.livekit.android.room.participant.ParticipantPermission
6 import io.livekit.android.room.participant.RemoteParticipant 6 import io.livekit.android.room.participant.RemoteParticipant
7 import io.livekit.android.room.track.LocalTrackPublication 7 import io.livekit.android.room.track.LocalTrackPublication
8 import io.livekit.android.room.track.RemoteTrackPublication 8 import io.livekit.android.room.track.RemoteTrackPublication
@@ -118,4 +118,15 @@ sealed class ParticipantEvent(open val participant: Participant) : Event() { @@ -118,4 +118,15 @@ sealed class ParticipantEvent(open val participant: Participant) : Event() {
118 val trackPublication: RemoteTrackPublication, 118 val trackPublication: RemoteTrackPublication,
119 val subscriptionAllowed: Boolean 119 val subscriptionAllowed: Boolean
120 ) : ParticipantEvent(participant) 120 ) : ParticipantEvent(participant)
  121 +
  122 + /**
  123 + * A participant's permissions have changed.
  124 + *
  125 + * Currently only fires for the local participant.
  126 + */
  127 + class ParticipantPermissionsChanged(
  128 + override val participant: Participant,
  129 + val newPermissions: ParticipantPermission?,
  130 + val oldPermissions: ParticipantPermission?,
  131 + ) : ParticipantEvent(participant)
121 } 132 }
1 package io.livekit.android.events 1 package io.livekit.android.events
2 2
3 import io.livekit.android.room.Room 3 import io.livekit.android.room.Room
4 -import io.livekit.android.room.participant.ConnectionQuality  
5 -import io.livekit.android.room.participant.LocalParticipant  
6 -import io.livekit.android.room.participant.Participant  
7 -import io.livekit.android.room.participant.RemoteParticipant 4 +import io.livekit.android.room.participant.*
8 import io.livekit.android.room.track.LocalTrackPublication 5 import io.livekit.android.room.track.LocalTrackPublication
9 import io.livekit.android.room.track.RemoteTrackPublication 6 import io.livekit.android.room.track.RemoteTrackPublication
10 import io.livekit.android.room.track.Track 7 import io.livekit.android.room.track.Track
@@ -157,4 +154,15 @@ sealed class RoomEvent(val room: Room) : Event() { @@ -157,4 +154,15 @@ sealed class RoomEvent(val room: Room) : Event() {
157 154
158 class FailedToConnect(room: Room, val error: Throwable) : RoomEvent(room) 155 class FailedToConnect(room: Room, val error: Throwable) : RoomEvent(room)
159 156
  157 + /**
  158 + * A participant's permissions have changed.
  159 + *
  160 + * Currently only fires for the local participant.
  161 + */
  162 + class ParticipantPermissionsChanged(
  163 + room: Room,
  164 + val participant: Participant,
  165 + val newPermissions: ParticipantPermission?,
  166 + val oldPermissions: ParticipantPermission?,
  167 + ) : RoomEvent(room)
160 } 168 }
@@ -519,6 +519,7 @@ internal constructor( @@ -519,6 +519,7 @@ internal constructor(
519 fun onSignalConnected(isResume: Boolean) 519 fun onSignalConnected(isResume: Boolean)
520 fun onFullReconnecting() 520 fun onFullReconnecting()
521 suspend fun onPostReconnect(isFullReconnect: Boolean) 521 suspend fun onPostReconnect(isFullReconnect: Boolean)
  522 + fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse)
522 } 523 }
523 524
524 companion object { 525 companion object {
@@ -671,6 +672,10 @@ internal constructor( @@ -671,6 +672,10 @@ internal constructor(
671 sessionToken = token 672 sessionToken = token
672 } 673 }
673 674
  675 + override fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse) {
  676 + listener?.onLocalTrackUnpublished(trackUnpublished)
  677 + }
  678 +
674 //--------------------------------- DataChannel.Observer ------------------------------------// 679 //--------------------------------- DataChannel.Observer ------------------------------------//
675 680
676 override fun onBufferedAmountChange(previousAmount: Long) { 681 override fun onBufferedAmountChange(previousAmount: Long) {
@@ -197,6 +197,23 @@ constructor( @@ -197,6 +197,23 @@ constructor(
197 if (_localParticipant == null) { 197 if (_localParticipant == null) {
198 val lp = localParticipantFactory.create(response.participant, dynacast) 198 val lp = localParticipantFactory.create(response.participant, dynacast)
199 lp.internalListener = this 199 lp.internalListener = this
  200 + coroutineScope.launch {
  201 + lp.events.collect {
  202 + when (it) {
  203 + is ParticipantEvent.ParticipantPermissionsChanged -> eventBus.postEvent(
  204 + RoomEvent.ParticipantPermissionsChanged(
  205 + room = this@Room,
  206 + participant = it.participant,
  207 + newPermissions = it.newPermissions,
  208 + oldPermissions = it.oldPermissions,
  209 + )
  210 + )
  211 + else -> {
  212 + /* do nothing */
  213 + }
  214 + }
  215 + }
  216 + }
200 _localParticipant = lp 217 _localParticipant = lp
201 } else { 218 } else {
202 localParticipant.updateFromInfo(response.participant) 219 localParticipant.updateFromInfo(response.participant)
@@ -264,6 +281,17 @@ constructor( @@ -264,6 +281,17 @@ constructor(
264 it.subscriptionAllowed 281 it.subscriptionAllowed
265 ) 282 )
266 ) 283 )
  284 + is ParticipantEvent.ParticipantPermissionsChanged -> eventBus.postEvent(
  285 + RoomEvent.ParticipantPermissionsChanged(
  286 + room = this@Room,
  287 + participant = it.participant,
  288 + newPermissions = it.newPermissions,
  289 + oldPermissions = it.oldPermissions,
  290 + )
  291 + )
  292 + else -> {
  293 + /* do nothing */
  294 + }
267 } 295 }
268 } 296 }
269 } 297 }
@@ -651,7 +679,7 @@ constructor( @@ -651,7 +679,7 @@ constructor(
651 val pubs = participant.tracks.values.toList() 679 val pubs = participant.tracks.values.toList()
652 for (pub in pubs) { 680 for (pub in pubs) {
653 val remotePub = pub as? RemoteTrackPublication ?: continue 681 val remotePub = pub as? RemoteTrackPublication ?: continue
654 - if(remotePub.subscribed) { 682 + if (remotePub.subscribed) {
655 remotePub.sendUpdateTrackSettings.invoke() 683 remotePub.sendUpdateTrackSettings.invoke()
656 } 684 }
657 } 685 }
@@ -659,6 +687,13 @@ constructor( @@ -659,6 +687,13 @@ constructor(
659 } 687 }
660 } 688 }
661 689
  690 + /**
  691 + * @suppress
  692 + */
  693 + override fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse) {
  694 + localParticipant.handleLocalTrackUnpublished(trackUnpublished)
  695 + }
  696 +
662 //------------------------------- ParticipantListener --------------------------------// 697 //------------------------------- ParticipantListener --------------------------------//
663 /** 698 /**
664 * This is called for both Local and Remote participants 699 * This is called for both Local and Remote participants
@@ -525,6 +525,9 @@ constructor( @@ -525,6 +525,9 @@ constructor(
525 LivekitRtc.SignalResponse.MessageCase.REFRESH_TOKEN -> { 525 LivekitRtc.SignalResponse.MessageCase.REFRESH_TOKEN -> {
526 listener?.onRefreshToken(response.refreshToken) 526 listener?.onRefreshToken(response.refreshToken)
527 } 527 }
  528 + LivekitRtc.SignalResponse.MessageCase.TRACK_UNPUBLISHED -> {
  529 + listener?.onLocalTrackUnpublished(response.trackUnpublished)
  530 + }
528 LivekitRtc.SignalResponse.MessageCase.MESSAGE_NOT_SET, 531 LivekitRtc.SignalResponse.MessageCase.MESSAGE_NOT_SET,
529 null -> { 532 null -> {
530 LKLog.v { "empty messageCase!" } 533 LKLog.v { "empty messageCase!" }
@@ -567,6 +570,7 @@ constructor( @@ -567,6 +570,7 @@ constructor(
567 fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) 570 fun onSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate)
568 fun onSubscriptionPermissionUpdate(subscriptionPermissionUpdate: LivekitRtc.SubscriptionPermissionUpdate) 571 fun onSubscriptionPermissionUpdate(subscriptionPermissionUpdate: LivekitRtc.SubscriptionPermissionUpdate)
569 fun onRefreshToken(token: String) 572 fun onRefreshToken(token: String)
  573 + fun onLocalTrackUnpublished(trackUnpublished: LivekitRtc.TrackUnpublishedResponse)
570 } 574 }
571 575
572 companion object { 576 companion object {
@@ -583,7 +587,7 @@ constructor( @@ -583,7 +587,7 @@ constructor(
583 const val SD_TYPE_ANSWER = "answer" 587 const val SD_TYPE_ANSWER = "answer"
584 const val SD_TYPE_OFFER = "offer" 588 const val SD_TYPE_OFFER = "offer"
585 const val SD_TYPE_PRANSWER = "pranswer" 589 const val SD_TYPE_PRANSWER = "pranswer"
586 - const val PROTOCOL_VERSION = 6 590 + const val PROTOCOL_VERSION = 7
587 const val SDK_TYPE = "android" 591 const val SDK_TYPE = "android"
588 592
589 private val skipQueueTypes = listOf( 593 private val skipQueueTypes = listOf(
@@ -474,15 +474,12 @@ internal constructor( @@ -474,15 +474,12 @@ internal constructor(
474 } 474 }
475 } 475 }
476 476
477 - /**  
478 - * @suppress  
479 - */  
480 - fun onRemoteMuteChanged(trackSid: String, muted: Boolean) { 477 + internal fun onRemoteMuteChanged(trackSid: String, muted: Boolean) {
481 val pub = tracks[trackSid] 478 val pub = tracks[trackSid]
482 pub?.muted = muted 479 pub?.muted = muted
483 } 480 }
484 481
485 - fun handleSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) { 482 + internal fun handleSubscribedQualityUpdate(subscribedQualityUpdate: LivekitRtc.SubscribedQualityUpdate) {
486 if (!dynacast) { 483 if (!dynacast) {
487 return 484 return
488 } 485 }
@@ -515,6 +512,17 @@ internal constructor( @@ -515,6 +512,17 @@ internal constructor(
515 } 512 }
516 } 513 }
517 514
  515 + internal fun handleLocalTrackUnpublished(unpublishedResponse: LivekitRtc.TrackUnpublishedResponse) {
  516 + val pub = tracks[unpublishedResponse.trackSid]
  517 + val track = pub?.track
  518 + if (track == null) {
  519 + LKLog.w { "Received unpublished track response for unknown or non-published track: ${unpublishedResponse.trackSid}" }
  520 + return
  521 + }
  522 +
  523 + unpublishTrack(track)
  524 + }
  525 +
518 fun prepareForFullReconnect() { 526 fun prepareForFullReconnect() {
519 val pubs = localTrackPublications // creates a copy, so is safe from the following removal. 527 val pubs = localTrackPublications // creates a copy, so is safe from the following removal.
520 tracks = tracks.toMutableMap().apply { clear() } 528 tracks = tracks.toMutableMap().apply { clear() }
@@ -9,6 +9,7 @@ import io.livekit.android.room.track.RemoteTrackPublication @@ -9,6 +9,7 @@ import io.livekit.android.room.track.RemoteTrackPublication
9 import io.livekit.android.room.track.Track 9 import io.livekit.android.room.track.Track
10 import io.livekit.android.room.track.TrackPublication 10 import io.livekit.android.room.track.TrackPublication
11 import io.livekit.android.util.FlowObservable 11 import io.livekit.android.util.FlowObservable
  12 +import io.livekit.android.util.LKLog
12 import io.livekit.android.util.flow 13 import io.livekit.android.util.flow
13 import io.livekit.android.util.flowDelegate 14 import io.livekit.android.util.flowDelegate
14 import kotlinx.coroutines.CoroutineDispatcher 15 import kotlinx.coroutines.CoroutineDispatcher
@@ -18,6 +19,7 @@ import kotlinx.coroutines.flow.SharingStarted @@ -18,6 +19,7 @@ import kotlinx.coroutines.flow.SharingStarted
18 import kotlinx.coroutines.flow.map 19 import kotlinx.coroutines.flow.map
19 import kotlinx.coroutines.flow.stateIn 20 import kotlinx.coroutines.flow.stateIn
20 import livekit.LivekitModels 21 import livekit.LivekitModels
  22 +import livekit.LivekitRtc
21 import javax.inject.Named 23 import javax.inject.Named
22 24
23 open class Participant( 25 open class Participant(
@@ -88,6 +90,25 @@ open class Participant( @@ -88,6 +90,25 @@ open class Participant(
88 internal set 90 internal set
89 91
90 /** 92 /**
  93 + *
  94 + */
  95 + @FlowObservable
  96 + @get:FlowObservable
  97 + var permissions: ParticipantPermission? by flowDelegate(null) { newPermissions, oldPermissions ->
  98 + if (newPermissions != oldPermissions) {
  99 + eventBus.postEvent(
  100 + ParticipantEvent.ParticipantPermissionsChanged(
  101 + this,
  102 + newPermissions,
  103 + oldPermissions
  104 + ),
  105 + scope
  106 + )
  107 + }
  108 + }
  109 + internal set
  110 +
  111 + /**
91 * Changes can be observed by using [io.livekit.android.util.flow] 112 * Changes can be observed by using [io.livekit.android.util.flow]
92 */ 113 */
93 @FlowObservable 114 @FlowObservable
@@ -222,6 +243,9 @@ open class Participant( @@ -222,6 +243,9 @@ open class Participant(
222 participantInfo = info 243 participantInfo = info
223 metadata = info.metadata 244 metadata = info.metadata
224 name = info.name 245 name = info.name
  246 + if (info.hasPermission()) {
  247 + permissions = ParticipantPermission.fromProto(info.permission)
  248 + }
225 } 249 }
226 250
227 override fun equals(other: Any?): Boolean { 251 override fun equals(other: Any?): Boolean {
@@ -239,7 +263,6 @@ open class Participant( @@ -239,7 +263,6 @@ open class Participant(
239 return sid.hashCode() 263 return sid.hashCode()
240 } 264 }
241 265
242 -  
243 // Internal methods just for posting events. 266 // Internal methods just for posting events.
244 internal fun onTrackMuted(trackPublication: TrackPublication) { 267 internal fun onTrackMuted(trackPublication: TrackPublication) {
245 listener?.onTrackMuted(trackPublication, this) 268 listener?.onTrackMuted(trackPublication, this)
@@ -364,4 +387,24 @@ enum class ConnectionQuality { @@ -364,4 +387,24 @@ enum class ConnectionQuality {
364 } 387 }
365 } 388 }
366 } 389 }
  390 +}
  391 +
  392 +data class ParticipantPermission(
  393 + val canPublish: Boolean,
  394 + val canSubscribe: Boolean,
  395 + val canPublishData: Boolean,
  396 + val hidden: Boolean,
  397 + val recorder: Boolean,
  398 +) {
  399 + companion object {
  400 + fun fromProto(proto: LivekitModels.ParticipantPermission): ParticipantPermission {
  401 + return ParticipantPermission(
  402 + canPublish = proto.canPublish,
  403 + canSubscribe = proto.canSubscribe,
  404 + canPublishData = proto.canPublishData,
  405 + hidden = proto.hidden,
  406 + recorder = proto.recorder,
  407 + )
  408 + }
  409 + }
367 } 410 }
@@ -2,6 +2,7 @@ package io.livekit.android @@ -2,6 +2,7 @@ package io.livekit.android
2 2
3 import android.content.Context 3 import android.content.Context
4 import androidx.test.core.app.ApplicationProvider 4 import androidx.test.core.app.ApplicationProvider
  5 +import com.google.protobuf.MessageLite
5 import io.livekit.android.mock.MockPeerConnection 6 import io.livekit.android.mock.MockPeerConnection
6 import io.livekit.android.mock.MockWebSocketFactory 7 import io.livekit.android.mock.MockWebSocketFactory
7 import io.livekit.android.mock.dagger.DaggerTestLiveKitComponent 8 import io.livekit.android.mock.dagger.DaggerTestLiveKitComponent
@@ -16,6 +17,7 @@ import kotlinx.coroutines.launch @@ -16,6 +17,7 @@ import kotlinx.coroutines.launch
16 import okhttp3.Protocol 17 import okhttp3.Protocol
17 import okhttp3.Request 18 import okhttp3.Request
18 import okhttp3.Response 19 import okhttp3.Response
  20 +import okio.ByteString
19 import org.junit.Before 21 import org.junit.Before
20 import org.webrtc.PeerConnection 22 import org.webrtc.PeerConnection
21 23
@@ -73,4 +75,18 @@ abstract class MockE2ETest : BaseTest() { @@ -73,4 +75,18 @@ abstract class MockE2ETest : BaseTest() {
73 .message("") 75 .message("")
74 .build() 76 .build()
75 } 77 }
  78 +
  79 + /**
  80 + * Simulates receiving [message] from the server
  81 + */
  82 + fun simulateMessageFromServer(message: MessageLite) {
  83 + simulateMessageFromServer(message.toOkioByteString())
  84 + }
  85 +
  86 + /**
  87 + * Simulates receiving [message] from the server
  88 + */
  89 + fun simulateMessageFromServer(message: ByteString) {
  90 + wsFactory.listener.onMessage(wsFactory.ws, message)
  91 + }
76 } 92 }
@@ -20,6 +20,13 @@ object TestData { @@ -20,6 +20,13 @@ object TestData {
20 sid = "local_participant_sid" 20 sid = "local_participant_sid"
21 identity = "local_participant_identity" 21 identity = "local_participant_identity"
22 state = LivekitModels.ParticipantInfo.State.ACTIVE 22 state = LivekitModels.ParticipantInfo.State.ACTIVE
  23 + permission = LivekitModels.ParticipantPermission.newBuilder()
  24 + .setCanPublish(true)
  25 + .setCanSubscribe(true)
  26 + .setCanPublishData(true)
  27 + .setHidden(true)
  28 + .setRecorder(false)
  29 + .build()
23 build() 30 build()
24 } 31 }
25 32
@@ -260,6 +260,34 @@ class SignalClientTest : BaseTest() { @@ -260,6 +260,34 @@ class SignalClientTest : BaseTest() {
260 build() 260 build()
261 } 261 }
262 262
  263 + val LOCAL_TRACK_UNPUBLISHED = with(LivekitRtc.SignalResponse.newBuilder()) {
  264 + trackUnpublished = with(LivekitRtc.TrackUnpublishedResponse.newBuilder()) {
  265 + trackSid = TestData.LOCAL_AUDIO_TRACK.sid
  266 + build()
  267 + }
  268 + build()
  269 + }
  270 +
  271 + val PERMISSION_CHANGE = with(LivekitRtc.SignalResponse.newBuilder()) {
  272 + update = with(LivekitRtc.ParticipantUpdate.newBuilder()) {
  273 + addParticipants(
  274 + TestData.LOCAL_PARTICIPANT.toBuilder()
  275 + .setPermission(
  276 + LivekitModels.ParticipantPermission.newBuilder()
  277 + .setCanPublish(false)
  278 + .setCanSubscribe(false)
  279 + .setCanPublishData(false)
  280 + .setHidden(false)
  281 + .setRecorder(false)
  282 + .build()
  283 + )
  284 + .build()
  285 + )
  286 + build()
  287 + }
  288 + build()
  289 + }
  290 +
263 val PARTICIPANT_JOIN = with(LivekitRtc.SignalResponse.newBuilder()) { 291 val PARTICIPANT_JOIN = with(LivekitRtc.SignalResponse.newBuilder()) {
264 update = with(LivekitRtc.ParticipantUpdate.newBuilder()) { 292 update = with(LivekitRtc.ParticipantUpdate.newBuilder()) {
265 addParticipants(TestData.REMOTE_PARTICIPANT) 293 addParticipants(TestData.REMOTE_PARTICIPANT)
  1 +package io.livekit.android.room.participant
  2 +
  3 +import io.livekit.android.MockE2ETest
  4 +import io.livekit.android.events.EventCollector
  5 +import io.livekit.android.events.RoomEvent
  6 +import io.livekit.android.mock.MockAudioStreamTrack
  7 +import io.livekit.android.room.SignalClientTest
  8 +import io.livekit.android.room.track.LocalAudioTrack
  9 +import kotlinx.coroutines.ExperimentalCoroutinesApi
  10 +import kotlinx.coroutines.launch
  11 +import org.junit.Assert
  12 +import org.junit.Test
  13 +import org.junit.runner.RunWith
  14 +import org.robolectric.RobolectricTestRunner
  15 +
  16 +
  17 +@ExperimentalCoroutinesApi
  18 +@RunWith(RobolectricTestRunner::class)
  19 +class ParticipantMockE2ETest : MockE2ETest() {
  20 +
  21 + @Test
  22 + fun trackUnpublished() = runTest {
  23 + connect()
  24 +
  25 + // publish track
  26 + val publishJob = launch {
  27 + room.localParticipant.publishAudioTrack(
  28 + LocalAudioTrack(
  29 + "",
  30 + MockAudioStreamTrack(id = SignalClientTest.LOCAL_TRACK_PUBLISHED.trackPublished.cid)
  31 + )
  32 + )
  33 + }
  34 + simulateMessageFromServer(SignalClientTest.LOCAL_TRACK_PUBLISHED)
  35 + publishJob.join()
  36 +
  37 + val eventCollector = EventCollector(room.events, coroutineRule.scope)
  38 + // remote unpublish
  39 + simulateMessageFromServer(SignalClientTest.LOCAL_TRACK_UNPUBLISHED)
  40 + val events = eventCollector.stopCollecting()
  41 +
  42 + Assert.assertEquals(1, events.size)
  43 + Assert.assertEquals(true, events[0] is RoomEvent.TrackUnpublished)
  44 + Assert.assertEquals(0, room.localParticipant.tracks.size)
  45 + }
  46 +
  47 + @Test
  48 + fun participantPermissions() = runTest {
  49 + connect()
  50 +
  51 + val eventCollector = EventCollector(room.events, coroutineRule.scope)
  52 + simulateMessageFromServer(SignalClientTest.PERMISSION_CHANGE)
  53 + val events = eventCollector.stopCollecting()
  54 +
  55 + Assert.assertEquals(1, events.size)
  56 + Assert.assertEquals(true, events[0] is RoomEvent.ParticipantPermissionsChanged)
  57 + }
  58 +}
1 -Subproject commit 6f2a49e449143a01b8c63803198b7e9d1112e77b 1 +Subproject commit 3c712ad5c941c0d2ddb5631c44239fbe525c0391