davidliu
Committed by GitHub

Refactor sendData to return Result instead of throwing exceptions (#703)

* Switch away from throwing exceptions internally where not needed to avoid crashes.

* changeset
正在显示 18 个修改的文件 包含 324 行增加135 行删除
  1 +---
  2 +"client-sdk-android": minor
  3 +---
  4 +
  5 +Refactor some internal data message sending methods to use Result instead of throwing Exceptions to
  6 +fix crashes.
@@ -148,7 +148,10 @@ internal constructor(timeout: Duration) : AudioTrackSink { @@ -148,7 +148,10 @@ internal constructor(timeout: Duration) : AudioTrackSink {
148 ) 148 )
149 149
150 try { 150 try {
151 - sender.write(audioData) 151 + val result = sender.write(audioData)
  152 + if (result.isFailure) {
  153 + result.exceptionOrNull()?.let { throw it }
  154 + }
152 sender.close() 155 sender.close()
153 } catch (e: Exception) { 156 } catch (e: Exception) {
154 sender.close(e.localizedMessage) 157 sender.close(e.localizedMessage)
@@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
17 package io.livekit.android.room 17 package io.livekit.android.room
18 18
19 import android.os.SystemClock 19 import android.os.SystemClock
  20 +import androidx.annotation.CheckResult
20 import androidx.annotation.VisibleForTesting 21 import androidx.annotation.VisibleForTesting
21 import com.google.protobuf.ByteString 22 import com.google.protobuf.ByteString
22 import com.vdurmont.semver4j.Semver 23 import com.vdurmont.semver4j.Semver
@@ -82,7 +83,6 @@ import livekit.org.webrtc.RtpSender @@ -82,7 +83,6 @@ import livekit.org.webrtc.RtpSender
82 import livekit.org.webrtc.RtpTransceiver 83 import livekit.org.webrtc.RtpTransceiver
83 import livekit.org.webrtc.RtpTransceiver.RtpTransceiverInit 84 import livekit.org.webrtc.RtpTransceiver.RtpTransceiverInit
84 import livekit.org.webrtc.SessionDescription 85 import livekit.org.webrtc.SessionDescription
85 -import java.net.ConnectException  
86 import java.nio.ByteBuffer 86 import java.nio.ByteBuffer
87 import javax.inject.Inject 87 import javax.inject.Inject
88 import javax.inject.Named 88 import javax.inject.Named
@@ -445,7 +445,7 @@ internal constructor( @@ -445,7 +445,7 @@ internal constructor(
445 * reconnect Signal and PeerConnections 445 * reconnect Signal and PeerConnections
446 */ 446 */
447 @Synchronized 447 @Synchronized
448 - @VisibleForTesting 448 + @VisibleForTesting(otherwise = VisibleForTesting.PACKAGE_PRIVATE)
449 fun reconnect() { 449 fun reconnect() {
450 if (reconnectingJob?.isActive == true) { 450 if (reconnectingJob?.isActive == true) {
451 LKLog.d { "Reconnection is already in progress" } 451 LKLog.d { "Reconnection is already in progress" }
@@ -625,22 +625,32 @@ internal constructor( @@ -625,22 +625,32 @@ internal constructor(
625 } 625 }
626 } 626 }
627 627
628 - internal suspend fun sendData(dataPacket: LivekitModels.DataPacket) {  
629 - ensurePublisherConnected(dataPacket.kind) 628 + @CheckResult
  629 + internal suspend fun sendData(dataPacket: LivekitModels.DataPacket): Result<Unit> {
  630 + try {
  631 + ensurePublisherConnected(dataPacket.kind)
630 632
631 - val buf = DataChannel.Buffer(  
632 - ByteBuffer.wrap(dataPacket.toByteArray()),  
633 - true,  
634 - ) 633 + val buf = DataChannel.Buffer(
  634 + ByteBuffer.wrap(dataPacket.toByteArray()),
  635 + true,
  636 + )
635 637
636 - val channel = dataChannelForKind(dataPacket.kind)  
637 - ?: throw TrackException.PublishException("channel not established for ${dataPacket.kind.name}") 638 + val channel = dataChannelForKind(dataPacket.kind)
  639 + ?: throw RoomException.ConnectException("channel not established for ${dataPacket.kind.name}")
638 640
639 - channel.send(buf) 641 + channel.send(buf)
  642 + } catch (e: Exception) {
  643 + return Result.failure(e)
  644 + }
  645 + return Result.success(Unit)
640 } 646 }
641 647
642 internal suspend fun waitForBufferStatusLow(kind: LivekitModels.DataPacket.Kind) { 648 internal suspend fun waitForBufferStatusLow(kind: LivekitModels.DataPacket.Kind) {
643 - ensurePublisherConnected(kind) 649 + try {
  650 + ensurePublisherConnected(kind)
  651 + } catch (e: Exception) {
  652 + return
  653 + }
644 val manager = when (kind) { 654 val manager = when (kind) {
645 LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager 655 LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannelManager
646 LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager 656 LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannelManager
@@ -650,11 +660,12 @@ internal constructor( @@ -650,11 +660,12 @@ internal constructor(
650 } 660 }
651 661
652 if (manager == null) { 662 if (manager == null) {
653 - throw IllegalStateException("Not connected!") 663 + return
654 } 664 }
655 manager.waitForBufferedAmountLow(DATA_CHANNEL_LOW_THRESHOLD.toLong()) 665 manager.waitForBufferedAmountLow(DATA_CHANNEL_LOW_THRESHOLD.toLong())
656 } 666 }
657 667
  668 + @Throws(exceptionClasses = [RoomException.ConnectException::class])
658 private suspend fun ensurePublisherConnected(kind: LivekitModels.DataPacket.Kind) { 669 private suspend fun ensurePublisherConnected(kind: LivekitModels.DataPacket.Kind) {
659 if (!isSubscriberPrimary) { 670 if (!isSubscriberPrimary) {
660 return 671 return
@@ -671,7 +682,7 @@ internal constructor( @@ -671,7 +682,7 @@ internal constructor(
671 this.negotiatePublisher() 682 this.negotiatePublisher()
672 } 683 }
673 684
674 - val targetChannel = dataChannelForKind(kind) ?: throw IllegalArgumentException("Unknown data packet kind!") 685 + val targetChannel = dataChannelForKind(kind) ?: throw RoomException.ConnectException("Publisher isn't setup yet! Is room not connected?!")
675 if (targetChannel.state() == DataChannel.State.OPEN) { 686 if (targetChannel.state() == DataChannel.State.OPEN) {
676 return 687 return
677 } 688 }
@@ -685,14 +696,14 @@ internal constructor( @@ -685,14 +696,14 @@ internal constructor(
685 delay(50) 696 delay(50)
686 } 697 }
687 698
688 - throw ConnectException("could not establish publisher connection") 699 + throw RoomException.ConnectException("could not establish publisher connection")
689 } 700 }
690 701
691 private fun dataChannelForKind(kind: LivekitModels.DataPacket.Kind) = 702 private fun dataChannelForKind(kind: LivekitModels.DataPacket.Kind) =
692 when (kind) { 703 when (kind) {
693 LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannel 704 LivekitModels.DataPacket.Kind.RELIABLE -> reliableDataChannel
694 LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannel 705 LivekitModels.DataPacket.Kind.LOSSY -> lossyDataChannel
695 - else -> null 706 + LivekitModels.DataPacket.Kind.UNRECOGNIZED -> throw IllegalArgumentException("Unknown data packet kind!")
696 } 707 }
697 708
698 private fun getPublisherOfferConstraints(): MediaConstraints { 709 private fun getPublisherOfferConstraints(): MediaConstraints {
@@ -1137,6 +1148,7 @@ internal constructor( @@ -1137,6 +1148,7 @@ internal constructor(
1137 1148
1138 val dataChannelInfos = LivekitModels.DataPacket.Kind.values() 1149 val dataChannelInfos = LivekitModels.DataPacket.Kind.values()
1139 .toList() 1150 .toList()
  1151 + .filterNot { it == LivekitModels.DataPacket.Kind.UNRECOGNIZED }
1140 .mapNotNull { kind -> dataChannelForKind(kind) } 1152 .mapNotNull { kind -> dataChannelForKind(kind) }
1141 .map { dataChannel -> 1153 .map { dataChannel ->
1142 LivekitRtc.DataChannelInfo.newBuilder() 1154 LivekitRtc.DataChannelInfo.newBuilder()
@@ -17,13 +17,48 @@ @@ -17,13 +17,48 @@
17 package io.livekit.android.room.datastream 17 package io.livekit.android.room.datastream
18 18
19 sealed class StreamException(message: String? = null) : Exception(message) { 19 sealed class StreamException(message: String? = null) : Exception(message) {
  20 + /**
  21 + * Unable to open a stream with the same ID as an existing open stream.
  22 + */
20 class AlreadyOpenedException : StreamException() 23 class AlreadyOpenedException : StreamException()
21 - class AbnormalEndException(message: String?) : StreamException(message) 24 +
  25 + /**
  26 + * Stream closed abnormally by remote participant.
  27 + */
  28 + class AbnormalEndException(message: String? = null) : StreamException(message)
  29 +
  30 + /**
  31 + * Incoming chunk data could not be decoded.
  32 + */
22 class DecodeFailedException : StreamException() 33 class DecodeFailedException : StreamException()
  34 +
  35 + /**
  36 + * Length exceeded total length specified in stream header.
  37 + */
23 class LengthExceededException : StreamException() 38 class LengthExceededException : StreamException()
  39 +
  40 + /**
  41 + * Length is less than total length specified in stream header.
  42 + */
24 class IncompleteException : StreamException() 43 class IncompleteException : StreamException()
25 - class TerminatedException : StreamException() 44 +
  45 + /**
  46 + * Stream terminated before completion.
  47 + */
  48 + class TerminatedException(message: String? = null) : StreamException(message)
  49 +
  50 + /**
  51 + * Cannot perform operations on an unknown stream.
  52 + */
26 class UnknownStreamException : StreamException() 53 class UnknownStreamException : StreamException()
  54 +
  55 + /**
  56 + * Given destination URL is not a directory.
  57 + */
27 class NotDirectoryException : StreamException() 58 class NotDirectoryException : StreamException()
  59 +
  60 + /**
  61 + * Unable to read information about the file to send.
  62 + */
28 class FileInfoUnavailableException : StreamException() 63 class FileInfoUnavailableException : StreamException()
29 } 64 }
@@ -16,21 +16,31 @@ @@ -16,21 +16,31 @@
16 16
17 package io.livekit.android.room.datastream.outgoing 17 package io.livekit.android.room.datastream.outgoing
18 18
  19 +import androidx.annotation.CheckResult
19 import io.livekit.android.room.datastream.StreamException 20 import io.livekit.android.room.datastream.StreamException
20 21
21 abstract class BaseStreamSender<T>( 22 abstract class BaseStreamSender<T>(
22 internal val destination: StreamDestination<T>, 23 internal val destination: StreamDestination<T>,
23 ) { 24 ) {
24 25
25 - suspend fun write(data: T) { 26 + val isOpen: Boolean
  27 + get() = destination.isOpen
  28 +
  29 + /**
  30 + * Write to the stream.
  31 + */
  32 + @CheckResult
  33 + suspend fun write(data: T): Result<Unit> {
26 if (!destination.isOpen) { 34 if (!destination.isOpen) {
27 - throw StreamException.TerminatedException() 35 + return Result.failure(StreamException.TerminatedException())
28 } 36 }
29 37
30 - writeImpl(data) 38 + return writeImpl(data)
31 } 39 }
32 40
33 - internal abstract suspend fun writeImpl(data: T) 41 + @CheckResult
  42 + internal abstract suspend fun writeImpl(data: T): Result<Unit>
  43 +
34 suspend fun close(reason: String? = null) { 44 suspend fun close(reason: String? = null) {
35 destination.close(reason) 45 destination.close(reason)
36 } 46 }
@@ -41,7 +51,9 @@ abstract class BaseStreamSender<T>( @@ -41,7 +51,9 @@ abstract class BaseStreamSender<T>(
41 */ 51 */
42 interface StreamDestination<T> { 52 interface StreamDestination<T> {
43 val isOpen: Boolean 53 val isOpen: Boolean
44 - suspend fun write(data: T, chunker: DataChunker<T>) 54 +
  55 + @CheckResult
  56 + suspend fun write(data: T, chunker: DataChunker<T>): Result<Unit>
45 suspend fun close(reason: String?) 57 suspend fun close(reason: String?)
46 } 58 }
47 59
@@ -30,8 +30,9 @@ class ByteStreamSender( @@ -30,8 +30,9 @@ class ByteStreamSender(
30 val info: ByteStreamInfo, 30 val info: ByteStreamInfo,
31 destination: StreamDestination<ByteArray>, 31 destination: StreamDestination<ByteArray>,
32 ) : BaseStreamSender<ByteArray>(destination = destination) { 32 ) : BaseStreamSender<ByteArray>(destination = destination) {
33 - override suspend fun writeImpl(data: ByteArray) {  
34 - destination.write(data, byteDataChunker) 33 +
  34 + override suspend fun writeImpl(data: ByteArray): Result<Unit> {
  35 + return destination.write(data, byteDataChunker)
35 } 36 }
36 } 37 }
37 38
@@ -50,9 +51,7 @@ private val byteDataChunker: DataChunker<ByteArray> = { data: ByteArray, chunkSi @@ -50,9 +51,7 @@ private val byteDataChunker: DataChunker<ByteArray> = { data: ByteArray, chunkSi
50 } 51 }
51 52
52 /** 53 /**
53 - * Reads the file and writes it to the data stream.  
54 - *  
55 - * @throws 54 + * Reads the file from [filePath] and writes it to the data stream.
56 */ 55 */
57 suspend fun ByteStreamSender.writeFile(filePath: String) { 56 suspend fun ByteStreamSender.writeFile(filePath: String) {
58 write(FileSystem.SYSTEM.source(filePath.toPath())) 57 write(FileSystem.SYSTEM.source(filePath.toPath()))
@@ -68,14 +67,23 @@ suspend fun ByteStreamSender.write(input: InputStream) { @@ -68,14 +67,23 @@ suspend fun ByteStreamSender.write(input: InputStream) {
68 /** 67 /**
69 * Reads the source and sends it to the data stream. 68 * Reads the source and sends it to the data stream.
70 */ 69 */
71 -suspend fun ByteStreamSender.write(source: Source) { 70 +suspend fun ByteStreamSender.write(source: Source): Result<Unit> {
72 val buffer = Buffer() 71 val buffer = Buffer()
73 while (true) { 72 while (true) {
74 - val readLen = source.read(buffer, 4096)  
75 - if (readLen == -1L) {  
76 - break  
77 - } 73 + try {
  74 + val readLen = source.read(buffer, 4096)
  75 + if (readLen == -1L) {
  76 + break
  77 + }
78 78
79 - write(buffer.readByteArray()) 79 + val result = write(buffer.readByteArray())
  80 + if (result.isFailure) {
  81 + return result
  82 + }
  83 + } catch (e: Exception) {
  84 + return Result.failure(e)
  85 + }
80 } 86 }
  87 +
  88 + return Result.success(Unit)
81 } 89 }
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 16
17 package io.livekit.android.room.datastream.outgoing 17 package io.livekit.android.room.datastream.outgoing
18 18
  19 +import androidx.annotation.CheckResult
19 import com.google.protobuf.ByteString 20 import com.google.protobuf.ByteString
20 import io.livekit.android.room.RTCEngine 21 import io.livekit.android.room.RTCEngine
21 import io.livekit.android.room.datastream.ByteStreamInfo 22 import io.livekit.android.room.datastream.ByteStreamInfo
@@ -36,11 +37,15 @@ import javax.inject.Inject @@ -36,11 +37,15 @@ import javax.inject.Inject
36 interface OutgoingDataStreamManager { 37 interface OutgoingDataStreamManager {
37 /** 38 /**
38 * Start sending a stream of text. Call [TextStreamSender.close] when finished sending. 39 * Start sending a stream of text. Call [TextStreamSender.close] when finished sending.
  40 + *
  41 + * @throws StreamException if the stream failed to open.
39 */ 42 */
40 suspend fun streamText(options: StreamTextOptions = StreamTextOptions()): TextStreamSender 43 suspend fun streamText(options: StreamTextOptions = StreamTextOptions()): TextStreamSender
41 44
42 /** 45 /**
43 * Start sending a stream of bytes. Call [ByteStreamSender.close] when finished sending. 46 * Start sending a stream of bytes. Call [ByteStreamSender.close] when finished sending.
  47 + *
  48 + * @throws StreamException if the stream failed to open.
44 */ 49 */
45 suspend fun streamBytes(options: StreamBytesOptions): ByteStreamSender 50 suspend fun streamBytes(options: StreamBytesOptions): ByteStreamSender
46 } 51 }
@@ -63,10 +68,11 @@ constructor( @@ -63,10 +68,11 @@ constructor(
63 68
64 private val openStreams = Collections.synchronizedMap(mutableMapOf<String, Descriptor>()) 69 private val openStreams = Collections.synchronizedMap(mutableMapOf<String, Descriptor>())
65 70
  71 + @CheckResult
66 private suspend fun openStream( 72 private suspend fun openStream(
67 info: StreamInfo, 73 info: StreamInfo,
68 destinationIdentities: List<Participant.Identity> = emptyList(), 74 destinationIdentities: List<Participant.Identity> = emptyList(),
69 - ) { 75 + ): Result<Unit> {
70 if (openStreams.containsKey(info.id)) { 76 if (openStreams.containsKey(info.id)) {
71 throw StreamException.AlreadyOpenedException() 77 throw StreamException.AlreadyOpenedException()
72 } 78 }
@@ -112,15 +118,20 @@ constructor( @@ -112,15 +118,20 @@ constructor(
112 build() 118 build()
113 } 119 }
114 120
115 - engine.sendData(headerPacket) 121 + val result = engine.sendData(headerPacket)
  122 + if (result.isFailure) {
  123 + return result
  124 + }
116 125
117 val descriptor = Descriptor(info, destinationIdentityStrings) 126 val descriptor = Descriptor(info, destinationIdentityStrings)
118 openStreams[info.id] = descriptor 127 openStreams[info.id] = descriptor
119 128
120 LKLog.d { "Opened send stream ${info.id}" } 129 LKLog.d { "Opened send stream ${info.id}" }
  130 + return Result.success(Unit)
121 } 131 }
122 132
123 - private suspend fun sendChunk(streamId: String, dataChunk: ByteArray) { 133 + @CheckResult
  134 + private suspend fun sendChunk(streamId: String, dataChunk: ByteArray): Result<Unit> {
124 val descriptor = openStreams[streamId] ?: throw StreamException.UnknownStreamException() 135 val descriptor = openStreams[streamId] ?: throw StreamException.UnknownStreamException()
125 val nextChunkIndex = descriptor.nextChunkIndex.getAndIncrement() 136 val nextChunkIndex = descriptor.nextChunkIndex.getAndIncrement()
126 137
@@ -137,7 +148,7 @@ constructor( @@ -137,7 +148,7 @@ constructor(
137 } 148 }
138 149
139 engine.waitForBufferStatusLow(DataPacket.Kind.RELIABLE) 150 engine.waitForBufferStatusLow(DataPacket.Kind.RELIABLE)
140 - engine.sendData(chunkPacket) 151 + return engine.sendData(chunkPacket)
141 } 152 }
142 153
143 private suspend fun closeStream(streamId: String, reason: String? = null) { 154 private suspend fun closeStream(streamId: String, reason: String? = null) {
@@ -157,7 +168,12 @@ constructor( @@ -157,7 +168,12 @@ constructor(
157 } 168 }
158 169
159 engine.waitForBufferStatusLow(DataPacket.Kind.RELIABLE) 170 engine.waitForBufferStatusLow(DataPacket.Kind.RELIABLE)
160 - engine.sendData(trailerPacket) 171 + val result = engine.sendData(trailerPacket)
  172 +
  173 + if (result.isFailure) {
  174 + // Log close failure only for now.
  175 + LKLog.w(result.exceptionOrNull()) { "Error when closing stream!" }
  176 + }
161 177
162 openStreams.remove(streamId) 178 openStreams.remove(streamId)
163 LKLog.d { "Closed send stream $streamId" } 179 LKLog.d { "Closed send stream $streamId" }
@@ -178,7 +194,11 @@ constructor( @@ -178,7 +194,11 @@ constructor(
178 ) 194 )
179 195
180 val streamId = options.streamId 196 val streamId = options.streamId
181 - openStream(streamInfo, options.destinationIdentities) 197 + val result = openStream(streamInfo, options.destinationIdentities)
  198 +
  199 + if (result.isFailure) {
  200 + throw result.exceptionOrNull() ?: StreamException.TerminatedException("Unknown failure when opening the stream!")
  201 + }
182 202
183 val destination = ManagerStreamDestination<String>(streamId) 203 val destination = ManagerStreamDestination<String>(streamId)
184 return TextStreamSender( 204 return TextStreamSender(
@@ -199,8 +219,11 @@ constructor( @@ -199,8 +219,11 @@ constructor(
199 ) 219 )
200 220
201 val streamId = options.streamId 221 val streamId = options.streamId
202 - openStream(streamInfo, options.destinationIdentities) 222 + val result = openStream(streamInfo, options.destinationIdentities)
203 223
  224 + if (result.isFailure) {
  225 + throw result.exceptionOrNull() ?: StreamException.TerminatedException("Unknown failure when opening the stream!")
  226 + }
204 val destination = ManagerStreamDestination<ByteArray>(streamId) 227 val destination = ManagerStreamDestination<ByteArray>(streamId)
205 return ByteStreamSender( 228 return ByteStreamSender(
206 streamInfo, 229 streamInfo,
@@ -212,12 +235,19 @@ constructor( @@ -212,12 +235,19 @@ constructor(
212 override val isOpen: Boolean 235 override val isOpen: Boolean
213 get() = openStreams.contains(streamId) 236 get() = openStreams.contains(streamId)
214 237
215 - override suspend fun write(data: T, chunker: DataChunker<T>) { 238 + override suspend fun write(data: T, chunker: DataChunker<T>): Result<Unit> {
  239 + if (!isOpen) {
  240 + return Result.failure(StreamException.TerminatedException("Stream is closed!"))
  241 + }
216 val chunks = chunker.invoke(data, RTCEngine.MAX_DATA_PACKET_SIZE) 242 val chunks = chunker.invoke(data, RTCEngine.MAX_DATA_PACKET_SIZE)
217 243
218 for (chunk in chunks) { 244 for (chunk in chunks) {
219 - sendChunk(streamId, chunk) 245 + val result = sendChunk(streamId, chunk)
  246 + if (result.isFailure) {
  247 + return result
  248 + }
220 } 249 }
  250 + return Result.success(Unit)
221 } 251 }
222 252
223 override suspend fun close(reason: String?) { 253 override suspend fun close(reason: String?) {
@@ -23,8 +23,8 @@ class TextStreamSender( @@ -23,8 +23,8 @@ class TextStreamSender(
23 val info: TextStreamInfo, 23 val info: TextStreamInfo,
24 destination: StreamDestination<String>, 24 destination: StreamDestination<String>,
25 ) : BaseStreamSender<String>(destination) { 25 ) : BaseStreamSender<String>(destination) {
26 - override suspend fun writeImpl(data: String) {  
27 - destination.write(data, stringChunker) 26 + override suspend fun writeImpl(data: String): Result<Unit> {
  27 + return destination.write(data, stringChunker)
28 } 28 }
29 } 29 }
30 30
1 /* 1 /*
2 - * Copyright 2024 LiveKit, Inc. 2 + * Copyright 2024-2025 LiveKit, Inc.
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
@@ -68,7 +68,10 @@ private suspend fun collectPublisherMetrics(room: Room, rtcEngine: RTCEngine) { @@ -68,7 +68,10 @@ private suspend fun collectPublisherMetrics(room: Room, rtcEngine: RTCEngine) {
68 } 68 }
69 69
70 try { 70 try {
71 - rtcEngine.sendData(dataPacket) 71 + val result = rtcEngine.sendData(dataPacket)
  72 + result.exceptionOrNull()?.let {
  73 + throw it
  74 + }
72 } catch (e: Exception) { 75 } catch (e: Exception) {
73 LKLog.i(e) { "Error sending metrics: " } 76 LKLog.i(e) { "Error sending metrics: " }
74 } 77 }
@@ -98,7 +101,10 @@ private suspend fun collectSubscriberMetrics(room: Room, rtcEngine: RTCEngine) { @@ -98,7 +101,10 @@ private suspend fun collectSubscriberMetrics(room: Room, rtcEngine: RTCEngine) {
98 } 101 }
99 102
100 try { 103 try {
101 - rtcEngine.sendData(dataPacket) 104 + val result = rtcEngine.sendData(dataPacket)
  105 + result.exceptionOrNull()?.let {
  106 + throw it
  107 + }
102 } catch (e: Exception) { 108 } catch (e: Exception) {
103 LKLog.i(e) { "Error sending metrics: " } 109 LKLog.i(e) { "Error sending metrics: " }
104 } 110 }
@@ -19,6 +19,7 @@ package io.livekit.android.room.participant @@ -19,6 +19,7 @@ package io.livekit.android.room.participant
19 import android.Manifest 19 import android.Manifest
20 import android.content.Context 20 import android.content.Context
21 import android.content.Intent 21 import android.content.Intent
  22 +import androidx.annotation.CheckResult
22 import androidx.annotation.VisibleForTesting 23 import androidx.annotation.VisibleForTesting
23 import com.google.protobuf.ByteString 24 import com.google.protobuf.ByteString
24 import com.vdurmont.semver4j.Semver 25 import com.vdurmont.semver4j.Semver
@@ -62,7 +63,6 @@ import kotlinx.coroutines.Job @@ -62,7 +63,6 @@ import kotlinx.coroutines.Job
62 import kotlinx.coroutines.async 63 import kotlinx.coroutines.async
63 import kotlinx.coroutines.coroutineScope 64 import kotlinx.coroutines.coroutineScope
64 import kotlinx.coroutines.delay 65 import kotlinx.coroutines.delay
65 -import kotlinx.coroutines.flow.map  
66 import kotlinx.coroutines.launch 66 import kotlinx.coroutines.launch
67 import kotlinx.coroutines.suspendCancellableCoroutine 67 import kotlinx.coroutines.suspendCancellableCoroutine
68 import kotlinx.coroutines.sync.Mutex 68 import kotlinx.coroutines.sync.Mutex
@@ -315,7 +315,8 @@ internal constructor( @@ -315,7 +315,8 @@ internal constructor(
315 * 315 *
316 * For screenshare audio, a [ScreenAudioCapturer] can be used. 316 * For screenshare audio, a [ScreenAudioCapturer] can be used.
317 * 317 *
318 - * @param mediaProjectionPermissionResultData The resultData returned from launching 318 + * @param screenCaptureParams When enabling the screenshare, this must be provided with
  319 + * [ScreenCaptureParams.mediaProjectionPermissionResultData] containing resultData returned from launching
319 * [MediaProjectionManager.createScreenCaptureIntent()](https://developer.android.com/reference/android/media/projection/MediaProjectionManager#createScreenCaptureIntent()). 320 * [MediaProjectionManager.createScreenCaptureIntent()](https://developer.android.com/reference/android/media/projection/MediaProjectionManager#createScreenCaptureIntent()).
320 * @throws IllegalArgumentException if attempting to enable screenshare without [mediaProjectionPermissionResultData] 321 * @throws IllegalArgumentException if attempting to enable screenshare without [mediaProjectionPermissionResultData]
321 * @see Room.screenShareTrackCaptureDefaults 322 * @see Room.screenShareTrackCaptureDefaults
@@ -904,14 +905,17 @@ internal constructor( @@ -904,14 +905,17 @@ internal constructor(
904 * @param reliability for delivery guarantee, use RELIABLE. for fastest delivery without guarantee, use LOSSY 905 * @param reliability for delivery guarantee, use RELIABLE. for fastest delivery without guarantee, use LOSSY
905 * @param topic the topic under which the message was published 906 * @param topic the topic under which the message was published
906 * @param identities list of participant identities to deliver the payload, null to deliver to everyone 907 * @param identities list of participant identities to deliver the payload, null to deliver to everyone
  908 + *
  909 + * @return A [Result] that succeeds if the publish succeeded, or a failure containing the exception.
907 */ 910 */
908 @Suppress("unused") 911 @Suppress("unused")
  912 + @CheckResult
909 suspend fun publishData( 913 suspend fun publishData(
910 data: ByteArray, 914 data: ByteArray,
911 reliability: DataPublishReliability = DataPublishReliability.RELIABLE, 915 reliability: DataPublishReliability = DataPublishReliability.RELIABLE,
912 topic: String? = null, 916 topic: String? = null,
913 identities: List<Identity>? = null, 917 identities: List<Identity>? = null,
914 - ) { 918 + ): Result<Unit> {
915 if (data.size > RTCEngine.MAX_DATA_PACKET_SIZE) { 919 if (data.size > RTCEngine.MAX_DATA_PACKET_SIZE) {
916 throw IllegalArgumentException("cannot publish data larger than " + RTCEngine.MAX_DATA_PACKET_SIZE) 920 throw IllegalArgumentException("cannot publish data larger than " + RTCEngine.MAX_DATA_PACKET_SIZE)
917 } 921 }
@@ -935,7 +939,7 @@ internal constructor( @@ -935,7 +939,7 @@ internal constructor(
935 .setKind(kind) 939 .setKind(kind)
936 .build() 940 .build()
937 941
938 - engine.sendData(dataPacket) 942 + return engine.sendData(dataPacket)
939 } 943 }
940 944
941 /** 945 /**
@@ -946,13 +950,16 @@ internal constructor( @@ -946,13 +950,16 @@ internal constructor(
946 * 950 *
947 * @param code an integer representing the DTMF signal code 951 * @param code an integer representing the DTMF signal code
948 * @param digit the string representing the DTMF digit (e.g., "1", "#", "*") 952 * @param digit the string representing the DTMF digit (e.g., "1", "#", "*")
  953 + *
  954 + * @return A [Result] that succeeds if the publish succeeded, or a failure containing the exception.
949 */ 955 */
950 956
951 @Suppress("unused") 957 @Suppress("unused")
  958 + @CheckResult
952 suspend fun publishDtmf( 959 suspend fun publishDtmf(
953 code: Int, 960 code: Int,
954 digit: String, 961 digit: String,
955 - ) { 962 + ): Result<Unit> {
956 val sipDTMF = LivekitModels.SipDTMF.newBuilder().setCode(code) 963 val sipDTMF = LivekitModels.SipDTMF.newBuilder().setCode(code)
957 .setDigit(digit) 964 .setDigit(digit)
958 .build() 965 .build()
@@ -962,7 +969,7 @@ internal constructor( @@ -962,7 +969,7 @@ internal constructor(
962 .setKind(LivekitModels.DataPacket.Kind.RELIABLE) 969 .setKind(LivekitModels.DataPacket.Kind.RELIABLE)
963 .build() 970 .build()
964 971
965 - engine.sendData(dataPacket) 972 + return engine.sendData(dataPacket)
966 } 973 }
967 974
968 /** 975 /**
@@ -998,7 +1005,6 @@ internal constructor( @@ -998,7 +1005,6 @@ internal constructor(
998 * @see RpcInvocationData 1005 * @see RpcInvocationData
999 * @see performRpc 1006 * @see performRpc
1000 */ 1007 */
1001 - @Suppress("RedundantSuspendModifier")  
1002 override suspend fun registerRpcMethod( 1008 override suspend fun registerRpcMethod(
1003 method: String, 1009 method: String,
1004 handler: RpcHandler, 1010 handler: RpcHandler,
@@ -1088,7 +1094,7 @@ internal constructor( @@ -1088,7 +1094,7 @@ internal constructor(
1088 1094
1089 val requestId = UUID.randomUUID().toString() 1095 val requestId = UUID.randomUUID().toString()
1090 1096
1091 - publishRpcRequest( 1097 + val result = publishRpcRequest(
1092 destinationIdentity = destinationIdentity, 1098 destinationIdentity = destinationIdentity,
1093 requestId = requestId, 1099 requestId = requestId,
1094 method = method, 1100 method = method,
@@ -1096,6 +1102,12 @@ internal constructor( @@ -1096,6 +1102,12 @@ internal constructor(
1096 responseTimeout = responseTimeout - maxRoundTripLatency, 1102 responseTimeout = responseTimeout - maxRoundTripLatency,
1097 ) 1103 )
1098 1104
  1105 + if (result.isFailure) {
  1106 + val exception = result.exceptionOrNull() as? RpcError
  1107 + ?: RpcError.BuiltinRpcError.SEND_FAILED.create(data = "Error while sending rpc request.", cause = result.exceptionOrNull())
  1108 + throw exception
  1109 + }
  1110 +
1099 val responsePayload = suspendCancellableCoroutine { continuation -> 1111 val responsePayload = suspendCancellableCoroutine { continuation ->
1100 var ackTimeoutJob: Job? = null 1112 var ackTimeoutJob: Job? = null
1101 var responseTimeoutJob: Job? = null 1113 var responseTimeoutJob: Job? = null
@@ -1149,13 +1161,25 @@ internal constructor( @@ -1149,13 +1161,25 @@ internal constructor(
1149 return@coroutineScope responsePayload 1161 return@coroutineScope responsePayload
1150 } 1162 }
1151 1163
  1164 + @CheckResult
  1165 + private suspend fun rpcSendData(dataPacket: DataPacket): Result<Unit> {
  1166 + val result = engine.sendData(dataPacket)
  1167 +
  1168 + return if (result.isFailure) {
  1169 + Result.failure(RpcError.BuiltinRpcError.SEND_FAILED.create(cause = result.exceptionOrNull()))
  1170 + } else {
  1171 + result
  1172 + }
  1173 + }
  1174 +
  1175 + @CheckResult
1152 private suspend fun publishRpcRequest( 1176 private suspend fun publishRpcRequest(
1153 destinationIdentity: Identity, 1177 destinationIdentity: Identity,
1154 requestId: String, 1178 requestId: String,
1155 method: String, 1179 method: String,
1156 payload: String, 1180 payload: String,
1157 responseTimeout: Duration = 10.seconds, 1181 responseTimeout: Duration = 10.seconds,
1158 - ) { 1182 + ): Result<Unit> {
1159 if (payload.byteLength() > RTCEngine.MAX_DATA_PACKET_SIZE) { 1183 if (payload.byteLength() > RTCEngine.MAX_DATA_PACKET_SIZE) {
1160 throw IllegalArgumentException("cannot publish data larger than " + RTCEngine.MAX_DATA_PACKET_SIZE) 1184 throw IllegalArgumentException("cannot publish data larger than " + RTCEngine.MAX_DATA_PACKET_SIZE)
1161 } 1185 }
@@ -1174,15 +1198,16 @@ internal constructor( @@ -1174,15 +1198,16 @@ internal constructor(
1174 build() 1198 build()
1175 } 1199 }
1176 1200
1177 - engine.sendData(dataPacket) 1201 + return rpcSendData(dataPacket)
1178 } 1202 }
1179 1203
  1204 + @CheckResult
1180 private suspend fun publishRpcResponse( 1205 private suspend fun publishRpcResponse(
1181 destinationIdentity: Identity, 1206 destinationIdentity: Identity,
1182 requestId: String, 1207 requestId: String,
1183 payload: String?, 1208 payload: String?,
1184 error: RpcError?, 1209 error: RpcError?,
1185 - ) { 1210 + ): Result<Unit> {
1186 if (payload.byteLength() > RTCEngine.MAX_DATA_PACKET_SIZE) { 1211 if (payload.byteLength() > RTCEngine.MAX_DATA_PACKET_SIZE) {
1187 throw IllegalArgumentException("cannot publish data larger than " + RTCEngine.MAX_DATA_PACKET_SIZE) 1212 throw IllegalArgumentException("cannot publish data larger than " + RTCEngine.MAX_DATA_PACKET_SIZE)
1188 } 1213 }
@@ -1202,13 +1227,14 @@ internal constructor( @@ -1202,13 +1227,14 @@ internal constructor(
1202 build() 1227 build()
1203 } 1228 }
1204 1229
1205 - engine.sendData(dataPacket) 1230 + return rpcSendData(dataPacket)
1206 } 1231 }
1207 1232
  1233 + @CheckResult
1208 private suspend fun publishRpcAck( 1234 private suspend fun publishRpcAck(
1209 destinationIdentity: Identity, 1235 destinationIdentity: Identity,
1210 requestId: String, 1236 requestId: String,
1211 - ) { 1237 + ): Result<Unit> {
1212 val dataPacket = with(DataPacket.newBuilder()) { 1238 val dataPacket = with(DataPacket.newBuilder()) {
1213 addDestinationIdentities(destinationIdentity.value) 1239 addDestinationIdentities(destinationIdentity.value)
1214 kind = DataPacket.Kind.RELIABLE 1240 kind = DataPacket.Kind.RELIABLE
@@ -1219,7 +1245,7 @@ internal constructor( @@ -1219,7 +1245,7 @@ internal constructor(
1219 build() 1245 build()
1220 } 1246 }
1221 1247
1222 - engine.sendData(dataPacket) 1248 + return rpcSendData(dataPacket)
1223 } 1249 }
1224 1250
1225 private fun handleIncomingRpcAck(requestId: String) { 1251 private fun handleIncomingRpcAck(requestId: String) {
@@ -1252,7 +1278,12 @@ internal constructor( @@ -1252,7 +1278,12 @@ internal constructor(
1252 responseTimeout: Duration, 1278 responseTimeout: Duration,
1253 version: Int, 1279 version: Int,
1254 ) { 1280 ) {
1255 - publishRpcAck(callerIdentity, requestId) 1281 + publishRpcAck(callerIdentity, requestId).also { result ->
  1282 + if (result.isFailure) {
  1283 + LKLog.w(result.exceptionOrNull()) { "Error sending ack for request $requestId." }
  1284 + return
  1285 + }
  1286 + }
1256 1287
1257 if (version != RpcManager.RPC_VERSION) { 1288 if (version != RpcManager.RPC_VERSION) {
1258 publishRpcResponse( 1289 publishRpcResponse(
@@ -1260,7 +1291,12 @@ internal constructor( @@ -1260,7 +1291,12 @@ internal constructor(
1260 requestId = requestId, 1291 requestId = requestId,
1261 payload = null, 1292 payload = null,
1262 error = RpcError.BuiltinRpcError.UNSUPPORTED_VERSION.create(), 1293 error = RpcError.BuiltinRpcError.UNSUPPORTED_VERSION.create(),
1263 - ) 1294 + ).also { result ->
  1295 + if (result.isFailure) {
  1296 + LKLog.w(result.exceptionOrNull()) { "Error sending error response for request $requestId." }
  1297 + }
  1298 + }
  1299 +
1264 return 1300 return
1265 } 1301 }
1266 1302
@@ -1272,7 +1308,12 @@ internal constructor( @@ -1272,7 +1308,12 @@ internal constructor(
1272 requestId = requestId, 1308 requestId = requestId,
1273 payload = null, 1309 payload = null,
1274 error = RpcError.BuiltinRpcError.UNSUPPORTED_METHOD.create(), 1310 error = RpcError.BuiltinRpcError.UNSUPPORTED_METHOD.create(),
1275 - ) 1311 + ).also { result ->
  1312 + if (result.isFailure) {
  1313 + LKLog.w(result.exceptionOrNull()) { "Error sending error response for request $requestId." }
  1314 + }
  1315 + }
  1316 +
1276 return 1317 return
1277 } 1318 }
1278 1319
@@ -1309,7 +1350,11 @@ internal constructor( @@ -1309,7 +1350,11 @@ internal constructor(
1309 requestId = requestId, 1350 requestId = requestId,
1310 payload = responsePayload, 1351 payload = responsePayload,
1311 error = responseError, 1352 error = responseError,
1312 - ) 1353 + ).also { result ->
  1354 + if (result.isFailure) {
  1355 + LKLog.w(result.exceptionOrNull()) { "Error sending error response for request $requestId." }
  1356 + }
  1357 + }
1313 } 1358 }
1314 1359
1315 internal fun handleParticipantDisconnect(identity: Identity) { 1360 internal fun handleParticipantDisconnect(identity: Identity) {
@@ -44,6 +44,12 @@ data class RpcError( @@ -44,6 +44,12 @@ data class RpcError(
44 * An optional data payload. Must be smaller than 15KB in size, or else will be truncated. 44 * An optional data payload. Must be smaller than 15KB in size, or else will be truncated.
45 */ 45 */
46 val data: String = "", 46 val data: String = "",
  47 +
  48 + /**
  49 + * The local cause of the error, if any. This will not be passed over the wire to the remote.
  50 + */
  51 + override val cause: Throwable? = null,
  52 +
47 ) : Exception(message) { 53 ) : Exception(message) {
48 54
49 enum class BuiltinRpcError(val code: Int, val message: String) { 55 enum class BuiltinRpcError(val code: Int, val message: String) {
@@ -61,8 +67,8 @@ data class RpcError( @@ -61,8 +67,8 @@ data class RpcError(
61 UNSUPPORTED_VERSION(1404, "Unsupported RPC version"), 67 UNSUPPORTED_VERSION(1404, "Unsupported RPC version"),
62 ; 68 ;
63 69
64 - fun create(data: String = ""): RpcError {  
65 - return RpcError(code, message, data) 70 + fun create(data: String = "", cause: Throwable? = null): RpcError {
  71 + return RpcError(code, message, data, cause)
66 } 72 }
67 } 73 }
68 74
@@ -74,6 +74,7 @@ class DataChannelManager( @@ -74,6 +74,7 @@ class DataChannelManager(
74 return 74 return
75 } 75 }
76 disposed = true 76 disposed = true
  77 + bufferedAmount = 0
77 } 78 }
78 executeOnRTCThread { 79 executeOnRTCThread {
79 dataChannel.unregisterObserver() 80 dataChannel.unregisterObserver()
@@ -27,7 +27,6 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) { @@ -27,7 +27,6 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {
27 } 27 }
28 28
29 override fun unregisterObserver() { 29 override fun unregisterObserver() {
30 - observer = null  
31 } 30 }
32 31
33 override fun label(): String? { 32 override fun label(): String? {
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 16
17 package io.livekit.android.test.mock.room.datastream.outgoing 17 package io.livekit.android.test.mock.room.datastream.outgoing
18 18
  19 +import io.livekit.android.room.datastream.StreamException
19 import io.livekit.android.room.datastream.outgoing.DataChunker 20 import io.livekit.android.room.datastream.outgoing.DataChunker
20 import io.livekit.android.room.datastream.outgoing.StreamDestination 21 import io.livekit.android.room.datastream.outgoing.StreamDestination
21 22
@@ -23,12 +24,18 @@ class MockStreamDestination<T>(val chunkSize: Int) : StreamDestination<T> { @@ -23,12 +24,18 @@ class MockStreamDestination<T>(val chunkSize: Int) : StreamDestination<T> {
23 override var isOpen: Boolean = true 24 override var isOpen: Boolean = true
24 25
25 val writtenChunks = mutableListOf<ByteArray>() 26 val writtenChunks = mutableListOf<ByteArray>()
26 - override suspend fun write(data: T, chunker: DataChunker<T>) { 27 + override suspend fun write(data: T, chunker: DataChunker<T>): Result<Unit> {
  28 + if (!isOpen) {
  29 + return Result.failure(StreamException.TerminatedException())
  30 + }
  31 +
27 val chunks = chunker.invoke(data, chunkSize) 32 val chunks = chunker.invoke(data, chunkSize)
28 33
29 for (chunk in chunks) { 34 for (chunk in chunks) {
30 writtenChunks.add(chunk) 35 writtenChunks.add(chunk)
31 } 36 }
  37 +
  38 + return Result.success(Unit)
32 } 39 }
33 40
34 override suspend fun close(reason: String?) { 41 override suspend fun close(reason: String?) {
1 /* 1 /*
2 - * Copyright 2023-2024 LiveKit, Inc. 2 + * Copyright 2023-2025 LiveKit, Inc.
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
@@ -35,8 +35,6 @@ import io.livekit.android.test.mock.createMediaStreamId @@ -35,8 +35,6 @@ import io.livekit.android.test.mock.createMediaStreamId
35 import io.livekit.android.test.mock.room.track.createMockLocalAudioTrack 35 import io.livekit.android.test.mock.room.track.createMockLocalAudioTrack
36 import io.livekit.android.util.flow 36 import io.livekit.android.util.flow
37 import io.livekit.android.util.toOkioByteString 37 import io.livekit.android.util.toOkioByteString
38 -import junit.framework.Assert.assertEquals  
39 -import junit.framework.Assert.assertNotNull  
40 import kotlinx.coroutines.CoroutineScope 38 import kotlinx.coroutines.CoroutineScope
41 import kotlinx.coroutines.Dispatchers 39 import kotlinx.coroutines.Dispatchers
42 import kotlinx.coroutines.ExperimentalCoroutinesApi 40 import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -44,6 +42,8 @@ import kotlinx.coroutines.SupervisorJob @@ -44,6 +42,8 @@ import kotlinx.coroutines.SupervisorJob
44 import kotlinx.coroutines.launch 42 import kotlinx.coroutines.launch
45 import livekit.LivekitRtc 43 import livekit.LivekitRtc
46 import org.junit.Assert 44 import org.junit.Assert
  45 +import org.junit.Assert.assertEquals
  46 +import org.junit.Assert.assertNotNull
47 import org.junit.Test 47 import org.junit.Test
48 import org.junit.runner.RunWith 48 import org.junit.runner.RunWith
49 import org.mockito.Mockito 49 import org.mockito.Mockito
@@ -25,10 +25,10 @@ import io.livekit.android.test.mock.MockPeerConnection @@ -25,10 +25,10 @@ import io.livekit.android.test.mock.MockPeerConnection
25 import io.livekit.android.test.mock.TestData 25 import io.livekit.android.test.mock.TestData
26 import io.livekit.android.util.toOkioByteString 26 import io.livekit.android.util.toOkioByteString
27 import kotlinx.coroutines.ExperimentalCoroutinesApi 27 import kotlinx.coroutines.ExperimentalCoroutinesApi
28 -import kotlinx.coroutines.launch  
29 import livekit.LivekitModels 28 import livekit.LivekitModels
30 import livekit.LivekitRtc 29 import livekit.LivekitRtc
31 import org.junit.Assert.assertEquals 30 import org.junit.Assert.assertEquals
  31 +import org.junit.Assert.assertFalse
32 import org.junit.Assert.assertTrue 32 import org.junit.Assert.assertTrue
33 import org.junit.Test 33 import org.junit.Test
34 34
@@ -58,22 +58,19 @@ class RoomOutgoingDataStreamMockE2ETest : MockE2ETest() { @@ -58,22 +58,19 @@ class RoomOutgoingDataStreamMockE2ETest : MockE2ETest() {
58 for (i in bytesToStream.indices) { 58 for (i in bytesToStream.indices) {
59 bytesToStream[i] = i.toByte() 59 bytesToStream[i] = i.toByte()
60 } 60 }
61 - val job = launch {  
62 - val sender = room.localParticipant.streamBytes(  
63 - StreamBytesOptions(  
64 - topic = "topic",  
65 - attributes = mapOf("hello" to "world"),  
66 - streamId = "stream_id",  
67 - destinationIdentities = listOf(Participant.Identity(TestData.REMOTE_PARTICIPANT.identity)),  
68 - name = "stream_name",  
69 - totalSize = bytesToStream.size.toLong(),  
70 - ),  
71 - )  
72 - sender.write(bytesToStream)  
73 - sender.close()  
74 - }  
75 -  
76 - job.join() 61 + val sender = room.localParticipant.streamBytes(
  62 + StreamBytesOptions(
  63 + topic = "topic",
  64 + attributes = mapOf("hello" to "world"),
  65 + streamId = "stream_id",
  66 + destinationIdentities = listOf(Participant.Identity(TestData.REMOTE_PARTICIPANT.identity)),
  67 + name = "stream_name",
  68 + totalSize = bytesToStream.size.toLong(),
  69 + ),
  70 + )
  71 + assertTrue(sender.write(bytesToStream).isSuccess)
  72 + sender.close()
  73 + assertFalse(sender.isOpen)
77 74
78 val buffers = pubDataChannel.sentBuffers 75 val buffers = pubDataChannel.sentBuffers
79 76
@@ -111,25 +108,23 @@ class RoomOutgoingDataStreamMockE2ETest : MockE2ETest() { @@ -111,25 +108,23 @@ class RoomOutgoingDataStreamMockE2ETest : MockE2ETest() {
111 ) 108 )
112 109
113 val text = "test_text" 110 val text = "test_text"
114 - val job = launch {  
115 - val sender = room.localParticipant.streamText(  
116 - StreamTextOptions(  
117 - topic = "topic",  
118 - attributes = mapOf("hello" to "world"),  
119 - streamId = "stream_id",  
120 - destinationIdentities = listOf(Participant.Identity(TestData.REMOTE_PARTICIPANT.identity)),  
121 - operationType = TextStreamInfo.OperationType.CREATE,  
122 - version = 0,  
123 - attachedStreamIds = emptyList(),  
124 - replyToStreamId = null,  
125 - totalSize = 3,  
126 - ),  
127 - )  
128 - sender.write(text)  
129 - sender.close()  
130 - } 111 + val sender = room.localParticipant.streamText(
  112 + StreamTextOptions(
  113 + topic = "topic",
  114 + attributes = mapOf("hello" to "world"),
  115 + streamId = "stream_id",
  116 + destinationIdentities = listOf(Participant.Identity(TestData.REMOTE_PARTICIPANT.identity)),
  117 + operationType = TextStreamInfo.OperationType.CREATE,
  118 + version = 0,
  119 + attachedStreamIds = emptyList(),
  120 + replyToStreamId = null,
  121 + totalSize = 3,
  122 + ),
  123 + )
  124 + assertTrue(sender.write(text).isSuccess)
  125 + sender.close()
131 126
132 - job.join() 127 + assertFalse(sender.isOpen)
133 128
134 val buffers = pubDataChannel.sentBuffers 129 val buffers = pubDataChannel.sentBuffers
135 130
@@ -19,12 +19,14 @@ package io.livekit.android.room.datastream.outgoing @@ -19,12 +19,14 @@ package io.livekit.android.room.datastream.outgoing
19 import io.livekit.android.room.datastream.ByteStreamInfo 19 import io.livekit.android.room.datastream.ByteStreamInfo
20 import io.livekit.android.test.BaseTest 20 import io.livekit.android.test.BaseTest
21 import io.livekit.android.test.mock.room.datastream.outgoing.MockStreamDestination 21 import io.livekit.android.test.mock.room.datastream.outgoing.MockStreamDestination
22 -import kotlinx.coroutines.launch 22 +import kotlinx.coroutines.ExperimentalCoroutinesApi
23 import org.junit.Assert.assertEquals 23 import org.junit.Assert.assertEquals
24 import org.junit.Assert.assertFalse 24 import org.junit.Assert.assertFalse
  25 +import org.junit.Assert.assertTrue
25 import org.junit.Test 26 import org.junit.Test
26 import kotlin.math.roundToInt 27 import kotlin.math.roundToInt
27 28
  29 +@OptIn(ExperimentalCoroutinesApi::class)
28 class ByteStreamSenderTest : BaseTest() { 30 class ByteStreamSenderTest : BaseTest() {
29 31
30 companion object { 32 companion object {
@@ -39,12 +41,9 @@ class ByteStreamSenderTest : BaseTest() { @@ -39,12 +41,9 @@ class ByteStreamSenderTest : BaseTest() {
39 destination = destination, 41 destination = destination,
40 ) 42 )
41 43
42 - val job = launch {  
43 - sender.write(ByteArray(100))  
44 - sender.close()  
45 - }  
46 -  
47 - job.join() 44 + val result = sender.write(ByteArray(100))
  45 + assertTrue(result.isSuccess)
  46 + sender.close()
48 47
49 assertFalse(destination.isOpen) 48 assertFalse(destination.isOpen)
50 assertEquals(1, destination.writtenChunks.size) 49 assertEquals(1, destination.writtenChunks.size)
@@ -61,12 +60,9 @@ class ByteStreamSenderTest : BaseTest() { @@ -61,12 +60,9 @@ class ByteStreamSenderTest : BaseTest() {
61 60
62 val bytes = ByteArray((CHUNK_SIZE * 1.5).roundToInt()) 61 val bytes = ByteArray((CHUNK_SIZE * 1.5).roundToInt())
63 62
64 - val job = launch {  
65 - sender.write(bytes)  
66 - sender.close()  
67 - }  
68 -  
69 - job.join() 63 + val result = sender.write(bytes)
  64 + assertTrue(result.isSuccess)
  65 + sender.close()
70 66
71 assertFalse(destination.isOpen) 67 assertFalse(destination.isOpen)
72 assertEquals(2, destination.writtenChunks.size) 68 assertEquals(2, destination.writtenChunks.size)
@@ -74,5 +70,18 @@ class ByteStreamSenderTest : BaseTest() { @@ -74,5 +70,18 @@ class ByteStreamSenderTest : BaseTest() {
74 assertEquals(bytes.size - CHUNK_SIZE, destination.writtenChunks[1].size) 70 assertEquals(bytes.size - CHUNK_SIZE, destination.writtenChunks[1].size)
75 } 71 }
76 72
  73 + @Test
  74 + fun writeFailsAfterClose() = runTest {
  75 + val destination = MockStreamDestination<ByteArray>(CHUNK_SIZE)
  76 + val sender = ByteStreamSender(
  77 + info = createInfo(),
  78 + destination = destination,
  79 + )
  80 +
  81 + assertTrue(sender.write(ByteArray(100)).isSuccess)
  82 + sender.close()
  83 + assertTrue(sender.write(ByteArray(100)).isFailure)
  84 + }
  85 +
77 fun createInfo(): ByteStreamInfo = ByteStreamInfo(id = "stream_id", topic = "topic", timestampMs = 0, totalSize = null, attributes = mapOf(), mimeType = "", name = null) 86 fun createInfo(): ByteStreamInfo = ByteStreamInfo(id = "stream_id", topic = "topic", timestampMs = 0, totalSize = null, attributes = mapOf(), mimeType = "", name = null)
78 } 87 }
@@ -19,12 +19,14 @@ package io.livekit.android.room.datastream.outgoing @@ -19,12 +19,14 @@ package io.livekit.android.room.datastream.outgoing
19 import io.livekit.android.room.datastream.TextStreamInfo 19 import io.livekit.android.room.datastream.TextStreamInfo
20 import io.livekit.android.test.BaseTest 20 import io.livekit.android.test.BaseTest
21 import io.livekit.android.test.mock.room.datastream.outgoing.MockStreamDestination 21 import io.livekit.android.test.mock.room.datastream.outgoing.MockStreamDestination
22 -import kotlinx.coroutines.launch 22 +import kotlinx.coroutines.ExperimentalCoroutinesApi
23 import org.junit.Assert.assertEquals 23 import org.junit.Assert.assertEquals
24 import org.junit.Assert.assertFalse 24 import org.junit.Assert.assertFalse
25 import org.junit.Assert.assertNotEquals 25 import org.junit.Assert.assertNotEquals
  26 +import org.junit.Assert.assertTrue
26 import org.junit.Test 27 import org.junit.Test
27 28
  29 +@OptIn(ExperimentalCoroutinesApi::class)
28 class TextStreamSenderTest : BaseTest() { 30 class TextStreamSenderTest : BaseTest() {
29 31
30 companion object { 32 companion object {
@@ -40,12 +42,9 @@ class TextStreamSenderTest : BaseTest() { @@ -40,12 +42,9 @@ class TextStreamSenderTest : BaseTest() {
40 ) 42 )
41 43
42 val text = "abcdefghi" 44 val text = "abcdefghi"
43 - val job = launch {  
44 - sender.write(text)  
45 - sender.close()  
46 - }  
47 -  
48 - job.join() 45 + val result = sender.write(text)
  46 + assertTrue(result.isSuccess)
  47 + sender.close()
49 48
50 assertFalse(destination.isOpen) 49 assertFalse(destination.isOpen)
51 assertEquals(1, destination.writtenChunks.size) 50 assertEquals(1, destination.writtenChunks.size)
@@ -67,12 +66,9 @@ class TextStreamSenderTest : BaseTest() { @@ -67,12 +66,9 @@ class TextStreamSenderTest : BaseTest() {
67 toString() 66 toString()
68 } 67 }
69 68
70 - val job = launch {  
71 - sender.write(text)  
72 - sender.close()  
73 - }  
74 -  
75 - job.join() 69 + val result = sender.write(text)
  70 + assertTrue(result.isSuccess)
  71 + sender.close()
76 72
77 assertFalse(destination.isOpen) 73 assertFalse(destination.isOpen)
78 assertNotEquals(1, destination.writtenChunks.size) 74 assertNotEquals(1, destination.writtenChunks.size)
@@ -87,6 +83,25 @@ class TextStreamSenderTest : BaseTest() { @@ -87,6 +83,25 @@ class TextStreamSenderTest : BaseTest() {
87 assertEquals(text, writtenString) 83 assertEquals(text, writtenString)
88 } 84 }
89 85
  86 + @Test
  87 + fun writeFailsAfterClose() = runTest {
  88 + val destination = MockStreamDestination<String>(CHUNK_SIZE)
  89 + val sender = TextStreamSender(
  90 + info = createInfo(),
  91 + destination = destination,
  92 + )
  93 +
  94 + val text = "abcdefghi"
  95 + assertTrue(sender.write(text).isSuccess)
  96 + sender.close()
  97 +
  98 + assertTrue(sender.write(text).isFailure)
  99 +
  100 + assertFalse(destination.isOpen)
  101 + assertEquals(1, destination.writtenChunks.size)
  102 + assertEquals(text, destination.writtenChunks[0].decodeToString())
  103 + }
  104 +
90 fun createInfo(): TextStreamInfo = TextStreamInfo( 105 fun createInfo(): TextStreamInfo = TextStreamInfo(
91 id = "stream_id", 106 id = "stream_id",
92 topic = "topic", 107 topic = "topic",