winlin

refine code, extract the rtmp jitter for hls

@@ -41,6 +41,7 @@ SrsHLS::SrsHLS() @@ -41,6 +41,7 @@ SrsHLS::SrsHLS()
41 codec = new SrsCodec(); 41 codec = new SrsCodec();
42 sample = new SrsCodecSample(); 42 sample = new SrsCodecSample();
43 muxer = NULL; 43 muxer = NULL;
  44 + jitter = new SrsRtmpJitter();
44 } 45 }
45 46
46 SrsHLS::~SrsHLS() 47 SrsHLS::~SrsHLS()
@@ -48,6 +49,7 @@ SrsHLS::~SrsHLS() @@ -48,6 +49,7 @@ SrsHLS::~SrsHLS()
48 srs_freep(codec); 49 srs_freep(codec);
49 srs_freep(sample); 50 srs_freep(sample);
50 srs_freep(muxer); 51 srs_freep(muxer);
  52 + srs_freep(jitter);
51 } 53 }
52 54
53 int SrsHLS::on_publish(std::string _vhost) 55 int SrsHLS::on_publish(std::string _vhost)
@@ -38,6 +38,7 @@ class SrsCodecBuffer; @@ -38,6 +38,7 @@ class SrsCodecBuffer;
38 class SrsMpegtsFrame; 38 class SrsMpegtsFrame;
39 class SrsTSMuxer; 39 class SrsTSMuxer;
40 class SrsCodec; 40 class SrsCodec;
  41 +class SrsRtmpJitter;
41 42
42 class SrsHLS 43 class SrsHLS
43 { 44 {
@@ -47,6 +48,7 @@ private: @@ -47,6 +48,7 @@ private:
47 SrsCodec* codec; 48 SrsCodec* codec;
48 SrsCodecSample* sample; 49 SrsCodecSample* sample;
49 SrsTSMuxer* muxer; 50 SrsTSMuxer* muxer;
  51 + SrsRtmpJitter* jitter;
50 public: 52 public:
51 SrsHLS(); 53 SrsHLS();
52 virtual ~SrsHLS(); 54 virtual ~SrsHLS();
@@ -36,23 +36,72 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -36,23 +36,72 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
36 #define DEFAULT_FRAME_TIME_MS 10 36 #define DEFAULT_FRAME_TIME_MS 10
37 #define PAUSED_SHRINK_SIZE 250 37 #define PAUSED_SHRINK_SIZE 250
38 38
39 -std::map<std::string, SrsSource*> SrsSource::pool; 39 +SrsRtmpJitter::SrsRtmpJitter()
  40 +{
  41 + last_pkt_correct_time = last_pkt_time = 0;
  42 +}
40 43
41 -SrsSource* SrsSource::find(std::string stream_url) 44 +SrsRtmpJitter::~SrsRtmpJitter()
42 { 45 {
43 - if (pool.find(stream_url) == pool.end()) {  
44 - pool[stream_url] = new SrsSource(stream_url);  
45 - srs_verbose("create new source for url=%s", stream_url.c_str()); 46 +}
  47 +
  48 +int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate)
  49 +{
  50 + int ret = ERROR_SUCCESS;
  51 +
  52 + /**
  53 + * we use a very simple time jitter detect/correct algorithm:
  54 + * 1. delta: ensure the delta is positive and valid,
  55 + * we set the delta to DEFAULT_FRAME_TIME_MS,
  56 + * if the delta of time is nagative or greater than CONST_MAX_JITTER_MS.
  57 + * 2. last_pkt_time: specifies the original packet time,
  58 + * is used to detect next jitter.
  59 + * 3. last_pkt_correct_time: simply add the positive delta,
  60 + * and enforce the time monotonically.
  61 + */
  62 + u_int32_t time = msg->header.timestamp;
  63 + int32_t delta = time - last_pkt_time;
  64 +
  65 + // if jitter detected, reset the delta.
  66 + if (delta < 0 || delta > CONST_MAX_JITTER_MS) {
  67 + // calc the right diff by audio sample rate
  68 + if (msg->header.is_audio() && audio_sample_rate > 0) {
  69 + delta = (int32_t)(delta * 1000.0 / audio_sample_rate);
  70 + } else if (msg->header.is_video() && video_frame_rate > 0) {
  71 + delta = (int32_t)(delta * 1.0 / video_frame_rate);
  72 + } else {
  73 + delta = DEFAULT_FRAME_TIME_MS;
  74 + }
  75 +
  76 + // sometimes, the time is absolute time, so correct it again.
  77 + if (delta < 0 || delta > CONST_MAX_JITTER_MS) {
  78 + delta = DEFAULT_FRAME_TIME_MS;
  79 + }
  80 +
  81 + srs_info("jitter detected, last_pts=%d, pts=%d, diff=%d, last_time=%d, time=%d, diff=%d",
  82 + last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta);
  83 + } else {
  84 + srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d",
  85 + time, last_pkt_time, last_pkt_correct_time + delta);
46 } 86 }
47 87
48 - return pool[stream_url]; 88 + last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);
  89 + msg->header.timestamp = last_pkt_correct_time;
  90 + last_pkt_time = time;
  91 +
  92 + return ret;
  93 +}
  94 +
  95 +int SrsRtmpJitter::get_time()
  96 +{
  97 + return (int)last_pkt_correct_time;
49 } 98 }
50 99
51 SrsConsumer::SrsConsumer(SrsSource* _source) 100 SrsConsumer::SrsConsumer(SrsSource* _source)
52 { 101 {
53 source = _source; 102 source = _source;
54 - last_pkt_correct_time = last_pkt_time = 0;  
55 paused = false; 103 paused = false;
  104 + jitter = new SrsRtmpJitter();
56 } 105 }
57 106
58 SrsConsumer::~SrsConsumer() 107 SrsConsumer::~SrsConsumer()
@@ -60,18 +109,19 @@ SrsConsumer::~SrsConsumer() @@ -60,18 +109,19 @@ SrsConsumer::~SrsConsumer()
60 clear(); 109 clear();
61 110
62 source->on_consumer_destroy(this); 111 source->on_consumer_destroy(this);
  112 + srs_freep(jitter);
