winlin

fix bug#25: fmle republish bug, srs return special error code and wait for client to republish.

@@ -201,6 +201,16 @@ int SrsClient::service_cycle() @@ -201,6 +201,16 @@ int SrsClient::service_cycle()
201 return ret; 201 return ret;
202 } 202 }
203 203
  204 + // for republish, continue service
  205 + if (ret == ERROR_CONTROL_REPUBLISH) {
  206 + // set timeout to a larger value, wait for encoder to republish.
  207 + rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);
  208 + rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);
  209 +
  210 + srs_trace("control message(unpublish) accept, retry stream service.");
  211 + continue;
  212 + }
  213 +
204 // for "some" system control error, 214 // for "some" system control error,
205 // logical accept and retry stream service. 215 // logical accept and retry stream service.
206 if (ret == ERROR_CONTROL_RTMP_CLOSE) { 216 if (ret == ERROR_CONTROL_RTMP_CLOSE) {
@@ -292,7 +302,7 @@ int SrsClient::stream_service_cycle() @@ -292,7 +302,7 @@ int SrsClient::stream_service_cycle()
292 return ret; 302 return ret;
293 } 303 }
294 srs_info("start to publish stream %s success", req->stream.c_str()); 304 srs_info("start to publish stream %s success", req->stream.c_str());
295 - ret = publish(source, true); 305 + ret = fmle_publish(source);
296 source->on_unpublish(); 306 source->on_unpublish();
297 on_unpublish(); 307 on_unpublish();
298 return ret; 308 return ret;
@@ -309,7 +319,7 @@ int SrsClient::stream_service_cycle() @@ -309,7 +319,7 @@ int SrsClient::stream_service_cycle()
309 return ret; 319 return ret;
310 } 320 }
311 srs_info("flash start to publish stream %s success", req->stream.c_str()); 321 srs_info("flash start to publish stream %s success", req->stream.c_str());
312 - ret = publish(source, false); 322 + ret = flash_publish(source);
313 source->on_unpublish(); 323 source->on_unpublish();
314 on_unpublish(); 324 on_unpublish();
315 return ret; 325 return ret;
@@ -448,24 +458,24 @@ int SrsClient::playing(SrsSource* source) @@ -448,24 +458,24 @@ int SrsClient::playing(SrsSource* source)
448 return ret; 458 return ret;
449 } 459 }
450 460
451 -int SrsClient::publish(SrsSource* source, bool is_fmle) 461 +int SrsClient::fmle_publish(SrsSource* source)
452 { 462 {
453 int ret = ERROR_SUCCESS; 463 int ret = ERROR_SUCCESS;
454 464
455 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { 465 if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
456 - srs_error("check publish_refer failed. ret=%d", ret); 466 + srs_error("fmle check publish_refer failed. ret=%d", ret);
457 return ret; 467 return ret;
458 } 468 }
459 - srs_verbose("check publish_refer success."); 469 + srs_verbose("fmle check publish_refer success.");
460 470
461 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); 471 SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER);
462 472
463 // notify the hls to prepare when publish start. 473 // notify the hls to prepare when publish start.
464 if ((ret = source->on_publish(req)) != ERROR_SUCCESS) { 474 if ((ret = source->on_publish(req)) != ERROR_SUCCESS) {
465 - srs_error("hls on_publish failed. ret=%d", ret); 475 + srs_error("fmle hls on_publish failed. ret=%d", ret);
466 return ret; 476 return ret;
467 } 477 }
468 - srs_verbose("hls on_publish success."); 478 + srs_verbose("fmle hls on_publish success.");
469 479
470 while (true) { 480 while (true) {
471 // switch to other st-threads. 481 // switch to other st-threads.
@@ -473,7 +483,7 @@ int SrsClient::publish(SrsSource* source, bool is_fmle) @@ -473,7 +483,7 @@ int SrsClient::publish(SrsSource* source, bool is_fmle)
473 483
474 SrsCommonMessage* msg = NULL; 484 SrsCommonMessage* msg = NULL;
475 if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { 485 if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
476 - srs_error("recv identify client message failed. ret=%d", ret); 486 + srs_error("fmle recv identify client message failed. ret=%d", ret);
477 return ret; 487 return ret;
478 } 488 }
479 489
@@ -486,9 +496,30 @@ int SrsClient::publish(SrsSource* source, bool is_fmle) @@ -486,9 +496,30 @@ int SrsClient::publish(SrsSource* source, bool is_fmle)
486 srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", 496 srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
487 pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); 497 pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps());
488 } 498 }
  499 +
  500 + // process UnPublish event.
  501 + if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
  502 + if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {
  503 + srs_error("fmle decode unpublish message failed. ret=%d", ret);
  504 + return ret;
  505 + }
  506 +
  507 + SrsPacket* pkt = msg->get_packet();
  508 + if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
  509 + SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
  510 + if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) {
  511 + return ret;
  512 + }
  513 + return ERROR_CONTROL_REPUBLISH;
  514 + }
  515 +
  516 + srs_trace("fmle ignore AMF0/AMF3 command message.");
  517 + continue;
  518 + }
489 519
490 - if ((ret = process_publish_message(source, msg, is_fmle)) != ERROR_SUCCESS) {  
491 - srs_error("process publish message failed. ret=%d", ret); 520 + // video, audio, data message
  521 + if ((ret = process_publish_message(source, msg)) != ERROR_SUCCESS) {
  522 + srs_error("fmle process publish message failed. ret=%d", ret);
492 return ret; 523 return ret;
493 } 524 }
494 } 525 }
@@ -496,7 +527,69 @@ int SrsClient::publish(SrsSource* source, bool is_fmle) @@ -496,7 +527,69 @@ int SrsClient::publish(SrsSource* source, bool is_fmle)
496 return ret; 527 return ret;
497 } 528 }
498 529
499 -int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle) 530 +int SrsClient::flash_publish(SrsSource* source)
  531 +{
  532 + int ret = ERROR_SUCCESS;
  533 +
  534 + if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) {
  535 + srs_error("flash check publish_refer failed. ret=%d", ret);
  536 + return ret;
  537 + }
  538 + srs_verbose("flash check publish_refer success.");
  539 +
  540 + SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER);
  541 +
  542 + // notify the hls to prepare when publish start.
  543 + if ((ret = source->on_publish(req)) != ERROR_SUCCESS) {
  544 + srs_error("flash hls on_publish failed. ret=%d", ret);
  545 + return ret;
  546 + }
  547 + srs_verbose("flash hls on_publish success.");
  548 +
  549 + while (true) {
  550 + // switch to other st-threads.
  551 + st_usleep(0);
  552 +
  553 + SrsCommonMessage* msg = NULL;
  554 + if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
  555 + srs_error("flash recv identify client message failed. ret=%d", ret);
  556 + return ret;
  557 + }
  558 +
  559 + SrsAutoFree(SrsCommonMessage, msg, false);
  560 +
  561 + pithy_print.set_age(msg->header.timestamp);
  562 +
  563 + // reportable
  564 + if (pithy_print.can_print()) {
  565 + srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
  566 + pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps());
  567 + }
  568 +
  569 + // process UnPublish event.
  570 + if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
  571 + if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {
  572 + srs_error("flash decode unpublish message failed. ret=%d", ret);
  573 + return ret;
  574 + }
  575 +
  576 + // flash unpublish.
  577 + // TODO: maybe need to support republish.
  578 + srs_trace("flash flash publish finished.");
  579 + return ERROR_CONTROL_REPUBLISH;
  580 + }
  581 +
  582 + // video, audio, data message
  583 + if ((ret = process_publish_message(source, msg)) != ERROR_SUCCESS) {
  584 + srs_error("flash process publish message failed. ret=%d", ret);
  585 + return ret;
  586 + }
  587 + }
  588 +
  589 + return ret;
  590 +}
  591 +
  592 +int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
