winlin

fix the timestamp bug. correct the audio diff by audiosamplerate

@@ -817,19 +817,6 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -817,19 +817,6 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
817 pp[0] = *p++; 817 pp[0] = *p++;
818 pp[3] = 0; 818 pp[3] = 0;
819 819
820 - if (fmt == RTMP_FMT_TYPE0) {  
821 - // 6.1.2.1. Type 0  
822 - // For a type-0 chunk, the absolute timestamp of the message is sent  
823 - // here.  
824 - chunk->header.timestamp = chunk->header.timestamp_delta;  
825 - } else {  
826 - // 6.1.2.2. Type 1  
827 - // 6.1.2.3. Type 2  
828 - // For a type-1 or type-2 chunk, the difference between the previous  
829 - // chunk's timestamp and the current chunk's timestamp is sent here.  
830 - chunk->header.timestamp += chunk->header.timestamp_delta;  
831 - }  
832 -  
833 // fmt: 0 820 // fmt: 0
834 // timestamp: 3 bytes 821 // timestamp: 3 bytes
835 // If the timestamp is greater than or equal to 16777215 822 // If the timestamp is greater than or equal to 16777215
@@ -845,7 +832,32 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -845,7 +832,32 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
845 // the entire delta. 832 // the entire delta.
846 chunk->extended_timestamp = (chunk->header.timestamp_delta >= RTMP_EXTENDED_TIMESTAMP); 833 chunk->extended_timestamp = (chunk->header.timestamp_delta >= RTMP_EXTENDED_TIMESTAMP);
847 if (chunk->extended_timestamp) { 834 if (chunk->extended_timestamp) {
  835 + // Extended timestamp: 0 or 4 bytes
  836 + // This field MUST be sent when the normal timsestamp is set to
  837 + // 0xffffff, it MUST NOT be sent if the normal timestamp is set to
  838 + // anything else. So for values less than 0xffffff the normal
  839 + // timestamp field SHOULD be used in which case the extended timestamp
  840 + // MUST NOT be present. For values greater than or equal to 0xffffff
  841 + // the normal timestamp field MUST NOT be used and MUST be set to
  842 + // 0xffffff and the extended timestamp MUST be sent.
  843 + //
  844 + // if extended timestamp, the timestamp must >= RTMP_EXTENDED_TIMESTAMP
  845 + // we set the timestamp to RTMP_EXTENDED_TIMESTAMP to identify we
  846 + // got an extended timestamp.
848 chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP; 847 chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP;
  848 + } else {
  849 + if (fmt == RTMP_FMT_TYPE0) {
  850 + // 6.1.2.1. Type 0
  851 + // For a type-0 chunk, the absolute timestamp of the message is sent
  852 + // here.
  853 + chunk->header.timestamp = chunk->header.timestamp_delta;
  854 + } else {
  855 + // 6.1.2.2. Type 1
  856 + // 6.1.2.3. Type 2
  857 + // For a type-1 or type-2 chunk, the difference between the previous
  858 + // chunk's timestamp and the current chunk's timestamp is sent here.
  859 + chunk->header.timestamp += chunk->header.timestamp_delta;
  860 + }
849 } 861 }
850 862
851 if (fmt <= RTMP_FMT_TYPE1) { 863 if (fmt <= RTMP_FMT_TYPE1) {
@@ -913,7 +925,10 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -913,7 +925,10 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
913 pp[1] = *p++; 925 pp[1] = *p++;
914 pp[0] = *p++; 926 pp[0] = *p++;
915 927
916 - if (chunk->header.timestamp > RTMP_EXTENDED_TIMESTAMP && chunk->header.timestamp != timestamp) { 928 + // compare to the chunk timestamp, which is set by chunk message header
  929 + // type 0,1 or 2.
  930 + int32_t chunk_timestamp = chunk->header.timestamp;
  931 + if (chunk_timestamp > RTMP_EXTENDED_TIMESTAMP && chunk_timestamp != timestamp) {
917 mh_size -= 4; 932 mh_size -= 4;
918 srs_verbose("ignore the 4bytes extended timestamp. mh_size=%d", mh_size); 933 srs_verbose("ignore the 4bytes extended timestamp. mh_size=%d", mh_size);
919 } else { 934 } else {
@@ -69,11 +69,11 @@ int SrsConsumer::get_time() @@ -69,11 +69,11 @@ int SrsConsumer::get_time()
69 return (int)last_pkt_correct_time; 69 return (int)last_pkt_correct_time;
70 } 70 }
71 71
72 -int SrsConsumer::enqueue(SrsSharedPtrMessage* msg) 72 +int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int audio_sample_rate)
73 { 73 {
74 int ret = ERROR_SUCCESS; 74 int ret = ERROR_SUCCESS;
75 75
76 - if ((ret = jitter_correct(msg)) != ERROR_SUCCESS) { 76 + if ((ret = jitter_correct(msg, audio_sample_rate)) != ERROR_SUCCESS) {
77 return ret; 77 return ret;
78 } 78 }
79 79
@@ -111,7 +111,7 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c @@ -111,7 +111,7 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c
111 return ret; 111 return ret;
112 } 112 }
113 113
114 -int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg) 114 +int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate)
115 { 115 {
116 int ret = ERROR_SUCCESS; 116 int ret = ERROR_SUCCESS;
117 117
@@ -130,10 +130,15 @@ int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg) @@ -130,10 +130,15 @@ int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg)
130 130
131 // if jitter detected, reset the delta. 131 // if jitter detected, reset the delta.
132 if (delta < 0 || delta > CONST_MAX_JITTER_MS) { 132 if (delta < 0 || delta > CONST_MAX_JITTER_MS) {
133 - delta = DEFAULT_FRAME_TIME_MS; 133 + // calc the right diff by audio sample rate
  134 + if (msg->header.is_audio() && audio_sample_rate > 0) {
  135 + delta = (int32_t)(delta * 1000.0 / audio_sample_rate);
  136 + } else {
  137 + delta = DEFAULT_FRAME_TIME_MS;
  138 + }
134 139
135 - srs_info("jitter detected, delta=%d, last_pkt=%d, time=%d, correct_to=%d",  
136 - delta, last_pkt_time, time, last_pkt_correct_time + delta); 140 + srs_info("jitter detected, last_pts=%d, pts=%d, diff=%d, last_time=%d, time=%d, diff=%d",
  141 + last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta);
137 } else { 142 } else {
138 srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d", 143 srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d",
139 time, last_pkt_time, last_pkt_correct_time + delta); 144 time, last_pkt_time, last_pkt_correct_time + delta);
@@ -155,6 +160,8 @@ SrsSource::SrsSource(std::string _stream_url) @@ -155,6 +160,8 @@ SrsSource::SrsSource(std::string _stream_url)
155 160
156 cached_video_count = 0; 161 cached_video_count = 0;
157 enable_gop_cache = true; 162 enable_gop_cache = true;
  163 +
  164 + audio_sample_rate = 0;
