winlin

fix bug of edge, one fd for one thread

@@ -55,6 +55,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -55,6 +55,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
55 // when edge timeout, retry next. 55 // when edge timeout, retry next.
56 #define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(3*1000*1000LL) 56 #define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(3*1000*1000LL)
57 57
  58 +// when edge error, wait for quit
  59 +#define SRS_EDGE_FORWARDER_ERROR_US (int64_t)(50*1000LL)
  60 +
58 SrsEdgeIngester::SrsEdgeIngester() 61 SrsEdgeIngester::SrsEdgeIngester()
59 { 62 {
60 io = NULL; 63 io = NULL;
@@ -165,8 +168,10 @@ int SrsEdgeIngester::ingest() @@ -165,8 +168,10 @@ int SrsEdgeIngester::ingest()
165 168
166 // pithy print 169 // pithy print
167 if (pithy_print.can_print()) { 170 if (pithy_print.can_print()) {
168 - srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
169 - pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); 171 + srs_trace("<- "SRS_LOG_ID_EDGE_PLAY
  172 + " time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  173 + pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(),
  174 + client->get_send_kbps(), client->get_recv_kbps());
170 } 175 }
171 176
172 // read from client. 177 // read from client.
@@ -323,6 +328,8 @@ SrsEdgeForwarder::SrsEdgeForwarder() @@ -323,6 +328,8 @@ SrsEdgeForwarder::SrsEdgeForwarder()
323 stream_id = 0; 328 stream_id = 0;
324 stfd = NULL; 329 stfd = NULL;
325 pthread = new SrsThread(this, SRS_EDGE_FORWARDER_SLEEP_US); 330 pthread = new SrsThread(this, SRS_EDGE_FORWARDER_SLEEP_US);
  331 + queue = new SrsMessageQueue();
  332 + send_error_code = ERROR_SUCCESS;
326 } 333 }
327 334
328 SrsEdgeForwarder::~SrsEdgeForwarder() 335 SrsEdgeForwarder::~SrsEdgeForwarder()
@@ -330,6 +337,11 @@ SrsEdgeForwarder::~SrsEdgeForwarder() @@ -330,6 +337,11 @@ SrsEdgeForwarder::~SrsEdgeForwarder()
330 stop(); 337 stop();
331 } 338 }
332 339
  340 +void SrsEdgeForwarder::set_queue_size(double queue_size)
  341 +{
  342 + return queue->set_queue_size(queue_size);
  343 +}
  344 +
