winlin

add srs bytes and kbps to api summaries. 0.9.126

@@ -240,6 +240,7 @@ Supported operating systems and hardware: @@ -240,6 +240,7 @@ Supported operating systems and hardware:
240 * 2013-10-17, Created.<br/> 240 * 2013-10-17, Created.<br/>
241 241
242 ## History 242 ## History
  243 +* v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126
243 * v1.0, 2014-06-18, add network bytes to api summaries. 0.9.125 244 * v1.0, 2014-06-18, add network bytes to api summaries. 0.9.125
244 * v1.0, 2014-06-14, fix [#98](https://github.com/winlinvip/simple-rtmp-server/issues/98), workaround for librtmp ping(fmt=1,cid=2 fresh stream). 0.9.124 245 * v1.0, 2014-06-14, fix [#98](https://github.com/winlinvip/simple-rtmp-server/issues/98), workaround for librtmp ping(fmt=1,cid=2 fresh stream). 0.9.124
245 * v1.0, 2014-05-29, support flv inject and flv http streaming with start=bytes. 0.9.122 246 * v1.0, 2014-05-29, support flv inject and flv http streaming with start=bytes. 0.9.122
@@ -34,9 +34,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,9 +34,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 34
35 #include <srs_app_st.hpp> 35 #include <srs_app_st.hpp>
36 #include <srs_app_thread.hpp> 36 #include <srs_app_thread.hpp>
  37 +#include <srs_app_kbps.hpp>
37 38
38 class SrsServer; 39 class SrsServer;
39 -class SrsConnection : public ISrsThreadHandler 40 +class SrsConnection : public virtual ISrsThreadHandler, public virtual IKbpsDelta
40 { 41 {
41 private: 42 private:
42 SrsThread* pthread; 43 SrsThread* pthread;
@@ -52,6 +53,8 @@ public: @@ -52,6 +53,8 @@ public:
52 virtual int start(); 53 virtual int start();
53 virtual int cycle(); 54 virtual int cycle();
54 virtual void on_thread_stop(); 55 virtual void on_thread_stop();
  56 +public:
  57 + virtual void kbps_resample() = 0;
55 protected: 58 protected:
56 virtual int do_cycle() = 0; 59 virtual int do_cycle() = 0;
57 virtual void stop(); 60 virtual void stop();
@@ -182,8 +182,8 @@ int SrsEdgeIngester::ingest() @@ -182,8 +182,8 @@ int SrsEdgeIngester::ingest()
182 srs_trace("<- "SRS_LOG_ID_EDGE_PLAY 182 srs_trace("<- "SRS_LOG_ID_EDGE_PLAY
183 " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", 183 " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d",
184 pithy_print.age(), 184 pithy_print.age(),
185 - kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),  
186 - kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); 185 + kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
  186 + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
187 } 187 }
188 188
189 // read from client. 189 // read from client.
@@ -479,8 +479,8 @@ int SrsEdgeForwarder::cycle() @@ -479,8 +479,8 @@ int SrsEdgeForwarder::cycle()
479 srs_trace("-> "SRS_LOG_ID_EDGE_PUBLISH 479 srs_trace("-> "SRS_LOG_ID_EDGE_PUBLISH
480 " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", 480 " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
481 pithy_print.age(), count, 481 pithy_print.age(), count,
482 - kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),  
483 - kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); 482 + kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
  483 + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
484 } 484 }
485 485
486 // ignore when no messages. 486 // ignore when no messages.
@@ -351,8 +351,8 @@ int SrsForwarder::forward() @@ -351,8 +351,8 @@ int SrsForwarder::forward()
351 srs_trace("-> "SRS_LOG_ID_FOWARDER 351 srs_trace("-> "SRS_LOG_ID_FOWARDER
352 " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", 352 " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
353 pithy_print.age(), count, 353 pithy_print.age(), count,
354 - kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),  
355 - kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); 354 + kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
  355 + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
356 } 356 }
357 357
358 // ignore when no messages. 358 // ignore when no messages.
@@ -390,6 +390,7 @@ int SrsApiSummaries::do_process_request(SrsSocket* skt, SrsHttpMessage* req) @@ -390,6 +390,7 @@ int SrsApiSummaries::do_process_request(SrsSocket* skt, SrsHttpMessage* req)
390 SrsMemInfo* m = srs_get_meminfo(); 390 SrsMemInfo* m = srs_get_meminfo();
391 SrsPlatformInfo* p = srs_get_platform_info(); 391 SrsPlatformInfo* p = srs_get_platform_info();
392 SrsNetworkDevices* n = srs_get_network_devices(); 392 SrsNetworkDevices* n = srs_get_network_devices();
  393 + SrsNetworkRtmpServer* nrs = srs_get_network_rtmp_server();
393 394
394 float self_mem_percent = 0; 395 float self_mem_percent = 0;
395 if (m->MemTotal > 0) { 396 if (m->MemTotal > 0) {
@@ -429,6 +430,7 @@ int SrsApiSummaries::do_process_request(SrsSocket* skt, SrsHttpMessage* req) @@ -429,6 +430,7 @@ int SrsApiSummaries::do_process_request(SrsSocket* skt, SrsHttpMessage* req)
429 << JFIELD_ORG("meminfo_ok", (m->ok? "true":"false")) << JFIELD_CONT 430 << JFIELD_ORG("meminfo_ok", (m->ok? "true":"false")) << JFIELD_CONT
430 << JFIELD_ORG("platform_ok", (p->ok? "true":"false")) << JFIELD_CONT 431 << JFIELD_ORG("platform_ok", (p->ok? "true":"false")) << JFIELD_CONT
431 << JFIELD_ORG("network_ok", (n_ok? "true":"false")) << JFIELD_CONT 432 << JFIELD_ORG("network_ok", (n_ok? "true":"false")) << JFIELD_CONT
  433 + << JFIELD_ORG("network_srs_ok", (nrs->ok? "true":"false")) << JFIELD_CONT
432 << JFIELD_ORG("now_ms", now) << JFIELD_CONT 434 << JFIELD_ORG("now_ms", now) << JFIELD_CONT
433 << JFIELD_ORG("self", JOBJECT_START) 435 << JFIELD_ORG("self", JOBJECT_START)
434 << JFIELD_ORG("pid", getpid()) << JFIELD_CONT 436 << JFIELD_ORG("pid", getpid()) << JFIELD_CONT
@@ -455,7 +457,12 @@ int SrsApiSummaries::do_process_request(SrsSocket* skt, SrsHttpMessage* req) @@ -455,7 +457,12 @@ int SrsApiSummaries::do_process_request(SrsSocket* skt, SrsHttpMessage* req)
455 << JFIELD_ORG("load_15m", p->load_fifteen_minutes) << JFIELD_CONT 457 << JFIELD_ORG("load_15m", p->load_fifteen_minutes) << JFIELD_CONT
456 << JFIELD_ORG("net_sample_time", n_sample_time) << JFIELD_CONT 458 << JFIELD_ORG("net_sample_time", n_sample_time) << JFIELD_CONT
457 << JFIELD_ORG("net_recv_bytes", nr_bytes) << JFIELD_CONT 459 << JFIELD_ORG("net_recv_bytes", nr_bytes) << JFIELD_CONT
458 - << JFIELD_ORG("net_send_bytes", ns_bytes) 460 + << JFIELD_ORG("net_send_bytes", ns_bytes) << JFIELD_CONT
  461 + << JFIELD_ORG("srs_sample_time", nrs->sample_time) << JFIELD_CONT
  462 + << JFIELD_ORG("srs_recv_bytes", nrs->rbytes) << JFIELD_CONT
  463 + << JFIELD_ORG("srs_recv_kbps", nrs->rkbps) << JFIELD_CONT
  464 + << JFIELD_ORG("srs_send_bytes", nrs->sbytes) << JFIELD_CONT
  465 + << JFIELD_ORG("srs_send_kbps", nrs->skbps)
459 << JOBJECT_END 466 << JOBJECT_END
460 << JOBJECT_END 467 << JOBJECT_END
461 << JOBJECT_END; 468 << JOBJECT_END;
@@ -707,6 +714,23 @@ SrsHttpApi::~SrsHttpApi() @@ -707,6 +714,23 @@ SrsHttpApi::~SrsHttpApi()
707 srs_freep(parser); 714 srs_freep(parser);
708 } 715 }
709 716
  717 +void SrsHttpApi::kbps_resample()
  718 +{
  719 + // TODO: FIXME: implements it
  720 +}
  721 +
  722 +int64_t SrsHttpApi::get_send_bytes_delta()
  723 +{
  724 + // TODO: FIXME: implements it
  725 + return 0;
  726 +}
  727 +
  728 +int64_t SrsHttpApi::get_recv_bytes_delta()
  729 +{
  730 + // TODO: FIXME: implements it
  731 + return 0;
  732 +}
  733 +
710 int SrsHttpApi::do_cycle() 734 int SrsHttpApi::do_cycle()
711 { 735 {
712 int ret = ERROR_SUCCESS; 736 int ret = ERROR_SUCCESS;
@@ -196,6 +196,12 @@ private: @@ -196,6 +196,12 @@ private:
196 public: 196 public:
197 SrsHttpApi(SrsServer* srs_server, st_netfd_t client_stfd, SrsHttpHandler* _handler); 197 SrsHttpApi(SrsServer* srs_server, st_netfd_t client_stfd, SrsHttpHandler* _handler);
198 virtual ~SrsHttpApi(); 198 virtual ~SrsHttpApi();
  199 +public:
  200 + virtual void kbps_resample();
  201 +// interface IKbpsDelta
  202 +public:
  203 + virtual int64_t get_send_bytes_delta();
  204 + virtual int64_t get_recv_bytes_delta();
199 protected: 205 protected:
200 virtual int do_cycle(); 206 virtual int do_cycle();
201 private: 207 private:
@@ -511,6 +511,23 @@ SrsHttpConn::~SrsHttpConn() @@ -511,6 +511,23 @@ SrsHttpConn::~SrsHttpConn()
511 srs_freep(parser); 511 srs_freep(parser);
512 } 512 }
513 513
  514 +void SrsHttpConn::kbps_resample()
  515 +{
  516 + // TODO: FIXME: implements it
  517 +}
  518 +
  519 +int64_t SrsHttpConn::get_send_bytes_delta()
  520 +{
  521 + // TODO: FIXME: implements it
  522 + return 0;
  523 +}
  524 +
  525 +int64_t SrsHttpConn::get_recv_bytes_delta()
  526 +{
  527 + // TODO: FIXME: implements it
  528 + return 0;
  529 +}
  530 +
