davidliu
Committed by GitHub

Support data packet topics (#213)

@@ -98,7 +98,11 @@ sealed class ParticipantEvent(open val participant: Participant) : Event() { @@ -98,7 +98,11 @@ sealed class ParticipantEvent(open val participant: Participant) : Event() {
98 /** 98 /**
99 * Received data published by another participant 99 * Received data published by another participant
100 */ 100 */
101 - class DataReceived(override val participant: RemoteParticipant, val data: ByteArray) : ParticipantEvent(participant) 101 + class DataReceived(
  102 + override val participant: RemoteParticipant,
  103 + val data: ByteArray,
  104 + val topic: String?
  105 + ) : ParticipantEvent(participant)
102 106
103 /** 107 /**
104 * A track's stream state has changed. 108 * A track's stream state has changed.
@@ -145,7 +145,8 @@ sealed class RoomEvent(val room: Room) : Event() { @@ -145,7 +145,8 @@ sealed class RoomEvent(val room: Room) : Event() {
145 * @param data the published data 145 * @param data the published data
146 * @param participant the participant if available 146 * @param participant the participant if available
147 */ 147 */
148 - class DataReceived(room: Room, val data: ByteArray, val participant: RemoteParticipant?) : RoomEvent(room) 148 + class DataReceived(room: Room, val data: ByteArray, val participant: RemoteParticipant?, val topic: String?) :
  149 + RoomEvent(room)
149 150
150 /** 151 /**
151 * The connection quality for a participant has changed. 152 * The connection quality for a participant has changed.
@@ -722,10 +722,15 @@ constructor( @@ -722,10 +722,15 @@ constructor(
722 override fun onUserPacket(packet: LivekitModels.UserPacket, kind: LivekitModels.DataPacket.Kind) { 722 override fun onUserPacket(packet: LivekitModels.UserPacket, kind: LivekitModels.DataPacket.Kind) {
723 val participant = remoteParticipants[packet.participantSid] 723 val participant = remoteParticipants[packet.participantSid]
724 val data = packet.payload.toByteArray() 724 val data = packet.payload.toByteArray()
  725 + val topic = if (packet.hasTopic()) {
  726 + packet.topic
  727 + } else {
  728 + null
  729 + }
725 730
726 listener?.onDataReceived(data, participant, this) 731 listener?.onDataReceived(data, participant, this)
727 - eventBus.postEvent(RoomEvent.DataReceived(this, data, participant), coroutineScope)  
728 - participant?.onDataReceived(data) 732 + eventBus.postEvent(RoomEvent.DataReceived(this, data, participant, topic), coroutineScope)
  733 + participant?.onDataReceived(data, topic)
729 } 734 }
730 735
731 /** 736 /**
@@ -270,6 +270,8 @@ constructor( @@ -270,6 +270,8 @@ constructor(
270 val wasConnected = isConnected 270 val wasConnected = isConnected
271 271
272 if (wasConnected) { 272 if (wasConnected) {
  273 + // onClosing/onClosed will not be called after onFailure.
  274 + // Handle websocket closure here.
273 handleWebSocketClose( 275 handleWebSocketClose(
274 reason = reason ?: response?.toString() ?: t.localizedMessage ?: "websocket failure", 276 reason = reason ?: response?.toString() ?: t.localizedMessage ?: "websocket failure",
275 code = response?.code ?: CLOSE_REASON_WEBSOCKET_FAILURE 277 code = response?.code ?: CLOSE_REASON_WEBSOCKET_FAILURE
@@ -390,7 +392,7 @@ constructor( @@ -390,7 +392,7 @@ constructor(
390 quality = LivekitModels.VideoQuality.HIGH 392 quality = LivekitModels.VideoQuality.HIGH
391 } 393 }
392 394
393 - if(fps != null){ 395 + if (fps != null) {
394 setFps(fps) 396 setFps(fps)
395 } 397 }
396 } 398 }
@@ -443,12 +443,14 @@ internal constructor( @@ -443,12 +443,14 @@ internal constructor(
443 * @param data payload to send 443 * @param data payload to send
444 * @param reliability for delivery guarantee, use RELIABLE. for fastest delivery without guarantee, use LOSSY 444 * @param reliability for delivery guarantee, use RELIABLE. for fastest delivery without guarantee, use LOSSY
445 * @param destination list of participant SIDs to deliver the payload, null to deliver to everyone 445 * @param destination list of participant SIDs to deliver the payload, null to deliver to everyone
  446 + * @param topic the topic under which the message was published
446 */ 447 */
447 @Suppress("unused") 448 @Suppress("unused")
448 suspend fun publishData( 449 suspend fun publishData(
449 data: ByteArray, 450 data: ByteArray,
450 reliability: DataPublishReliability = DataPublishReliability.RELIABLE, 451 reliability: DataPublishReliability = DataPublishReliability.RELIABLE,
451 - destination: List<String>? = null 452 + destination: List<String>? = null,
  453 + topic: String? = null,
452 ) { 454 ) {
453 if (data.size > RTCEngine.MAX_DATA_PACKET_SIZE) { 455 if (data.size > RTCEngine.MAX_DATA_PACKET_SIZE) {
454 throw IllegalArgumentException("cannot publish data larger than " + RTCEngine.MAX_DATA_PACKET_SIZE) 456 throw IllegalArgumentException("cannot publish data larger than " + RTCEngine.MAX_DATA_PACKET_SIZE)
@@ -458,11 +460,15 @@ internal constructor( @@ -458,11 +460,15 @@ internal constructor(
458 DataPublishReliability.RELIABLE -> LivekitModels.DataPacket.Kind.RELIABLE 460 DataPublishReliability.RELIABLE -> LivekitModels.DataPacket.Kind.RELIABLE
459 DataPublishReliability.LOSSY -> LivekitModels.DataPacket.Kind.LOSSY 461 DataPublishReliability.LOSSY -> LivekitModels.DataPacket.Kind.LOSSY
460 } 462 }
461 - val packetBuilder = LivekitModels.UserPacket.newBuilder()  
462 - .setPayload(ByteString.copyFrom(data))  
463 - .setParticipantSid(sid)  
464 - if (destination != null) {  
465 - packetBuilder.addAllDestinationSids(destination) 463 + val packetBuilder = LivekitModels.UserPacket.newBuilder().apply {
  464 + payload = ByteString.copyFrom(data)
  465 + participantSid = sid
  466 + if (topic != null) {
  467 + setTopic(topic)
  468 + }
  469 + if (destination != null) {
  470 + addAllDestinationSids(destination)
  471 + }
466 } 472 }
467 val dataPacket = LivekitModels.DataPacket.newBuilder() 473 val dataPacket = LivekitModels.DataPacket.newBuilder()
468 .setUser(packetBuilder) 474 .setUser(packetBuilder)
@@ -179,9 +179,9 @@ class RemoteParticipant( @@ -179,9 +179,9 @@ class RemoteParticipant(
179 } 179 }
180 180
181 // Internal methods just for posting events. 181 // Internal methods just for posting events.
182 - internal fun onDataReceived(data: ByteArray) { 182 + internal fun onDataReceived(data: ByteArray, topic: String?) {
183 listener?.onDataReceived(data, this) 183 listener?.onDataReceived(data, this)
184 - eventBus.postEvent(ParticipantEvent.DataReceived(this, data), scope) 184 + eventBus.postEvent(ParticipantEvent.DataReceived(this, data, topic), scope)
185 } 185 }
186 186
187 companion object { 187 companion object {