davidliu
Committed by GitHub

Return streamInfo from datastream send helper methods (#741)

* Return streamInfo from datastream send helper methods

* changeset
  1 +---
  2 +"client-sdk-android": minor
  3 +---
  4 +
  5 +Return streamInfo from datastream send helper methods
@@ -60,32 +60,36 @@ interface OutgoingDataStreamManager { @@ -60,32 +60,36 @@ interface OutgoingDataStreamManager {
60 * Send text through a data stream. 60 * Send text through a data stream.
61 */ 61 */
62 @CheckResult 62 @CheckResult
63 - suspend fun sendText(text: String, options: StreamTextOptions = StreamTextOptions()): Result<Unit> { 63 + suspend fun sendText(text: String, options: StreamTextOptions = StreamTextOptions()): Result<TextStreamInfo> {
64 val sender = streamText(options) 64 val sender = streamText(options)
65 val result = sender.write(text) 65 val result = sender.write(text)
66 66
67 if (result.isFailure) { 67 if (result.isFailure) {
68 - sender.close(result.exceptionOrNull()?.message ?: "Unknown error.") 68 + val exception = result.exceptionOrNull() ?: Exception("Unknown error.")
  69 + sender.close(exception.message)
  70 + return Result.failure(exception)
69 } else { 71 } else {
70 sender.close() 72 sender.close()
  73 + return Result.success(sender.info)
71 } 74 }
72 - return result  
73 } 75 }
74 76
75 /** 77 /**
76 * Send a file through a data stream. 78 * Send a file through a data stream.
77 */ 79 */
78 @CheckResult 80 @CheckResult
79 - suspend fun sendFile(file: File, options: StreamBytesOptions = StreamBytesOptions()): Result<Unit> { 81 + suspend fun sendFile(file: File, options: StreamBytesOptions = StreamBytesOptions()): Result<ByteStreamInfo> {
80 val sender = streamBytes(options) 82 val sender = streamBytes(options)
81 val result = sender.writeFile(file) 83 val result = sender.writeFile(file)
82 84
83 if (result.isFailure) { 85 if (result.isFailure) {
84 - sender.close(result.exceptionOrNull()?.message ?: "Unknown error.") 86 + val exception = result.exceptionOrNull() ?: Exception("Unknown error.")
  87 + sender.close(exception.message)
  88 + return Result.failure(exception)
85 } else { 89 } else {
86 sender.close() 90 sender.close()
  91 + return Result.success(sender.info)
87 } 92 }
88 - return result  
89 } 93 }
90 } 94 }
91 95
@@ -108,6 +108,96 @@ class RoomIncomingDataStreamMockE2ETest : MockE2ETest() { @@ -108,6 +108,96 @@ class RoomIncomingDataStreamMockE2ETest : MockE2ETest() {
108 } 108 }
109 109
110 @Test 110 @Test
  111 + fun textStreamThroughReadNext() = runTest {
  112 + connect()
  113 + val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection
  114 + val subDataChannel = MockDataChannel(RTCEngine.RELIABLE_DATA_CHANNEL_LABEL)
  115 + subPeerConnection.observer?.onDataChannel(subDataChannel)
  116 +
  117 + val scope = CoroutineScope(currentCoroutineContext())
  118 + val collectedData = mutableListOf<String>()
  119 + var finished = false
  120 + room.registerTextStreamHandler("topic") { reader, _ ->
  121 + scope.launch {
  122 + try {
  123 + while (true) {
  124 + collectedData.add(reader.readNext())
  125 + }
  126 + } catch (exception: NoSuchElementException) {
  127 + // end of stream
  128 + }
  129 + finished = true
  130 + }
  131 + }
  132 +
  133 + val textStreamHeader = with(createStreamHeader().toBuilder()) {
  134 + streamHeader = with(streamHeader.toBuilder()) {
  135 + clearByteHeader()
  136 + textHeader = with(TextHeader.newBuilder()) {
  137 + operationType = OperationType.CREATE
  138 + generated = false
  139 + build()
  140 + }
  141 + build()
  142 + }
  143 + build()
  144 + }
  145 + subDataChannel.observer?.onMessage(textStreamHeader.wrap())
  146 + subDataChannel.observer?.onMessage(createStreamChunk(0, "hello".toByteArray()).wrap())
  147 + subDataChannel.observer?.onMessage(createStreamChunk(1, "world".toByteArray()).wrap())
  148 + subDataChannel.observer?.onMessage(createStreamChunk(2, "!".toByteArray()).wrap())
  149 + subDataChannel.observer?.onMessage(createStreamTrailer().wrap())
  150 +
  151 + assertTrue(finished)
  152 + assertEquals(3, collectedData.size)
  153 + assertEquals("hello", collectedData[0])
  154 + assertEquals("world", collectedData[1])
  155 + assertEquals("!", collectedData[2])
  156 + }
  157 +
  158 + @Test
  159 + fun textStreamThroughReadAll() = runTest {
  160 + connect()
  161 + val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection
  162 + val subDataChannel = MockDataChannel(RTCEngine.RELIABLE_DATA_CHANNEL_LABEL)
  163 + subPeerConnection.observer?.onDataChannel(subDataChannel)
  164 +
  165 + val scope = CoroutineScope(currentCoroutineContext())
  166 + val collectedData = mutableListOf<String>()
  167 + var finished = false
  168 + room.registerTextStreamHandler("topic") { reader, _ ->
  169 + scope.launch {
  170 + collectedData.addAll(reader.readAll())
  171 + finished = true
  172 + }
  173 + }
  174 +
  175 + val textStreamHeader = with(createStreamHeader().toBuilder()) {
  176 + streamHeader = with(streamHeader.toBuilder()) {
  177 + clearByteHeader()
  178 + textHeader = with(TextHeader.newBuilder()) {
  179 + operationType = OperationType.CREATE
  180 + generated = false
  181 + build()
  182 + }
  183 + build()
  184 + }
  185 + build()
  186 + }
  187 + subDataChannel.observer?.onMessage(textStreamHeader.wrap())
  188 + subDataChannel.observer?.onMessage(createStreamChunk(0, "hello".toByteArray()).wrap())
  189 + subDataChannel.observer?.onMessage(createStreamChunk(1, "world".toByteArray()).wrap())
  190 + subDataChannel.observer?.onMessage(createStreamChunk(2, "!".toByteArray()).wrap())
  191 + subDataChannel.observer?.onMessage(createStreamTrailer().wrap())
  192 +
  193 + assertTrue(finished)
  194 + assertEquals(3, collectedData.size)
  195 + assertEquals("hello", collectedData[0])
  196 + assertEquals("world", collectedData[1])
  197 + assertEquals("!", collectedData[2])
  198 + }
  199 +
  200 + @Test
111 fun dataStreamTerminated() = runTest { 201 fun dataStreamTerminated() = runTest {
112 connect() 202 connect()
113 val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection 203 val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection