winlin

merge from 2.0

@@ -348,6 +348,7 @@ Remark: @@ -348,6 +348,7 @@ Remark:
348 348
349 ### SRS 2.0 history 349 ### SRS 2.0 history
350 350
  351 +* v2.0, 2015-07-16, for [#441](https://github.com/simple-rtmp-server/srs/issues/441) use 30s timeout for first msg. 2.0.178
351 * v2.0, 2015-07-14, refine hls disable the time jitter, support not mix monotonically increase. 2.0.177 352 * v2.0, 2015-07-14, refine hls disable the time jitter, support not mix monotonically increase. 2.0.177
352 * v2.0, 2015-07-01, fix [#433](https://github.com/simple-rtmp-server/srs/issues/433) fix the sps parse bug. 2.0.176 353 * v2.0, 2015-07-01, fix [#433](https://github.com/simple-rtmp-server/srs/issues/433) fix the sps parse bug. 2.0.176
353 * v2.0, 2015-06-10, fix [#425](https://github.com/simple-rtmp-server/srs/issues/425) refine the time jitter, correct (-inf,-250)+(250,+inf) to 10ms. 2.0.175 354 * v2.0, 2015-06-10, fix [#425](https://github.com/simple-rtmp-server/srs/issues/425) refine the time jitter, correct (-inf,-250)+(250,+inf) to 10ms. 2.0.175
@@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 23
24 #include <srs_app_http_stream.hpp> 24 #include <srs_app_http_stream.hpp>
25 25
  26 +#define SRS_STREAM_CACHE_CYCLE_SECONDS 30
  27 +
26 #if defined(SRS_AUTO_HTTP_CORE) 28 #if defined(SRS_AUTO_HTTP_CORE)
27 29
28 #include <sys/types.h> 30 #include <sys/types.h>
@@ -63,6 +65,9 @@ SrsStreamCache::SrsStreamCache(SrsSource* s, SrsRequest* r) @@ -63,6 +65,9 @@ SrsStreamCache::SrsStreamCache(SrsSource* s, SrsRequest* r)
63 source = s; 65 source = s;
64 queue = new SrsMessageQueue(true); 66 queue = new SrsMessageQueue(true);
65 pthread = new SrsEndlessThread("http-stream", this); 67 pthread = new SrsEndlessThread("http-stream", this);
  68 +
  69 + // TODO: FIXME: support reload.
  70 + fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);
66 } 71 }
67 72
68 SrsStreamCache::~SrsStreamCache() 73 SrsStreamCache::~SrsStreamCache()
@@ -82,8 +87,6 @@ int SrsStreamCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jit @@ -82,8 +87,6 @@ int SrsStreamCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jit
82 { 87 {
83 int ret = ERROR_SUCCESS; 88 int ret = ERROR_SUCCESS;
84 89
85 - double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);  
86 -  
87 if (fast_cache <= 0) { 90 if (fast_cache <= 0) {
88 srs_info("http: ignore dump fast cache."); 91 srs_info("http: ignore dump fast cache.");
89 return ret; 92 return ret;
@@ -104,6 +107,14 @@ int SrsStreamCache::cycle() @@ -104,6 +107,14 @@ int SrsStreamCache::cycle()
104 { 107 {
105 int ret = ERROR_SUCCESS; 108 int ret = ERROR_SUCCESS;
106 109
  110 + // TODO: FIXME: support reload.
  111 + if (fast_cache <= 0) {
  112 + st_sleep(SRS_STREAM_CACHE_CYCLE_SECONDS);
  113 + return ret;
  114 + }
  115 +
  116 + // the stream cache will create consumer to cache stream,
  117 + // which will trigger to fetch stream from origin for edge.
107 SrsConsumer* consumer = NULL; 118 SrsConsumer* consumer = NULL;
108 if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) { 119 if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) {
109 srs_error("http: create consumer failed. ret=%d", ret); 120 srs_error("http: create consumer failed. ret=%d", ret);
@@ -116,11 +127,9 @@ int SrsStreamCache::cycle() @@ -116,11 +127,9 @@ int SrsStreamCache::cycle()
116 127
117 SrsMessageArray msgs(SRS_PERF_MW_MSGS); 128 SrsMessageArray msgs(SRS_PERF_MW_MSGS);
118 129
  130 + // set the queue size, which used for max cache.
119 // TODO: FIXME: support reload. 131 // TODO: FIXME: support reload.
120 - double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);  
121 - if (fast_cache > 0) {  
122 - queue->set_queue_size(fast_cache);  
123 - } 132 + queue->set_queue_size(fast_cache);
124 133
125 while (true) { 134 while (true) {
126 pprint->elapse(); 135 pprint->elapse();
@@ -150,11 +159,7 @@ int SrsStreamCache::cycle() @@ -150,11 +159,7 @@ int SrsStreamCache::cycle()
150 // free the messages. 159 // free the messages.
151 for (int i = 0; i < count; i++) { 160 for (int i = 0; i < count; i++) {
152 SrsSharedPtrMessage* msg = msgs.msgs[i]; 161 SrsSharedPtrMessage* msg = msgs.msgs[i];
153 - if (fast_cache > 0) {  
154 - queue->enqueue(msg);  
155 - } else {  
156 - srs_freep(msg);  
157 - } 162 + queue->enqueue(msg);
158 } 163 }
159 } 164 }
160 165
@@ -1137,8 +1142,10 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) @@ -1137,8 +1142,10 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
1137 } 1142 }
1138 1143
1139 // hstrs not enabled, ignore. 1144 // hstrs not enabled, ignore.
1140 - // for origin: generally set hstrs to 'off' and mount while stream is pushed to origin.  
1141 - // for edge: must set hstrs to 'on' so that it could trigger rtmp stream before mount. 1145 + // for origin, the http stream will be mount already when publish,
  1146 + // so it must never enter this line for stream already mounted.
  1147 + // for edge, the http stream is trigger by hstrs and mount by it,
  1148 + // so we only hijack when only edge and hstrs is on.
1142 entry = it->second; 1149 entry = it->second;
1143 if (!entry->hstrs) { 1150 if (!entry->hstrs) {
1144 return ret; 1151 return ret;
@@ -1175,12 +1182,18 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) @@ -1175,12 +1182,18 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
1175 SrsAutoFree(SrsRequest, r); 1182 SrsAutoFree(SrsRequest, r);
1176 1183
1177 std::string sid = r->get_stream_url(); 1184 std::string sid = r->get_stream_url();
1178 - // check if the stream is enabled. 1185 + // check whether the http remux is enabled,
  1186 + // for example, user disable the http flv then reload.
1179 if (sflvs.find(sid) != sflvs.end()) { 1187 if (sflvs.find(sid) != sflvs.end()) {
1180 SrsLiveEntry* s_entry = sflvs[sid]; 1188 SrsLiveEntry* s_entry = sflvs[sid];
1181 if (!s_entry->stream->entry->enabled) { 1189 if (!s_entry->stream->entry->enabled) {
1182 - srs_error("stream is disabled, hijack failed. ret=%d", ret);  
1183 - return ret; 1190 + // only when the http entry is disabled, check the config whether http flv disable,
  1191 + // for the http flv edge use hijack to trigger the edge ingester, we always mount it
  1192 + // eventhough the origin does not exists the specified stream.
  1193 + if (!_srs_config->get_vhost_http_remux_enabled(r->vhost)) {
  1194 + srs_error("stream is disabled, hijack failed. ret=%d", ret);
  1195 + return ret;
  1196 + }
1184 } 1197 }
1185 } 1198 }
1186 1199
@@ -1210,15 +1223,6 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) @@ -1210,15 +1223,6 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph)
1210 srs_trace("hstrs: source url=%s, is_edge=%d, source_id=%d[%d]", 1223 srs_trace("hstrs: source url=%s, is_edge=%d, source_id=%d[%d]",
1211 r->get_stream_url().c_str(), vhost_is_edge, s->source_id(), s->source_id()); 1224 r->get_stream_url().c_str(), vhost_is_edge, s->source_id(), s->source_id());
1212 1225
1213 - // TODO: FIXME: disconnect when all connection closed.  
1214 - if (vhost_is_edge) {  
1215 - // notice edge to start for the first client.  
1216 - if ((ret = s->on_edge_start_play()) != ERROR_SUCCESS) {  
1217 - srs_error("notice edge start play stream failed. ret=%d", ret);  
1218 - return ret;  
1219 - }  
1220 - }  
1221 -  
1222 return ret; 1226 return ret;
1223 } 1227 }
1224 1228
@@ -42,6 +42,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -42,6 +42,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
42 class SrsStreamCache : public ISrsEndlessThreadHandler 42 class SrsStreamCache : public ISrsEndlessThreadHandler
43 { 43 {
44 private: 44 private:
  45 + double fast_cache;
  46 +private:
45 SrsMessageQueue* queue; 47 SrsMessageQueue* queue;
46 SrsSource* source; 48 SrsSource* source;
47 SrsRequest* req; 49 SrsRequest* req;
@@ -537,14 +537,6 @@ int SrsRtmpConn::playing(SrsSource* source) @@ -537,14 +537,6 @@ int SrsRtmpConn::playing(SrsSource* source)
537 SrsAutoFree(SrsConsumer, consumer); 537 SrsAutoFree(SrsConsumer, consumer);
538 srs_verbose("consumer created success."); 538 srs_verbose("consumer created success.");
539 539
540 - if (_srs_config->get_vhost_is_edge(req->vhost)) {  
541 - // notice edge to start for the first client.  
542 - if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) {  
543 - srs_error("notice edge start play stream failed. ret=%d", ret);  
544 - return ret;  
545 - }  
546 - }  
547 -  
548 // use isolate thread to recv, 540 // use isolate thread to recv,
549 // @see: https://github.com/simple-rtmp-server/srs/issues/217 541 // @see: https://github.com/simple-rtmp-server/srs/issues/217
550 SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP); 542 SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);
@@ -782,8 +774,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -782,8 +774,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
782 while (!disposed) { 774 while (!disposed) {
783 pprint->elapse(); 775 pprint->elapse();
784 776
785 - // cond wait for error.  
786 - trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000); 777 + // cond wait for timeout.
  778 + if (nb_msgs == 0) {
  779 + // when not got msgs, wait for a larger timeout.
  780 + // @see https://github.com/simple-rtmp-server/srs/issues/441
  781 + trd->wait(SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US / 1000);
  782 + } else {
  783 + trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000);
  784 + }
787 785
788 // check the thread error code. 786 // check the thread error code.
789 if ((ret = trd->error_code()) != ERROR_SUCCESS) { 787 if ((ret = trd->error_code()) != ERROR_SUCCESS) {
@@ -835,7 +833,6 @@ int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge) @@ -835,7 +833,6 @@ int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
835 if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { 833 if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
836 srs_error("notice edge start publish stream failed. ret=%d", ret); 834 srs_error("notice edge start publish stream failed. ret=%d", ret);
837 } 835 }
838 - return ret;  
839 } else { 836 } else {
840 if ((ret = source->on_publish()) != ERROR_SUCCESS) { 837 if ((ret = source->on_publish()) != ERROR_SUCCESS) {
841 srs_error("notify publish failed. ret=%d", ret); 838 srs_error("notify publish failed. ret=%d", ret);
@@ -765,7 +765,7 @@ SrsSource* SrsSource::fetch(SrsRequest* r) @@ -765,7 +765,7 @@ SrsSource* SrsSource::fetch(SrsRequest* r)
765 SrsSource* SrsSource::fetch(std::string vhost, std::string app, std::string stream) 765 SrsSource* SrsSource::fetch(std::string vhost, std::string app, std::string stream)
766 { 766 {
767 SrsSource* source = NULL; 767 SrsSource* source = NULL;
768 - string stream_url = srs_generate_stream_url(vhost, app, stream); 768 + string stream_url = srs_generate_stream_url(vhost, app, stream);
769 769
770 if (pool.find(stream_url) == pool.end()) { 770 if (pool.find(stream_url) == pool.end()) {
771 return NULL; 771 return NULL;
@@ -2135,6 +2135,15 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg @@ -2135,6 +2135,15 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg
2135 } else { 2135 } else {
2136 srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm); 2136 srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm);
2137 } 2137 }
  2138 +
  2139 + // for edge, when play edge stream, check the state
  2140 + if (_srs_config->get_vhost_is_edge(_req->vhost)) {
  2141 + // notice edge to start for the first client.
  2142 + if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) {
  2143 + srs_error("notice edge start play stream failed. ret=%d", ret);
  2144 + return ret;
  2145 + }
  2146 + }
2138 2147
2139 return ret; 2148 return ret;
2140 } 2149 }
@@ -2163,11 +2172,6 @@ SrsRtmpJitterAlgorithm SrsSource::jitter() @@ -2163,11 +2172,6 @@ SrsRtmpJitterAlgorithm SrsSource::jitter()
2163 return jitter_algorithm; 2172 return jitter_algorithm;
2164 } 2173 }
2165 2174
2166 -int SrsSource::on_edge_start_play()  
2167 -{  
2168 - return play_edge->on_client_play();  
2169 -}  
2170 -  
2171 int SrsSource::on_edge_start_publish() 2175 int SrsSource::on_edge_start_publish()
2172 { 2176 {
2173 return publish_edge->on_client_publish(); 2177 return publish_edge->on_client_publish();
@@ -574,8 +574,6 @@ public: @@ -574,8 +574,6 @@ public:
574 virtual SrsRtmpJitterAlgorithm jitter(); 574 virtual SrsRtmpJitterAlgorithm jitter();
575 // internal 575 // internal
576 public: 576 public:
577 - // for edge, when play edge stream, check the state  
578 - virtual int on_edge_start_play();  
579 // for edge, when publish edge stream, check the state 577 // for edge, when publish edge stream, check the state
580 virtual int on_edge_start_publish(); 578 virtual int on_edge_start_publish();
581 // for edge, proxy the publish 579 // for edge, proxy the publish
@@ -78,6 +78,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -78,6 +78,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
78 // we must use more smaller timeout, for the recv never know the status 78 // we must use more smaller timeout, for the recv never know the status
79 // of underlayer socket. 79 // of underlayer socket.
80 #define SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US (int64_t)(3*1000*1000LL) 80 #define SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US (int64_t)(3*1000*1000LL)
  81 +// when no msg recevied for publisher, use larger timeout.
  82 +#define SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US 10*SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US
81 83
82 // the timeout to wait for client control message, 84 // the timeout to wait for client control message,
83 // if timeout, we generally ignore and send the data to client, 85 // if timeout, we generally ignore and send the data to client,