514 int SrsHttpConn::do_cycle() 531 int SrsHttpConn::do_cycle()
515 { 532 {
516 int ret = ERROR_SUCCESS; 533 int ret = ERROR_SUCCESS;
@@ -90,6 +90,12 @@ private: @@ -90,6 +90,12 @@ private:
90 public: 90 public:
91 SrsHttpConn(SrsServer* srs_server, st_netfd_t client_stfd, SrsHttpHandler* _handler); 91 SrsHttpConn(SrsServer* srs_server, st_netfd_t client_stfd, SrsHttpHandler* _handler);
92 virtual ~SrsHttpConn(); 92 virtual ~SrsHttpConn();
  93 +public:
  94 + virtual void kbps_resample();
  95 +// interface IKbpsDelta
  96 +public:
  97 + virtual int64_t get_send_bytes_delta();
  98 + virtual int64_t get_recv_bytes_delta();
93 protected: 99 protected:
94 virtual int do_cycle(); 100 virtual int do_cycle();
95 private: 101 private:
@@ -38,7 +38,7 @@ SrsKbpsSlice::SrsKbpsSlice() @@ -38,7 +38,7 @@ SrsKbpsSlice::SrsKbpsSlice()
38 { 38 {
39 io.in = NULL; 39 io.in = NULL;
40 io.out = NULL; 40 io.out = NULL;
41 - last_bytes = io_bytes_base = starttime = bytes = 0; 41 + last_bytes = io_bytes_base = starttime = bytes = delta_bytes = 0;
42 } 42 }
43 43
44 SrsKbpsSlice::~SrsKbpsSlice() 44 SrsKbpsSlice::~SrsKbpsSlice()
@@ -98,6 +98,14 @@ void SrsKbpsSlice::sample() @@ -98,6 +98,14 @@ void SrsKbpsSlice::sample()
98 } 98 }
99 } 99 }
100 100
  101 +IKbpsDelta::IKbpsDelta()
  102 +{
  103 +}
  104 +
  105 +IKbpsDelta::~IKbpsDelta()
  106 +{
  107 +}
  108 +
101 SrsKbps::SrsKbps() 109 SrsKbps::SrsKbps()
102 { 110 {
103 } 111 }
@@ -165,22 +173,22 @@ int SrsKbps::get_recv_kbps() @@ -165,22 +173,22 @@ int SrsKbps::get_recv_kbps()
165 return bytes * 8 / duration; 173 return bytes * 8 / duration;
166 } 174 }
167 175
168 -int SrsKbps::get_send_kbps_sample_high() 176 +int SrsKbps::get_send_kbps_30s()
169 { 177 {
170 return os.sample_30s.kbps; 178 return os.sample_30s.kbps;
171 } 179 }
172 180
173 -int SrsKbps::get_recv_kbps_sample_high() 181 +int SrsKbps::get_recv_kbps_30s()
174 { 182 {
175 return is.sample_30s.kbps; 183 return is.sample_30s.kbps;
176 } 184 }
177 185
178 -int SrsKbps::get_send_kbps_sample_medium() 186 +int SrsKbps::get_send_kbps_5m()
179 { 187 {
180 return os.sample_5m.kbps; 188 return os.sample_5m.kbps;
181 } 189 }
182 190
183 -int SrsKbps::get_recv_kbps_sample_medium() 191 +int SrsKbps::get_recv_kbps_5m()
184 { 192 {
185 return is.sample_5m.kbps; 193 return is.sample_5m.kbps;
186 } 194 }
@@ -195,20 +203,44 @@ int64_t SrsKbps::get_recv_bytes() @@ -195,20 +203,44 @@ int64_t SrsKbps::get_recv_bytes()
195 return is.get_total_bytes(); 203 return is.get_total_bytes();
196 } 204 }
197 205
  206 +int64_t SrsKbps::get_send_bytes_delta()
  207 +{
  208 + int64_t delta = os.get_total_bytes() - os.delta_bytes;
  209 + os.delta_bytes = os.get_total_bytes();
  210 + return delta;
  211 +}
  212 +
  213 +int64_t SrsKbps::get_recv_bytes_delta()
  214 +{
  215 + int64_t delta = is.get_total_bytes() - is.delta_bytes;
  216 + is.delta_bytes = is.get_total_bytes();
  217 + return delta;
  218 +}
  219 +
  220 +void SrsKbps::add_delta(IKbpsDelta* delta)
  221 +{
  222 + srs_assert(delta);
  223 +
  224 + // update the total bytes
  225 + is.last_bytes += delta->get_recv_bytes_delta();
  226 + os.last_bytes += delta->get_send_bytes_delta();
  227 +
  228 + // we donot sample, please use sample() to do resample.
  229 +}
  230 +
