winlin

Merge branch 'srs.master'

@@ -81,7 +81,7 @@ int SrsPipe::active() @@ -81,7 +81,7 @@ int SrsPipe::active()
81 int ret = ERROR_SUCCESS; 81 int ret = ERROR_SUCCESS;
82 82
83 int v = 0; 83 int v = 0;
84 - if (st_write(read_stfd, &v, sizeof(int), ST_UTIME_NO_TIMEOUT) != sizeof(int)) { 84 + if (st_write(write_stfd, &v, sizeof(int), ST_UTIME_NO_TIMEOUT) != sizeof(int)) {
85 ret = ERROR_SYSTEM_WRITE_PIPE; 85 ret = ERROR_SYSTEM_WRITE_PIPE;
86 srs_error("write pipe failed. ret=%d", ret); 86 srs_error("write pipe failed. ret=%d", ret);
87 return ret; 87 return ret;
@@ -516,7 +516,12 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -516,7 +516,12 @@ int SrsRtmpConn::playing(SrsSource* source)
516 SrsAutoFree(SrsConsumer, consumer); 516 SrsAutoFree(SrsConsumer, consumer);
517 srs_verbose("consumer created success."); 517 srs_verbose("consumer created success.");
518 518
  519 + // TODO: FIXME: remove it.
519 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); 520 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  521 + // disable the timeout.
  522 + // TODO: FIXME: maybe can use larger timeout?
  523 + rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT);
  524 + rtmp->set_send_timeout(ST_UTIME_NO_TIMEOUT);
520 525
521 SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); 526 SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER);
522 527
@@ -525,12 +530,30 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -525,12 +530,30 @@ int SrsRtmpConn::playing(SrsSource* source)
525 bool user_specified_duration_to_stop = (req->duration > 0); 530 bool user_specified_duration_to_stop = (req->duration > 0);
526 int64_t starttime = -1; 531 int64_t starttime = -1;
527 532
  533 + pollfd pds[2];
  534 + // poll the client incoming fd.
  535 + pds[0].fd = st_netfd_fileno(stfd);
  536 + pds[0].events = POLLIN;
  537 + // poll the consumer queue pipe.
  538 + pds[1].fd = st_netfd_fileno(consumer->pipe_fd());
  539 + pds[1].events = POLLIN;
  540 +
528 while (true) { 541 while (true) {
529 // collect elapse for pithy print. 542 // collect elapse for pithy print.
530 pithy_print.elapse(); 543 pithy_print.elapse();
  544 +
  545 + pds[0].revents = 0;
  546 + pds[1].revents = 0;
531 547
  548 + // wait for packet incoming or data outgoing.
  549 + if (st_poll(pds, 2, ST_UTIME_NO_TIMEOUT) <= 0) {
  550 + srs_error("st_poll failed.");
  551 + break;
  552 + }
  553 +
  554 + // packet incoming, read from RTMP.
532 // read from client. 555 // read from client.
533 - if (true) { 556 + if (pds[0].revents & POLLIN) {
534 SrsMessage* msg = NULL; 557 SrsMessage* msg = NULL;
535 ret = rtmp->recv_message(&msg); 558 ret = rtmp->recv_message(&msg);
536 srs_verbose("play loop recv message. ret=%d", ret); 559 srs_verbose("play loop recv message. ret=%d", ret);
@@ -553,49 +576,52 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -553,49 +576,52 @@ int SrsRtmpConn::playing(SrsSource* source)
553 } 576 }
554 } 577 }
555 578
556 - // get messages from consumer.  
557 - int count = 0;  
558 - if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {  
559 - srs_error("get messages from consumer failed. ret=%d", ret);  
560 - return ret;  
561 - }  
562 -  
563 - // reportable  
564 - if (pithy_print.can_print()) {  
565 - kbps->sample();  
566 - srs_trace("-> "SRS_CONSTS_LOG_PLAY  
567 - " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",  
568 - pithy_print.age(), count,  
569 - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),  
570 - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());  
571 - }  
572 -  
573 - // sendout messages  
574 - // @remark, becareful, all msgs must be free explicitly,  
575 - // free by send_and_free_message or srs_freep.  
576 - for (int i = 0; i < count; i++) {  
577 - SrsSharedPtrMessage* msg = msgs.msgs[i];  
578 -  
579 - // the send_message will free the msg,  
580 - // so set the msgs[i] to NULL.  
581 - msgs.msgs[i] = NULL; 579 + // data outgoing, sendout packets.
  580 + if (pds[1].revents & POLLIN) {
  581 + // get messages from consumer.
  582 + int count = 0;
  583 + if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
  584 + srs_error("get messages from consumer failed. ret=%d", ret);
  585 + return ret;
  586 + }
  587 +
  588 + // reportable
  589 + if (pithy_print.can_print()) {
  590 + kbps->sample();
  591 + srs_trace("-> "SRS_CONSTS_LOG_PLAY
  592 + " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
  593 + pithy_print.age(), count,
  594 + kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
  595 + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
  596 + }
582 597
583 - // only when user specifies the duration,  
584 - // we start to collect the durations for each message.  
585 - if (user_specified_duration_to_stop) {  
586 - // foreach msg, collect the duration.  
587 - // @remark: never use msg when sent it, for the protocol sdk will free it.  
588 - if (starttime < 0 || starttime > msg->header.timestamp) { 598 + // sendout messages
  599 + // @remark, becareful, all msgs must be free explicitly,
  600 + // free by send_and_free_message or srs_freep.
  601 + for (int i = 0; i < count; i++) {
  602 + SrsSharedPtrMessage* msg = msgs.msgs[i];
  603 +
  604 + // the send_message will free the msg,
  605 + // so set the msgs[i] to NULL.
  606 + msgs.msgs[i] = NULL;
  607 +
  608 + // only when user specifies the duration,
  609 + // we start to collect the durations for each message.
  610 + if (user_specified_duration_to_stop) {
  611 + // foreach msg, collect the duration.
  612 + // @remark: never use msg when sent it, for the protocol sdk will free it.
  613 + if (starttime < 0 || starttime > msg->header.timestamp) {
  614 + starttime = msg->header.timestamp;
  615 + }
  616 + duration += msg->header.timestamp - starttime;
589 starttime = msg->header.timestamp; 617 starttime = msg->header.timestamp;
590 } 618 }
591 - duration += msg->header.timestamp - starttime;  
592 - starttime = msg->header.timestamp;  
593 - }  
594 -  
595 - // no need to assert msg, for the rtmp will assert it.  
596 - if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {  
597 - srs_error("send message to client failed. ret=%d", ret);  
598 - return ret; 619 +
  620 + // no need to assert msg, for the rtmp will assert it.
  621 + if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
  622 + srs_error("send message to client failed. ret=%d", ret);
  623 + return ret;
  624 + }
599 } 625 }
600 } 626 }
601 627