500 { 593 {
501 int ret = ERROR_SUCCESS; 594 int ret = ERROR_SUCCESS;
502 595
@@ -537,29 +630,6 @@ int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg, @@ -537,29 +630,6 @@ int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg,
537 return ret; 630 return ret;
538 } 631 }
539 632
540 - // process UnPublish event.  
541 - if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {  
542 - if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {  
543 - srs_error("decode unpublish message failed. ret=%d", ret);  
544 - return ret;  
545 - }  
546 -  
547 - // flash unpublish.  
548 - if (!is_fmle) {  
549 - srs_trace("flash publish finished.");  
550 - return ret;  
551 - }  
552 -  
553 - SrsPacket* pkt = msg->get_packet();  
554 - if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {  
555 - SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);  
556 - return rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id);  
557 - }  
558 -  
559 - srs_trace("ignore AMF0/AMF3 command message.");  
560 - return ret;  
561 - }  
562 -  
563 return ret; 633 return ret;
564 } 634 }
565 635
@@ -78,8 +78,9 @@ private: @@ -78,8 +78,9 @@ private:
78 virtual int stream_service_cycle(); 78 virtual int stream_service_cycle();
79 virtual int check_vhost(); 79 virtual int check_vhost();
80 virtual int playing(SrsSource* source); 80 virtual int playing(SrsSource* source);
81 - virtual int publish(SrsSource* source, bool is_fmle);  
82 - virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle); 81 + virtual int fmle_publish(SrsSource* source);
  82 + virtual int flash_publish(SrsSource* source);
  83 + virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg);
83 virtual int get_peer_ip(); 84 virtual int get_peer_ip();
84 virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); 85 virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
85 private: 86 private:
@@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 25
26 bool srs_is_system_control_error(int error_code) 26 bool srs_is_system_control_error(int error_code)
27 { 27 {
28 - return error_code == ERROR_CONTROL_RTMP_CLOSE; 28 + return error_code == ERROR_CONTROL_RTMP_CLOSE
  29 + || error_code == ERROR_CONTROL_REPUBLISH;
29 } 30 }
30 31
@@ -155,6 +155,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -155,6 +155,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
155 // not an error, but special control logic. 155 // not an error, but special control logic.
156 // sys ctl: rtmp close stream, support replay. 156 // sys ctl: rtmp close stream, support replay.
157 #define ERROR_CONTROL_RTMP_CLOSE 900 157 #define ERROR_CONTROL_RTMP_CLOSE 900
  158 +// FMLE stop publish and republish.
  159 +#define ERROR_CONTROL_REPUBLISH 901
158 160
159 /** 161 /**
160 * whether the error code is an system control error. 162 * whether the error code is an system control error.
@@ -59,11 +59,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -59,11 +59,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
59 // the timeout to wait client data, when client paused 59 // the timeout to wait client data, when client paused
60 // if timeout, close the connection. 60 // if timeout, close the connection.
61 #define SRS_PAUSED_SEND_TIMEOUT_US (int64_t)(30*60*1000*1000LL) 61 #define SRS_PAUSED_SEND_TIMEOUT_US (int64_t)(30*60*1000*1000LL)
62 -  
63 -// the timeout to send data to client, when client paused  
64 // if timeout, close the connection. 62 // if timeout, close the connection.
65 #define SRS_PAUSED_RECV_TIMEOUT_US (int64_t)(30*60*1000*1000LL) 63 #define SRS_PAUSED_RECV_TIMEOUT_US (int64_t)(30*60*1000*1000LL)
66 64
  65 +// the timeout to wait encoder to republish
  66 +// if timeout, close the connection.
  67 +#define SRS_REPUBLISH_SEND_TIMEOUT_US (int64_t)(3*60*1000*1000LL)
  68 +// if timeout, close the connection.
  69 +#define SRS_REPUBLISH_RECV_TIMEOUT_US (int64_t)(3*60*1000*1000LL)
  70 +
67 // when stream is busy, for example, streaming is already 71 // when stream is busy, for example, streaming is already
68 // publishing, when a new client to request to publish, 72 // publishing, when a new client to request to publish,
69 // sleep a while and close the connection. 73 // sleep a while and close the connection.