198 void SrsKbps::sample() 231 void SrsKbps::sample()
199 { 232 {
  233 + // update the total bytes
200 if (os.io.out) { 234 if (os.io.out) {
201 os.last_bytes = os.io.out->get_send_bytes(); 235 os.last_bytes = os.io.out->get_send_bytes();
202 } 236 }
203 237
204 - // resample  
205 - os.sample();  
206 -  
207 if (is.io.in) { 238 if (is.io.in) {
208 is.last_bytes = is.io.in->get_recv_bytes(); 239 is.last_bytes = is.io.in->get_recv_bytes();
209 } 240 }
210 241
211 // resample 242 // resample
212 is.sample(); 243 is.sample();
  244 + os.sample();
213 } 245 }
214 246
@@ -30,8 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,8 +30,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 30
31 #include <srs_core.hpp> 31 #include <srs_core.hpp>
32 32
33 -class ISrsProtocolStatistic;  
34 -class ISrsProtocolStatistic; 33 +#include <srs_protocol_io.hpp>
35 34
36 /** 35 /**
37 * a kbps sample, for example, 1minute kbps, 36 * a kbps sample, for example, 1minute kbps,
@@ -59,7 +58,8 @@ public: @@ -59,7 +58,8 @@ public:
59 * send_bytes = bytes + last_bytes - io_bytes_base 58 * send_bytes = bytes + last_bytes - io_bytes_base
60 * so, the bytes sent duration current session is: 59 * so, the bytes sent duration current session is:
61 * send_bytes = last_bytes - io_bytes_base 60 * send_bytes = last_bytes - io_bytes_base
62 -* @remark user use set_io to start new session. 61 +* @remark use set_io to start new session.
  62 +* @remakr the slice is a data collection object driven by SrsKbps.
63 */ 63 */
64 class SrsKbpsSlice 64 class SrsKbpsSlice
65 { 65 {
@@ -69,6 +69,8 @@ private: @@ -69,6 +69,8 @@ private:
69 ISrsProtocolStatistic* out; 69 ISrsProtocolStatistic* out;
70 }; 70 };
71 public: 71 public:
  72 + // the slice io used for SrsKbps to invoke,
  73 + // the SrsKbpsSlice itself never use it.
72 slice_io io; 74 slice_io io;
73 // session startup bytes 75 // session startup bytes
74 // @remark, use total_bytes() to get the total bytes of slice. 76 // @remark, use total_bytes() to get the total bytes of slice.
@@ -87,6 +89,9 @@ public: @@ -87,6 +89,9 @@ public:
87 SrsKbpsSample sample_5m; 89 SrsKbpsSample sample_5m;
88 SrsKbpsSample sample_60m; 90 SrsKbpsSample sample_60m;
89 public: 91 public:
  92 + // for the delta bytes.
  93 + int64_t delta_bytes;
  94 +public:
90 SrsKbpsSlice(); 95 SrsKbpsSlice();
91 virtual ~SrsKbpsSlice(); 96 virtual ~SrsKbpsSlice();
92 public: 97 public:
@@ -101,9 +106,36 @@ public: @@ -101,9 +106,36 @@ public:
101 }; 106 };
102 107
103 /** 108 /**
  109 +* the interface which provices delta of bytes.
  110 +*/
  111 +class IKbpsDelta
  112 +{
  113 +public:
  114 + IKbpsDelta();
  115 + virtual ~IKbpsDelta();
  116 +public:
  117 + virtual int64_t get_send_bytes_delta() = 0;
  118 + virtual int64_t get_recv_bytes_delta() = 0;
  119 +};
  120 +
  121 +/**
104 * to statistic the kbps of io. 122 * to statistic the kbps of io.
  123 +* itself can be a statistic source, for example, used for SRS bytes stat.
  124 +* there are two usage scenarios:
  125 +* 1. connections to calc kbps:
  126 +* set_io(in, out)
  127 +* sample()
  128 +* get_xxx_kbps().
  129 +* the connections know how many bytes already send/recv.
  130 +* 2. server to calc kbps:
  131 +* set_io(NULL, NULL)
  132 +* for each connection in connections:
  133 +* add_delta(connections) // where connection is a IKbpsDelta*
  134 +* sample()
  135 +* get_xxx_kbps().
  136 +* the server never know how many bytes already send/recv, for the connection maybe closed.
105 */ 137 */
106 -class SrsKbps 138 +class SrsKbps : public virtual ISrsProtocolStatistic, public virtual IKbpsDelta
107 { 139 {
108 private: 140 private:
109 SrsKbpsSlice is; 141 SrsKbpsSlice is;
@@ -130,11 +162,11 @@ public: @@ -130,11 +162,11 @@ public:
130 virtual int get_send_kbps(); 162 virtual int get_send_kbps();
131 virtual int get_recv_kbps(); 163 virtual int get_recv_kbps();
132 // 30s 164 // 30s
133 - virtual int get_send_kbps_sample_high();  
134 - virtual int get_recv_kbps_sample_high(); 165 + virtual int get_send_kbps_30s();
  166 + virtual int get_recv_kbps_30s();
135 // 5m 167 // 5m
136 - virtual int get_send_kbps_sample_medium();  
137 - virtual int get_recv_kbps_sample_medium(); 168 + virtual int get_send_kbps_5m();
  169 + virtual int get_recv_kbps_5m();
138 public: 170 public:
139 /** 171 /**
140 * get the total send/recv bytes, from the startup of the oldest io. 172 * get the total send/recv bytes, from the startup of the oldest io.
@@ -142,9 +174,26 @@ public: @@ -142,9 +174,26 @@ public:
142 */ 174 */
143 virtual int64_t get_send_bytes(); 175 virtual int64_t get_send_bytes();
144 virtual int64_t get_recv_bytes(); 176 virtual int64_t get_recv_bytes();
  177 + /**
  178 + * get the delta of send/recv bytes.
  179 + * @remark, used for add_delta to calc the total system bytes/kbps.
  180 + */
  181 + virtual int64_t get_send_bytes_delta();
  182 + virtual int64_t get_recv_bytes_delta();
145 public: 183 public:
146 /** 184 /**
147 - * resample all samples. 185 + * add delta to kbps clac mechenism.
  186 + * we donot know the total bytes, but know the delta, for instance,
  187 + * for rtmp server to calc total bytes and kbps.
  188 + * @remark user must invoke sample() when invoke this method.
  189 + * @param delta, assert should never be NULL.
  190 + */
  191 + virtual void add_delta(IKbpsDelta* delta);
  192 + /**
  193 + * resample all samples, ignore if in/out is NULL.
  194 + * used for user to calc the kbps, to sample new kbps value.
  195 + * @remark if user, for instance, the rtmp server to calc the total bytes,
  196 + * use the add_delta() is better solutions.
148 */ 197 */
149 virtual void sample(); 198 virtual void sample();
150 }; 199 };
@@ -94,6 +94,11 @@ SrsRtmpConn::~SrsRtmpConn() @@ -94,6 +94,11 @@ SrsRtmpConn::~SrsRtmpConn()
94 srs_freep(kbps); 94 srs_freep(kbps);
95 } 95 }
96 96
  97 +void SrsRtmpConn::kbps_resample()
  98 +{
  99 + kbps->sample();
  100 +}
  101 +
