winlin

fix #439: http remux support reload

@@ -852,6 +852,18 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) @@ -852,6 +852,18 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
852 return ret; 852 return ret;
853 } 853 }
854 } 854 }
  855 +
  856 + // http_remux, only one per vhost.
  857 + if (get_vhost_http_remux_enabled(vhost)) {
  858 + for (it = subscribes.begin(); it != subscribes.end(); ++it) {
  859 + ISrsReloadHandler* subscribe = *it;
  860 + if ((ret = subscribe->on_reload_vhost_http_remux_updated(vhost)) != ERROR_SUCCESS) {
  861 + srs_error("vhost %s notify subscribes http_remux failed. ret=%d", vhost.c_str(), ret);
  862 + return ret;
  863 + }
  864 + }
  865 + srs_trace("vhost %s reload http_remux success.", vhost.c_str());
  866 + }
855 srs_trace("reload new vhost %s success.", vhost.c_str()); 867 srs_trace("reload new vhost %s success.", vhost.c_str());
856 continue; 868 continue;
857 } 869 }
@@ -1060,7 +1072,7 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) @@ -1060,7 +1072,7 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
1060 if (!srs_directive_equals(new_vhost->get("http_remux"), old_vhost->get("http_remux"))) { 1072 if (!srs_directive_equals(new_vhost->get("http_remux"), old_vhost->get("http_remux"))) {
1061 for (it = subscribes.begin(); it != subscribes.end(); ++it) { 1073 for (it = subscribes.begin(); it != subscribes.end(); ++it) {
1062 ISrsReloadHandler* subscribe = *it; 1074 ISrsReloadHandler* subscribe = *it;
1063 - if ((ret = subscribe->on_reload_vhost_http_remux_updated()) != ERROR_SUCCESS) { 1075 + if ((ret = subscribe->on_reload_vhost_http_remux_updated(vhost)) != ERROR_SUCCESS) {
1064 srs_error("vhost %s notify subscribes http_remux failed. ret=%d", vhost.c_str(), ret); 1076 srs_error("vhost %s notify subscribes http_remux failed. ret=%d", vhost.c_str(), ret);
1065 return ret; 1077 return ret;
1066 } 1078 }
@@ -1077,7 +1089,7 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root) @@ -1077,7 +1089,7 @@ int SrsConfig::reload_vhost(SrsConfDirective* old_root)
1077 } 1089 }
1078 continue; 1090 continue;
1079 } 1091 }
1080 - srs_trace("igreno reload vhost, enabled old: %d, new: %d", 1092 + srs_trace("ignore reload vhost, enabled old: %d, new: %d",
1081 get_vhost_enabled(old_vhost), get_vhost_enabled(new_vhost)); 1093 get_vhost_enabled(old_vhost), get_vhost_enabled(new_vhost));
1082 } 1094 }
1083 1095
@@ -509,7 +509,8 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) @@ -509,7 +509,8 @@ int SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
509 SrsFastFlvStreamEncoder* ffe = dynamic_cast<SrsFastFlvStreamEncoder*>(enc); 509 SrsFastFlvStreamEncoder* ffe = dynamic_cast<SrsFastFlvStreamEncoder*>(enc);
510 #endif 510 #endif
511 511
512 - while (true) { 512 + // TODO: free and erase the disabled entry after all related connections is closed.
  513 + while (entry->enabled) {
513 pprint->elapse(); 514 pprint->elapse();
514 515
515 // get messages from consumer. 516 // get messages from consumer.
@@ -594,6 +595,9 @@ SrsLiveEntry::SrsLiveEntry(std::string m, bool h) @@ -594,6 +595,9 @@ SrsLiveEntry::SrsLiveEntry(std::string m, bool h)
594 stream = NULL; 595 stream = NULL;
595 cache = NULL; 596 cache = NULL;
596 597
  598 + req = NULL;
  599 + source = NULL;
  600 +
597 std::string ext; 601 std::string ext;
598 size_t pos = string::npos; 602 size_t pos = string::npos;
599 if ((pos = m.rfind(".")) != string::npos) { 603 if ((pos = m.rfind(".")) != string::npos) {
@@ -605,6 +609,11 @@ SrsLiveEntry::SrsLiveEntry(std::string m, bool h) @@ -605,6 +609,11 @@ SrsLiveEntry::SrsLiveEntry(std::string m, bool h)
605 _is_aac = (ext == ".aac"); 609 _is_aac = (ext == ".aac");
606 } 610 }
607 611
  612 +void SrsLiveEntry::reset_hstrs(bool h)
  613 +{
  614 + hstrs = h;
  615 +}
  616 +
608 bool SrsLiveEntry::is_flv() 617 bool SrsLiveEntry::is_flv()
609 { 618 {
610 return _is_flv; 619 return _is_flv;
@@ -699,16 +708,19 @@ SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr) @@ -699,16 +708,19 @@ SrsHttpStreamServer::SrsHttpStreamServer(SrsServer* svr)
699 server = svr; 708 server = svr;
700 709
701 mux.hijack(this); 710 mux.hijack(this);
  711 + _srs_config->subscribe(this);
702 } 712 }
703 713
704 SrsHttpStreamServer::~SrsHttpStreamServer() 714 SrsHttpStreamServer::~SrsHttpStreamServer()
705 { 715 {
706 mux.unhijack(this); 716 mux.unhijack(this);
  717 + _srs_config->unsubscribe(this);
707 718
708 if (true) { 719 if (true) {
709 std::map<std::string, SrsLiveEntry*>::iterator it; 720 std::map<std::string, SrsLiveEntry*>::iterator it;
710 for (it = tflvs.begin(); it != tflvs.end(); ++it) { 721 for (it = tflvs.begin(); it != tflvs.end(); ++it) {
711 SrsLiveEntry* entry = it->second; 722 SrsLiveEntry* entry = it->second;
  723 + srs_freep(entry->req);
712 srs_freep(entry); 724 srs_freep(entry);
713 } 725 }
714 tflvs.clear(); 726 tflvs.clear();
@@ -783,12 +795,20 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r) @@ -783,12 +795,20 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
783 795
784 // remove the default vhost mount 796 // remove the default vhost mount
785 mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); 797 mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
  798 + // TODO: FIXME: check match
  799 + if (mount.at(0) != '/') {
  800 + mount = "/" + mount;
  801 + }
786 802
787 entry = new SrsLiveEntry(mount, tmpl->hstrs); 803 entry = new SrsLiveEntry(mount, tmpl->hstrs);
788 804
789 entry->cache = new SrsStreamCache(s, r); 805 entry->cache = new SrsStreamCache(s, r);
790 entry->stream = new SrsLiveStream(s, r, entry->cache); 806 entry->stream = new SrsLiveStream(s, r, entry->cache);
791 807
  808 + srs_assert(!tmpl->req);
  809 + tmpl->source = s;
  810 + tmpl->req = r->copy();
  811 +
792 sflvs[sid] = entry; 812 sflvs[sid] = entry;
793 813
794 // mount the http flv stream. 814 // mount the http flv stream.
@@ -810,7 +830,6 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r) @@ -810,7 +830,6 @@ int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
810 entry = sflvs[sid]; 830 entry = sflvs[sid];
811 } 831 }
812 832
813 - // TODO: FIXME: supports reload.  
814 if (entry->stream) { 833 if (entry->stream) {
815 entry->stream->entry->enabled = true; 834 entry->stream->entry->enabled = true;
816 return ret; 835 return ret;
@@ -832,6 +851,69 @@ void SrsHttpStreamServer::http_unmount(SrsSource* s, SrsRequest* r) @@ -832,6 +851,69 @@ void SrsHttpStreamServer::http_unmount(SrsSource* s, SrsRequest* r)
832 entry->stream->entry->enabled = false; 851 entry->stream->entry->enabled = false;
833 } 852 }
834 853
  854 +int SrsHttpStreamServer::on_reload_vhost_http_remux_updated(string vhost)
  855 +{
  856 + int ret = ERROR_SUCCESS;
  857 +
  858 + if (tflvs.find(vhost) == tflvs.end()) {
  859 + if ((ret = initialize_flv_entry(vhost)) != ERROR_SUCCESS) {
  860 + return ret;
  861 + }
  862 +
  863 + // http mount need SrsRequest and SrsSource param, only create a mapping template entry
  864 + // and do mount automatically on playing http flv if this stream is a new http_remux stream.
  865 + return ret;
  866 + }
  867 +
  868 + SrsLiveEntry* tmpl = tflvs[vhost];
  869 + SrsRequest* req = tmpl->req;
  870 + SrsSource* source = tmpl->source;
  871 +
  872 + if (source && req) {
  873 + // cleanup the exists http remux.
  874 + http_unmount(source, req);
  875 + }
  876 +
  877 + if (!_srs_config->get_vhost_http_remux_enabled(vhost)) {
  878 + return ret;
  879 + }
  880 +
  881 + string old_tmpl_mount = tmpl->mount;
  882 + string new_tmpl_mount = _srs_config->get_vhost_http_remux_mount(vhost);
  883 + bool hstrs = _srs_config->get_vhost_http_remux_hstrs(vhost);
  884 +
  885 + tmpl->reset_hstrs(hstrs);
  886 +
  887 + /**
  888 + * TODO: not support to reload different mount url for the time being.
  889 + * if the mount is change, need more logical thing to deal with.
  890 + * such as erase stream from sflvs and free all related resource.
  891 + */
  892 + srs_assert(old_tmpl_mount == new_tmpl_mount);
  893 +
  894 + // do http mount directly with SrsRequest and SrsSource if stream is played already.
  895 + if (req) {
  896 + std::string sid = req->get_stream_url();
  897 +
  898 + if (sflvs.find(sid) != sflvs.end()) {
  899 + SrsLiveEntry* stream = sflvs[sid];
  900 + stream->reset_hstrs(hstrs);
  901 + }
  902 + // remount stream.
  903 + if ((ret = http_mount(source, req)) != ERROR_SUCCESS) {
  904 + srs_trace("vhost %s http_remux reload failed", vhost.c_str());
  905 + return ret;
  906 + }
  907 + } else {
  908 + // for without SrsRequest and SrsSource if stream is not played yet, do http mount automatically
  909 + // when start play this http flv stream.
  910 + }
  911 +
  912 + srs_trace("vhost %s http_remux reload success", vhost.c_str());
  913 +
  914 + return ret;
  915 +}
  916 +
835 int SrsHttpStreamServer::mount_hls(SrsRequest* r) 917 int SrsHttpStreamServer::mount_hls(SrsRequest* r)
836 { 918 {
837 int ret = ERROR_SUCCESS; 919 int ret = ERROR_SUCCESS;
@@ -958,7 +1040,6 @@ int SrsHttpStreamServer::hls_update_ts(SrsRequest* r, string uri, string ts) @@ -958,7 +1040,6 @@ int SrsHttpStreamServer::hls_update_ts(SrsRequest* r, string uri, string ts)
958 return ret; 1040 return ret;
959 } 1041 }
960 1042
961 -  
962 int SrsHttpStreamServer::hls_remove_ts(SrsRequest* r, string uri) 1043 int SrsHttpStreamServer::hls_remove_ts(SrsRequest* r, string uri)
963 { 1044 {
964 int ret = ERROR_SUCCESS; 1045 int ret = ERROR_SUCCESS;
@@ -1010,13 +1091,6 @@ void SrsHttpStreamServer::unmount_hls(SrsRequest* r) @@ -1010,13 +1091,6 @@ void SrsHttpStreamServer::unmount_hls(SrsRequest* r)
1010 } 1091 }
1011 } 1092 }
1012 1093
1013 -int SrsHttpStreamServer::on_reload_vhost_http_remux_updated()  
1014 -{  
1015 - int ret = ERROR_SUCCESS;  
1016 - // TODO: FIXME: implements it.  
1017 - return ret;  
1018 -}  
1019 -  
1020 int SrsHttpStreamServer::on_reload_vhost_hls(string vhost) 1094 int SrsHttpStreamServer::on_reload_vhost_hls(string vhost)
1021 { 1095 {
1022 int ret = ERROR_SUCCESS; 1096 int ret = ERROR_SUCCESS;
@@ -1056,6 +1130,8 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) @@ -1056,6 +1130,8 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
1056 } 1130 }
1057 1131
1058 // hstrs not enabled, ignore. 1132 // hstrs not enabled, ignore.
  1133 + // for origin: generally set hstrs to 'off' and mount while stream is pushed to origin.
  1134 + // for edge: must set hstrs to 'on' so that it could trigger rtmp stream before mount.
