David Liu

Finish off rest of rtc engine

@@ -14,6 +14,7 @@ constructor( @@ -14,6 +14,7 @@ constructor(
14 ) { 14 ) {
15 val peerConnection: PeerConnection = connectionFactory.createPeerConnection( 15 val peerConnection: PeerConnection = connectionFactory.createPeerConnection(
16 config, 16 config,
  17 + RTCEngine.CONN_CONSTRAINTS,
17 listener 18 listener
18 ) ?: throw IllegalStateException("peer connection creation failed?") 19 ) ?: throw IllegalStateException("peer connection creation failed?")
19 val pendingCandidates = mutableListOf<IceCandidate>() 20 val pendingCandidates = mutableListOf<IceCandidate>()
@@ -107,11 +107,6 @@ constructor( @@ -107,11 +107,6 @@ constructor(
107 } 107 }
108 } 108 }
109 109
110 - if (sdpOffer == null) {  
111 - Timber.d { "sdp is missing during negotiation?" }  
112 - return@launch  
113 - }  
114 -  
115 val setObserver = CoroutineSdpObserver() 110 val setObserver = CoroutineSdpObserver()
116 publisher.peerConnection.setLocalDescription(setObserver, sdpOffer) 111 publisher.peerConnection.setLocalDescription(setObserver, sdpOffer)
117 val setOutcome = setObserver.awaitSet() 112 val setOutcome = setObserver.awaitSet()
@@ -136,7 +131,7 @@ constructor( @@ -136,7 +131,7 @@ constructor(
136 fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>) 131 fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>)
137 fun onPublishLocalTrack(cid: String, track: Model.TrackInfo) 132 fun onPublishLocalTrack(cid: String, track: Model.TrackInfo)
138 fun onAddDataChannel(channel: DataChannel) 133 fun onAddDataChannel(channel: DataChannel)
139 - fun onUpdateParticipants(updates: Array<out Model.ParticipantInfo>) 134 + fun onUpdateParticipants(updates: List<Model.ParticipantInfo>)
140 fun onUpdateSpeakers(speakers: List<Rtc.SpeakerInfo>) 135 fun onUpdateSpeakers(speakers: List<Rtc.SpeakerInfo>)
141 fun onDisconnect(reason: String) 136 fun onDisconnect(reason: String)
142 fun onFailToConnect(error: Error) 137 fun onFailToConnect(error: Error)
@@ -154,7 +149,7 @@ constructor( @@ -154,7 +149,7 @@ constructor(
154 149
155 private val MEDIA_CONSTRAINTS = MediaConstraints() 150 private val MEDIA_CONSTRAINTS = MediaConstraints()
156 151
157 - private val CONN_CONSTRAINTS = MediaConstraints().apply { 152 + internal val CONN_CONSTRAINTS = MediaConstraints().apply {
158 with(optional) { 153 with(optional) {
159 add(MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true")) 154 add(MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true"))
160 } 155 }
@@ -167,40 +162,87 @@ constructor( @@ -167,40 +162,87 @@ constructor(
167 coroutineScope.launch { 162 coroutineScope.launch {
168 val offerObserver = CoroutineSdpObserver() 163 val offerObserver = CoroutineSdpObserver()
169 publisher.peerConnection.createOffer(offerObserver, OFFER_CONSTRAINTS) 164 publisher.peerConnection.createOffer(offerObserver, OFFER_CONSTRAINTS)
170 - val offerOutcome = offerObserver.awaitCreate()  
171 - val sdpOffer = when (offerOutcome) {  
172 - is Either.Left -> offerOutcome.value 165 + val sdpOffer = when (val outcome = offerObserver.awaitCreate()) {
  166 + is Either.Left -> outcome.value
173 is Either.Right -> { 167 is Either.Right -> {
174 - Timber.d { "error creating offer: ${offerOutcome.value}" } 168 + Timber.d { "error creating offer: ${outcome.value}" }
175 return@launch 169 return@launch
176 } 170 }
177 } 171 }
178 172
179 - if (sdpOffer == null) {  
180 - Timber.d { "sdp is missing during negotiation?" }  
181 - return@launch  
182 - }  
183 -  
184 val setObserver = CoroutineSdpObserver() 173 val setObserver = CoroutineSdpObserver()
185 publisher.peerConnection.setLocalDescription(setObserver, sdpOffer) 174 publisher.peerConnection.setLocalDescription(setObserver, sdpOffer)
186 - val setOutcome = setObserver.awaitSet()  
187 - when (setOutcome) { 175 + when (val outcome = setObserver.awaitSet()) {
188 is Either.Left -> client.sendOffer(sdpOffer) 176 is Either.Left -> client.sendOffer(sdpOffer)
189 - is Either.Right -> Timber.d { "error setting local description: ${setOutcome.value}" } 177 + is Either.Right -> Timber.d { "error setting local description: ${outcome.value}" }
190 } 178 }
191 } 179 }
192 } 180 }
193 181
194 override fun onAnswer(sessionDescription: SessionDescription) { 182 override fun onAnswer(sessionDescription: SessionDescription) {
195 - TODO("Not yet implemented") 183 + Timber.v { "received server answer: ${sessionDescription.type}, ${publisher.peerConnection.signalingState()}" }
  184 + val observer = CoroutineSdpObserver()
  185 + publisher.peerConnection.setRemoteDescription(observer, sessionDescription)
  186 + coroutineScope.launch {
  187 + when (val outcome = observer.awaitSet()) {
  188 + is Either.Left -> {
  189 + if (!rtcConnected) {
  190 + onRTCConnected()
  191 + }
  192 + }
  193 + is Either.Right -> {
  194 + Timber.e { "error setting remote description for answer: ${outcome.value} " }
  195 + }
  196 + }
  197 + }
196 } 198 }
197 199
198 override fun onOffer(sessionDescription: SessionDescription) { 200 override fun onOffer(sessionDescription: SessionDescription) {
199 - TODO("Not yet implemented") 201 + Timber.v { "received server offer: ${sessionDescription.type}, ${subscriber.peerConnection.signalingState()}" }
  202 + coroutineScope.launch {
  203 + run<Unit> {
  204 + val observer = CoroutineSdpObserver()
  205 + subscriber.peerConnection.setRemoteDescription(observer, sessionDescription)
  206 + when (val outcome = observer.awaitSet()) {
  207 + is Either.Right -> {
  208 + Timber.e { "error setting remote description for answer: ${outcome.value} " }
  209 + return@launch
  210 + }
  211 + }
  212 + }
  213 +
  214 + val answer = run {
  215 + val observer = CoroutineSdpObserver()
  216 + subscriber.peerConnection.createAnswer(observer, OFFER_CONSTRAINTS)
  217 + when (val outcome = observer.awaitCreate()) {
  218 + is Either.Left -> outcome.value
  219 + is Either.Right -> {
  220 + Timber.e { "error creating answer: ${outcome.value}" }
  221 + return@launch
  222 + }
  223 + }
  224 + }
  225 +
  226 + run<Unit> {
  227 + val observer = CoroutineSdpObserver()
  228 + subscriber.peerConnection.setLocalDescription(observer, answer)
  229 + when (val outcome = observer.awaitCreate()) {
  230 + is Either.Left -> client.sendAnswer(answer)
  231 + is Either.Right -> {
  232 + Timber.e { "error setting local description for answer: ${outcome.value}" }
  233 + }
  234 + }
  235 + }
  236 + }
200 } 237 }
201 238
202 override fun onTrickle(candidate: IceCandidate, target: Rtc.SignalTarget) { 239 override fun onTrickle(candidate: IceCandidate, target: Rtc.SignalTarget) {
203 - TODO("Not yet implemented") 240 + Timber.v { "received ice candidate from peer: $candidate, $target" }
  241 + when (target) {
  242 + Rtc.SignalTarget.PUBLISHER -> publisher.addIceCandidate(candidate)
  243 + Rtc.SignalTarget.SUBSCRIBER -> publisher.addIceCandidate(candidate)
  244 + else -> Timber.i { "unknown ice candidate target?" }
  245 + }
204 } 246 }
205 247
206 override fun onLocalTrackPublished(response: Rtc.TrackPublishedResponse) { 248 override fun onLocalTrackPublished(response: Rtc.TrackPublishedResponse) {
@@ -226,7 +268,7 @@ constructor( @@ -226,7 +268,7 @@ constructor(
226 } 268 }
227 269
228 override fun onParticipantUpdate(updates: List<Model.ParticipantInfo>) { 270 override fun onParticipantUpdate(updates: List<Model.ParticipantInfo>) {
229 - TODO("Not yet implemented") 271 + listener?.onUpdateParticipants(updates)
230 } 272 }
231 273
232 override fun onActiveSpeakersChanged(speakers: List<Rtc.SpeakerInfo>) { 274 override fun onActiveSpeakersChanged(speakers: List<Rtc.SpeakerInfo>) {
@@ -234,10 +276,11 @@ constructor( @@ -234,10 +276,11 @@ constructor(
234 } 276 }
235 277
236 override fun onClose(reason: String, code: Int) { 278 override fun onClose(reason: String, code: Int) {
237 - TODO("Not yet implemented") 279 + Timber.i { "received close event: $reason, code: $code" }
  280 + listener?.onDisconnect(reason)
238 } 281 }
239 282
240 override fun onError(error: Error) { 283 override fun onError(error: Error) {
241 - TODO("Not yet implemented") 284 + listener?.onFailToConnect(error)
242 } 285 }
243 } 286 }
@@ -8,7 +8,7 @@ import kotlin.coroutines.resume @@ -8,7 +8,7 @@ import kotlin.coroutines.resume
8 import kotlin.coroutines.suspendCoroutine 8 import kotlin.coroutines.suspendCoroutine
9 9
10 class CoroutineSdpObserver : SdpObserver { 10 class CoroutineSdpObserver : SdpObserver {
11 - private var createOutcome: Either<SessionDescription?, String?>? = null 11 + private var createOutcome: Either<SessionDescription, String?>? = null
12 set(value) { 12 set(value) {
13 field = value 13 field = value
14 if (value != null) { 14 if (value != null) {
@@ -19,7 +19,7 @@ class CoroutineSdpObserver : SdpObserver { @@ -19,7 +19,7 @@ class CoroutineSdpObserver : SdpObserver {
19 } 19 }
20 } 20 }
21 } 21 }
22 - private var pendingCreate = mutableListOf<Continuation<Either<SessionDescription?, String?>>>() 22 + private var pendingCreate = mutableListOf<Continuation<Either<SessionDescription, String?>>>()
23 23
24 private var setOutcome: Either<Unit, String?>? = null 24 private var setOutcome: Either<Unit, String?>? = null
25 set(value) { 25 set(value) {
@@ -35,7 +35,11 @@ class CoroutineSdpObserver : SdpObserver { @@ -35,7 +35,11 @@ class CoroutineSdpObserver : SdpObserver {
35 private var pendingSets = mutableListOf<Continuation<Either<Unit, String?>>>() 35 private var pendingSets = mutableListOf<Continuation<Either<Unit, String?>>>()
36 36
37 override fun onCreateSuccess(sdp: SessionDescription?) { 37 override fun onCreateSuccess(sdp: SessionDescription?) {
38 - createOutcome = Either.Left(sdp) 38 + createOutcome = if (sdp == null) {
  39 + Either.Right("empty sdp")
  40 + } else {
  41 + Either.Left(sdp)
  42 + }
39 } 43 }
40 44
41 override fun onSetSuccess() { 45 override fun onSetSuccess() {
@@ -50,7 +54,7 @@ class CoroutineSdpObserver : SdpObserver { @@ -50,7 +54,7 @@ class CoroutineSdpObserver : SdpObserver {
50 setOutcome = Either.Right(message) 54 setOutcome = Either.Right(message)
51 } 55 }
52 56
53 - suspend fun awaitCreate() = suspendCoroutine<Either<SessionDescription?, String?>> { cont -> 57 + suspend fun awaitCreate() = suspendCoroutine<Either<SessionDescription, String?>> { cont ->
54 val curOutcome = createOutcome 58 val curOutcome = createOutcome
55 if (curOutcome != null) { 59 if (curOutcome != null) {
56 cont.resume(curOutcome) 60 cont.resume(curOutcome)