97 // TODO: return detail message when error for client. 102 // TODO: return detail message when error for client.
98 int SrsRtmpConn::do_cycle() 103 int SrsRtmpConn::do_cycle()
99 { 104 {
@@ -169,6 +174,16 @@ int SrsRtmpConn::on_reload_vhost_removed(string vhost) @@ -169,6 +174,16 @@ int SrsRtmpConn::on_reload_vhost_removed(string vhost)
169 return ret; 174 return ret;
170 } 175 }
171 176
  177 +int64_t SrsRtmpConn::get_send_bytes_delta()
  178 +{
  179 + return kbps->get_send_bytes_delta();
  180 +}
  181 +
  182 +int64_t SrsRtmpConn::get_recv_bytes_delta()
  183 +{
  184 + return kbps->get_recv_bytes_delta();
  185 +}
  186 +
172 int SrsRtmpConn::service_cycle() 187 int SrsRtmpConn::service_cycle()
173 { 188 {
174 int ret = ERROR_SUCCESS; 189 int ret = ERROR_SUCCESS;
@@ -510,8 +525,8 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -510,8 +525,8 @@ int SrsRtmpConn::playing(SrsSource* source)
510 srs_trace("-> "SRS_LOG_ID_PLAY 525 srs_trace("-> "SRS_LOG_ID_PLAY
511 " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", 526 " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
512 pithy_print.age(), count, 527 pithy_print.age(), count,
513 - kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),  
514 - kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); 528 + kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
  529 + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
515 } 530 }
516 531
517 if (count <= 0) { 532 if (count <= 0) {
@@ -601,8 +616,8 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) @@ -601,8 +616,8 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
601 kbps->sample(); 616 kbps->sample();
602 srs_trace("<- "SRS_LOG_ID_CLIENT_PUBLISH 617 srs_trace("<- "SRS_LOG_ID_CLIENT_PUBLISH
603 " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(), 618 " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pithy_print.age(),
604 - kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),  
605 - kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); 619 + kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
  620 + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
606 } 621 }
607 622
608 // process UnPublish event. 623 // process UnPublish event.
@@ -683,8 +698,8 @@ int SrsRtmpConn::flash_publish(SrsSource* source) @@ -683,8 +698,8 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
683 srs_trace("<- "SRS_LOG_ID_WEB_PUBLISH 698 srs_trace("<- "SRS_LOG_ID_WEB_PUBLISH
684 " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", 699 " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d",
685 pithy_print.age(), 700 pithy_print.age(),
686 - kbps->get_send_kbps(), kbps->get_send_kbps_sample_high(), kbps->get_send_kbps_sample_medium(),  
687 - kbps->get_recv_kbps(), kbps->get_recv_kbps_sample_high(), kbps->get_recv_kbps_sample_medium()); 701 + kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
  702 + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
688 } 703 }
689 704
690 // process UnPublish event. 705 // process UnPublish event.
@@ -51,7 +51,7 @@ class SrsKbps; @@ -51,7 +51,7 @@ class SrsKbps;
51 /** 51 /**
52 * the client provides the main logic control for RTMP clients. 52 * the client provides the main logic control for RTMP clients.
53 */ 53 */
54 -class SrsRtmpConn : public SrsConnection, public ISrsReloadHandler 54 +class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandler
55 { 55 {
56 private: 56 private:
57 SrsRequest* req; 57 SrsRequest* req;
@@ -68,11 +68,17 @@ private: @@ -68,11 +68,17 @@ private:
68 public: 68 public:
69 SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd); 69 SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd);
70 virtual ~SrsRtmpConn(); 70 virtual ~SrsRtmpConn();
  71 +public:
  72 + virtual void kbps_resample();
