winlin

refine shared ptr message, rename initialize to create

@@ -504,17 +504,15 @@ int SrsEdgeForwarder::proxy(SrsMessage* msg) @@ -504,17 +504,15 @@ int SrsEdgeForwarder::proxy(SrsMessage* msg)
504 return ret; 504 return ret;
505 } 505 }
506 506
507 - // TODO: FIXME: use utility to copy msg to shared ptr msg.  
508 - SrsSharedPtrMessage* copy = new SrsSharedPtrMessage();  
509 - SrsAutoFree(SrsSharedPtrMessage, copy);  
510 - if ((ret = copy->initialize(msg)) != ERROR_SUCCESS) { 507 + SrsSharedPtrMessage copy;
  508 + if ((ret = copy.create(msg)) != ERROR_SUCCESS) {
511 srs_error("initialize the msg failed. ret=%d", ret); 509 srs_error("initialize the msg failed. ret=%d", ret);
512 return ret; 510 return ret;
513 } 511 }
514 srs_verbose("initialize shared ptr msg success."); 512 srs_verbose("initialize shared ptr msg success.");
515 513
516 - copy->header.stream_id = stream_id;  
517 - if ((ret = queue->enqueue(copy->copy())) != ERROR_SUCCESS) { 514 + copy.header.stream_id = stream_id;
  515 + if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) {
518 srs_error("enqueue edge publish msg failed. ret=%d", ret); 516 srs_error("enqueue edge publish msg failed. ret=%d", ret);
519 } 517 }
520 518
@@ -972,7 +972,8 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata) @@ -972,7 +972,8 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata)
972 cache_metadata = new SrsSharedPtrMessage(); 972 cache_metadata = new SrsSharedPtrMessage();
973 973
974 // dump message to shared ptr message. 974 // dump message to shared ptr message.
975 - if ((ret = cache_metadata->initialize(&msg->header, payload, size)) != ERROR_SUCCESS) { 975 + // the payload/size managed by cache_metadata, user should not free it.
  976 + if ((ret = cache_metadata->create(&msg->header, payload, size)) != ERROR_SUCCESS) {
976 srs_error("initialize the cache metadata failed. ret=%d", ret); 977 srs_error("initialize the cache metadata failed. ret=%d", ret);
977 return ret; 978 return ret;
978 } 979 }
@@ -1007,20 +1008,21 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata) @@ -1007,20 +1008,21 @@ int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata)
1007 return ret; 1008 return ret;
1008 } 1009 }
1009 1010
1010 -int SrsSource::on_audio(SrsMessage* audio) 1011 +int SrsSource::on_audio(SrsMessage* __audio)
1011 { 1012 {
1012 int ret = ERROR_SUCCESS; 1013 int ret = ERROR_SUCCESS;
1013 1014
1014 - SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();  
1015 - SrsAutoFree(SrsSharedPtrMessage, msg);  
1016 - if ((ret = msg->initialize(audio)) != ERROR_SUCCESS) { 1015 + // convert __audio to msg, user should not use __audio again.
  1016 + // the payload is transfer to msg, and set to NULL in __audio.
  1017 + SrsSharedPtrMessage msg;
  1018 + if ((ret = msg.create(__audio)) != ERROR_SUCCESS) {
1017 srs_error("initialize the audio failed. ret=%d", ret); 1019 srs_error("initialize the audio failed. ret=%d", ret);
1018 return ret; 1020 return ret;
1019 } 1021 }
1020 srs_verbose("initialize shared ptr audio success."); 1022 srs_verbose("initialize shared ptr audio success.");
1021 1023
1022 #ifdef SRS_AUTO_HLS 1024 #ifdef SRS_AUTO_HLS
1023 - if ((ret = hls->on_audio(msg->copy())) != ERROR_SUCCESS) { 1025 + if ((ret = hls->on_audio(msg.copy())) != ERROR_SUCCESS) {
1024 srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret); 1026 srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret);
1025 1027
1026 // unpublish, ignore ret. 1028 // unpublish, ignore ret.
@@ -1032,7 +1034,7 @@ int SrsSource::on_audio(SrsMessage* audio) @@ -1032,7 +1034,7 @@ int SrsSource::on_audio(SrsMessage* audio)
1032 #endif 1034 #endif
1033 1035
1034 #ifdef SRS_AUTO_DVR 1036 #ifdef SRS_AUTO_DVR
1035 - if ((ret = dvr->on_audio(msg->copy())) != ERROR_SUCCESS) { 1037 + if ((ret = dvr->on_audio(msg.copy())) != ERROR_SUCCESS) {
1036 srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret); 1038 srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret);
1037 1039
1038 // unpublish, ignore ret. 1040 // unpublish, ignore ret.
@@ -1047,7 +1049,7 @@ int SrsSource::on_audio(SrsMessage* audio) @@ -1047,7 +1049,7 @@ int SrsSource::on_audio(SrsMessage* audio)
1047 if (true) { 1049 if (true) {
1048 for (int i = 0; i < (int)consumers.size(); i++) { 1050 for (int i = 0; i < (int)consumers.size(); i++) {
1049 SrsConsumer* consumer = consumers.at(i); 1051 SrsConsumer* consumer = consumers.at(i);
1050 - SrsSharedPtrMessage* copy = msg->copy(); 1052 + SrsSharedPtrMessage* copy = msg.copy();
1051 if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { 1053 if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
1052 srs_error("dispatch the audio failed. ret=%d", ret); 1054 srs_error("dispatch the audio failed. ret=%d", ret);
1053 return ret; 1055 return ret;
@@ -1061,7 +1063,7 @@ int SrsSource::on_audio(SrsMessage* audio) @@ -1061,7 +1063,7 @@ int SrsSource::on_audio(SrsMessage* audio)
1061 std::vector<SrsForwarder*>::iterator it; 1063 std::vector<SrsForwarder*>::iterator it;
1062 for (it = forwarders.begin(); it != forwarders.end(); ++it) { 1064 for (it = forwarders.begin(); it != forwarders.end(); ++it) {
1063 SrsForwarder* forwarder = *it; 1065 SrsForwarder* forwarder = *it;
1064 - if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) { 1066 + if ((ret = forwarder->on_audio(msg.copy())) != ERROR_SUCCESS) {
1065 srs_error("forwarder process audio message failed. ret=%d", ret); 1067 srs_error("forwarder process audio message failed. ret=%d", ret);
1066 return ret; 1068 return ret;
1067 } 1069 }
@@ -1070,14 +1072,14 @@ int SrsSource::on_audio(SrsMessage* audio) @@ -1070,14 +1072,14 @@ int SrsSource::on_audio(SrsMessage* audio)
1070 1072
1071 // cache the sequence header if h264 1073 // cache the sequence header if h264
1072 // donot cache the sequence header to gop_cache, return here. 1074 // donot cache the sequence header to gop_cache, return here.
1073 - if (SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size)) { 1075 + if (SrsFlvCodec::audio_is_sequence_header(msg.payload, msg.size)) {
1074 srs_freep(cache_sh_audio); 1076 srs_freep(cache_sh_audio);
1075 - cache_sh_audio = msg->copy(); 1077 + cache_sh_audio = msg.copy();
1076 1078
1077 // parse detail audio codec 1079 // parse detail audio codec
1078 SrsAvcAacCodec codec; 1080 SrsAvcAacCodec codec;
1079 SrsCodecSample sample; 1081 SrsCodecSample sample;
1080 - if ((ret = codec.audio_aac_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) { 1082 + if ((ret = codec.audio_aac_demux(msg.payload, msg.size, &sample)) != ERROR_SUCCESS) {
1081 srs_error("codec demux audio failed. ret=%d", ret); 1083 srs_error("codec demux audio failed. ret=%d", ret);
1082 return ret; 1084 return ret;
1083 } 1085 }
@@ -1087,7 +1089,7 @@ int SrsSource::on_audio(SrsMessage* audio) @@ -1087,7 +1089,7 @@ int SrsSource::on_audio(SrsMessage* audio)
1087 srs_trace("%dB audio sh, " 1089 srs_trace("%dB audio sh, "
1088 "codec(%d, profile=%d, %dchannels, %dkbps, %dHZ), " 1090 "codec(%d, profile=%d, %dchannels, %dkbps, %dHZ), "
1089 "flv(%dbits, %dchannels, %dHZ)", 1091 "flv(%dbits, %dchannels, %dHZ)",
1090 - msg->header.payload_length, codec.audio_codec_id, 1092 + msg.header.payload_length, codec.audio_codec_id,
1091 codec.aac_profile, codec.aac_channels, 1093 codec.aac_profile, codec.aac_channels,
1092 codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate], 1094 codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate],
1093 flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type], 1095 flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type],
@@ -1096,7 +1098,7 @@ int SrsSource::on_audio(SrsMessage* audio) @@ -1096,7 +1098,7 @@ int SrsSource::on_audio(SrsMessage* audio)
1096 } 1098 }
1097 1099
1098 // cache the last gop packets 1100 // cache the last gop packets
1099 - if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { 1101 + if ((ret = gop_cache->cache(&msg)) != ERROR_SUCCESS) {
1100 srs_error("shrink gop cache failed. ret=%d", ret); 1102 srs_error("shrink gop cache failed. ret=%d", ret);
1101 return ret; 1103 return ret;
1102 } 1104 }
@@ -1105,30 +1107,31 @@ int SrsSource::on_audio(SrsMessage* audio) @@ -1105,30 +1107,31 @@ int SrsSource::on_audio(SrsMessage* audio)
1105 // if atc, update the sequence header to abs time. 1107 // if atc, update the sequence header to abs time.
1106 if (atc) { 1108 if (atc) {
1107 if (cache_sh_audio) { 1109 if (cache_sh_audio) {
1108 - cache_sh_audio->header.timestamp = msg->header.timestamp; 1110 + cache_sh_audio->header.timestamp = msg.header.timestamp;
1109 } 1111 }
1110 if (cache_metadata) { 1112 if (cache_metadata) {
1111 - cache_metadata->header.timestamp = msg->header.timestamp; 1113 + cache_metadata->header.timestamp = msg.header.timestamp;
1112 } 1114 }
1113 } 1115 }
1114 1116
1115 return ret; 1117 return ret;
1116 } 1118 }
1117 1119
1118 -int SrsSource::on_video(SrsMessage* video) 1120 +int SrsSource::on_video(SrsMessage* __video)
1119 { 1121 {
1120 int ret = ERROR_SUCCESS; 1122 int ret = ERROR_SUCCESS;
1121 1123
1122 - SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();  
1123 - SrsAutoFree(SrsSharedPtrMessage, msg);  
1124 - if ((ret = msg->initialize(video)) != ERROR_SUCCESS) { 1124 + // convert __video to msg, user should not use __video again.
  1125 + // the payload is transfer to msg, and set to NULL in __video.
  1126 + SrsSharedPtrMessage msg;
  1127 + if ((ret = msg.create(__video)) != ERROR_SUCCESS) {
1125 srs_error("initialize the video failed. ret=%d", ret); 1128 srs_error("initialize the video failed. ret=%d", ret);
1126 return ret; 1129 return ret;
1127 } 1130 }
1128 srs_verbose("initialize shared ptr video success."); 1131 srs_verbose("initialize shared ptr video success.");
1129 1132
1130 #ifdef SRS_AUTO_HLS 1133 #ifdef SRS_AUTO_HLS
1131 - if ((ret = hls->on_video(msg->copy())) != ERROR_SUCCESS) { 1134 + if ((ret = hls->on_video(msg.copy())) != ERROR_SUCCESS) {
1132 srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret); 1135 srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);
1133 1136
1134 // unpublish, ignore ret. 1137 // unpublish, ignore ret.
@@ -1140,7 +1143,7 @@ int SrsSource::on_video(SrsMessage* video) @@ -1140,7 +1143,7 @@ int SrsSource::on_video(SrsMessage* video)
1140 #endif 1143 #endif
1141 1144
1142 #ifdef SRS_AUTO_DVR 1145 #ifdef SRS_AUTO_DVR
1143 - if ((ret = dvr->on_video(msg->copy())) != ERROR_SUCCESS) { 1146 + if ((ret = dvr->on_video(msg.copy())) != ERROR_SUCCESS) {
1144 srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret); 1147 srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);
1145 1148
1146 // unpublish, ignore ret. 1149 // unpublish, ignore ret.
@@ -1155,7 +1158,7 @@ int SrsSource::on_video(SrsMessage* video) @@ -1155,7 +1158,7 @@ int SrsSource::on_video(SrsMessage* video)
1155 if (true) { 1158 if (true) {
1156 for (int i = 0; i < (int)consumers.size(); i++) { 1159 for (int i = 0; i < (int)consumers.size(); i++) {
1157 SrsConsumer* consumer = consumers.at(i); 1160 SrsConsumer* consumer = consumers.at(i);
1158 - SrsSharedPtrMessage* copy = msg->copy(); 1161 + SrsSharedPtrMessage* copy = msg.copy();
1159 if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) { 1162 if ((ret = consumer->enqueue(copy, atc, sample_rate, frame_rate, jitter_algorithm)) != ERROR_SUCCESS) {
1160 srs_error("dispatch the video failed. ret=%d", ret); 1163 srs_error("dispatch the video failed. ret=%d", ret);
1161 return ret; 1164 return ret;
@@ -1169,7 +1172,7 @@ int SrsSource::on_video(SrsMessage* video) @@ -1169,7 +1172,7 @@ int SrsSource::on_video(SrsMessage* video)
1169 std::vector<SrsForwarder*>::iterator it; 1172 std::vector<SrsForwarder*>::iterator it;
1170 for (it = forwarders.begin(); it != forwarders.end(); ++it) { 1173 for (it = forwarders.begin(); it != forwarders.end(); ++it) {
1171 SrsForwarder* forwarder = *it; 1174 SrsForwarder* forwarder = *it;
1172 - if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) { 1175 + if ((ret = forwarder->on_video(msg.copy())) != ERROR_SUCCESS) {
1173 srs_error("forwarder process video message failed. ret=%d", ret); 1176 srs_error("forwarder process video message failed. ret=%d", ret);
1174 return ret; 1177 return ret;
1175 } 1178 }
@@ -1178,28 +1181,28 @@ int SrsSource::on_video(SrsMessage* video) @@ -1178,28 +1181,28 @@ int SrsSource::on_video(SrsMessage* video)
1178 1181
1179 // cache the sequence header if h264 1182 // cache the sequence header if h264
1180 // donot cache the sequence header to gop_cache, return here. 1183 // donot cache the sequence header to gop_cache, return here.
1181 - if (SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size)) { 1184 + if (SrsFlvCodec::video_is_sequence_header(msg.payload, msg.size)) {
1182 srs_freep(cache_sh_video); 1185 srs_freep(cache_sh_video);
1183 - cache_sh_video = msg->copy(); 1186 + cache_sh_video = msg.copy();
1184 1187
1185 // parse detail audio codec 1188 // parse detail audio codec
1186 SrsAvcAacCodec codec; 1189 SrsAvcAacCodec codec;
1187 SrsCodecSample sample; 1190 SrsCodecSample sample;
1188 - if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) { 1191 + if ((ret = codec.video_avc_demux(msg.payload, msg.size, &sample)) != ERROR_SUCCESS) {
1189 srs_error("codec demux video failed. ret=%d", ret); 1192 srs_error("codec demux video failed. ret=%d", ret);
1190 return ret; 1193 return ret;
1191 } 1194 }
1192 1195
1193 srs_trace("%dB video sh, " 1196 srs_trace("%dB video sh, "
1194 "codec(%d, profile=%d, level=%d, %dx%d, %dkbps, %dfps, %ds)", 1197 "codec(%d, profile=%d, level=%d, %dx%d, %dkbps, %dfps, %ds)",
1195 - msg->header.payload_length, codec.video_codec_id, 1198 + msg.header.payload_length, codec.video_codec_id,
1196 codec.avc_profile, codec.avc_level, codec.width, codec.height, 1199 codec.avc_profile, codec.avc_level, codec.width, codec.height,
1197 codec.video_data_rate / 1000, codec.frame_rate, codec.duration); 1200 codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
1198 return ret; 1201 return ret;
1199 } 1202 }
1200 1203
1201 // cache the last gop packets 1204 // cache the last gop packets
1202 - if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { 1205 + if ((ret = gop_cache->cache(&msg)) != ERROR_SUCCESS) {
1203 srs_error("gop cache msg failed. ret=%d", ret); 1206 srs_error("gop cache msg failed. ret=%d", ret);
1204 return ret; 1207 return ret;
1205 } 1208 }
@@ -1208,10 +1211,10 @@ int SrsSource::on_video(SrsMessage* video) @@ -1208,10 +1211,10 @@ int SrsSource::on_video(SrsMessage* video)
1208 // if atc, update the sequence header to abs time. 1211 // if atc, update the sequence header to abs time.
1209 if (atc) { 1212 if (atc) {
1210 if (cache_sh_video) { 1213 if (cache_sh_video) {
1211 - cache_sh_video->header.timestamp = msg->header.timestamp; 1214 + cache_sh_video->header.timestamp = msg.header.timestamp;
1212 } 1215 }
1213 if (cache_metadata) { 1216 if (cache_metadata) {
1214 - cache_metadata->header.timestamp = msg->header.timestamp; 1217 + cache_metadata->header.timestamp = msg.header.timestamp;
1215 } 1218 }
1216 } 1219 }
1217 1220
@@ -388,7 +388,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, @@ -388,7 +388,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data,
388 header.initialize_audio(size, timestamp, context->stream_id); 388 header.initialize_audio(size, timestamp, context->stream_id);
389 389
390 msg = new SrsSharedPtrMessage(); 390 msg = new SrsSharedPtrMessage();
391 - if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) { 391 + if ((ret = msg->create(&header, data, size)) != ERROR_SUCCESS) {
392 srs_freep(data); 392 srs_freep(data);
393 return ret; 393 return ret;
394 } 394 }
@@ -397,7 +397,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, @@ -397,7 +397,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data,
397 header.initialize_video(size, timestamp, context->stream_id); 397 header.initialize_video(size, timestamp, context->stream_id);
398 398
399 msg = new SrsSharedPtrMessage(); 399 msg = new SrsSharedPtrMessage();
400 - if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) { 400 + if ((ret = msg->create(&header, data, size)) != ERROR_SUCCESS) {
401 srs_freep(data); 401 srs_freep(data);
402 return ret; 402 return ret;
403 } 403 }
@@ -406,7 +406,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data, @@ -406,7 +406,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data,
406 header.initialize_amf0_script(size, context->stream_id); 406 header.initialize_amf0_script(size, context->stream_id);
407 407
408 msg = new SrsSharedPtrMessage(); 408 msg = new SrsSharedPtrMessage();
409 - if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) { 409 + if ((ret = msg->create(&header, data, size)) != ERROR_SUCCESS) {
410 srs_freep(data); 410 srs_freep(data);
411 return ret; 411 return ret;
412 } 412 }
@@ -34,6 +34,10 @@ class SrsSharedPtrMessage; @@ -34,6 +34,10 @@ class SrsSharedPtrMessage;
34 34
35 /** 35 /**
36 * the class to auto free the shared ptr message array. 36 * the class to auto free the shared ptr message array.
  37 +* when need to get some messages, for instance, from Consumer queue,
  38 +* create a message array, whose msgs can used to accept the msgs,
  39 +* then send each message and set to NULL.
  40 +* @remark: when error, the message array will free the msg not sent out.
37 */ 41 */
38 class SrsSharedPtrMessageArray 42 class SrsSharedPtrMessageArray
39 { 43 {
@@ -1612,26 +1612,28 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage() @@ -1612,26 +1612,28 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage()
1612 } 1612 }
1613 } 1613 }
1614 1614
1615 -int SrsSharedPtrMessage::initialize(SrsMessage* source) 1615 +int SrsSharedPtrMessage::create(SrsMessage* msg)
1616 { 1616 {
1617 int ret = ERROR_SUCCESS; 1617 int ret = ERROR_SUCCESS;
1618 1618
1619 - if ((ret = initialize(&source->header, (char*)source->payload, source->size)) != ERROR_SUCCESS) { 1619 + if ((ret = create(&msg->header, (char*)msg->payload, msg->size)) != ERROR_SUCCESS) {
1620 return ret; 1620 return ret;
1621 } 1621 }
1622 1622
1623 - // detach the payload from source  
1624 - source->payload = NULL;  
1625 - source->size = 0; 1623 + // to prevent double free of payload:
  1624 + // initialize already attach the payload of msg,
  1625 + // detach the payload to transfer the owner to shared ptr.
  1626 + msg->payload = NULL;
  1627 + msg->size = 0;
1626 1628
1627 return ret; 1629 return ret;
1628 } 1630 }
1629 1631
1630 -int SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int size) 1632 +int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int size)
1631 { 1633 {
1632 int ret = ERROR_SUCCESS; 1634 int ret = ERROR_SUCCESS;
1633 1635
1634 - srs_assert(source != NULL); 1636 + srs_assert(pheader != NULL);
1635 if (ptr) { 1637 if (ptr) {
1636 ret = ERROR_SYSTEM_ASSERT_FAILED; 1638 ret = ERROR_SYSTEM_ASSERT_FAILED;
1637 srs_error("should not set the payload twice. ret=%d", ret); 1639 srs_error("should not set the payload twice. ret=%d", ret);
@@ -1640,28 +1642,31 @@ int SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int @@ -1640,28 +1642,31 @@ int SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int
1640 return ret; 1642 return ret;
1641 } 1643 }
1642 1644
1643 - header = *source; 1645 + header = *pheader;
1644 header.payload_length = size; 1646 header.payload_length = size;
1645 1647
1646 ptr = new __SrsSharedPtr(); 1648 ptr = new __SrsSharedPtr();
1647 1649
1648 - // direct attach the data of common message. 1650 + // direct attach the data.
1649 ptr->payload = payload; 1651 ptr->payload = payload;
1650 ptr->size = size; 1652 ptr->size = size;
1651 1653
  1654 + // message can access it.
1652 SrsMessage::payload = (int8_t*)ptr->payload; 1655 SrsMessage::payload = (int8_t*)ptr->payload;
1653 SrsMessage::size = ptr->size; 1656 SrsMessage::size = ptr->size;
1654 1657
1655 return ret; 1658 return ret;
1656 } 1659 }
1657 1660
  1661 +int SrsSharedPtrMessage::count()
  1662 +{
  1663 + srs_assert(ptr);
  1664 + return ptr->shared_count;
  1665 +}
  1666 +
1658 SrsSharedPtrMessage* SrsSharedPtrMessage::copy() 1667 SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
1659 { 1668 {
1660 - if (!ptr) {  
1661 - srs_error("invoke initialize to initialize the ptr.");  
1662 - srs_assert(false);  
1663 - return NULL;  
1664 - } 1669 + srs_assert(ptr);
1665 1670
1666 SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); 1671 SrsSharedPtrMessage* copy = new SrsSharedPtrMessage();
1667 1672
@@ -370,6 +370,13 @@ public: @@ -370,6 +370,13 @@ public:
370 * shared ptr message. 370 * shared ptr message.
371 * for audio/video/data message that need less memory copy. 371 * for audio/video/data message that need less memory copy.
372 * and only for output. 372 * and only for output.
  373 +*
  374 +* create first object by constructor and create(),
  375 +* use copy if need reference count message.
  376 +*
  377 +* Usage:
  378 +* SrsSharedPtrMessage msg;
  379 +*
373 */ 380 */
374 class SrsSharedPtrMessage : public SrsMessage 381 class SrsSharedPtrMessage : public SrsMessage
375 { 382 {
@@ -390,19 +397,30 @@ public: @@ -390,19 +397,30 @@ public:
390 virtual ~SrsSharedPtrMessage(); 397 virtual ~SrsSharedPtrMessage();
391 public: 398 public:
392 /** 399 /**
393 - * set the shared payload.  
394 - * we will detach the payload of source,  
395 - * so ensure donot use it before. 400 + * create shared ptr message,
  401 + * copy header, manage the payload of msg,
  402 + * set the payload to NULL to prevent double free.
  403 + * @remark payload of msg set to NULL if success.
  404 + */
  405 + virtual int create(SrsMessage* msg);
  406 + /**
  407 + * create shared ptr message,
  408 + * from the header and payload.
  409 + * @remark user should never free the payload.
396 */ 410 */
397 - virtual int initialize(SrsMessage* source); 411 + virtual int create(SrsMessageHeader* pheader, char* payload, int size);
398 /** 412 /**
399 - * set the shared payload.  
400 - * use source header, and specified param payload. 413 + * get current reference count.
  414 + * when this object created, count set to 0.
  415 + * if copy() this object, count increase 1.
  416 + * if this or copy deleted, free payload when count is 0, or count--.
  417 + * @remark, assert object is created.
401 */ 418 */
402 - virtual int initialize(SrsMessageHeader* source, char* payload, int size); 419 + virtual int count();
403 public: 420 public:
404 /** 421 /**
405 * copy current shared ptr message, use ref-count. 422 * copy current shared ptr message, use ref-count.
  423 + * @remark, assert object is created.
406 */ 424 */
407 virtual SrsSharedPtrMessage* copy(); 425 virtual SrsSharedPtrMessage* copy();
408 }; 426 };
@@ -27,6 +27,8 @@ using namespace std; @@ -27,6 +27,8 @@ using namespace std;
27 #include <srs_kernel_error.hpp> 27 #include <srs_kernel_error.hpp>
28 #include <srs_core_autofree.hpp> 28 #include <srs_core_autofree.hpp>
29 #include <srs_protocol_utility.hpp> 29 #include <srs_protocol_utility.hpp>
  30 +#include <srs_protocol_msg_array.hpp>
  31 +#include <srs_protocol_rtmp_stack.hpp>
30 32
31 MockEmptyIO::MockEmptyIO() 33 MockEmptyIO::MockEmptyIO()
32 { 34 {
@@ -389,3 +391,27 @@ VOID TEST(ProtocolUtilityTest, GenerateTcUrl) @@ -389,3 +391,27 @@ VOID TEST(ProtocolUtilityTest, GenerateTcUrl)
389 tcUrl = srs_generate_tc_url(ip, vhost, app, port); 391 tcUrl = srs_generate_tc_url(ip, vhost, app, port);
390 EXPECT_STREQ("rtmp://demo:19351/live", tcUrl.c_str()); 392 EXPECT_STREQ("rtmp://demo:19351/live", tcUrl.c_str());
391 } 393 }
  394 +
  395 +VOID TEST(ProtocolMsgArrayTest, MessageArray)
  396 +{
  397 + SrsMessageHeader header;
  398 + SrsSharedPtrMessage msg;
  399 + char* payload = new char[1024];
  400 + EXPECT_TRUE(ERROR_SUCCESS == msg.create(&header, payload, 1024));
  401 + EXPECT_EQ(0, msg.count());
  402 +
  403 + if (true) {
  404 + SrsSharedPtrMessageArray arr(3);
  405 +
  406 + arr.msgs[0] = msg.copy();
  407 + EXPECT_EQ(1, msg.count());
  408 +
  409 + arr.msgs[1] = msg.copy();
  410 + EXPECT_EQ(2, msg.count());
  411 +
  412 + arr.msgs[2] = msg.copy();
  413 + EXPECT_EQ(3, msg.count());
  414 + }
  415 + EXPECT_EQ(0, msg.count());
  416 +}
  417 +