David Zhao

separate onJoin and onICEConnected to eliminate race conditions. clean up after …

…tracks upon disconnect
1 -<?xml version="1.0" encoding="UTF-8"?>  
2 -<project version="4">  
3 - <component name="RunConfigurationProducerService">  
4 - <option name="ignoredProducers">  
5 - <set>  
6 - <option value="org.jetbrains.plugins.gradle.execution.test.runner.AllInPackageGradleConfigurationProducer" />  
7 - <option value="org.jetbrains.plugins.gradle.execution.test.runner.TestClassGradleConfigurationProducer" />  
8 - <option value="org.jetbrains.plugins.gradle.execution.test.runner.TestMethodGradleConfigurationProducer" />  
9 - </set>  
10 - </option>  
11 - </component>  
12 -</project>  
@@ -25,6 +25,7 @@ class PublisherTransportObserver( @@ -25,6 +25,7 @@ class PublisherTransportObserver(
25 Timber.v { "onIceConnection new state: $newState" } 25 Timber.v { "onIceConnection new state: $newState" }
26 if (state == PeerConnection.IceConnectionState.CONNECTED && !engine.iceConnected) { 26 if (state == PeerConnection.IceConnectionState.CONNECTED && !engine.iceConnected) {
27 engine.iceConnected = true 27 engine.iceConnected = true
  28 + engine.listener?.onICEConnected()
28 } else if (state == PeerConnection.IceConnectionState.FAILED) { 29 } else if (state == PeerConnection.IceConnectionState.FAILED) {
29 // when we publish tracks, some WebRTC versions will send out disconnected events periodically 30 // when we publish tracks, some WebRTC versions will send out disconnected events periodically
30 engine.iceConnected = false 31 engine.iceConnected = false
@@ -221,8 +221,7 @@ constructor( @@ -221,8 +221,7 @@ constructor(
221 } 221 }
222 222
223 if (!sent) { 223 if (!sent) {
224 - Timber.d { "error sending request: $request" }  
225 - throw IllegalStateException() 224 + Timber.e { "error sending request: $request" }
226 } 225 }
227 } 226 }
228 227
@@ -107,6 +107,7 @@ constructor( @@ -107,6 +107,7 @@ constructor(
107 107
108 interface Listener { 108 interface Listener {
109 fun onJoin(response: LivekitRtc.JoinResponse) 109 fun onJoin(response: LivekitRtc.JoinResponse)
  110 + fun onICEConnected()
110 fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>) 111 fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>)
111 // fun onPublishLocalTrack(cid: String, track: LivekitModels.TrackInfo) 112 // fun onPublishLocalTrack(cid: String, track: LivekitModels.TrackInfo)
112 fun onAddDataChannel(channel: DataChannel) 113 fun onAddDataChannel(channel: DataChannel)
@@ -136,7 +137,6 @@ constructor( @@ -136,7 +137,6 @@ constructor(
136 } 137 }
137 138
138 override fun onJoin(info: LivekitRtc.JoinResponse) { 139 override fun onJoin(info: LivekitRtc.JoinResponse) {
139 -  
140 val iceServers = mutableListOf<PeerConnection.IceServer>() 140 val iceServers = mutableListOf<PeerConnection.IceServer>()
141 for(serverInfo in info.iceServersList){ 141 for(serverInfo in info.iceServersList){
142 val username = serverInfo.username ?: "" 142 val username = serverInfo.username ?: ""
@@ -161,7 +161,6 @@ constructor( @@ -161,7 +161,6 @@ constructor(
161 Timber.e{" $it"} 161 Timber.e{" $it"}
162 } 162 }
163 } 163 }
164 - listener?.onJoin(info)  
165 164
166 val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply { 165 val rtcConfig = PeerConnection.RTCConfiguration(iceServers).apply {
167 sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN 166 sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
@@ -194,6 +193,7 @@ constructor( @@ -194,6 +193,7 @@ constructor(
194 193
195 client.sendOffer(sdpOffer) 194 client.sendOffer(sdpOffer)
196 } 195 }
  196 + listener?.onJoin(info)
197 } 197 }
198 198
199 override fun onAnswer(sessionDescription: SessionDescription) { 199 override fun onAnswer(sessionDescription: SessionDescription) {
@@ -66,7 +66,6 @@ constructor( @@ -66,7 +66,6 @@ constructor(
66 } 66 }
67 67
68 fun disconnect() { 68 fun disconnect() {
69 - engine.close()  
70 handleDisconnect() 69 handleDisconnect()
71 } 70 }
72 71
@@ -138,6 +137,10 @@ constructor( @@ -138,6 +137,10 @@ constructor(
138 } 137 }
139 138
140 private fun handleDisconnect() { 139 private fun handleDisconnect() {
  140 + for (track in localParticipant?.tracks.values) {
  141 + track.track?.stop()
  142 + }
  143 + engine.close()
141 state = State.DISCONNECTED 144 state = State.DISCONNECTED
142 listener?.onDisconnect(this, null) 145 listener?.onDisconnect(this, null)
143 } 146 }
@@ -175,7 +178,9 @@ constructor( @@ -175,7 +178,9 @@ constructor(
175 getOrCreateRemoteParticipant(it.sid, it) 178 getOrCreateRemoteParticipant(it.sid, it)
176 } 179 }
177 } 180 }
  181 + }
178 182
  183 + override fun onICEConnected() {
179 state = State.CONNECTED 184 state = State.CONNECTED
180 connectContinuation?.resume(Unit) 185 connectContinuation?.resume(Unit)
181 connectContinuation = null 186 connectContinuation = null
@@ -20,6 +20,11 @@ class LocalVideoTrack( @@ -20,6 +20,11 @@ class LocalVideoTrack(
20 capturer.startCapture(400, 400, 30) 20 capturer.startCapture(400, 400, 30)
21 } 21 }
22 22
  23 + override fun stop() {
  24 + capturer.stopCapture()
  25 + super.stop()
  26 + }
  27 +
23 companion object { 28 companion object {
24 internal fun createTrack( 29 internal fun createTrack(
25 peerConnectionFactory: PeerConnectionFactory, 30 peerConnectionFactory: PeerConnectionFactory,
@@ -45,7 +50,6 @@ class LocalVideoTrack( @@ -45,7 +50,6 @@ class LocalVideoTrack(
45 ) 50 )
46 } 51 }
47 52
48 -  
49 private fun createVideoCapturer(context: Context): VideoCapturer? { 53 private fun createVideoCapturer(context: Context): VideoCapturer? {
50 val videoCapturer: VideoCapturer? = if (Camera2Enumerator.isSupported(context)) { 54 val videoCapturer: VideoCapturer? = if (Camera2Enumerator.isSupported(context)) {
51 createCameraCapturer(Camera2Enumerator(context)) 55 createCameraCapturer(Camera2Enumerator(context))
@@ -5,7 +5,8 @@ import org.webrtc.VideoSink @@ -5,7 +5,8 @@ import org.webrtc.VideoSink
5 import org.webrtc.VideoTrack 5 import org.webrtc.VideoTrack
6 6
7 open class VideoTrack(name: String, override val rtcTrack: VideoTrack) : 7 open class VideoTrack(name: String, override val rtcTrack: VideoTrack) :
8 - MediaTrack(name, LivekitModels.TrackType.VIDEO, rtcTrack){ 8 + MediaTrack(name, LivekitModels.TrackType.VIDEO, rtcTrack) {
  9 + private val sinks: MutableList<VideoSink> = ArrayList();
9 10
10 var enabled: Boolean 11 var enabled: Boolean
11 get() = rtcTrack.enabled() 12 get() = rtcTrack.enabled()
@@ -13,7 +14,21 @@ open class VideoTrack(name: String, override val rtcTrack: VideoTrack) : @@ -13,7 +14,21 @@ open class VideoTrack(name: String, override val rtcTrack: VideoTrack) :
13 rtcTrack.setEnabled(value) 14 rtcTrack.setEnabled(value)
14 } 15 }
15 16
16 - fun addRenderer(renderer: VideoSink) = rtcTrack.addSink(renderer) 17 + fun addRenderer(renderer: VideoSink) {
  18 + sinks.add(renderer)
  19 + rtcTrack.addSink(renderer)
  20 + }
17 21
18 - fun removeRenderer(renderer: VideoSink) = rtcTrack.addSink(renderer) 22 + fun removeRenderer(renderer: VideoSink) {
  23 + sinks.remove(renderer)
  24 + rtcTrack.addSink(renderer)
  25 + }
  26 +
  27 + override fun stop() {
  28 + for (sink in sinks) {
  29 + rtcTrack.removeSink(sink)
  30 + }
  31 + sinks.clear()
  32 + super.stop()
  33 + }
19 } 34 }