winlin

for bug #293, http live streaming framework.

@@ -41,6 +41,7 @@ using namespace std; @@ -41,6 +41,7 @@ using namespace std;
41 #include <srs_kernel_utility.hpp> 41 #include <srs_kernel_utility.hpp>
42 #include <srs_kernel_file.hpp> 42 #include <srs_kernel_file.hpp>
43 #include <srs_kernel_flv.hpp> 43 #include <srs_kernel_flv.hpp>
  44 +#include <srs_protocol_rtmp.hpp>
44 45
45 SrsVodStream::SrsVodStream(string root_dir) 46 SrsVodStream::SrsVodStream(string root_dir)
46 : SrsGoHttpFileServer(root_dir) 47 : SrsGoHttpFileServer(root_dir)
@@ -177,6 +178,39 @@ int SrsHttpServer::initialize() @@ -177,6 +178,39 @@ int SrsHttpServer::initialize()
177 return ret; 178 return ret;
178 } 179 }
179 180
  181 +int SrsHttpServer::mount(SrsSource* s, SrsRequest* r)
  182 +{
  183 + int ret = ERROR_SUCCESS;
  184 +
  185 + if (flvs.empty()) {
  186 + srs_info("ignore mount, no flv stream configed.");
  187 + return ret;
  188 + }
  189 +
  190 + if (flvs.find(r->vhost) == flvs.end()) {
  191 + srs_info("ignore mount flv stream for disabled");
  192 + return ret;
  193 + }
  194 +
  195 + // TODO: FIXME: implements it.
  196 + return ret;
  197 +}
  198 +
  199 +void SrsHttpServer::unmount(SrsSource* s, SrsRequest* r)
  200 +{
  201 + if (flvs.empty()) {
  202 + srs_info("ignore unmount, no flv stream configed.");
  203 + return;
  204 + }
  205 +
  206 + if (flvs.find(r->vhost) == flvs.end()) {
  207 + srs_info("ignore unmount flv stream for disabled");
  208 + return;
  209 + }
  210 +
  211 + // TODO: FIXME: implements it.
  212 +}
  213 +
180 int SrsHttpServer::on_reload_vhost_http_updated() 214 int SrsHttpServer::on_reload_vhost_http_updated()
181 { 215 {
182 int ret = ERROR_SUCCESS; 216 int ret = ERROR_SUCCESS;
@@ -37,6 +37,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -37,6 +37,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
37 #include <srs_app_http.hpp> 37 #include <srs_app_http.hpp>
38 #include <srs_app_reload.hpp> 38 #include <srs_app_reload.hpp>
39 39
  40 +class SrsSource;
  41 +class SrsRequest;
40 class SrsStSocket; 42 class SrsStSocket;
41 class SrsHttpParser; 43 class SrsHttpParser;
42 class SrsHttpMessage; 44 class SrsHttpMessage;
@@ -85,6 +87,9 @@ public: @@ -85,6 +87,9 @@ public:
85 virtual ~SrsHttpServer(); 87 virtual ~SrsHttpServer();
86 public: 88 public:
87 virtual int initialize(); 89 virtual int initialize();
  90 +public:
  91 + virtual int mount(SrsSource* s, SrsRequest* r);
  92 + virtual void unmount(SrsSource* s, SrsRequest* r);
88 // interface ISrsThreadHandler. 93 // interface ISrsThreadHandler.
89 public: 94 public:
90 virtual int on_reload_vhost_http_updated(); 95 virtual int on_reload_vhost_http_updated();
@@ -393,7 +393,7 @@ int SrsRtmpConn::stream_service_cycle() @@ -393,7 +393,7 @@ int SrsRtmpConn::stream_service_cycle()
393 393
394 // find a source to serve. 394 // find a source to serve.
395 SrsSource* source = NULL; 395 SrsSource* source = NULL;
396 - if ((ret = SrsSource::find(req, &source)) != ERROR_SUCCESS) { 396 + if ((ret = SrsSource::find(req, server, &source)) != ERROR_SUCCESS) {
397 return ret; 397 return ret;
398 } 398 }
399 srs_assert(source != NULL); 399 srs_assert(source != NULL);
@@ -1074,3 +1074,23 @@ int SrsServer::on_reload_http_stream_updated() @@ -1074,3 +1074,23 @@ int SrsServer::on_reload_http_stream_updated()
1074 return ret; 1074 return ret;
1075 } 1075 }
1076 1076
  1077 +int SrsServer::on_publish(SrsSource* s, SrsRequest* r)
  1078 +{
  1079 + int ret = ERROR_SUCCESS;
  1080 +
  1081 +#ifdef SRS_AUTO_HTTP_SERVER
  1082 + if ((ret = http_stream_mux->mount(s, r)) != ERROR_SUCCESS) {
  1083 + return ret;
  1084 + }
  1085 +#endif
  1086 +
  1087 + return ret;
  1088 +}
  1089 +
  1090 +void SrsServer::on_unpublish(SrsSource* s, SrsRequest* r)
  1091 +{
  1092 +#ifdef SRS_AUTO_HTTP_SERVER
  1093 + http_stream_mux->unmount(s, r);
  1094 +#endif
  1095 +}
  1096 +
