davidliu
Committed by GitHub

Data channel check (#38)

@@ -312,38 +312,45 @@ internal constructor( @@ -312,38 +312,45 @@ internal constructor(
312 } 312 }
313 313
314 internal suspend fun sendData(dataPacket: LivekitModels.DataPacket) { 314 internal suspend fun sendData(dataPacket: LivekitModels.DataPacket) {
315 - ensurePublisherConnected() 315 + ensurePublisherConnected(dataPacket.kind)
316 316
317 val buf = DataChannel.Buffer( 317 val buf = DataChannel.Buffer(
318 ByteBuffer.wrap(dataPacket.toByteArray()), 318 ByteBuffer.wrap(dataPacket.toByteArray()),
319 true, 319 true,
320 ) 320 )
321 321
322 - val channel = when (dataPacket.kind) {  
323 - LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannel  
324 - LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannel  
325 - else -> null  
326 - } ?: throw TrackException.PublishException("channel not established for ${dataPacket.kind.name}") 322 + val channel = dataChannelForKind(dataPacket.kind)
  323 + ?: throw TrackException.PublishException("channel not established for ${dataPacket.kind.name}")
327 324
328 channel.send(buf) 325 channel.send(buf)
329 } 326 }
330 327
331 - private suspend fun ensurePublisherConnected() { 328 + private suspend fun ensurePublisherConnected(kind: LivekitModels.DataPacket.Kind) {
332 if (!isSubscriberPrimary) { 329 if (!isSubscriberPrimary) {
333 return 330 return
334 } 331 }
335 332
336 - if (this.publisher.peerConnection.isConnected()) {  
337 - return 333 + if (!this::publisher.isInitialized) {
  334 + throw RoomException.ConnectException("Publisher is not connected!")
  335 + }
  336 +
  337 + if (!publisher.peerConnection.isConnected() &&
  338 + publisher.peerConnection.iceConnectionState() != PeerConnection.IceConnectionState.CHECKING
  339 + ) {
  340 + // start negotiation
  341 + this.negotiate();
338 } 342 }
339 343
340 - // start negotiation  
341 - this.negotiate() 344 +
  345 + val targetChannel = dataChannelForKind(kind) ?: throw IllegalArgumentException("Unknown data packet kind!")
  346 + if (targetChannel.state() == DataChannel.State.OPEN) {
  347 + return
  348 + }
342 349
343 // wait until publisher ICE connected 350 // wait until publisher ICE connected
344 val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS; 351 val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS;
345 while (SystemClock.elapsedRealtime() < endTime) { 352 while (SystemClock.elapsedRealtime() < endTime) {
346 - if (this.publisher.peerConnection.isConnected()) { 353 + if (this.publisher.peerConnection.isConnected() && targetChannel.state() == DataChannel.State.OPEN) {
347 return 354 return
348 } 355 }
349 delay(50) 356 delay(50)
@@ -352,6 +359,13 @@ internal constructor( @@ -352,6 +359,13 @@ internal constructor(
352 throw ConnectException("could not establish publisher connection") 359 throw ConnectException("could not establish publisher connection")
353 } 360 }
354 361
  362 + private fun dataChannelForKind(kind: LivekitModels.DataPacket.Kind) =
  363 + when (kind) {
  364 + LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannel
  365 + LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannel
  366 + else -> null
  367 + }
  368 +
355 private fun getPublisherOfferConstraints(): MediaConstraints { 369 private fun getPublisherOfferConstraints(): MediaConstraints {
356 return MediaConstraints().apply { 370 return MediaConstraints().apply {
357 with(mandatory) { 371 with(mandatory) {