davidliu
Committed by GitHub

Fire TrackPublished events when participants connect (#99)

@@ -210,6 +210,13 @@ constructor( @@ -210,6 +210,13 @@ constructor(
210 coroutineScope.launch { 210 coroutineScope.launch {
211 lp.events.collect { 211 lp.events.collect {
212 when (it) { 212 when (it) {
  213 + is ParticipantEvent.TrackPublished -> eventBus.postEvent(
  214 + RoomEvent.TrackPublished(
  215 + room = this@Room,
  216 + publication = it.publication,
  217 + participant = it.participant,
  218 + )
  219 + )
213 is ParticipantEvent.ParticipantPermissionsChanged -> eventBus.postEvent( 220 is ParticipantEvent.ParticipantPermissionsChanged -> eventBus.postEvent(
214 RoomEvent.ParticipantPermissionsChanged( 221 RoomEvent.ParticipantPermissionsChanged(
215 room = this@Room, 222 room = this@Room,
@@ -276,6 +283,17 @@ constructor( @@ -276,6 +283,17 @@ constructor(
276 coroutineScope.launch { 283 coroutineScope.launch {
277 participant.events.collect { 284 participant.events.collect {
278 when (it) { 285 when (it) {
  286 + is ParticipantEvent.TrackPublished -> {
  287 + if (state == State.CONNECTED) {
  288 + eventBus.postEvent(
  289 + RoomEvent.TrackPublished(
  290 + room = this@Room,
  291 + publication = it.publication,
  292 + participant = it.participant,
  293 + )
  294 + )
  295 + }
  296 + }
279 is ParticipantEvent.TrackStreamStateChanged -> eventBus.postEvent( 297 is ParticipantEvent.TrackStreamStateChanged -> eventBus.postEvent(
280 RoomEvent.TrackStreamStateChanged( 298 RoomEvent.TrackStreamStateChanged(
281 this@Room, 299 this@Room,
@@ -306,6 +324,10 @@ constructor( @@ -306,6 +324,10 @@ constructor(
306 } 324 }
307 } 325 }
308 326
  327 + if (info != null) {
  328 + participant.updateFromInfo(info)
  329 + }
  330 +
309 val newRemoteParticipants = mutableRemoteParticipants.toMutableMap() 331 val newRemoteParticipants = mutableRemoteParticipants.toMutableMap()
310 newRemoteParticipants[sid] = participant 332 newRemoteParticipants[sid] = participant
311 mutableRemoteParticipants = newRemoteParticipants 333 mutableRemoteParticipants = newRemoteParticipants
@@ -729,14 +751,6 @@ constructor( @@ -729,14 +751,6 @@ constructor(
729 /** 751 /**
730 * @suppress 752 * @suppress
731 */ 753 */
732 - override fun onTrackPublished(publication: RemoteTrackPublication, participant: RemoteParticipant) {  
733 - listener?.onTrackPublished(publication, participant, this)  
734 - eventBus.postEvent(RoomEvent.TrackPublished(this, publication, participant), coroutineScope)  
735 - }  
736 -  
737 - /**  
738 - * @suppress  
739 - */  
740 override fun onTrackUnpublished(publication: RemoteTrackPublication, participant: RemoteParticipant) { 754 override fun onTrackUnpublished(publication: RemoteTrackPublication, participant: RemoteParticipant) {
741 listener?.onTrackUnpublished(publication, participant, this) 755 listener?.onTrackUnpublished(publication, participant, this)
742 eventBus.postEvent(RoomEvent.TrackUnpublished(this, publication, participant), coroutineScope) 756 eventBus.postEvent(RoomEvent.TrackUnpublished(this, publication, participant), coroutineScope)
@@ -745,14 +759,6 @@ constructor( @@ -745,14 +759,6 @@ constructor(
745 /** 759 /**
746 * @suppress 760 * @suppress
747 */ 761 */
748 - override fun onTrackPublished(publication: LocalTrackPublication, participant: LocalParticipant) {  
749 - listener?.onTrackPublished(publication, participant, this)  
750 - eventBus.postEvent(RoomEvent.TrackPublished(this, publication, participant), coroutineScope)  
751 - }  
752 -  
753 - /**  
754 - * @suppress  
755 - */  
756 override fun onTrackUnpublished(publication: LocalTrackPublication, participant: LocalParticipant) { 762 override fun onTrackUnpublished(publication: LocalTrackPublication, participant: LocalParticipant) {
757 listener?.onTrackUnpublished(publication, participant, this) 763 listener?.onTrackUnpublished(publication, participant, this)
758 eventBus.postEvent(RoomEvent.TrackUnpublished(this, publication, participant), coroutineScope) 764 eventBus.postEvent(RoomEvent.TrackUnpublished(this, publication, participant), coroutineScope)
@@ -23,6 +23,10 @@ class RemoteParticipant( @@ -23,6 +23,10 @@ class RemoteParticipant(
23 defaultDispatcher: CoroutineDispatcher, 23 defaultDispatcher: CoroutineDispatcher,
24 ) : Participant(sid, identity, defaultDispatcher) { 24 ) : Participant(sid, identity, defaultDispatcher) {
25 /** 25 /**
  26 + * Note: This constructor does not update all info due to event listener race conditions.
  27 + *
  28 + * Callers are responsible for calling through to [updateFromInfo] once ready.
  29 + *
26 * @suppress 30 * @suppress
27 */ 31 */
28 constructor( 32 constructor(
@@ -37,7 +41,7 @@ class RemoteParticipant( @@ -37,7 +41,7 @@ class RemoteParticipant(
37 ioDispatcher, 41 ioDispatcher,
38 defaultDispatcher 42 defaultDispatcher
39 ) { 43 ) {
40 - updateFromInfo(info) 44 + super.updateFromInfo(info)
41 } 45 }
42 46
43 private val coroutineScope = CloseableCoroutineScope(defaultDispatcher + SupervisorJob()) 47 private val coroutineScope = CloseableCoroutineScope(defaultDispatcher + SupervisorJob())
@@ -48,7 +52,6 @@ class RemoteParticipant( @@ -48,7 +52,6 @@ class RemoteParticipant(
48 * @suppress 52 * @suppress
49 */ 53 */
50 override fun updateFromInfo(info: LivekitModels.ParticipantInfo) { 54 override fun updateFromInfo(info: LivekitModels.ParticipantInfo) {
51 - val hadInfo = hasInfo  
52 super.updateFromInfo(info) 55 super.updateFromInfo(info)
53 56
54 val validTrackPublication = mutableMapOf<String, RemoteTrackPublication>() 57 val validTrackPublication = mutableMapOf<String, RemoteTrackPublication>()
@@ -74,12 +77,10 @@ class RemoteParticipant( @@ -74,12 +77,10 @@ class RemoteParticipant(
74 validTrackPublication[trackSid] = publication 77 validTrackPublication[trackSid] = publication
75 } 78 }
76 79
77 - if (hadInfo) {  
78 - for (publication in newTrackPublications.values) {  
79 - internalListener?.onTrackPublished(publication, this)  
80 - listener?.onTrackPublished(publication, this)  
81 - eventBus.postEvent(ParticipantEvent.TrackPublished(this, publication), scope)  
82 - } 80 + for (publication in newTrackPublications.values) {
  81 + internalListener?.onTrackPublished(publication, this)
  82 + listener?.onTrackPublished(publication, this)
  83 + eventBus.postEvent(ParticipantEvent.TrackPublished(this, publication), scope)
83 } 84 }
84 85
85 val invalidKeys = tracks.keys - validTrackPublication.keys 86 val invalidKeys = tracks.keys - validTrackPublication.keys
@@ -14,6 +14,7 @@ import io.livekit.android.room.SignalClientTest @@ -14,6 +14,7 @@ import io.livekit.android.room.SignalClientTest
14 import io.livekit.android.util.toOkioByteString 14 import io.livekit.android.util.toOkioByteString
15 import kotlinx.coroutines.ExperimentalCoroutinesApi 15 import kotlinx.coroutines.ExperimentalCoroutinesApi
16 import kotlinx.coroutines.launch 16 import kotlinx.coroutines.launch
  17 +import livekit.LivekitRtc
17 import okhttp3.Protocol 18 import okhttp3.Protocol
18 import okhttp3.Request 19 import okhttp3.Request
19 import okhttp3.Response 20 import okhttp3.Response
@@ -42,12 +43,12 @@ abstract class MockE2ETest : BaseTest() { @@ -42,12 +43,12 @@ abstract class MockE2ETest : BaseTest() {
42 wsFactory = component.websocketFactory() 43 wsFactory = component.websocketFactory()
43 } 44 }
44 45
45 - suspend fun connect() {  
46 - connectSignal() 46 + suspend fun connect(joinResponse: LivekitRtc.SignalResponse = SignalClientTest.JOIN) {
  47 + connectSignal(joinResponse)
47 connectPeerConnection() 48 connectPeerConnection()
48 } 49 }
49 50
50 - suspend fun connectSignal() { 51 + suspend fun connectSignal(joinResponse: LivekitRtc.SignalResponse) {
51 val job = coroutineRule.scope.launch { 52 val job = coroutineRule.scope.launch {
52 room.connect( 53 room.connect(
53 url = SignalClientTest.EXAMPLE_URL, 54 url = SignalClientTest.EXAMPLE_URL,
@@ -55,14 +56,14 @@ abstract class MockE2ETest : BaseTest() { @@ -55,14 +56,14 @@ abstract class MockE2ETest : BaseTest() {
55 ) 56 )
56 } 57 }
57 wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) 58 wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
58 - wsFactory.listener.onMessage(wsFactory.ws, SignalClientTest.JOIN.toOkioByteString()) 59 + simulateMessageFromServer(joinResponse)
59 60
60 job.join() 61 job.join()
61 } 62 }
62 63
63 suspend fun connectPeerConnection() { 64 suspend fun connectPeerConnection() {
64 subscriber = component.rtcEngine().subscriber 65 subscriber = component.rtcEngine().subscriber
65 - wsFactory.listener.onMessage(wsFactory.ws, SignalClientTest.OFFER.toOkioByteString()) 66 + simulateMessageFromServer(SignalClientTest.OFFER)
66 val subPeerConnection = subscriber.peerConnection as MockPeerConnection 67 val subPeerConnection = subscriber.peerConnection as MockPeerConnection
67 subPeerConnection.moveToIceConnectionState(PeerConnection.IceConnectionState.CONNECTED) 68 subPeerConnection.moveToIceConnectionState(PeerConnection.IceConnectionState.CONNECTED)
68 } 69 }
@@ -20,6 +20,7 @@ object TestData { @@ -20,6 +20,7 @@ object TestData {
20 sid = "local_participant_sid" 20 sid = "local_participant_sid"
21 identity = "local_participant_identity" 21 identity = "local_participant_identity"
22 state = LivekitModels.ParticipantInfo.State.ACTIVE 22 state = LivekitModels.ParticipantInfo.State.ACTIVE
  23 + metadata = "local_metadata"
23 permission = LivekitModels.ParticipantPermission.newBuilder() 24 permission = LivekitModels.ParticipantPermission.newBuilder()
24 .setCanPublish(true) 25 .setCanPublish(true)
25 .setCanSubscribe(true) 26 .setCanSubscribe(true)
@@ -34,6 +35,14 @@ object TestData { @@ -34,6 +35,14 @@ object TestData {
34 sid = "remote_participant_sid" 35 sid = "remote_participant_sid"
35 identity = "remote_participant_identity" 36 identity = "remote_participant_identity"
36 state = LivekitModels.ParticipantInfo.State.ACTIVE 37 state = LivekitModels.ParticipantInfo.State.ACTIVE
  38 + metadata = "remote_metadata"
  39 + isPublisher = true
  40 + permission = with(LivekitModels.ParticipantPermission.newBuilder()) {
  41 + canPublish = true
  42 + canSubscribe = true
  43 + canPublishData
  44 + build()
  45 + }
37 addTracks(REMOTE_AUDIO_TRACK) 46 addTracks(REMOTE_AUDIO_TRACK)
38 build() 47 build()
39 } 48 }
@@ -14,6 +14,7 @@ import io.livekit.android.room.track.LocalAudioTrack @@ -14,6 +14,7 @@ import io.livekit.android.room.track.LocalAudioTrack
14 import io.livekit.android.room.track.Track 14 import io.livekit.android.room.track.Track
15 import io.livekit.android.util.flow 15 import io.livekit.android.util.flow
16 import io.livekit.android.util.toOkioByteString 16 import io.livekit.android.util.toOkioByteString
  17 +import junit.framework.Assert.assertEquals
17 import kotlinx.coroutines.ExperimentalCoroutinesApi 18 import kotlinx.coroutines.ExperimentalCoroutinesApi
18 import kotlinx.coroutines.launch 19 import kotlinx.coroutines.launch
19 import org.junit.Assert 20 import org.junit.Assert
@@ -38,6 +39,30 @@ class RoomMockE2ETest : MockE2ETest() { @@ -38,6 +39,30 @@ class RoomMockE2ETest : MockE2ETest() {
38 } 39 }
39 40
40 @Test 41 @Test
  42 + fun connectNoEvents() = runTest {
  43 + val collector = EventCollector(room.events, coroutineRule.scope)
  44 + connect()
  45 + val events = collector.stopCollecting()
  46 + assertEquals(0, events.size)
  47 + }
  48 +
  49 + @Test
  50 + fun connectNoEventsWithRemoteParticipant() = runTest {
  51 + val joinResponse = with(SignalClientTest.JOIN.toBuilder()) {
  52 + join = with(SignalClientTest.JOIN.join.toBuilder()) {
  53 + addOtherParticipants(TestData.REMOTE_PARTICIPANT)
  54 + build()
  55 + }
  56 + build()
  57 + }
  58 +
  59 + val collector = EventCollector(room.events, coroutineRule.scope)
  60 + connect(joinResponse)
  61 + val events = collector.stopCollecting()
  62 + assertEquals(0, events.size)
  63 + }
  64 +
  65 + @Test
41 fun connectFailureProperlyContinues() = runTest { 66 fun connectFailureProperlyContinues() = runTest {
42 67
43 var didThrowException = false 68 var didThrowException = false
@@ -94,14 +119,12 @@ class RoomMockE2ETest : MockE2ETest() { @@ -94,14 +119,12 @@ class RoomMockE2ETest : MockE2ETest() {
94 connect() 119 connect()
95 120
96 val eventCollector = EventCollector(room.events, coroutineRule.scope) 121 val eventCollector = EventCollector(room.events, coroutineRule.scope)
97 - wsFactory.listener.onMessage(  
98 - wsFactory.ws,  
99 - SignalClientTest.PARTICIPANT_JOIN.toOkioByteString()  
100 - ) 122 + simulateMessageFromServer(SignalClientTest.PARTICIPANT_JOIN)
101 val events = eventCollector.stopCollecting() 123 val events = eventCollector.stopCollecting()
102 124
103 - Assert.assertEquals(1, events.size) 125 + Assert.assertEquals(2, events.size)
104 Assert.assertEquals(true, events[0] is RoomEvent.ParticipantConnected) 126 Assert.assertEquals(true, events[0] is RoomEvent.ParticipantConnected)
  127 + Assert.assertEquals(true, events[1] is RoomEvent.TrackPublished)
105 } 128 }
106 129
107 @Test 130 @Test
1 package io.livekit.android.room.participant 1 package io.livekit.android.room.participant
2 2
3 import io.livekit.android.BaseTest 3 import io.livekit.android.BaseTest
  4 +import io.livekit.android.events.EventCollector
4 import io.livekit.android.events.FlowCollector 5 import io.livekit.android.events.FlowCollector
  6 +import io.livekit.android.events.ParticipantEvent
5 import io.livekit.android.room.SignalClient 7 import io.livekit.android.room.SignalClient
6 import io.livekit.android.room.track.TrackPublication 8 import io.livekit.android.room.track.TrackPublication
7 import io.livekit.android.util.flow 9 import io.livekit.android.util.flow
@@ -31,31 +33,20 @@ class RemoteParticipantTest : BaseTest() { @@ -31,31 +33,20 @@ class RemoteParticipantTest : BaseTest() {
31 } 33 }
32 34
33 @Test 35 @Test
34 - fun constructorAddsTrack() {  
35 - val info = LivekitModels.ParticipantInfo.newBuilder(INFO)  
36 - .addTracks(TRACK_INFO)  
37 - .build()  
38 -  
39 - participant = RemoteParticipant(  
40 - info,  
41 - signalClient,  
42 - ioDispatcher = coroutineRule.dispatcher,  
43 - defaultDispatcher = coroutineRule.dispatcher,  
44 - )  
45 -  
46 - assertEquals(1, participant.tracks.values.size)  
47 - assertNotNull(participant.getTrackPublication(TRACK_INFO.sid))  
48 - }  
49 -  
50 - @Test  
51 - fun updateFromInfoAddsTrack() { 36 + fun updateFromInfoAddsTrack() = runTest {
52 val newTrackInfo = LivekitModels.ParticipantInfo.newBuilder(INFO) 37 val newTrackInfo = LivekitModels.ParticipantInfo.newBuilder(INFO)
53 .addTracks(TRACK_INFO) 38 .addTracks(TRACK_INFO)
54 .build() 39 .build()
  40 +
  41 + val collector = EventCollector(participant.events, coroutineRule.scope)
55 participant.updateFromInfo(newTrackInfo) 42 participant.updateFromInfo(newTrackInfo)
  43 + val events = collector.stopCollecting()
56 44
57 assertEquals(1, participant.tracks.values.size) 45 assertEquals(1, participant.tracks.values.size)
58 assertNotNull(participant.getTrackPublication(TRACK_INFO.sid)) 46 assertNotNull(participant.getTrackPublication(TRACK_INFO.sid))
  47 +
  48 + val publishes = events.filterIsInstance<ParticipantEvent.TrackPublished>()
  49 + assertEquals(1, publishes.size)
59 } 50 }
60 51
61 @Test 52 @Test