1059 entry = it->second; 1135 entry = it->second;
1060 if (!entry->hstrs) { 1136 if (!entry->hstrs) {
1061 return ret; 1137 return ret;
@@ -1090,6 +1166,17 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) @@ -1090,6 +1166,17 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
1090 // hijack for entry. 1166 // hijack for entry.
1091 SrsRequest* r = hreq->to_request(vhost->arg0()); 1167 SrsRequest* r = hreq->to_request(vhost->arg0());
1092 SrsAutoFree(SrsRequest, r); 1168 SrsAutoFree(SrsRequest, r);
  1169 +
  1170 + std::string sid = r->get_stream_url();
  1171 + // check if the stream is enabled.
  1172 + if (sflvs.find(sid) != sflvs.end()) {
  1173 + SrsLiveEntry* entry = sflvs[sid];
  1174 + if (!entry->stream->entry->enabled) {
  1175 + srs_error("stream is disabled, hijack failed. ret=%d", ret);
  1176 + return ret;
  1177 + }
  1178 + }
  1179 +
1093 SrsSource* s = SrsSource::fetch(r); 1180 SrsSource* s = SrsSource::fetch(r);
1094 if (!s) { 1181 if (!s) {
1095 if ((ret = SrsSource::create(r, server, server, &s)) != ERROR_SUCCESS) { 1182 if ((ret = SrsSource::create(r, server, server, &s)) != ERROR_SUCCESS) {
@@ -1105,7 +1192,6 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) @@ -1105,7 +1192,6 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
1105 1192
1106 // use the handler if exists. 1193 // use the handler if exists.
1107 if (ph) { 1194 if (ph) {
1108 - std::string sid = r->get_stream_url();  
1109 if (sflvs.find(sid) != sflvs.end()) { 1195 if (sflvs.find(sid) != sflvs.end()) {
1110 entry = sflvs[sid]; 1196 entry = sflvs[sid];
1111 *ph = entry->stream; 1197 *ph = entry->stream;
@@ -1142,19 +1228,27 @@ int SrsHttpStreamServer::initialize_flv_streaming() @@ -1142,19 +1228,27 @@ int SrsHttpStreamServer::initialize_flv_streaming()
1142 continue; 1228 continue;
1143 } 1229 }
1144 1230
1145 - std::string vhost = conf->arg0(); 1231 + initialize_flv_entry(conf->arg0());
  1232 + }
  1233 + return ret;
  1234 +}
  1235 +
  1236 +int SrsHttpStreamServer::initialize_flv_entry(std::string vhost)
  1237 +{
  1238 + int ret = ERROR_SUCCESS;
  1239 +
1146 if (!_srs_config->get_vhost_http_remux_enabled(vhost)) { 1240 if (!_srs_config->get_vhost_http_remux_enabled(vhost)) {
1147 - continue; 1241 + return ret;
1148 } 1242 }
1149 1243
1150 SrsLiveEntry* entry = new SrsLiveEntry( 1244 SrsLiveEntry* entry = new SrsLiveEntry(
1151 _srs_config->get_vhost_http_remux_mount(vhost), 1245 _srs_config->get_vhost_http_remux_mount(vhost),
1152 _srs_config->get_vhost_http_remux_hstrs(vhost) 1246 _srs_config->get_vhost_http_remux_hstrs(vhost)
1153 ); 1247 );
  1248 +
1154 tflvs[vhost] = entry; 1249 tflvs[vhost] = entry;
1155 srs_trace("http flv live stream, vhost=%s, mount=%s", 1250 srs_trace("http flv live stream, vhost=%s, mount=%s",
1156 vhost.c_str(), entry->mount.c_str()); 1251 vhost.c_str(), entry->mount.c_str());
1157 - }  
1158 1252
1159 return ret; 1253 return ret;
1160 } 1254 }
@@ -242,6 +242,9 @@ private: @@ -242,6 +242,9 @@ private:
242 bool _is_aac; 242 bool _is_aac;
243 bool _is_mp3; 243 bool _is_mp3;
244 public: 244 public:
  245 + SrsRequest* req;
  246 + SrsSource* source;
  247 +public:
245 // for template, the mount contains variables. 248 // for template, the mount contains variables.
246 // for concrete stream, the mount is url to access. 249 // for concrete stream, the mount is url to access.
247 std::string mount; 250 std::string mount;
@@ -252,6 +255,7 @@ public: @@ -252,6 +255,7 @@ public:
252 SrsStreamCache* cache; 255 SrsStreamCache* cache;
253 256
254 SrsLiveEntry(std::string m, bool h); 257 SrsLiveEntry(std::string m, bool h);
  258 + void reset_hstrs(bool h);
255 259
256 bool is_flv(); 260 bool is_flv();
257 bool is_ts(); 261 bool is_ts();
@@ -348,13 +352,14 @@ public: @@ -348,13 +352,14 @@ public:
348 virtual void unmount_hls(SrsRequest* r); 352 virtual void unmount_hls(SrsRequest* r);
349 // interface ISrsReloadHandler. 353 // interface ISrsReloadHandler.
350 public: 354 public:
351 - virtual int on_reload_vhost_http_remux_updated(); 355 + virtual int on_reload_vhost_http_remux_updated(std::string vhost);
352 virtual int on_reload_vhost_hls(std::string vhost); 356 virtual int on_reload_vhost_hls(std::string vhost);
353 // interface ISrsHttpMatchHijacker 357 // interface ISrsHttpMatchHijacker
354 public: 358 public:
355 virtual int hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph); 359 virtual int hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph);
356 private: 360 private:
357 virtual int initialize_flv_streaming(); 361 virtual int initialize_flv_streaming();
  362 + virtual int initialize_flv_entry(std::string vhost);
