davidliu
Committed by GitHub

Reliable data channel (#738)

* e2e reliablility for data channel

* fix tests

* changeset

* add .idea to gitignore

* change to patch change
  1 +---
  2 +"client-sdk-android": patch
  3 +---
  4 +
  5 +E2E reliability for data channels with resending after reconnects
1 *.iml 1 *.iml
2 .gradle 2 .gradle
3 /local.properties 3 /local.properties
4 -/.idea/caches  
5 -/.idea/libraries  
6 -/.idea/modules.xml  
7 -/.idea/workspace.xml  
8 -/.idea/navEditor.xml  
9 -/.idea/assetWizardSettings.xml  
10 -/.idea/deploymentTargetDropDown.xml  
11 -/.idea/misc.xml  
12 -/.idea/gradle.xml  
13 -/.idea/runConfigurations.xml  
14 -/.idea/deploymentTargetSelector.xml 4 +/.idea
15 .DS_Store 5 .DS_Store
16 /build 6 /build
17 /captures 7 /captures
1 <component name="ProjectCodeStyleConfiguration"> 1 <component name="ProjectCodeStyleConfiguration">
2 <code_scheme name="Project" version="173"> 2 <code_scheme name="Project" version="173">
  3 + <JavaCodeStyleSettings>
  4 + <option name="IMPORT_LAYOUT_TABLE">
  5 + <value>
  6 + <package name="" withSubpackages="true" static="false" module="true" />
  7 + <package name="android" withSubpackages="true" static="true" />
  8 + <package name="androidx" withSubpackages="true" static="true" />
  9 + <package name="com" withSubpackages="true" static="true" />
  10 + <package name="junit" withSubpackages="true" static="true" />
  11 + <package name="net" withSubpackages="true" static="true" />
  12 + <package name="org" withSubpackages="true" static="true" />
  13 + <package name="java" withSubpackages="true" static="true" />
  14 + <package name="javax" withSubpackages="true" static="true" />
  15 + <package name="" withSubpackages="true" static="true" />
  16 + <emptyLine />
  17 + <package name="android" withSubpackages="true" static="false" />
  18 + <emptyLine />
  19 + <package name="androidx" withSubpackages="true" static="false" />
  20 + <emptyLine />
  21 + <package name="com" withSubpackages="true" static="false" />
  22 + <emptyLine />
  23 + <package name="junit" withSubpackages="true" static="false" />
  24 + <emptyLine />
  25 + <package name="net" withSubpackages="true" static="false" />
  26 + <emptyLine />
  27 + <package name="org" withSubpackages="true" static="false" />
  28 + <emptyLine />
  29 + <package name="java" withSubpackages="true" static="false" />
  30 + <emptyLine />
  31 + <package name="javax" withSubpackages="true" static="false" />
  32 + <emptyLine />
  33 + <package name="" withSubpackages="true" static="false" />
  34 + <emptyLine />
  35 + </value>
  36 + </option>
  37 + </JavaCodeStyleSettings>
3 <JetCodeStyleSettings> 38 <JetCodeStyleSettings>
4 <option name="CODE_STYLE_DEFAULTS" value="KOTLIN_OFFICIAL" /> 39 <option name="CODE_STYLE_DEFAULTS" value="KOTLIN_OFFICIAL" />
5 </JetCodeStyleSettings> 40 </JetCodeStyleSettings>
@@ -299,6 +299,7 @@ enum class DisconnectReason { @@ -299,6 +299,7 @@ enum class DisconnectReason {
299 USER_REJECTED, 299 USER_REJECTED,
300 SIP_TRUNK_FAILURE, 300 SIP_TRUNK_FAILURE,
301 CONNECTION_TIMEOUT, 301 CONNECTION_TIMEOUT,
  302 + MEDIA_FAILURE,
302 } 303 }
303 304
304 /** 305 /**
@@ -320,6 +321,7 @@ fun LivekitModels.DisconnectReason?.convert(): DisconnectReason { @@ -320,6 +321,7 @@ fun LivekitModels.DisconnectReason?.convert(): DisconnectReason {
320 LivekitModels.DisconnectReason.USER_REJECTED -> DisconnectReason.USER_REJECTED 321 LivekitModels.DisconnectReason.USER_REJECTED -> DisconnectReason.USER_REJECTED
321 LivekitModels.DisconnectReason.SIP_TRUNK_FAILURE -> DisconnectReason.SIP_TRUNK_FAILURE 322 LivekitModels.DisconnectReason.SIP_TRUNK_FAILURE -> DisconnectReason.SIP_TRUNK_FAILURE
322 LivekitModels.DisconnectReason.CONNECTION_TIMEOUT -> DisconnectReason.CONNECTION_TIMEOUT 323 LivekitModels.DisconnectReason.CONNECTION_TIMEOUT -> DisconnectReason.CONNECTION_TIMEOUT
  324 + LivekitModels.DisconnectReason.MEDIA_FAILURE -> DisconnectReason.MEDIA_FAILURE
323 LivekitModels.DisconnectReason.UNKNOWN_REASON, 325 LivekitModels.DisconnectReason.UNKNOWN_REASON,
324 LivekitModels.DisconnectReason.UNRECOGNIZED, 326 LivekitModels.DisconnectReason.UNRECOGNIZED,
325 null, 327 null,
@@ -36,10 +36,13 @@ import io.livekit.android.util.CloseableCoroutineScope @@ -36,10 +36,13 @@ import io.livekit.android.util.CloseableCoroutineScope
36 import io.livekit.android.util.Either 36 import io.livekit.android.util.Either
37 import io.livekit.android.util.FlowObservable 37 import io.livekit.android.util.FlowObservable
38 import io.livekit.android.util.LKLog 38 import io.livekit.android.util.LKLog
  39 +import io.livekit.android.util.TTLMap
39 import io.livekit.android.util.flowDelegate 40 import io.livekit.android.util.flowDelegate
40 import io.livekit.android.util.nullSafe 41 import io.livekit.android.util.nullSafe
41 import io.livekit.android.util.withCheckLock 42 import io.livekit.android.util.withCheckLock
42 import io.livekit.android.webrtc.DataChannelManager 43 import io.livekit.android.webrtc.DataChannelManager
  44 +import io.livekit.android.webrtc.DataPacketBuffer
  45 +import io.livekit.android.webrtc.DataPacketItem
43 import io.livekit.android.webrtc.RTCStatsGetter 46 import io.livekit.android.webrtc.RTCStatsGetter
44 import io.livekit.android.webrtc.copy 47 import io.livekit.android.webrtc.copy
45 import io.livekit.android.webrtc.isConnected 48 import io.livekit.android.webrtc.isConnected
@@ -171,6 +174,11 @@ internal constructor( @@ -171,6 +174,11 @@ internal constructor(
171 private var lossyDataChannelManager: DataChannelManager? = null 174 private var lossyDataChannelManager: DataChannelManager? = null
172 private var lossyDataChannelSubManager: DataChannelManager? = null 175 private var lossyDataChannelSubManager: DataChannelManager? = null
173 176
  177 + private val reliableStateLock = Object()
  178 + private var reliableDataSequence: Int = 1
  179 + private val reliableMessageBuffer = DataPacketBuffer(RELIABLE_RETRY_AMOUNT)
  180 + private val reliableReceivedState = TTLMap<String, Int>(RELIABLE_RECEIVE_STATE_TTL_MS)
  181 +
174 private var isSubscriberPrimary = false 182 private var isSubscriberPrimary = false
175 private var isClosed = true 183 private var isClosed = true
176 184
@@ -403,6 +411,12 @@ internal constructor( @@ -403,6 +411,12 @@ internal constructor(
403 abortPendingPublishTracks() 411 abortPendingPublishTracks()
404 closeResources(reason) 412 closeResources(reason)
405 connectionState = ConnectionState.DISCONNECTED 413 connectionState = ConnectionState.DISCONNECTED
  414 +
  415 + synchronized(reliableStateLock) {
  416 + reliableDataSequence = 1
  417 + reliableMessageBuffer.clear()
  418 + reliableReceivedState.clear()
  419 + }
406 } 420 }
407 421
408 private fun closeResources(reason: String) { 422 private fun closeResources(reason: String) {
@@ -506,6 +520,7 @@ internal constructor( @@ -506,6 +520,7 @@ internal constructor(
506 ReconnectType.FORCE_FULL_RECONNECT -> true 520 ReconnectType.FORCE_FULL_RECONNECT -> true
507 } 521 }
508 522
  523 + var lastMessageSeq: Int? = null
509 val connectOptions = connectOptions ?: ConnectOptions() 524 val connectOptions = connectOptions ?: ConnectOptions()
510 if (isFullReconnect) { 525 if (isFullReconnect) {
511 LKLog.v { "Attempting full reconnect." } 526 LKLog.v { "Attempting full reconnect." }
@@ -539,6 +554,7 @@ internal constructor( @@ -539,6 +554,7 @@ internal constructor(
539 val rtcConfig = makeRTCConfig(Either.Right(reconnectResponse), connectOptions) 554 val rtcConfig = makeRTCConfig(Either.Right(reconnectResponse), connectOptions)
540 subscriber?.updateRTCConfig(rtcConfig) 555 subscriber?.updateRTCConfig(rtcConfig)
541 publisher?.updateRTCConfig(rtcConfig) 556 publisher?.updateRTCConfig(rtcConfig)
  557 + lastMessageSeq = reconnectResponse.lastMessageSeq
542 } 558 }
543 client.onReadyForResponses() 559 client.onReadyForResponses()
544 } catch (e: Exception) { 560 } catch (e: Exception) {
@@ -590,6 +606,9 @@ internal constructor( @@ -590,6 +606,9 @@ internal constructor(
590 if (connectionState == ConnectionState.CONNECTED && 606 if (connectionState == ConnectionState.CONNECTED &&
591 (!hasPublished || publisher?.isConnected() == true) 607 (!hasPublished || publisher?.isConnected() == true)
592 ) { 608 ) {
  609 + if (lastMessageSeq != null) {
  610 + resendReliableMessagesForResume(lastMessageSeq)
  611 + }
593 // Is connected, notify and return. 612 // Is connected, notify and return.
594 regionUrlProvider?.clearAttemptedRegions() 613 regionUrlProvider?.clearAttemptedRegions()
595 client.onPCConnected() 614 client.onPCConnected()
@@ -630,21 +649,62 @@ internal constructor( @@ -630,21 +649,62 @@ internal constructor(
630 649
631 @CheckResult 650 @CheckResult
632 internal suspend fun sendData(dataPacket: LivekitModels.DataPacket): Result<Unit> { 651 internal suspend fun sendData(dataPacket: LivekitModels.DataPacket): Result<Unit> {
633 - try {  
634 - ensurePublisherConnected(dataPacket.kind) 652 + ensurePublisherConnected(dataPacket.kind)
  653 +
  654 + fun sendDataImpl(dataPacket: LivekitModels.DataPacket): Result<Unit> {
  655 + try {
  656 + // Redeclare to make variable
  657 + var dataPacket = dataPacket
  658 + if (dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE) {
  659 + dataPacket = dataPacket.toBuilder()
  660 + .setSequence(reliableDataSequence)
  661 + .build()
  662 + reliableDataSequence++
  663 + }
635 664
636 - val buf = DataChannel.Buffer(  
637 - ByteBuffer.wrap(dataPacket.toByteArray()),  
638 - true,  
639 - ) 665 + val byteBuffer = ByteBuffer.wrap(dataPacket.toByteArray())
  666 +
  667 + if (dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE) {
  668 + reliableMessageBuffer.queue(DataPacketItem(byteBuffer, dataPacket.sequence))
  669 + if (this.connectionState == ConnectionState.RECONNECTING) {
  670 + return Result.success(Unit)
  671 + }
  672 + }
  673 + val buf = DataChannel.Buffer(
  674 + byteBuffer,
  675 + true,
  676 + )
  677 + val channel = dataChannelForKind(dataPacket.kind)
  678 + ?: throw RoomException.ConnectException("channel not established for ${dataPacket.kind.name}")
640 679
641 - val channel = dataChannelForKind(dataPacket.kind)  
642 - ?: throw RoomException.ConnectException("channel not established for ${dataPacket.kind.name}") 680 + channel.send(buf)
  681 + } catch (e: Exception) {
  682 + return Result.failure(e)
  683 + }
  684 + return Result.success(Unit)
  685 + }
643 686
644 - channel.send(buf)  
645 - } catch (e: Exception) {  
646 - return Result.failure(e) 687 + if (dataPacket.kind == LivekitModels.DataPacket.Kind.RELIABLE) {
  688 + synchronized(reliableStateLock) {
  689 + return sendDataImpl(dataPacket)
  690 + }
  691 + } else {
  692 + return sendDataImpl(dataPacket)
647 } 693 }
  694 + }
  695 +
  696 + internal suspend fun resendReliableMessagesForResume(lastMessageSeq: Int): Result<Unit> {
  697 + ensurePublisherConnected(LivekitModels.DataPacket.Kind.RELIABLE)
  698 + val channel = dataChannelForKind(LivekitModels.DataPacket.Kind.RELIABLE)
  699 + ?: return Result.failure(NullPointerException("reliable channel not established!"))
  700 +
  701 + synchronized(reliableStateLock) {
  702 + reliableMessageBuffer.popToSequence(lastMessageSeq)
  703 + reliableMessageBuffer.getAll().forEach { item ->
  704 + channel.send(DataChannel.Buffer(item.data, true))
  705 + }
  706 + }
  707 +
648 return Result.success(Unit) 708 return Result.success(Unit)
649 } 709 }
650 710
@@ -858,7 +918,10 @@ internal constructor( @@ -858,7 +918,10 @@ internal constructor(
858 private const val MAX_RECONNECT_TIMEOUT = 60 * 1000 918 private const val MAX_RECONNECT_TIMEOUT = 60 * 1000
859 private const val MAX_ICE_CONNECT_TIMEOUT_MS = 20000 919 private const val MAX_ICE_CONNECT_TIMEOUT_MS = 20000
860 920
861 - private const val DATA_CHANNEL_LOW_THRESHOLD = 64 * 1024 // 64 KB 921 + private const val DATA_CHANNEL_LOW_THRESHOLD = 2 * 1024 * 1024 // 64 KB
  922 +
  923 + private val RELIABLE_RECEIVE_STATE_TTL_MS = 30.seconds
  924 + private val RELIABLE_RETRY_AMOUNT = (DATA_CHANNEL_LOW_THRESHOLD * 1.25).toLong()
862 925
863 internal val CONN_CONSTRAINTS = MediaConstraints().apply { 926 internal val CONN_CONSTRAINTS = MediaConstraints().apply {
864 with(optional) { 927 with(optional) {
@@ -1080,6 +1143,17 @@ internal constructor( @@ -1080,6 +1143,17 @@ internal constructor(
1080 return 1143 return
1081 } 1144 }
1082 val dp = LivekitModels.DataPacket.parseFrom(ByteString.copyFrom(buffer.data)) 1145 val dp = LivekitModels.DataPacket.parseFrom(ByteString.copyFrom(buffer.data))
  1146 +
  1147 + if (dp.sequence > 0 && dp.participantSid.isNotEmpty()) {
  1148 + synchronized(reliableStateLock) {
  1149 + val lastSeq = reliableReceivedState[dp.participantSid]
  1150 + if (lastSeq != null && dp.sequence <= lastSeq) {
  1151 + // ignore duplicate or out-of-order packets in reliable channel
  1152 + return
  1153 + }
  1154 + this.reliableReceivedState[dp.participantSid] = dp.sequence
  1155 + }
  1156 + }
1083 when (dp.valueCase) { 1157 when (dp.valueCase) {
1084 LivekitModels.DataPacket.ValueCase.SPEAKER -> { 1158 LivekitModels.DataPacket.ValueCase.SPEAKER -> {
1085 listener?.onActiveSpeakersUpdate(dp.speaker.speakersList) 1159 listener?.onActiveSpeakersUpdate(dp.speaker.speakersList)
@@ -1145,8 +1219,13 @@ internal constructor( @@ -1145,8 +1219,13 @@ internal constructor(
1145 subscription: LivekitRtc.UpdateSubscription, 1219 subscription: LivekitRtc.UpdateSubscription,
1146 publishedTracks: List<LivekitRtc.TrackPublishedResponse>, 1220 publishedTracks: List<LivekitRtc.TrackPublishedResponse>,
1147 ) { 1221 ) {
1148 - val answer = runBlocking {  
1149 - subscriber?.withPeerConnection { localDescription?.toProtoSessionDescription() } 1222 + var answer: LivekitRtc.SessionDescription? = null
  1223 + var offer: LivekitRtc.SessionDescription? = null
  1224 + runBlocking {
  1225 + subscriber?.withPeerConnection {
  1226 + answer = localDescription?.toProtoSessionDescription()
  1227 + offer = remoteDescription?.toProtoSessionDescription()
  1228 + }
1150 } 1229 }
1151 1230
1152 val dataChannelInfos = LivekitModels.DataPacket.Kind.values() 1231 val dataChannelInfos = LivekitModels.DataPacket.Kind.values()
@@ -1160,13 +1239,25 @@ internal constructor( @@ -1160,13 +1239,25 @@ internal constructor(
1160 .build() 1239 .build()
1161 } 1240 }
1162 1241
  1242 + val dataChannelReceiveStates = this.reliableReceivedState.map { (participantSid, sequence) ->
  1243 + with(LivekitRtc.DataChannelReceiveState.newBuilder()) {
  1244 + publisherSid = participantSid
  1245 + lastSeq = sequence
  1246 + build()
  1247 + }
  1248 + }
  1249 +
1163 val syncState = with(LivekitRtc.SyncState.newBuilder()) { 1250 val syncState = with(LivekitRtc.SyncState.newBuilder()) {
1164 if (answer != null) { 1251 if (answer != null) {
1165 setAnswer(answer) 1252 setAnswer(answer)
1166 } 1253 }
  1254 + if (offer != null) {
  1255 + setOffer(offer)
  1256 + }
1167 setSubscription(subscription) 1257 setSubscription(subscription)
1168 addAllPublishTracks(publishedTracks) 1258 addAllPublishTracks(publishedTracks)
1169 addAllDataChannels(dataChannelInfos) 1259 addAllDataChannels(dataChannelInfos)
  1260 + addAllDatachannelReceiveStates(dataChannelReceiveStates)
1170 build() 1261 build()
1171 } 1262 }
1172 1263
  1 +/*
  2 + * Copyright 2025 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.util
  18 +
  19 +import android.os.SystemClock
  20 +import kotlin.time.Duration
  21 +import kotlin.time.DurationUnit
  22 +
  23 +/**
  24 + * @suppress
  25 + */
  26 +class TTLMap<K, V>(
  27 + val ttl: Duration,
  28 + private val clock: () -> Long = { SystemClock.elapsedRealtime() },
  29 +) : MutableMap<K, V> {
  30 +
  31 + private data class TTLItem<V>(val value: V, val expiresAt: Long)
  32 +
  33 + private val map = mutableMapOf<K, TTLItem<V>>()
  34 + private val lastCleanup = getNow()
  35 +
  36 + override fun get(key: K): V? {
  37 + val item = map[key]
  38 +
  39 + if (item == null) {
  40 + return null
  41 + }
  42 +
  43 + if (item.expiresAt < getNow()) {
  44 + map.remove(key)
  45 + return null
  46 + }
  47 + return item.value
  48 + }
  49 +
  50 + override val size: Int
  51 + get() {
  52 + cleanup()
  53 + return map.size
  54 + }
  55 +
  56 + override fun containsKey(key: K): Boolean = get(key) != null
  57 + override fun containsValue(value: V): Boolean = values.contains(value)
  58 +
  59 + override fun isEmpty(): Boolean {
  60 + cleanup()
  61 + return map.isEmpty()
  62 + }
  63 +
  64 + private data class MutableEntry<K, V>(override val key: K, override var value: V) : MutableMap.MutableEntry<K, V> {
  65 + override fun setValue(newValue: V): V {
  66 + val old = this.value
  67 + this.value = newValue
  68 + return old
  69 + }
  70 + }
  71 +
  72 + override val entries: MutableSet<MutableMap.MutableEntry<K, V>>
  73 + get() {
  74 + cleanup()
  75 + return map.entries
  76 + .map { (key, value) -> MutableEntry(key, value.value) }
  77 + .toMutableSet()
  78 + }
  79 +
  80 + override val keys: MutableSet<K>
  81 + get() {
  82 + cleanup()
  83 + return map.keys
  84 + }
  85 +
  86 + override val values: MutableCollection<V>
  87 + get() {
  88 + cleanup()
  89 + return map.values
  90 + .map { (value, _) -> value }
  91 + .toMutableList()
  92 + }
  93 +
  94 + override fun clear() {
  95 + map.clear()
  96 + }
  97 +
  98 + override fun put(key: K, value: V): V? {
  99 + val now = getNow()
  100 + val ttlMs = ttl.toLong(DurationUnit.MILLISECONDS)
  101 + if (now - lastCleanup > ttlMs / 2) {
  102 + cleanup()
  103 + }
  104 +
  105 + val expiresAt = now + ttlMs
  106 +
  107 + map[key] = TTLItem(value, expiresAt)
  108 + return value
  109 + }
  110 +
  111 + override fun putAll(from: Map<out K, V>) {
  112 + from.iterator().forEach { (key, value) ->
  113 + put(key, value)
  114 + }
  115 + }
  116 +
  117 + override fun remove(key: K): V? {
  118 + return map.remove(key)?.value
  119 + }
  120 +
  121 + fun cleanup() {
  122 + val now = getNow()
  123 +
  124 + val iterator = map.iterator()
  125 + while (iterator.hasNext()) {
  126 + val (_, entry) = iterator.next()
  127 + if (entry.expiresAt < now) {
  128 + iterator.remove()
  129 + }
  130 + }
  131 + }
  132 +
  133 + private fun getNow() = clock()
  134 +}
  1 +/*
  2 + * Copyright 2025 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.webrtc
  18 +
  19 +import java.nio.ByteBuffer
  20 +import java.util.Deque
  21 +import java.util.LinkedList
  22 +
  23 +/** @suppress */
  24 +data class DataPacketItem(val data: ByteBuffer, val sequence: Int)
  25 +
  26 +/** @suppress */
  27 +class DataPacketBuffer(private val extraCapacity: Long = 0L) {
  28 + private val buffer: Deque<DataPacketItem> = LinkedList()
  29 + private var totalSize = 0L
  30 +
  31 + @Synchronized
  32 + fun clear() {
  33 + buffer.clear()
  34 + totalSize = 0L
  35 + }
  36 +
  37 + @Synchronized
  38 + fun queue(item: DataPacketItem) {
  39 + buffer.add(item)
  40 + totalSize += item.data.capacity()
  41 + }
  42 +
  43 + @Synchronized
  44 + fun dequeue(): DataPacketItem? {
  45 + if (buffer.isEmpty()) {
  46 + return null
  47 + }
  48 + val item = buffer.removeFirst()
  49 + totalSize -= item.data.capacity()
  50 + return item
  51 + }
  52 +
  53 + @Synchronized
  54 + fun getAll(): List<DataPacketItem> {
  55 + return buffer.toList()
  56 + }
  57 +
  58 + /**
  59 + * Pops until [sequence] (inclusive).
  60 + */
  61 + @Synchronized
  62 + fun popToSequence(sequence: Int): List<DataPacketItem> {
  63 + val retList = mutableListOf<DataPacketItem>()
  64 + while (buffer.isNotEmpty()) {
  65 + val first = buffer.first
  66 + if (first.sequence <= sequence) {
  67 + val item = dequeue()
  68 + retList.add(item!!)
  69 + } else {
  70 + break
  71 + }
  72 + }
  73 +
  74 + return retList
  75 + }
  76 +
  77 + @Synchronized
  78 + fun trim(size: Long) {
  79 + while (buffer.isNotEmpty() && totalSize > size + extraCapacity) {
  80 + dequeue()
  81 + }
  82 + }
  83 +
  84 + /**
  85 + * Returns the total byte size of the items in the buffer.
  86 + */
  87 + @Synchronized
  88 + fun byteSize() = totalSize
  89 +
  90 + /**
  91 + * Returns the number of items in the buffer
  92 + */
  93 + @Synchronized
  94 + fun size() = buffer.size
  95 +}
@@ -133,6 +133,7 @@ object TestData { @@ -133,6 +133,7 @@ object TestData {
133 forceRelay = LivekitModels.ClientConfigSetting.ENABLED 133 forceRelay = LivekitModels.ClientConfigSetting.ENABLED
134 build() 134 build()
135 } 135 }
  136 + lastMessageSeq = 1
136 build() 137 build()
137 } 138 }
138 build() 139 build()
@@ -73,6 +73,7 @@ class ProtoConverterTest( @@ -73,6 +73,7 @@ class ProtoConverterTest(
73 LivekitRtc.SessionDescription::class.java, 73 LivekitRtc.SessionDescription::class.java,
74 SessionDescription::class.java, 74 SessionDescription::class.java,
75 mapping = mapOf("sdp" to "description"), 75 mapping = mapOf("sdp" to "description"),
  76 + whitelist = listOf("id"),
76 ), 77 ),
77 ) 78 )
78 79
1 /* 1 /*
2 - * Copyright 2023-2024 LiveKit, Inc. 2 + * Copyright 2023-2025 LiveKit, Inc.
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
@@ -16,7 +16,9 @@ @@ -16,7 +16,9 @@
16 16
17 package io.livekit.android.room 17 package io.livekit.android.room
18 18
  19 +import io.livekit.android.room.track.DataPublishReliability
19 import io.livekit.android.test.MockE2ETest 20 import io.livekit.android.test.MockE2ETest
  21 +import io.livekit.android.test.mock.MockDataChannel
20 import io.livekit.android.test.mock.TestData 22 import io.livekit.android.test.mock.TestData
21 import io.livekit.android.test.mock.room.track.createMockLocalAudioTrack 23 import io.livekit.android.test.mock.room.track.createMockLocalAudioTrack
22 import io.livekit.android.test.util.toPBByteString 24 import io.livekit.android.test.util.toPBByteString
@@ -25,6 +27,7 @@ import livekit.LivekitRtc @@ -25,6 +27,7 @@ import livekit.LivekitRtc
25 import livekit.org.webrtc.PeerConnection 27 import livekit.org.webrtc.PeerConnection
26 import org.junit.Assert 28 import org.junit.Assert
27 import org.junit.Assert.assertEquals 29 import org.junit.Assert.assertEquals
  30 +import org.junit.Assert.assertTrue
28 import org.junit.Test 31 import org.junit.Test
29 import org.junit.runner.RunWith 32 import org.junit.runner.RunWith
30 import org.robolectric.RobolectricTestRunner 33 import org.robolectric.RobolectricTestRunner
@@ -77,6 +80,28 @@ class RoomReconnectionMockE2ETest : MockE2ETest() { @@ -77,6 +80,28 @@ class RoomReconnectionMockE2ETest : MockE2ETest() {
77 } 80 }
78 81
79 @Test 82 @Test
  83 + fun softReconnectResendsPackets() = runTest {
  84 + room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)
  85 +
  86 + connect()
  87 +
  88 + for (i in 1..5) {
  89 + assertTrue(room.localParticipant.publishData(ByteArray(i), reliability = DataPublishReliability.RELIABLE).isSuccess)
  90 + }
  91 + disconnectPeerConnection()
  92 + // Wait so that the reconnect job properly starts first.
  93 + testScheduler.advanceTimeBy(1000)
  94 + reconnectWebsocket()
  95 + connectPeerConnection()
  96 +
  97 + testScheduler.advanceUntilIdle()
  98 + val pubPeerConnection = getPublisherPeerConnection()
  99 + val pubDataChannel = pubPeerConnection.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel
  100 +
  101 + assertEquals(5, pubDataChannel.sentBuffers.size)
  102 + }
  103 +
  104 + @Test
80 fun softReconnectConfiguration() = runTest { 105 fun softReconnectConfiguration() = runTest {
81 room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT) 106 room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)
82 connect() 107 connect()
@@ -20,10 +20,12 @@ import android.Manifest @@ -20,10 +20,12 @@ import android.Manifest
20 import android.app.Application 20 import android.app.Application
21 import android.content.Context 21 import android.content.Context
22 import androidx.test.core.app.ApplicationProvider 22 import androidx.test.core.app.ApplicationProvider
  23 +import com.google.protobuf.ByteString
23 import io.livekit.android.audio.AudioProcessorInterface 24 import io.livekit.android.audio.AudioProcessorInterface
24 import io.livekit.android.events.ParticipantEvent 25 import io.livekit.android.events.ParticipantEvent
25 import io.livekit.android.events.RoomEvent 26 import io.livekit.android.events.RoomEvent
26 import io.livekit.android.room.DefaultsManager 27 import io.livekit.android.room.DefaultsManager
  28 +import io.livekit.android.room.RTCEngine
27 import io.livekit.android.room.track.LocalVideoTrack 29 import io.livekit.android.room.track.LocalVideoTrack
28 import io.livekit.android.room.track.LocalVideoTrackOptions 30 import io.livekit.android.room.track.LocalVideoTrackOptions
29 import io.livekit.android.room.track.Track 31 import io.livekit.android.room.track.Track
@@ -35,6 +37,7 @@ import io.livekit.android.test.assert.assertIsClassList @@ -35,6 +37,7 @@ import io.livekit.android.test.assert.assertIsClassList
35 import io.livekit.android.test.coroutines.toListUntilSignal 37 import io.livekit.android.test.coroutines.toListUntilSignal
36 import io.livekit.android.test.events.EventCollector 38 import io.livekit.android.test.events.EventCollector
37 import io.livekit.android.test.mock.MockAudioProcessingController 39 import io.livekit.android.test.mock.MockAudioProcessingController
  40 +import io.livekit.android.test.mock.MockDataChannel
38 import io.livekit.android.test.mock.MockEglBase 41 import io.livekit.android.test.mock.MockEglBase
39 import io.livekit.android.test.mock.MockVideoCapturer 42 import io.livekit.android.test.mock.MockVideoCapturer
40 import io.livekit.android.test.mock.MockVideoStreamTrack 43 import io.livekit.android.test.mock.MockVideoStreamTrack
@@ -55,6 +58,7 @@ import kotlinx.coroutines.test.StandardTestDispatcher @@ -55,6 +58,7 @@ import kotlinx.coroutines.test.StandardTestDispatcher
55 import kotlinx.coroutines.test.advanceUntilIdle 58 import kotlinx.coroutines.test.advanceUntilIdle
56 import livekit.LivekitModels 59 import livekit.LivekitModels
57 import livekit.LivekitModels.AudioTrackFeature 60 import livekit.LivekitModels.AudioTrackFeature
  61 +import livekit.LivekitModels.DataPacket
58 import livekit.LivekitRtc 62 import livekit.LivekitRtc
59 import livekit.LivekitRtc.SubscribedCodec 63 import livekit.LivekitRtc.SubscribedCodec
60 import livekit.LivekitRtc.SubscribedQuality 64 import livekit.LivekitRtc.SubscribedQuality
@@ -647,4 +651,22 @@ class LocalParticipantMockE2ETest : MockE2ETest() { @@ -647,4 +651,22 @@ class LocalParticipantMockE2ETest : MockE2ETest() {
647 assertTrue(collectedList[1]) 651 assertTrue(collectedList[1])
648 assertFalse(collectedList[2]) 652 assertFalse(collectedList[2])
649 } 653 }
  654 +
  655 + @Test
  656 + fun publishData() = runTest {
  657 + connect()
  658 + val pubPeerConnection = getPublisherPeerConnection()
  659 + val pubDataChannel = pubPeerConnection.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel
  660 +
  661 + val data = "hello".toByteArray()
  662 + assertTrue(room.localParticipant.publishData(data).isSuccess)
  663 +
  664 + assertEquals(1, pubDataChannel.sentBuffers.size)
  665 +
  666 + val headerPacket = DataPacket.parseFrom(ByteString.copyFrom(pubDataChannel.sentBuffers[0].data))
  667 + assertEquals(1, headerPacket.sequence)
  668 + assertTrue(headerPacket.hasUser())
  669 +
  670 + assertTrue(headerPacket.user.payload.toByteArray().contentEquals(data))
  671 + }
650 } 672 }
  1 +/*
  2 + * Copyright 2025 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.room.webrtc
  18 +
  19 +import io.livekit.android.test.BaseTest
  20 +import io.livekit.android.webrtc.DataPacketBuffer
  21 +import io.livekit.android.webrtc.DataPacketItem
  22 +import org.junit.Assert.assertEquals
  23 +import org.junit.Assert.assertNotNull
  24 +import org.junit.Before
  25 +import org.junit.Test
  26 +import java.nio.ByteBuffer
  27 +
  28 +class DataPacketBufferTest : BaseTest() {
  29 +
  30 + lateinit var buffer: DataPacketBuffer
  31 +
  32 + @Before
  33 + fun setup() {
  34 + buffer = DataPacketBuffer(5)
  35 + }
  36 +
  37 + fun fillWithTestValues() {
  38 + for (i in 1..5) {
  39 + val bytes = ByteArray(i)
  40 + buffer.queue(DataPacketItem(ByteBuffer.wrap(bytes), i))
  41 + }
  42 + }
  43 +
  44 + @Test
  45 + fun queue() {
  46 + fillWithTestValues()
  47 + assertEquals(15, buffer.byteSize())
  48 + assertEquals(5, buffer.size())
  49 + }
  50 +
  51 + @Test
  52 + fun dequeue() {
  53 + fillWithTestValues()
  54 + val list = mutableListOf<DataPacketItem>()
  55 + for (i in 1..5) {
  56 + val item = buffer.dequeue()
  57 + assertNotNull(item)
  58 +
  59 + list.add(item!!)
  60 + }
  61 +
  62 + list.forEachIndexed { index, item ->
  63 + assertEquals(index + 1, item.sequence)
  64 + assertEquals(index + 1, item.data.capacity())
  65 + }
  66 + }
  67 +
  68 + @Test
  69 + fun clear() {
  70 + fillWithTestValues()
  71 + buffer.clear()
  72 + assertEquals(0, buffer.byteSize())
  73 + assertEquals(0, buffer.size())
  74 + }
  75 +
  76 + @Test
  77 + fun getAll() {
  78 + fillWithTestValues()
  79 + val list = buffer.getAll()
  80 +
  81 + assertEquals(5, list.size)
  82 + list.forEachIndexed { index, item ->
  83 + assertEquals(index + 1, item.sequence)
  84 + assertEquals(index + 1, item.data.capacity())
  85 + }
  86 + }
  87 +
  88 + @Test
  89 + fun trim() {
  90 + fillWithTestValues()
  91 + buffer.trim(9)
  92 + // extra capacity is set to 5, so only the 1st packet is dropped.
  93 + assertEquals(14, buffer.byteSize())
  94 + assertEquals(4, buffer.size())
  95 + }
  96 +
  97 + @Test
  98 + fun popToSequence() {
  99 + fillWithTestValues()
  100 + buffer.popToSequence(3)
  101 +
  102 + assertEquals(9, buffer.byteSize())
  103 + assertEquals(2, buffer.size())
  104 + }
  105 +}
  1 +/*
  2 + * Copyright 2025 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.util
  18 +
  19 +import io.livekit.android.test.BaseTest
  20 +import org.junit.Assert.assertEquals
  21 +import org.junit.Assert.assertFalse
  22 +import org.junit.Assert.assertNull
  23 +import org.junit.Assert.assertTrue
  24 +import org.junit.Before
  25 +import org.junit.Test
  26 +import kotlin.time.Duration.Companion.milliseconds
  27 +
  28 +class TTLMapTest : BaseTest() {
  29 + lateinit var map: TTLMap<String, String>
  30 + var time = 0L
  31 +
  32 + @Before
  33 + fun setup() {
  34 + map = TTLMap(TTL, { time })
  35 + time = 0L
  36 + }
  37 +
  38 + fun expire() {
  39 + time += 1 + TTL.inWholeMilliseconds
  40 + }
  41 +
  42 + @Test
  43 + fun getNormal() {
  44 + map[KEY] = VALUE
  45 + assertEquals(map[KEY], VALUE)
  46 + }
  47 +
  48 + @Test
  49 + fun getExpired() {
  50 + map[KEY] = VALUE
  51 + expire()
  52 + assertNull("map key was not deleted!", map[KEY])
  53 + }
  54 +
  55 + @Test
  56 + fun isEmptyExpired() {
  57 + map[KEY] = VALUE
  58 + expire()
  59 + assertTrue(map.isEmpty())
  60 + }
  61 +
  62 + @Test
  63 + fun containsKeyNormal() {
  64 + map[KEY] = VALUE
  65 + assertTrue(map.containsKey(KEY))
  66 + }
  67 +
  68 + @Test
  69 + fun containsKeyExpired() {
  70 + map[KEY] = VALUE
  71 + expire()
  72 + assertFalse(map.containsKey(KEY))
  73 + }
  74 +
  75 + @Test
  76 + fun containsValueNormal() {
  77 + map[KEY] = VALUE
  78 + assertTrue(map.containsValue(VALUE))
  79 + }
  80 +
  81 + @Test
  82 + fun containsValueExpired() {
  83 + map[KEY] = VALUE
  84 + expire()
  85 + assertFalse(map.containsValue(VALUE))
  86 + }
  87 +
  88 + @Test
  89 + fun clear() {
  90 + map[KEY] = VALUE
  91 + map.clear()
  92 + assertTrue(map.isEmpty())
  93 + }
  94 +
  95 + @Test
  96 + fun remove() {
  97 + map[KEY] = VALUE
  98 + map.remove(KEY)
  99 + assertFalse(map.containsKey(KEY))
  100 + }
  101 +
  102 + @Test
  103 + fun cleanup() {
  104 + map[KEY] = VALUE
  105 + map.cleanup()
  106 + assertEquals(map[KEY], VALUE)
  107 + }
  108 +
  109 + @Test
  110 + fun cleanupExpired() {
  111 + map[KEY] = VALUE
  112 + expire()
  113 + map.cleanup()
  114 + assertTrue(map.isEmpty())
  115 + }
  116 +
  117 + @Test
  118 + fun sizeNormal() {
  119 + map[KEY] = VALUE
  120 + assertEquals(1, map.size)
  121 + }
  122 +
  123 + @Test
  124 + fun sizeExpired() {
  125 + map[KEY] = VALUE
  126 + expire()
  127 + assertEquals(0, map.size)
  128 + }
  129 +
  130 + companion object {
  131 + val TTL = 1000.milliseconds
  132 +
  133 + val KEY = "hello"
  134 + val VALUE = "world"
  135 + }
  136 +}
1 -Subproject commit 499c17c48063582ac2af0a021827fab18356cc29 1 +Subproject commit 7276243574cd9bd8a621700d2f696cda5fd1b6bd