winlin

fix memory leak at source. 2.0.214

@@ -340,6 +340,7 @@ Remark: @@ -340,6 +340,7 @@ Remark:
340 340
341 ## History 341 ## History
342 342
  343 +* v2.0, 2016-09-05, fix memory leak at source. 2.0.214
343 * v2.0, 2016-09-05, fix memory leak at handshake. 2.0.213 344 * v2.0, 2016-09-05, fix memory leak at handshake. 2.0.213
344 * v2.0, 2016-09-04, support valgrind for [patched st](https://github.com/ossrs/state-threads/issues/2). 345 * v2.0, 2016-09-04, support valgrind for [patched st](https://github.com/ossrs/state-threads/issues/2).
345 * v2.0, 2016-09-03, support all arm for [patched st](https://github.com/ossrs/state-threads/issues/1). 2.0.212 346 * v2.0, 2016-09-03, support all arm for [patched st](https://github.com/ossrs/state-threads/issues/1). 2.0.212
@@ -71,6 +71,15 @@ int SrsThreadContext::set_id(int v) @@ -71,6 +71,15 @@ int SrsThreadContext::set_id(int v)
71 return ov; 71 return ov;
72 } 72 }
73 73
  74 +void SrsThreadContext::clear_cid()
  75 +{
  76 + st_thread_t self = st_thread_self();
  77 + std::map<st_thread_t, int>::iterator it = cache.find(self);
  78 + if (it != cache.end()) {
  79 + cache.erase(it);
  80 + }
  81 +}
  82 +
74 // the max size of a line of log. 83 // the max size of a line of log.
75 #define LOG_MAX_SIZE 4096 84 #define LOG_MAX_SIZE 4096
76 85
@@ -54,6 +54,8 @@ public: @@ -54,6 +54,8 @@ public:
54 virtual int generate_id(); 54 virtual int generate_id();
55 virtual int get_id(); 55 virtual int get_id();
56 virtual int set_id(int v); 56 virtual int set_id(int v);
  57 +public:
  58 + virtual void clear_cid();
