winlin

Revert "for bug#194, open pipe for each connection."

This reverts commit ade81bb2.
@@ -81,7 +81,7 @@ int SrsPipe::active() @@ -81,7 +81,7 @@ int SrsPipe::active()
81 int ret = ERROR_SUCCESS; 81 int ret = ERROR_SUCCESS;
82 82
83 int v = 0; 83 int v = 0;
84 - if (st_write(write_stfd, &v, sizeof(int), ST_UTIME_NO_TIMEOUT) != sizeof(int)) { 84 + if (st_write(read_stfd, &v, sizeof(int), ST_UTIME_NO_TIMEOUT) != sizeof(int)) {
85 ret = ERROR_SYSTEM_WRITE_PIPE; 85 ret = ERROR_SYSTEM_WRITE_PIPE;
86 srs_error("write pipe failed. ret=%d", ret); 86 srs_error("write pipe failed. ret=%d", ret);
87 return ret; 87 return ret;
@@ -516,12 +516,7 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -516,12 +516,7 @@ int SrsRtmpConn::playing(SrsSource* source)
516 SrsAutoFree(SrsConsumer, consumer); 516 SrsAutoFree(SrsConsumer, consumer);
517 srs_verbose("consumer created success."); 517 srs_verbose("consumer created success.");
518 518
519 - // TODO: FIXME: remove it.  
520 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); 519 rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
521 - // disable the timeout.  
522 - // TODO: FIXME: maybe can use larger timeout?  
523 - rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT);  
524 - rtmp->set_send_timeout(ST_UTIME_NO_TIMEOUT);  
525 520
526 SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); 521 SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER);
527 522
@@ -530,30 +525,12 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -530,30 +525,12 @@ int SrsRtmpConn::playing(SrsSource* source)
530 bool user_specified_duration_to_stop = (req->duration > 0); 525 bool user_specified_duration_to_stop = (req->duration > 0);
531 int64_t starttime = -1; 526 int64_t starttime = -1;
532 527
533 - pollfd pds[2];  
534 - // poll the client incoming fd.  
535 - pds[0].fd = st_netfd_fileno(stfd);  
536 - pds[0].events = POLLIN;  
537 - // poll the consumer queue pipe.  
538 - pds[1].fd = st_netfd_fileno(consumer->pipe_fd());  
539 - pds[1].events = POLLIN;  
540 -  
541 while (true) { 528 while (true) {
542 // collect elapse for pithy print. 529 // collect elapse for pithy print.
543 pithy_print.elapse(); 530 pithy_print.elapse();
544 -  
545 - pds[0].revents = 0;  
546 - pds[1].revents = 0;  
547 531
548 - // wait for packet incoming or data outgoing.  
549 - if (st_poll(pds, 2, ST_UTIME_NO_TIMEOUT) <= 0) {  
550 - srs_error("st_poll failed.");  
551 - break;  
552 - }  
553 -  
554 - // packet incoming, read from RTMP.  
555 // read from client. 532 // read from client.
556 - if (pds[0].revents & POLLIN) { 533 + if (true) {
557 SrsMessage* msg = NULL; 534 SrsMessage* msg = NULL;
558 ret = rtmp->recv_message(&msg); 535 ret = rtmp->recv_message(&msg);
559 srs_verbose("play loop recv message. ret=%d", ret); 536 srs_verbose("play loop recv message. ret=%d", ret);
@@ -576,52 +553,49 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -576,52 +553,49 @@ int SrsRtmpConn::playing(SrsSource* source)
576 } 553 }
577 } 554 }
578 555
579 - // data outgoing, sendout packets.  
580 - if (pds[1].revents & POLLIN) {  
581 - // get messages from consumer.  
582 - int count = 0;  
583 - if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {  
584 - srs_error("get messages from consumer failed. ret=%d", ret);  
585 - return ret;  
586 - }  
587 -  
588 - // reportable  
589 - if (pithy_print.can_print()) {  
590 - kbps->sample();  
591 - srs_trace("-> "SRS_CONSTS_LOG_PLAY  
592 - " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",  
593 - pithy_print.age(), count,  
594 - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),  
595 - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());  
596 - } 556 + // get messages from consumer.
  557 + int count = 0;
  558 + if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
  559 + srs_error("get messages from consumer failed. ret=%d", ret);
  560 + return ret;
  561 + }
  562 +
  563 + // reportable
  564 + if (pithy_print.can_print()) {
  565 + kbps->sample();
  566 + srs_trace("-> "SRS_CONSTS_LOG_PLAY
  567 + " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
  568 + pithy_print.age(), count,
  569 + kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
  570 + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
  571 + }
  572 +
  573 + // sendout messages
  574 + // @remark, becareful, all msgs must be free explicitly,
  575 + // free by send_and_free_message or srs_freep.
  576 + for (int i = 0; i < count; i++) {
  577 + SrsSharedPtrMessage* msg = msgs.msgs[i];
597 578
598 - // sendout messages  
599 - // @remark, becareful, all msgs must be free explicitly,  
600 - // free by send_and_free_message or srs_freep.  
601 - for (int i = 0; i < count; i++) {  
602 - SrsSharedPtrMessage* msg = msgs.msgs[i];  
603 -  
604 - // the send_message will free the msg,  
605 - // so set the msgs[i] to NULL.  
606 - msgs.msgs[i] = NULL;  
607 -  
608 - // only when user specifies the duration,  
609 - // we start to collect the durations for each message.  
610 - if (user_specified_duration_to_stop) {  
611 - // foreach msg, collect the duration.  
612 - // @remark: never use msg when sent it, for the protocol sdk will free it.  
613 - if (starttime < 0 || starttime > msg->header.timestamp) {  
614 - starttime = msg->header.timestamp;  
615 - }  
616 - duration += msg->header.timestamp - starttime; 579 + // the send_message will free the msg,
  580 + // so set the msgs[i] to NULL.
  581 + msgs.msgs[i] = NULL;
  582 +
  583 + // only when user specifies the duration,
  584 + // we start to collect the durations for each message.
  585 + if (user_specified_duration_to_stop) {
  586 + // foreach msg, collect the duration.
  587 + // @remark: never use msg when sent it, for the protocol sdk will free it.
  588 + if (starttime < 0 || starttime > msg->header.timestamp) {
617 starttime = msg->header.timestamp; 589 starttime = msg->header.timestamp;
618 } 590 }
619 -  
620 - // no need to assert msg, for the rtmp will assert it.  
621 - if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {  
622 - srs_error("send message to client failed. ret=%d", ret);  
623 - return ret;  
624 - } 591 + duration += msg->header.timestamp - starttime;
  592 + starttime = msg->header.timestamp;
  593 + }
  594 +
  595 + // no need to assert msg, for the rtmp will assert it.
  596 + if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
  597 + srs_error("send message to client failed. ret=%d", ret);
  598 + return ret;
625 } 599 }
626 } 600 }
627 601