davidliu
Committed by GitHub

Listen directly to connectionState changes instead of polling (#465)

@@ -16,7 +16,10 @@ @@ -16,7 +16,10 @@
16 16
17 package io.livekit.android.room 17 package io.livekit.android.room
18 18
  19 +import io.livekit.android.room.util.PeerConnectionStateObservable
  20 +import io.livekit.android.util.FlowObservable
19 import io.livekit.android.util.LKLog 21 import io.livekit.android.util.LKLog
  22 +import io.livekit.android.util.flowDelegate
20 import io.livekit.android.webrtc.peerconnection.executeOnRTCThread 23 import io.livekit.android.webrtc.peerconnection.executeOnRTCThread
21 import livekit.LivekitRtc 24 import livekit.LivekitRtc
22 import livekit.org.webrtc.CandidatePairChangeEvent 25 import livekit.org.webrtc.CandidatePairChangeEvent
@@ -31,9 +34,14 @@ import livekit.org.webrtc.SessionDescription @@ -31,9 +34,14 @@ import livekit.org.webrtc.SessionDescription
31 internal class PublisherTransportObserver( 34 internal class PublisherTransportObserver(
32 private val engine: RTCEngine, 35 private val engine: RTCEngine,
33 private val client: SignalClient, 36 private val client: SignalClient,
34 -) : PeerConnection.Observer, PeerConnectionTransport.Listener { 37 +) : PeerConnection.Observer, PeerConnectionTransport.Listener, PeerConnectionStateObservable {
35 38
36 - var connectionChangeListener: ((newState: PeerConnection.PeerConnectionState) -> Unit)? = null 39 + var connectionChangeListener: PeerConnectionStateListener? = null
  40 +
  41 + @FlowObservable
  42 + @get:FlowObservable
  43 + override var connectionState by flowDelegate(PeerConnection.PeerConnectionState.NEW)
  44 + private set
37 45
38 override fun onIceCandidate(iceCandidate: IceCandidate?) { 46 override fun onIceCandidate(iceCandidate: IceCandidate?) {
39 executeOnRTCThread { 47 executeOnRTCThread {
@@ -66,6 +74,7 @@ internal class PublisherTransportObserver( @@ -66,6 +74,7 @@ internal class PublisherTransportObserver(
66 executeOnRTCThread { 74 executeOnRTCThread {
67 LKLog.v { "onConnection new state: $newState" } 75 LKLog.v { "onConnection new state: $newState" }
68 connectionChangeListener?.invoke(newState) 76 connectionChangeListener?.invoke(newState)
  77 + connectionState = newState
69 } 78 }
70 } 79 }
71 80
@@ -29,6 +29,7 @@ import io.livekit.android.room.track.TrackException @@ -29,6 +29,7 @@ import io.livekit.android.room.track.TrackException
29 import io.livekit.android.room.util.MediaConstraintKeys 29 import io.livekit.android.room.util.MediaConstraintKeys
30 import io.livekit.android.room.util.createAnswer 30 import io.livekit.android.room.util.createAnswer
31 import io.livekit.android.room.util.setLocalDescription 31 import io.livekit.android.room.util.setLocalDescription
  32 +import io.livekit.android.room.util.waitUntilConnected
32 import io.livekit.android.util.CloseableCoroutineScope 33 import io.livekit.android.util.CloseableCoroutineScope
33 import io.livekit.android.util.Either 34 import io.livekit.android.util.Either
34 import io.livekit.android.util.FlowObservable 35 import io.livekit.android.util.FlowObservable
@@ -49,10 +50,12 @@ import kotlinx.coroutines.SupervisorJob @@ -49,10 +50,12 @@ import kotlinx.coroutines.SupervisorJob
49 import kotlinx.coroutines.coroutineScope 50 import kotlinx.coroutines.coroutineScope
50 import kotlinx.coroutines.delay 51 import kotlinx.coroutines.delay
51 import kotlinx.coroutines.ensureActive 52 import kotlinx.coroutines.ensureActive
  53 +import kotlinx.coroutines.joinAll
52 import kotlinx.coroutines.launch 54 import kotlinx.coroutines.launch
53 import kotlinx.coroutines.runBlocking 55 import kotlinx.coroutines.runBlocking
54 import kotlinx.coroutines.sync.Mutex 56 import kotlinx.coroutines.sync.Mutex
55 import kotlinx.coroutines.sync.withLock 57 import kotlinx.coroutines.sync.withLock
  58 +import kotlinx.coroutines.withTimeoutOrNull
56 import kotlinx.coroutines.yield 59 import kotlinx.coroutines.yield
57 import livekit.LivekitModels 60 import livekit.LivekitModels
58 import livekit.LivekitModels.AudioTrackFeature 61 import livekit.LivekitModels.AudioTrackFeature
@@ -66,6 +69,7 @@ import livekit.org.webrtc.MediaConstraints @@ -66,6 +69,7 @@ import livekit.org.webrtc.MediaConstraints
66 import livekit.org.webrtc.MediaStream 69 import livekit.org.webrtc.MediaStream
67 import livekit.org.webrtc.MediaStreamTrack 70 import livekit.org.webrtc.MediaStreamTrack
68 import livekit.org.webrtc.PeerConnection 71 import livekit.org.webrtc.PeerConnection
  72 +import livekit.org.webrtc.PeerConnection.PeerConnectionState
69 import livekit.org.webrtc.PeerConnection.RTCConfiguration 73 import livekit.org.webrtc.PeerConnection.RTCConfiguration
70 import livekit.org.webrtc.RTCStatsCollectorCallback 74 import livekit.org.webrtc.RTCStatsCollectorCallback
71 import livekit.org.webrtc.RTCStatsReport 75 import livekit.org.webrtc.RTCStatsReport
@@ -246,7 +250,7 @@ internal constructor( @@ -246,7 +250,7 @@ internal constructor(
246 null, 250 null,
247 ) 251 )
248 252
249 - val connectionStateListener: (PeerConnection.PeerConnectionState) -> Unit = { newState -> 253 + val connectionStateListener: PeerConnectionStateListener = { newState ->
250 LKLog.v { "onIceConnection new state: $newState" } 254 LKLog.v { "onIceConnection new state: $newState" }
251 if (newState.isConnected()) { 255 if (newState.isConnected()) {
252 connectionState = ConnectionState.CONNECTED 256 connectionState = ConnectionState.CONNECTED
@@ -528,31 +532,21 @@ internal constructor( @@ -528,31 +532,21 @@ internal constructor(
528 } 532 }
529 533
530 // wait until publisher ICE connected 534 // wait until publisher ICE connected
531 - val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS 535 + var publisherWaitJob: Job? = null
532 if (hasPublished) { 536 if (hasPublished) {
533 - while (SystemClock.elapsedRealtime() < endTime) {  
534 - if (publisher?.isConnected() == true) {  
535 - LKLog.v { "publisher reconnected to ICE" }  
536 - break  
537 - }  
538 - delay(100) 537 + publisherWaitJob = launch {
  538 + publisherObserver.waitUntilConnected()
539 } 539 }
540 } 540 }
541 541
542 - ensureActive()  
543 - if (isClosed) {  
544 - LKLog.v { "RTCEngine closed, aborting reconnection" }  
545 - break 542 + // wait until subscriber ICE connected
  543 + val subscriberWaitJob = launch {
  544 + subscriberObserver.waitUntilConnected()
546 } 545 }
547 546
548 - // wait until subscriber ICE connected  
549 - while (SystemClock.elapsedRealtime() < endTime) {  
550 - if (subscriber?.isConnected() == true) {  
551 - LKLog.v { "reconnected to ICE" }  
552 - connectionState = ConnectionState.CONNECTED  
553 - break  
554 - }  
555 - delay(100) 547 + withTimeoutOrNull(MAX_ICE_CONNECT_TIMEOUT_MS.toLong()) {
  548 + listOfNotNull(publisherWaitJob, subscriberWaitJob)
  549 + .joinAll()
556 } 550 }
557 551
558 ensureActive() 552 ensureActive()
@@ -1160,3 +1154,5 @@ fun LivekitRtc.ICEServer.toWebrtc(): PeerConnection.IceServer = PeerConnection.I @@ -1160,3 +1154,5 @@ fun LivekitRtc.ICEServer.toWebrtc(): PeerConnection.IceServer = PeerConnection.I
1160 .setTlsAlpnProtocols(emptyList()) 1154 .setTlsAlpnProtocols(emptyList())
1161 .setTlsEllipticCurves(emptyList()) 1155 .setTlsEllipticCurves(emptyList())
1162 .createIceServer() 1156 .createIceServer()
  1157 +
  1158 +typealias PeerConnectionStateListener = (PeerConnectionState) -> Unit
@@ -16,7 +16,10 @@ @@ -16,7 +16,10 @@
16 16
17 package io.livekit.android.room 17 package io.livekit.android.room
18 18
  19 +import io.livekit.android.room.util.PeerConnectionStateObservable
  20 +import io.livekit.android.util.FlowObservable
19 import io.livekit.android.util.LKLog 21 import io.livekit.android.util.LKLog
  22 +import io.livekit.android.util.flowDelegate
20 import io.livekit.android.webrtc.peerconnection.executeOnRTCThread 23 import io.livekit.android.webrtc.peerconnection.executeOnRTCThread
21 import livekit.LivekitRtc 24 import livekit.LivekitRtc
22 import livekit.org.webrtc.CandidatePairChangeEvent 25 import livekit.org.webrtc.CandidatePairChangeEvent
@@ -34,10 +37,15 @@ import livekit.org.webrtc.RtpTransceiver @@ -34,10 +37,15 @@ import livekit.org.webrtc.RtpTransceiver
34 class SubscriberTransportObserver( 37 class SubscriberTransportObserver(
35 private val engine: RTCEngine, 38 private val engine: RTCEngine,
36 private val client: SignalClient, 39 private val client: SignalClient,
37 -) : PeerConnection.Observer { 40 +) : PeerConnection.Observer, PeerConnectionStateObservable {
38 41
39 var dataChannelListener: ((DataChannel) -> Unit)? = null 42 var dataChannelListener: ((DataChannel) -> Unit)? = null
40 - var connectionChangeListener: ((PeerConnection.PeerConnectionState) -> Unit)? = null 43 + var connectionChangeListener: PeerConnectionStateListener? = null
  44 +
  45 + @FlowObservable
  46 + @get:FlowObservable
  47 + override var connectionState by flowDelegate(PeerConnection.PeerConnectionState.NEW)
  48 + private set
41 49
42 override fun onIceCandidate(candidate: IceCandidate) { 50 override fun onIceCandidate(candidate: IceCandidate) {
43 executeOnRTCThread { 51 executeOnRTCThread {
@@ -75,6 +83,7 @@ class SubscriberTransportObserver( @@ -75,6 +83,7 @@ class SubscriberTransportObserver(
75 executeOnRTCThread { 83 executeOnRTCThread {
76 LKLog.v { "onConnectionChange new state: $newState" } 84 LKLog.v { "onConnectionChange new state: $newState" }
77 connectionChangeListener?.invoke(newState) 85 connectionChangeListener?.invoke(newState)
  86 + connectionState = newState
78 } 87 }
79 } 88 }
80 89
  1 +/*
  2 + * Copyright 2024 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.room.util
  18 +
  19 +import io.livekit.android.util.FlowObservable
  20 +import io.livekit.android.util.flow
  21 +import io.livekit.android.webrtc.isConnected
  22 +import kotlinx.coroutines.flow.collect
  23 +import kotlinx.coroutines.flow.takeWhile
  24 +import livekit.org.webrtc.PeerConnection.PeerConnectionState
  25 +
  26 +internal interface PeerConnectionStateObservable {
  27 + @FlowObservable
  28 + @get:FlowObservable
  29 + val connectionState: PeerConnectionState
  30 +}
  31 +
  32 +/**
  33 + * Waits until the connection state [PeerConnectionState.isConnected].
  34 + */
  35 +internal suspend fun PeerConnectionStateObservable.waitUntilConnected() {
  36 + this::connectionState.flow
  37 + .takeWhile {
  38 + !it.isConnected()
  39 + }
  40 + .collect()
  41 +}