winlin

merge from 2.0

@@ -71,6 +71,15 @@ int SrsThreadContext::set_id(int v) @@ -71,6 +71,15 @@ int SrsThreadContext::set_id(int v)
71 return ov; 71 return ov;
72 } 72 }
73 73
  74 +void SrsThreadContext::clear_cid()
  75 +{
  76 + st_thread_t self = st_thread_self();
  77 + std::map<st_thread_t, int>::iterator it = cache.find(self);
  78 + if (it != cache.end()) {
  79 + cache.erase(it);
  80 + }
  81 +}
  82 +
74 // the max size of a line of log. 83 // the max size of a line of log.
75 #define LOG_MAX_SIZE 4096 84 #define LOG_MAX_SIZE 4096
76 85
@@ -54,6 +54,8 @@ public: @@ -54,6 +54,8 @@ public:
54 virtual int generate_id(); 54 virtual int generate_id();
55 virtual int get_id(); 55 virtual int get_id();
56 virtual int set_id(int v); 56 virtual int set_id(int v);
  57 +public:
  58 + virtual void clear_cid();
57 }; 59 };
58 60
59 /** 61 /**
@@ -903,6 +903,11 @@ int SrsServer::cycle() @@ -903,6 +903,11 @@ int SrsServer::cycle()
903 srs_warn("main cycle terminated, system quit normally."); 903 srs_warn("main cycle terminated, system quit normally.");
904 dispose(); 904 dispose();
905 srs_trace("srs terminated"); 905 srs_trace("srs terminated");
  906 +
  907 + // for valgrind to detect.
  908 + srs_freep(_srs_config);
  909 + srs_freep(_srs_log);
  910 +
906 exit(0); 911 exit(0);
907 #endif 912 #endif
908 913
@@ -59,6 +59,9 @@ using namespace std; @@ -59,6 +59,9 @@ using namespace std;
59 // when got these videos or audios, pure audio or video, mix ok. 59 // when got these videos or audios, pure audio or video, mix ok.
60 #define SRS_MIX_CORRECT_PURE_AV 10 60 #define SRS_MIX_CORRECT_PURE_AV 10
61 61
  62 +// the time to cleanup source in ms.
  63 +#define SRS_SOURCE_CLEANUP 30000
  64 +
62 int _srs_time_jitter_string2int(std::string time_jitter) 65 int _srs_time_jitter_string2int(std::string time_jitter)
63 { 66 {
64 if (time_jitter == "full") { 67 if (time_jitter == "full") {
@@ -803,15 +806,38 @@ void SrsSource::dispose_all() @@ -803,15 +806,38 @@ void SrsSource::dispose_all()
803 806
804 int SrsSource::cycle_all() 807 int SrsSource::cycle_all()
805 { 808 {
  809 + int cid = _srs_context->get_id();
  810 + int ret = do_cycle_all();
  811 + _srs_context->set_id(cid);
  812 + return ret;
  813 +}
  814 +
  815 +int SrsSource::do_cycle_all()
  816 +{
806 int ret = ERROR_SUCCESS; 817 int ret = ERROR_SUCCESS;
807 818
808 - // TODO: FIXME: support remove dead source for a long time.  
809 std::map<std::string, SrsSource*>::iterator it; 819 std::map<std::string, SrsSource*>::iterator it;
810 - for (it = pool.begin(); it != pool.end(); ++it) { 820 + for (it = pool.begin(); it != pool.end();) {
811 SrsSource* source = it->second; 821 SrsSource* source = it->second;
812 if ((ret = source->cycle()) != ERROR_SUCCESS) { 822 if ((ret = source->cycle()) != ERROR_SUCCESS) {
813 return ret; 823 return ret;
814 } 824 }
  825 +
  826 + if (source->expired()) {
  827 + int cid = source->source_id();
  828 + if (cid == -1 && source->pre_source_id() > 0) {
  829 + cid = source->pre_source_id();
  830 + }
  831 + if (cid > 0) {
  832 + _srs_context->set_id(cid);
  833 + }
  834 + srs_trace("cleanup die source, total=%d", (int)pool.size());
  835 +
  836 + srs_freep(source);
  837 + pool.erase(it++);
  838 + } else {
  839 + ++it;
  840 + }
815 } 841 }
816 842
817 return ret; 843 return ret;
@@ -922,7 +948,8 @@ SrsSource::SrsSource() @@ -922,7 +948,8 @@ SrsSource::SrsSource()
922 cache_metadata = cache_sh_video = cache_sh_audio = NULL; 948 cache_metadata = cache_sh_video = cache_sh_audio = NULL;
923 949
924 _can_publish = true; 950 _can_publish = true;
925 - _source_id = -1; 951 + _pre_source_id = _source_id = -1;
  952 + die_at = -1;
926 953
927 play_edge = new SrsPlayEdge(); 954 play_edge = new SrsPlayEdge();
928 publish_edge = new SrsPublishEdge(); 955 publish_edge = new SrsPublishEdge();
@@ -1009,6 +1036,20 @@ int SrsSource::cycle() @@ -1009,6 +1036,20 @@ int SrsSource::cycle()
1009 return ret; 1036 return ret;
1010 } 1037 }
1011 1038
  1039 +bool SrsSource::expired()
  1040 +{
  1041 + if (!consumers.empty() || die_at == -1) {
  1042 + return false;
  1043 + }
  1044 +
  1045 + int64_t now = srs_get_system_time_ms();
  1046 + if (now > die_at + SRS_SOURCE_CLEANUP) {
  1047 + return true;
  1048 + }
  1049 +
  1050 + return false;
  1051 +}
  1052 +
1012 int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh) 1053 int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh)
1013 { 1054 {
1014 int ret = ERROR_SUCCESS; 1055 int ret = ERROR_SUCCESS;
@@ -1362,6 +1403,12 @@ int SrsSource::on_source_id_changed(int id) @@ -1362,6 +1403,12 @@ int SrsSource::on_source_id_changed(int id)
1362 return ret; 1403 return ret;
1363 } 1404 }
1364 1405
  1406 + if (_pre_source_id == -1) {
  1407 + _pre_source_id = id;
  1408 + } else if (_pre_source_id != _source_id) {
  1409 + _pre_source_id = _source_id;
  1410 + }
  1411 +
1365 _source_id = id; 1412 _source_id = id;
1366 1413
1367 // notice all consumer 1414 // notice all consumer
@@ -1379,6 +1426,11 @@ int SrsSource::source_id() @@ -1379,6 +1426,11 @@ int SrsSource::source_id()
1379 return _source_id; 1426 return _source_id;
1380 } 1427 }
1381 1428
  1429 +int SrsSource::pre_source_id()
  1430 +{
  1431 + return _pre_source_id;
  1432 +}
  1433 +
1382 bool SrsSource::can_publish(bool is_edge) 1434 bool SrsSource::can_publish(bool is_edge)
1383 { 1435 {
1384 if (is_edge) { 1436 if (is_edge) {
@@ -2121,6 +2173,11 @@ int SrsSource::on_publish() @@ -2121,6 +2173,11 @@ int SrsSource::on_publish()
2121 2173
2122 void SrsSource::on_unpublish() 2174 void SrsSource::on_unpublish()
2123 { 2175 {
  2176 + // ignore when already unpublished.
  2177 + if (_can_publish) {
  2178 + return;
  2179 + }
  2180 +
2124 // destroy all forwarders 2181 // destroy all forwarders
2125 destroy_forwarders(); 2182 destroy_forwarders();
2126 2183
@@ -2158,12 +2215,18 @@ void SrsSource::on_unpublish() @@ -2158,12 +2215,18 @@ void SrsSource::on_unpublish()
2158 SrsStatistic* stat = SrsStatistic::instance(); 2215 SrsStatistic* stat = SrsStatistic::instance();
2159 stat->on_stream_close(req); 2216 stat->on_stream_close(req);
2160 handler->on_unpublish(this, req); 2217 handler->on_unpublish(this, req);
  2218 +
  2219 + // no consumer, stream is die.
  2220 + if (consumers.empty()) {
  2221 + die_at = srs_get_system_time_ms();
  2222 + }
2161 } 2223 }
2162 2224
2163 int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) 2225 int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
2164 { 2226 {
2165 int ret = ERROR_SUCCESS; 2227 int ret = ERROR_SUCCESS;
2166 2228
  2229 + die_at = -1;
2167 consumer = new SrsConsumer(this, conn); 2230 consumer = new SrsConsumer(this, conn);
2168 consumers.push_back(consumer); 2231 consumers.push_back(consumer);
2169 2232
@@ -2240,6 +2303,7 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer) @@ -2240,6 +2303,7 @@ void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
2240 2303
2241 if (consumers.empty()) { 2304 if (consumers.empty()) {
2242 play_edge->on_all_client_stop(); 2305 play_edge->on_all_client_stop();
  2306 + die_at = srs_get_system_time_ms();
2243 } 2307 }
2244 } 2308 }
2245 2309
@@ -441,6 +441,9 @@ public: @@ -441,6 +441,9 @@ public:
441 */ 441 */
442 static void dispose_all(); 442 static void dispose_all();
443 static int cycle_all(); 443 static int cycle_all();
  444 +private:
  445 + static int do_cycle_all();
  446 +public:
444 /** 447 /**
445 * when system exit, destroy the sources, 448 * when system exit, destroy the sources,
446 * for gmc to analysis mem leaks. 449 * for gmc to analysis mem leaks.
@@ -453,6 +456,8 @@ private: @@ -453,6 +456,8 @@ private:
453 // when source id changed, for example, the edge reconnect, 456 // when source id changed, for example, the edge reconnect,
454 // invoke the on_source_id_changed() to let all clients know. 457 // invoke the on_source_id_changed() to let all clients know.
455 int _source_id; 458 int _source_id;
  459 + // previous source id.
  460 + int _pre_source_id;
456 // deep copy of client request. 461 // deep copy of client request.
457 SrsRequest* req; 462 SrsRequest* req;
458 // to delivery stream to clients. 463 // to delivery stream to clients.
@@ -508,6 +513,9 @@ private: @@ -508,6 +513,9 @@ private:
508 * can publish, true when is not streaming 513 * can publish, true when is not streaming
509 */ 514 */
510 bool _can_publish; 515 bool _can_publish;
  516 + // last die time, when all consumers quit and no publisher,
  517 + // we will remove the source when source die.
  518 + int64_t die_at;
511 private: 519 private:
512 SrsSharedPtrMessage* cache_metadata; 520 SrsSharedPtrMessage* cache_metadata;
513 // the cached video sequence header. 521 // the cached video sequence header.
@@ -520,6 +528,8 @@ public: @@ -520,6 +528,8 @@ public:
520 public: 528 public:
521 virtual void dispose(); 529 virtual void dispose();
522 virtual int cycle(); 530 virtual int cycle();
  531 + // remove source when expired.
  532 + virtual bool expired();
523 // initialize, get and setter. 533 // initialize, get and setter.
524 public: 534 public:
525 /** 535 /**
@@ -547,6 +557,7 @@ public: @@ -547,6 +557,7 @@ public:
547 virtual int on_source_id_changed(int id); 557 virtual int on_source_id_changed(int id);
548 // get current source id. 558 // get current source id.
549 virtual int source_id(); 559 virtual int source_id();
  560 + virtual int pre_source_id();
550 // logic data methods 561 // logic data methods
551 public: 562 public:
552 virtual bool can_publish(bool is_edge); 563 virtual bool can_publish(bool is_edge);
@@ -29,6 +29,7 @@ using namespace std; @@ -29,6 +29,7 @@ using namespace std;
29 #include <srs_kernel_error.hpp> 29 #include <srs_kernel_error.hpp>
30 #include <srs_kernel_log.hpp> 30 #include <srs_kernel_log.hpp>
31 #include <srs_app_utility.hpp> 31 #include <srs_app_utility.hpp>
  32 +#include <srs_app_log.hpp>
32 33
33 namespace internal 34 namespace internal
34 { 35 {
@@ -248,6 +249,12 @@ namespace internal @@ -248,6 +249,12 @@ namespace internal
248 249
249 obj->thread_cycle(); 250 obj->thread_cycle();
250 251
  252 + // for valgrind to detect.
  253 + SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
  254 + if (ctx) {
  255 + ctx->clear_cid();
  256 + }
  257 +
251 st_thread_exit(NULL); 258 st_thread_exit(NULL);
252 259
253 return NULL; 260 return NULL;
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 3 32 #define VERSION_MAJOR 3
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 213 34 +#define VERSION_REVISION 214
35 35
36 // generated by configure, only macros. 36 // generated by configure, only macros.
37 #include <srs_auto_headers.hpp> 37 #include <srs_auto_headers.hpp>