winlin

refine source, rename req to _req

@@ -24,6 +24,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -24,6 +24,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 #include <srs_app_edge.hpp> 24 #include <srs_app_edge.hpp>
25 25
26 #include <srs_kernel_error.hpp> 26 #include <srs_kernel_error.hpp>
  27 +#include <srs_protocol_rtmp.hpp>
27 28
28 SrsEdge::SrsEdge() 29 SrsEdge::SrsEdge()
29 { 30 {
@@ -34,6 +35,15 @@ SrsEdge::~SrsEdge() @@ -34,6 +35,15 @@ SrsEdge::~SrsEdge()
34 { 35 {
35 } 36 }
36 37
  38 +int SrsEdge::initialize(SrsRequest* req)
  39 +{
  40 + int ret = ERROR_SUCCESS;
  41 +
  42 + _req = req;
  43 +
  44 + return ret;
  45 +}
  46 +
37 int SrsEdge::on_client_play() 47 int SrsEdge::on_client_play()
38 { 48 {
39 int ret = ERROR_SUCCESS; 49 int ret = ERROR_SUCCESS;
@@ -30,6 +30,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,6 +30,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 30
31 #include <srs_core.hpp> 31 #include <srs_core.hpp>
32 32
  33 +class SrsRequest;
  34 +
33 /** 35 /**
34 * the state of edge 36 * the state of edge
35 */ 37 */
@@ -49,11 +51,13 @@ enum SrsEdgeState @@ -49,11 +51,13 @@ enum SrsEdgeState
49 class SrsEdge 51 class SrsEdge
50 { 52 {
51 private: 53 private:
  54 + SrsRequest* _req;
52 SrsEdgeState state; 55 SrsEdgeState state;
53 public: 56 public:
54 SrsEdge(); 57 SrsEdge();
55 virtual ~SrsEdge(); 58 virtual ~SrsEdge();
56 public: 59 public:
  60 + virtual int initialize(SrsRequest* req);
57 /** 61 /**
58 * when client play stream on edge. 62 * when client play stream on edge.
59 */ 63 */
@@ -515,7 +515,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) @@ -515,7 +515,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
515 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); 515 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER);
516 516
517 // notify the hls to prepare when publish start. 517 // notify the hls to prepare when publish start.
518 - if ((ret = source->on_publish(req)) != ERROR_SUCCESS) { 518 + if ((ret = source->on_publish()) != ERROR_SUCCESS) {
519 srs_error("fmle hls on_publish failed. ret=%d", ret); 519 srs_error("fmle hls on_publish failed. ret=%d", ret);
520 return ret; 520 return ret;
521 } 521 }
@@ -584,7 +584,7 @@ int SrsRtmpConn::flash_publish(SrsSource* source) @@ -584,7 +584,7 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
584 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); 584 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER);
585 585
586 // notify the hls to prepare when publish start. 586 // notify the hls to prepare when publish start.
587 - if ((ret = source->on_publish(req)) != ERROR_SUCCESS) { 587 + if ((ret = source->on_publish()) != ERROR_SUCCESS) {
588 srs_error("flash hls on_publish failed. ret=%d", ret); 588 srs_error("flash hls on_publish failed. ret=%d", ret);
589 return ret; 589 return ret;
590 } 590 }
@@ -437,9 +437,9 @@ int SrsSource::find(SrsRequest* req, SrsSource** ppsource) @@ -437,9 +437,9 @@ int SrsSource::find(SrsRequest* req, SrsSource** ppsource)
437 return ret; 437 return ret;
438 } 438 }
439 439
440 -SrsSource::SrsSource(SrsRequest* _req) 440 +SrsSource::SrsSource(SrsRequest* req)
441 { 441 {
442 - req = _req->copy(); 442 + _req = req->copy();
443 443
444 #ifdef SRS_AUTO_HLS 444 #ifdef SRS_AUTO_HLS
445 hls = new SrsHls(this); 445 hls = new SrsHls(this);
@@ -460,7 +460,7 @@ SrsSource::SrsSource(SrsRequest* _req) @@ -460,7 +460,7 @@ SrsSource::SrsSource(SrsRequest* _req)
460 gop_cache = new SrsGopCache(); 460 gop_cache = new SrsGopCache();
461 461
462 _srs_config->subscribe(this); 462 _srs_config->subscribe(this);
463 - atc = _srs_config->get_atc(req->vhost); 463 + atc = _srs_config->get_atc(_req->vhost);
464 } 464 }
465 465
466 SrsSource::~SrsSource() 466 SrsSource::~SrsSource()
@@ -502,7 +502,7 @@ SrsSource::~SrsSource() @@ -502,7 +502,7 @@ SrsSource::~SrsSource()
502 srs_freep(encoder); 502 srs_freep(encoder);
503 #endif 503 #endif
504 504
505 - srs_freep(req); 505 + srs_freep(_req);
506 } 506 }
507 507
508 int SrsSource::initialize() 508 int SrsSource::initialize()
@@ -510,11 +510,15 @@ int SrsSource::initialize() @@ -510,11 +510,15 @@ int SrsSource::initialize()
510 int ret = ERROR_SUCCESS; 510 int ret = ERROR_SUCCESS;
511 511
512 #ifdef SRS_AUTO_DVR 512 #ifdef SRS_AUTO_DVR
513 - if ((ret = dvr->initialize(req)) != ERROR_SUCCESS) { 513 + if ((ret = dvr->initialize(_req)) != ERROR_SUCCESS) {
514 return ret; 514 return ret;
515 } 515 }
516 #endif 516 #endif
517 517
  518 + if ((ret = edge->initialize(_req)) != ERROR_SUCCESS) {
  519 + return ret;
  520 + }
  521 +