57 }; 59 };
58 60
59 /** 61 /**
@@ -866,6 +866,11 @@ int SrsServer::cycle() @@ -866,6 +866,11 @@ int SrsServer::cycle()
866 srs_warn("main cycle terminated, system quit normally."); 866 srs_warn("main cycle terminated, system quit normally.");
867 dispose(); 867 dispose();
868 srs_trace("srs terminated"); 868 srs_trace("srs terminated");
  869 +
  870 + // for valgrind to detect.
  871 + srs_freep(_srs_config);
  872 + srs_freep(_srs_log);
  873 +
869 exit(0); 874 exit(0);
870 #endif 875 #endif
871 876
@@ -58,6 +58,9 @@ using namespace std; @@ -58,6 +58,9 @@ using namespace std;
58 // when got these videos or audios, pure audio or video, mix ok. 58 // when got these videos or audios, pure audio or video, mix ok.
59 #define SRS_MIX_CORRECT_PURE_AV 10 59 #define SRS_MIX_CORRECT_PURE_AV 10
60 60
  61 +// the time to cleanup source in ms.
  62 +#define SRS_SOURCE_CLEANUP 30000
  63 +
61 int _srs_time_jitter_string2int(std::string time_jitter) 64 int _srs_time_jitter_string2int(std::string time_jitter)
62 { 65 {
63 if (time_jitter == "full") { 66 if (time_jitter == "full") {
@@ -797,15 +800,38 @@ void SrsSource::dispose_all() @@ -797,15 +800,38 @@ void SrsSource::dispose_all()
797 800
798 int SrsSource::cycle_all() 801 int SrsSource::cycle_all()
799 { 802 {
  803 + int cid = _srs_context->get_id();
  804 + int ret = do_cycle_all();
  805 + _srs_context->set_id(cid);
  806 + return ret;
  807 +}
  808 +
  809 +int SrsSource::do_cycle_all()
  810 +{
800 int ret = ERROR_SUCCESS; 811 int ret = ERROR_SUCCESS;
801 812
802 - // TODO: FIXME: support remove dead source for a long time.  
803 std::map<std::string, SrsSource*>::iterator it; 813 std::map<std::string, SrsSource*>::iterator it;
804 - for (it = pool.begin(); it != pool.end(); ++it) { 814 + for (it = pool.begin(); it != pool.end();) {
805 SrsSource* source = it->second; 815 SrsSource* source = it->second;
806 if ((ret = source->cycle()) != ERROR_SUCCESS) { 816 if ((ret = source->cycle()) != ERROR_SUCCESS) {
807 return ret; 817 return ret;
808 } 818 }
  819 +
  820 + if (source->expired()) {
  821 + int cid = source->source_id();
  822 + if (cid == -1 && source->pre_source_id() > 0) {
  823 + cid = source->pre_source_id();
  824 + }
  825 + if (cid > 0) {
  826 + _srs_context->set_id(cid);
  827 + }
  828 + srs_trace("cleanup die source, total=%d", (int)pool.size());
  829 +
  830 + srs_freep(source);
  831 + pool.erase(it++);
  832 + } else {
  833 + ++it;
  834 + }
809 } 835 }
810 836
811 return ret; 837 return ret;
@@ -916,7 +942,8 @@ SrsSource::SrsSource() @@ -916,7 +942,8 @@ SrsSource::SrsSource()
916 cache_metadata = cache_sh_video = cache_sh_audio = NULL; 942 cache_metadata = cache_sh_video = cache_sh_audio = NULL;
917 943
918 _can_publish = true; 944 _can_publish = true;
919 - _source_id = -1; 945 + _pre_source_id = _source_id = -1;
  946 + die_at = -1;
920 947
921 play_edge = new SrsPlayEdge(); 948 play_edge = new SrsPlayEdge();
922 publish_edge = new SrsPublishEdge(); 949 publish_edge = new SrsPublishEdge();
@@ -1001,6 +1028,20 @@ int SrsSource::cycle() @@ -1001,6 +1028,20 @@ int SrsSource::cycle()
1001 return ret; 1028 return ret;
1002 } 1029 }
1003 1030
  1031 +bool SrsSource::expired()
  1032 +{
  1033 + if (!consumers.empty() || die_at == -1) {
  1034 + return false;
  1035 + }
  1036 +
  1037 + int64_t now = srs_get_system_time_ms();
  1038 + if (now > die_at + SRS_SOURCE_CLEANUP) {
  1039 + return true;
  1040 + }
  1041 +
  1042 + return false;
  1043 +}
  1044 +
1004 int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh) 1045 int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh)
1005 { 1046 {
1006 int ret = ERROR_SUCCESS; 1047 int ret = ERROR_SUCCESS;
@@ -1355,6 +1396,12 @@ int SrsSource::on_source_id_changed(int id) @@ -1355,6 +1396,12 @@ int SrsSource::on_source_id_changed(int id)
1355 return ret; 1396 return ret;
1356 } 1397 }
1357 1398
  1399 + if (_pre_source_id == -1) {
  1400 + _pre_source_id = id;
  1401 + } else if (_pre_source_id != _source_id) {
  1402 + _pre_source_id = _source_id;
  1403 + }
  1404 +
1358 _source_id = id; 1405 _source_id = id;
1359 1406
1360 // notice all consumer 1407 // notice all consumer
@@ -1372,6 +1419,11 @@ int SrsSource::source_id() @@ -1372,6 +1419,11 @@ int SrsSource::source_id()
1372 return _source_id; 1419 return _source_id;
1373 } 1420 }
1374 1421
  1422 +int SrsSource::pre_source_id()
  1423 +{
  1424 + return _pre_source_id;
  1425 +}
  1426 +
1375 bool SrsSource::can_publish(bool is_edge) 1427 bool SrsSource::can_publish(bool is_edge)
1376 { 1428 {
1377 if (is_edge) { 1429 if (is_edge) {
@@ -2107,6 +2159,11 @@ int SrsSource::on_publish() @@ -2107,6 +2159,11 @@ int SrsSource::on_publish()
2107 2159
2108 void SrsSource::on_unpublish() 2160 void SrsSource::on_unpublish()
2109 { 2161 {
  2162 + // ignore when already unpublished.
  2163 + if (_can_publish) {
  2164 + return;
  2165 + }
  2166 +
2110 // destroy all forwarders 2167 // destroy all forwarders
2111 destroy_forwarders(); 2168 destroy_forwarders();
2112 2169
@@ -2142,12 +2199,18 @@ void SrsSource::on_unpublish() @@ -2142,12 +2199,18 @@ void SrsSource::on_unpublish()
2142 SrsStatistic* stat = SrsStatistic::instance(); 2199 SrsStatistic* stat = SrsStatistic::instance();
2143 stat->on_stream_close(_req); 2200 stat->on_stream_close(_req);
2144 handler->on_unpublish(this, _req); 2201 handler->on_unpublish(this, _req);
  2202 +
  2203 + // no consumer, stream is die.
  2204 + if (consumers.empty()) {
  2205 + die_at = srs_get_system_time_ms();
  2206 + }
2145 } 2207 }
2146 2208
2147 int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) 2209 int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
2148 { 2210 {
2149 int ret = ERROR_SUCCESS; 2211 int ret = ERROR_SUCCESS;
2150 2212
  2213 + die_at = -1;
2151 consumer = new SrsConsumer(this, conn); 2214 consumer = new SrsConsumer(this, conn);
2152 consumers.push_back(consumer); 2215 consumers.push_back(consumer);
2153 2216
@@ -2224,6 +2287,7 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer) @@ -2224,6 +2287,7 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
2224 2287
2225 if (consumers.empty()) { 2288 if (consumers.empty()) {
2226 play_edge->on_all_client_stop(); 2289 play_edge->on_all_client_stop();
  2290 + die_at = srs_get_system_time_ms();
2227 } 2291 }
2228 } 2292 }
2229 2293
@@ -439,6 +439,9 @@ public: @@ -439,6 +439,9 @@ public:
439 */ 439 */
440 static void dispose_all(); 440 static void dispose_all();
441 static int cycle_all(); 441 static int cycle_all();
  442 +private:
  443 + static int do_cycle_all();
  444 +public:
