winlin

add shared ptr to video/audio/data

@@ -65,5 +65,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -65,5 +65,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
65 #define ERROR_SYSTEM_STREAM_INIT 400 65 #define ERROR_SYSTEM_STREAM_INIT 400
66 #define ERROR_SYSTEM_PACKET_INVALID 401 66 #define ERROR_SYSTEM_PACKET_INVALID 401
67 #define ERROR_SYSTEM_CLIENT_INVALID 402 67 #define ERROR_SYSTEM_CLIENT_INVALID 402
  68 +#define ERROR_SYSTEM_ASSERT_FAILED 403
68 69
69 #endif 70 #endif
@@ -29,6 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -29,6 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 #include <srs_core_socket.hpp> 29 #include <srs_core_socket.hpp>
30 #include <srs_core_buffer.hpp> 30 #include <srs_core_buffer.hpp>
31 #include <srs_core_stream.hpp> 31 #include <srs_core_stream.hpp>
  32 +#include <srs_core_auto_free.hpp>
32 33
33 /**************************************************************************** 34 /****************************************************************************
34 ***************************************************************************** 35 *****************************************************************************
@@ -319,10 +320,13 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -319,10 +320,13 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
319 return ret; 320 return ret;
320 } 321 }
321 322
322 -int SrsProtocol::send_message(SrsMessage* msg) 323 +int SrsProtocol::send_message(SrsOutputableMessage* msg)
323 { 324 {
324 int ret = ERROR_SUCCESS; 325 int ret = ERROR_SUCCESS;
325 326
  327 + // free msg whatever return value.
  328 + SrsAutoFree(SrsOutputableMessage, msg, false);
  329 +
326 if ((ret = msg->encode_packet()) != ERROR_SUCCESS) { 330 if ((ret = msg->encode_packet()) != ERROR_SUCCESS) {
327 srs_error("encode packet to message payload failed. ret=%d", ret); 331 srs_error("encode packet to message payload failed. ret=%d", ret);
328 return ret; 332 return ret;
@@ -475,15 +479,21 @@ int SrsProtocol::on_recv_message(SrsMessage* msg) @@ -475,15 +479,21 @@ int SrsProtocol::on_recv_message(SrsMessage* msg)
475 return ret; 479 return ret;
476 } 480 }
477 481
478 -int SrsProtocol::on_send_message(SrsMessage* msg) 482 +int SrsProtocol::on_send_message(SrsOutputableMessage* msg)
479 { 483 {
480 int ret = ERROR_SUCCESS; 484 int ret = ERROR_SUCCESS;
481 485
482 - srs_assert(msg != NULL); 486 + SrsMessage* common_msg = dynamic_cast<SrsMessage*>(msg);
  487 + if (!msg) {
  488 + srs_verbose("ignore the shared ptr message.");
  489 + return ret;
  490 + }
483 491
484 - switch (msg->header.message_type) { 492 + srs_assert(common_msg != NULL);
  493 +
  494 + switch (common_msg->header.message_type) {
485 case RTMP_MSG_SetChunkSize: { 495 case RTMP_MSG_SetChunkSize: {
486 - SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(msg->get_packet()); 496 + SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(common_msg->get_packet());
487 srs_assert(pkt != NULL); 497 srs_assert(pkt != NULL);
488 498
489 out_chunk_size = pkt->chunk_size; 499 out_chunk_size = pkt->chunk_size;
@@ -917,17 +927,30 @@ SrsChunkStream::~SrsChunkStream() @@ -917,17 +927,30 @@ SrsChunkStream::~SrsChunkStream()
917 srs_freep(msg); 927 srs_freep(msg);
918 } 928 }
919 929
920 -SrsMessage::SrsMessage() 930 +SrsOutputableMessage::SrsOutputableMessage()
921 { 931 {
  932 + payload = NULL;
922 size = 0; 933 size = 0;
  934 +}
  935 +
  936 +SrsOutputableMessage::~SrsOutputableMessage()
  937 +{
  938 + free_payload();
  939 +}
  940 +
  941 +void SrsOutputableMessage::free_payload()
  942 +{
  943 + srs_freepa(payload);
  944 +}
  945 +
  946 +SrsMessage::SrsMessage()
  947 +{
923 stream = NULL; 948 stream = NULL;
924 - payload = NULL;  
925 packet = NULL; 949 packet = NULL;
926 } 950 }
927 951
928 SrsMessage::~SrsMessage() 952 SrsMessage::~SrsMessage()
929 { 953 {
930 - srs_freep(payload);  
931 srs_freep(packet); 954 srs_freep(packet);
932 srs_freep(stream); 955 srs_freep(stream);
933 } 956 }
@@ -1089,14 +1112,97 @@ int SrsMessage::encode_packet() @@ -1089,14 +1112,97 @@ int SrsMessage::encode_packet()
1089 return packet->encode(size, (char*&)payload); 1112 return packet->encode(size, (char*&)payload);
1090 } 1113 }
1091 1114
  1115 +SrsSharedMessage::SrsSharedPtr::SrsSharedPtr()
  1116 +{
  1117 + payload = NULL;
  1118 + size = 0;
  1119 + perfer_cid = 0;
  1120 + shared_count = 0;
  1121 +}
  1122 +
  1123 +SrsSharedMessage::SrsSharedPtr::~SrsSharedPtr()
  1124 +{
  1125 + srs_freepa(payload);
  1126 +}
  1127 +
1092 SrsSharedMessage::SrsSharedMessage() 1128 SrsSharedMessage::SrsSharedMessage()
1093 { 1129 {
  1130 + ptr = NULL;
1094 } 1131 }
1095 1132
1096 SrsSharedMessage::~SrsSharedMessage() 1133 SrsSharedMessage::~SrsSharedMessage()
1097 { 1134 {
1098 } 1135 }
1099 1136
  1137 +void SrsSharedMessage::free_payload()
  1138 +{
  1139 + if (ptr) {
  1140 + if (ptr->shared_count == 0) {
  1141 + srs_freep(ptr);
  1142 + } else {
  1143 + ptr->shared_count--;
  1144 + }
  1145 + }
  1146 +}
  1147 +
  1148 +int SrsSharedMessage::initialize(SrsMessageHeader* header, char* payload, int size, int perfer_cid)
  1149 +{
  1150 + int ret = ERROR_SUCCESS;
  1151 +
  1152 + super::header = *header;
  1153 +
  1154 + if (ptr) {
  1155 + ret = ERROR_SYSTEM_ASSERT_FAILED;
  1156 + srs_error("should not set the payload twice. ret=%d", ret);
  1157 + srs_assert(false);
  1158 +
  1159 + return ret;
  1160 + }
  1161 +
  1162 + ptr = new SrsSharedPtr();
  1163 + ptr->payload = payload;
  1164 + ptr->size = size;
  1165 + ptr->perfer_cid = perfer_cid;
  1166 +
  1167 + super::payload = (int8_t*)ptr->payload;
  1168 + super::size = ptr->size;
  1169 +
  1170 + return ret;
  1171 +}
  1172 +
  1173 +SrsSharedMessage* SrsSharedMessage::copy()
  1174 +{
  1175 + if (!ptr) {
  1176 + srs_error("invoke initialize to initialize the ptr.");
  1177 + srs_assert(false);
  1178 + return NULL;
  1179 + }
  1180 +
  1181 + SrsSharedMessage* copy = new SrsSharedMessage();
  1182 + copy->ptr = ptr;
  1183 + ptr->shared_count++;
  1184 +
  1185 + copy->payload = (int8_t*)ptr->payload;
  1186 + copy->size = ptr->size;
  1187 +
  1188 + return copy;
  1189 +}
  1190 +
  1191 +int SrsSharedMessage::get_perfer_cid()
  1192 +{
  1193 + if (!ptr) {
  1194 + return 0;
  1195 + }
  1196 +
  1197 + return ptr->perfer_cid;
  1198 +}
  1199 +
  1200 +int SrsSharedMessage::encode_packet()
  1201 +{
  1202 + srs_verbose("shared message ignore the encode method.");
  1203 + return ERROR_SUCCESS;
  1204 +}
  1205 +
1100 SrsPacket::SrsPacket() 1206 SrsPacket::SrsPacket()
1101 { 1207 {
1102 } 1208 }
@@ -47,6 +47,7 @@ class SrsChunkStream; @@ -47,6 +47,7 @@ class SrsChunkStream;
47 class SrsAmf0Object; 47 class SrsAmf0Object;
48 class SrsAmf0Null; 48 class SrsAmf0Null;
49 class SrsAmf0Undefined; 49 class SrsAmf0Undefined;
  50 +class SrsOutputableMessage;
50 51
51 // convert class name to string. 52 // convert class name to string.
52 #define CLASS_NAME_STRING(className) #className 53 #define CLASS_NAME_STRING(className) #className
@@ -111,7 +112,7 @@ public: @@ -111,7 +112,7 @@ public:
111 * then sendout over socket. 112 * then sendout over socket.
112 * @msg this method will free it whatever return value. 113 * @msg this method will free it whatever return value.
113 */ 114 */
114 - virtual int send_message(SrsMessage* msg); 115 + virtual int send_message(SrsOutputableMessage* msg);
115 private: 116 private:
116 /** 117 /**
117 * when recv message, update the context. 118 * when recv message, update the context.
@@ -120,7 +121,7 @@ private: @@ -120,7 +121,7 @@ private:
120 /** 121 /**
121 * when message sentout, update the context. 122 * when message sentout, update the context.
122 */ 123 */
123 - virtual int on_send_message(SrsMessage* msg); 124 + virtual int on_send_message(SrsOutputableMessage* msg);
124 /** 125 /**
125 * try to recv interlaced message from peer, 126 * try to recv interlaced message from peer,
126 * return error if error occur and nerver set the pmsg, 127 * return error if error occur and nerver set the pmsg,
@@ -232,9 +233,9 @@ public: @@ -232,9 +233,9 @@ public:
232 }; 233 };
233 234
234 /** 235 /**
235 -* common RTMP message defines in rtmp.part2.Message-Formats.pdf. 236 +* message to output.
236 */ 237 */
237 -class SrsMessage 238 +class SrsOutputableMessage
238 { 239 {
239 // 4.1. Message Header 240 // 4.1. Message Header
240 public: 241 public:
@@ -249,6 +250,34 @@ public: @@ -249,6 +250,34 @@ public:
249 */ 250 */
250 int32_t size; 251 int32_t size;
251 int8_t* payload; 252 int8_t* payload;
  253 +public:
  254 + SrsOutputableMessage();
  255 + virtual ~SrsOutputableMessage();
  256 +protected:
  257 + virtual void free_payload();
  258 +/**
  259 +* encode functions.
  260 +*/
  261 +public:
  262 + /**
  263 + * get the perfered cid(chunk stream id) which sendout over.
  264 + */
  265 + virtual int get_perfer_cid() = 0;
  266 + /**
  267 + * encode the packet to message payload bytes.
  268 + * @remark there exists empty packet, so maybe the payload is NULL.
  269 + */
  270 + virtual int encode_packet() = 0;
  271 +};
  272 +
  273 +/**
  274 +* common RTMP message defines in rtmp.part2.Message-Formats.pdf.
  275 +* cannbe parse and decode.
  276 +*/
  277 +class SrsMessage : public SrsOutputableMessage
  278 +{
  279 +private:
  280 + typedef SrsOutputableMessage super;
252 // decoded message payload. 281 // decoded message payload.
253 private: 282 private:
254 SrsStream* stream; 283 SrsStream* stream;
@@ -256,6 +285,9 @@ private: @@ -256,6 +285,9 @@ private:
256 public: 285 public:
257 SrsMessage(); 286 SrsMessage();
258 virtual ~SrsMessage(); 287 virtual ~SrsMessage();
  288 +/**
  289 +* decode functions.
  290 +*/
259 public: 291 public:
260 /** 292 /**
261 * decode packet from message payload. 293 * decode packet from message payload.
@@ -266,6 +298,9 @@ public: @@ -266,6 +298,9 @@ public:
266 * @remark, user never free the pkt, the message will auto free it. 298 * @remark, user never free the pkt, the message will auto free it.
267 */ 299 */
268 virtual SrsPacket* get_packet(); 300 virtual SrsPacket* get_packet();
  301 +/**
  302 +* encode functions.
  303 +*/
269 public: 304 public:
270 /** 305 /**
271 * get the perfered cid(chunk stream id) which sendout over. 306 * get the perfered cid(chunk stream id) which sendout over.
@@ -287,12 +322,46 @@ public: @@ -287,12 +322,46 @@ public:
287 /** 322 /**
288 * shared ptr message. 323 * shared ptr message.
289 * for audio/video/data message that need less memory copy. 324 * for audio/video/data message that need less memory copy.
  325 +* and only for output.
290 */ 326 */
291 -class SrsSharedMessage : public SrsMessage 327 +class SrsSharedMessage : public SrsOutputableMessage
292 { 328 {
  329 +private:
  330 + typedef SrsOutputableMessage super;
  331 +private:
  332 + struct SrsSharedPtr
  333 + {
  334 + char* payload;
  335 + int size;
  336 + int perfer_cid;
  337 + int shared_count;
  338 +
  339 + SrsSharedPtr();
  340 + virtual ~SrsSharedPtr();
  341 + };
  342 + SrsSharedPtr* ptr;
293 public: 343 public:
294 SrsSharedMessage(); 344 SrsSharedMessage();
295 virtual ~SrsSharedMessage(); 345 virtual ~SrsSharedMessage();
  346 +protected:
  347 + virtual void free_payload();
  348 +public:
  349 + /**
  350 + * set the shared payload.
  351 + */
  352 + virtual int initialize(SrsMessageHeader* header, char* payload, int size, int perfer_cid);
  353 + virtual SrsSharedMessage* copy();
  354 +public:
  355 + /**
  356 + * get the perfered cid(chunk stream id) which sendout over.
  357 + */
  358 + virtual int get_perfer_cid();
  359 + /**
  360 + * ignored.
  361 + * for shared message, nothing should be done.
  362 + * use initialize() to set the data.
  363 + */
  364 + virtual int encode_packet();
296 }; 365 };
297 366
298 /** 367 /**