胡斌

implement save meta,need more test

Conflicts:

	trunk/src/app/srs_app_source.cpp
	trunk/src/app/srs_app_source.hpp
	trunk/src/protocol/srs_rtmp_stack.cpp
@@ -259,13 +259,35 @@ int SrsFlvSegment::write_metadata(SrsSharedPtrMessage* metadata) @@ -259,13 +259,35 @@ int SrsFlvSegment::write_metadata(SrsSharedPtrMessage* metadata)
259 } 259 }
260 260
261 // to flv file. 261 // to flv file.
262 - if ((ret = enc->write_metadata(18, payload, size)) != ERROR_SUCCESS) { 262 + if ((ret = enc->write_metadata(0, payload, size)) != ERROR_SUCCESS) {
263 return ret; 263 return ret;
264 } 264 }
265 265
266 return ret; 266 return ret;
267 } 267 }
268 268
  269 +int SrsFlvSegment::write_data(SrsSharedPtrMessage* shared_data)
  270 +{
  271 + int ret = ERROR_SUCCESS;
  272 +
  273 + SrsSharedPtrMessage* data = shared_data->copy();
  274 + SrsAutoFree(SrsSharedPtrMessage, data);
  275 +
  276 + if ((jitter->correct(data, jitter_algorithm)) != ERROR_SUCCESS) {
  277 + return ret;
  278 + }
  279 +
  280 + char* payload = data->payload;
  281 + int size = data->size;
  282 + int64_t timestamp = plan->filter_timestamp(data->timestamp);
  283 + if ((ret = enc->write_metadata( timestamp , payload, size)) != ERROR_SUCCESS) {
  284 + return ret;
  285 + }
  286 +
  287 +
  288 + return ret;
  289 +}
  290 +
269 int SrsFlvSegment::write_audio(SrsSharedPtrMessage* shared_audio) 291 int SrsFlvSegment::write_audio(SrsSharedPtrMessage* shared_audio)
270 { 292 {
271 int ret = ERROR_SUCCESS; 293 int ret = ERROR_SUCCESS;
@@ -291,6 +313,8 @@ int SrsFlvSegment::write_audio(SrsSharedPtrMessage* shared_audio) @@ -291,6 +313,8 @@ int SrsFlvSegment::write_audio(SrsSharedPtrMessage* shared_audio)
291 return ret; 313 return ret;
292 } 314 }
293 315
  316 +
  317 +
294 int SrsFlvSegment::write_video(SrsSharedPtrMessage* shared_video) 318 int SrsFlvSegment::write_video(SrsSharedPtrMessage* shared_video)
295 { 319 {
296 int ret = ERROR_SUCCESS; 320 int ret = ERROR_SUCCESS;
@@ -605,6 +629,17 @@ int SrsDvrPlan::on_meta_data(SrsSharedPtrMessage* shared_metadata) @@ -605,6 +629,17 @@ int SrsDvrPlan::on_meta_data(SrsSharedPtrMessage* shared_metadata)
605 return segment->write_metadata(shared_metadata); 629 return segment->write_metadata(shared_metadata);
606 } 630 }
607 631
  632 +int SrsDvrPlan::on_data(SrsSharedPtrMessage* shared_metadata)
  633 +{
  634 + int ret = ERROR_SUCCESS;
  635 +
  636 + if (!dvr_enabled) {
  637 + return ret;
  638 + }
  639 +
  640 + return segment->write_data(shared_metadata);
  641 +}
  642 +
608 int SrsDvrPlan::on_audio(SrsSharedPtrMessage* shared_audio) 643 int SrsDvrPlan::on_audio(SrsSharedPtrMessage* shared_audio)
609 { 644 {
610 int ret = ERROR_SUCCESS; 645 int ret = ERROR_SUCCESS;
@@ -1038,6 +1073,11 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* m) @@ -1038,6 +1073,11 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* m)
1038 return ret; 1073 return ret;
1039 } 1074 }
1040 1075
  1076 +int SrsDvr::on_data(SrsSharedPtrMessage* shared_data)
  1077 +{
  1078 + return plan->on_data(shared_data);
  1079 +}
  1080 +
