winlin

refine framework to calc the kbps

@@ -464,11 +464,11 @@ MODULE_ID="APP" @@ -464,11 +464,11 @@ MODULE_ID="APP"
464 MODULE_DEPENDS=("CORE" "KERNEL" "RTMP") 464 MODULE_DEPENDS=("CORE" "KERNEL" "RTMP")
465 ModuleLibIncs=(${LibSTRoot} ${LibHttpParserRoot} ${SRS_OBJS}) 465 ModuleLibIncs=(${LibSTRoot} ${LibHttpParserRoot} ${SRS_OBJS})
466 MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_socket" "srs_app_source" 466 MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_socket" "srs_app_source"
467 - "srs_app_codec" "srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder"  
468 - "srs_app_http" "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log"  
469 - "srs_app_config" "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api"  
470 - "srs_app_http_conn" "srs_app_http_hooks" "srs_app_json" "srs_app_ingest"  
471 - "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge") 467 + "srs_app_codec" "srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder" "srs_app_http"
  468 + "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config"
  469 + "srs_app_pithy_print" "srs_app_reload" "srs_app_http_api" "srs_app_http_conn" "srs_app_http_hooks"
  470 + "srs_app_json" "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_dvr" "srs_app_edge"
  471 + "srs_app_kbps")
472 APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . auto/modules.sh 472 APP_INCS="src/app"; MODULE_DIR=${APP_INCS} . auto/modules.sh
473 APP_OBJS="${MODULE_OBJS[@]}" 473 APP_OBJS="${MODULE_OBJS[@]}"
474 # 474 #
@@ -42,6 +42,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -42,6 +42,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
42 #include <srs_app_pithy_print.hpp> 42 #include <srs_app_pithy_print.hpp>
43 #include <srs_core_autofree.hpp> 43 #include <srs_core_autofree.hpp>
44 #include <srs_app_socket.hpp> 44 #include <srs_app_socket.hpp>
  45 +#include <srs_app_kbps.hpp>
45 46
46 // when error, edge ingester sleep for a while and retry. 47 // when error, edge ingester sleep for a while and retry.
47 #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) 48 #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)
@@ -61,6 +62,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -61,6 +62,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
61 SrsEdgeIngester::SrsEdgeIngester() 62 SrsEdgeIngester::SrsEdgeIngester()
62 { 63 {
63 io = NULL; 64 io = NULL;
  65 + kbps = new SrsKbps();
64 client = NULL; 66 client = NULL;
65 _edge = NULL; 67 _edge = NULL;
66 _req = NULL; 68 _req = NULL;
@@ -75,6 +77,7 @@ SrsEdgeIngester::~SrsEdgeIngester() @@ -75,6 +77,7 @@ SrsEdgeIngester::~SrsEdgeIngester()
75 stop(); 77 stop();
76 78
77 srs_freep(pthread); 79 srs_freep(pthread);
  80 + srs_freep(kbps);
78 } 81 }
79 82
80 int SrsEdgeIngester::initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req) 83 int SrsEdgeIngester::initialize(SrsSource* source, SrsPlayEdge* edge, SrsRequest* req)
@@ -101,6 +104,7 @@ void SrsEdgeIngester::stop() @@ -101,6 +104,7 @@ void SrsEdgeIngester::stop()
101 104
102 srs_freep(client); 105 srs_freep(client);
103 srs_freep(io); 106 srs_freep(io);
  107 + kbps->set_io(NULL, NULL);
104 } 108 }
105 109
106 int SrsEdgeIngester::cycle() 110 int SrsEdgeIngester::cycle()
@@ -169,9 +173,8 @@ int SrsEdgeIngester::ingest() @@ -169,9 +173,8 @@ int SrsEdgeIngester::ingest()
169 // pithy print 173 // pithy print
170 if (pithy_print.can_print()) { 174 if (pithy_print.can_print()) {
171 srs_trace("<- "SRS_LOG_ID_EDGE_PLAY 175 srs_trace("<- "SRS_LOG_ID_EDGE_PLAY
172 - " time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
173 - pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(),  
174 - client->get_send_kbps(), client->get_recv_kbps()); 176 + " time=%"PRId64", okbps=%d, ikbps=%d",
  177 + pithy_print.age(), kbps->get_send_kbps(), kbps->get_recv_kbps());
175 } 178 }
176 179
177 // read from client. 180 // read from client.
@@ -303,6 +306,7 @@ int SrsEdgeIngester::connect_server() @@ -303,6 +306,7 @@ int SrsEdgeIngester::connect_server()
303 306
304 io = new SrsSocket(stfd); 307 io = new SrsSocket(stfd);
305 client = new SrsRtmpClient(io); 308 client = new SrsRtmpClient(io);
  309 + kbps->set_io(io, io);