71 protected: 73 protected:
72 virtual int do_cycle(); 74 virtual int do_cycle();
73 // interface ISrsReloadHandler 75 // interface ISrsReloadHandler
74 public: 76 public:
75 virtual int on_reload_vhost_removed(std::string vhost); 77 virtual int on_reload_vhost_removed(std::string vhost);
  78 +// interface IKbpsDelta
  79 +public:
  80 + virtual int64_t get_send_bytes_delta();
  81 + virtual int64_t get_recv_bytes_delta();
76 private: 82 private:
77 // when valid and connected to vhost/app, service the client. 83 // when valid and connected to vhost/app, service the client.
78 virtual int service_cycle(); 84 virtual int service_cycle();
@@ -45,6 +45,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -45,6 +45,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
45 #include <srs_app_source.hpp> 45 #include <srs_app_source.hpp>
46 #include <srs_app_utility.hpp> 46 #include <srs_app_utility.hpp>
47 #include <srs_app_heartbeat.hpp> 47 #include <srs_app_heartbeat.hpp>
  48 +#include <srs_app_kbps.hpp>
48 49
49 // signal defines. 50 // signal defines.
50 #define SIGNAL_RELOAD SIGHUP 51 #define SIGNAL_RELOAD SIGHUP
@@ -82,6 +83,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -82,6 +83,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
82 // SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 83 // SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES
83 #define SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 90 84 #define SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES 90
84 85
  86 +// update network devices info interval:
  87 +// SRS_SYS_CYCLE_INTERVAL * SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES
  88 +#define SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES 90
  89 +
85 SrsListener::SrsListener(SrsServer* server, SrsListenerType type) 90 SrsListener::SrsListener(SrsServer* server, SrsListenerType type)
86 { 91 {
87 fd = -1; 92 fd = -1;
@@ -312,7 +317,8 @@ SrsServer::SrsServer() @@ -312,7 +317,8 @@ SrsServer::SrsServer()
312 signal_gmc_stop = false; 317 signal_gmc_stop = false;
313 pid_fd = -1; 318 pid_fd = -1;
314 319
315 - signal_manager = new SrsSignalManager(this); 320 + signal_manager = NULL;
  321 + kbps = NULL;
316 322
317 // donot new object in constructor, 323 // donot new object in constructor,
318 // for some global instance is not ready now, 324 // for some global instance is not ready now,
@@ -372,6 +378,7 @@ void SrsServer::destroy() @@ -372,6 +378,7 @@ void SrsServer::destroy()
372 } 378 }
373 379
374 srs_freep(signal_manager); 380 srs_freep(signal_manager);
  381 + srs_freep(kbps);
375 382
376 for (std::vector<SrsConnection*>::iterator it = conns.begin(); it != conns.end();) { 383 for (std::vector<SrsConnection*>::iterator it = conns.begin(); it != conns.end();) {
377 SrsConnection* conn = *it; 384 SrsConnection* conn = *it;
@@ -398,6 +405,13 @@ int SrsServer::initialize() @@ -398,6 +405,13 @@ int SrsServer::initialize()
398 srs_assert(_srs_config); 405 srs_assert(_srs_config);
399 _srs_config->subscribe(this); 406 _srs_config->subscribe(this);
400 407
  408 + srs_assert(!signal_manager);
  409 + signal_manager = new SrsSignalManager(this);
  410 +
  411 + srs_assert(!kbps);
  412 + kbps = new SrsKbps();
  413 + kbps->set_io(NULL, NULL);
  414 +
401 #ifdef SRS_AUTO_HTTP_API 415 #ifdef SRS_AUTO_HTTP_API
402 srs_assert(!http_api_handler); 416 srs_assert(!http_api_handler);
403 http_api_handler = SrsHttpHandler::create_http_api(); 417 http_api_handler = SrsHttpHandler::create_http_api();
@@ -610,6 +624,9 @@ void SrsServer::remove(SrsConnection* conn) @@ -610,6 +624,9 @@ void SrsServer::remove(SrsConnection* conn)
610 624
611 srs_info("conn removed. conns=%d", (int)conns.size()); 625 srs_info("conn removed. conns=%d", (int)conns.size());
612 626
  627 + // resample the resource of specified connection.
  628 + resample_kbps(conn);
  629 +
613 // all connections are created by server, 630 // all connections are created by server,
614 // so we free it here. 631 // so we free it here.
615 srs_freep(conn); 632 srs_freep(conn);
@@ -651,11 +668,12 @@ int SrsServer::do_cycle() @@ -651,11 +668,12 @@ int SrsServer::do_cycle()
651 max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES); 668 max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
652 max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES); 669 max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
653 max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES); 670 max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
  671 + max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