@@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 #include <srs_app_st.hpp> 35 #include <srs_app_st.hpp>
36 #include <srs_app_reload.hpp> 36 #include <srs_app_reload.hpp>
37 #include <srs_app_thread.hpp> 37 #include <srs_app_thread.hpp>
  38 +#include <srs_app_source.hpp>
38 39
39 class SrsServer; 40 class SrsServer;
40 class SrsConnection; 41 class SrsConnection;
@@ -113,7 +114,8 @@ private: @@ -113,7 +114,8 @@ private:
113 * SRS RTMP server, initialize and listen, 114 * SRS RTMP server, initialize and listen,
114 * start connection service thread, destroy client. 115 * start connection service thread, destroy client.
115 */ 116 */
116 -class SrsServer : public ISrsReloadHandler 117 +class SrsServer : virtual public ISrsReloadHandler
  118 + , virtual public ISrsSourceHandler
117 { 119 {
118 private: 120 private:
119 #ifdef SRS_AUTO_HTTP_API 121 #ifdef SRS_AUTO_HTTP_API
@@ -241,6 +243,10 @@ public: @@ -241,6 +243,10 @@ public:
241 virtual int on_reload_http_stream_enabled(); 243 virtual int on_reload_http_stream_enabled();
242 virtual int on_reload_http_stream_disabled(); 244 virtual int on_reload_http_stream_disabled();
243 virtual int on_reload_http_stream_updated(); 245 virtual int on_reload_http_stream_updated();
  246 +// interface ISrsSourceHandler
  247 +public:
  248 + virtual int on_publish(SrsSource* s, SrsRequest* r);
  249 + virtual void on_unpublish(SrsSource* s, SrsRequest* r);
244 }; 250 };
245 251
246 #endif 252 #endif
@@ -672,24 +672,33 @@ bool SrsGopCache::pure_audio() @@ -672,24 +672,33 @@ bool SrsGopCache::pure_audio()
672 return cached_video_count == 0; 672 return cached_video_count == 0;
673 } 673 }
674 674
  675 +ISrsSourceHandler::ISrsSourceHandler()
  676 +{
  677 +}
  678 +
  679 +ISrsSourceHandler::~ISrsSourceHandler()
  680 +{
  681 +}
  682 +