306 310
307 // connect to server. 311 // connect to server.
308 std::string ip = srs_dns_resolve(server); 312 std::string ip = srs_dns_resolve(server);
@@ -330,6 +334,7 @@ int SrsEdgeIngester::connect_server() @@ -330,6 +334,7 @@ int SrsEdgeIngester::connect_server()
330 SrsEdgeForwarder::SrsEdgeForwarder() 334 SrsEdgeForwarder::SrsEdgeForwarder()
331 { 335 {
332 io = NULL; 336 io = NULL;
  337 + kbps = NULL;
333 client = NULL; 338 client = NULL;
334 _edge = NULL; 339 _edge = NULL;
335 _req = NULL; 340 _req = NULL;
@@ -347,6 +352,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder() @@ -347,6 +352,7 @@ SrsEdgeForwarder::~SrsEdgeForwarder()
347 352
348 srs_freep(pthread); 353 srs_freep(pthread);
349 srs_freep(queue); 354 srs_freep(queue);
  355 + srs_freep(kbps);
350 } 356 }
351 357
352 void SrsEdgeForwarder::set_queue_size(double queue_size) 358 void SrsEdgeForwarder::set_queue_size(double queue_size)
@@ -411,6 +417,7 @@ void SrsEdgeForwarder::stop() @@ -411,6 +417,7 @@ void SrsEdgeForwarder::stop()
411 417
412 srs_freep(client); 418 srs_freep(client);
413 srs_freep(io); 419 srs_freep(io);
  420 + kbps->set_io(NULL, NULL);
414 } 421 }
415 422
416 int SrsEdgeForwarder::cycle() 423 int SrsEdgeForwarder::cycle()
@@ -458,9 +465,8 @@ int SrsEdgeForwarder::cycle() @@ -458,9 +465,8 @@ int SrsEdgeForwarder::cycle()
458 // pithy print 465 // pithy print
459 if (pithy_print.can_print()) { 466 if (pithy_print.can_print()) {
460 srs_trace("-> "SRS_LOG_ID_EDGE_PUBLISH 467 srs_trace("-> "SRS_LOG_ID_EDGE_PUBLISH
461 - " time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
462 - pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(),  
463 - client->get_send_kbps(), client->get_recv_kbps()); 468 + " time=%"PRId64", msgs=%d, okbps=%d, ikbps=%d",
  469 + pithy_print.age(), count, kbps->get_send_kbps(), kbps->get_recv_kbps());
464 } 470 }
465 471
466 // ignore when no messages. 472 // ignore when no messages.
@@ -576,6 +582,7 @@ int SrsEdgeForwarder::connect_server() @@ -576,6 +582,7 @@ int SrsEdgeForwarder::connect_server()
576 582
577 io = new SrsSocket(stfd); 583 io = new SrsSocket(stfd);
578 client = new SrsRtmpClient(io); 584 client = new SrsRtmpClient(io);
  585 + kbps->set_io(io, io);
579 586
580 // connect to server. 587 // connect to server.
581 std::string ip = srs_dns_resolve(server); 588 std::string ip = srs_dns_resolve(server);
@@ -43,6 +43,7 @@ class SrsRtmpClient; @@ -43,6 +43,7 @@ class SrsRtmpClient;
43 class SrsMessage; 43 class SrsMessage;
44 class SrsMessageQueue; 44 class SrsMessageQueue;
45 class ISrsProtocolReaderWriter; 45 class ISrsProtocolReaderWriter;
  46 +class SrsKbps;
