davidliu
Committed by GitHub

adaptive stream connection param (#74)

@@ -3,6 +3,7 @@ package io.livekit.android.room @@ -3,6 +3,7 @@ package io.livekit.android.room
3 import android.os.SystemClock 3 import android.os.SystemClock
4 import com.google.protobuf.ByteString 4 import com.google.protobuf.ByteString
5 import io.livekit.android.ConnectOptions 5 import io.livekit.android.ConnectOptions
  6 +import io.livekit.android.RoomOptions
6 import io.livekit.android.dagger.InjectionNames 7 import io.livekit.android.dagger.InjectionNames
7 import io.livekit.android.room.participant.ParticipantTrackPermission 8 import io.livekit.android.room.participant.ParticipantTrackPermission
8 import io.livekit.android.room.track.TrackException 9 import io.livekit.android.room.track.TrackException
@@ -80,6 +81,7 @@ internal constructor( @@ -80,6 +81,7 @@ internal constructor(
80 private var sessionUrl: String? = null 81 private var sessionUrl: String? = null
81 private var sessionToken: String? = null 82 private var sessionToken: String? = null
82 private var connectOptions: ConnectOptions? = null 83 private var connectOptions: ConnectOptions? = null
  84 + private var lastRoomOptions: RoomOptions? = null
83 85
84 private val publisherObserver = PublisherTransportObserver(this, client) 86 private val publisherObserver = PublisherTransportObserver(this, client)
85 private val subscriberObserver = SubscriberTransportObserver(this, client) 87 private val subscriberObserver = SubscriberTransportObserver(this, client)
@@ -113,16 +115,28 @@ internal constructor( @@ -113,16 +115,28 @@ internal constructor(
113 client.listener = this 115 client.listener = this
114 } 116 }
115 117
116 - suspend fun join(url: String, token: String, options: ConnectOptions): LivekitRtc.JoinResponse { 118 + suspend fun join(
  119 + url: String,
  120 + token: String,
  121 + options: ConnectOptions,
  122 + roomOptions: RoomOptions
  123 + ): LivekitRtc.JoinResponse {
117 coroutineScope.close() 124 coroutineScope.close()
118 coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher) 125 coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher)
119 sessionUrl = url 126 sessionUrl = url
120 sessionToken = token 127 sessionToken = token
121 - return joinImpl(url, token, options)  
122 - }  
123 -  
124 - suspend fun joinImpl(url: String, token: String, options: ConnectOptions): LivekitRtc.JoinResponse {  
125 - val joinResponse = client.join(url, token, options) 128 + connectOptions = options
  129 + lastRoomOptions = roomOptions
  130 + return joinImpl(url, token, options, roomOptions)
  131 + }
  132 +
  133 + suspend fun joinImpl(
  134 + url: String,
  135 + token: String,
  136 + options: ConnectOptions,
  137 + roomOptions: RoomOptions
  138 + ): LivekitRtc.JoinResponse {
  139 + val joinResponse = client.join(url, token, options, roomOptions)
126 listener?.onJoinResponse(joinResponse) 140 listener?.onJoinResponse(joinResponse)
127 isClosed = false 141 isClosed = false
128 listener?.onSignalConnected(false) 142 listener?.onSignalConnected(false)
@@ -281,6 +295,7 @@ internal constructor( @@ -281,6 +295,7 @@ internal constructor(
281 sessionUrl = null 295 sessionUrl = null
282 sessionToken = null 296 sessionToken = null
283 connectOptions = null 297 connectOptions = null
  298 + lastRoomOptions = null
284 reconnectingJob?.cancel() 299 reconnectingJob?.cancel()
285 reconnectingJob = null 300 reconnectingJob = null
286 coroutineScope.close() 301 coroutineScope.close()
@@ -343,7 +358,7 @@ internal constructor( @@ -343,7 +358,7 @@ internal constructor(
343 try { 358 try {
344 closeResources() 359 closeResources()
345 listener?.onFullReconnecting() 360 listener?.onFullReconnecting()
346 - joinImpl(url, token, connectOptions ?: ConnectOptions()) 361 + joinImpl(url, token, connectOptions ?: ConnectOptions(), lastRoomOptions ?: RoomOptions())
347 } catch (e: Exception) { 362 } catch (e: Exception) {
348 LKLog.w(e) { "Error during reconnection." } 363 LKLog.w(e) { "Error during reconnection." }
349 // reconnect failed, retry. 364 // reconnect failed, retry.
@@ -11,6 +11,7 @@ import dagger.assisted.Assisted @@ -11,6 +11,7 @@ import dagger.assisted.Assisted
11 import dagger.assisted.AssistedFactory 11 import dagger.assisted.AssistedFactory
12 import dagger.assisted.AssistedInject 12 import dagger.assisted.AssistedInject
13 import io.livekit.android.ConnectOptions 13 import io.livekit.android.ConnectOptions
  14 +import io.livekit.android.RoomOptions
14 import io.livekit.android.Version 15 import io.livekit.android.Version
15 import io.livekit.android.dagger.InjectionNames 16 import io.livekit.android.dagger.InjectionNames
16 import io.livekit.android.events.BroadcastEventBus 17 import io.livekit.android.events.BroadcastEventBus
@@ -148,6 +149,15 @@ constructor( @@ -148,6 +149,15 @@ constructor(
148 private var hasLostConnectivity: Boolean = false 149 private var hasLostConnectivity: Boolean = false
149 private var connectOptions: ConnectOptions = ConnectOptions() 150 private var connectOptions: ConnectOptions = ConnectOptions()
150 151
  152 + private fun getCurrentRoomOptions(): RoomOptions =
  153 + RoomOptions(
  154 + adaptiveStream = adaptiveStream,
  155 + dynacast = dynacast,
  156 + audioTrackCaptureDefaults = audioTrackCaptureDefaults,
  157 + videoTrackCaptureDefaults = videoTrackCaptureDefaults,
  158 + audioTrackPublishDefaults = audioTrackPublishDefaults,
  159 + videoTrackPublishDefaults = videoTrackPublishDefaults,
  160 + )
151 161
152 suspend fun connect(url: String, token: String, options: ConnectOptions = ConnectOptions()) { 162 suspend fun connect(url: String, token: String, options: ConnectOptions = ConnectOptions()) {
153 if (this::coroutineScope.isInitialized) { 163 if (this::coroutineScope.isInitialized) {
@@ -156,7 +166,7 @@ constructor( @@ -156,7 +166,7 @@ constructor(
156 coroutineScope = CoroutineScope(defaultDispatcher + SupervisorJob()) 166 coroutineScope = CoroutineScope(defaultDispatcher + SupervisorJob())
157 state = State.CONNECTING 167 state = State.CONNECTING
158 connectOptions = options 168 connectOptions = options
159 - engine.join(url, token, options) 169 + engine.join(url, token, options, getCurrentRoomOptions())
160 170
161 val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager 171 val cm = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
162 val networkRequest = NetworkRequest.Builder() 172 val networkRequest = NetworkRequest.Builder()
@@ -2,6 +2,7 @@ package io.livekit.android.room @@ -2,6 +2,7 @@ package io.livekit.android.room
2 2
3 import com.vdurmont.semver4j.Semver 3 import com.vdurmont.semver4j.Semver
4 import io.livekit.android.ConnectOptions 4 import io.livekit.android.ConnectOptions
  5 +import io.livekit.android.RoomOptions
5 import io.livekit.android.dagger.InjectionNames 6 import io.livekit.android.dagger.InjectionNames
6 import io.livekit.android.room.participant.ParticipantTrackPermission 7 import io.livekit.android.room.participant.ParticipantTrackPermission
7 import io.livekit.android.room.track.Track 8 import io.livekit.android.room.track.Track
@@ -49,6 +50,8 @@ constructor( @@ -49,6 +50,8 @@ constructor(
49 var listener: Listener? = null 50 var listener: Listener? = null
50 private var serverVersion: Semver? = null 51 private var serverVersion: Semver? = null
51 private var lastUrl: String? = null 52 private var lastUrl: String? = null
  53 + private var lastOptions: ConnectOptions? = null
  54 + private var lastRoomOptions: RoomOptions? = null
52 55
53 private var joinContinuation: CancellableContinuation<Either<LivekitRtc.JoinResponse, Unit>>? = null 56 private var joinContinuation: CancellableContinuation<Either<LivekitRtc.JoinResponse, Unit>>? = null
54 private lateinit var coroutineScope: CloseableCoroutineScope 57 private lateinit var coroutineScope: CloseableCoroutineScope
@@ -59,6 +62,7 @@ constructor( @@ -59,6 +62,7 @@ constructor(
59 62
60 private val responseFlow = MutableSharedFlow<LivekitRtc.SignalResponse>(Int.MAX_VALUE) 63 private val responseFlow = MutableSharedFlow<LivekitRtc.SignalResponse>(Int.MAX_VALUE)
61 64
  65 +
62 /** 66 /**
63 * @throws Exception if fails to connect. 67 * @throws Exception if fails to connect.
64 */ 68 */
@@ -66,8 +70,9 @@ constructor( @@ -66,8 +70,9 @@ constructor(
66 url: String, 70 url: String,
67 token: String, 71 token: String,
68 options: ConnectOptions = ConnectOptions(), 72 options: ConnectOptions = ConnectOptions(),
  73 + roomOptions: RoomOptions = RoomOptions(),
69 ): LivekitRtc.JoinResponse { 74 ): LivekitRtc.JoinResponse {
70 - val joinResponse = connect(url, token, options) 75 + val joinResponse = connect(url, token, options, roomOptions)
71 return (joinResponse as Either.Left).value 76 return (joinResponse as Either.Left).value
72 } 77 }
73 78
@@ -78,26 +83,30 @@ constructor( @@ -78,26 +83,30 @@ constructor(
78 connect( 83 connect(
79 url, 84 url,
80 token, 85 token,
81 - ConnectOptions()  
82 - .apply { reconnect = true } 86 + (lastOptions ?: ConnectOptions()).copy()
  87 + .apply { reconnect = true },
  88 + lastRoomOptions ?: RoomOptions()
83 ) 89 )
84 } 90 }
85 91
86 suspend fun connect( 92 suspend fun connect(
87 url: String, 93 url: String,
88 token: String, 94 token: String,
89 - options: ConnectOptions 95 + options: ConnectOptions,
  96 + roomOptions: RoomOptions
90 ): Either<LivekitRtc.JoinResponse, Unit> { 97 ): Either<LivekitRtc.JoinResponse, Unit> {
91 // Clean up any pre-existing connection. 98 // Clean up any pre-existing connection.
92 close() 99 close()
93 100
94 - val wsUrlString = "$url/rtc" + createConnectionParams(token, getClientInfo(), options) 101 + val wsUrlString = "$url/rtc" + createConnectionParams(token, getClientInfo(), options, roomOptions)
95 isReconnecting = options.reconnect 102 isReconnecting = options.reconnect
96 103
97 LKLog.i { "connecting to $wsUrlString" } 104 LKLog.i { "connecting to $wsUrlString" }
98 105
99 coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher) 106 coroutineScope = CloseableCoroutineScope(SupervisorJob() + ioDispatcher)
100 lastUrl = wsUrlString 107 lastUrl = wsUrlString
  108 + lastOptions = options
  109 + lastRoomOptions = roomOptions
101 110
102 val request = Request.Builder() 111 val request = Request.Builder()
103 .url(wsUrlString) 112 .url(wsUrlString)
@@ -113,7 +122,8 @@ constructor( @@ -113,7 +122,8 @@ constructor(
113 private fun createConnectionParams( 122 private fun createConnectionParams(
114 token: String, 123 token: String,
115 clientInfo: LivekitModels.ClientInfo, 124 clientInfo: LivekitModels.ClientInfo,
116 - options: ConnectOptions 125 + options: ConnectOptions,
  126 + roomOptions: RoomOptions
117 ): String { 127 ): String {
118 128
119 val queryParams = mutableListOf<Pair<String, String>>() 129 val queryParams = mutableListOf<Pair<String, String>>()
@@ -123,9 +133,12 @@ constructor( @@ -123,9 +133,12 @@ constructor(
123 queryParams.add(CONNECT_QUERY_RECONNECT to 1.toString()) 133 queryParams.add(CONNECT_QUERY_RECONNECT to 1.toString())
124 } 134 }
125 135
126 - val autoSubscribe = if(options.autoSubscribe) 1 else 0 136 + val autoSubscribe = if (options.autoSubscribe) 1 else 0
127 queryParams.add(CONNECT_QUERY_AUTOSUBSCRIBE to autoSubscribe.toString()) 137 queryParams.add(CONNECT_QUERY_AUTOSUBSCRIBE to autoSubscribe.toString())
128 138
  139 + val adaptiveStream = if (roomOptions.adaptiveStream) 1 else 0
  140 + queryParams.add(CONNECT_QUERY_ADAPTIVE_STREAM to adaptiveStream.toString())
  141 +
129 // Client info 142 // Client info
130 queryParams.add(CONNECT_QUERY_SDK to "android") 143 queryParams.add(CONNECT_QUERY_SDK to "android")
131 queryParams.add(CONNECT_QUERY_VERSION to clientInfo.version) 144 queryParams.add(CONNECT_QUERY_VERSION to clientInfo.version)
@@ -551,6 +564,9 @@ constructor( @@ -551,6 +564,9 @@ constructor(
551 currentWs = null 564 currentWs = null
552 joinContinuation?.cancel() 565 joinContinuation?.cancel()
553 joinContinuation = null 566 joinContinuation = null
  567 + lastUrl = null
  568 + lastOptions = null
  569 + lastRoomOptions = null
554 } 570 }
555 571
556 interface Listener { 572 interface Listener {
@@ -577,6 +593,7 @@ constructor( @@ -577,6 +593,7 @@ constructor(
577 const val CONNECT_QUERY_TOKEN = "access_token" 593 const val CONNECT_QUERY_TOKEN = "access_token"
578 const val CONNECT_QUERY_RECONNECT = "reconnect" 594 const val CONNECT_QUERY_RECONNECT = "reconnect"
579 const val CONNECT_QUERY_AUTOSUBSCRIBE = "auto_subscribe" 595 const val CONNECT_QUERY_AUTOSUBSCRIBE = "auto_subscribe"
  596 + const val CONNECT_QUERY_ADAPTIVE_STREAM = "adaptive_stream"
580 const val CONNECT_QUERY_SDK = "sdk" 597 const val CONNECT_QUERY_SDK = "sdk"
581 const val CONNECT_QUERY_VERSION = "version" 598 const val CONNECT_QUERY_VERSION = "version"
582 const val CONNECT_QUERY_PROTOCOL = "protocol" 599 const val CONNECT_QUERY_PROTOCOL = "protocol"
@@ -22,7 +22,6 @@ import org.mockito.junit.MockitoJUnit @@ -22,7 +22,6 @@ import org.mockito.junit.MockitoJUnit
22 import org.mockito.kotlin.* 22 import org.mockito.kotlin.*
23 import org.robolectric.RobolectricTestRunner 23 import org.robolectric.RobolectricTestRunner
24 import org.webrtc.EglBase 24 import org.webrtc.EglBase
25 -import java.lang.Exception  
26 25
27 @ExperimentalCoroutinesApi 26 @ExperimentalCoroutinesApi
28 @RunWith(RobolectricTestRunner::class) 27 @RunWith(RobolectricTestRunner::class)
@@ -65,7 +64,7 @@ class RoomTest { @@ -65,7 +64,7 @@ class RoomTest {
65 64
66 suspend fun connect() { 65 suspend fun connect() {
67 rtcEngine.stub { 66 rtcEngine.stub {
68 - onBlocking { rtcEngine.join(any(), any(), anyOrNull()) } 67 + onBlocking { rtcEngine.join(any(), any(), anyOrNull(), anyOrNull()) }
69 .doSuspendableAnswer { 68 .doSuspendableAnswer {
70 room.onJoinResponse(SignalClientTest.JOIN.join) 69 room.onJoinResponse(SignalClientTest.JOIN.join)
71 SignalClientTest.JOIN.join 70 SignalClientTest.JOIN.join
1 -Subproject commit 3c712ad5c941c0d2ddb5631c44239fbe525c0391 1 +Subproject commit e3f22408016f5ef825b6a03e4d36a5f977a05745