158 } 165 }
159 166
160 SrsSource::~SrsSource() 167 SrsSource::~SrsSource()
@@ -182,6 +189,13 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -182,6 +189,13 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
182 metadata->metadata->set("server", new SrsAmf0String( 189 metadata->metadata->set("server", new SrsAmf0String(
183 RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")")); 190 RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
184 191
  192 + SrsAmf0Any* prop = NULL;
  193 + if ((prop = metadata->metadata->get_property("audiosamplerate")) != NULL) {
  194 + if (prop->is_number()) {
  195 + audio_sample_rate = (int)(srs_amf0_convert<SrsAmf0Number>(prop)->value);
  196 + }
  197 + }
  198 +
185 // encode the metadata to payload 199 // encode the metadata to payload
186 int size = metadata->get_payload_length(); 200 int size = metadata->get_payload_length();
187 if (size <= 0) { 201 if (size <= 0) {
@@ -214,7 +228,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -214,7 +228,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
214 std::vector<SrsConsumer*>::iterator it; 228 std::vector<SrsConsumer*>::iterator it;
215 for (it = consumers.begin(); it != consumers.end(); ++it) { 229 for (it = consumers.begin(); it != consumers.end(); ++it) {
216 SrsConsumer* consumer = *it; 230 SrsConsumer* consumer = *it;
217 - if ((ret = consumer->enqueue(cache_metadata->copy())) != ERROR_SUCCESS) { 231 + if ((ret = consumer->enqueue(cache_metadata->copy(), audio_sample_rate)) != ERROR_SUCCESS) {
218 srs_error("dispatch the metadata failed. ret=%d", ret); 232 srs_error("dispatch the metadata failed. ret=%d", ret);
219 return ret; 233 return ret;
220 } 234 }
@@ -244,7 +258,7 @@ int SrsSource::on_audio(SrsCommonMessage* audio) @@ -244,7 +258,7 @@ int SrsSource::on_audio(SrsCommonMessage* audio)
244 std::vector<SrsConsumer*>::iterator it; 258 std::vector<SrsConsumer*>::iterator it;
245 for (it = consumers.begin(); it != consumers.end(); ++it) { 259 for (it = consumers.begin(); it != consumers.end(); ++it) {
246 SrsConsumer* consumer = *it; 260 SrsConsumer* consumer = *it;
247 - if ((ret = consumer->enqueue(msg->copy())) != ERROR_SUCCESS) { 261 + if ((ret = consumer->enqueue(msg->copy(), audio_sample_rate)) != ERROR_SUCCESS) {
248 srs_error("dispatch the audio failed. ret=%d", ret); 262 srs_error("dispatch the audio failed. ret=%d", ret);
249 return ret; 263 return ret;
250 } 264 }
@@ -288,7 +302,7 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -288,7 +302,7 @@ int SrsSource::on_video(SrsCommonMessage* video)
288 std::vector<SrsConsumer*>::iterator it; 302 std::vector<SrsConsumer*>::iterator it;
289 for (it = consumers.begin(); it != consumers.end(); ++it) { 303 for (it = consumers.begin(); it != consumers.end(); ++it) {
290 SrsConsumer* consumer = *it; 304 SrsConsumer* consumer = *it;
291 - if ((ret = consumer->enqueue(msg->copy())) != ERROR_SUCCESS) { 305 + if ((ret = consumer->enqueue(msg->copy(), audio_sample_rate)) != ERROR_SUCCESS) {
292 srs_error("dispatch the video failed. ret=%d", ret); 306 srs_error("dispatch the video failed. ret=%d", ret);
293 return ret; 307 return ret;
294 } 308 }
@@ -319,19 +333,19 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -319,19 +333,19 @@ int SrsSource::on_video(SrsCommonMessage* video)
319 consumer = new SrsConsumer(this); 333 consumer = new SrsConsumer(this);
320 consumers.push_back(consumer); 334 consumers.push_back(consumer);
321 335
322 - if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy())) != ERROR_SUCCESS) { 336 + if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), audio_sample_rate)) != ERROR_SUCCESS) {
323 srs_error("dispatch metadata failed. ret=%d", ret); 337 srs_error("dispatch metadata failed. ret=%d", ret);
324 return ret; 338 return ret;
325 } 339 }
326 srs_info("dispatch metadata success"); 340 srs_info("dispatch metadata success");
327 341
328 - if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy())) != ERROR_SUCCESS) { 342 + if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), audio_sample_rate)) != ERROR_SUCCESS) {
329 srs_error("dispatch video sequence header failed. ret=%d", ret); 343 srs_error("dispatch video sequence header failed. ret=%d", ret);
330 return ret; 344 return ret;
331 } 345 }
332 srs_info("dispatch video sequence header success"); 346 srs_info("dispatch video sequence header success");
333 347
334 - if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy())) != ERROR_SUCCESS) { 348 + if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), audio_sample_rate)) != ERROR_SUCCESS) {
335 srs_error("dispatch audio sequence header failed. ret=%d", ret); 349 srs_error("dispatch audio sequence header failed. ret=%d", ret);
336 return ret; 350 return ret;
337 } 351 }
@@ -340,7 +354,7 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -340,7 +354,7 @@ int SrsSource::on_video(SrsCommonMessage* video)
340 std::vector<SrsSharedPtrMessage*>::iterator it; 354 std::vector<SrsSharedPtrMessage*>::iterator it;
341 for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { 355 for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
342 SrsSharedPtrMessage* msg = *it; 356 SrsSharedPtrMessage* msg = *it;
343 - if ((ret = consumer->enqueue(msg->copy())) != ERROR_SUCCESS) { 357 + if ((ret = consumer->enqueue(msg->copy(), audio_sample_rate)) != ERROR_SUCCESS) {
344 srs_error("dispatch cached gop failed. ret=%d", ret); 358 srs_error("dispatch cached gop failed. ret=%d", ret);
345 return ret; 359 return ret;
346 } 360 }
@@ -61,7 +61,7 @@ public: @@ -61,7 +61,7 @@ public:
61 /** 61 /**
62 * enqueue an shared ptr message. 62 * enqueue an shared ptr message.
63 */ 63 */
64 - virtual int enqueue(SrsSharedPtrMessage* msg); 64 + virtual int enqueue(SrsSharedPtrMessage* msg, int audio_sample_rate);
65 /** 65 /**
66 * get packets in consumer queue. 66 * get packets in consumer queue.
67 * @pmsgs SrsMessages*[], output the prt array. 67 * @pmsgs SrsMessages*[], output the prt array.
@@ -74,7 +74,7 @@ private: @@ -74,7 +74,7 @@ private:
74 * detect the time jitter and correct it. 74 * detect the time jitter and correct it.
75 * @doc update the README.cmd 75 * @doc update the README.cmd
76 */ 76 */
77 - virtual int jitter_correct(SrsSharedPtrMessage* msg); 77 + virtual int jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate);
78 }; 78 };
79 79
80 /** 80 /**
@@ -113,6 +113,11 @@ private: @@ -113,6 +113,11 @@ private:
113 */ 113 */
114 std::vector<SrsSharedPtrMessage*> gop_cache; 114 std::vector<SrsSharedPtrMessage*> gop_cache;
115 private: 115 private:
  116 + /**
  117 + * the sample rate of audio in metadata.
  118 + */
  119 + int audio_sample_rate;
  120 +private:
116 SrsSharedPtrMessage* cache_metadata; 121 SrsSharedPtrMessage* cache_metadata;
117 // the cached video sequence header. 122 // the cached video sequence header.
118 SrsSharedPtrMessage* cache_sh_video; 123 SrsSharedPtrMessage* cache_sh_video;