winlin

support reload the gop_cache

@@ -198,7 +198,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw @@ -198,7 +198,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw
198 * nginx v1.5.0: 139524 lines <br/> 198 * nginx v1.5.0: 139524 lines <br/>
199 199
200 ### History 200 ### History
201 -* v0.9, 2013-12-14, support reload the hls/forwarder/transcoder. 201 +* v0.9, 2013-12-15, support reload the hls/forwarder/transcoder.
202 * v0.9, 2013-12-14, refine the thread model for the retry threads. 202 * v0.9, 2013-12-14, refine the thread model for the retry threads.
203 * v0.9, 2013-12-10, auto install depends tools/libs on centos/ubuntu. 203 * v0.9, 2013-12-10, auto install depends tools/libs on centos/ubuntu.
204 * v0.8, 2013-12-08, v0.8 released. 19186 lines. 204 * v0.8, 2013-12-08, v0.8 released. 19186 lines.
@@ -26,6 +26,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -26,6 +26,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
26 #include <arpa/inet.h> 26 #include <arpa/inet.h>
27 #include <stdlib.h> 27 #include <stdlib.h>
28 28
  29 +using namespace std;
  30 +
29 #include <srs_core_error.hpp> 31 #include <srs_core_error.hpp>
30 #include <srs_core_log.hpp> 32 #include <srs_core_log.hpp>
31 #include <srs_core_rtmp.hpp> 33 #include <srs_core_rtmp.hpp>
@@ -118,15 +120,20 @@ int SrsClient::do_cycle() @@ -118,15 +120,20 @@ int SrsClient::do_cycle()
118 return ret; 120 return ret;
119 } 121 }
120 122
121 -int SrsClient::on_reload_vhost_removed(SrsConfDirective* vhost) 123 +int SrsClient::on_reload_vhost_removed(string vhost)
122 { 124 {
123 int ret = ERROR_SUCCESS; 125 int ret = ERROR_SUCCESS;
124 126
125 - // if the vhost connected is removed, disconnect the client.  
126 - if (req->vhost == vhost->arg0()) {  
127 - srs_close_stfd(stfd); 127 + if (req->vhost != vhost) {
  128 + return ret;
128 } 129 }
129 130
  131 + // if the vhost connected is removed, disconnect the client.
  132 + srs_trace("vhost %s removed/disabled, close client url=%s",
  133 + vhost.c_str(), req->get_stream_url().c_str());
  134 +
  135 + srs_close_stfd(stfd);
  136 +
130 return ret; 137 return ret;
131 } 138 }
132 139
@@ -174,7 +181,7 @@ int SrsClient::service_cycle() @@ -174,7 +181,7 @@ int SrsClient::service_cycle()
174 srs_trace("set chunk_size=%d success", chunk_size); 181 srs_trace("set chunk_size=%d success", chunk_size);
175 182
176 // find a source to publish. 183 // find a source to publish.
177 - SrsSource* source = SrsSource::find(req->get_stream_url()); 184 + SrsSource* source = SrsSource::find(req->get_stream_url(), req->vhost);
178 srs_assert(source != NULL); 185 srs_assert(source != NULL);
179 186
180 // check publish available. 187 // check publish available.
@@ -65,7 +65,7 @@ protected: @@ -65,7 +65,7 @@ protected:
65 virtual int do_cycle(); 65 virtual int do_cycle();
66 // interface ISrsReloadHandler 66 // interface ISrsReloadHandler
67 public: 67 public:
68 - virtual int on_reload_vhost_removed(SrsConfDirective* vhost); 68 + virtual int on_reload_vhost_removed(std::string vhost);
69 private: 69 private:
70 // when valid and connected to vhost/app, service the client. 70 // when valid and connected to vhost/app, service the client.
71 virtual int service_cycle(); 71 virtual int service_cycle();
@@ -500,39 +500,60 @@ int SrsConfig::reload() @@ -500,39 +500,60 @@ int SrsConfig::reload()
500 if (old_vhost->name != "vhost") { 500 if (old_vhost->name != "vhost") {
501 continue; 501 continue;
502 } 502 }
  503 +
  504 + std::string vhost = old_vhost->arg0();
503 505
504 - SrsConfDirective* new_vhost = root->get("vhost", old_vhost->arg0()); 506 + SrsConfDirective* new_vhost = root->get("vhost", vhost);
505 // ignore if absolutely equal 507 // ignore if absolutely equal
506 if (new_vhost && srs_directive_equals(old_vhost, new_vhost)) { 508 if (new_vhost && srs_directive_equals(old_vhost, new_vhost)) {
  509 + srs_trace("vhost %s absolutely equal, ignore.", vhost.c_str());
507 continue; 510 continue;
508 } 511 }
509 // ignore if enable the new vhost when old vhost is disabled. 512 // ignore if enable the new vhost when old vhost is disabled.
510 if (get_vhost_enabled(new_vhost) && !get_vhost_enabled(old_vhost)) { 513 if (get_vhost_enabled(new_vhost) && !get_vhost_enabled(old_vhost)) {
  514 + srs_trace("vhost %s disabled=>enabled, ignore.", vhost.c_str());
511 continue; 515 continue;
512 } 516 }
513 // ignore if both old and new vhost are disabled. 517 // ignore if both old and new vhost are disabled.
514 if (!get_vhost_enabled(new_vhost) && !get_vhost_enabled(old_vhost)) { 518 if (!get_vhost_enabled(new_vhost) && !get_vhost_enabled(old_vhost)) {
  519 + srs_trace("vhost %s disabled=>disabled, ignore.", vhost.c_str());
515 continue; 520 continue;
516 } 521 }
517 522
518 // merge config: vhost removed/disabled. 523 // merge config: vhost removed/disabled.
519 if (!get_vhost_enabled(new_vhost) && get_vhost_enabled(old_vhost)) { 524 if (!get_vhost_enabled(new_vhost) && get_vhost_enabled(old_vhost)) {
520 - srs_trace("vhost %s disabled, reload it.", old_vhost->name.c_str()); 525 + srs_trace("vhost %s disabled, reload it.", vhost.c_str());
521 for (it = subscribes.begin(); it != subscribes.end(); ++it) { 526 for (it = subscribes.begin(); it != subscribes.end(); ++it) {
522 ISrsReloadHandler* subscribe = *it; 527 ISrsReloadHandler* subscribe = *it;
523 - if ((ret = subscribe->on_reload_vhost_removed(old_vhost)) != ERROR_SUCCESS) {  
524 - srs_error("notify subscribes pithy_print remove vhost failed. ret=%d", ret); 528 + if ((ret = subscribe->on_reload_vhost_removed(vhost)) != ERROR_SUCCESS) {
  529 + srs_error("notify subscribes pithy_print remove "
  530 + "vhost %s failed. ret=%d", vhost.c_str(), ret);
525 return ret; 531 return ret;
526 } 532 }
527 } 533 }
528 - srs_trace("reload remove vhost success."); 534 + srs_trace("reload remove vhost %s success.", vhost.c_str());
529 } 535 }
530 536
531 // merge config: vhost modified. 537 // merge config: vhost modified.
  538 + srs_trace("vhost %s modified, reload its detail.", vhost.c_str());
  539 + if (get_vhost_enabled(new_vhost) && get_vhost_enabled(old_vhost)) {
  540 + if (!srs_directive_equals(new_vhost->get("gop_cache"), old_vhost->get("gop_cache"))) {
  541 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  542 + ISrsReloadHandler* subscribe = *it;
  543 + if ((ret = subscribe->on_reload_gop_cache(vhost)) != ERROR_SUCCESS) {
  544 + srs_error("vhost %s notify subscribes gop_cache failed. ret=%d", vhost.c_str(), ret);
  545 + return ret;
  546 + }
  547 + }
  548 + srs_trace("vhost %s reload gop_cache success.", vhost.c_str());
  549 + }
  550 + // TODO: suppor reload hls/forward/ffmpeg/http
  551 + continue;
  552 + }
  553 + srs_warn("invalid reload path, enabled old: %d, new: %d",
  554 + get_vhost_enabled(old_vhost), get_vhost_enabled(new_vhost));
532 } 555 }
533 556
534 - // TODO: suppor reload hls/forward/ffmpeg/http  
535 -  
536 return ret; 557 return ret;
537 } 558 }
538 559
@@ -1440,6 +1461,11 @@ int SrsConfig::get_pithy_print_play() @@ -1440,6 +1461,11 @@ int SrsConfig::get_pithy_print_play()
1440 1461
1441 bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b) 1462 bool srs_directive_equals(SrsConfDirective* a, SrsConfDirective* b)
1442 { 1463 {
  1464 + // both NULL, equal.
  1465 + if (!a && !b) {
  1466 + return true;
  1467 + }
  1468 +
1443 if (!a || !b) { 1469 if (!a || !b) {
1444 return false; 1470 return false;
1445 } 1471 }
@@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 23
24 #include <srs_core_reload.hpp> 24 #include <srs_core_reload.hpp>
25 25
  26 +using namespace std;
  27 +
26 #include <srs_core_error.hpp> 28 #include <srs_core_error.hpp>
27 29
28 ISrsReloadHandler::ISrsReloadHandler() 30 ISrsReloadHandler::ISrsReloadHandler()
@@ -43,7 +45,12 @@ int ISrsReloadHandler::on_reload_pithy_print() @@ -43,7 +45,12 @@ int ISrsReloadHandler::on_reload_pithy_print()
43 return ERROR_SUCCESS; 45 return ERROR_SUCCESS;
44 } 46 }
45 47
46 -int ISrsReloadHandler::on_reload_vhost_removed(SrsConfDirective* /*vhost*/) 48 +int ISrsReloadHandler::on_reload_vhost_removed(string /*vhost*/)
  49 +{
  50 + return ERROR_SUCCESS;
  51 +}
  52 +
  53 +int ISrsReloadHandler::on_reload_gop_cache(string /*vhost*/)
47 { 54 {
48 return ERROR_SUCCESS; 55 return ERROR_SUCCESS;
49 } 56 }
@@ -29,7 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -29,7 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 */ 29 */
30 #include <srs_core.hpp> 30 #include <srs_core.hpp>
31 31
32 -class SrsConfDirective; 32 +#include <string>
33 33
34 /** 34 /**
35 * the handler for config reload. 35 * the handler for config reload.
@@ -42,7 +42,8 @@ public: @@ -42,7 +42,8 @@ public:
42 public: 42 public:
43 virtual int on_reload_listen(); 43 virtual int on_reload_listen();
44 virtual int on_reload_pithy_print(); 44 virtual int on_reload_pithy_print();
45 - virtual int on_reload_vhost_removed(SrsConfDirective* vhost); 45 + virtual int on_reload_vhost_removed(std::string vhost);
  46 + virtual int on_reload_gop_cache(std::string vhost);
46 }; 47 };
47 48
48 #endif 49 #endif
@@ -24,6 +24,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -24,6 +24,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 #include <srs_core_source.hpp> 24 #include <srs_core_source.hpp>
25 25
26 #include <algorithm> 26 #include <algorithm>
  27 +using namespace std;
27 28
28 #include <srs_core_log.hpp> 29 #include <srs_core_log.hpp>
29 #include <srs_core_protocol.hpp> 30 #include <srs_core_protocol.hpp>
@@ -344,19 +345,21 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) @@ -344,19 +345,21 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv)
344 345
345 std::map<std::string, SrsSource*> SrsSource::pool; 346 std::map<std::string, SrsSource*> SrsSource::pool;
346 347
347 -SrsSource* SrsSource::find(std::string stream_url) 348 +SrsSource* SrsSource::find(string stream_url, string vhost)
348 { 349 {
349 if (pool.find(stream_url) == pool.end()) { 350 if (pool.find(stream_url) == pool.end()) {
350 - pool[stream_url] = new SrsSource(stream_url);  
351 - srs_verbose("create new source for url=%s", stream_url.c_str()); 351 + pool[stream_url] = new SrsSource(stream_url, vhost);
  352 + srs_verbose("create new source for "
  353 + "url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
352 } 354 }
353 355
354 return pool[stream_url]; 356 return pool[stream_url];
355 } 357 }
356 358
357 -SrsSource::SrsSource(std::string _stream_url) 359 +SrsSource::SrsSource(string _stream_url, string _vhost)
358 { 360 {
359 stream_url = _stream_url; 361 stream_url = _stream_url;
  362 + vhost = _vhost;
360 363
361 #ifdef SRS_HLS 364 #ifdef SRS_HLS
362 hls = new SrsHls(); 365 hls = new SrsHls();
@@ -407,6 +410,25 @@ SrsSource::~SrsSource() @@ -407,6 +410,25 @@ SrsSource::~SrsSource()
407 #endif 410 #endif
408 } 411 }
409 412
  413 +int SrsSource::on_reload_gop_cache(string _vhost)
  414 +{
  415 + int ret = ERROR_SUCCESS;
  416 +
  417 + if (vhost != _vhost) {
  418 + return ret;
  419 + }
  420 +
  421 + // gop cache changed.
  422 + bool enabled_cache = config->get_gop_cache(vhost);
  423 +
  424 + srs_trace("vhost %s gop_cache changed to %d, source url=%s",
  425 + vhost.c_str(), enabled_cache, stream_url.c_str());
  426 +
  427 + set_cache(enabled_cache);
  428 +
  429 + return ret;
  430 +}
  431 +
410 bool SrsSource::can_publish() 432 bool SrsSource::can_publish()
411 { 433 {
412 return _can_publish; 434 return _can_publish;
@@ -34,6 +34,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,6 +34,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 #include <vector> 34 #include <vector>
35 #include <string> 35 #include <string>
36 36
  37 +#include <srs_core_reload.hpp>
  38 +
37 class SrsSource; 39 class SrsSource;
38 class SrsCommonMessage; 40 class SrsCommonMessage;
39 class SrsOnMetaDataPacket; 41 class SrsOnMetaDataPacket;
@@ -158,19 +160,21 @@ public: @@ -158,19 +160,21 @@ public:
158 /** 160 /**
159 * live streaming source. 161 * live streaming source.
160 */ 162 */
161 -class SrsSource 163 +class SrsSource : public ISrsReloadHandler
162 { 164 {
163 private: 165 private:
164 static std::map<std::string, SrsSource*> pool; 166 static std::map<std::string, SrsSource*> pool;
165 public: 167 public:
166 /** 168 /**
167 * find stream by vhost/app/stream. 169 * find stream by vhost/app/stream.
168 - * @stream_url the stream url, for example, myserver.xxx.com/app/stream 170 + * @param stream_url the stream url, for example, myserver.xxx.com/app/stream
  171 + * @param vhost the vhost to constructor the object.
169 * @return the matched source, never be NULL. 172 * @return the matched source, never be NULL.
170 * @remark stream_url should without port and schema. 173 * @remark stream_url should without port and schema.
171 */ 174 */
172 - static SrsSource* find(std::string stream_url); 175 + static SrsSource* find(std::string stream_url, std::string vhost);
173 private: 176 private:
  177 + std::string vhost;
174 std::string stream_url; 178 std::string stream_url;
175 // to delivery stream to clients. 179 // to delivery stream to clients.
176 std::vector<SrsConsumer*> consumers; 180 std::vector<SrsConsumer*> consumers;
@@ -206,8 +210,11 @@ private: @@ -206,8 +210,11 @@ private:
206 // the cached audio sequence header. 210 // the cached audio sequence header.
207 SrsSharedPtrMessage* cache_sh_audio; 211 SrsSharedPtrMessage* cache_sh_audio;
208 public: 212 public:
209 - SrsSource(std::string _stream_url); 213 + SrsSource(std::string _stream_url, std::string _vhost);
210 virtual ~SrsSource(); 214 virtual ~SrsSource();
  215 +// interface ISrsReloadHandler
  216 +public:
  217 + virtual int on_reload_gop_cache(std::string _vhost);
211 public: 218 public:
212 virtual bool can_publish(); 219 virtual bool can_publish();
213 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); 220 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);