46 47
47 /** 48 /**
48 * the state of edge, auto machine 49 * the state of edge, auto machine
@@ -83,6 +84,7 @@ private: @@ -83,6 +84,7 @@ private:
83 SrsThread* pthread; 84 SrsThread* pthread;
84 st_netfd_t stfd; 85 st_netfd_t stfd;
85 ISrsProtocolReaderWriter* io; 86 ISrsProtocolReaderWriter* io;
  87 + SrsKbps* kbps;
86 SrsRtmpClient* client; 88 SrsRtmpClient* client;
87 int origin_index; 89 int origin_index;
88 public: 90 public:
@@ -116,6 +118,7 @@ private: @@ -116,6 +118,7 @@ private:
116 SrsThread* pthread; 118 SrsThread* pthread;
117 st_netfd_t stfd; 119 st_netfd_t stfd;
118 ISrsProtocolReaderWriter* io; 120 ISrsProtocolReaderWriter* io;
  121 + SrsKbps* kbps;
119 SrsRtmpClient* client; 122 SrsRtmpClient* client;
120 int origin_index; 123 int origin_index;
121 /** 124 /**
@@ -39,6 +39,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -39,6 +39,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
39 #include <srs_protocol_rtmp_stack.hpp> 39 #include <srs_protocol_rtmp_stack.hpp>
40 #include <srs_protocol_utility.hpp> 40 #include <srs_protocol_utility.hpp>
41 #include <srs_protocol_rtmp.hpp> 41 #include <srs_protocol_rtmp.hpp>
  42 +#include <srs_app_kbps.hpp>
42 43
43 // when error, forwarder sleep for a while and retry. 44 // when error, forwarder sleep for a while and retry.
44 #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) 45 #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
@@ -50,6 +51,7 @@ SrsForwarder::SrsForwarder(SrsSource* _source) @@ -50,6 +51,7 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
50 io = NULL; 51 io = NULL;
51 client = NULL; 52 client = NULL;
52 stfd = NULL; 53 stfd = NULL;
  54 + kbps = new SrsKbps();
53 stream_id = 0; 55 stream_id = 0;
54 56
55 pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_US); 57 pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_US);
@@ -64,6 +66,7 @@ SrsForwarder::~SrsForwarder() @@ -64,6 +66,7 @@ SrsForwarder::~SrsForwarder()
64 srs_freep(pthread); 66 srs_freep(pthread);
65 srs_freep(queue); 67 srs_freep(queue);
66 srs_freep(jitter); 68 srs_freep(jitter);
  69 + srs_freep(kbps);
67 } 70 }
68 71
69 void SrsForwarder::set_queue_size(double queue_size) 72 void SrsForwarder::set_queue_size(double queue_size)
@@ -146,6 +149,7 @@ void SrsForwarder::on_unpublish() @@ -146,6 +149,7 @@ void SrsForwarder::on_unpublish()
146 149
147 srs_freep(client); 150 srs_freep(client);
148 srs_freep(io); 151 srs_freep(io);
  152 + kbps->set_io(NULL, NULL);
149 } 153 }
150 154
151 int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata) 155 int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
@@ -275,6 +279,7 @@ int SrsForwarder::connect_server() @@ -275,6 +279,7 @@ int SrsForwarder::connect_server()
275 279
276 io = new SrsSocket(stfd); 280 io = new SrsSocket(stfd);
277 client = new SrsRtmpClient(io); 281 client = new SrsRtmpClient(io);
  282 + kbps->set_io(io, io);
278 283
279 // connect to server. 284 // connect to server.
280 std::string ip = srs_dns_resolve(server); 285 std::string ip = srs_dns_resolve(server);
@@ -338,9 +343,8 @@ int SrsForwarder::forward() @@ -338,9 +343,8 @@ int SrsForwarder::forward()
338 // pithy print 343 // pithy print
339 if (pithy_print.can_print()) { 344 if (pithy_print.can_print()) {
340 srs_trace("-> "SRS_LOG_ID_FOWARDER 345 srs_trace("-> "SRS_LOG_ID_FOWARDER
341 - " time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
342 - pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(),  
343 - client->get_send_kbps(), client->get_recv_kbps()); 346 + " time=%"PRId64", msgs=%d, okbps=%d, ikbps=%d",
  347 + pithy_print.age(), count, kbps->get_send_kbps(), kbps->get_recv_kbps());
344 } 348 }
345 349
346 // ignore when no messages. 350 // ignore when no messages.
@@ -42,6 +42,7 @@ class SrsRtmpJitter; @@ -42,6 +42,7 @@ class SrsRtmpJitter;
42 class SrsRtmpClient; 42 class SrsRtmpClient;
43 class SrsRequest; 43 class SrsRequest;
44 class SrsSource; 44 class SrsSource;
  45 +class SrsKbps;
45 46
46 /** 47 /**
47 * forward the stream to other servers. 48 * forward the stream to other servers.
@@ -61,6 +62,7 @@ private: @@ -61,6 +62,7 @@ private:
61 private: 62 private:
62 SrsSource* source; 63 SrsSource* source;
63 ISrsProtocolReaderWriter* io; 64 ISrsProtocolReaderWriter* io;
  65 + SrsKbps* kbps;
64 SrsRtmpClient* client; 66 SrsRtmpClient* client;
65 SrsRtmpJitter* jitter; 67 SrsRtmpJitter* jitter;
66 SrsMessageQueue* queue; 68 SrsMessageQueue* queue;
  1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013-2014 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_app_kbps.hpp>
  25 +
  26 +#include <srs_kernel_error.hpp>
  27 +#include <srs_kernel_log.hpp>
  28 +#include <srs_protocol_io.hpp>
  29 +
  30 +SrsKbps::SrsKbps()
  31 +{
  32 + _in = NULL;
  33 + _out = NULL;
  34 +}
  35 +
  36 +SrsKbps::~SrsKbps()
  37 +{
  38 +}
  39 +
  40 +void SrsKbps::set_io(ISrsProtocolReader* in, ISrsProtocolWriter* out)
  41 +{
  42 + _in = in;
  43 + _out = out;
  44 +}
  45 +
  46 +int SrsKbps::get_send_kbps()
  47 +{
  48 + return 0;
  49 +}
  50 +
  51 +int SrsKbps::get_recv_kbps()
  52 +{
  53 + return 0;
  54 +}
  55 +
  1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013-2014 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#ifndef SRS_APP_KBPS_HPP
  25 +#define SRS_APP_KBPS_HPP
  26 +
  27 +/*
  28 +#include <srs_app_kbps.hpp>
  29 +*/
  30 +
  31 +#include <srs_core.hpp>
  32 +
  33 +class ISrsProtocolReader;
  34 +class ISrsProtocolWriter;
  35 +
  36 +/**
  37 +* to statistic the kbps of io.
  38 +*/
  39 +class SrsKbps
  40 +{
  41 +private:
  42 + ISrsProtocolReader* _in;
  43 + ISrsProtocolWriter* _out;
  44 +public:
  45 + SrsKbps();
  46 + virtual ~SrsKbps();
  47 +public:
  48 + virtual void set_io(ISrsProtocolReader* in, ISrsProtocolWriter* out);
  49 +public:
  50 + virtual int get_send_kbps();
  51 + virtual int get_recv_kbps();
  52 +};
  53 +
  54 +#endif
