winlin

support reload the forwarder

@@ -83,7 +83,7 @@ vhost __defaultVhost__ { @@ -83,7 +83,7 @@ vhost __defaultVhost__ {
83 vhost dev { 83 vhost dev {
84 enabled on; 84 enabled on;
85 gop_cache on; 85 gop_cache on;
86 - #forward 127.0.0.1:19350; 86 + forward 127.0.0.1:19350;
87 hls { 87 hls {
88 hls off; 88 hls off;
89 hls_path ./objs/nginx/html; 89 hls_path ./objs/nginx/html;
@@ -181,7 +181,7 @@ int SrsClient::service_cycle() @@ -181,7 +181,7 @@ int SrsClient::service_cycle()
181 srs_trace("set chunk_size=%d success", chunk_size); 181 srs_trace("set chunk_size=%d success", chunk_size);
182 182
183 // find a source to publish. 183 // find a source to publish.
184 - SrsSource* source = SrsSource::find(req->get_stream_url(), req->vhost); 184 + SrsSource* source = SrsSource::find(req);
185 srs_assert(source != NULL); 185 srs_assert(source != NULL);
186 186
187 // check publish available. 187 // check publish available.
@@ -32,6 +32,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -32,6 +32,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
32 #include <srs_core_handshake.hpp> 32 #include <srs_core_handshake.hpp>
33 #include <srs_core_config.hpp> 33 #include <srs_core_config.hpp>
34 34
  35 +using namespace std;
  36 +
35 /** 37 /**
36 * the signature for packets to client. 38 * the signature for packets to client.
37 */ 39 */
@@ -75,6 +77,23 @@ SrsRequest::~SrsRequest() @@ -75,6 +77,23 @@ SrsRequest::~SrsRequest()
75 { 77 {
76 } 78 }
77 79
  80 +SrsRequest* SrsRequest::copy()
  81 +{
  82 + SrsRequest* cp = new SrsRequest();
  83 +
  84 + cp->app = app;
  85 + cp->objectEncoding = objectEncoding;
  86 + cp->pageUrl = pageUrl;
  87 + cp->port = port;
  88 + cp->schema = schema;
  89 + cp->stream = stream;
  90 + cp->swfUrl = swfUrl;
  91 + cp->tcUrl = tcUrl;
  92 + cp->vhost = vhost;
  93 +
  94 + return cp;
  95 +}
  96 +
78 int SrsRequest::discovery_app() 97 int SrsRequest::discovery_app()
79 { 98 {
80 int ret = ERROR_SUCCESS; 99 int ret = ERROR_SUCCESS;
@@ -128,7 +147,7 @@ int SrsRequest::discovery_app() @@ -128,7 +147,7 @@ int SrsRequest::discovery_app()
128 return ret; 147 return ret;
129 } 148 }
130 149
131 -std::string SrsRequest::get_stream_url() 150 +string SrsRequest::get_stream_url()
132 { 151 {
133 std::string url = ""; 152 std::string url = "";
134 153
@@ -148,7 +167,7 @@ void SrsRequest::strip() @@ -148,7 +167,7 @@ void SrsRequest::strip()
148 trim(stream, "/ \n\r\t"); 167 trim(stream, "/ \n\r\t");
149 } 168 }
150 169
151 -std::string& SrsRequest::trim(std::string& str, std::string chs) 170 +std::string& SrsRequest::trim(string& str, string chs)
152 { 171 {
153 for (int i = 0; i < (int)chs.length(); i++) { 172 for (int i = 0; i < (int)chs.length(); i++) {
154 char ch = chs.at(i); 173 char ch = chs.at(i);
@@ -243,7 +262,7 @@ int SrsRtmpClient::handshake() @@ -243,7 +262,7 @@ int SrsRtmpClient::handshake()
243 return ret; 262 return ret;
244 } 263 }
245 264
246 -int SrsRtmpClient::connect_app(std::string app, std::string tc_url) 265 +int SrsRtmpClient::connect_app(string app, string tc_url)
247 { 266 {
248 int ret = ERROR_SUCCESS; 267 int ret = ERROR_SUCCESS;
249 268
@@ -329,7 +348,7 @@ int SrsRtmpClient::create_stream(int& stream_id) @@ -329,7 +348,7 @@ int SrsRtmpClient::create_stream(int& stream_id)
329 return ret; 348 return ret;
330 } 349 }
331 350
332 -int SrsRtmpClient::play(std::string stream, int stream_id) 351 +int SrsRtmpClient::play(string stream, int stream_id)
333 { 352 {
334 int ret = ERROR_SUCCESS; 353 int ret = ERROR_SUCCESS;
335 354
@@ -371,7 +390,7 @@ int SrsRtmpClient::play(std::string stream, int stream_id) @@ -371,7 +390,7 @@ int SrsRtmpClient::play(std::string stream, int stream_id)
371 return ret; 390 return ret;
372 } 391 }
373 392
374 -int SrsRtmpClient::publish(std::string stream, int stream_id) 393 +int SrsRtmpClient::publish(string stream, int stream_id)
375 { 394 {
376 int ret = ERROR_SUCCESS; 395 int ret = ERROR_SUCCESS;
377 396
@@ -1045,7 +1064,7 @@ int SrsRtmp::start_flash_publish(int stream_id) @@ -1045,7 +1064,7 @@ int SrsRtmp::start_flash_publish(int stream_id)
1045 return ret; 1064 return ret;
1046 } 1065 }
1047 1066
1048 -int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) 1067 +int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, string& stream_name)
1049 { 1068 {
1050 int ret = ERROR_SUCCESS; 1069 int ret = ERROR_SUCCESS;
1051 1070
@@ -1102,7 +1121,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea @@ -1102,7 +1121,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea
1102 return ret; 1121 return ret;
1103 } 1122 }
1104 1123
1105 -int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name) 1124 +int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, string& stream_name)
1106 { 1125 {
1107 int ret = ERROR_SUCCESS; 1126 int ret = ERROR_SUCCESS;
1108 1127
@@ -1126,7 +1145,7 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType @@ -1126,7 +1145,7 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType
1126 return ret; 1145 return ret;
1127 } 1146 }
1128 1147
1129 -int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name) 1148 +int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, string& stream_name)
1130 { 1149 {
1131 int ret = ERROR_SUCCESS; 1150 int ret = ERROR_SUCCESS;
1132 1151
@@ -65,6 +65,13 @@ struct SrsRequest @@ -65,6 +65,13 @@ struct SrsRequest
65 65
66 SrsRequest(); 66 SrsRequest();
67 virtual ~SrsRequest(); 67 virtual ~SrsRequest();
  68 +
  69 + /**
  70 + * deep copy the request, for source to use it to support reload,
  71 + * for when initialize the source, the request is valid,
  72 + * when reload it, the request maybe invalid, so need to copy it.
  73 + */
  74 + virtual SrsRequest* copy();
68 75
69 /** 76 /**
70 * disconvery vhost/app from tcUrl. 77 * disconvery vhost/app from tcUrl.
@@ -345,21 +345,22 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv) @@ -345,21 +345,22 @@ int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv)
345 345
346 std::map<std::string, SrsSource*> SrsSource::pool; 346 std::map<std::string, SrsSource*> SrsSource::pool;
347 347
348 -SrsSource* SrsSource::find(string stream_url, string vhost) 348 +SrsSource* SrsSource::find(SrsRequest* req)
349 { 349 {
  350 + string stream_url = req->get_stream_url();
  351 + string vhost = req->vhost;
  352 +
350 if (pool.find(stream_url) == pool.end()) { 353 if (pool.find(stream_url) == pool.end()) {
351 - pool[stream_url] = new SrsSource(stream_url, vhost);  
352 - srs_verbose("create new source for "  
353 - "url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); 354 + pool[stream_url] = new SrsSource(req);
  355 + srs_verbose("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
354 } 356 }
355 357
356 return pool[stream_url]; 358 return pool[stream_url];
357 } 359 }
358 360
359 -SrsSource::SrsSource(string _stream_url, string _vhost) 361 +SrsSource::SrsSource(SrsRequest* _req)
360 { 362 {
361 - stream_url = _stream_url;  
362 - vhost = _vhost; 363 + req = _req->copy();
363 364
364 #ifdef SRS_HLS 365 #ifdef SRS_HLS
365 hls = new SrsHls(); 366 hls = new SrsHls();
@@ -412,13 +413,15 @@ SrsSource::~SrsSource() @@ -412,13 +413,15 @@ SrsSource::~SrsSource()
412 #ifdef SRS_FFMPEG 413 #ifdef SRS_FFMPEG
413 srs_freep(encoder); 414 srs_freep(encoder);
414 #endif 415 #endif
  416 +
  417 + srs_freep(req);
415 } 418 }
416 419
417 -int SrsSource::on_reload_gop_cache(string _vhost) 420 +int SrsSource::on_reload_gop_cache(string vhost)
418 { 421 {
419 int ret = ERROR_SUCCESS; 422 int ret = ERROR_SUCCESS;
420 423
421 - if (vhost != _vhost) { 424 + if (req->vhost != vhost) {
422 return ret; 425 return ret;
423 } 426 }
424 427
@@ -426,13 +429,32 @@ int SrsSource::on_reload_gop_cache(string _vhost) @@ -426,13 +429,32 @@ int SrsSource::on_reload_gop_cache(string _vhost)
426 bool enabled_cache = config->get_gop_cache(vhost); 429 bool enabled_cache = config->get_gop_cache(vhost);
427 430
428 srs_trace("vhost %s gop_cache changed to %d, source url=%s", 431 srs_trace("vhost %s gop_cache changed to %d, source url=%s",
429 - vhost.c_str(), enabled_cache, stream_url.c_str()); 432 + vhost.c_str(), enabled_cache, req->get_stream_url().c_str());
430 433
431 set_cache(enabled_cache); 434 set_cache(enabled_cache);
432 435
433 return ret; 436 return ret;
434 } 437 }
435 438
  439 +int SrsSource::on_reload_forward(string vhost)
  440 +{
  441 + int ret = ERROR_SUCCESS;
  442 +
  443 + if (req->vhost != vhost) {
  444 + return ret;
  445 + }
  446 +
  447 + // forwarders
  448 + destroy_forwarders();
  449 + if ((ret = create_forwarders()) != ERROR_SUCCESS) {
  450 + srs_error("create forwarders failed. ret=%d", ret);
  451 + return ret;
  452 + }
  453 + srs_trace("vhost %s forwarders reload success", vhost.c_str());
  454 +
  455 + return ret;
  456 +}
  457 +
436 bool SrsSource::can_publish() 458 bool SrsSource::can_publish()
437 { 459 {
438 return _can_publish; 460 return _can_publish;
@@ -656,31 +678,23 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -656,31 +678,23 @@ int SrsSource::on_video(SrsCommonMessage* video)
656 return ret; 678 return ret;
657 } 679 }
658 680
659 -int SrsSource::on_publish(SrsRequest* req) 681 +int SrsSource::on_publish(SrsRequest* _req)
660 { 682 {
661 int ret = ERROR_SUCCESS; 683 int ret = ERROR_SUCCESS;
662 684
  685 + // update the request object.
  686 + srs_freep(req);
  687 + req = _req->copy();
  688 + srs_assert(req);
  689 +
663 _can_publish = false; 690 _can_publish = false;
664 -  
665 - // TODO: support reload.  
666 691
667 // create forwarders 692 // create forwarders
668 - SrsConfDirective* conf = config->get_forward(req->vhost);  
669 - for (int i = 0; conf && i < (int)conf->args.size(); i++) {  
670 - std::string forward_server = conf->args.at(i);  
671 -  
672 - SrsForwarder* forwarder = new SrsForwarder();  
673 - forwarders.push_back(forwarder);  
674 -  
675 - if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) {  
676 - srs_error("start forwarder failed. "  
677 - "vhost=%s, app=%s, stream=%s, forward-to=%s",  
678 - req->vhost.c_str(), req->app.c_str(), req->stream.c_str(),  
679 - forward_server.c_str());  
680 - return ret;  
681 - } 693 + if ((ret = create_forwarders()) != ERROR_SUCCESS) {
  694 + srs_error("create forwarders failed. ret=%d", ret);
  695 + return ret;
682 } 696 }
683 - 697 +
684 #ifdef SRS_FFMPEG 698 #ifdef SRS_FFMPEG
685 if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) { 699 if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) {
686 return ret; 700 return ret;
@@ -698,14 +712,8 @@ int SrsSource::on_publish(SrsRequest* req) @@ -698,14 +712,8 @@ int SrsSource::on_publish(SrsRequest* req)
698 712
699 void SrsSource::on_unpublish() 713 void SrsSource::on_unpublish()
700 { 714 {
701 - // close all forwarders  
702 - std::vector<SrsForwarder*>::iterator it;  
703 - for (it = forwarders.begin(); it != forwarders.end(); ++it) {  
704 - SrsForwarder* forwarder = *it;  
705 - forwarder->on_unpublish();  
706 - srs_freep(forwarder);  
707 - }  
708 - forwarders.clear(); 715 + // destroy all forwarders
  716 + destroy_forwarders();
709 717
710 #ifdef SRS_FFMPEG 718 #ifdef SRS_FFMPEG
711 encoder->on_unpublish(); 719 encoder->on_unpublish();
@@ -776,3 +784,37 @@ void SrsSource::set_cache(bool enabled) @@ -776,3 +784,37 @@ void SrsSource::set_cache(bool enabled)
776 gop_cache->set(enabled); 784 gop_cache->set(enabled);
777 } 785 }
778 786
  787 +int SrsSource::create_forwarders()
  788 +{
  789 + int ret = ERROR_SUCCESS;
  790 +
  791 + SrsConfDirective* conf = config->get_forward(req->vhost);
  792 + for (int i = 0; conf && i < (int)conf->args.size(); i++) {
  793 + std::string forward_server = conf->args.at(i);
  794 +
  795 + SrsForwarder* forwarder = new SrsForwarder();
  796 + forwarders.push_back(forwarder);
  797 +
  798 + if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) {
  799 + srs_error("start forwarder failed. "
  800 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  801 + req->vhost.c_str(), req->app.c_str(), req->stream.c_str(),
  802 + forward_server.c_str());
  803 + return ret;
  804 + }
  805 + }
  806 +
  807 + return ret;
  808 +}
  809 +
  810 +void SrsSource::destroy_forwarders()
  811 +{
  812 + std::vector<SrsForwarder*>::iterator it;
  813 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  814 + SrsForwarder* forwarder = *it;
  815 + forwarder->on_unpublish();
  816 + srs_freep(forwarder);
  817 + }
  818 + forwarders.clear();
  819 +}
  820 +
@@ -167,15 +167,14 @@ private: @@ -167,15 +167,14 @@ private:
167 public: 167 public:
168 /** 168 /**
169 * find stream by vhost/app/stream. 169 * find stream by vhost/app/stream.
170 - * @param stream_url the stream url, for example, myserver.xxx.com/app/stream  
171 - * @param vhost the vhost to constructor the object. 170 + * @param req the client request.
172 * @return the matched source, never be NULL. 171 * @return the matched source, never be NULL.
173 * @remark stream_url should without port and schema. 172 * @remark stream_url should without port and schema.
174 */ 173 */
175 - static SrsSource* find(std::string stream_url, std::string vhost); 174 + static SrsSource* find(SrsRequest* req);
176 private: 175 private:
177 - std::string vhost;  
178 - std::string stream_url; 176 + // deep copy of client request.
  177 + SrsRequest* req;
179 // to delivery stream to clients. 178 // to delivery stream to clients.
180 std::vector<SrsConsumer*> consumers; 179 std::vector<SrsConsumer*> consumers;
181 // hls handler. 180 // hls handler.
@@ -210,22 +209,35 @@ private: @@ -210,22 +209,35 @@ private:
210 // the cached audio sequence header. 209 // the cached audio sequence header.
211 SrsSharedPtrMessage* cache_sh_audio; 210 SrsSharedPtrMessage* cache_sh_audio;
212 public: 211 public:
213 - SrsSource(std::string _stream_url, std::string _vhost); 212 + /**
  213 + * @param _req the client request object,
  214 + * this object will deep copy it for reload.
  215 + */
  216 + SrsSource(SrsRequest* _req);
214 virtual ~SrsSource(); 217 virtual ~SrsSource();
215 // interface ISrsReloadHandler 218 // interface ISrsReloadHandler
216 public: 219 public:
217 - virtual int on_reload_gop_cache(std::string _vhost); 220 + virtual int on_reload_gop_cache(std::string vhost);
  221 + virtual int on_reload_forward(std::string vhost);
218 public: 222 public:
219 virtual bool can_publish(); 223 virtual bool can_publish();
220 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); 224 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
221 virtual int on_audio(SrsCommonMessage* audio); 225 virtual int on_audio(SrsCommonMessage* audio);
222 virtual int on_video(SrsCommonMessage* video); 226 virtual int on_video(SrsCommonMessage* video);
223 - virtual int on_publish(SrsRequest* req); 227 + /**
  228 + * publish stream event notify.
  229 + * @param _req the request from client, the source will deep copy it,
  230 + * for when reload the request of client maybe invalid.
  231 + */
  232 + virtual int on_publish(SrsRequest* _req);
224 virtual void on_unpublish(); 233 virtual void on_unpublish();
225 public: 234 public:
226 virtual int create_consumer(SrsConsumer*& consumer); 235 virtual int create_consumer(SrsConsumer*& consumer);
227 virtual void on_consumer_destroy(SrsConsumer* consumer); 236 virtual void on_consumer_destroy(SrsConsumer* consumer);
228 virtual void set_cache(bool enabled); 237 virtual void set_cache(bool enabled);
  238 +private:
  239 + virtual int create_forwarders();
  240 + virtual void destroy_forwarders();
229 }; 241 };
230 242
231 #endif 243 #endif