1041 int SrsDvr::on_audio(SrsSharedPtrMessage* shared_audio) 1081 int SrsDvr::on_audio(SrsSharedPtrMessage* shared_audio)
1042 { 1082 {
1043 return plan->on_audio(shared_audio); 1083 return plan->on_audio(shared_audio);
@@ -141,6 +141,10 @@ public: @@ -141,6 +141,10 @@ public:
141 */ 141 */
142 virtual int write_metadata(SrsSharedPtrMessage* metadata); 142 virtual int write_metadata(SrsSharedPtrMessage* metadata);
143 /** 143 /**
  144 + * @param shared_data, directly ptr, copy it if need to save it.
  145 + */
  146 + virtual int write_data(SrsSharedPtrMessage* shared_data);
  147 + /**
144 * @param shared_audio, directly ptr, copy it if need to save it. 148 * @param shared_audio, directly ptr, copy it if need to save it.
145 */ 149 */
146 virtual int write_audio(SrsSharedPtrMessage* shared_audio); 150 virtual int write_audio(SrsSharedPtrMessage* shared_audio);
@@ -221,6 +225,10 @@ public: @@ -221,6 +225,10 @@ public:
221 */ 225 */
222 virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata); 226 virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata);
223 /** 227 /**
  228 + * when got metadata.
  229 + */
  230 + virtual int on_data(SrsSharedPtrMessage* shared_metadata);
  231 + /**
224 * @param shared_audio, directly ptr, copy it if need to save it. 232 * @param shared_audio, directly ptr, copy it if need to save it.
225 */ 233 */
226 virtual int on_audio(SrsSharedPtrMessage* shared_audio); 234 virtual int on_audio(SrsSharedPtrMessage* shared_audio);
@@ -328,6 +336,11 @@ public: @@ -328,6 +336,11 @@ public:
328 */ 336 */
329 virtual int on_meta_data(SrsOnMetaDataPacket* m); 337 virtual int on_meta_data(SrsOnMetaDataPacket* m);
330 /** 338 /**
  339 + * the data packets to dvr.
  340 + * @param shared_data, directly ptr, copy it if need to save it.
  341 + */
  342 + virtual int on_data(SrsSharedPtrMessage* shared_data);
  343 + /**
331 * mux the audio packets to dvr. 344 * mux the audio packets to dvr.
332 * @param shared_audio, directly ptr, copy it if need to save it. 345 * @param shared_audio, directly ptr, copy it if need to save it.
333 */ 346 */
@@ -271,7 +271,7 @@ int SrsFlvStreamEncoder::write_video(int64_t timestamp, char* data, int size) @@ -271,7 +271,7 @@ int SrsFlvStreamEncoder::write_video(int64_t timestamp, char* data, int size)
271 271
272 int SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, int size) 272 int SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, int size)
273 { 273 {
274 - return enc->write_metadata(SrsCodecFlvTagScript, data, size); 274 + return enc->write_metadata(timestamp, data, size);
275 } 275 }
276 276
277 bool SrsFlvStreamEncoder::has_cache() 277 bool SrsFlvStreamEncoder::has_cache()
@@ -1063,6 +1063,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms @@ -1063,6 +1063,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms
1063 1063
1064 // process onMetaData 1064 // process onMetaData
1065 if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { 1065 if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
  1066 +#if 0
1066 SrsPacket* pkt = NULL; 1067 SrsPacket* pkt = NULL;
1067 if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) { 1068 if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
1068 srs_error("decode onMetaData message failed. ret=%d", ret); 1069 srs_error("decode onMetaData message failed. ret=%d", ret);
@@ -1082,6 +1083,14 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms @@ -1082,6 +1083,14 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms
1082 1083
1083 srs_info("ignore AMF0/AMF3 data message."); 1084 srs_info("ignore AMF0/AMF3 data message.");
1084 return ret; 1085 return ret;
  1086 +#else
  1087 + if ((ret = source->on_data(msg)) != ERROR_SUCCESS) {
  1088 + srs_error("source process onMetaData message failed. ret=%d", ret);
  1089 + return ret;
  1090 + }
  1091 + srs_info("process onMetaData message success.");
  1092 + return ret;
  1093 +#endif
1085 } 1094 }
1086 1095
1087 return ret; 1096 return ret;
@@ -1579,6 +1579,33 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -1579,6 +1579,33 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
1579 return ret; 1579 return ret;
1580 } 1580 }
1581 1581
  1582 +int SrsSource::on_data(SrsCommonMessage* shared_data)
  1583 +{
  1584 + int ret = ERROR_SUCCESS;
  1585 + last_packet_time = shared_data->header.timestamp;
  1586 +
  1587 + // convert shared_audio to msg, user should not use shared_audio again.
  1588 + // the payload is transfer to msg, and set to NULL in shared_audio.
  1589 + SrsSharedPtrMessage msg;
  1590 + if ((ret = msg.create(shared_data)) != ERROR_SUCCESS) {
  1591 + srs_error("initialize the audio failed. ret=%d", ret);
  1592 + return ret;
  1593 + }
  1594 +#ifdef SRS_AUTO_DVR
  1595 + if ((ret = dvr->on_data(&msg)) != ERROR_SUCCESS) {
  1596 + srs_warn("dvr process data message failed, ignore and disable dvr. ret=%d", ret);
  1597 +
  1598 + // unpublish, ignore ret.
  1599 + dvr->on_unpublish();
  1600 +
  1601 + // ignore.
  1602 + ret = ERROR_SUCCESS;
  1603 + }
  1604 +#endif
  1605 +
  1606 + return ret;
  1607 +}
  1608 +
