David Liu

Peer transports and observers

1 package io.livekit.android.room 1 package io.livekit.android.room
2 2
3 import dagger.assisted.Assisted 3 import dagger.assisted.Assisted
  4 +import dagger.assisted.AssistedFactory
4 import dagger.assisted.AssistedInject 5 import dagger.assisted.AssistedInject
5 import org.webrtc.* 6 import org.webrtc.*
6 7
7 class PeerConnectionTransport 8 class PeerConnectionTransport
8 @AssistedInject 9 @AssistedInject
9 constructor( 10 constructor(
10 - config: PeerConnection.RTCConfiguration,  
11 - listener: PeerConnection.Observer,  
12 - @Assisted connectionFactory: PeerConnectionFactory 11 + @Assisted config: PeerConnection.RTCConfiguration,
  12 + @Assisted listener: PeerConnection.Observer,
  13 + connectionFactory: PeerConnectionFactory
13 ) { 14 ) {
14 val peerConnection: PeerConnection = connectionFactory.createPeerConnection( 15 val peerConnection: PeerConnection = connectionFactory.createPeerConnection(
15 config, 16 config,
@@ -49,4 +50,12 @@ constructor( @@ -49,4 +50,12 @@ constructor(
49 fun close() { 50 fun close() {
50 peerConnection.close() 51 peerConnection.close()
51 } 52 }
  53 +
  54 + @AssistedFactory
  55 + interface Factory {
  56 + fun create(
  57 + config: PeerConnection.RTCConfiguration,
  58 + listener: PeerConnection.Observer
  59 + ): PeerConnectionTransport
  60 + }
52 } 61 }
  1 +package io.livekit.android.room
  2 +
  3 +import livekit.Rtc
  4 +import org.webrtc.*
  5 +
  6 +class PublisherTransportObserver(
  7 + private val engine: RTCEngine
  8 +) : PeerConnection.Observer {
  9 +
  10 + override fun onIceCandidate(candidate: IceCandidate?) {
  11 + val candidate = candidate ?: return
  12 + if (engine.rtcConnected) {
  13 + engine.client.sendCandidate(candidate, target = Rtc.SignalTarget.PUBLISHER)
  14 + } else {
  15 + engine.pendingCandidates.add(candidate)
  16 + }
  17 + }
  18 +
  19 + override fun onRenegotiationNeeded() {
  20 + engine.negotiate()
  21 + }
  22 +
  23 + override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
  24 + val newState = newState ?: throw NullPointerException("unexpected null new state, what do?")
  25 + if (newState == PeerConnection.IceConnectionState.CONNECTED && !engine.iceConnected) {
  26 + engine.iceConnected = true
  27 + } else if (newState == PeerConnection.IceConnectionState.DISCONNECTED) {
  28 + engine.iceConnected = false
  29 + engine.listener?.onDisconnect("Peer connection disconnected")
  30 + }
  31 + }
  32 +
  33 + override fun onStandardizedIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
  34 + }
  35 +
  36 + override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
  37 + }
  38 +
  39 + override fun onSelectedCandidatePairChanged(event: CandidatePairChangeEvent?) {
  40 + }
  41 +
  42 +
  43 + override fun onSignalingChange(p0: PeerConnection.SignalingState?) {
  44 + }
  45 +
  46 + override fun onIceConnectionReceivingChange(p0: Boolean) {
  47 + }
  48 +
  49 + override fun onIceGatheringChange(p0: PeerConnection.IceGatheringState?) {
  50 + }
  51 +
  52 + override fun onIceCandidatesRemoved(p0: Array<out IceCandidate>?) {
  53 + }
  54 +
  55 + override fun onAddStream(p0: MediaStream?) {
  56 + }
  57 +
  58 + override fun onRemoveStream(p0: MediaStream?) {
  59 + }
  60 +
  61 + override fun onDataChannel(p0: DataChannel?) {
  62 + }
  63 +
  64 + override fun onTrack(transceiver: RtpTransceiver?) {
  65 + }
  66 +
  67 + override fun onAddTrack(p0: RtpReceiver?, p1: Array<out MediaStream>?) {
  68 + }
  69 +}
1 package io.livekit.android.room 1 package io.livekit.android.room
2 2
3 -import org.webrtc.PeerConnection 3 +import livekit.Model
  4 +import livekit.Rtc
  5 +import org.webrtc.*
4 import javax.inject.Inject 6 import javax.inject.Inject
5 7
6 class RTCEngine 8 class RTCEngine
7 @Inject 9 @Inject
8 constructor( 10 constructor(
9 - private val client: RTCClient 11 + val client: RTCClient,
  12 + pctFactory: PeerConnectionTransport.Factory,
10 ) { 13 ) {
11 14
  15 + var listener: Listener? = null
  16 + var rtcConnected: Boolean = false
  17 + var joinResponse: Rtc.JoinResponse? = null
  18 + var iceConnected: Boolean = false
  19 + set(value) {
  20 + field = value
  21 + if (field) {
  22 + listener?.onJoin(joinResponse)
  23 + joinResponse = null
  24 + }
  25 + }
  26 + val pendingCandidates = mutableListOf<IceCandidate>()
  27 +
  28 + private val publisherObserver = PublisherTransportObserver(this)
  29 + private val subscriberObserver = SubscriberTransportObserver(this)
  30 + private val publisher: PeerConnectionTransport
  31 + private val subscriber: PeerConnectionTransport
  32 +
12 init { 33 init {
13 val rtcConfig = PeerConnection.RTCConfiguration(RTCClient.DEFAULT_ICE_SERVERS).apply { 34 val rtcConfig = PeerConnection.RTCConfiguration(RTCClient.DEFAULT_ICE_SERVERS).apply {
14 sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN 35 sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
15 continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY 36 continualGatheringPolicy = PeerConnection.ContinualGatheringPolicy.GATHER_CONTINUALLY
16 } 37 }
17 38
  39 + publisher = pctFactory.create(rtcConfig, publisherObserver)
  40 + subscriber = pctFactory.create(rtcConfig, subscriberObserver)
  41 +
18 } 42 }
19 43
20 suspend fun join(url: String, token: String, isSecure: Boolean) { 44 suspend fun join(url: String, token: String, isSecure: Boolean) {
21 client.join(url, token, isSecure) 45 client.join(url, token, isSecure)
22 } 46 }
  47 +
  48 + fun negotiate() {
  49 + TODO("Not yet implemented")
  50 + }
  51 +
  52 + interface Listener {
  53 + fun onJoin(response: Rtc.JoinResponse?)
  54 + fun onAddTrack(track: MediaStreamTrack, streams: Array<out MediaStream>)
  55 + fun onPublishLocalTrack(cid: String, track: Model.TrackInfo)
  56 + fun onAddDataChannel(channel: DataChannel)
  57 + fun onUpdateParticipants(updates: Array<out Model.ParticipantInfo>)
  58 + fun onUpdateSpeakers(speakers: Array<out Rtc.SpeakerInfo>)
  59 + fun onDisconnect(reason: String)
  60 + fun onFailToConnect(error: Error)
  61 + }
23 } 62 }
@@ -11,7 +11,7 @@ constructor( @@ -11,7 +11,7 @@ constructor(
11 @Assisted private val engine: RTCEngine, 11 @Assisted private val engine: RTCEngine,
12 ) { 12 ) {
13 13
14 - suspend fun connect() {  
15 - engine.join(connectOptions) 14 + suspend fun connect(url: String, token: String, isSecure: Boolean) {
  15 + engine.join(url, token, isSecure)
16 } 16 }
17 } 17 }
  1 +package io.livekit.android.room
  2 +
  3 +import com.github.ajalt.timberkt.Timber
  4 +import livekit.Rtc
  5 +import org.webrtc.*
  6 +
  7 +class SubscriberTransportObserver(
  8 + private val engine: RTCEngine
  9 +) : PeerConnection.Observer {
  10 +
  11 +
  12 + override fun onIceCandidate(candidate: IceCandidate) {
  13 + engine.client.sendCandidate(candidate, Rtc.SignalTarget.SUBSCRIBER)
  14 + }
  15 +
  16 + override fun onAddTrack(receiver: RtpReceiver, streams: Array<out MediaStream>) {
  17 + val track = receiver.track() ?: return
  18 + engine.listener?.onAddTrack(track, streams)
  19 + }
  20 +
  21 + override fun onTrack(transceiver: RtpTransceiver) {
  22 + when (transceiver.mediaType) {
  23 + MediaStreamTrack.MediaType.MEDIA_TYPE_AUDIO -> Timber.v { "peerconn started receiving audio" }
  24 + MediaStreamTrack.MediaType.MEDIA_TYPE_VIDEO -> Timber.v { "peerconn started receiving video" }
  25 + else -> Timber.d { "peerconn started receiving unknown media type: ${transceiver.mediaType}" }
  26 + }
  27 + }
  28 +
  29 + override fun onDataChannel(channel: DataChannel) {
  30 + engine.listener?.onAddDataChannel(channel)
  31 + }
  32 +
  33 + override fun onStandardizedIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
  34 + }
  35 +
  36 + override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
  37 + }
  38 +
  39 + override fun onSelectedCandidatePairChanged(event: CandidatePairChangeEvent?) {
  40 + }
  41 +
  42 + override fun onSignalingChange(p0: PeerConnection.SignalingState?) {
  43 + }
  44 +
  45 + override fun onIceConnectionChange(p0: PeerConnection.IceConnectionState?) {
  46 + }
  47 +
  48 + override fun onIceConnectionReceivingChange(p0: Boolean) {
  49 + }
  50 +
  51 + override fun onIceGatheringChange(p0: PeerConnection.IceGatheringState?) {
  52 + }
  53 +
  54 + override fun onIceCandidatesRemoved(p0: Array<out IceCandidate>?) {
  55 + }
  56 +
  57 + override fun onAddStream(p0: MediaStream?) {
  58 + }
  59 +
  60 + override fun onRemoveStream(p0: MediaStream?) {
  61 + }
  62 +
  63 + override fun onRenegotiationNeeded() {
  64 + }
  65 +
  66 +}