442 /** 445 /**
443 * when system exit, destroy the sources, 446 * when system exit, destroy the sources,
444 * for gmc to analysis mem leaks. 447 * for gmc to analysis mem leaks.
@@ -451,6 +454,8 @@ private: @@ -451,6 +454,8 @@ private:
451 // when source id changed, for example, the edge reconnect, 454 // when source id changed, for example, the edge reconnect,
452 // invoke the on_source_id_changed() to let all clients know. 455 // invoke the on_source_id_changed() to let all clients know.
453 int _source_id; 456 int _source_id;
  457 + // previous source id.
  458 + int _pre_source_id;
454 // deep copy of client request. 459 // deep copy of client request.
455 SrsRequest* _req; 460 SrsRequest* _req;
456 // to delivery stream to clients. 461 // to delivery stream to clients.
@@ -501,6 +506,9 @@ private: @@ -501,6 +506,9 @@ private:
501 */ 506 */
502 // TODO: FIXME: to support reload atc. 507 // TODO: FIXME: to support reload atc.
503 bool atc; 508 bool atc;
  509 + // last die time, when all consumers quit and no publisher,
  510 + // we will remove the source when source die.
  511 + int64_t die_at;
504 private: 512 private:
505 SrsSharedPtrMessage* cache_metadata; 513 SrsSharedPtrMessage* cache_metadata;
506 // the cached video sequence header. 514 // the cached video sequence header.
@@ -513,6 +521,8 @@ public: @@ -513,6 +521,8 @@ public:
513 public: 521 public:
514 virtual void dispose(); 522 virtual void dispose();
515 virtual int cycle(); 523 virtual int cycle();
  524 + // remove source when expired.
  525 + virtual bool expired();
516 // initialize, get and setter. 526 // initialize, get and setter.
517 public: 527 public:
518 /** 528 /**
@@ -543,6 +553,7 @@ public: @@ -543,6 +553,7 @@ public:
543 virtual int on_source_id_changed(int id); 553 virtual int on_source_id_changed(int id);
544 // get current source id. 554 // get current source id.
545 virtual int source_id(); 555 virtual int source_id();
  556 + virtual int pre_source_id();
546 // logic data methods 557 // logic data methods
547 public: 558 public:
548 virtual bool can_publish(bool is_edge); 559 virtual bool can_publish(bool is_edge);
@@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 25
26 #include <srs_kernel_error.hpp> 26 #include <srs_kernel_error.hpp>
27 #include <srs_kernel_log.hpp> 27 #include <srs_kernel_log.hpp>
  28 +#include <srs_app_log.hpp>
28 29
29 namespace internal { 30 namespace internal {
30 ISrsThreadHandler::ISrsThreadHandler() 31 ISrsThreadHandler::ISrsThreadHandler()
@@ -243,6 +244,12 @@ namespace internal { @@ -243,6 +244,12 @@ namespace internal {
243 244
244 obj->thread_cycle(); 245 obj->thread_cycle();
245 246
  247 + // for valgrind to detect.
  248 + SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
  249 + if (ctx) {
  250 + ctx->clear_cid();
  251 + }
  252 +
246 st_thread_exit(NULL); 253 st_thread_exit(NULL);
247 254
248 return NULL; 255 return NULL;
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 2 32 #define VERSION_MAJOR 2
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 213 34 +#define VERSION_REVISION 214
35 35
36 // generated by configure, only macros. 36 // generated by configure, only macros.
37 #include <srs_auto_headers.hpp> 37 #include <srs_auto_headers.hpp>