davidliu
Committed by GitHub

Properly trim data packet buffer as buffered amount changes (#743)

@@ -37,6 +37,7 @@ import io.livekit.android.util.Either @@ -37,6 +37,7 @@ import io.livekit.android.util.Either
37 import io.livekit.android.util.FlowObservable 37 import io.livekit.android.util.FlowObservable
38 import io.livekit.android.util.LKLog 38 import io.livekit.android.util.LKLog
39 import io.livekit.android.util.TTLMap 39 import io.livekit.android.util.TTLMap
  40 +import io.livekit.android.util.flow
40 import io.livekit.android.util.flowDelegate 41 import io.livekit.android.util.flowDelegate
41 import io.livekit.android.util.nullSafe 42 import io.livekit.android.util.nullSafe
42 import io.livekit.android.util.withCheckLock 43 import io.livekit.android.util.withCheckLock
@@ -170,6 +171,7 @@ internal constructor( @@ -170,6 +171,7 @@ internal constructor(
170 private var lossyDataChannel: DataChannel? = null 171 private var lossyDataChannel: DataChannel? = null
171 private var lossyDataChannelSub: DataChannel? = null 172 private var lossyDataChannelSub: DataChannel? = null
172 private var reliableDataChannelManager: DataChannelManager? = null 173 private var reliableDataChannelManager: DataChannelManager? = null
  174 + private var reliableBufferedAmountJob: Job? = null
173 private var reliableDataChannelSubManager: DataChannelManager? = null 175 private var reliableDataChannelSubManager: DataChannelManager? = null
174 private var lossyDataChannelManager: DataChannelManager? = null 176 private var lossyDataChannelManager: DataChannelManager? = null
175 private var lossyDataChannelSubManager: DataChannelManager? = null 177 private var lossyDataChannelSubManager: DataChannelManager? = null
@@ -313,8 +315,18 @@ internal constructor( @@ -313,8 +315,18 @@ internal constructor(
313 RELIABLE_DATA_CHANNEL_LABEL, 315 RELIABLE_DATA_CHANNEL_LABEL,
314 reliableInit, 316 reliableInit,
315 ).also { dataChannel -> 317 ).also { dataChannel ->
316 - reliableDataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel))  
317 - dataChannel.registerObserver(reliableDataChannelManager) 318 +
  319 + val dataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel))
  320 + reliableDataChannelManager = dataChannelManager
  321 + dataChannel.registerObserver(dataChannelManager)
  322 + reliableBufferedAmountJob?.cancel()
  323 + reliableBufferedAmountJob = coroutineScope.launch {
  324 + dataChannelManager::bufferedAmount.flow.collect { bufferedAmount ->
  325 + synchronized(reliableStateLock) {
  326 + reliableMessageBuffer.trim(bufferedAmount)
  327 + }
  328 + }
  329 + }
318 } 330 }
319 } 331 }
320 332
@@ -430,6 +442,8 @@ internal constructor( @@ -430,6 +442,8 @@ internal constructor(
430 subscriber?.closeBlocking() 442 subscriber?.closeBlocking()
431 subscriber = null 443 subscriber = null
432 444
  445 + reliableBufferedAmountJob?.cancel()
  446 + reliableBufferedAmountJob = null
433 reliableDataChannelManager?.dispose() 447 reliableDataChannelManager?.dispose()
434 reliableDataChannelManager = null 448 reliableDataChannelManager = null
435 reliableDataChannel = null 449 reliableDataChannel = null
@@ -918,7 +932,7 @@ internal constructor( @@ -918,7 +932,7 @@ internal constructor(
918 private const val MAX_RECONNECT_TIMEOUT = 60 * 1000 932 private const val MAX_RECONNECT_TIMEOUT = 60 * 1000
919 private const val MAX_ICE_CONNECT_TIMEOUT_MS = 20000 933 private const val MAX_ICE_CONNECT_TIMEOUT_MS = 20000
920 934
921 - private const val DATA_CHANNEL_LOW_THRESHOLD = 2 * 1024 * 1024 // 64 KB 935 + private const val DATA_CHANNEL_LOW_THRESHOLD = 2 * 1024 * 1024 // 2 MB
922 936
923 private val RELIABLE_RECEIVE_STATE_TTL_MS = 30.seconds 937 private val RELIABLE_RECEIVE_STATE_TTL_MS = 30.seconds
924 private val RELIABLE_RETRY_AMOUNT = (DATA_CHANNEL_LOW_THRESHOLD * 1.25).toLong() 938 private val RELIABLE_RETRY_AMOUNT = (DATA_CHANNEL_LOW_THRESHOLD * 1.25).toLong()