675 std::map<std::string, SrsSource*> SrsSource::pool; 683 std::map<std::string, SrsSource*> SrsSource::pool;
676 684
677 -int SrsSource::find(SrsRequest* req, SrsSource** ppsource) 685 +int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
678 { 686 {
679 int ret = ERROR_SUCCESS; 687 int ret = ERROR_SUCCESS;
680 688
681 - string stream_url = req->get_stream_url();  
682 - string vhost = req->vhost; 689 + string stream_url = r->get_stream_url();
  690 + string vhost = r->vhost;
683 691
684 if (pool.find(stream_url) == pool.end()) { 692 if (pool.find(stream_url) == pool.end()) {
685 - SrsSource* source = new SrsSource(req);  
686 - if ((ret = source->initialize()) != ERROR_SUCCESS) { 693 + SrsSource* source = new SrsSource();
  694 + if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
687 srs_freep(source); 695 srs_freep(source);
688 return ret; 696 return ret;
689 } 697 }
690 698
691 pool[stream_url] = source; 699 pool[stream_url] = source;
692 - srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str()); 700 + srs_info("create new source for url=%s, vhost=%s",
  701 + stream_url.c_str(), vhost.c_str());
693 } 702 }
694 703
695 // we always update the request of resource, 704 // we always update the request of resource,
@@ -697,8 +706,8 @@ int SrsSource::find(SrsRequest* req, SrsSource** ppsource) @@ -697,8 +706,8 @@ int SrsSource::find(SrsRequest* req, SrsSource** ppsource)
697 // and we only need to update the token of request, it's simple. 706 // and we only need to update the token of request, it's simple.
698 if (true) { 707 if (true) {
699 SrsSource* source = pool[stream_url]; 708 SrsSource* source = pool[stream_url];
700 - source->_req->update_auth(req);  
701 - *ppsource = source; 709 + source->_req->update_auth(r);
  710 + *pps = source;
702 } 711 }
703 712
704 return ret; 713 return ret;
@@ -714,9 +723,9 @@ void SrsSource::destroy() @@ -714,9 +723,9 @@ void SrsSource::destroy()
714 pool.clear(); 723 pool.clear();
715 } 724 }
716 725
717 -SrsSource::SrsSource(SrsRequest* req) 726 +SrsSource::SrsSource()
718 { 727 {
719 - _req = req->copy(); 728 + _req = NULL;
720 jitter_algorithm = SrsRtmpJitterAlgorithmOFF; 729 jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
721 730
722 #ifdef SRS_AUTO_HLS 731 #ifdef SRS_AUTO_HLS
@@ -741,7 +750,7 @@ SrsSource::SrsSource(SrsRequest* req) @@ -741,7 +750,7 @@ SrsSource::SrsSource(SrsRequest* req)
741 aggregate_stream = new SrsStream(); 750 aggregate_stream = new SrsStream();
742 751
743 _srs_config->subscribe(this); 752 _srs_config->subscribe(this);
744 - atc = _srs_config->get_atc(_req->vhost); 753 + atc = false;
745 } 754 }
746 755
747 SrsSource::~SrsSource() 756 SrsSource::~SrsSource()
@@ -783,10 +792,14 @@ SrsSource::~SrsSource() @@ -783,10 +792,14 @@ SrsSource::~SrsSource()
783 srs_freep(_req); 792 srs_freep(_req);
784 } 793 }
785 794
786 -int SrsSource::initialize() 795 +int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
787 { 796 {
788 int ret = ERROR_SUCCESS; 797 int ret = ERROR_SUCCESS;
789 798
  799 + handler = h;
  800 + _req = r->copy();
  801 + atc = _srs_config->get_atc(_req->vhost);
  802 +
790 #ifdef SRS_AUTO_DVR 803 #ifdef SRS_AUTO_DVR
791 if ((ret = dvr->initialize(_req)) != ERROR_SUCCESS) { 804 if ((ret = dvr->initialize(_req)) != ERROR_SUCCESS) {
792 return ret; 805 return ret;
@@ -1643,6 +1656,13 @@ int SrsSource::on_publish() @@ -1643,6 +1656,13 @@ int SrsSource::on_publish()
1643 } 1656 }
1644 #endif 1657 #endif
1645 1658
  1659 + // notify the handler.
  1660 + srs_assert(handler);
  1661 + if ((ret = handler->on_publish(this, _req)) != ERROR_SUCCESS) {
  1662 + srs_error("handle on publish failed. ret=%d", ret);
  1663 + return ret;
  1664 + }
  1665 +
1646 return ret; 1666 return ret;
1647 } 1667 }
1648 1668
@@ -1676,6 +1696,10 @@ void SrsSource::on_unpublish() @@ -1676,6 +1696,10 @@ void SrsSource::on_unpublish()
1676 1696
1677 _can_publish = true; 1697 _can_publish = true;
1678 _source_id = -1; 1698 _source_id = -1;
  1699 +
  1700 + // notify the handler.
  1701 + srs_assert(handler);
  1702 + handler->on_unpublish(this, _req);
1679 } 1703 }
1680 1704
1681 int SrsSource::create_consumer(SrsConsumer*& consumer) 1705 int SrsSource::create_consumer(SrsConsumer*& consumer)
@@ -337,6 +337,27 @@ public: @@ -337,6 +337,27 @@ public:
337 }; 337 };
338 338
339 /** 339 /**
  340 +* the handler to handle the event of srs source.
  341 +* for example, the http flv streaming module handle the event and
  342 +* mount http when rtmp start publishing.
  343 +*/
  344 +class ISrsSourceHandler
  345 +{
  346 +public:
  347 + ISrsSourceHandler();
  348 + virtual ~ISrsSourceHandler();
  349 +public:
  350 + /**
  351 + * when stream start publish, mount stream.
  352 + */
  353 + virtual int on_publish(SrsSource* s, SrsRequest* r) = 0;
  354 + /**
  355 + * when stream stop publish, unmount stream.
  356 + */
  357 + virtual void on_unpublish(SrsSource* s, SrsRequest* r) = 0;
  358 +};
  359 +
  360 +/**
340 * live streaming source. 361 * live streaming source.
341 */ 362 */
342 class SrsSource : public ISrsReloadHandler 363 class SrsSource : public ISrsReloadHandler
@@ -346,11 +367,11 @@ private: @@ -346,11 +367,11 @@ private:
346 public: 367 public:
347 /** 368 /**
348 * find stream by vhost/app/stream. 369 * find stream by vhost/app/stream.
349 - * @param req the client request.  
350 - * @param ppsource the matched source, if success never be NULL.  
351 - * @remark stream_url should without port and schema. 370 + * @param r the client request.
  371 + * @param h the event handler for source.
  372 + * @param pps the matched source, if success never be NULL.
352 */ 373 */
353 - static int find(SrsRequest* req, SrsSource** ppsource); 374 + static int find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps);
354 /** 375 /**
355 * when system exit, destroy the sources, 376 * when system exit, destroy the sources,
356 * for gmc to analysis mem leaks. 377 * for gmc to analysis mem leaks.
@@ -390,6 +411,8 @@ private: @@ -390,6 +411,8 @@ private:
390 std::vector<SrsForwarder*> forwarders; 411 std::vector<SrsForwarder*> forwarders;
391 // for aggregate message 412 // for aggregate message
392 SrsStream* aggregate_stream; 413 SrsStream* aggregate_stream;
  414 + // the event handler.
  415 + ISrsSourceHandler* handler;
393 private: 416 private:
394 /** 417 /**
395 * the sample rate of audio in metadata. 418 * the sample rate of audio in metadata.
@@ -421,10 +444,11 @@ public: @@ -421,10 +444,11 @@ public:
421 * @param _req the client request object, 444 * @param _req the client request object,
422 * this object will deep copy it for reload. 445 * this object will deep copy it for reload.
423 */ 446 */
424 - SrsSource(SrsRequest* req); 447 + SrsSource();
425 virtual ~SrsSource(); 448 virtual ~SrsSource();
  449 +// initialize, get and setter.
426 public: 450 public:
427 - virtual int initialize(); 451 + virtual int initialize(SrsRequest* r, ISrsSourceHandler* h);
428 // interface ISrsReloadHandler 452 // interface ISrsReloadHandler
429 public: 453 public:
430 virtual int on_reload_vhost_atc(std::string vhost); 454 virtual int on_reload_vhost_atc(std::string vhost);