1582 int SrsSource::on_audio(SrsCommonMessage* shared_audio) 1609 int SrsSource::on_audio(SrsCommonMessage* shared_audio)
1583 { 1610 {
1584 int ret = ERROR_SUCCESS; 1611 int ret = ERROR_SUCCESS;
@@ -555,6 +555,8 @@ public: @@ -555,6 +555,8 @@ public:
555 virtual bool can_publish(bool is_edge); 555 virtual bool can_publish(bool is_edge);
556 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata); 556 virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
557 public: 557 public:
  558 + virtual int on_data(SrsCommonMessage* data);
  559 +public:
558 virtual int on_audio(SrsCommonMessage* audio); 560 virtual int on_audio(SrsCommonMessage* audio);
559 private: 561 private:
560 virtual int on_audio_imp(SrsSharedPtrMessage* audio); 562 virtual int on_audio_imp(SrsSharedPtrMessage* audio);
@@ -416,13 +416,13 @@ int SrsFlvEncoder::write_header(char flv_header[9]) @@ -416,13 +416,13 @@ int SrsFlvEncoder::write_header(char flv_header[9])
416 return ret; 416 return ret;
417 } 417 }
418 418
419 -int SrsFlvEncoder::write_metadata(char type, char* data, int size) 419 +int SrsFlvEncoder::write_metadata(int64_t timestamp, char* data, int size)
420 { 420 {
421 int ret = ERROR_SUCCESS; 421 int ret = ERROR_SUCCESS;
422 422
423 srs_assert(data); 423 srs_assert(data);
424 424
425 - if ((ret = write_metadata_to_cache(type, data, size, tag_header)) != ERROR_SUCCESS) { 425 + if ((ret = write_metadata_to_cache(timestamp, data, size, tag_header)) != ERROR_SUCCESS) {
426 return ret; 426 return ret;
427 } 427 }
428 428
@@ -528,7 +528,7 @@ int SrsFlvEncoder::write_tags(SrsSharedPtrMessage** msgs, int count) @@ -528,7 +528,7 @@ int SrsFlvEncoder::write_tags(SrsSharedPtrMessage** msgs, int count)
528 return ret; 528 return ret;
529 } 529 }
530 } else { 530 } else {
531 - if ((ret = write_metadata_to_cache(SrsCodecFlvTagScript, msg->payload, msg->size, cache)) != ERROR_SUCCESS) { 531 + if ((ret = write_metadata_to_cache(msg->timestamp, msg->payload, msg->size, cache)) != ERROR_SUCCESS) {
532 return ret; 532 return ret;
533 } 533 }
534 } 534 }
@@ -563,7 +563,7 @@ int SrsFlvEncoder::write_tags(SrsSharedPtrMessage** msgs, int count) @@ -563,7 +563,7 @@ int SrsFlvEncoder::write_tags(SrsSharedPtrMessage** msgs, int count)
563 } 563 }
564 #endif 564 #endif
565 565
566 -int SrsFlvEncoder::write_metadata_to_cache(char type, char* data, int size, char* cache) 566 +int SrsFlvEncoder::write_metadata_to_cache(int64_t timestamp, char* data, int size, char* cache)
567 { 567 {
568 int ret = ERROR_SUCCESS; 568 int ret = ERROR_SUCCESS;
569 569
@@ -582,10 +582,11 @@ int SrsFlvEncoder::write_metadata_to_cache(char type, char* data, int size, char @@ -582,10 +582,11 @@ int SrsFlvEncoder::write_metadata_to_cache(char type, char* data, int size, char
582 if ((ret = tag_stream->initialize(cache, 11)) != ERROR_SUCCESS) { 582 if ((ret = tag_stream->initialize(cache, 11)) != ERROR_SUCCESS) {
583 return ret; 583 return ret;
584 } 584 }
585 - tag_stream->write_1bytes(type); 585 + tag_stream->write_1bytes(SrsCodecFlvTagScript);
586 tag_stream->write_3bytes(size); 586 tag_stream->write_3bytes(size);
587 - tag_stream->write_3bytes(0x00);  
588 - tag_stream->write_1bytes(0x00); 587 + tag_stream->write_3bytes((int32_t)timestamp);
  588 + // default to little-endian
  589 + tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
589 tag_stream->write_3bytes(0x00); 590 tag_stream->write_3bytes(0x00);
590 591
591 return ret; 592 return ret;
@@ -460,14 +460,13 @@ public: @@ -460,14 +460,13 @@ public:
460 virtual int write_header(char flv_header[9]); 460 virtual int write_header(char flv_header[9]);
461 /** 461 /**
462 * write flv metadata. 462 * write flv metadata.
463 - * @param type, the type of data, or other message type.  
464 - * @see SrsCodecFlvTag 463 + * @param timestamp
465 * @param data, the amf0 metadata which serialize from: 464 * @param data, the amf0 metadata which serialize from:
466 * AMF0 string: onMetaData, 465 * AMF0 string: onMetaData,
467 * AMF0 object: the metadata object. 466 * AMF0 object: the metadata object.
468 * @remark assert data is not NULL. 467 * @remark assert data is not NULL.
469 */ 468 */
470 - virtual int write_metadata(char type, char* data, int size); 469 + virtual int write_metadata(int64_t timestamp, char* data, int size);
471 /** 470 /**
472 * write audio/video packet. 471 * write audio/video packet.
473 * @remark assert data is not NULL. 472 * @remark assert data is not NULL.
@@ -499,7 +498,7 @@ public: @@ -499,7 +498,7 @@ public:
499 virtual int write_tags(SrsSharedPtrMessage** msgs, int count); 498 virtual int write_tags(SrsSharedPtrMessage** msgs, int count);
500 #endif 499 #endif
501 private: 500 private:
502 - virtual int write_metadata_to_cache(char type, char* data, int size, char* cache); 501 + virtual int write_metadata_to_cache(int64_t timestamp, char* data, int size, char* cache);
503 virtual int write_audio_to_cache(int64_t timestamp, char* data, int size, char* cache); 502 virtual int write_audio_to_cache(int64_t timestamp, char* data, int size, char* cache);
504 virtual int write_video_to_cache(int64_t timestamp, char* data, int size, char* cache); 503 virtual int write_video_to_cache(int64_t timestamp, char* data, int size, char* cache);
505 virtual int write_pts_to_cache(int size, char* cache); 504 virtual int write_pts_to_cache(int size, char* cache);
@@ -1659,7 +1659,7 @@ int srs_flv_write_tag(srs_flv_t flv, char type, int32_t time, char* data, int si @@ -1659,7 +1659,7 @@ int srs_flv_write_tag(srs_flv_t flv, char type, int32_t time, char* data, int si
1659 } else if (type == SRS_RTMP_TYPE_VIDEO) { 1659 } else if (type == SRS_RTMP_TYPE_VIDEO) {
1660 return context->enc.write_video(time, data, size); 1660 return context->enc.write_video(time, data, size);
1661 } else { 1661 } else {
1662 - return context->enc.write_metadata(type, data, size); 1662 + return context->enc.write_metadata(time, data, size);
1663 } 1663 }
1664 1664
1665 return ret; 1665 return ret;
@@ -691,9 +691,25 @@ int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, @@ -691,9 +691,25 @@ int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream,
691 int ret = ERROR_SUCCESS; 691 int ret = ERROR_SUCCESS;
692 692
693 SrsPacket* packet = NULL; 693 SrsPacket* packet = NULL;
694 - 694 + if (header.is_amf0_data() || header.is_amf3_data()) {
  695 + if (header.is_amf3_data() && stream->require(1)) {
  696 + srs_verbose("skip 1bytes to decode DATA3 ");
  697 + stream->skip(1);
  698 + }
  699 + std::string name;
  700 + if ((ret = srs_amf0_read_string(stream, name)) != ERROR_SUCCESS) {
  701 + srs_error(
  702 + "decode AMF0/AMF3 script name failed. message type:%d ret=%d",
  703 + header.message_type, ret);
  704 + return ret;
  705 + }
  706 + srs_verbose("do docode script data:%s", name);
  707 + *ppacket = packet = new SrsOnMetaDataPacket();
  708 + return ret;
  709 + }
  710 +
695 // decode specified packet type 711 // decode specified packet type
696 - if (header.is_amf0_command() || header.is_amf3_command() || header.is_amf0_data() || header.is_amf3_data()) { 712 + if (header.is_amf0_command() || header.is_amf3_command()) {
697 srs_verbose("start to decode AMF0/AMF3 command message."); 713 srs_verbose("start to decode AMF0/AMF3 command message.");
698 714
699 // skip 1bytes to decode the amf3 command. 715 // skip 1bytes to decode the amf3 command.
@@ -460,7 +460,7 @@ VOID TEST(KernelFlvTest, FlvEncoderWriteMetadata) @@ -460,7 +460,7 @@ VOID TEST(KernelFlvTest, FlvEncoderWriteMetadata)
460 }; 460 };
461 char pts[] = { (char)0x00, (char)0x00, (char)0x00, (char)19 }; 461 char pts[] = { (char)0x00, (char)0x00, (char)0x00, (char)19 };
462 462
463 - ASSERT_TRUE(ERROR_SUCCESS == enc.write_metadata(18, md, 8)); 463 + ASSERT_TRUE(ERROR_SUCCESS == enc.write_metadata(0, md, 8));
464 ASSERT_TRUE(11 + 8 + 4 == fs.offset); 464 ASSERT_TRUE(11 + 8 + 4 == fs.offset);
465 465
466 EXPECT_TRUE(srs_bytes_equals(tag_header, fs.data, 11)); 466 EXPECT_TRUE(srs_bytes_equals(tag_header, fs.data, 11));