333 int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req) 345 int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req)
334 { 346 {
335 int ret = ERROR_SUCCESS; 347 int ret = ERROR_SUCCESS;
@@ -345,6 +357,8 @@ int SrsEdgeForwarder::start() @@ -345,6 +357,8 @@ int SrsEdgeForwarder::start()
345 { 357 {
346 int ret = ERROR_SUCCESS; 358 int ret = ERROR_SUCCESS;
347 359
  360 + send_error_code = ERROR_SUCCESS;
  361 +
348 if ((ret = connect_server()) != ERROR_SUCCESS) { 362 if ((ret = connect_server()) != ERROR_SUCCESS) {
349 return ret; 363 return ret;
350 } 364 }
@@ -391,7 +405,7 @@ int SrsEdgeForwarder::cycle() @@ -391,7 +405,7 @@ int SrsEdgeForwarder::cycle()
391 { 405 {
392 int ret = ERROR_SUCCESS; 406 int ret = ERROR_SUCCESS;
393 407
394 - client->set_recv_timeout(SRS_EDGE_FORWARDER_TIMEOUT_US); 408 + client->set_recv_timeout(SRS_PULSE_TIMEOUT_US);
395 409
396 SrsPithyPrint pithy_print(SRS_STAGE_EDGE); 410 SrsPithyPrint pithy_print(SRS_STAGE_EDGE);
397 411
@@ -399,24 +413,63 @@ int SrsEdgeForwarder::cycle() @@ -399,24 +413,63 @@ int SrsEdgeForwarder::cycle()
399 // switch to other st-threads. 413 // switch to other st-threads.
400 st_usleep(0); 414 st_usleep(0);
401 415
  416 + if (send_error_code != ERROR_SUCCESS) {
  417 + st_usleep(SRS_EDGE_FORWARDER_ERROR_US);
  418 + continue;
  419 + }
  420 +
  421 + // read from client.
  422 + if (true) {
  423 + SrsCommonMessage* msg = NULL;
  424 + ret = client->recv_message(&msg);
  425 +
  426 + srs_verbose("edge loop recv message. ret=%d", ret);
  427 + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
  428 + srs_error("edge forwarder recv server control message failed. ret=%d", ret);
  429 + send_error_code = ret;
  430 + continue;
  431 + }
  432 +
  433 + srs_freep(msg);
  434 + }
  435 +
  436 + // forward all messages.
  437 + int count = 0;
  438 + SrsSharedPtrMessage** msgs = NULL;
  439 + if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
  440 + srs_error("get message to forward to origin failed. ret=%d", ret);
  441 + return ret;
  442 + }
  443 +
402 pithy_print.elapse(); 444 pithy_print.elapse();
403 445
404 // pithy print 446 // pithy print
405 if (pithy_print.can_print()) { 447 if (pithy_print.can_print()) {
406 - srs_trace("-> time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
407 - pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); 448 + srs_trace("-> "SRS_LOG_ID_EDGE_PUBLISH
  449 + " time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  450 + pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(),
  451 + client->get_send_kbps(), client->get_recv_kbps());
408 } 452 }
409 453
410 - // read from client.  
411 - SrsCommonMessage* msg = NULL;  
412 - if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {  
413 - srs_info("ignore forwarder recv origin server message failed. ret=%d", ret); 454 + // ignore when no messages.
  455 + if (count <= 0) {
  456 + srs_verbose("no packets to forward.");
414 continue; 457 continue;
415 } 458 }
416 - srs_verbose("edge loop recv message. ret=%d", ret); 459 + SrsAutoFree(SrsSharedPtrMessage*, msgs, true);
  460 +
  461 + // all msgs to forward.
  462 + for (int i = 0; i < count; i++) {
  463 + SrsSharedPtrMessage* msg = msgs[i];
417 464
418 srs_assert(msg); 465 srs_assert(msg);
419 - SrsAutoFree(SrsCommonMessage, msg, false); 466 + msgs[i] = NULL;
  467 +
  468 + if ((ret = client->send_message(msg)) != ERROR_SUCCESS) {
  469 + srs_error("edge publish forwarder send message to server failed. ret=%d", ret);
  470 + return ret;
  471 + }
  472 + }
420 } 473 }
421 474
422 return ret; 475 return ret;
@@ -426,6 +479,11 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) @@ -426,6 +479,11 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
426 { 479 {
427 int ret = ERROR_SUCCESS; 480 int ret = ERROR_SUCCESS;
428 481
  482 + if ((ret = send_error_code) != ERROR_SUCCESS) {
  483 + srs_error("publish edge proxy thread send error, ret=%d", ret);
  484 + return ret;
  485 + }
  486 +
429 // the msg is auto free by source, 487 // the msg is auto free by source,
430 // so we just ignore, or copy then send it. 488 // so we just ignore, or copy then send it.
431 if (msg->size <= 0 489 if (msg->size <= 0
@@ -445,9 +503,8 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) @@ -445,9 +503,8 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg)
445 srs_verbose("initialize shared ptr msg success."); 503 srs_verbose("initialize shared ptr msg success.");
446 504
447 copy->header.stream_id = stream_id; 505 copy->header.stream_id = stream_id;
448 - if ((ret = client->send_message(copy->copy())) != ERROR_SUCCESS) {  
449 - srs_error("send client message to origin failed. ret=%d", ret);  
450 - return ret; 506 + if ((ret = queue->enqueue(copy->copy())) != ERROR_SUCCESS) {
  507 + srs_error("enqueue edge publish msg failed. ret=%d", ret);
451 } 508 }
452 509
453 return ret; 510 return ret;
@@ -620,6 +677,11 @@ SrsPublishEdge::~SrsPublishEdge() @@ -620,6 +677,11 @@ SrsPublishEdge::~SrsPublishEdge()
620 srs_freep(forwarder); 677 srs_freep(forwarder);
621 } 678 }
622 679
  680 +void SrsPublishEdge::set_queue_size(double queue_size)
  681 +{
  682 + return forwarder->set_queue_size(queue_size);
  683 +}
  684 +
623 int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req) 685 int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req)
624 { 686 {
625 int ret = ERROR_SUCCESS; 687 int ret = ERROR_SUCCESS;
@@ -651,11 +713,15 @@ int SrsPublishEdge::on_client_publish() @@ -651,11 +713,15 @@ int SrsPublishEdge::on_client_publish()
651 return ret; 713 return ret;
652 } 714 }
653 715
  716 + if ((ret = forwarder->start()) != ERROR_SUCCESS) {
  717 + return ret;
  718 + }
  719 +
654 SrsEdgeState pstate = state; 720 SrsEdgeState pstate = state;
655 state = SrsEdgeStatePublish; 721 state = SrsEdgeStatePublish;
656 srs_trace("edge change from %d to state %d (forward publish).", pstate, state); 722 srs_trace("edge change from %d to state %d (forward publish).", pstate, state);
657 723
658 - return forwarder->start(); 724 + return ret;
659 } 725 }
660 726
661 int SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg) 727 int SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg)
@@ -41,6 +41,7 @@ class SrsPlayEdge; @@ -41,6 +41,7 @@ class SrsPlayEdge;
41 class SrsPublishEdge; 41 class SrsPublishEdge;
42 class SrsRtmpClient; 42 class SrsRtmpClient;
43 class SrsCommonMessage; 43 class SrsCommonMessage;
  44 +class SrsMessageQueue;
44 class ISrsProtocolReaderWriter; 45 class ISrsProtocolReaderWriter;
45 46
46 /** 47 /**
@@ -117,10 +118,23 @@ private: @@ -117,10 +118,23 @@ private:
117 ISrsProtocolReaderWriter* io; 118 ISrsProtocolReaderWriter* io;
118 SrsRtmpClient* client; 119 SrsRtmpClient* client;
119 int origin_index; 120 int origin_index;
  121 + /**
  122 + * we must ensure one thread one fd principle,
  123 + * that is, a fd must be write/read by the one thread.
  124 + * the publish service thread will proxy(msg), and the edge forward thread
  125 + * will cycle(), so we use queue for cycle to send the msg of proxy.
  126 + */
  127 + SrsMessageQueue* queue;
  128 + /**
  129 + * error code of send, for edge proxy thread to query.
  130 + */
  131 + int send_error_code;
120 public: 132 public:
121 SrsEdgeForwarder(); 133 SrsEdgeForwarder();
122 virtual ~SrsEdgeForwarder(); 134 virtual ~SrsEdgeForwarder();
123 public: 135 public:
  136 + virtual void set_queue_size(double queue_size);
  137 +public:
124 virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); 138 virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req);
125 virtual int start(); 139 virtual int start();
126 virtual void stop(); 140 virtual void stop();
@@ -180,6 +194,8 @@ public: @@ -180,6 +194,8 @@ public:
180 SrsPublishEdge(); 194 SrsPublishEdge();
181 virtual ~SrsPublishEdge(); 195 virtual ~SrsPublishEdge();
182 public: 196 public:
  197 + virtual void set_queue_size(double queue_size);
  198 +public:
183 virtual int initialize(SrsSource* source, SrsRequest* req); 199 virtual int initialize(SrsSource* source, SrsRequest* req);
184 /** 200 /**
185 * when client publish stream on edge. 201 * when client publish stream on edge.
@@ -323,6 +323,8 @@ int SrsForwarder::forward() @@ -323,6 +323,8 @@ int SrsForwarder::forward()
323 srs_error("recv server control message failed. ret=%d", ret); 323 srs_error("recv server control message failed. ret=%d", ret);
324 return ret; 324 return ret;
325 } 325 }
  326 +
  327 + srs_freep(msg);
326 } 328 }
327 329
328 // forward all messages. 330 // forward all messages.
@@ -335,8 +337,10 @@ int SrsForwarder::forward() @@ -335,8 +337,10 @@ int SrsForwarder::forward()
335 337
336 // pithy print 338 // pithy print
337 if (pithy_print.can_print()) { 339 if (pithy_print.can_print()) {
338 - srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
339 - pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); 340 + srs_trace("-> "SRS_LOG_ID_FOWARDER
  341 + " time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  342 + pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(),
  343 + client->get_send_kbps(), client->get_recv_kbps());
340 } 344 }
341 345
342 // ignore when no messages. 346 // ignore when no messages.
@@ -500,8 +500,10 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -500,8 +500,10 @@ int SrsRtmpConn::playing(SrsSource* source)
500 500
501 // reportable 501 // reportable
502 if (pithy_print.can_print()) { 502 if (pithy_print.can_print()) {
503 - srs_trace("-> time=%"PRId64", duration=%"PRId64", cmr=%d, msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
504 - pithy_print.age(), duration, ctl_msg_ret, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); 503 + srs_trace("-> "SRS_LOG_ID_PLAY
  504 + " time=%"PRId64", duration=%"PRId64", cmr=%d, msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  505 + pithy_print.age(), duration, ctl_msg_ret, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(),
  506 + rtmp->get_send_kbps(), rtmp->get_recv_kbps());
505 } 507 }
506 508
507 if (count <= 0) { 509 if (count <= 0) {
@@ -581,8 +583,10 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) @@ -581,8 +583,10 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
581 583
582 // reportable 584 // reportable
583 if (pithy_print.can_print()) { 585 if (pithy_print.can_print()) {
584 - srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
585 - pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); 586 + srs_trace("<- "SRS_LOG_ID_CLIENT_PUBLISH
  587 + " time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  588 + pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(),
  589 + rtmp->get_send_kbps(), rtmp->get_recv_kbps());
586 } 590 }
587 591
588 // process UnPublish event. 592 // process UnPublish event.
@@ -654,8 +658,10 @@ int SrsRtmpConn::flash_publish(SrsSource* source) @@ -654,8 +658,10 @@ int SrsRtmpConn::flash_publish(SrsSource* source)
654 658
655 // reportable 659 // reportable
656 if (pithy_print.can_print()) { 660 if (pithy_print.can_print()) {
657 - srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",  
658 - pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); 661 + srs_trace("<- "SRS_LOG_ID_WEB_PUBLISH
  662 + " time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  663 + pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(),
  664 + rtmp->get_send_kbps(), rtmp->get_recv_kbps());
659 } 665 }
660 666
661 // process UnPublish event. 667 // process UnPublish event.
@@ -687,7 +693,11 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms @@ -687,7 +693,11 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms
687 693
688 // for edge, directly proxy message to origin. 694 // for edge, directly proxy message to origin.
689 if (vhost_is_edge) { 695 if (vhost_is_edge) {
690 - return source->on_edge_proxy_publish(msg); 696 + if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) {
  697 + srs_error("edge publish proxy msg failed. ret=%d", ret);
  698 + return ret;
  699 + }
  700 + return ret;
691 } 701 }
692 702
693 // process audio packet 703 // process audio packet
@@ -524,6 +524,9 @@ int SrsSource::initialize() @@ -524,6 +524,9 @@ int SrsSource::initialize()
524 return ret; 524 return ret;
525 } 525 }
526 526
  527 + double queue_size = _srs_config->get_queue_length(_req->vhost);
  528 + publish_edge->set_queue_size(queue_size);
  529 +
527 return ret; 530 return ret;
528 } 531 }
529 532
@@ -597,6 +600,11 @@ int SrsSource::on_reload_vhost_queue_length(string vhost) @@ -597,6 +600,11 @@ int SrsSource::on_reload_vhost_queue_length(string vhost)
597 srs_trace("forwarders reload queue size success."); 600 srs_trace("forwarders reload queue size success.");
598 } 601 }
599 602
  603 + if (true) {
  604 + publish_edge->set_queue_size(queue_size);
  605 + srs_trace("publish_edge reload queue size success.");
  606 + }
  607 +
600 return ret; 608 return ret;
601 } 609 }
602 610
@@ -144,4 +144,17 @@ extern ISrsThreadContext* _srs_context; @@ -144,4 +144,17 @@ extern ISrsThreadContext* _srs_context;
144 #define srs_trace(msg, ...) (void)0 144 #define srs_trace(msg, ...) (void)0
145 #endif 145 #endif
146 146
  147 +// downloading speed-up, play to edge, ingest from origin
  148 +#define SRS_LOG_ID_EDGE_PLAY "EIG"
  149 +// uploading speed-up, publish to edge, foward to origin
  150 +#define SRS_LOG_ID_EDGE_PUBLISH "EFW"
  151 +// edge/origin forwarder.
  152 +#define SRS_LOG_ID_FOWARDER "FWR"
  153 +// play stream on edge/origin.
  154 +#define SRS_LOG_ID_PLAY "PLA"
  155 +// client publish to edge/origin
  156 +#define SRS_LOG_ID_CLIENT_PUBLISH "CPB"
  157 +// web/flash publish to edge/origin
  158 +#define SRS_LOG_ID_WEB_PUBLISH "WPB"
  159 +
147 #endif 160 #endif