winlin

release v0.1, support FMLE/FFMPEG publish, vp6 codec live streaming

@@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
38 #ifndef __STDC_FORMAT_MACROS 38 #ifndef __STDC_FORMAT_MACROS
39 #define __STDC_FORMAT_MACROS 39 #define __STDC_FORMAT_MACROS
40 #endif 40 #endif
  41 +#include <inttypes.h>
41 42
42 #include <assert.h> 43 #include <assert.h>
43 #define srs_assert(expression) assert(expression) 44 #define srs_assert(expression) assert(expression)
@@ -31,9 +31,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,9 +31,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 #include <srs_core_protocol.hpp> 31 #include <srs_core_protocol.hpp>
32 #include <srs_core_auto_free.hpp> 32 #include <srs_core_auto_free.hpp>
33 #include <srs_core_source.hpp> 33 #include <srs_core_source.hpp>
  34 +#include <srs_core_server.hpp>
34 35
35 -// wait for client message.  
36 -#define SRS_PULSE_TIME_MS 100 36 +#define SRS_PULSE_TIMEOUT_MS 100
  37 +#define SRS_SEND_TIMEOUT_MS 5000
37 38
38 SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) 39 SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
39 : SrsConnection(srs_server, client_stfd) 40 : SrsConnection(srs_server, client_stfd)
@@ -61,6 +62,9 @@ int SrsClient::do_cycle() @@ -61,6 +62,9 @@ int SrsClient::do_cycle()
61 return ret; 62 return ret;
62 } 63 }
63 srs_verbose("get peer ip success. ip=%s", ip); 64 srs_verbose("get peer ip success. ip=%s", ip);
  65 +
  66 + rtmp->set_recv_timeout(SRS_SEND_TIMEOUT_MS);
  67 + rtmp->set_send_timeout(SRS_SEND_TIMEOUT_MS);
64 68
65 if ((ret = rtmp->handshake()) != ERROR_SUCCESS) { 69 if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
66 srs_error("rtmp handshake failed. ret=%d", ret); 70 srs_error("rtmp handshake failed. ret=%d", ret);
@@ -167,27 +171,33 @@ int SrsClient::streaming_play(SrsSource* source) @@ -167,27 +171,33 @@ int SrsClient::streaming_play(SrsSource* source)
167 SrsAutoFree(SrsConsumer, consumer, false); 171 SrsAutoFree(SrsConsumer, consumer, false);
168 srs_verbose("consumer created success."); 172 srs_verbose("consumer created success.");
169 173
  174 + rtmp->set_recv_timeout(SRS_PULSE_TIMEOUT_MS);
  175 +
  176 + int64_t report_time = 0;
  177 + int64_t reported_time = 0;
  178 +
170 while (true) { 179 while (true) {
  180 + report_time += SRS_PULSE_TIMEOUT_MS;
  181 +
171 // switch to other st-threads. 182 // switch to other st-threads.
172 st_usleep(0); 183 st_usleep(0);
173 -  
174 - bool ready = false;  
175 - if ((ret = rtmp->can_read(SRS_PULSE_TIME_MS, ready)) != ERROR_SUCCESS) {  
176 - srs_error("wait client control message failed. ret=%d", ret);  
177 - return ret;  
178 - }  
179 - srs_verbose("client pulse %dms, ready=%d", SRS_PULSE_TIME_MS, ready);  
180 184
181 // read from client. 185 // read from client.
182 - if (ready) { 186 + int ctl_msg_ret = ERROR_SUCCESS;
  187 + if (true) {
183 SrsCommonMessage* msg = NULL; 188 SrsCommonMessage* msg = NULL;
184 - if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { 189 + ctl_msg_ret = ret = rtmp->recv_message(&msg);
  190 +
  191 + srs_verbose("play loop recv message. ret=%d", ret);
  192 + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
185 srs_error("recv client control message failed. ret=%d", ret); 193 srs_error("recv client control message failed. ret=%d", ret);
186 return ret; 194 return ret;
187 } 195 }
188 -  
189 - SrsAutoFree(SrsCommonMessage, msg, false);  
190 - // TODO: process it. 196 + if (ret == ERROR_SUCCESS && !msg) {
  197 + srs_info("play loop got a message.");
  198 + SrsAutoFree(SrsCommonMessage, msg, false);
  199 + // TODO: process it.
  200 + }
191 } 201 }
192 202
193 // get messages from consumer. 203 // get messages from consumer.
@@ -197,6 +207,11 @@ int SrsClient::streaming_play(SrsSource* source) @@ -197,6 +207,11 @@ int SrsClient::streaming_play(SrsSource* source)
197 srs_error("get messages from consumer failed. ret=%d", ret); 207 srs_error("get messages from consumer failed. ret=%d", ret);
198 return ret; 208 return ret;
199 } 209 }
  210 +
  211 + // reportable
  212 + if (server->can_report(reported_time, report_time)) {
  213 + srs_trace("play report, time=%"PRId64", ctl_msg_ret=%d, msgs=%d", report_time, ctl_msg_ret, count);
  214 + }
200 215
201 if (count <= 0) { 216 if (count <= 0) {
202 srs_verbose("no packets in queue."); 217 srs_verbose("no packets in queue.");
@@ -49,6 +49,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -49,6 +49,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
49 #define ERROR_SOCKET_READ_FULLY 208 49 #define ERROR_SOCKET_READ_FULLY 208
50 #define ERROR_SOCKET_WRITE 209 50 #define ERROR_SOCKET_WRITE 209
51 #define ERROR_SOCKET_WAIT 210 51 #define ERROR_SOCKET_WAIT 210
  52 +#define ERROR_SOCKET_TIMEOUT 211
52 53
53 #define ERROR_RTMP_PLAIN_REQUIRED 300 54 #define ERROR_RTMP_PLAIN_REQUIRED 300
54 #define ERROR_RTMP_CHUNK_START 301 55 #define ERROR_RTMP_CHUNK_START 301
@@ -78,7 +78,7 @@ extern ILogContext* log_context; @@ -78,7 +78,7 @@ extern ILogContext* log_context;
78 #undef srs_verbose 78 #undef srs_verbose
79 #define srs_verbose(msg, ...) (void)0 79 #define srs_verbose(msg, ...) (void)0
80 #endif 80 #endif
81 -#if 1 81 +#if 0
82 #undef srs_info 82 #undef srs_info
83 #define srs_info(msg, ...) (void)0 83 #define srs_info(msg, ...) (void)0
84 #endif 84 #endif
@@ -274,9 +274,14 @@ SrsProtocol::~SrsProtocol() @@ -274,9 +274,14 @@ SrsProtocol::~SrsProtocol()
274 srs_freep(skt); 274 srs_freep(skt);
275 } 275 }
276 276
277 -int SrsProtocol::can_read(int timeout_ms, bool& ready) 277 +void SrsProtocol::set_recv_timeout(int timeout_ms)
278 { 278 {
279 - return skt->can_read(timeout_ms, ready); 279 + return skt->set_recv_timeout(timeout_ms);
  280 +}
  281 +
  282 +void SrsProtocol::set_send_timeout(int timeout_ms)
  283 +{
  284 + return skt->set_send_timeout(timeout_ms);
280 } 285 }
281 286
282 int SrsProtocol::recv_message(SrsCommonMessage** pmsg) 287 int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
@@ -289,7 +294,9 @@ int SrsProtocol::recv_message(SrsCommonMessage** pmsg) @@ -289,7 +294,9 @@ int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
289 SrsCommonMessage* msg = NULL; 294 SrsCommonMessage* msg = NULL;
290 295
291 if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) { 296 if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
292 - srs_error("recv interlaced message failed. ret=%d", ret); 297 + if (ret != ERROR_SOCKET_TIMEOUT) {
  298 + srs_error("recv interlaced message failed. ret=%d", ret);
  299 + }
293 return ret; 300 return ret;
294 } 301 }
295 srs_verbose("entire msg received"); 302 srs_verbose("entire msg received");
@@ -518,7 +525,9 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) @@ -518,7 +525,9 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
518 int cid = 0; 525 int cid = 0;
519 int bh_size = 0; 526 int bh_size = 0;
520 if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) { 527 if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) {
521 - srs_error("read basic header failed. ret=%d", ret); 528 + if (ret != ERROR_SOCKET_TIMEOUT) {
  529 + srs_error("read basic header failed. ret=%d", ret);
  530 + }
522 return ret; 531 return ret;
523 } 532 }
524 srs_info("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size); 533 srs_info("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size);
@@ -539,7 +548,9 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) @@ -539,7 +548,9 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
539 // chunk stream message header 548 // chunk stream message header
540 int mh_size = 0; 549 int mh_size = 0;
541 if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) { 550 if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) {
542 - srs_error("read message header failed. ret=%d", ret); 551 + if (ret != ERROR_SOCKET_TIMEOUT) {
  552 + srs_error("read message header failed. ret=%d", ret);
  553 + }
543 return ret; 554 return ret;
544 } 555 }
545 srs_info("read message header success. " 556 srs_info("read message header success. "
@@ -551,7 +562,9 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg) @@ -551,7 +562,9 @@ int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
551 SrsCommonMessage* msg = NULL; 562 SrsCommonMessage* msg = NULL;
552 int payload_size = 0; 563 int payload_size = 0;
553 if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) { 564 if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) {
554 - srs_error("read message payload failed. ret=%d", ret); 565 + if (ret != ERROR_SOCKET_TIMEOUT) {
  566 + srs_error("read message payload failed. ret=%d", ret);
  567 + }
555 return ret; 568 return ret;
556 } 569 }
557 570
@@ -577,7 +590,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) @@ -577,7 +590,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
577 590
578 int required_size = 1; 591 int required_size = 1;
579 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 592 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
580 - srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret); 593 + if (ret != ERROR_SOCKET_TIMEOUT) {
  594 + srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
  595 + }
581 return ret; 596 return ret;
582 } 597 }
583 598
@@ -595,7 +610,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) @@ -595,7 +610,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
595 if (cid == 0) { 610 if (cid == 0) {
596 required_size = 2; 611 required_size = 2;
597 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 612 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
598 - srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret); 613 + if (ret != ERROR_SOCKET_TIMEOUT) {
  614 + srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
  615 + }
599 return ret; 616 return ret;
600 } 617 }
601 618
@@ -606,7 +623,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size) @@ -606,7 +623,9 @@ int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
606 } else if (cid == 1) { 623 } else if (cid == 1) {
607 required_size = 3; 624 required_size = 3;
608 if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) { 625 if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) {
609 - srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret); 626 + if (ret != ERROR_SOCKET_TIMEOUT) {
  627 + srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
  628 + }
610 return ret; 629 return ret;
611 } 630 }
612 631
@@ -682,7 +701,9 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -682,7 +701,9 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
682 701
683 int required_size = bh_size + mh_size; 702 int required_size = bh_size + mh_size;
684 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 703 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
685 - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); 704 + if (ret != ERROR_SOCKET_TIMEOUT) {
  705 + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
  706 + }
686 return ret; 707 return ret;
687 } 708 }
688 char* p = buffer->bytes() + bh_size; 709 char* p = buffer->bytes() + bh_size;
@@ -768,7 +789,9 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -768,7 +789,9 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
768 required_size = bh_size + mh_size; 789 required_size = bh_size + mh_size;
769 srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size); 790 srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);
770 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 791 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
771 - srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret); 792 + if (ret != ERROR_SOCKET_TIMEOUT) {
  793 + srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
  794 + }
772 return ret; 795 return ret;
773 } 796 }
774 797
@@ -833,7 +856,9 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh @@ -833,7 +856,9 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
833 // read payload to buffer 856 // read payload to buffer
834 int required_size = bh_size + mh_size + payload_size; 857 int required_size = bh_size + mh_size + payload_size;
835 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { 858 if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
836 - srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); 859 + if (ret != ERROR_SOCKET_TIMEOUT) {
  860 + srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret);
  861 + }
837 return ret; 862 return ret;
838 } 863 }
839 memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size); 864 memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size);
@@ -95,9 +95,11 @@ public: @@ -95,9 +95,11 @@ public:
95 virtual ~SrsProtocol(); 95 virtual ~SrsProtocol();
96 public: 96 public:
97 /** 97 /**
98 - * whether the peer can read. 98 + * set the timeout in ms.
  99 + * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
99 */ 100 */
100 - virtual int can_read(int timeout_ms, bool& ready); 101 + virtual void set_recv_timeout(int timeout_ms);
  102 + virtual void set_send_timeout(int timeout_ms);
101 /** 103 /**
102 * recv a message with raw/undecoded payload from peer. 104 * recv a message with raw/undecoded payload from peer.
103 * the payload is not decoded, use srs_rtmp_expect_message<T> if requires 105 * the payload is not decoded, use srs_rtmp_expect_message<T> if requires
@@ -146,14 +146,19 @@ SrsRtmp::~SrsRtmp() @@ -146,14 +146,19 @@ SrsRtmp::~SrsRtmp()
146 srs_freep(protocol); 146 srs_freep(protocol);
147 } 147 }
148 148
149 -int SrsRtmp::recv_message(SrsCommonMessage** pmsg) 149 +void SrsRtmp::set_recv_timeout(int timeout_ms)
150 { 150 {
151 - return protocol->recv_message(pmsg); 151 + return protocol->set_recv_timeout(timeout_ms);
152 } 152 }
153 153
154 -int SrsRtmp::can_read(int timeout_ms, bool& ready) 154 +void SrsRtmp::set_send_timeout(int timeout_ms)
155 { 155 {
156 - return protocol->can_read(timeout_ms, ready); 156 + return protocol->set_send_timeout(timeout_ms);
  157 +}
  158 +
  159 +int SrsRtmp::recv_message(SrsCommonMessage** pmsg)
  160 +{
  161 + return protocol->recv_message(pmsg);
157 } 162 }
158 163
159 int SrsRtmp::send_message(ISrsMessage* msg) 164 int SrsRtmp::send_message(ISrsMessage* msg)
@@ -101,8 +101,9 @@ public: @@ -101,8 +101,9 @@ public:
101 SrsRtmp(st_netfd_t client_stfd); 101 SrsRtmp(st_netfd_t client_stfd);
102 virtual ~SrsRtmp(); 102 virtual ~SrsRtmp();
103 public: 103 public:
  104 + virtual void set_recv_timeout(int timeout_ms);
  105 + virtual void set_send_timeout(int timeout_ms);
104 virtual int recv_message(SrsCommonMessage** pmsg); 106 virtual int recv_message(SrsCommonMessage** pmsg);
105 - virtual int can_read(int timeout_ms, bool& ready);  
106 virtual int send_message(ISrsMessage* msg); 107 virtual int send_message(ISrsMessage* msg);
107 public: 108 public:
108 virtual int handshake(); 109 virtual int handshake();
@@ -37,8 +37,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -37,8 +37,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
37 37
38 #define SERVER_LISTEN_BACKLOG 10 38 #define SERVER_LISTEN_BACKLOG 10
39 39
  40 +// global value, ensure the report interval,
  41 +// it will be changed when clients increase.
  42 +#define SRS_CONST_REPORT_INTERVAL_MS 3000
  43 +
40 SrsServer::SrsServer() 44 SrsServer::SrsServer()
41 { 45 {
  46 + srs_report_interval_ms = SRS_CONST_REPORT_INTERVAL_MS;
42 } 47 }
43 48
44 SrsServer::~SrsServer() 49 SrsServer::~SrsServer()
@@ -155,6 +160,20 @@ void SrsServer::remove(SrsConnection* conn) @@ -155,6 +160,20 @@ void SrsServer::remove(SrsConnection* conn)
155 srs_freep(conn); 160 srs_freep(conn);
156 } 161 }
157 162
  163 +bool SrsServer::can_report(int64_t& reported, int64_t time)
  164 +{
  165 + if (srs_report_interval_ms <= 0) {
  166 + return false;
  167 + }
  168 +
  169 + if (time - reported < srs_report_interval_ms) {
  170 + return false;
  171 + }
  172 +
  173 + reported = time;
  174 + return true;
  175 +}
  176 +
158 int SrsServer::accept_client(st_netfd_t client_stfd) 177 int SrsServer::accept_client(st_netfd_t client_stfd)
159 { 178 {
160 int ret = ERROR_SUCCESS; 179 int ret = ERROR_SUCCESS;
@@ -164,6 +183,9 @@ int SrsServer::accept_client(st_netfd_t client_stfd) @@ -164,6 +183,9 @@ int SrsServer::accept_client(st_netfd_t client_stfd)
164 // directly enqueue, the cycle thread will remove the client. 183 // directly enqueue, the cycle thread will remove the client.
165 conns.push_back(conn); 184 conns.push_back(conn);
166 srs_verbose("add conn to vector. conns=%d", (int)conns.size()); 185 srs_verbose("add conn to vector. conns=%d", (int)conns.size());
  186 +
  187 + // ensure the report interval is consts
  188 + srs_report_interval_ms = SRS_CONST_REPORT_INTERVAL_MS * (int)conns.size();
167 189
168 // cycle will start process thread and when finished remove the client. 190 // cycle will start process thread and when finished remove the client.
169 if ((ret = conn->start()) != ERROR_SUCCESS) { 191 if ((ret = conn->start()) != ERROR_SUCCESS) {
@@ -41,6 +41,7 @@ private: @@ -41,6 +41,7 @@ private:
41 int fd; 41 int fd;
42 st_netfd_t stfd; 42 st_netfd_t stfd;
43 std::vector<SrsConnection*> conns; 43 std::vector<SrsConnection*> conns;
  44 + int srs_report_interval_ms;
44 public: 45 public:
45 SrsServer(); 46 SrsServer();
46 virtual ~SrsServer(); 47 virtual ~SrsServer();
@@ -49,6 +50,7 @@ public: @@ -49,6 +50,7 @@ public:
49 virtual int listen(int port); 50 virtual int listen(int port);
50 virtual int cycle(); 51 virtual int cycle();
51 virtual void remove(SrsConnection* conn); 52 virtual void remove(SrsConnection* conn);
  53 + virtual bool can_report(int64_t& reported, int64_t time);
52 private: 54 private:
53 virtual int accept_client(st_netfd_t client_stfd); 55 virtual int accept_client(st_netfd_t client_stfd);
54 virtual void listen_cycle(); 56 virtual void listen_cycle();
@@ -28,41 +28,37 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -28,41 +28,37 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28 SrsSocket::SrsSocket(st_netfd_t client_stfd) 28 SrsSocket::SrsSocket(st_netfd_t client_stfd)
29 { 29 {
30 stfd = client_stfd; 30 stfd = client_stfd;
  31 + recv_timeout = ST_UTIME_NO_TIMEOUT;
  32 + send_timeout = ST_UTIME_NO_TIMEOUT;
31 } 33 }
32 34
33 SrsSocket::~SrsSocket() 35 SrsSocket::~SrsSocket()
34 { 36 {
35 } 37 }
36 38
37 -int SrsSocket::can_read(int timeout_ms, bool& ready) 39 +void SrsSocket::set_recv_timeout(int timeout_ms)
38 { 40 {
39 - ready = false;  
40 - int ret = ERROR_SUCCESS;  
41 -  
42 - // If the named file descriptor object is ready for I/O within the specified amount of time,  
43 - // a value of 0 is returned. Otherwise, a value of -1 is returned and errno is set to  
44 - // indicate the error  
45 - if(st_netfd_poll(stfd, POLLIN, timeout_ms * 1000) == -1){  
46 - if(errno == ETIME){  
47 - return ret;  
48 - }  
49 -  
50 - return ERROR_SOCKET_WAIT;  
51 - }  
52 -  
53 - ready = true;  
54 - return ret; 41 + recv_timeout = timeout_ms * 1000;
  42 +}
  43 +
  44 +void SrsSocket::set_send_timeout(int timeout_ms)
  45 +{
  46 + send_timeout = timeout_ms * 1000;
55 } 47 }
56 48
57 int SrsSocket::read(const void* buf, size_t size, ssize_t* nread) 49 int SrsSocket::read(const void* buf, size_t size, ssize_t* nread)
58 { 50 {
59 int ret = ERROR_SUCCESS; 51 int ret = ERROR_SUCCESS;
60 52
61 - *nread = st_read(stfd, (void*)buf, size, ST_UTIME_NO_TIMEOUT); 53 + *nread = st_read(stfd, (void*)buf, size, recv_timeout);
62 54
63 // On success a non-negative integer indicating the number of bytes actually read is returned 55 // On success a non-negative integer indicating the number of bytes actually read is returned
64 // (a value of 0 means the network connection is closed or end of file is reached). 56 // (a value of 0 means the network connection is closed or end of file is reached).
65 if (*nread <= 0) { 57 if (*nread <= 0) {
  58 + if (errno == ETIME) {
  59 + return ERROR_SOCKET_TIMEOUT;
  60 + }
  61 +
66 if (*nread == 0) { 62 if (*nread == 0) {
67 errno = ECONNRESET; 63 errno = ECONNRESET;
68 } 64 }
@@ -77,11 +73,15 @@ int SrsSocket::read_fully(const void* buf, size_t size, ssize_t* nread) @@ -77,11 +73,15 @@ int SrsSocket::read_fully(const void* buf, size_t size, ssize_t* nread)
77 { 73 {
78 int ret = ERROR_SUCCESS; 74 int ret = ERROR_SUCCESS;
79 75
80 - *nread = st_read_fully(stfd, (void*)buf, size, ST_UTIME_NO_TIMEOUT); 76 + *nread = st_read_fully(stfd, (void*)buf, size, recv_timeout);
81 77
82 // On success a non-negative integer indicating the number of bytes actually read is returned 78 // On success a non-negative integer indicating the number of bytes actually read is returned
83 // (a value less than nbyte means the network connection is closed or end of file is reached) 79 // (a value less than nbyte means the network connection is closed or end of file is reached)
84 if (*nread != (ssize_t)size) { 80 if (*nread != (ssize_t)size) {
  81 + if (errno == ETIME) {
  82 + return ERROR_SOCKET_TIMEOUT;
  83 + }
  84 +
85 if (*nread >= 0) { 85 if (*nread >= 0) {
86 errno = ECONNRESET; 86 errno = ECONNRESET;
87 } 87 }
@@ -96,9 +96,13 @@ int SrsSocket::write(const void* buf, size_t size, ssize_t* nwrite) @@ -96,9 +96,13 @@ int SrsSocket::write(const void* buf, size_t size, ssize_t* nwrite)
96 { 96 {
97 int ret = ERROR_SUCCESS; 97 int ret = ERROR_SUCCESS;
98 98
99 - *nwrite = st_write(stfd, (void*)buf, size, ST_UTIME_NO_TIMEOUT); 99 + *nwrite = st_write(stfd, (void*)buf, size, send_timeout);
100 100
101 if (*nwrite <= 0) { 101 if (*nwrite <= 0) {
  102 + if (errno == ETIME) {
  103 + return ERROR_SOCKET_TIMEOUT;
  104 + }
  105 +
102 ret = ERROR_SOCKET_WRITE; 106 ret = ERROR_SOCKET_WRITE;
103 } 107 }
104 108
@@ -109,9 +113,13 @@ int SrsSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) @@ -109,9 +113,13 @@ int SrsSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
109 { 113 {
110 int ret = ERROR_SUCCESS; 114 int ret = ERROR_SUCCESS;
111 115
112 - *nwrite = st_writev(stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); 116 + *nwrite = st_writev(stfd, iov, iov_size, send_timeout);
113 117
114 if (*nwrite <= 0) { 118 if (*nwrite <= 0) {
  119 + if (errno == ETIME) {
  120 + return ERROR_SOCKET_TIMEOUT;
  121 + }
  122 +
115 ret = ERROR_SOCKET_WRITE; 123 ret = ERROR_SOCKET_WRITE;
116 } 124 }
117 125
@@ -39,12 +39,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -39,12 +39,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
39 class SrsSocket 39 class SrsSocket
40 { 40 {
41 private: 41 private:
  42 + int64_t recv_timeout;
  43 + int64_t send_timeout;
42 st_netfd_t stfd; 44 st_netfd_t stfd;
43 public: 45 public:
44 SrsSocket(st_netfd_t client_stfd); 46 SrsSocket(st_netfd_t client_stfd);
45 virtual ~SrsSocket(); 47 virtual ~SrsSocket();
46 public: 48 public:
47 - virtual int can_read(int timeout_ms, bool& ready); 49 + virtual void set_recv_timeout(int timeout_ms);
  50 + virtual void set_send_timeout(int timeout_ms);
48 virtual int read(const void* buf, size_t size, ssize_t* nread); 51 virtual int read(const void* buf, size_t size, ssize_t* nread);
49 virtual int read_fully(const void* buf, size_t size, ssize_t* nread); 52 virtual int read_fully(const void* buf, size_t size, ssize_t* nread);
50 virtual int write(const void* buf, size_t size, ssize_t* nwrite); 53 virtual int write(const void* buf, size_t size, ssize_t* nwrite);
@@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 23
24 #include <srs_core_source.hpp> 24 #include <srs_core_source.hpp>
25 25
  26 +#include <algorithm>
  27 +
26 #include <srs_core_log.hpp> 28 #include <srs_core_log.hpp>
27 #include <srs_core_protocol.hpp> 29 #include <srs_core_protocol.hpp>
28 #include <srs_core_auto_free.hpp> 30 #include <srs_core_auto_free.hpp>
@@ -40,8 +42,9 @@ SrsSource* SrsSource::find(std::string stream_url) @@ -40,8 +42,9 @@ SrsSource* SrsSource::find(std::string stream_url)
40 return pool[stream_url]; 42 return pool[stream_url];
41 } 43 }
42 44
43 -SrsConsumer::SrsConsumer() 45 +SrsConsumer::SrsConsumer(SrsSource* _source)
44 { 46 {
  47 + source = _source;
45 } 48 }
46 49
47 SrsConsumer::~SrsConsumer() 50 SrsConsumer::~SrsConsumer()
@@ -52,6 +55,8 @@ SrsConsumer::~SrsConsumer() @@ -52,6 +55,8 @@ SrsConsumer::~SrsConsumer()
52 srs_freep(msg); 55 srs_freep(msg);
53 } 56 }
54 msgs.clear(); 57 msgs.clear();
  58 +
  59 + source->on_consumer_destroy(this);
55 } 60 }
56 61
57 int SrsConsumer::enqueue(SrsSharedPtrMessage* msg) 62 int SrsConsumer::enqueue(SrsSharedPtrMessage* msg)
@@ -235,7 +240,7 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -235,7 +240,7 @@ int SrsSource::on_video(SrsCommonMessage* video)
235 { 240 {
236 int ret = ERROR_SUCCESS; 241 int ret = ERROR_SUCCESS;
237 242
238 - consumer = new SrsConsumer(); 243 + consumer = new SrsConsumer(this);
239 consumers.push_back(consumer); 244 consumers.push_back(consumer);
240 245
241 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy())) != ERROR_SUCCESS) { 246 if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy())) != ERROR_SUCCESS) {
@@ -259,3 +264,13 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -259,3 +264,13 @@ int SrsSource::on_video(SrsCommonMessage* video)
259 return ret; 264 return ret;
260 } 265 }
261 266
  267 +void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
  268 +{
  269 + std::vector<SrsConsumer*>::iterator it;
  270 + it = std::find(consumers.begin(), consumers.end(), consumer);
  271 + if (it != consumers.end()) {
  272 + consumers.erase(it);
  273 + }
  274 + srs_info("handle consumer destroy success.");
  275 +}
  276 +
@@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
34 #include <vector> 34 #include <vector>
35 #include <string> 35 #include <string>
36 36
  37 +class SrsSource;
37 class SrsCommonMessage; 38 class SrsCommonMessage;
38 class SrsOnMetaDataPacket; 39 class SrsOnMetaDataPacket;
39 class SrsSharedPtrMessage; 40 class SrsSharedPtrMessage;
@@ -44,9 +45,10 @@ class SrsSharedPtrMessage; @@ -44,9 +45,10 @@ class SrsSharedPtrMessage;
44 class SrsConsumer 45 class SrsConsumer
45 { 46 {
46 private: 47 private:
  48 + SrsSource* source;
47 std::vector<SrsSharedPtrMessage*> msgs; 49 std::vector<SrsSharedPtrMessage*> msgs;
48 public: 50 public:
49 - SrsConsumer(); 51 + SrsConsumer(SrsSource* _source);
50 virtual ~SrsConsumer(); 52 virtual ~SrsConsumer();
51 public: 53 public:
52 /** 54 /**
@@ -95,6 +97,7 @@ public: @@ -95,6 +97,7 @@ public:
95 virtual int on_video(SrsCommonMessage* video); 97 virtual int on_video(SrsCommonMessage* video);
96 public: 98 public:
97 virtual int create_consumer(SrsConsumer*& consumer); 99 virtual int create_consumer(SrsConsumer*& consumer);
  100 + virtual void on_consumer_destroy(SrsConsumer* consumer);
98 }; 101 };
99 102
100 #endif 103 #endif