518 return ret; 522 return ret;
519 } 523 }
520 524
@@ -522,7 +526,7 @@ int SrsSource::on_reload_vhost_atc(string vhost) @@ -522,7 +526,7 @@ int SrsSource::on_reload_vhost_atc(string vhost)
522 { 526 {
523 int ret = ERROR_SUCCESS; 527 int ret = ERROR_SUCCESS;
524 528
525 - if (req->vhost != vhost) { 529 + if (_req->vhost != vhost) {
526 return ret; 530 return ret;
527 } 531 }
528 532
@@ -541,7 +545,7 @@ int SrsSource::on_reload_vhost_gop_cache(string vhost) @@ -541,7 +545,7 @@ int SrsSource::on_reload_vhost_gop_cache(string vhost)
541 { 545 {
542 int ret = ERROR_SUCCESS; 546 int ret = ERROR_SUCCESS;
543 547
544 - if (req->vhost != vhost) { 548 + if (_req->vhost != vhost) {
545 return ret; 549 return ret;
546 } 550 }
547 551
@@ -549,7 +553,7 @@ int SrsSource::on_reload_vhost_gop_cache(string vhost) @@ -549,7 +553,7 @@ int SrsSource::on_reload_vhost_gop_cache(string vhost)
549 bool enabled_cache = _srs_config->get_gop_cache(vhost); 553 bool enabled_cache = _srs_config->get_gop_cache(vhost);
550 554
551 srs_trace("vhost %s gop_cache changed to %d, source url=%s", 555 srs_trace("vhost %s gop_cache changed to %d, source url=%s",
552 - vhost.c_str(), enabled_cache, req->get_stream_url().c_str()); 556 + vhost.c_str(), enabled_cache, _req->get_stream_url().c_str());
553 557
554 set_cache(enabled_cache); 558 set_cache(enabled_cache);
555 559
@@ -560,11 +564,11 @@ int SrsSource::on_reload_vhost_queue_length(string vhost) @@ -560,11 +564,11 @@ int SrsSource::on_reload_vhost_queue_length(string vhost)
560 { 564 {
561 int ret = ERROR_SUCCESS; 565 int ret = ERROR_SUCCESS;
562 566
563 - if (req->vhost != vhost) { 567 + if (_req->vhost != vhost) {
564 return ret; 568 return ret;
565 } 569 }
566 570
567 - double queue_size = _srs_config->get_queue_length(req->vhost); 571 + double queue_size = _srs_config->get_queue_length(_req->vhost);
568 572
569 if (true) { 573 if (true) {
570 std::vector<SrsConsumer*>::iterator it; 574 std::vector<SrsConsumer*>::iterator it;
@@ -595,7 +599,7 @@ int SrsSource::on_reload_vhost_forward(string vhost) @@ -595,7 +599,7 @@ int SrsSource::on_reload_vhost_forward(string vhost)
595 { 599 {
596 int ret = ERROR_SUCCESS; 600 int ret = ERROR_SUCCESS;
597 601
598 - if (req->vhost != vhost) { 602 + if (_req->vhost != vhost) {
599 return ret; 603 return ret;
600 } 604 }
601 605
@@ -615,13 +619,13 @@ int SrsSource::on_reload_vhost_hls(string vhost) @@ -615,13 +619,13 @@ int SrsSource::on_reload_vhost_hls(string vhost)
615 { 619 {
616 int ret = ERROR_SUCCESS; 620 int ret = ERROR_SUCCESS;
617 621
618 - if (req->vhost != vhost) { 622 + if (_req->vhost != vhost) {
619 return ret; 623 return ret;
620 } 624 }
621 625
622 #ifdef SRS_AUTO_HLS 626 #ifdef SRS_AUTO_HLS
623 hls->on_unpublish(); 627 hls->on_unpublish();
624 - if ((ret = hls->on_publish(req)) != ERROR_SUCCESS) { 628 + if ((ret = hls->on_publish(_req)) != ERROR_SUCCESS) {
625 srs_error("hls publish failed. ret=%d", ret); 629 srs_error("hls publish failed. ret=%d", ret);
626 return ret; 630 return ret;
627 } 631 }
@@ -635,7 +639,7 @@ int SrsSource::on_reload_vhost_dvr(string vhost) @@ -635,7 +639,7 @@ int SrsSource::on_reload_vhost_dvr(string vhost)
635 { 639 {
636 int ret = ERROR_SUCCESS; 640 int ret = ERROR_SUCCESS;
637 641
638 - if (req->vhost != vhost) { 642 + if (_req->vhost != vhost) {
639 return ret; 643 return ret;
640 } 644 }
641 645
@@ -644,12 +648,12 @@ int SrsSource::on_reload_vhost_dvr(string vhost) @@ -644,12 +648,12 @@ int SrsSource::on_reload_vhost_dvr(string vhost)
644 dvr->on_unpublish(); 648 dvr->on_unpublish();
645 649
646 // reinitialize the dvr, update plan. 650 // reinitialize the dvr, update plan.
647 - if ((ret = dvr->initialize(req)) != ERROR_SUCCESS) { 651 + if ((ret = dvr->initialize(_req)) != ERROR_SUCCESS) {
648 return ret; 652 return ret;
649 } 653 }
650 654
651 // start to publish by new plan. 655 // start to publish by new plan.
652 - if ((ret = dvr->on_publish(req)) != ERROR_SUCCESS) { 656 + if ((ret = dvr->on_publish(_req)) != ERROR_SUCCESS) {
653 srs_error("dvr publish failed. ret=%d", ret); 657 srs_error("dvr publish failed. ret=%d", ret);
654 return ret; 658 return ret;
655 } 659 }
@@ -664,13 +668,13 @@ int SrsSource::on_reload_vhost_transcode(string vhost) @@ -664,13 +668,13 @@ int SrsSource::on_reload_vhost_transcode(string vhost)
664 { 668 {
665 int ret = ERROR_SUCCESS; 669 int ret = ERROR_SUCCESS;
666 670
667 - if (req->vhost != vhost) { 671 + if (_req->vhost != vhost) {
668 return ret; 672 return ret;
669 } 673 }
670 674
671 #ifdef SRS_AUTO_TRANSCODE 675 #ifdef SRS_AUTO_TRANSCODE
672 encoder->on_unpublish(); 676 encoder->on_unpublish();
673 - if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) { 677 + if ((ret = encoder->on_publish(_req)) != ERROR_SUCCESS) {
674 srs_error("start encoder failed. ret=%d", ret); 678 srs_error("start encoder failed. ret=%d", ret);
675 return ret; 679 return ret;
676 } 680 }
@@ -1038,14 +1042,12 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -1038,14 +1042,12 @@ int SrsSource::on_video(SrsCommonMessage* video)
1038 return ret; 1042 return ret;
1039 } 1043 }
1040 1044
1041 -int SrsSource::on_publish(SrsRequest* _req) 1045 +int SrsSource::on_publish()
1042 { 1046 {
1043 int ret = ERROR_SUCCESS; 1047 int ret = ERROR_SUCCESS;
1044 1048
1045 // update the request object. 1049 // update the request object.
1046 - srs_freep(req);  
1047 - req = _req->copy();  
1048 - srs_assert(req); 1050 + srs_assert(_req);
1049 1051
1050 _can_publish = false; 1052 _can_publish = false;
1051 1053
@@ -1056,21 +1058,21 @@ int SrsSource::on_publish(SrsRequest* _req) @@ -1056,21 +1058,21 @@ int SrsSource::on_publish(SrsRequest* _req)
1056 } 1058 }
1057 1059
1058 #ifdef SRS_AUTO_TRANSCODE 1060 #ifdef SRS_AUTO_TRANSCODE
1059 - if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) { 1061 + if ((ret = encoder->on_publish(_req)) != ERROR_SUCCESS) {
1060 srs_error("start encoder failed. ret=%d", ret); 1062 srs_error("start encoder failed. ret=%d", ret);
1061 return ret; 1063 return ret;
1062 } 1064 }
1063 #endif 1065 #endif
1064 1066
1065 #ifdef SRS_AUTO_HLS 1067 #ifdef SRS_AUTO_HLS
1066 - if ((ret = hls->on_publish(req)) != ERROR_SUCCESS) { 1068 + if ((ret = hls->on_publish(_req)) != ERROR_SUCCESS) {
1067 srs_error("start hls failed. ret=%d", ret); 1069 srs_error("start hls failed. ret=%d", ret);
1068 return ret; 1070 return ret;
1069 } 1071 }
1070 #endif 1072 #endif
1071 1073
1072 #ifdef SRS_AUTO_DVR 1074 #ifdef SRS_AUTO_DVR
1073 - if ((ret = dvr->on_publish(req)) != ERROR_SUCCESS) { 1075 + if ((ret = dvr->on_publish(_req)) != ERROR_SUCCESS) {
1074 srs_error("start dvr failed. ret=%d", ret); 1076 srs_error("start dvr failed. ret=%d", ret);
1075 return ret; 1077 return ret;
1076 } 1078 }
@@ -1116,7 +1118,7 @@ void SrsSource::on_unpublish() @@ -1116,7 +1118,7 @@ void SrsSource::on_unpublish()
1116 consumer = new SrsConsumer(this); 1118 consumer = new SrsConsumer(this);
1117 consumers.push_back(consumer); 1119 consumers.push_back(consumer);
1118 1120
1119 - double queue_size = _srs_config->get_queue_length(req->vhost); 1121 + double queue_size = _srs_config->get_queue_length(_req->vhost);
1120 consumer->set_queue_size(queue_size); 1122 consumer->set_queue_size(queue_size);
1121 1123
1122 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) { 1124 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
@@ -1187,20 +1189,20 @@ int SrsSource::create_forwarders() @@ -1187,20 +1189,20 @@ int SrsSource::create_forwarders()
1187 { 1189 {
1188 int ret = ERROR_SUCCESS; 1190 int ret = ERROR_SUCCESS;
1189 1191
1190 - SrsConfDirective* conf = _srs_config->get_forward(req->vhost); 1192 + SrsConfDirective* conf = _srs_config->get_forward(_req->vhost);
1191 for (int i = 0; conf && i < (int)conf->args.size(); i++) { 1193 for (int i = 0; conf && i < (int)conf->args.size(); i++) {
1192 std::string forward_server = conf->args.at(i); 1194 std::string forward_server = conf->args.at(i);
1193 1195
1194 SrsForwarder* forwarder = new SrsForwarder(this); 1196 SrsForwarder* forwarder = new SrsForwarder(this);
1195 forwarders.push_back(forwarder); 1197 forwarders.push_back(forwarder);
1196 1198
1197 - double queue_size = _srs_config->get_queue_length(req->vhost); 1199 + double queue_size = _srs_config->get_queue_length(_req->vhost);
1198 forwarder->set_queue_size(queue_size); 1200 forwarder->set_queue_size(queue_size);
1199 1201
1200 - if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) { 1202 + if ((ret = forwarder->on_publish(_req, forward_server)) != ERROR_SUCCESS) {
1201 srs_error("start forwarder failed. " 1203 srs_error("start forwarder failed. "
1202 "vhost=%s, app=%s, stream=%s, forward-to=%s", 1204 "vhost=%s, app=%s, stream=%s, forward-to=%s",
1203 - req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), 1205 + _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
1204 forward_server.c_str()); 1206 forward_server.c_str());
1205 return ret; 1207 return ret;
1206 } 1208 }
@@ -220,7 +220,7 @@ public: @@ -220,7 +220,7 @@ public:
220 static int find(SrsRequest* req, SrsSource** ppsource); 220 static int find(SrsRequest* req, SrsSource** ppsource);
221 private: 221 private:
222 // deep copy of client request. 222 // deep copy of client request.
223 - SrsRequest* req; 223 + SrsRequest* _req;
224 // to delivery stream to clients. 224 // to delivery stream to clients.
225 std::vector<SrsConsumer*> consumers; 225 std::vector<SrsConsumer*> consumers;
226 // hls handler. 226 // hls handler.
@@ -272,7 +272,7 @@ public: @@ -272,7 +272,7 @@ public:
272 * @param _req the client request object, 272 * @param _req the client request object,
273 * this object will deep copy it for reload. 273 * this object will deep copy it for reload.
274 */ 274 */
275 - SrsSource(SrsRequest* _req); 275 + SrsSource(SrsRequest* req);
276 virtual ~SrsSource(); 276 virtual ~SrsSource();
277 public: 277 public:
278 virtual int initialize(); 278 virtual int initialize();
@@ -302,7 +302,7 @@ public: @@ -302,7 +302,7 @@ public:
302 * @param _req the request from client, the source will deep copy it, 302 * @param _req the request from client, the source will deep copy it,
303 * for when reload the request of client maybe invalid. 303 * for when reload the request of client maybe invalid.
304 */ 304 */
305 - virtual int on_publish(SrsRequest* _req); 305 + virtual int on_publish();
306 virtual void on_unpublish(); 306 virtual void on_unpublish();
307 public: 307 public:
308 virtual int create_consumer(SrsConsumer*& consumer); 308 virtual int create_consumer(SrsConsumer*& consumer);