@@ -43,6 +43,7 @@ using namespace std; @@ -43,6 +43,7 @@ using namespace std;
43 #include <srs_app_socket.hpp> 43 #include <srs_app_socket.hpp>
44 #include <srs_app_http_hooks.hpp> 44 #include <srs_app_http_hooks.hpp>
45 #include <srs_app_edge.hpp> 45 #include <srs_app_edge.hpp>
  46 +#include <srs_app_kbps.hpp>
46 47
47 // when stream is busy, for example, streaming is already 48 // when stream is busy, for example, streaming is already
48 // publishing, when a new client to request to publish, 49 // publishing, when a new client to request to publish,
@@ -71,6 +72,8 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd) @@ -71,6 +72,8 @@ SrsRtmpConn::SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd)
71 refer = new SrsRefer(); 72 refer = new SrsRefer();
72 bandwidth = new SrsBandwidth(); 73 bandwidth = new SrsBandwidth();
73 duration = 0; 74 duration = 0;
  75 + kbps = new SrsKbps();
  76 + kbps->set_io(skt, skt);
74 77
75 _srs_config->subscribe(this); 78 _srs_config->subscribe(this);
76 } 79 }
@@ -87,6 +90,7 @@ SrsRtmpConn::~SrsRtmpConn() @@ -87,6 +90,7 @@ SrsRtmpConn::~SrsRtmpConn()
87 srs_freep(skt); 90 srs_freep(skt);
88 srs_freep(refer); 91 srs_freep(refer);
89 srs_freep(bandwidth); 92 srs_freep(bandwidth);
  93 + srs_freep(kbps);