358 virtual int initialize_hls_streaming(); 363 virtual int initialize_hls_streaming();
359 virtual std::string hls_mount_generate(SrsRequest* r, std::string uri, std::string tmpl); 364 virtual std::string hls_mount_generate(SrsRequest* r, std::string uri, std::string tmpl);
360 }; 365 };
@@ -100,7 +100,7 @@ int ISrsReloadHandler::on_reload_vhost_http_updated() @@ -100,7 +100,7 @@ int ISrsReloadHandler::on_reload_vhost_http_updated()
100 return ERROR_SUCCESS; 100 return ERROR_SUCCESS;
101 } 101 }
102 102
103 -int ISrsReloadHandler::on_reload_vhost_http_remux_updated() 103 +int ISrsReloadHandler::on_reload_vhost_http_remux_updated(string vhost)
104 { 104 {
105 return ERROR_SUCCESS; 105 return ERROR_SUCCESS;
106 } 106 }
@@ -58,7 +58,7 @@ public: @@ -58,7 +58,7 @@ public:
58 virtual int on_reload_http_stream_updated(); 58 virtual int on_reload_http_stream_updated();
59 public: 59 public:
60 virtual int on_reload_vhost_http_updated(); 60 virtual int on_reload_vhost_http_updated();
61 - virtual int on_reload_vhost_http_remux_updated(); 61 + virtual int on_reload_vhost_http_remux_updated(std::string vhost);
62 virtual int on_reload_vhost_added(std::string vhost); 62 virtual int on_reload_vhost_added(std::string vhost);
63 virtual int on_reload_vhost_removed(std::string vhost); 63 virtual int on_reload_vhost_removed(std::string vhost);
64 virtual int on_reload_vhost_atc(std::string vhost); 64 virtual int on_reload_vhost_atc(std::string vhost);