胡斌

implement save meta,need more test

... ... @@ -259,13 +259,35 @@ int SrsFlvSegment::write_metadata(SrsSharedPtrMessage* metadata)
}
// to flv file.
if ((ret = enc->write_metadata(18, payload, size)) != ERROR_SUCCESS) {
if ((ret = enc->write_metadata(0, payload, size)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsFlvSegment::write_data(SrsSharedPtrMessage* shared_data)
{
int ret = ERROR_SUCCESS;
SrsSharedPtrMessage* data = shared_data->copy();
SrsAutoFree(SrsSharedPtrMessage, data);
if ((jitter->correct(data, jitter_algorithm)) != ERROR_SUCCESS) {
return ret;
}
char* payload = data->payload;
int size = data->size;
int64_t timestamp = plan->filter_timestamp(data->timestamp);
if ((ret = enc->write_metadata( timestamp , payload, size)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsFlvSegment::write_audio(SrsSharedPtrMessage* shared_audio)
{
int ret = ERROR_SUCCESS;
... ... @@ -291,6 +313,8 @@ int SrsFlvSegment::write_audio(SrsSharedPtrMessage* shared_audio)
return ret;
}
int SrsFlvSegment::write_video(SrsSharedPtrMessage* shared_video)
{
int ret = ERROR_SUCCESS;
... ... @@ -601,6 +625,17 @@ int SrsDvrPlan::on_meta_data(SrsSharedPtrMessage* shared_metadata)
return segment->write_metadata(shared_metadata);
}
int SrsDvrPlan::on_data(SrsSharedPtrMessage* shared_metadata)
{
int ret = ERROR_SUCCESS;
if (!dvr_enabled) {
return ret;
}
return segment->write_data(shared_metadata);
}
int SrsDvrPlan::on_audio(SrsSharedPtrMessage* shared_audio)
{
int ret = ERROR_SUCCESS;
... ... @@ -1054,6 +1089,16 @@ int SrsDvr::on_meta_data(SrsOnMetaDataPacket* m)
return ret;
}
int SrsDvr::on_data(SrsSharedPtrMessage* shared_data)
{
// the dvr for this stream is not actived.
if (!actived) {
return ERROR_SUCCESS;
}
return plan->on_data(shared_data);
}
int SrsDvr::on_audio(SrsSharedPtrMessage* shared_audio)
{
// the dvr for this stream is not actived.
... ...
... ... @@ -142,6 +142,10 @@ public:
*/
virtual int write_metadata(SrsSharedPtrMessage* metadata);
/**
* @param shared_data, directly ptr, copy it if need to save it.
*/
virtual int write_data(SrsSharedPtrMessage* shared_data);
/**
* @param shared_audio, directly ptr, copy it if need to save it.
*/
virtual int write_audio(SrsSharedPtrMessage* shared_audio);
... ... @@ -222,6 +226,10 @@ public:
*/
virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata);
/**
* when got metadata.
*/
virtual int on_data(SrsSharedPtrMessage* shared_metadata);
/**
* @param shared_audio, directly ptr, copy it if need to save it.
*/
virtual int on_audio(SrsSharedPtrMessage* shared_audio);
... ... @@ -335,6 +343,11 @@ public:
*/
virtual int on_meta_data(SrsOnMetaDataPacket* m);
/**
* the data packets to dvr.
* @param shared_data, directly ptr, copy it if need to save it.
*/
virtual int on_data(SrsSharedPtrMessage* shared_data);
/**
* mux the audio packets to dvr.
* @param shared_audio, directly ptr, copy it if need to save it.
*/
... ...
... ... @@ -266,7 +266,7 @@ int SrsFlvStreamEncoder::write_video(int64_t timestamp, char* data, int size)
int SrsFlvStreamEncoder::write_metadata(int64_t timestamp, char* data, int size)
{
return enc->write_metadata(SrsCodecFlvTagScript, data, size);
return enc->write_metadata(timestamp, data, size);
}
bool SrsFlvStreamEncoder::has_cache()
... ...
... ... @@ -1298,6 +1298,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
#if 0
SrsPacket* pkt = NULL;
if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("decode onMetaData message failed. ret=%d", ret);
... ... @@ -1317,6 +1318,14 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms
srs_info("ignore AMF0/AMF3 data message.");
return ret;
#else
if ((ret = source->on_data(msg)) != ERROR_SUCCESS) {
srs_error("source process onMetaData message failed. ret=%d", ret);
return ret;
}
srs_info("process onMetaData message success.");
return ret;
#endif
}
return ret;
... ...
... ... @@ -948,6 +948,31 @@ int SrsOriginHub::on_original_metadata(SrsOnMetaDataPacket* metadata)
return ret;
}
int SrsOriginHub::on_original_data(SrsSharedPtrMessage* shared_data)
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_DVR
if (shared_data && (ret = dvr->on_data(shared_data)) != ERROR_SUCCESS) {
srs_error("dvr process onData message failed. ret=%d", ret);
return ret;
}
#endif
// copy to all forwarders
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((ret = forwarder->on_meta_data(shared_data)) != ERROR_SUCCESS) {
srs_error("forwarder process onMetaData message failed. ret=%d", ret);
return ret;
}
}
}
return ret;
}
int SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata)
{
int ret = ERROR_SUCCESS;
... ... @@ -1972,6 +1997,27 @@ bool SrsSource::can_publish(bool is_edge)
return _can_publish;
}
int SrsSource::on_data(SrsCommonMessage* shared_data)
{
int ret = ERROR_SUCCESS;
last_packet_time = shared_data->header.timestamp;
// convert shared_audio to msg, user should not use shared_audio again.
// the payload is transfer to msg, and set to NULL in shared_audio.
SrsSharedPtrMessage msg;
if ((ret = msg.create(shared_data)) != ERROR_SUCCESS) {
srs_error("initialize the audio failed. ret=%d", ret);
return ret;
}
// Notify hub about the original metadata.
if ((ret = hub->on_original_data(&msg)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
{
int ret = ERROR_SUCCESS;
... ... @@ -1980,7 +2026,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
if ((ret = hub->on_original_metadata(metadata)) != ERROR_SUCCESS) {
return ret;
}
// if allow atc_auto and bravo-atc detected, open atc for vhost.
SrsAmf0Any* prop = NULL;
atc = _srs_config->get_atc(req->vhost);
... ... @@ -2007,7 +2053,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
drop_for_reduce = true;
srs_warn("drop for reduce sh metadata, size=%d", msg->size);
}
// copy to all consumer
if (!drop_for_reduce) {
std::vector<SrsConsumer*>::iterator it;
... ...
... ... @@ -458,6 +458,8 @@ public:
public:
// When got a original metadata.
virtual int on_original_metadata(SrsOnMetaDataPacket* metadata);
// When got a original metadata.
virtual int on_original_data(SrsSharedPtrMessage* shared_data);
// When got a parsed metadata.
virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata);
// When got a parsed audio packet.
... ... @@ -645,6 +647,7 @@ public:
virtual bool can_publish(bool is_edge);
virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
public:
virtual int on_data(SrsCommonMessage* data);
virtual int on_audio(SrsCommonMessage* audio);
private:
virtual int on_audio_imp(SrsSharedPtrMessage* audio);
... ...
... ... @@ -420,13 +420,13 @@ int SrsFlvEncoder::write_header(char flv_header[9])
return ret;
}
int SrsFlvEncoder::write_metadata(char type, char* data, int size)
int SrsFlvEncoder::write_metadata(int64_t timestamp, char* data, int size)
{
int ret = ERROR_SUCCESS;
srs_assert(data);
if ((ret = write_metadata_to_cache(type, data, size, tag_header)) != ERROR_SUCCESS) {
if ((ret = write_metadata_to_cache(timestamp, data, size, tag_header)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -532,7 +532,7 @@ int SrsFlvEncoder::write_tags(SrsSharedPtrMessage** msgs, int count)
return ret;
}
} else {
if ((ret = write_metadata_to_cache(SrsCodecFlvTagScript, msg->payload, msg->size, cache)) != ERROR_SUCCESS) {
if ((ret = write_metadata_to_cache(msg->timestamp, msg->payload, msg->size, cache)) != ERROR_SUCCESS) {
return ret;
}
}
... ... @@ -567,7 +567,7 @@ int SrsFlvEncoder::write_tags(SrsSharedPtrMessage** msgs, int count)
}
#endif
int SrsFlvEncoder::write_metadata_to_cache(char type, char* data, int size, char* cache)
int SrsFlvEncoder::write_metadata_to_cache(int64_t timestamp, char* data, int size, char* cache)
{
int ret = ERROR_SUCCESS;
... ... @@ -586,10 +586,11 @@ int SrsFlvEncoder::write_metadata_to_cache(char type, char* data, int size, char
if ((ret = tag_stream->initialize(cache, 11)) != ERROR_SUCCESS) {
return ret;
}
tag_stream->write_1bytes(type);
tag_stream->write_1bytes(SrsCodecFlvTagScript);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes(0x00);
tag_stream->write_1bytes(0x00);
tag_stream->write_3bytes((int32_t)timestamp);
// default to little-endian
tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
tag_stream->write_3bytes(0x00);
return ret;
... ...
... ... @@ -469,14 +469,13 @@ public:
virtual int write_header(char flv_header[9]);
/**
* write flv metadata.
* @param type, the type of data, or other message type.
* @see SrsCodecFlvTag
* @param timestamp
* @param data, the amf0 metadata which serialize from:
* AMF0 string: onMetaData,
* AMF0 object: the metadata object.
* @remark assert data is not NULL.
*/
virtual int write_metadata(char type, char* data, int size);
virtual int write_metadata(int64_t timestamp, char* data, int size);
/**
* write audio/video packet.
* @remark assert data is not NULL.
... ... @@ -508,7 +507,7 @@ public:
virtual int write_tags(SrsSharedPtrMessage** msgs, int count);
#endif
private:
virtual int write_metadata_to_cache(char type, char* data, int size, char* cache);
virtual int write_metadata_to_cache(int64_t timestamp, char* data, int size, char* cache);
virtual int write_audio_to_cache(int64_t timestamp, char* data, int size, char* cache);
virtual int write_video_to_cache(int64_t timestamp, char* data, int size, char* cache);
virtual int write_pts_to_cache(int size, char* cache);
... ...
... ... @@ -1735,7 +1735,7 @@ int srs_flv_write_tag(srs_flv_t flv, char type, int32_t time, char* data, int si
} else if (type == SRS_RTMP_TYPE_VIDEO) {
return context->enc.write_video(time, data, size);
} else {
return context->enc.write_metadata(type, data, size);
return context->enc.write_metadata(time, data, size);
}
return ret;
... ...
... ... @@ -697,13 +697,29 @@ int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsBuffer* stream,
int ret = ERROR_SUCCESS;
SrsPacket* packet = NULL;
if (header.is_amf0_data() || header.is_amf3_data()) {
if (header.is_amf3_data() && stream->require(1)) {
srs_verbose("skip 1bytes to decode DATA3 ");
stream->skip(1);
}
std::string name;
if ((ret = srs_amf0_read_string(stream, name)) != ERROR_SUCCESS) {
srs_error(
"decode AMF0/AMF3 script name failed. message type:%d ret=%d",
header.message_type, ret);
return ret;
}
srs_verbose("do docode script data:%s", name);
*ppacket = packet = new SrsOnMetaDataPacket();
return ret;
}
// decode specified packet type
if (header.is_amf0_command() || header.is_amf3_command() || header.is_amf0_data() || header.is_amf3_data()) {
if (header.is_amf0_command() || header.is_amf3_command()) {
srs_verbose("start to decode AMF0/AMF3 command message.");
// skip 1bytes to decode the amf3 command.
if ((header.is_amf3_command() || header.is_amf3_data()) && stream->require(1)) {
if (header.is_amf3_command() && stream->require(1)) {
srs_verbose("skip 1bytes to decode AMF3 command");
stream->skip(1);
}
... ...
... ... @@ -461,7 +461,7 @@ VOID TEST(KernelFlvTest, FlvEncoderWriteMetadata)
};
char pts[] = { (char)0x00, (char)0x00, (char)0x00, (char)19 };
ASSERT_TRUE(ERROR_SUCCESS == enc.write_metadata(18, md, 8));
ASSERT_TRUE(ERROR_SUCCESS == enc.write_metadata(0, md, 8));
ASSERT_TRUE(11 + 8 + 4 == fs.offset);
EXPECT_TRUE(srs_bytes_equals(tag_header, fs.data, 11));
... ...