davidliu
Committed by GitHub

Implement data streams (#625)

* Implement data streams

* Emit warning logs for unhandled datastreams

* Fix stream closing
正在显示 26 个修改的文件 包含 1692 行增加49 行删除
---
"client-sdk-android": minor
---
Implement data streams feature
... ...
... ... @@ -3,6 +3,7 @@
<words>
<w>bitrates</w>
<w>capturer</w>
<w>chunker</w>
<w>exts</w>
<w>msid</w>
</words>
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.coroutines
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlin.coroutines.cancellation.CancellationException
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit?>): Flow<T> = flow {
try {
coroutineScope {
launch {
signal.takeWhile { it == null }.collect()
this@coroutineScope.cancel()
}
collect {
emit(it)
}
}
} catch (e: CancellationException) {
// ignore
}
}
fun <T> Flow<T>.cancelOnSignal(signal: Flow<Unit?>): Flow<T> = flow {
coroutineScope {
launch {
signal.takeWhile { it == null }.collect()
currentCoroutineContext().cancel()
}
collect {
emit(it)
}
currentCoroutineContext().cancel()
}
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.dagger
import dagger.Binds
import dagger.Module
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManager
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManagerImpl
import io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManager
import io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManagerImpl
/**
* @suppress
*/
@Module
abstract class InternalBindsModule {
@Binds
abstract fun incomingDataStreamManager(manager: IncomingDataStreamManagerImpl): IncomingDataStreamManager
@Binds
abstract fun outgoingDataStreamManager(manager: OutgoingDataStreamManagerImpl): OutgoingDataStreamManager
}
... ...
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
... ... @@ -38,6 +38,7 @@ import javax.inject.Singleton
OverridesModule::class,
AudioHandlerModule::class,
MemoryModule::class,
InternalBindsModule::class,
],
)
interface LiveKitComponent {
... ...
... ... @@ -38,6 +38,7 @@ import io.livekit.android.util.LKLog
import io.livekit.android.util.flowDelegate
import io.livekit.android.util.nullSafe
import io.livekit.android.util.withCheckLock
import io.livekit.android.webrtc.DataChannelManager
import io.livekit.android.webrtc.RTCStatsGetter
import io.livekit.android.webrtc.copy
import io.livekit.android.webrtc.isConnected
... ... @@ -165,6 +166,10 @@ internal constructor(
private var reliableDataChannelSub: DataChannel? = null
private var lossyDataChannel: DataChannel? = null
private var lossyDataChannelSub: DataChannel? = null
private var reliableDataChannelManager: DataChannelManager? = null
private var reliableDataChannelSubManager: DataChannelManager? = null
private var lossyDataChannelManager: DataChannelManager? = null
private var lossyDataChannelSubManager: DataChannelManager? = null
private var isSubscriberPrimary = false
private var isClosed = true
... ... @@ -406,19 +411,17 @@ internal constructor(
subscriber?.closeBlocking()
subscriber = null
fun DataChannel?.completeDispose() {
this?.unregisterObserver()
this?.close()
this?.dispose()
}
reliableDataChannel?.completeDispose()
reliableDataChannelManager?.dispose()
reliableDataChannelManager = null
reliableDataChannel = null
reliableDataChannelSub?.completeDispose()
reliableDataChannelSubManager?.dispose()
reliableDataChannelSubManager = null
reliableDataChannelSub = null
lossyDataChannel?.completeDispose()
lossyDataChannelManager?.dispose()
lossyDataChannelManager = null
lossyDataChannel = null
lossyDataChannelSub?.completeDispose()
lossyDataChannelSubManager?.dispose()
lossyDataChannelSubManager = null
lossyDataChannelSub = null
isSubscriberPrimary = false
}
... ... @@ -634,6 +637,22 @@ internal constructor(
channel.send(buf)
}
internal suspend fun waitForBufferStatusLow(kind: LivekitModels.DataPacket.Kind) {
ensurePublisherConnected(kind)
val manager = when (kind) {
LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager
LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager
LivekitModels.DataPacket.Kind.UNRECOGNIZED -> {
throw IllegalArgumentException()
}
}
if (manager == null) {
throw IllegalStateException("Not connected!")
}
manager.waitForBufferedAmountLow(DATA_CHANNEL_LOW_THRESHOLD.toLong())
}
private suspend fun ensurePublisherConnected(kind: LivekitModels.DataPacket.Kind) {
if (!isSubscriberPrimary) {
return
... ... @@ -802,6 +821,7 @@ internal constructor(
fun onTranscriptionReceived(transcription: LivekitModels.Transcription)
fun onLocalTrackSubscribed(trackSubscribed: LivekitRtc.TrackSubscribed)
fun onRpcPacketReceived(dp: LivekitModels.DataPacket)
fun onDataStreamPacket(dp: LivekitModels.DataPacket)
}
companion object {
... ... @@ -817,11 +837,13 @@ internal constructor(
*/
@VisibleForTesting
const val LOSSY_DATA_CHANNEL_LABEL = "_lossy"
internal const val MAX_DATA_PACKET_SIZE = 15360 // 15 KB
internal const val MAX_DATA_PACKET_SIZE = 15 * 1024 // 15 KB
private const val MAX_RECONNECT_RETRIES = 10
private const val MAX_RECONNECT_TIMEOUT = 60 * 1000
private const val MAX_ICE_CONNECT_TIMEOUT_MS = 20000
private const val DATA_CHANNEL_LOW_THRESHOLD = 64 * 1024 // 64 KB
internal val CONN_CONSTRAINTS = MediaConstraints().apply {
with(optional) {
add(MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true"))
... ... @@ -1079,16 +1101,11 @@ internal constructor(
LKLog.v { "invalid value for data packet" }
}
LivekitModels.DataPacket.ValueCase.STREAM_HEADER -> {
// TODO
}
LivekitModels.DataPacket.ValueCase.STREAM_CHUNK -> {
// TODO
}
LivekitModels.DataPacket.ValueCase.STREAM_TRAILER -> {
// TODO
LivekitModels.DataPacket.ValueCase.STREAM_HEADER,
LivekitModels.DataPacket.ValueCase.STREAM_CHUNK,
LivekitModels.DataPacket.ValueCase.STREAM_TRAILER,
-> {
listener?.onDataStreamPacket(dp)
}
}
}
... ...
... ... @@ -42,6 +42,7 @@ import io.livekit.android.e2ee.E2EEOptions
import io.livekit.android.events.*
import io.livekit.android.memory.CloseableManager
import io.livekit.android.renderer.TextureViewRenderer
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManager
import io.livekit.android.room.metrics.collectMetrics
import io.livekit.android.room.network.NetworkCallbackManagerFactory
import io.livekit.android.room.participant.*
... ... @@ -106,7 +107,8 @@ constructor(
private val regionUrlProviderFactory: RegionUrlProvider.Factory,
private val connectionWarmer: ConnectionWarmer,
private val audioRecordPrewarmer: AudioRecordPrewarmer,
) : RTCEngine.Listener, ParticipantListener {
private val incomingDataStreamManager: IncomingDataStreamManager,
) : RTCEngine.Listener, ParticipantListener, IncomingDataStreamManager by incomingDataStreamManager {
private lateinit var coroutineScope: CoroutineScope
private val eventBus = BroadcastEventBus<RoomEvent>()
... ... @@ -907,6 +909,7 @@ constructor(
name = null
isRecording = false
sidToIdentity.clear()
incomingDataStreamManager.clearOpenStreams()
}
private fun sendSyncState() {
... ... @@ -1193,6 +1196,28 @@ constructor(
/**
* @suppress
*/
override fun onDataStreamPacket(dp: LivekitModels.DataPacket) {
when (dp.valueCase) {
LivekitModels.DataPacket.ValueCase.STREAM_HEADER -> {
incomingDataStreamManager.handleStreamHeader(dp.streamHeader, Participant.Identity(dp.participantIdentity))
}
LivekitModels.DataPacket.ValueCase.STREAM_CHUNK -> {
incomingDataStreamManager.handleDataChunk(dp.streamChunk)
}
LivekitModels.DataPacket.ValueCase.STREAM_TRAILER -> {
incomingDataStreamManager.handleStreamTrailer(dp.streamTrailer)
}
// Ignore other cases.
else -> {}
}
}
/**
* @suppress
*/
override fun onTranscriptionReceived(transcription: LivekitModels.Transcription) {
if (transcription.segmentsList.isEmpty()) {
LKLog.d { "Received transcription segments are empty." }
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream
sealed class StreamException(message: String? = null) : Exception(message) {
class AlreadyOpenedException : StreamException()
class AbnormalEndException(message: String?) : StreamException(message)
class DecodeFailedException : StreamException()
class LengthExceededException : StreamException()
class IncompleteException : StreamException()
class TerminatedException : StreamException()
class UnknownStreamException : StreamException()
class NotDirectoryException : StreamException()
class FileInfoUnavailableException : StreamException()
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream
import livekit.LivekitModels
import livekit.LivekitModels.DataStream.ByteHeader
import livekit.LivekitModels.DataStream.Header
import livekit.LivekitModels.DataStream.TextHeader
sealed class StreamInfo(
open val id: String,
open val topic: String,
open val timestampMs: Long,
open val totalSize: Long?,
open val attributes: Map<String, String>,
)
data class TextStreamInfo(
override val id: String,
override val topic: String,
override val timestampMs: Long,
override val totalSize: Long?,
override val attributes: Map<String, String>,
val operationType: OperationType,
val version: Int,
val replyToStreamId: String?,
val attachedStreamIds: List<String>,
val generated: Boolean,
) : StreamInfo(id, topic, timestampMs, totalSize, attributes) {
constructor(header: Header, textHeader: TextHeader) : this(
id = header.streamId,
topic = header.topic,
timestampMs = header.timestamp,
totalSize = if (header.hasTotalLength()) {
header.totalLength
} else {
null
},
attributes = header.attributesMap.toMap(),
operationType = OperationType.fromProto(textHeader.operationType),
version = textHeader.version,
replyToStreamId = if (!textHeader.replyToStreamId.isNullOrEmpty()) {
textHeader.replyToStreamId
} else {
null
},
attachedStreamIds = textHeader.attachedStreamIdsList ?: emptyList(),
generated = textHeader.generated,
)
enum class OperationType {
CREATE,
UPDATE,
DELETE,
REACTION;
/**
* @throws IllegalArgumentException [operationType] is unrecognized
*/
fun toProto(): LivekitModels.DataStream.OperationType {
return when (this) {
CREATE -> LivekitModels.DataStream.OperationType.CREATE
UPDATE -> LivekitModels.DataStream.OperationType.UPDATE
DELETE -> LivekitModels.DataStream.OperationType.DELETE
REACTION -> LivekitModels.DataStream.OperationType.REACTION
}
}
companion object {
/**
* @throws IllegalArgumentException [operationType] is unrecognized
*/
fun fromProto(operationType: LivekitModels.DataStream.OperationType): OperationType {
return when (operationType) {
LivekitModels.DataStream.OperationType.CREATE -> CREATE
LivekitModels.DataStream.OperationType.UPDATE -> UPDATE
LivekitModels.DataStream.OperationType.DELETE -> DELETE
LivekitModels.DataStream.OperationType.REACTION -> REACTION
LivekitModels.DataStream.OperationType.UNRECOGNIZED -> throw IllegalArgumentException("Unrecognized operation type!")
}
}
}
}
}
data class ByteStreamInfo(
override val id: String,
override val topic: String,
override val timestampMs: Long,
override val totalSize: Long?,
override val attributes: Map<String, String>,
val mimeType: String,
val name: String?,
) : StreamInfo(id, topic, timestampMs, totalSize, attributes) {
constructor(header: Header, byteHeader: ByteHeader) : this(
id = header.streamId,
topic = header.topic,
timestampMs = header.timestamp,
totalSize = if (header.hasTotalLength()) {
header.totalLength
} else {
null
},
attributes = header.attributesMap.toMap(),
mimeType = header.mimeType,
name = byteHeader.name,
)
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream
import io.livekit.android.room.participant.Participant
import livekit.LivekitModels
import java.util.UUID
interface StreamOptions {
val topic: String?
val attributes: Map<String, String>?
val totalLength: Long?
val mimeType: String?
val encryptionType: LivekitModels.Encryption.Type?
val destinationIdentities: List<Participant.Identity>
}
data class StreamTextOptions(
val topic: String = "",
val attributes: Map<String, String> = emptyMap(),
val streamId: String = UUID.randomUUID().toString(),
val destinationIdentities: List<Participant.Identity> = emptyList(),
val operationType: TextStreamInfo.OperationType,
val version: Int = 0,
val attachedStreamIds: List<String> = emptyList(),
val replyToStreamId: String? = null,
/**
* The total exact size in bytes, if known.
*/
val totalSize: Long? = null,
)
data class StreamBytesOptions(
val topic: String = "",
val attributes: Map<String, String> = emptyMap(),
val streamId: String = UUID.randomUUID().toString(),
val destinationIdentities: List<Participant.Identity> = emptyList(),
/**
* The mime type of the stream data. Defaults to application/octet-stream
*/
val mimeType: String = "application/octet-stream",
/**
* The name of the file being sent.
*/
val name: String,
/**
* The total exact size in bytes, if known.
*/
val totalSize: Long? = null,
)
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream.incoming
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.fold
abstract class BaseStreamReceiver<T>(private val source: Channel<ByteArray>) {
abstract val flow: Flow<T>
internal fun close(error: Exception?) {
source.close(cause = error)
}
/**
* Suspends and waits for the next piece of data.
*
* @return the next available piece of data.
* @throws NoSuchElementException when the stream is closed and no more data is available.
*/
suspend fun readNext(): T {
return flow.first()
}
/**
* Suspends and waits for all available data until the stream is closed.
*/
suspend fun readAll(): List<T> {
flow.catch { }
return flow.fold(mutableListOf()) { acc, value ->
acc.add(value)
return@fold acc
}
}
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream.incoming
import io.livekit.android.room.datastream.ByteStreamInfo
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
class ByteStreamReceiver(
val info: ByteStreamInfo,
channel: Channel<ByteArray>,
) : BaseStreamReceiver<ByteArray>(channel) {
override val flow: Flow<ByteArray> = channel.receiveAsFlow()
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream.incoming
import android.os.SystemClock
import androidx.annotation.VisibleForTesting
import io.livekit.android.room.datastream.ByteStreamInfo
import io.livekit.android.room.datastream.StreamException
import io.livekit.android.room.datastream.StreamInfo
import io.livekit.android.room.datastream.TextStreamInfo
import io.livekit.android.room.participant.Participant
import io.livekit.android.util.LKLog
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import livekit.LivekitModels.DataStream
import java.util.Collections
import javax.inject.Inject
// Type-erased stream handler
private typealias AnyStreamHandler = (Channel<ByteArray>, Participant.Identity) -> Unit
typealias ByteStreamHandler = (reader: ByteStreamReceiver, fromIdentity: Participant.Identity) -> Unit
typealias TextStreamHandler = (reader: TextStreamReceiver, fromIdentity: Participant.Identity) -> Unit
interface IncomingDataStreamManager {
/**
* Registers a text stream handler for [topic]. Only one handler can be set for a particular topic at a time.
*
* @throws IllegalArgumentException if a topic is already set.
*/
fun registerTextStreamHandler(topic: String, handler: TextStreamHandler)
/**
* Unregisters a previously registered text handler for [topic].
*/
fun unregisterTextStreamHandler(topic: String)
/**
* Registers a byte stream handler for [topic]. Only one handler can be set for a particular topic at a time.
*
* @throws IllegalArgumentException if a topic is already set.
*/
fun registerByteStreamHandler(topic: String, handler: ByteStreamHandler)
/**
* Unregisters a previously registered byte handler for [topic].
*/
fun unregisterByteStreamHandler(topic: String)
/**
* @suppress
*/
fun handleStreamHeader(header: DataStream.Header, fromIdentity: Participant.Identity)
/**
* @suppress
*/
fun handleDataChunk(chunk: DataStream.Chunk)
/**
* @suppress
*/
fun handleStreamTrailer(trailer: DataStream.Trailer)
/**
* @suppress
*/
fun clearOpenStreams()
}
/**
* @suppress
*/
class IncomingDataStreamManagerImpl @Inject constructor() : IncomingDataStreamManager {
private data class Descriptor(
val streamInfo: StreamInfo,
/**
* Measured by SystemClock.elapsedRealtime()
*/
val openTime: Long,
val channel: Channel<ByteArray>,
var readLength: Long = 0,
)
private val openStreams = Collections.synchronizedMap(mutableMapOf<String, Descriptor>())
private val textStreamHandlers = Collections.synchronizedMap(mutableMapOf<String, TextStreamHandler>())
private val byteStreamHandlers = Collections.synchronizedMap(mutableMapOf<String, ByteStreamHandler>())
/**
* Registers a text stream handler for [topic]. Only one handler can be set for a particular topic at a time.
*
* @throws IllegalArgumentException if a topic is already set.
*/
override fun registerTextStreamHandler(topic: String, handler: TextStreamHandler) {
synchronized(textStreamHandlers) {
if (textStreamHandlers.containsKey(topic)) {
throw IllegalArgumentException("A text stream handler for topic $topic has already been set.")
}
textStreamHandlers[topic] = handler
}
}
/**
* Unregisters a previously registered text handler for [topic].
*/
override fun unregisterTextStreamHandler(topic: String) {
synchronized(textStreamHandlers) {
textStreamHandlers.remove(topic)
}
}
/**
* Registers a byte stream handler for [topic]. Only one handler can be set for a particular topic at a time.
*
* @throws IllegalArgumentException if a topic is already set.
*/
override fun registerByteStreamHandler(topic: String, handler: ByteStreamHandler) {
synchronized(byteStreamHandlers) {
if (byteStreamHandlers.containsKey(topic)) {
throw IllegalArgumentException("A byte stream handler for topic $topic has already been set.")
}
byteStreamHandlers[topic] = handler
}
}
/**
* Unregisters a previously registered byte handler for [topic].
*/
override fun unregisterByteStreamHandler(topic: String) {
synchronized(byteStreamHandlers) {
byteStreamHandlers.remove(topic)
}
}
/**
* @suppress
*/
override fun handleStreamHeader(header: DataStream.Header, fromIdentity: Participant.Identity) {
val info = streamInfoFromHeader(header) ?: return
openStream(info, fromIdentity)
}
@OptIn(ExperimentalCoroutinesApi::class)
private fun openStream(info: StreamInfo, fromIdentity: Participant.Identity) {
if (openStreams.containsKey(info.id)) {
LKLog.w { "Stream already open for id ${info.id}" }
return
}
val handler = getHandlerForInfo(info)
val channel = createChannelForStreamReceiver()
val descriptor = Descriptor(
streamInfo = info,
openTime = SystemClock.elapsedRealtime(),
channel = channel,
)
openStreams[info.id] = descriptor
channel.invokeOnClose { closeStream(id = info.id) }
LKLog.d { "Opened stream ${info.id}" }
try {
handler.invoke(channel, fromIdentity)
} catch (e: Exception) {
LKLog.e(e) { "Unhandled exception when invoking stream handler!" }
}
}
/**
* @suppress
*/
override fun handleDataChunk(chunk: DataStream.Chunk) {
val content = chunk.content ?: return
val descriptor = openStreams[chunk.streamId] ?: return
val totalReadLength = descriptor.readLength + content.size()
val totalLength = descriptor.streamInfo.totalSize
if (totalLength != null) {
if (totalReadLength > totalLength) {
descriptor.channel.close(StreamException.LengthExceededException())
return
}
}
descriptor.readLength = totalReadLength
descriptor.channel.trySend(content.toByteArray())
}
/**
* @suppress
*/
override fun handleStreamTrailer(trailer: DataStream.Trailer) {
val descriptor = openStreams[trailer.streamId]
if (descriptor == null) {
LKLog.w { "Received trailer for unknown stream: ${trailer.streamId}" }
return
}
val totalLength = descriptor.streamInfo.totalSize
if (totalLength != null) {
if (descriptor.readLength != totalLength) {
descriptor.channel.close(StreamException.IncompleteException())
return
}
}
val reason = trailer.reason
if (!reason.isNullOrEmpty()) {
// A non-empty reason string indicates an error
val exception = StreamException.AbnormalEndException(reason)
descriptor.channel.close(exception)
return
}
// Close successfully.
descriptor.channel.close()
}
private fun closeStream(id: String) {
synchronized(openStreams) {
val descriptor = openStreams[id]
if (descriptor == null) {
LKLog.d { "Attempted to close stream $id, but no descriptor was found." }
return
}
descriptor.channel.close()
val openMillis = SystemClock.elapsedRealtime() - descriptor.openTime
LKLog.d { "Closed stream $id, (open for ${openMillis}ms" }
openStreams.remove(id)
}
}
/**
* @suppress
*/
override fun clearOpenStreams() {
synchronized(openStreams) {
for (descriptor in openStreams.values) {
descriptor.channel.close(StreamException.TerminatedException())
}
openStreams.clear()
}
}
private fun getHandlerForInfo(info: StreamInfo): AnyStreamHandler {
return when (info) {
is ByteStreamInfo -> {
val handler = byteStreamHandlers[info.topic]
{ channel, identity ->
if (handler == null) {
LKLog.w { "Received byte stream for topic \"${info.topic}\", but no handler was found. Ignoring." }
} else {
handler.invoke(ByteStreamReceiver(info, channel), identity)
}
}
}
is TextStreamInfo -> {
val handler = textStreamHandlers[info.topic]
{ channel, identity ->
if (handler == null) {
LKLog.w { "Received text stream for topic \"${info.topic}\", but no handler was found. Ignoring." }
} else {
handler.invoke(TextStreamReceiver(info, channel), identity)
}
}
}
}
}
private fun streamInfoFromHeader(header: DataStream.Header): StreamInfo? {
try {
return when (header.contentHeaderCase) {
DataStream.Header.ContentHeaderCase.TEXT_HEADER -> {
TextStreamInfo(header, header.textHeader)
}
DataStream.Header.ContentHeaderCase.BYTE_HEADER -> {
ByteStreamInfo(header, header.byteHeader)
}
DataStream.Header.ContentHeaderCase.CONTENTHEADER_NOT_SET,
null,
-> {
LKLog.i { "received header with non-set content header. streamId: ${header.streamId}, topic: ${header.topic}" }
null
}
}
} catch (e: Exception) {
LKLog.e(e) { "Exception when processing new stream header." }
return null
}
}
companion object {
@VisibleForTesting
fun createChannelForStreamReceiver() = Channel<ByteArray>(
capacity = Int.MAX_VALUE,
onBufferOverflow = BufferOverflow.SUSPEND,
)
}
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream.incoming
import io.livekit.android.room.datastream.TextStreamInfo
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow
class TextStreamReceiver(
val info: TextStreamInfo,
source: Channel<ByteArray>,
) : BaseStreamReceiver<String>(source) {
override val flow: Flow<String> = source.receiveAsFlow()
.map { bytes -> bytes.toString(Charsets.UTF_8) }
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream.outgoing
import io.livekit.android.room.datastream.StreamException
abstract class BaseStreamSender<T>(
internal val destination: StreamDestination<T>,
) {
suspend fun write(data: T) {
if (!destination.isOpen) {
throw StreamException.TerminatedException()
}
writeImpl(data)
}
internal abstract suspend fun writeImpl(data: T)
suspend fun close(reason: String? = null) {
destination.close(reason)
}
}
/**
* @suppress
*/
interface StreamDestination<T> {
val isOpen: Boolean
suspend fun write(data: T, chunker: DataChunker<T>)
suspend fun close(reason: String?)
}
internal typealias DataChunker<T> = (data: T, chunkSize: Int) -> List<ByteArray>
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream.outgoing
import io.livekit.android.room.datastream.ByteStreamInfo
import okio.Buffer
import okio.FileSystem
import okio.Path.Companion.toPath
import okio.Source
import okio.source
import java.io.InputStream
import java.util.Arrays
class ByteStreamSender(
val info: ByteStreamInfo,
destination: StreamDestination<ByteArray>,
) : BaseStreamSender<ByteArray>(destination = destination) {
override suspend fun writeImpl(data: ByteArray) {
destination.write(data, byteDataChunker)
}
}
private val byteDataChunker: DataChunker<ByteArray> = { data: ByteArray, chunkSize: Int ->
(data.indices step chunkSize)
.map { index -> Arrays.copyOfRange(data, index, index + chunkSize) }
}
/**
* Reads the file and writes it to the data stream.
*
* @throws
*/
suspend fun ByteStreamSender.writeFile(filePath: String) {
write(FileSystem.SYSTEM.source(filePath.toPath()))
}
/**
* Reads the input stream and sends it to the data stream.
*/
suspend fun ByteStreamSender.write(input: InputStream) {
write(input.source())
}
/**
* Reads the source and sends it to the data stream.
*/
suspend fun ByteStreamSender.write(source: Source) {
val buffer = Buffer()
while (true) {
val readLen = source.read(buffer, 4096)
if (readLen == -1L) {
break
}
write(buffer.readByteArray())
}
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream.outgoing
import com.google.protobuf.ByteString
import io.livekit.android.room.RTCEngine
import io.livekit.android.room.datastream.ByteStreamInfo
import io.livekit.android.room.datastream.StreamBytesOptions
import io.livekit.android.room.datastream.StreamException
import io.livekit.android.room.datastream.StreamInfo
import io.livekit.android.room.datastream.StreamTextOptions
import io.livekit.android.room.datastream.TextStreamInfo
import io.livekit.android.room.participant.Participant
import io.livekit.android.util.LKLog
import livekit.LivekitModels.DataPacket
import livekit.LivekitModels.DataStream
import java.util.Collections
import java.util.Date
import java.util.concurrent.atomic.AtomicLong
import javax.inject.Inject
interface OutgoingDataStreamManager {
/**
* Start sending a stream of text
*/
suspend fun streamText(options: StreamTextOptions): TextStreamSender
/**
* Start sending a stream of bytes
*/
suspend fun streamBytes(options: StreamBytesOptions): ByteStreamSender
}
/**
* @suppress
*/
class OutgoingDataStreamManagerImpl
@Inject
constructor(
val engine: RTCEngine,
) : OutgoingDataStreamManager {
private data class Descriptor(
val info: StreamInfo,
val destinationIdentityStrings: List<String>,
var writtenLength: Long = 0L,
val nextChunkIndex: AtomicLong = AtomicLong(0),
)
private val openStreams = Collections.synchronizedMap(mutableMapOf<String, Descriptor>())
private suspend fun openStream(
info: StreamInfo,
destinationIdentities: List<Participant.Identity> = emptyList(),
) {
if (openStreams.containsKey(info.id)) {
throw StreamException.AlreadyOpenedException()
}
val destinationIdentityStrings = destinationIdentities.map { it.value }
val headerPacket = with(DataPacket.newBuilder()) {
addAllDestinationIdentities(destinationIdentityStrings)
kind = DataPacket.Kind.RELIABLE
streamHeader = with(DataStream.Header.newBuilder()) {
this.streamId = info.id
this.topic = info.topic
this.timestamp = info.timestampMs
this.putAllAttributes(info.attributes)
info.totalSize?.let {
this.totalLength = it
}
when (info) {
is ByteStreamInfo -> {
this.mimeType = info.mimeType
this.byteHeader = with(DataStream.ByteHeader.newBuilder()) {
this.name = name
build()
}
}
is TextStreamInfo -> {
textHeader = with(DataStream.TextHeader.newBuilder()) {
this.operationType = info.operationType.toProto()
this.version = info.version
this.replyToStreamId = info.replyToStreamId
this.addAllAttachedStreamIds(info.attachedStreamIds)
this.generated = info.generated
build()
}
}
}
build()
}
build()
}
engine.sendData(headerPacket)
val descriptor = Descriptor(info, destinationIdentityStrings)
openStreams[info.id] = descriptor
LKLog.d { "Opened send stream ${info.id}" }
}
private suspend fun sendChunk(streamId: String, dataChunk: ByteArray) {
val descriptor = openStreams[streamId] ?: throw StreamException.UnknownStreamException()
val nextChunkIndex = descriptor.nextChunkIndex.getAndIncrement()
val chunkPacket = with(DataPacket.newBuilder()) {
addAllDestinationIdentities(descriptor.destinationIdentityStrings)
kind = DataPacket.Kind.RELIABLE
streamChunk = with(DataStream.Chunk.newBuilder()) {
this.streamId = streamId
this.content = ByteString.copyFrom(dataChunk)
this.chunkIndex = nextChunkIndex
build()
}
build()
}
engine.waitForBufferStatusLow(DataPacket.Kind.RELIABLE)
engine.sendData(chunkPacket)
}
private suspend fun closeStream(streamId: String, reason: String? = null) {
val descriptor = openStreams[streamId] ?: throw StreamException.UnknownStreamException()
val trailerPacket = with(DataPacket.newBuilder()) {
addAllDestinationIdentities(descriptor.destinationIdentityStrings)
kind = DataPacket.Kind.RELIABLE
streamTrailer = with(DataStream.Trailer.newBuilder()) {
this.streamId = streamId
if (reason != null) {
this.reason = reason
}
build()
}
build()
}
engine.waitForBufferStatusLow(DataPacket.Kind.RELIABLE)
engine.sendData(trailerPacket)
openStreams.remove(streamId)
LKLog.d { "Closed send stream $streamId" }
}
override suspend fun streamText(options: StreamTextOptions): TextStreamSender {
val streamInfo = TextStreamInfo(
id = options.streamId,
topic = options.topic,
timestampMs = Date().time,
totalSize = options.totalSize,
attributes = options.attributes,
operationType = options.operationType,
version = options.version,
replyToStreamId = options.replyToStreamId,
attachedStreamIds = options.attachedStreamIds,
generated = false,
)
val streamId = options.streamId
openStream(streamInfo, options.destinationIdentities)
val destination = ManagerStreamDestination<String>(streamId)
return TextStreamSender(
streamInfo,
destination,
)
}
override suspend fun streamBytes(options: StreamBytesOptions): ByteStreamSender {
val streamInfo = ByteStreamInfo(
id = options.streamId,
topic = options.topic,
timestampMs = Date().time,
totalSize = options.totalSize,
attributes = options.attributes,
mimeType = options.mimeType,
name = options.name,
)
val streamId = options.streamId
openStream(streamInfo, options.destinationIdentities)
val destination = ManagerStreamDestination<ByteArray>(streamId)
return ByteStreamSender(
streamInfo,
destination,
)
}
private inner class ManagerStreamDestination<T>(val streamId: String) : StreamDestination<T> {
override val isOpen: Boolean
get() = openStreams.contains(streamId)
override suspend fun write(data: T, chunker: DataChunker<T>) {
val chunks = chunker.invoke(data, RTCEngine.MAX_DATA_PACKET_SIZE)
for (chunk in chunks) {
sendChunk(streamId, chunk)
}
}
override suspend fun close(reason: String?) {
closeStream(streamId, reason)
}
}
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream.outgoing
import io.livekit.android.room.datastream.TextStreamInfo
import java.util.Arrays
class TextStreamSender(
val info: TextStreamInfo,
destination: StreamDestination<String>,
) : BaseStreamSender<String>(destination) {
override suspend fun writeImpl(data: String) {
destination.write(data, stringChunker)
}
}
private val stringChunker: DataChunker<String> = { text: String, chunkSize: Int ->
val utf8Array = text.toByteArray(Charsets.UTF_8)
val result = mutableListOf<ByteArray>()
var startIndex = 0
var endIndex = 0
var i = 0
while (i < utf8Array.size) {
val nextHead = utf8Array[i].toInt()
val nextCharPointSize = if ((nextHead and 0b1111_1000) == 0b1111_0000) {
4
} else if ((nextHead and 0b1111_0000) == 0b1110_0000) {
3
} else if ((nextHead and 0b1110_0000) == 0b1100_0000) {
2
} else {
1
}
val curLength = endIndex - startIndex
if (curLength + nextCharPointSize > chunkSize) {
result.add(Arrays.copyOfRange(utf8Array, startIndex, endIndex))
startIndex = endIndex
}
i += nextCharPointSize
endIndex = i
}
// Last chunk done manually
if (startIndex != endIndex) {
result.add(Arrays.copyOfRange(utf8Array, startIndex, endIndex))
}
result
}
... ...
... ... @@ -34,6 +34,7 @@ import io.livekit.android.room.DefaultsManager
import io.livekit.android.room.RTCEngine
import io.livekit.android.room.Room
import io.livekit.android.room.TrackBitrateInfo
import io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManager
import io.livekit.android.room.isSVCCodec
import io.livekit.android.room.rpc.RpcManager
import io.livekit.android.room.track.DataPublishReliability
... ... @@ -106,7 +107,8 @@ internal constructor(
coroutineDispatcher: CoroutineDispatcher,
@Named(InjectionNames.SENDER)
private val capabilitiesGetter: CapabilitiesGetter,
) : Participant(Sid(""), null, coroutineDispatcher) {
private val outgoingDataStreamManager: OutgoingDataStreamManager,
) : Participant(Sid(""), null, coroutineDispatcher), OutgoingDataStreamManager by outgoingDataStreamManager {
var audioTrackCaptureDefaults: LocalAudioTrackOptions by defaultsManager::audioTrackCaptureDefaults
var audioTrackPublishDefaults: AudioTrackPublishDefaults by defaultsManager::audioTrackPublishDefaults
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.webrtc
import io.livekit.android.coroutines.cancelOnSignal
import io.livekit.android.room.RTCEngine
import io.livekit.android.util.FlowObservable
import io.livekit.android.util.flow
import io.livekit.android.util.flowDelegate
import io.livekit.android.webrtc.peerconnection.executeOnRTCThread
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.takeWhile
import livekit.org.webrtc.DataChannel
/**
* @suppress
*/
class DataChannelManager(
val dataChannel: DataChannel,
private val dataMessageListener: DataChannel.Observer,
) : DataChannel.Observer {
@get:FlowObservable
var disposed by flowDelegate(false)
private set
@get:FlowObservable
var bufferedAmount by flowDelegate(0L)
private set
@get:FlowObservable
var state by flowDelegate(dataChannel.state())
private set
suspend fun waitForBufferedAmountLow(amount: Long = RTCEngine.MAX_DATA_PACKET_SIZE.toLong()) {
val signal = ::disposed.flow.map { if (it) Unit else null }
::bufferedAmount.flow
.cancelOnSignal(signal)
.takeWhile { it > amount }
.collect()
}
override fun onBufferedAmountChange(previousAmount: Long) {
bufferedAmount = dataChannel.bufferedAmount()
}
override fun onStateChange() {
state = dataChannel.state()
}
override fun onMessage(buffer: DataChannel.Buffer) {
dataMessageListener.onMessage(buffer)
}
fun dispose() {
synchronized(this) {
if (disposed) {
return
}
disposed = true
}
executeOnRTCThread {
dataChannel.unregisterObserver()
dataChannel.close()
dataChannel.dispose()
}
}
interface Listener {
fun onBufferedAmountChange(dataChannel: DataChannel, newAmount: Long, previousAmount: Long)
fun onStateChange(dataChannel: DataChannel, state: DataChannel.State)
fun onMessage(dataChannel: DataChannel, buffer: DataChannel.Buffer)
}
}
... ...
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
... ... @@ -16,11 +16,9 @@
package io.livekit.android.test.coroutines
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import io.livekit.android.coroutines.takeUntilSignal
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.fold
/**
* Collect all items until signal is given.
... ... @@ -31,21 +29,3 @@ suspend fun <T> Flow<T>.toListUntilSignal(signal: Flow<Unit?>): List<T> {
list.plus(event)
}
}
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit?>): Flow<T> = flow {
try {
coroutineScope {
launch {
signal.takeWhile { it == null }.collect()
println("signalled")
this@coroutineScope.cancel()
}
collect {
emit(it)
}
}
} catch (e: CancellationException) {
// ignore
}
}
... ...
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
... ... @@ -19,6 +19,7 @@ package io.livekit.android.test.mock.dagger
import android.content.Context
import dagger.BindsInstance
import dagger.Component
import io.livekit.android.dagger.InternalBindsModule
import io.livekit.android.dagger.JsonFormatModule
import io.livekit.android.dagger.LiveKitComponent
import io.livekit.android.dagger.MemoryModule
... ... @@ -36,6 +37,7 @@ import javax.inject.Singleton
TestAudioHandlerModule::class,
JsonFormatModule::class,
MemoryModule::class,
InternalBindsModule::class,
],
)
interface TestLiveKitComponent : LiveKitComponent {
... ...
... ... @@ -27,6 +27,9 @@ import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
/**
* A test that ensures all the proto enum cases match their sdk counterparts.
*/
@RunWith(Parameterized::class)
class ProtoConverterTest(
val protoClass: Class<*>,
... ...
/*
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room
import com.google.protobuf.ByteString
import io.livekit.android.room.datastream.StreamException
import io.livekit.android.test.MockE2ETest
import io.livekit.android.test.assert.assertIsClass
import io.livekit.android.test.mock.MockDataChannel
import io.livekit.android.test.mock.MockPeerConnection
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.launch
import livekit.LivekitModels.DataPacket
import livekit.LivekitModels.DataStream
import livekit.LivekitModels.DataStream.OperationType
import livekit.LivekitModels.DataStream.TextHeader
import livekit.org.webrtc.DataChannel
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
import java.nio.ByteBuffer
@OptIn(ExperimentalCoroutinesApi::class)
class RoomDataStreamMockE2ETest : MockE2ETest() {
@Test
fun dataStream() = runTest {
connect()
val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection
val subDataChannel = MockDataChannel(RTCEngine.RELIABLE_DATA_CHANNEL_LABEL)
subPeerConnection.observer?.onDataChannel(subDataChannel)
val scope = CoroutineScope(currentCoroutineContext())
val collectedData = mutableListOf<ByteArray>()
var finished = false
room.registerByteStreamHandler("topic") { reader, _ ->
scope.launch {
reader.flow.collect {
collectedData.add(it)
}
finished = true
}
}
subDataChannel.observer?.onMessage(createStreamHeader().wrap())
subDataChannel.observer?.onMessage(createStreamChunk(0, ByteArray(1) { 1 }).wrap())
subDataChannel.observer?.onMessage(createStreamTrailer().wrap())
assertTrue(finished)
assertEquals(1, collectedData.size)
assertEquals(1, collectedData[0][0].toInt())
}
@Test
fun textStream() = runTest {
connect()
val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection
val subDataChannel = MockDataChannel(RTCEngine.RELIABLE_DATA_CHANNEL_LABEL)
subPeerConnection.observer?.onDataChannel(subDataChannel)
val scope = CoroutineScope(currentCoroutineContext())
val collectedData = mutableListOf<String>()
var finished = false
room.registerTextStreamHandler("topic") { reader, _ ->
scope.launch {
reader.flow.collect {
collectedData.add(it)
}
finished = true
}
}
val textStreamHeader = with(createStreamHeader().toBuilder()) {
streamHeader = with(streamHeader.toBuilder()) {
clearByteHeader()
textHeader = with(TextHeader.newBuilder()) {
operationType = OperationType.CREATE
generated = false
build()
}
build()
}
build()
}
subDataChannel.observer?.onMessage(textStreamHeader.wrap())
subDataChannel.observer?.onMessage(createStreamChunk(0, "hello".toByteArray()).wrap())
subDataChannel.observer?.onMessage(createStreamTrailer().wrap())
assertTrue(finished)
assertEquals(1, collectedData.size)
assertEquals("hello", collectedData[0])
}
@Test
fun dataStreamTerminated() = runTest {
connect()
val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection
val subDataChannel = MockDataChannel(RTCEngine.RELIABLE_DATA_CHANNEL_LABEL)
subPeerConnection.observer?.onDataChannel(subDataChannel)
val scope = CoroutineScope(currentCoroutineContext())
var finished = false
var threwOnce = false
room.registerByteStreamHandler("topic") { reader, _ ->
scope.launch {
reader.flow
.catch {
assertIsClass(StreamException.AbnormalEndException::class.java, it)
threwOnce = true
}
.collect {
}
finished = true
}
}
subDataChannel.observer?.onMessage(createStreamHeader().wrap())
val abnormalEnd = with(DataPacket.newBuilder()) {
streamTrailer = with(DataStream.Trailer.newBuilder()) {
streamId = "streamId"
reason = "reason"
build()
}
build()
}
subDataChannel.observer?.onMessage(abnormalEnd.wrap())
assertTrue(finished)
assertTrue(threwOnce)
}
@Test
fun dataStreamLengthExceeded() = runTest {
connect()
val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection
val subDataChannel = MockDataChannel(RTCEngine.RELIABLE_DATA_CHANNEL_LABEL)
subPeerConnection.observer?.onDataChannel(subDataChannel)
val scope = CoroutineScope(currentCoroutineContext())
var finished = false
var threwOnce = false
room.registerByteStreamHandler("topic") { reader, _ ->
scope.launch {
reader.flow
.catch {
assertIsClass(StreamException.LengthExceededException::class.java, it)
threwOnce = true
}
.collect {
}
finished = true
}
}
val header = with(createStreamHeader().toBuilder()) {
streamHeader = with(streamHeader.toBuilder()) {
totalLength = 1
build()
}
build()
}
subDataChannel.observer?.onMessage(header.wrap())
subDataChannel.observer?.onMessage(createStreamChunk(0, ByteArray(2) { 1 }).wrap())
subDataChannel.observer?.onMessage(createStreamTrailer().wrap())
assertTrue(finished)
assertTrue(threwOnce)
}
@Test
fun dataStreamIncomplete() = runTest {
connect()
val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection
val subDataChannel = MockDataChannel(RTCEngine.RELIABLE_DATA_CHANNEL_LABEL)
subPeerConnection.observer?.onDataChannel(subDataChannel)
val scope = CoroutineScope(currentCoroutineContext())
var finished = false
var threwOnce = false
room.registerByteStreamHandler("topic") { reader, _ ->
scope.launch {
reader.flow
.catch {
assertIsClass(StreamException.IncompleteException::class.java, it)
threwOnce = true
}
.collect {
}
finished = true
}
}
val header = with(createStreamHeader().toBuilder()) {
streamHeader = with(streamHeader.toBuilder()) {
totalLength = 2
build()
}
build()
}
subDataChannel.observer?.onMessage(header.wrap())
subDataChannel.observer?.onMessage(createStreamChunk(0, ByteArray(1) { 1 }).wrap())
subDataChannel.observer?.onMessage(createStreamTrailer().wrap())
assertTrue(finished)
assertTrue(threwOnce)
}
private fun DataPacket.wrap() = DataChannel.Buffer(
ByteBuffer.wrap(this.toByteArray()),
true,
)
fun createStreamHeader() = with(DataPacket.newBuilder()) {
streamHeader = with(DataStream.Header.newBuilder()) {
streamId = "streamId"
topic = "topic"
timestamp = 0L
clearTotalLength()
mimeType = "mime"
byteHeader = with(DataStream.ByteHeader.newBuilder()) {
name = "name"
build()
}
build()
}
build()
}
fun createStreamChunk(index: Int, bytes: ByteArray) = with(DataPacket.newBuilder()) {
streamChunk = with(DataStream.Chunk.newBuilder()) {
streamId = "streamId"
chunkIndex = index.toLong()
content = ByteString.copyFrom(bytes)
build()
}
build()
}
fun createStreamTrailer() = with(DataPacket.newBuilder()) {
streamTrailer = with(DataStream.Trailer.newBuilder()) {
streamId = "streamId"
build()
}
build()
}
}
... ...
... ... @@ -29,6 +29,7 @@ import io.livekit.android.events.EventListenable
import io.livekit.android.events.ParticipantEvent
import io.livekit.android.events.RoomEvent
import io.livekit.android.memory.CloseableManager
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManagerImpl
import io.livekit.android.room.network.NetworkCallbackManagerImpl
import io.livekit.android.room.participant.LocalParticipant
import io.livekit.android.test.assert.assertIsClassList
... ... @@ -133,6 +134,7 @@ class RoomTest {
regionUrlProviderFactory = regionUrlProviderFactory,
connectionWarmer = MockConnectionWarmer(),
audioRecordPrewarmer = NoAudioRecordPrewarmer(),
incomingDataStreamManager = IncomingDataStreamManagerImpl(),
)
}
... ...
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.livekit.android.room.datastream
import io.livekit.android.room.datastream.incoming.ByteStreamReceiver
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManagerImpl
import io.livekit.android.test.BaseTest
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
@OptIn(ExperimentalCoroutinesApi::class)
class StreamReaderTest : BaseTest() {
lateinit var channel: Channel<ByteArray>
lateinit var reader: ByteStreamReceiver
@Before
fun setup() {
channel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver()
channel.trySend(ByteArray(1) { 0 })
channel.trySend(ByteArray(1) { 1 })
channel.trySend(ByteArray(1) { 2 })
channel.close()
val streamInfo = ByteStreamInfo(id = "id", topic = "topic", timestampMs = 3, totalSize = null, attributes = mapOf(), mimeType = "mime", name = null)
reader = ByteStreamReceiver(streamInfo, channel)
}
@Test
fun buffersDataUntilSubscribed() = runTest {
var count = 0
runBlocking {
reader.flow.collect {
assertEquals(count, it[0].toInt())
count++
}
}
assertEquals(3, count)
}
@Test
fun readEach() = runTest {
runBlocking {
for (i in 0..2) {
val next = reader.readNext()
assertEquals(i, next[0].toInt())
}
}
}
@Test
fun readAll() = runTest {
runBlocking {
val data = reader.readAll()
assertEquals(3, data.size)
for (i in 0..2) {
assertEquals(i, data[i][0].toInt())
}
}
}
@Test
fun overreadThrows() = runTest {
var threwOnce = false
runBlocking {
try {
for (i in 0..3) {
val next = reader.readNext()
assertEquals(i, next[0].toInt())
}
} catch (e: NoSuchElementException) {
threwOnce = true
}
}
assertTrue(threwOnce)
}
}
... ...