63 } 113 }
64 114
65 int SrsConsumer::get_time() 115 int SrsConsumer::get_time()
66 { 116 {
67 - return (int)last_pkt_correct_time; 117 + return jitter->get_time();
68 } 118 }
69 119
70 int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate) 120 int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate)
71 { 121 {
72 int ret = ERROR_SUCCESS; 122 int ret = ERROR_SUCCESS;
73 123
74 - if ((ret = jitter_correct(msg, audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { 124 + if ((ret = jitter->correct(msg, audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) {
75 return ret; 125 return ret;
76 } 126 }
77 127
@@ -178,53 +228,6 @@ void SrsConsumer::shrink() @@ -178,53 +228,6 @@ void SrsConsumer::shrink()
178 } 228 }
179 } 229 }
180 230
181 -int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate)  
182 -{  
183 - int ret = ERROR_SUCCESS;  
184 -  
185 - /**  
186 - * we use a very simple time jitter detect/correct algorithm:  
187 - * 1. delta: ensure the delta is positive and valid,  
188 - * we set the delta to DEFAULT_FRAME_TIME_MS,  
189 - * if the delta of time is nagative or greater than CONST_MAX_JITTER_MS.  
190 - * 2. last_pkt_time: specifies the original packet time,  
191 - * is used to detect next jitter.  
192 - * 3. last_pkt_correct_time: simply add the positive delta,  
193 - * and enforce the time monotonically.  
194 - */  
195 - u_int32_t time = msg->header.timestamp;  
196 - int32_t delta = time - last_pkt_time;  
197 -  
198 - // if jitter detected, reset the delta.  
199 - if (delta < 0 || delta > CONST_MAX_JITTER_MS) {  
200 - // calc the right diff by audio sample rate  
201 - if (msg->header.is_audio() && audio_sample_rate > 0) {  
202 - delta = (int32_t)(delta * 1000.0 / audio_sample_rate);  
203 - } else if (msg->header.is_video() && video_frame_rate > 0) {  
204 - delta = (int32_t)(delta * 1.0 / video_frame_rate);  
205 - } else {  
206 - delta = DEFAULT_FRAME_TIME_MS;  
207 - }  
208 -  
209 - // sometimes, the time is absolute time, so correct it again.  
210 - if (delta < 0 || delta > CONST_MAX_JITTER_MS) {  
211 - delta = DEFAULT_FRAME_TIME_MS;  
212 - }  
213 -  
214 - srs_info("jitter detected, last_pts=%d, pts=%d, diff=%d, last_time=%d, time=%d, diff=%d",  
215 - last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta);  
216 - } else {  
217 - srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d",  
218 - time, last_pkt_time, last_pkt_correct_time + delta);  
219 - }  
220 -  
221 - last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);  
222 - msg->header.timestamp = last_pkt_correct_time;  
223 - last_pkt_time = time;  
224 -  
225 - return ret;  
226 -}  
227 -  
228 void SrsConsumer::clear() 231 void SrsConsumer::clear()
229 { 232 {
230 std::vector<SrsSharedPtrMessage*>::iterator it; 233 std::vector<SrsSharedPtrMessage*>::iterator it;
@@ -235,6 +238,18 @@ void SrsConsumer::clear() @@ -235,6 +238,18 @@ void SrsConsumer::clear()
235 msgs.clear(); 238 msgs.clear();
236 } 239 }
237 240
  241 +std::map<std::string, SrsSource*> SrsSource::pool;
  242 +
  243 +SrsSource* SrsSource::find(std::string stream_url)
  244 +{
  245 + if (pool.find(stream_url) == pool.end()) {
  246 + pool[stream_url] = new SrsSource(stream_url);
  247 + srs_verbose("create new source for url=%s", stream_url.c_str());
  248 + }
  249 +
  250 + return pool[stream_url];
  251 +}
  252 +
238 SrsSource::SrsSource(std::string _stream_url) 253 SrsSource::SrsSource(std::string _stream_url)
239 { 254 {
240 stream_url = _stream_url; 255 stream_url = _stream_url;
@@ -41,13 +41,35 @@ class SrsSharedPtrMessage; @@ -41,13 +41,35 @@ class SrsSharedPtrMessage;
41 class SrsHLS; 41 class SrsHLS;
42 42
43 /** 43 /**
44 -* the consumer for SrsSource, that is a play client. 44 +* time jitter detect and correct,
  45 +* to ensure the rtmp stream is monotonically.
45 */ 46 */
46 -class SrsConsumer 47 +class SrsRtmpJitter
47 { 48 {
48 private: 49 private:
49 u_int32_t last_pkt_time; 50 u_int32_t last_pkt_time;
50 u_int32_t last_pkt_correct_time; 51 u_int32_t last_pkt_correct_time;
  52 +public:
  53 + SrsRtmpJitter();
  54 + virtual ~SrsRtmpJitter();
  55 +public:
  56 + /**
  57 + * detect the time jitter and correct it.
  58 + */
  59 + virtual int correct(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate);
  60 + /**
  61 + * get current client time, the last packet time.
  62 + */
  63 + virtual int get_time();
  64 +};
  65 +
  66 +/**
  67 +* the consumer for SrsSource, that is a play client.
  68 +*/
  69 +class SrsConsumer
  70 +{
  71 +private:
  72 + SrsRtmpJitter* jitter;
51 SrsSource* source; 73 SrsSource* source;
52 std::vector<SrsSharedPtrMessage*> msgs; 74 std::vector<SrsSharedPtrMessage*> msgs;
53 bool paused; 75 bool paused;
@@ -82,10 +104,6 @@ private: @@ -82,10 +104,6 @@ private:
82 * remove to cache only one gop. 104 * remove to cache only one gop.
83 */ 105 */
84 virtual void shrink(); 106 virtual void shrink();
85 - /**  
86 - * detect the time jitter and correct it.  
87 - */  
88 - virtual int jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate);  
89 virtual void clear(); 107 virtual void clear();
90 }; 108 };
91 109