winlin

fix the forwarder reconnect bug, feed it the sequence header.

@@ -212,6 +212,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw @@ -212,6 +212,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw
212 * nginx v1.5.0: 139524 lines <br/> 212 * nginx v1.5.0: 139524 lines <br/>
213 213
214 ### History 214 ### History
  215 +* v0.9, 2013-12-15, fix the forwarder reconnect bug, feed it the sequence header.
215 * v0.9, 2013-12-15, support reload the hls/forwarder/transcoder. 216 * v0.9, 2013-12-15, support reload the hls/forwarder/transcoder.
216 * v0.9, 2013-12-14, refine the thread model for the retry threads. 217 * v0.9, 2013-12-14, refine the thread model for the retry threads.
217 * v0.9, 2013-12-10, auto install depends tools/libs on centos/ubuntu. 218 * v0.9, 2013-12-10, auto install depends tools/libs on centos/ubuntu.
@@ -59,7 +59,9 @@ else @@ -59,7 +59,9 @@ else
59 echo "build x264" 59 echo "build x264"
60 cd $ff_current_dir && 60 cd $ff_current_dir &&
61 rm -rf x264-snapshot-20131129-2245-stable && unzip -q ${ff_src_dir}/x264-snapshot-20131129-2245-stable.zip && 61 rm -rf x264-snapshot-20131129-2245-stable && unzip -q ${ff_src_dir}/x264-snapshot-20131129-2245-stable.zip &&
62 - cd x264-snapshot-20131129-2245-stable && ./configure --prefix=${ff_release_dir} --disable-opencl --bit-depth=8 --enable-static && make && make install 62 + cd x264-snapshot-20131129-2245-stable &&
  63 + ./configure --prefix=${ff_release_dir} --disable-opencl --bit-depth=8 --enable-static &&
  64 + make && make install
63 ret=$?; if [[ 0 -ne ${ret} ]]; then echo "build x264 failed"; exit 1; fi 65 ret=$?; if [[ 0 -ne ${ret} ]]; then echo "build x264 failed"; exit 1; fi
64 fi 66 fi
65 67
@@ -92,7 +92,7 @@ vhost __defaultVhost__ { @@ -92,7 +92,7 @@ vhost __defaultVhost__ {
92 vhost dev { 92 vhost dev {
93 enabled on; 93 enabled on;
94 gop_cache on; 94 gop_cache on;
95 - #forward 127.0.0.1:19350; 95 + forward 127.0.0.1:19350;
96 hls { 96 hls {
97 enabled off; 97 enabled off;
98 hls_path ./objs/nginx/html; 98 hls_path ./objs/nginx/html;
@@ -35,14 +35,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -35,14 +35,17 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 #include <srs_core_pithy_print.hpp> 35 #include <srs_core_pithy_print.hpp>
36 #include <srs_core_rtmp.hpp> 36 #include <srs_core_rtmp.hpp>
37 #include <srs_core_config.hpp> 37 #include <srs_core_config.hpp>
  38 +#include <srs_core_source.hpp>
38 39
39 #define SRS_PULSE_TIMEOUT_MS 100 40 #define SRS_PULSE_TIMEOUT_MS 100
40 #define SRS_FORWARDER_SLEEP_MS 2000 41 #define SRS_FORWARDER_SLEEP_MS 2000
41 #define SRS_SEND_TIMEOUT_US 3000000L 42 #define SRS_SEND_TIMEOUT_US 3000000L
42 #define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US 43 #define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US
43 44
44 -SrsForwarder::SrsForwarder() 45 +SrsForwarder::SrsForwarder(SrsSource* _source)
45 { 46 {
  47 + source = _source;
  48 +
46 client = NULL; 49 client = NULL;
47 stfd = NULL; 50 stfd = NULL;
48 stream_id = 0; 51 stream_id = 0;
@@ -182,14 +185,17 @@ int SrsForwarder::cycle() @@ -182,14 +185,17 @@ int SrsForwarder::cycle()
182 return ret; 185 return ret;
183 } 186 }
184 187
185 - // TODO: FIXME: need to cache the metadata and sequence header when reconnect.  
186 -  
187 if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) { 188 if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
188 srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", 189 srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
189 stream_name.c_str(), stream_id, ret); 190 stream_name.c_str(), stream_id, ret);
190 return ret; 191 return ret;
191 } 192 }
192 193
  194 + if ((ret = source->on_forwarder_start(this)) != ERROR_SUCCESS) {
  195 + srs_error("callback the source to feed the sequence header failed. ret=%d", ret);
  196 + return ret;
  197 + }
  198 +
193 if ((ret = forward()) != ERROR_SUCCESS) { 199 if ((ret = forward()) != ERROR_SUCCESS) {
194 return ret; 200 return ret;
195 } 201 }
@@ -38,6 +38,7 @@ class SrsSharedPtrMessage; @@ -38,6 +38,7 @@ class SrsSharedPtrMessage;
38 class SrsOnMetaDataPacket; 38 class SrsOnMetaDataPacket;
39 class SrsRtmpClient; 39 class SrsRtmpClient;
40 class SrsRequest; 40 class SrsRequest;
  41 +class SrsSource;
41 42
42 /** 43 /**
43 * forward the stream to other servers. 44 * forward the stream to other servers.
@@ -55,10 +56,11 @@ private: @@ -55,10 +56,11 @@ private:
55 st_netfd_t stfd; 56 st_netfd_t stfd;
56 SrsThread* pthread; 57 SrsThread* pthread;
57 private: 58 private:
  59 + SrsSource* source;
58 SrsRtmpClient* client; 60 SrsRtmpClient* client;
59 std::vector<SrsSharedPtrMessage*> msgs; 61 std::vector<SrsSharedPtrMessage*> msgs;
60 public: 62 public:
61 - SrsForwarder(); 63 + SrsForwarder(SrsSource* _source);
62 virtual ~SrsForwarder(); 64 virtual ~SrsForwarder();
63 public: 65 public:
64 virtual int on_publish(SrsRequest* req, std::string forward_server); 66 virtual int on_publish(SrsRequest* req, std::string forward_server);
@@ -450,7 +450,7 @@ int SrsSource::on_reload_forward(string vhost) @@ -450,7 +450,7 @@ int SrsSource::on_reload_forward(string vhost)
450 srs_error("create forwarders failed. ret=%d", ret); 450 srs_error("create forwarders failed. ret=%d", ret);
451 return ret; 451 return ret;
452 } 452 }
453 - // TODO: FIXME: must feed it the sequence header. 453 +
454 srs_trace("vhost %s forwarders reload success", vhost.c_str()); 454 srs_trace("vhost %s forwarders reload success", vhost.c_str());
455 455
456 return ret; 456 return ret;
@@ -498,6 +498,28 @@ int SrsSource::on_reload_transcode(string vhost) @@ -498,6 +498,28 @@ int SrsSource::on_reload_transcode(string vhost)
498 return ret; 498 return ret;
499 } 499 }
500 500
  501 +int SrsSource::on_forwarder_start(SrsForwarder* forwarder)
  502 +{
  503 + int ret = ERROR_SUCCESS;
  504 +
  505 + // feed the forwarder the metadata/sequence header,
  506 + // when reload to enable the forwarder.
  507 + if (cache_metadata && (ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) {
  508 + srs_error("forwarder process onMetaData message failed. ret=%d", ret);
  509 + return ret;
  510 + }
  511 + if (cache_sh_video && (ret = forwarder->on_video(cache_sh_video->copy())) != ERROR_SUCCESS) {
  512 + srs_error("forwarder process video sequence header message failed. ret=%d", ret);
  513 + return ret;
  514 + }
  515 + if (cache_sh_audio && (ret = forwarder->on_audio(cache_sh_audio->copy())) != ERROR_SUCCESS) {
  516 + srs_error("forwarder process audio sequence header message failed. ret=%d", ret);
  517 + return ret;
  518 + }
  519 +
  520 + return ret;
  521 +}
  522 +
501 bool SrsSource::can_publish() 523 bool SrsSource::can_publish()
502 { 524 {
503 return _can_publish; 525 return _can_publish;
@@ -837,7 +859,7 @@ int SrsSource::create_forwarders() @@ -837,7 +859,7 @@ int SrsSource::create_forwarders()
837 for (int i = 0; conf && i < (int)conf->args.size(); i++) { 859 for (int i = 0; conf && i < (int)conf->args.size(); i++) {
838 std::string forward_server = conf->args.at(i); 860 std::string forward_server = conf->args.at(i);
839 861
840 - SrsForwarder* forwarder = new SrsForwarder(); 862 + SrsForwarder* forwarder = new SrsForwarder(this);
841 forwarders.push_back(forwarder); 863 forwarders.push_back(forwarder);
842 864
843 if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) { 865 if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) {
@@ -221,6 +221,9 @@ public: @@ -221,6 +221,9 @@ public:
221 virtual int on_reload_forward(std::string vhost); 221 virtual int on_reload_forward(std::string vhost);
222 virtual int on_reload_hls(std::string vhost); 222 virtual int on_reload_hls(std::string vhost);
223 virtual int on_reload_transcode(std::string vhost); 223 virtual int on_reload_transcode(std::string vhost);
  224 +// for the SrsForwarder to callback to request the sequence headers.
  225 +public:
  226 + virtual int on_forwarder_start(SrsForwarder* forwarder);
224 public: 227 public:
225 virtual bool can_publish(); 228 virtual bool can_publish();
226 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); 229 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);