654 672
655 // the deamon thread, update the time cache 673 // the deamon thread, update the time cache
656 while (true) { 674 while (true) {
657 // the interval in config. 675 // the interval in config.
658 - int64_t heartbeat_max_resolution = _srs_config->get_heartbeat_interval() / 100; 676 + int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / 100);
659 677
660 // dynamic fetch the max. 678 // dynamic fetch the max.
661 int __max = max; 679 int __max = max;
@@ -689,30 +707,43 @@ int SrsServer::do_cycle() @@ -689,30 +707,43 @@ int SrsServer::do_cycle()
689 707
690 // update the cache time or rusage. 708 // update the cache time or rusage.
691 if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) { 709 if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
  710 + srs_info("update current time cache.");
692 srs_update_system_time_ms(); 711 srs_update_system_time_ms();
693 } 712 }
694 if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) { 713 if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
  714 + srs_info("update resource info, rss.");
695 srs_update_system_rusage(); 715 srs_update_system_rusage();
696 } 716 }
697 if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) { 717 if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
  718 + srs_info("update cpu info, usage.");
698 srs_update_proc_stat(); 719 srs_update_proc_stat();
699 } 720 }
700 if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) { 721 if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
  722 + srs_info("update memory info, usage/free.");
701 srs_update_meminfo(); 723 srs_update_meminfo();
702 } 724 }
703 if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) { 725 if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
  726 + srs_info("update platform info, uptime/load.");
704 srs_update_platform_info(); 727 srs_update_platform_info();
705 } 728 }
706 if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) { 729 if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
  730 + srs_info("update network devices info.");
707 srs_update_network_devices(); 731 srs_update_network_devices();
708 } 732 }
  733 + if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
  734 + srs_info("update network rtmp server info.");
  735 + resample_kbps(NULL);
  736 + srs_update_rtmp_server(kbps);
  737 + }
709 #ifdef SRS_AUTO_HTTP_PARSER 738 #ifdef SRS_AUTO_HTTP_PARSER
710 if (_srs_config->get_heartbeat_enabled()) { 739 if (_srs_config->get_heartbeat_enabled()) {
711 if ((i % heartbeat_max_resolution) == 0) { 740 if ((i % heartbeat_max_resolution) == 0) {
  741 + srs_info("do http heartbeat, for internal server to report.");
712 http_heartbeat->heartbeat(); 742 http_heartbeat->heartbeat();
713 } 743 }
714 } 744 }
715 #endif 745 #endif
  746 + srs_info("server main thread loop");
716 } 747 }
717 } 748 }
718 749
@@ -801,6 +832,32 @@ void SrsServer::close_listeners(SrsListenerType type) @@ -801,6 +832,32 @@ void SrsServer::close_listeners(SrsListenerType type)
801 } 832 }
802 } 833 }
803 834
  835 +void SrsServer::resample_kbps(SrsConnection* conn, bool do_resample)
  836 +{
  837 + // resample all when conn is NULL.
  838 + if (!conn) {
  839 + for (std::vector<SrsConnection*>::iterator it = conns.begin(); it != conns.end(); ++it) {
  840 + SrsConnection* client = *it;
  841 + srs_assert(client);
  842 +
  843 + // only resample, do resample when all finished.
  844 + resample_kbps(client, false);
  845 + }
  846 +
  847 + kbps->sample();
  848 + return;
  849 + }
  850 +
  851 + // resample for connection.
  852 + conn->kbps_resample();
  853 +
  854 + kbps->add_delta(conn);
  855 +
  856 + if (do_resample) {
  857 + kbps->sample();
  858 + }
  859 +}
  860 +
804 int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) 861 int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
805 { 862 {
806 int ret = ERROR_SUCCESS; 863 int ret = ERROR_SUCCESS;
@@ -41,6 +41,7 @@ class SrsConnection; @@ -41,6 +41,7 @@ class SrsConnection;
41 class SrsHttpHandler; 41 class SrsHttpHandler;
42 class SrsIngester; 42 class SrsIngester;
43 class SrsHttpHeartbeat; 43 class SrsHttpHeartbeat;
  44 +class SrsKbps;
44 45
45 // listener type for server to identify the connection, 46 // listener type for server to identify the connection,
46 // that is, use different type to process the connection. 47 // that is, use different type to process the connection.
@@ -107,6 +108,10 @@ private: @@ -107,6 +108,10 @@ private:
107 static void sig_catcher(int signo); 108 static void sig_catcher(int signo);
108 }; 109 };
109 110
  111 +/**
  112 +* SRS RTMP server, initialize and listen,
  113 +* start connection service thread, destroy client.
  114 +*/
