David Liu

tests for events

... ... @@ -35,13 +35,19 @@ sealed class RoomEvent : Event() {
/**
* When a [RemoteParticipant] leaves after the local participant has joined.
*/
class ParticipantDisconnected(val room: Room, val participant: RemoteParticipant): RoomEvent()
class ParticipantDisconnected(val room: Room, val participant: RemoteParticipant) : RoomEvent()
/**
* Active speakers changed. List of speakers are ordered by their audio level. loudest
* speakers first. This will include the [LocalParticipant] too.
*/
class ActiveSpeakersChanged(val room: Room, val speakers: List<Participant>,): RoomEvent()
class ActiveSpeakersChanged(val room: Room, val speakers: List<Participant>) : RoomEvent()
class RoomMetadataChanged(
val room: Room,
val newMetadata: String?,
val prevMetadata: String?
) : RoomEvent()
// Participant callbacks
/**
... ... @@ -49,7 +55,11 @@ sealed class RoomEvent : Event() {
* When RoomService.UpdateParticipantMetadata is called to change a participant's state,
* this event will be fired for all clients in the room.
*/
class MetadataChanged(val room: Room, val participant: Participant, val prevMetadata: String?): RoomEvent()
class ParticipantMetadataChanged(
val room: Room,
val participant: Participant,
val prevMetadata: String?
) : RoomEvent()
/**
* The participant was muted.
... ...
... ... @@ -349,7 +349,10 @@ constructor(
}
override fun onRoomUpdate(update: LivekitModels.Room) {
val oldMetadata = metadata
metadata = update.metadata
eventBus.postEvent(RoomEvent.RoomMetadataChanged(this, metadata, oldMetadata), coroutineScope)
}
override fun onConnectionQuality(updates: List<LivekitRtc.ConnectionQualityInfo>) {
... ... @@ -403,7 +406,7 @@ constructor(
*/
override fun onMetadataChanged(participant: Participant, prevMetadata: String?) {
listener?.onMetadataChanged(participant, prevMetadata, this)
eventBus.postEvent(RoomEvent.MetadataChanged(this, participant, prevMetadata), coroutineScope)
eventBus.postEvent(RoomEvent.ParticipantMetadataChanged(this, participant, prevMetadata), coroutineScope)
}
/** @suppress */
... ...
package io.livekit.android.coroutines
import io.livekit.android.events.EventListenable
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
/**
* Collect events until signal is given.
*/
suspend fun <T> EventListenable<T>.collectEvents(signal: Flow<Unit?>): List<T> {
return events.takeUntilSignal(signal)
.fold(emptyList()) { list, event ->
list.plus(event)
}
}
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit?>): Flow<T> = flow {
try {
coroutineScope {
launch {
signal.takeWhile { it == null }.collect()
println("signalled")
this@coroutineScope.cancel()
}
collect {
emit(it)
}
}
} catch (e: CancellationException) {
//ignore
}
}
\ No newline at end of file
... ...
package io.livekit.android.events
import io.livekit.android.coroutines.collectEvents
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.runBlockingTest
class EventCollector<T : Event>(
private val eventListenable: EventListenable<T>,
coroutineScope: CoroutineScope
) {
val signal = MutableStateFlow<Unit?>(null)
val collectEventsDeferred = coroutineScope.async {
eventListenable.collectEvents(signal)
}
/**
* Stop collecting events. returns the events collected.
*/
@OptIn(ExperimentalCoroutinesApi::class)
fun stopCollectingEvents(): List<T> {
signal.compareAndSet(null, Unit)
var events: List<T> = emptyList()
runBlockingTest {
events = collectEventsDeferred.await()
}
return events
}
}
\ No newline at end of file
... ...
package io.livekit.android.mock
import livekit.LivekitModels
object TestData {
val REMOTE_AUDIO_TRACK = with(LivekitModels.TrackInfo.newBuilder()) {
sid = "remote_audio_track_sid"
type = LivekitModels.TrackType.AUDIO
build()
}
val LOCAL_PARTICIPANT = with(LivekitModels.ParticipantInfo.newBuilder()) {
sid = "local_participant_sid"
identity = "local_participant_identity"
state = LivekitModels.ParticipantInfo.State.ACTIVE
build()
}
val REMOTE_PARTICIPANT = with(LivekitModels.ParticipantInfo.newBuilder()) {
sid = "remote_participant_sid"
identity = "remote_participant_identity"
state = LivekitModels.ParticipantInfo.State.ACTIVE
addTracks(REMOTE_AUDIO_TRACK)
build()
}
val REMOTE_SPEAKER_INFO = with(LivekitModels.SpeakerInfo.newBuilder()) {
sid = REMOTE_PARTICIPANT.sid
level = 1.0f
active = true
build()
}
}
\ No newline at end of file
... ...
... ... @@ -3,21 +3,20 @@ package io.livekit.android.room
import android.content.Context
import androidx.test.core.app.ApplicationProvider
import io.livekit.android.coroutines.TestCoroutineRule
import io.livekit.android.events.EventCollector
import io.livekit.android.events.RoomEvent
import io.livekit.android.mock.MockWebsocketFactory
import io.livekit.android.mock.dagger.DaggerTestLiveKitComponent
import io.livekit.android.room.participant.ConnectionQuality
import io.livekit.android.util.toOkioByteString
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Assert
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.Mockito
import org.mockito.junit.MockitoJUnit
import org.robolectric.RobolectricTestRunner
... ... @@ -71,30 +70,114 @@ class RoomMockE2ETest {
@Test
fun roomUpdateTest() {
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
wsFactory.listener.onMessage(wsFactory.ws, SignalClientTest.ROOM_UPDATE.toOkioByteString())
val events = eventCollector.stopCollectingEvents()
Assert.assertEquals(
SignalClientTest.ROOM_UPDATE.roomUpdate.room.metadata,
room.metadata
)
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.RoomMetadataChanged)
}
@Test
fun connectionQualityUpdateTest() {
val roomListener = Mockito.mock(RoomListener::class.java)
room.listener = roomListener
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.CONNECTION_QUALITY.toOkioByteString()
)
val events = eventCollector.stopCollectingEvents()
Assert.assertEquals(
ConnectionQuality.EXCELLENT,
room.localParticipant.connectionQuality
Assert.assertEquals(ConnectionQuality.EXCELLENT, room.localParticipant.connectionQuality)
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.ConnectionQualityChanged)
}
@Test
fun participantConnected() {
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.PARTICIPANT_JOIN.toOkioByteString()
)
Mockito.verify(roomListener)
.onConnectionQualityChanged(room.localParticipant, ConnectionQuality.EXCELLENT)
val events = eventCollector.stopCollectingEvents()
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.ParticipantConnected)
}
@Test
fun participantDisconnected() {
connect()
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.PARTICIPANT_JOIN.toOkioByteString()
)
val eventCollector = EventCollector(room.events, coroutineRule.scope)
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.PARTICIPANT_DISCONNECT.toOkioByteString()
)
val events = eventCollector.stopCollectingEvents()
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.ParticipantDisconnected)
}
@Test
fun onActiveSpeakersChanged() {
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.ACTIVE_SPEAKER_UPDATE.toOkioByteString()
)
val events = eventCollector.stopCollectingEvents()
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.ActiveSpeakersChanged)
}
@Test
fun participantMetadataChanged() {
connect()
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.PARTICIPANT_JOIN.toOkioByteString()
)
val eventCollector = EventCollector(room.events, coroutineRule.scope)
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.PARTICIPANT_METADATA_CHANGED.toOkioByteString()
)
val events = eventCollector.stopCollectingEvents()
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.ParticipantMetadataChanged)
}
@Test
fun leave() {
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
wsFactory.listener.onMessage(
wsFactory.ws,
SignalClientTest.LEAVE.toOkioByteString()
)
val events = eventCollector.stopCollectingEvents()
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.Disconnected)
}
}
\ No newline at end of file
... ...
package io.livekit.android.room
import android.content.Context
import android.net.Network
import androidx.test.core.app.ApplicationProvider
import io.livekit.android.coroutines.TestCoroutineRule
import io.livekit.android.coroutines.collectEvents
import io.livekit.android.events.Event
import io.livekit.android.events.EventCollector
import io.livekit.android.events.EventListenable
import io.livekit.android.events.RoomEvent
import io.livekit.android.mock.MockEglBase
import io.livekit.android.mock.TestData
import io.livekit.android.room.participant.LocalParticipant
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.runBlockingTest
import livekit.LivekitModels
import org.junit.Assert
import org.junit.Before
import org.junit.Rule
import org.junit.Test
... ... @@ -56,16 +64,20 @@ class RoomTest {
eglBase,
localParticantFactory,
DefaultsManager(),
coroutineRule.dispatcher
coroutineRule.dispatcher,
coroutineRule.dispatcher,
)
}
@Test
fun connectTest() {
fun connect() {
rtcEngine.stub {
onBlocking { rtcEngine.join(any(), any(), anyOrNull()) }
.doReturn(SignalClientTest.JOIN.join)
}
rtcEngine.stub {
onBlocking { rtcEngine.client }
.doReturn(Mockito.mock(SignalClient::class.java))
}
val job = coroutineRule.scope.launch {
room.connect(
url = "http://www.example.com",
... ... @@ -77,4 +89,36 @@ class RoomTest {
job.join()
}
}
@Test
fun connectTest() {
connect()
}
@Test
fun onConnectionAvailableWillReconnect() {
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
val network = Mockito.mock(Network::class.java)
room.onLost(network)
room.onAvailable(network)
val events = eventCollector.stopCollectingEvents()
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.Reconnecting)
}
@Test
fun onDisconnect() {
connect()
val eventCollector = EventCollector(room.events, coroutineRule.scope)
room.onDisconnect("")
val events = eventCollector.stopCollectingEvents()
Assert.assertEquals(1, events.size)
Assert.assertEquals(true, events[0] is RoomEvent.Disconnected)
}
}
\ No newline at end of file
... ...
... ... @@ -2,6 +2,7 @@ package io.livekit.android.room
import com.google.protobuf.util.JsonFormat
import io.livekit.android.mock.MockWebsocketFactory
import io.livekit.android.mock.TestData
import io.livekit.android.util.toOkioByteString
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
... ... @@ -11,7 +12,10 @@ import kotlinx.coroutines.test.runBlockingTest
import kotlinx.serialization.json.Json
import livekit.LivekitModels
import livekit.LivekitRtc
import okhttp3.*
import okhttp3.OkHttpClient
import okhttp3.Protocol
import okhttp3.Request
import okhttp3.Response
import org.junit.After
import org.junit.Assert
import org.junit.Before
... ... @@ -138,11 +142,7 @@ class SignalClientTest {
sid = "room_sid"
build()
}
participant = with(participantBuilder) {
sid = "participant_sid"
identity = "participant_identity"
build()
}
participant = TestData.LOCAL_PARTICIPANT
build()
}
build()
... ... @@ -168,9 +168,58 @@ class SignalClientTest {
build()
}
val TRACK_PUBLISHED = with(LivekitRtc.SignalResponse.newBuilder()) {
trackPublished = with(trackPublishedBuilder) {
track = TestData.REMOTE_AUDIO_TRACK
build()
}
build()
}
val PARTICIPANT_JOIN = with(LivekitRtc.SignalResponse.newBuilder()) {
update = with(LivekitRtc.ParticipantUpdate.newBuilder()) {
addParticipants(TestData.REMOTE_PARTICIPANT)
build()
}
build()
}
val PARTICIPANT_DISCONNECT = with(LivekitRtc.SignalResponse.newBuilder()) {
update = with(LivekitRtc.ParticipantUpdate.newBuilder()) {
val disconnectedParticipant = TestData.REMOTE_PARTICIPANT.toBuilder()
.setState(LivekitModels.ParticipantInfo.State.DISCONNECTED)
.build()
addParticipants(disconnectedParticipant)
build()
}
build()
}
val ACTIVE_SPEAKER_UPDATE = with(LivekitRtc.SignalResponse.newBuilder()) {
speakersChanged = with(LivekitRtc.SpeakersChanged.newBuilder()) {
addSpeakers(TestData.REMOTE_SPEAKER_INFO)
build()
}
build()
}
val PARTICIPANT_METADATA_CHANGED = with(LivekitRtc.SignalResponse.newBuilder()) {
update = with(LivekitRtc.ParticipantUpdate.newBuilder()) {
val participantMetadataChanged = TestData.REMOTE_PARTICIPANT.toBuilder()
.setMetadata("changed_metadata")
.build()
addParticipants(participantMetadataChanged)
build()
}
build()
}
val CONNECTION_QUALITY = with(LivekitRtc.SignalResponse.newBuilder()) {
connectionQuality = with(connectionQualityBuilder) {
addUpdates(with(LivekitRtc.ConnectionQualityInfo.newBuilder()){
addUpdates(with(LivekitRtc.ConnectionQualityInfo.newBuilder()) {
participantSid = JOIN.join.participant.sid
quality = LivekitModels.ConnectionQuality.EXCELLENT
build()
... ... @@ -179,5 +228,12 @@ class SignalClientTest {
}
build()
}
val LEAVE = with(LivekitRtc.SignalResponse.newBuilder()) {
leave = with(leaveBuilder) {
build()
}
build()
}
}
}
\ No newline at end of file
... ...