davidliu
Committed by GitHub

Implement client metrics (#511)

* Update protocol submodule

* Initial Metrics implementation

* Protocol/metrics updates

* adjust log level of metrics exception

* spotless/changeset

* update protocol submodule

* spotless

* disable metrics for mocke2e unit tests since it causes tests to never complete

* spotless

* visibility

* make changeset patch and some more docs
  1 +---
  2 +"client-sdk-android": minor
  3 +---
  4 +
  5 +Implement client metrics
@@ -1028,6 +1028,20 @@ internal constructor( @@ -1028,6 +1028,20 @@ internal constructor(
1028 listener?.onTranscriptionReceived(dp.transcription) 1028 listener?.onTranscriptionReceived(dp.transcription)
1029 } 1029 }
1030 1030
  1031 + LivekitModels.DataPacket.ValueCase.METRICS -> {
  1032 + // TODO
  1033 + }
  1034 +
  1035 + LivekitModels.DataPacket.ValueCase.CHAT_MESSAGE -> {
  1036 + // TODO
  1037 + }
  1038 +
  1039 + LivekitModels.DataPacket.ValueCase.RPC_REQUEST,
  1040 + LivekitModels.DataPacket.ValueCase.RPC_ACK,
  1041 + LivekitModels.DataPacket.ValueCase.RPC_RESPONSE,
  1042 + -> {
  1043 + // TODO
  1044 + }
1031 LivekitModels.DataPacket.ValueCase.VALUE_NOT_SET, 1045 LivekitModels.DataPacket.ValueCase.VALUE_NOT_SET,
1032 null, 1046 null,
1033 -> { 1047 -> {
@@ -38,6 +38,7 @@ import io.livekit.android.e2ee.E2EEOptions @@ -38,6 +38,7 @@ import io.livekit.android.e2ee.E2EEOptions
38 import io.livekit.android.events.* 38 import io.livekit.android.events.*
39 import io.livekit.android.memory.CloseableManager 39 import io.livekit.android.memory.CloseableManager
40 import io.livekit.android.renderer.TextureViewRenderer 40 import io.livekit.android.renderer.TextureViewRenderer
  41 +import io.livekit.android.room.metrics.collectMetrics
41 import io.livekit.android.room.network.NetworkCallbackManagerFactory 42 import io.livekit.android.room.network.NetworkCallbackManagerFactory
42 import io.livekit.android.room.participant.* 43 import io.livekit.android.room.participant.*
43 import io.livekit.android.room.provisions.LKObjects 44 import io.livekit.android.room.provisions.LKObjects
@@ -181,6 +182,12 @@ constructor( @@ -181,6 +182,12 @@ constructor(
181 private set 182 private set
182 183
183 /** 184 /**
  185 + * @suppress
  186 + */
  187 + @VisibleForTesting
  188 + var enableMetrics: Boolean = true
  189 +
  190 + /**
184 * end-to-end encryption manager 191 * end-to-end encryption manager
185 */ 192 */
186 var e2eeManager: E2EEManager? = null 193 var e2eeManager: E2EEManager? = null
@@ -441,6 +448,12 @@ constructor( @@ -441,6 +448,12 @@ constructor(
441 val videoTrack = localParticipant.createVideoTrack() 448 val videoTrack = localParticipant.createVideoTrack()
442 localParticipant.publishVideoTrack(videoTrack) 449 localParticipant.publishVideoTrack(videoTrack)
443 } 450 }
  451 +
  452 + coroutineScope.launch {
  453 + if (enableMetrics) {
  454 + collectMetrics(room = this@Room, rtcEngine = engine)
  455 + }
  456 + }
444 } 457 }
445 458
446 val outerHandler = coroutineContext.job.invokeOnCompletion { cause -> 459 val outerHandler = coroutineContext.job.invokeOnCompletion { cause ->
  1 +/*
  2 + * Copyright 2024 LiveKit, Inc.
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package io.livekit.android.room.metrics
  18 +
  19 +import io.livekit.android.room.RTCEngine
  20 +import io.livekit.android.room.Room
  21 +import io.livekit.android.room.participant.Participant
  22 +import io.livekit.android.util.LKLog
  23 +import kotlinx.coroutines.coroutineScope
  24 +import kotlinx.coroutines.currentCoroutineContext
  25 +import kotlinx.coroutines.delay
  26 +import kotlinx.coroutines.isActive
  27 +import kotlinx.coroutines.launch
  28 +import kotlinx.coroutines.suspendCancellableCoroutine
  29 +import livekit.LivekitMetrics.MetricLabel
  30 +import livekit.LivekitMetrics.MetricSample
  31 +import livekit.LivekitMetrics.MetricsBatch
  32 +import livekit.LivekitMetrics.TimeSeriesMetric
  33 +import livekit.LivekitModels.DataPacket
  34 +import livekit.org.webrtc.RTCStats
  35 +import livekit.org.webrtc.RTCStatsReport
  36 +import java.util.concurrent.TimeUnit
  37 +import kotlin.coroutines.resume
  38 +
  39 +/**
  40 + * Handles getting the WebRTC metrics and sending them through the data channels.
  41 + *
  42 + * See [RTCMetric] for the related metrics we send.
  43 + */
  44 +internal suspend fun collectMetrics(room: Room, rtcEngine: RTCEngine) = coroutineScope {
  45 + launch { collectPublisherMetrics(room, rtcEngine) }
  46 + launch { collectSubscriberMetrics(room, rtcEngine) }
  47 +}
  48 +
  49 +private suspend fun collectPublisherMetrics(room: Room, rtcEngine: RTCEngine) {
  50 + while (currentCoroutineContext().isActive) {
  51 + delay(1000)
  52 + val report = suspendCancellableCoroutine { cont ->
  53 + room.getPublisherRTCStats { cont.resume(it) }
  54 + }
  55 +
  56 + val strings = mutableListOf<String>()
  57 + val stats = findPublisherVideoStats(strings, room, report, room.localParticipant.identity)
  58 +
  59 + val dataPacket = with(DataPacket.newBuilder()) {
  60 + metrics = with(MetricsBatch.newBuilder()) {
  61 + timestampMs = report.timestampUs.microToMilli()
  62 + addAllStrData(strings)
  63 + addAllTimeSeries(stats)
  64 + build()
  65 + }
  66 + kind = DataPacket.Kind.RELIABLE
  67 + build()
  68 + }
  69 +
  70 + try {
  71 + rtcEngine.sendData(dataPacket)
  72 + } catch (e: Exception) {
  73 + LKLog.i(e) { "Error sending metrics: " }
  74 + }
  75 + }
  76 +}
  77 +
  78 +private suspend fun collectSubscriberMetrics(room: Room, rtcEngine: RTCEngine) {
  79 + while (currentCoroutineContext().isActive) {
  80 + delay(1000)
  81 + val report = suspendCancellableCoroutine { cont ->
  82 + room.getSubscriberRTCStats { cont.resume(it) }
  83 + }
  84 +
  85 + val strings = mutableListOf<String>()
  86 + val stats = findSubscriberAudioStats(strings, report, room.localParticipant.identity) +
  87 + findSubscriberVideoStats(strings, report, room.localParticipant.identity)
  88 +
  89 + val dataPacket = with(DataPacket.newBuilder()) {
  90 + metrics = with(MetricsBatch.newBuilder()) {
  91 + timestampMs = report.timestampUs.microToMilli()
  92 + addAllStrData(strings)
  93 + addAllTimeSeries(stats)
  94 + build()
  95 + }
  96 + kind = DataPacket.Kind.RELIABLE
  97 + build()
  98 + }
  99 +
  100 + try {
  101 + rtcEngine.sendData(dataPacket)
  102 + } catch (e: Exception) {
  103 + LKLog.i(e) { "Error sending metrics: " }
  104 + }
  105 + }
  106 +}
  107 +
  108 +private fun findPublisherVideoStats(strings: MutableList<String>, room: Room, report: RTCStatsReport, participantIdentity: Participant.Identity?): List<TimeSeriesMetric> {
  109 + val mediaSources = report.statsMap
  110 + .values
  111 + .filter { stat -> stat.type == "media-source" && stat.members["kind"] == "video" }
  112 + val videoTracks = report.statsMap
  113 + .values
  114 + .filter { stat -> stat.type == "outbound-rtp" && stat.members["kind"] == "video" }
  115 + .mapNotNull { stat -> stat to getPublishVideoTrackSid(room, mediaSources, stat) }
  116 +
  117 + val metrics = videoTracks
  118 + .flatMap { (stat, trackSid) ->
  119 + val durations = stat.members["qualityLimitationDurations"] as? Map<*, *> ?: return emptyList()
  120 + val rid = stat.members["rid"] as? String
  121 + qualityLimitations.mapNotNull { (label, key) ->
  122 + val duration = durations[key] as? Number ?: return@mapNotNull null
  123 + val sample = createMetricSample(stat.timestampUs.microToMilli(), duration)
  124 + createTimeSeries(
  125 + label = label.protoLabel,
  126 + strings = strings,
  127 + samples = listOf(sample),
  128 + identity = participantIdentity,
  129 + trackSid = trackSid,
  130 + rid = rid,
  131 + )
  132 + }
  133 + }
  134 +
  135 + return metrics
  136 +}
  137 +
  138 +/**
  139 + * The track sid isn't available on outbound-rtp stats, so we cross-reference against
  140 + * the MediaSource trackIdentifier (which is a locally generated id), and then look up
  141 + * the local published track for the sid.
  142 + */
  143 +private fun getPublishVideoTrackSid(room: Room, mediaSources: List<RTCStats>, videoTrack: RTCStats): String? {
  144 + val mediaSourceId = videoTrack.members["mediaSourceId"] ?: return null
  145 + val mediaSource = mediaSources.firstOrNull { m -> m.id == mediaSourceId } ?: return null
  146 + val trackIdentifier = mediaSource.members["trackIdentifier"] ?: return null
  147 +
  148 + val trackPubPair = room.localParticipant.videoTrackPublications
  149 + .firstOrNull { (_, track) -> track?.rtcTrack?.id() == trackIdentifier } ?: return null
  150 +
  151 + val (publication) = trackPubPair
  152 +
  153 + return publication.sid
  154 +}
  155 +
  156 +private fun findSubscriberAudioStats(strings: MutableList<String>, report: RTCStatsReport, participantIdentity: Participant.Identity?): List<TimeSeriesMetric> {
  157 + val audioTracks = report.statsMap.filterValues { stat ->
  158 + stat.type == "inbound-rtp" && stat.members["kind"] == "audio"
  159 + }
  160 +
  161 + val metrics = audioTracks.values
  162 + .flatMap { stat ->
  163 + listOf(
  164 + RTCMetric.CONCEALED_SAMPLES,
  165 + RTCMetric.CONCEALMENT_EVENTS,
  166 + RTCMetric.SILENT_CONCEALED_SAMPLES,
  167 + RTCMetric.JITTER_BUFFER_DELAY,
  168 + RTCMetric.JITTER_BUFFER_EMITTED_COUNT,
  169 + ).mapNotNull { metric ->
  170 + createTimeSeriesForMetric(
  171 + stat = stat,
  172 + metric = metric,
  173 + strings = strings,
  174 + identity = participantIdentity,
  175 + )
  176 + }
  177 + }
  178 +
  179 + return metrics
  180 +}
  181 +
  182 +private fun findSubscriberVideoStats(strings: MutableList<String>, report: RTCStatsReport, participantIdentity: Participant.Identity?): List<TimeSeriesMetric> {
  183 + val videoTracks = report.statsMap.filterValues { stat ->
  184 + stat.type == "inbound-rtp" && stat.members["kind"] == "video"
  185 + }
  186 +
  187 + val metrics = videoTracks.values
  188 + .flatMap { stat ->
  189 + listOf(
  190 + RTCMetric.FREEZE_COUNT,
  191 + RTCMetric.TOTAL_FREEZES_DURATION,
  192 + RTCMetric.PAUSE_COUNT,
  193 + RTCMetric.TOTAL_PAUSES_DURATION,
  194 + RTCMetric.JITTER_BUFFER_DELAY,
  195 + RTCMetric.JITTER_BUFFER_EMITTED_COUNT,
  196 + ).mapNotNull { metric ->
  197 + createTimeSeriesForMetric(
  198 + stat = stat,
  199 + metric = metric,
  200 + strings = strings,
  201 + identity = participantIdentity,
  202 + )
  203 + }
  204 + }
  205 +
  206 + return metrics
  207 +}
  208 +
  209 +// Utility methods
  210 +
  211 +/**
  212 + * Gets the final index to use for indexes pointing at the MetricsBatch.str_data.
  213 + * Index starts at [MetricLabel.METRIC_LABEL_PREDEFINED_MAX_VALUE].
  214 + *
  215 + * Receivers should parse index values like so:
  216 + * ```
  217 + * if index < LABEL_MAX_VALUE
  218 + * MetricLabel[index]
  219 + * else
  220 + * str_data[index - 4096]
  221 + * ```
  222 + */
  223 +private fun MutableList<String>.getOrCreateIndex(string: String): Int {
  224 + var index = indexOf(string)
  225 +
  226 + if (index == -1) {
  227 + // Doesn't exist, create.
  228 + add(string)
  229 + index = size - 1
  230 + }
  231 +
  232 + return index + MetricLabel.METRIC_LABEL_PREDEFINED_MAX_VALUE.number
  233 +}
  234 +
  235 +private fun createMetricSample(
  236 + timestampMs: Long,
  237 + value: Number,
  238 +): MetricSample {
  239 + return with(MetricSample.newBuilder()) {
  240 + this.timestampMs = timestampMs
  241 + this.value = value.toFloat()
  242 + build()
  243 + }
  244 +}
  245 +
  246 +private fun createTimeSeriesForMetric(
  247 + stat: RTCStats,
  248 + metric: RTCMetric,
  249 + strings: MutableList<String>,
  250 + identity: Participant.Identity? = null,
  251 +): TimeSeriesMetric? {
  252 + val value = stat.members[metric.statKey] as? Number ?: return null
  253 + val trackSid = stat.members["trackIdentifier"] as? String ?: return null
  254 + val rid = stat.members["rid"] as? String
  255 +
  256 + val sample = createMetricSample(stat.timestampUs.microToMilli(), value)
  257 +
  258 + return createTimeSeries(
  259 + label = metric.protoLabel,
  260 + strings = strings,
  261 + samples = listOf(sample),
  262 + identity = identity,
  263 + trackSid = trackSid,
  264 + rid = rid,
  265 + )
  266 +}
  267 +
  268 +private fun createTimeSeries(
  269 + label: MetricLabel,
  270 + strings: MutableList<String>,
  271 + samples: List<MetricSample>,
  272 + identity: Participant.Identity? = null,
  273 + trackSid: String? = null,
  274 + rid: String? = null,
  275 +): TimeSeriesMetric {
  276 + return with(TimeSeriesMetric.newBuilder()) {
  277 + this.label = label.number
  278 +
  279 + if (identity != null) {
  280 + this.participantIdentity = strings.getOrCreateIndex(identity.value)
  281 + }
  282 + if (trackSid != null) {
  283 + this.trackSid = strings.getOrCreateIndex(trackSid)
  284 + }
  285 +
  286 + if (rid != null) {
  287 + this.rid = strings.getOrCreateIndex(rid)
  288 + }
  289 + this.addAllSamples(samples)
  290 + build()
  291 + }
  292 +}
  293 +
  294 +private fun Number.microToMilli(): Long {
  295 + return TimeUnit.MILLISECONDS.convert(this.toLong(), TimeUnit.MILLISECONDS)
  296 +}
  297 +
  298 +private enum class RTCMetric(val protoLabel: MetricLabel, val statKey: String) {
  299 + FREEZE_COUNT(MetricLabel.CLIENT_VIDEO_SUBSCRIBER_FREEZE_COUNT, "freezeCount"),
  300 + TOTAL_FREEZES_DURATION(MetricLabel.CLIENT_VIDEO_SUBSCRIBER_TOTAL_FREEZE_DURATION, "totalFreezesDuration"),
  301 + PAUSE_COUNT(MetricLabel.CLIENT_VIDEO_SUBSCRIBER_PAUSE_COUNT, "pauseCount"),
  302 + TOTAL_PAUSES_DURATION(MetricLabel.CLIENT_VIDEO_SUBSCRIBER_TOTAL_PAUSES_DURATION, "totalPausesDuration"),
  303 +
  304 + CONCEALED_SAMPLES(MetricLabel.CLIENT_AUDIO_SUBSCRIBER_CONCEALED_SAMPLES, "concealedSamples"),
  305 + SILENT_CONCEALED_SAMPLES(MetricLabel.CLIENT_AUDIO_SUBSCRIBER_SILENT_CONCEALED_SAMPLES, "silentConcealedSamples"),
  306 + CONCEALMENT_EVENTS(MetricLabel.CLIENT_AUDIO_SUBSCRIBER_CONCEALMENT_EVENTS, "concealmentEvents"),
  307 +
  308 + JITTER_BUFFER_DELAY(MetricLabel.CLIENT_SUBSCRIBER_JITTER_BUFFER_DELAY, "jitterBufferDelay"),
  309 + JITTER_BUFFER_EMITTED_COUNT(MetricLabel.CLIENT_SUBSCRIBER_JITTER_BUFFER_EMITTED_COUNT, "jitterBufferEmittedCount"),
  310 +
  311 + QUALITY_LIMITATION_DURATION_BANDWIDTH(MetricLabel.CLIENT_VIDEO_PUBLISHER_QUALITY_LIMITATION_DURATION_BANDWIDTH, "qualityLimitationDurations"),
  312 + QUALITY_LIMITATION_DURATION_CPU(MetricLabel.CLIENT_VIDEO_PUBLISHER_QUALITY_LIMITATION_DURATION_CPU, "qualityLimitationDurations"),
  313 + QUALITY_LIMITATION_DURATION_OTHER(MetricLabel.CLIENT_VIDEO_PUBLISHER_QUALITY_LIMITATION_DURATION_OTHER, "qualityLimitationDurations"),
  314 +}
  315 +
  316 +private val qualityLimitations = listOf(
  317 + RTCMetric.QUALITY_LIMITATION_DURATION_CPU to "cpu",
  318 + RTCMetric.QUALITY_LIMITATION_DURATION_BANDWIDTH to "bandwidth",
  319 + RTCMetric.QUALITY_LIMITATION_DURATION_OTHER to "other",
  320 +)
@@ -25,6 +25,7 @@ import kotlinx.coroutines.test.TestScope @@ -25,6 +25,7 @@ import kotlinx.coroutines.test.TestScope
25 import kotlinx.coroutines.test.runTest 25 import kotlinx.coroutines.test.runTest
26 import org.junit.Before 26 import org.junit.Before
27 import org.junit.Rule 27 import org.junit.Rule
  28 +import org.junit.rules.Timeout
28 import org.mockito.junit.MockitoJUnit 29 import org.mockito.junit.MockitoJUnit
29 30
30 @OptIn(ExperimentalCoroutinesApi::class) 31 @OptIn(ExperimentalCoroutinesApi::class)
@@ -39,6 +40,9 @@ abstract class BaseTest { @@ -39,6 +40,9 @@ abstract class BaseTest {
39 @get:Rule 40 @get:Rule
40 var coroutineRule = TestCoroutineRule() 41 var coroutineRule = TestCoroutineRule()
41 42
  43 + @get:Rule
  44 + var globalTimeout: Timeout = Timeout.seconds(60)
  45 +
42 @Before 46 @Before
43 fun setupRTCThread() { 47 fun setupRTCThread() {
44 overrideExecutorAndDispatcher( 48 overrideExecutorAndDispatcher(
@@ -58,6 +58,9 @@ abstract class MockE2ETest : BaseTest() { @@ -58,6 +58,9 @@ abstract class MockE2ETest : BaseTest() {
58 58
59 room = component.roomFactory() 59 room = component.roomFactory()
60 .create(context) 60 .create(context)
  61 + .apply {
  62 + enableMetrics = false
  63 + }
61 wsFactory = component.websocketFactory() 64 wsFactory = component.websocketFactory()
62 } 65 }
63 66
1 -Subproject commit 5c7350d25904ed8fd8163e91ff47f0577ca6afad 1 +Subproject commit a601adc5e9027820857a6d445b32a868b19d4184
@@ -17,8 +17,6 @@ @@ -17,8 +17,6 @@
17 package io.livekit.android.composesample 17 package io.livekit.android.composesample
18 18
19 import android.content.Intent 19 import android.content.Intent
20 -import android.net.ConnectivityManager  
21 -import android.net.Network  
22 import android.os.Bundle 20 import android.os.Bundle
23 import android.widget.Toast 21 import android.widget.Toast
24 import androidx.activity.ComponentActivity 22 import androidx.activity.ComponentActivity
@@ -58,26 +56,10 @@ import io.livekit.android.sample.MainViewModel @@ -58,26 +56,10 @@ import io.livekit.android.sample.MainViewModel
58 import io.livekit.android.sample.common.R 56 import io.livekit.android.sample.common.R
59 import io.livekit.android.sample.model.StressTest 57 import io.livekit.android.sample.model.StressTest
60 import io.livekit.android.sample.util.requestNeededPermissions 58 import io.livekit.android.sample.util.requestNeededPermissions
61 -import io.livekit.android.util.LKLog  
62 59
63 @ExperimentalPagerApi 60 @ExperimentalPagerApi
64 class MainActivity : ComponentActivity() { 61 class MainActivity : ComponentActivity() {
65 62
66 - private val networkCallback = object : ConnectivityManager.NetworkCallback() {  
67 - /**  
68 - * @suppress  
69 - */  
70 - override fun onLost(network: Network) {  
71 - LKLog.i { "network connection lost" }  
72 - }  
73 -  
74 - /**  
75 - * @suppress  
76 - */  
77 - override fun onAvailable(network: Network) {  
78 - LKLog.i { "network connection available, reconnecting" }  
79 - }  
80 - }  
81 private val viewModel by viewModels<MainViewModel>() 63 private val viewModel by viewModels<MainViewModel>()
82 override fun onCreate(savedInstanceState: Bundle?) { 64 override fun onCreate(savedInstanceState: Bundle?) {
83 super.onCreate(savedInstanceState) 65 super.onCreate(savedInstanceState)