110 class SrsServer : public ISrsReloadHandler 115 class SrsServer : public ISrsReloadHandler
111 { 116 {
112 private: 117 private:
@@ -127,6 +132,7 @@ private: @@ -127,6 +132,7 @@ private:
127 std::vector<SrsConnection*> conns; 132 std::vector<SrsConnection*> conns;
128 std::vector<SrsListener*> listeners; 133 std::vector<SrsListener*> listeners;
129 SrsSignalManager* signal_manager; 134 SrsSignalManager* signal_manager;
  135 + SrsKbps* kbps;
130 bool signal_reload; 136 bool signal_reload;
131 bool signal_gmc_stop; 137 bool signal_gmc_stop;
132 public: 138 public:
@@ -150,6 +156,8 @@ private: @@ -150,6 +156,8 @@ private:
150 virtual int listen_http_api(); 156 virtual int listen_http_api();
151 virtual int listen_http_stream(); 157 virtual int listen_http_stream();
152 virtual void close_listeners(SrsListenerType type); 158 virtual void close_listeners(SrsListenerType type);
  159 + // resample the server kbps. resample all when conn is NULL.
  160 + virtual void resample_kbps(SrsConnection* conn, bool do_resample = true);
153 // internal only 161 // internal only
154 public: 162 public:
155 virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd); 163 virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd);
@@ -34,6 +34,7 @@ using namespace std; @@ -34,6 +34,7 @@ using namespace std;
34 #include <srs_app_config.hpp> 34 #include <srs_app_config.hpp>
35 #include <srs_kernel_utility.hpp> 35 #include <srs_kernel_utility.hpp>
36 #include <srs_kernel_error.hpp> 36 #include <srs_kernel_error.hpp>
  37 +#include <srs_app_kbps.hpp>
37 38
38 #define SRS_LOCAL_LOOP_IP "127.0.0.1" 39 #define SRS_LOCAL_LOOP_IP "127.0.0.1"
39 40
@@ -523,6 +524,38 @@ void srs_update_network_devices() @@ -523,6 +524,38 @@ void srs_update_network_devices()
523 } 524 }
524 } 525 }
525 526
  527 +SrsNetworkRtmpServer::SrsNetworkRtmpServer()
  528 +{
  529 + ok = false;
  530 + sample_time = rbytes = sbytes = 0;
  531 +}
  532 +
  533 +static SrsNetworkRtmpServer _srs_network_rtmp_server;
  534 +
  535 +SrsNetworkRtmpServer* srs_get_network_rtmp_server()
  536 +{
  537 + return &_srs_network_rtmp_server;
  538 +}
  539 +
  540 +void srs_update_rtmp_server(SrsKbps* kbps)
  541 +{
  542 + SrsNetworkRtmpServer& r = _srs_network_rtmp_server;
  543 +
  544 + r.ok = true;
  545 +
  546 + r.sample_time = srs_get_system_time_ms();
  547 +
  548 + r.rbytes = kbps->get_recv_bytes();
  549 + r.rkbps = kbps->get_recv_kbps();
  550 + r.rkbps_30s = kbps->get_recv_kbps_30s();
  551 + r.rkbps_5m = kbps->get_recv_kbps_5m();
  552 +
  553 + r.sbytes = kbps->get_send_bytes();
  554 + r.skbps = kbps->get_send_kbps();
  555 + r.skbps_30s = kbps->get_send_kbps_30s();
  556 + r.skbps_5m = kbps->get_send_kbps_5m();
  557 +}
  558 +
526 vector<string> _srs_system_ipv4_ips; 559 vector<string> _srs_system_ipv4_ips;
527 560
528 void retrieve_local_ipv4_ips() 561 void retrieve_local_ipv4_ips()
@@ -35,6 +35,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -35,6 +35,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 35
36 #include <sys/resource.h> 36 #include <sys/resource.h>
37 37
  38 +class SrsKbps;
  39 +
38 /** 40 /**
39 * convert level in string to log level in int. 41 * convert level in string to log level in int.
40 * @return the log level defined in SrsLogLevel. 42 * @return the log level defined in SrsLogLevel.
@@ -390,6 +392,36 @@ extern int srs_get_network_devices_count(); @@ -390,6 +392,36 @@ extern int srs_get_network_devices_count();
390 // the deamon st-thread will update it. 392 // the deamon st-thread will update it.
391 extern void srs_update_network_devices(); 393 extern void srs_update_network_devices();
392 394
  395 +// srs rtmp network summary
  396 +class SrsNetworkRtmpServer
  397 +{
  398 +public:
  399 + // whether the network device is ok.
  400 + bool ok;
  401 +
  402 + // the sample time in ms.
  403 + int64_t sample_time;
  404 +
  405 + // data for receive.
  406 + int64_t rbytes;
  407 + int rkbps;
  408 + int rkbps_30s;
  409 + int rkbps_5m;
  410 +
  411 + // data for transmit
  412 + int64_t sbytes;
  413 + int skbps;
  414 + int skbps_30s;
  415 + int skbps_5m;
  416 +
  417 + SrsNetworkRtmpServer();
  418 +};
  419 +
  420 +// get network devices info, use cache to avoid performance problem.
  421 +extern SrsNetworkRtmpServer* srs_get_network_rtmp_server();
  422 +// the deamon st-thread will update it.
  423 +extern void srs_update_rtmp_server(SrsKbps* kbps);
  424 +
393 // get local ip, fill to @param ips 425 // get local ip, fill to @param ips
394 extern void srs_retrieve_local_ipv4_ips(); 426 extern void srs_retrieve_local_ipv4_ips();
395 extern std::vector<std::string>& srs_get_local_ipv4_ips(); 427 extern std::vector<std::string>& srs_get_local_ipv4_ips();
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR "0" 32 #define VERSION_MAJOR "0"
33 #define VERSION_MINOR "9" 33 #define VERSION_MINOR "9"
34 -#define VERSION_REVISION "125" 34 +#define VERSION_REVISION "126"
35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION 35 #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "SRS" 37 #define RTMP_SIG_SRS_KEY "SRS"