90 } 94 }
91 95
92 // TODO: return detail message when error for client. 96 // TODO: return detail message when error for client.
@@ -501,9 +505,8 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -501,9 +505,8 @@ int SrsRtmpConn::playing(SrsSource* source)
501 // reportable 505 // reportable
502 if (pithy_print.can_print()) { 506 if (pithy_print.can_print()) {
503 srs_trace("-> "SRS_LOG_ID_PLAY 507 srs_trace("-> "SRS_LOG_ID_PLAY
504 - " time=%"PRId64", duration=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
505 - pithy_print.age(), duration, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(),  
506 - rtmp->get_send_kbps(), rtmp->get_recv_kbps()); 508 + " time=%"PRId64", duration=%"PRId64", msgs=%d, okbps=%d, ikbps=%d",
  509 + pithy_print.age(), duration, count, kbps->get_send_kbps(), kbps->get_recv_kbps());
507 } 510 }
508 511
509 if (count <= 0) { 512 if (count <= 0) {
@@ -586,9 +589,8 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) @@ -586,9 +589,8 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
586 // reportable 589 // reportable
587 if (pithy_print.can_print()) { 590 if (pithy_print.can_print()) {
588 srs_trace("<- "SRS_LOG_ID_CLIENT_PUBLISH 591 srs_trace("<- "SRS_LOG_ID_CLIENT_PUBLISH
589 - " time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
590 - pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(),  
591 - rtmp->get_send_kbps(), rtmp->get_recv_kbps()); 592 + " time=%"PRId64", okbps=%d, ikbps=%d",
  593 + pithy_print.age(), kbps->get_send_kbps(), kbps->get_recv_kbps());
592 } 594 }
593 595
594 // process UnPublish event. 596 // process UnPublish event.
@@ -663,9 +665,8 @@ int SrsRtmpConn::flash_publish(SrsSource* source) @@ -663,9 +665,8 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
663 // reportable 665 // reportable
664 if (pithy_print.can_print()) { 666 if (pithy_print.can_print()) {
665 srs_trace("<- "SRS_LOG_ID_WEB_PUBLISH 667 srs_trace("<- "SRS_LOG_ID_WEB_PUBLISH
666 - " time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
667 - pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(),  
668 - rtmp->get_send_kbps(), rtmp->get_recv_kbps()); 668 + " time=%"PRId64", okbps=%d, ikbps=%d",
  669 + pithy_print.age(), kbps->get_send_kbps(), kbps->get_recv_kbps());
669 } 670 }
670 671
671 // process UnPublish event. 672 // process UnPublish event.
@@ -46,6 +46,7 @@ class SrsSocket; @@ -46,6 +46,7 @@ class SrsSocket;
46 class SrsHttpHooks; 46 class SrsHttpHooks;
47 #endif 47 #endif
48 class SrsBandwidth; 48 class SrsBandwidth;
  49 +class SrsKbps;
49 50
50 /** 51 /**
51 * the client provides the main logic control for RTMP clients. 52 * the client provides the main logic control for RTMP clients.
@@ -63,6 +64,7 @@ private: @@ -63,6 +64,7 @@ private:
63 // for live play duration, for instance, rtmpdump to record. 64 // for live play duration, for instance, rtmpdump to record.
64 // @see https://github.com/winlinvip/simple-rtmp-server/issues/47 65 // @see https://github.com/winlinvip/simple-rtmp-server/issues/47
65 int64_t duration; 66 int64_t duration;
  67 + SrsKbps* kbps;
66 public: 68 public:
67 SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd); 69 SrsRtmpConn(SrsServer* srs_server, st_netfd_t client_stfd);
68 virtual ~SrsRtmpConn(); 70 virtual ~SrsRtmpConn();
@@ -31,7 +31,6 @@ SrsSocket::SrsSocket(st_netfd_t client_stfd) @@ -31,7 +31,6 @@ SrsSocket::SrsSocket(st_netfd_t client_stfd)
31 stfd = client_stfd; 31 stfd = client_stfd;
32 send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT; 32 send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT;
33 recv_bytes = send_bytes = 0; 33 recv_bytes = send_bytes = 0;
34 - start_time_ms = srs_get_system_time_ms();  
35 } 34 }
36 35
37 SrsSocket::~SrsSocket() 36 SrsSocket::~SrsSocket()
@@ -73,28 +72,6 @@ int64_t SrsSocket::get_send_bytes() @@ -73,28 +72,6 @@ int64_t SrsSocket::get_send_bytes()
73 return send_bytes; 72 return send_bytes;
74 } 73 }
75 74
76 -int SrsSocket::get_recv_kbps()  
77 -{  
78 - int64_t diff_ms = srs_get_system_time_ms() - start_time_ms;  
79 -  
80 - if (diff_ms <= 0) {  
81 - return 0;  
82 - }  
83 -  
84 - return recv_bytes * 8 / diff_ms;  
85 -}  
86 -  
87 -int SrsSocket::get_send_kbps()  
88 -{  
89 - int64_t diff_ms = srs_get_system_time_ms() - start_time_ms;  
90 -  
91 - if (diff_ms <= 0) {  
92 - return 0;  
93 - }  
94 -  
95 - return send_bytes * 8 / diff_ms;  
96 -}  
97 -  
98 int SrsSocket::read(const void* buf, size_t size, ssize_t* nread) 75 int SrsSocket::read(const void* buf, size_t size, ssize_t* nread)
99 { 76 {
100 int ret = ERROR_SUCCESS; 77 int ret = ERROR_SUCCESS;
@@ -44,7 +44,6 @@ private: @@ -44,7 +44,6 @@ private:
44 int64_t send_timeout; 44 int64_t send_timeout;
45 int64_t recv_bytes; 45 int64_t recv_bytes;
46 int64_t send_bytes; 46 int64_t send_bytes;
47 - int64_t start_time_ms;  
48 st_netfd_t stfd; 47 st_netfd_t stfd;
49 public: 48 public:
50 SrsSocket(st_netfd_t client_stfd); 49 SrsSocket(st_netfd_t client_stfd);
@@ -57,8 +56,6 @@ public: @@ -57,8 +56,6 @@ public:
57 virtual int64_t get_send_timeout(); 56 virtual int64_t get_send_timeout();
58 virtual int64_t get_recv_bytes(); 57 virtual int64_t get_recv_bytes();
59 virtual int64_t get_send_bytes(); 58 virtual int64_t get_send_bytes();
60 - virtual int get_recv_kbps();  
61 - virtual int get_send_kbps();  
62 public: 59 public:
63 /** 60 /**
64 * @param nread, the actual read bytes, ignore if NULL. 61 * @param nread, the actual read bytes, ignore if NULL.
@@ -44,9 +44,6 @@ SimpleSocketStream::SimpleSocketStream() @@ -44,9 +44,6 @@ SimpleSocketStream::SimpleSocketStream()
44 fd = -1; 44 fd = -1;
45 send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT; 45 send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT;
46 recv_bytes = send_bytes = 0; 46 recv_bytes = send_bytes = 0;
47 -  
48 - srs_update_system_time_ms();  
49 - start_time_ms = srs_get_system_time_ms();  
50 } 47 }
51 48
52 SimpleSocketStream::~SimpleSocketStream() 49 SimpleSocketStream::~SimpleSocketStream()
@@ -122,18 +119,6 @@ int64_t SimpleSocketStream::get_recv_bytes() @@ -122,18 +119,6 @@ int64_t SimpleSocketStream::get_recv_bytes()
122 return recv_bytes; 119 return recv_bytes;
123 } 120 }
124 121
125 -int SimpleSocketStream::get_recv_kbps()  
126 -{  
127 - srs_update_system_time_ms();  
128 - int64_t diff_ms = srs_get_system_time_ms() - start_time_ms;  
129 -  
130 - if (diff_ms <= 0) {  
131 - return 0;  
132 - }  
133 -  
134 - return recv_bytes * 8 / diff_ms;  
135 -}  
136 -  
137 // ISrsProtocolWriter 122 // ISrsProtocolWriter
138 void SimpleSocketStream::set_send_timeout(int64_t timeout_us) 123 void SimpleSocketStream::set_send_timeout(int64_t timeout_us)
139 { 124 {
@@ -150,18 +135,6 @@ int64_t SimpleSocketStream::get_send_bytes() @@ -150,18 +135,6 @@ int64_t SimpleSocketStream::get_send_bytes()
150 return send_bytes; 135 return send_bytes;
151 } 136 }
152 137
153 -int SimpleSocketStream::get_send_kbps()  
154 -{  
155 - srs_update_system_time_ms();  
156 - int64_t diff_ms = srs_get_system_time_ms() - start_time_ms;  
157 -  
158 - if (diff_ms <= 0) {  
159 - return 0;  
160 - }  
161 -  
162 - return send_bytes * 8 / diff_ms;  
163 -}  
164 -  
165 int SimpleSocketStream::writev(const iovec *iov, int iov_size, ssize_t* nwrite) 138 int SimpleSocketStream::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
166 { 139 {
167 int ret = ERROR_SUCCESS; 140 int ret = ERROR_SUCCESS;
@@ -39,7 +39,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -39,7 +39,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
39 class SimpleSocketStream : public ISrsProtocolReaderWriter 39 class SimpleSocketStream : public ISrsProtocolReaderWriter
40 { 40 {
41 private: 41 private:
42 - int64_t start_time_ms;  
43 int64_t recv_timeout; 42 int64_t recv_timeout;
44 int64_t send_timeout; 43 int64_t send_timeout;
45 int64_t recv_bytes; 44 int64_t recv_bytes;
@@ -59,13 +58,11 @@ public: @@ -59,13 +58,11 @@ public:
59 virtual void set_recv_timeout(int64_t timeout_us); 58 virtual void set_recv_timeout(int64_t timeout_us);
60 virtual int64_t get_recv_timeout(); 59 virtual int64_t get_recv_timeout();
61 virtual int64_t get_recv_bytes(); 60 virtual int64_t get_recv_bytes();
62 - virtual int get_recv_kbps();  
63 // ISrsProtocolWriter 61 // ISrsProtocolWriter
64 public: 62 public:
65 virtual void set_send_timeout(int64_t timeout_us); 63 virtual void set_send_timeout(int64_t timeout_us);
66 virtual int64_t get_send_timeout(); 64 virtual int64_t get_send_timeout();
67 virtual int64_t get_send_bytes(); 65 virtual int64_t get_send_bytes();
68 - virtual int get_send_kbps();  
69 virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite); 66 virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite);
70 // ISrsProtocolReaderWriter 67 // ISrsProtocolReaderWriter
71 public: 68 public:
@@ -47,8 +47,6 @@ public: @@ -47,8 +47,6 @@ public:
47 virtual void set_recv_timeout(int64_t timeout_us) = 0; 47 virtual void set_recv_timeout(int64_t timeout_us) = 0;
48 virtual int64_t get_recv_timeout() = 0; 48 virtual int64_t get_recv_timeout() = 0;
49 virtual int64_t get_recv_bytes() = 0; 49 virtual int64_t get_recv_bytes() = 0;
50 - // TODO: FIXME: remove this interface.  
51 - virtual int get_recv_kbps() = 0;  
52 }; 50 };
53 51
54 /** 52 /**
@@ -64,8 +62,6 @@ public: @@ -64,8 +62,6 @@ public:
64 virtual void set_send_timeout(int64_t timeout_us) = 0; 62 virtual void set_send_timeout(int64_t timeout_us) = 0;
65 virtual int64_t get_send_timeout() = 0; 63 virtual int64_t get_send_timeout() = 0;
66 virtual int64_t get_send_bytes() = 0; 64 virtual int64_t get_send_bytes() = 0;
67 - // TODO: FIXME: remove this interface.  
68 - virtual int get_send_kbps() = 0;  
69 virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite) = 0; 65 virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite) = 0;
70 }; 66 };
71 67
@@ -375,16 +375,6 @@ int64_t SrsRtmpClient::get_send_bytes() @@ -375,16 +375,6 @@ int64_t SrsRtmpClient::get_send_bytes()
375 return protocol->get_send_bytes(); 375 return protocol->get_send_bytes();
376 } 376 }
377 377
378 -int SrsRtmpClient::get_recv_kbps()  
379 -{  
380 - return protocol->get_recv_kbps();  
381 -}  
382 -  
383 -int SrsRtmpClient::get_send_kbps()  
384 -{  
385 - return protocol->get_send_kbps();  
386 -}  
387 -  
388 int SrsRtmpClient::recv_message(SrsMessage** pmsg) 378 int SrsRtmpClient::recv_message(SrsMessage** pmsg)
389 { 379 {
390 return protocol->recv_message(pmsg); 380 return protocol->recv_message(pmsg);
@@ -730,16 +720,6 @@ int64_t SrsRtmpServer::get_send_bytes() @@ -730,16 +720,6 @@ int64_t SrsRtmpServer::get_send_bytes()
730 return protocol->get_send_bytes(); 720 return protocol->get_send_bytes();
731 } 721 }
732 722
733 -int SrsRtmpServer::get_recv_kbps()  
734 -{  
735 - return protocol->get_recv_kbps();  
736 -}  
737 -  
738 -int SrsRtmpServer::get_send_kbps()  
739 -{  
740 - return protocol->get_send_kbps();  
741 -}  
742 -  
743 int SrsRtmpServer::recv_message(SrsMessage** pmsg) 723 int SrsRtmpServer::recv_message(SrsMessage** pmsg)
744 { 724 {
745 return protocol->recv_message(pmsg); 725 return protocol->recv_message(pmsg);
@@ -161,8 +161,6 @@ public: @@ -161,8 +161,6 @@ public:
161 virtual void set_send_timeout(int64_t timeout_us); 161 virtual void set_send_timeout(int64_t timeout_us);
162 virtual int64_t get_recv_bytes(); 162 virtual int64_t get_recv_bytes();
163 virtual int64_t get_send_bytes(); 163 virtual int64_t get_send_bytes();
164 - virtual int get_recv_kbps();  
165 - virtual int get_send_kbps();  
166 virtual int recv_message(SrsMessage** pmsg); 164 virtual int recv_message(SrsMessage** pmsg);
167 virtual int decode_message(SrsMessage* msg, SrsPacket** ppacket); 165 virtual int decode_message(SrsMessage* msg, SrsPacket** ppacket);
168 virtual int send_and_free_message(SrsMessage* msg); 166 virtual int send_and_free_message(SrsMessage* msg);
@@ -208,8 +206,6 @@ public: @@ -208,8 +206,6 @@ public:
208 virtual int64_t get_send_timeout(); 206 virtual int64_t get_send_timeout();
209 virtual int64_t get_recv_bytes(); 207 virtual int64_t get_recv_bytes();
210 virtual int64_t get_send_bytes(); 208 virtual int64_t get_send_bytes();
211 - virtual int get_recv_kbps();  
212 - virtual int get_send_kbps();  
213 virtual int recv_message(SrsMessage** pmsg); 209 virtual int recv_message(SrsMessage** pmsg);
214 virtual int decode_message(SrsMessage* msg, SrsPacket** ppacket); 210 virtual int decode_message(SrsMessage* msg, SrsPacket** ppacket);
215 virtual int send_and_free_message(SrsMessage* msg); 211 virtual int send_and_free_message(SrsMessage* msg);
@@ -359,16 +359,6 @@ int64_t SrsProtocol::get_send_bytes() @@ -359,16 +359,6 @@ int64_t SrsProtocol::get_send_bytes()
359 return skt->get_send_bytes(); 359 return skt->get_send_bytes();
360 } 360 }
361 361
362 -int SrsProtocol::get_recv_kbps()  
363 -{  
364 - return skt->get_recv_kbps();  
365 -}  
366 -  
367 -int SrsProtocol::get_send_kbps()  
368 -{  
369 - return skt->get_send_kbps();  
370 -}  
371 -  
372 int SrsProtocol::recv_message(SrsMessage** pmsg) 362 int SrsProtocol::recv_message(SrsMessage** pmsg)
373 { 363 {
374 *pmsg = NULL; 364 *pmsg = NULL;
@@ -138,8 +138,6 @@ public: @@ -138,8 +138,6 @@ public:
138 virtual int64_t get_send_timeout(); 138 virtual int64_t get_send_timeout();
139 virtual int64_t get_recv_bytes(); 139 virtual int64_t get_recv_bytes();
140 virtual int64_t get_send_bytes(); 140 virtual int64_t get_send_bytes();
141 - virtual int get_recv_kbps();  
142 - virtual int get_send_kbps();  
143 public: 141 public:
144 /** 142 /**
145 * recv a RTMP message, which is bytes oriented. 143 * recv a RTMP message, which is bytes oriented.
@@ -71,6 +71,8 @@ file @@ -71,6 +71,8 @@ file
71 ..\app\srs_app_ingest.cpp, 71 ..\app\srs_app_ingest.cpp,
72 ..\app\srs_app_json.hpp, 72 ..\app\srs_app_json.hpp,
73 ..\app\srs_app_json.cpp, 73 ..\app\srs_app_json.cpp,
  74 + ..\app\srs_app_kbps.hpp,
  75 + ..\app\srs_app_kbps.cpp,
74 ..\app\srs_app_log.hpp, 76 ..\app\srs_app_log.hpp,
75 ..\app\srs_app_log.cpp, 77 ..\app\srs_app_log.cpp,
76 ..\app\srs_app_refer.hpp, 78 ..\app\srs_app_refer.hpp,
@@ -73,11 +73,6 @@ int64_t MockEmptyIO::get_recv_bytes() @@ -73,11 +73,6 @@ int64_t MockEmptyIO::get_recv_bytes()
73 return -1; 73 return -1;
74 } 74 }
75 75
76 -int MockEmptyIO::get_recv_kbps()  
77 -{  
78 - return 0;  
79 -}  
80 -  
81 void MockEmptyIO::set_send_timeout(int64_t /*timeout_us*/) 76 void MockEmptyIO::set_send_timeout(int64_t /*timeout_us*/)
82 { 77 {
83 } 78 }
@@ -92,11 +87,6 @@ int64_t MockEmptyIO::get_send_bytes() @@ -92,11 +87,6 @@ int64_t MockEmptyIO::get_send_bytes()
92 return 0; 87 return 0;
93 } 88 }
94 89
95 -int MockEmptyIO::get_send_kbps()  
96 -{  
97 - return 0;  
98 -}  
99 -  
100 int MockEmptyIO::writev(const iovec */*iov*/, int /*iov_size*/, ssize_t* /*nwrite*/) 90 int MockEmptyIO::writev(const iovec */*iov*/, int /*iov_size*/, ssize_t* /*nwrite*/)
101 { 91 {
102 return ERROR_SUCCESS; 92 return ERROR_SUCCESS;
@@ -53,13 +53,11 @@ public: @@ -53,13 +53,11 @@ public:
53 virtual void set_recv_timeout(int64_t timeout_us); 53 virtual void set_recv_timeout(int64_t timeout_us);
54 virtual int64_t get_recv_timeout(); 54 virtual int64_t get_recv_timeout();
55 virtual int64_t get_recv_bytes(); 55 virtual int64_t get_recv_bytes();
56 - virtual int get_recv_kbps();  
57 // for protocol 56 // for protocol
58 public: 57 public:
59 virtual void set_send_timeout(int64_t timeout_us); 58 virtual void set_send_timeout(int64_t timeout_us);
60 virtual int64_t get_send_timeout(); 59 virtual int64_t get_send_timeout();
61 virtual int64_t get_send_bytes(); 60 virtual int64_t get_send_bytes();
62 - virtual int get_send_kbps();  
63 virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite); 61 virtual int writev(const iovec *iov, int iov_size, ssize_t* nwrite);
64 // for protocol/amf0/msg-codec 62 // for protocol/amf0/msg-codec
65 public: 63 public: