winlin

support ffmpeg publish

@@ -66,4 +66,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -66,4 +66,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
66 #define RTMP_SIG_SRS_URL "https://github.com/winlinvip/simple-rtmp-server" 66 #define RTMP_SIG_SRS_URL "https://github.com/winlinvip/simple-rtmp-server"
67 #define RTMP_SIG_SRS_VERSION "0.1" 67 #define RTMP_SIG_SRS_VERSION "0.1"
68 68
  69 +// compare
  70 +#define srs_min(a, b) ((a < b)? a : b)
  71 +
69 #endif 72 #endif
@@ -191,6 +191,11 @@ int SrsUnSortedHashtable::size() @@ -191,6 +191,11 @@ int SrsUnSortedHashtable::size()
191 return (int)properties.size(); 191 return (int)properties.size();
192 } 192 }
193 193
  194 +void SrsUnSortedHashtable::clear()
  195 +{
  196 + properties.clear();
  197 +}
  198 +
194 std::string SrsUnSortedHashtable::key_at(int index) 199 std::string SrsUnSortedHashtable::key_at(int index)
195 { 200 {
196 srs_assert(index < size()); 201 srs_assert(index < size());
@@ -255,6 +260,21 @@ SrsAmf0Any* SrsUnSortedHashtable::ensure_property_string(std::string name) @@ -255,6 +260,21 @@ SrsAmf0Any* SrsUnSortedHashtable::ensure_property_string(std::string name)
255 return prop; 260 return prop;
256 } 261 }
257 262
  263 +SrsAmf0Any* SrsUnSortedHashtable::ensure_property_number(std::string name)
  264 +{
  265 + SrsAmf0Any* prop = get_property(name);
  266 +
  267 + if (!prop) {
  268 + return NULL;
  269 + }
  270 +
  271 + if (!prop->is_number()) {
  272 + return NULL;
  273 + }
  274 +
  275 + return prop;
  276 +}
  277 +
258 SrsAmf0Object::SrsAmf0Object() 278 SrsAmf0Object::SrsAmf0Object()
259 { 279 {
260 marker = RTMP_AMF0_Object; 280 marker = RTMP_AMF0_Object;
@@ -294,6 +314,11 @@ SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name) @@ -294,6 +314,11 @@ SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name)
294 return properties.ensure_property_string(name); 314 return properties.ensure_property_string(name);
295 } 315 }
296 316
  317 +SrsAmf0Any* SrsAmf0Object::ensure_property_number(std::string name)
  318 +{
  319 + return properties.ensure_property_number(name);
  320 +}
  321 +
297 SrsASrsAmf0EcmaArray::SrsASrsAmf0EcmaArray() 322 SrsASrsAmf0EcmaArray::SrsASrsAmf0EcmaArray()
298 { 323 {
299 marker = RTMP_AMF0_EcmaArray; 324 marker = RTMP_AMF0_EcmaArray;
@@ -308,6 +333,11 @@ int SrsASrsAmf0EcmaArray::size() @@ -308,6 +333,11 @@ int SrsASrsAmf0EcmaArray::size()
308 return properties.size(); 333 return properties.size();
309 } 334 }
310 335
  336 +void SrsASrsAmf0EcmaArray::clear()
  337 +{
  338 + properties.clear();
  339 +}
  340 +
311 std::string SrsASrsAmf0EcmaArray::key_at(int index) 341 std::string SrsASrsAmf0EcmaArray::key_at(int index)
312 { 342 {
313 return properties.key_at(index); 343 return properties.key_at(index);
@@ -155,12 +155,14 @@ public: @@ -155,12 +155,14 @@ public:
155 virtual ~SrsUnSortedHashtable(); 155 virtual ~SrsUnSortedHashtable();
156 156
157 virtual int size(); 157 virtual int size();
  158 + virtual void clear();
158 virtual std::string key_at(int index); 159 virtual std::string key_at(int index);
159 virtual SrsAmf0Any* value_at(int index); 160 virtual SrsAmf0Any* value_at(int index);
160 virtual void set(std::string key, SrsAmf0Any* value); 161 virtual void set(std::string key, SrsAmf0Any* value);
161 162
162 virtual SrsAmf0Any* get_property(std::string name); 163 virtual SrsAmf0Any* get_property(std::string name);
163 virtual SrsAmf0Any* ensure_property_string(std::string name); 164 virtual SrsAmf0Any* ensure_property_string(std::string name);
  165 + virtual SrsAmf0Any* ensure_property_number(std::string name);
164 }; 166 };
165 167
166 /** 168 /**
@@ -185,6 +187,7 @@ public: @@ -185,6 +187,7 @@ public:
185 187
186 virtual SrsAmf0Any* get_property(std::string name); 188 virtual SrsAmf0Any* get_property(std::string name);
187 virtual SrsAmf0Any* ensure_property_string(std::string name); 189 virtual SrsAmf0Any* ensure_property_string(std::string name);
  190 + virtual SrsAmf0Any* ensure_property_number(std::string name);
188 }; 191 };
189 192
190 /** 193 /**
@@ -205,6 +208,7 @@ public: @@ -205,6 +208,7 @@ public:
205 virtual ~SrsASrsAmf0EcmaArray(); 208 virtual ~SrsASrsAmf0EcmaArray();
206 209
207 virtual int size(); 210 virtual int size();
  211 + virtual void clear();
208 virtual std::string key_at(int index); 212 virtual std::string key_at(int index);
209 virtual SrsAmf0Any* value_at(int index); 213 virtual SrsAmf0Any* value_at(int index);
210 virtual void set(std::string key, SrsAmf0Any* value); 214 virtual void set(std::string key, SrsAmf0Any* value);
@@ -265,6 +269,8 @@ extern int srs_amf0_write_null(SrsStream* stream); @@ -265,6 +269,8 @@ extern int srs_amf0_write_null(SrsStream* stream);
265 extern int srs_amf0_read_undefined(SrsStream* stream); 269 extern int srs_amf0_read_undefined(SrsStream* stream);
266 extern int srs_amf0_write_undefined(SrsStream* stream); 270 extern int srs_amf0_write_undefined(SrsStream* stream);
267 271
  272 +extern int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value);
  273 +
268 /** 274 /**
269 * read amf0 object from stream. 275 * read amf0 object from stream.
270 * 2.5 Object Type 276 * 2.5 Object Type
@@ -157,12 +157,20 @@ int SrsClient::streaming_play(SrsSource* source) @@ -157,12 +157,20 @@ int SrsClient::streaming_play(SrsSource* source)
157 { 157 {
158 int ret = ERROR_SUCCESS; 158 int ret = ERROR_SUCCESS;
159 159
160 - SrsConsumer* consumer = source->create_consumer(); 160 + SrsConsumer* consumer = NULL;
  161 + if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) {
  162 + srs_error("create consumer failed. ret=%d", ret);
  163 + return ret;
  164 + }
  165 +
161 srs_assert(consumer != NULL); 166 srs_assert(consumer != NULL);
162 SrsAutoFree(SrsConsumer, consumer, false); 167 SrsAutoFree(SrsConsumer, consumer, false);
163 - srs_verbose("consumer created."); 168 + srs_verbose("consumer created success.");
164 169
165 while (true) { 170 while (true) {
  171 + // switch to other st-threads.
  172 + st_usleep(0);
  173 +
166 bool ready = false; 174 bool ready = false;
167 if ((ret = rtmp->can_read(SRS_PULSE_TIME_MS, ready)) != ERROR_SUCCESS) { 175 if ((ret = rtmp->can_read(SRS_PULSE_TIME_MS, ready)) != ERROR_SUCCESS) {
168 srs_error("wait client control message failed. ret=%d", ret); 176 srs_error("wait client control message failed. ret=%d", ret);
@@ -183,17 +191,27 @@ int SrsClient::streaming_play(SrsSource* source) @@ -183,17 +191,27 @@ int SrsClient::streaming_play(SrsSource* source)
183 } 191 }
184 192
185 // get messages from consumer. 193 // get messages from consumer.
186 - SrsCommonMessage** msgs = NULL; 194 + SrsSharedPtrMessage** msgs = NULL;
187 int count = 0; 195 int count = 0;
188 if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) { 196 if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
189 srs_error("get messages from consumer failed. ret=%d", ret); 197 srs_error("get messages from consumer failed. ret=%d", ret);
190 return ret; 198 return ret;
191 } 199 }
192 - SrsAutoFree(SrsCommonMessage*, msgs, true); 200 +
  201 + if (count <= 0) {
  202 + srs_verbose("no packets in queue.");
  203 + continue;
  204 + }
  205 + SrsAutoFree(SrsSharedPtrMessage*, msgs, true);
193 206
194 // sendout messages 207 // sendout messages
195 for (int i = 0; i < count; i++) { 208 for (int i = 0; i < count; i++) {
196 - SrsCommonMessage* msg = msgs[i]; 209 + SrsSharedPtrMessage* msg = msgs[i];
  210 +
  211 + // the send_message will free the msg,
  212 + // so set the msgs[i] to NULL.
  213 + msgs[i] = NULL;
  214 +
197 if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { 215 if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
198 srs_error("send message to client failed. ret=%d", ret); 216 srs_error("send message to client failed. ret=%d", ret);
199 return ret; 217 return ret;
@@ -209,6 +227,9 @@ int SrsClient::streaming_publish(SrsSource* source) @@ -209,6 +227,9 @@ int SrsClient::streaming_publish(SrsSource* source)
209 int ret = ERROR_SUCCESS; 227 int ret = ERROR_SUCCESS;
210 228
211 while (true) { 229 while (true) {
  230 + // switch to other st-threads.
  231 + st_usleep(0);
  232 +
212 SrsCommonMessage* msg = NULL; 233 SrsCommonMessage* msg = NULL;
213 if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { 234 if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
214 srs_error("recv identify client message failed. ret=%d", ret); 235 srs_error("recv identify client message failed. ret=%d", ret);
@@ -409,9 +409,7 @@ int SrsProtocol::send_message(ISrsMessage* msg) @@ -409,9 +409,7 @@ int SrsProtocol::send_message(ISrsMessage* msg)
409 // sendout header and payload by writev. 409 // sendout header and payload by writev.
410 // decrease the sys invoke count to get higher performance. 410 // decrease the sys invoke count to get higher performance.
411 int payload_size = msg->size - (p - (char*)msg->payload); 411 int payload_size = msg->size - (p - (char*)msg->payload);
412 - if (payload_size > out_chunk_size) {  
413 - payload_size = out_chunk_size;  
414 - } 412 + payload_size = srs_min(payload_size, out_chunk_size);
415 413
416 // send by writev 414 // send by writev
417 iovec iov[2]; 415 iovec iov[2];
@@ -821,9 +819,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh @@ -821,9 +819,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
821 819
822 // the chunk payload size. 820 // the chunk payload size.
823 payload_size = chunk->header.payload_length - chunk->msg->size; 821 payload_size = chunk->header.payload_length - chunk->msg->size;
824 - if (payload_size > in_chunk_size) {  
825 - payload_size = in_chunk_size;  
826 - } 822 + payload_size = srs_min(payload_size, in_chunk_size);
827 srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d", 823 srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d",
828 payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size); 824 payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size);
829 825
@@ -940,12 +936,6 @@ ISrsMessage::ISrsMessage() @@ -940,12 +936,6 @@ ISrsMessage::ISrsMessage()
940 936
941 ISrsMessage::~ISrsMessage() 937 ISrsMessage::~ISrsMessage()
942 { 938 {
943 - free_payload();  
944 -}  
945 -  
946 -void ISrsMessage::free_payload()  
947 -{  
948 - srs_freepa(payload);  
949 } 939 }
950 940
951 SrsCommonMessage::SrsCommonMessage() 941 SrsCommonMessage::SrsCommonMessage()
@@ -956,6 +946,11 @@ SrsCommonMessage::SrsCommonMessage() @@ -956,6 +946,11 @@ SrsCommonMessage::SrsCommonMessage()
956 946
957 SrsCommonMessage::~SrsCommonMessage() 947 SrsCommonMessage::~SrsCommonMessage()
958 { 948 {
  949 + // we must directly free the ptrs,
  950 + // nevery use the virtual functions to delete,
  951 + // for in the destructor, the virtual functions is disabled.
  952 +
  953 + srs_freepa(payload);
959 srs_freep(packet); 954 srs_freep(packet);
960 srs_freep(stream); 955 srs_freep(stream);
961 } 956 }
@@ -1142,10 +1137,6 @@ SrsSharedPtrMessage::SrsSharedPtrMessage() @@ -1142,10 +1137,6 @@ SrsSharedPtrMessage::SrsSharedPtrMessage()
1142 1137
1143 SrsSharedPtrMessage::~SrsSharedPtrMessage() 1138 SrsSharedPtrMessage::~SrsSharedPtrMessage()
1144 { 1139 {
1145 -}  
1146 -  
1147 -void SrsSharedPtrMessage::free_payload()  
1148 -{  
1149 if (ptr) { 1140 if (ptr) {
1150 if (ptr->shared_count == 0) { 1141 if (ptr->shared_count == 0) {
1151 srs_freep(ptr); 1142 srs_freep(ptr);
@@ -1157,15 +1148,14 @@ void SrsSharedPtrMessage::free_payload() @@ -1157,15 +1148,14 @@ void SrsSharedPtrMessage::free_payload()
1157 1148
1158 bool SrsSharedPtrMessage::can_decode() 1149 bool SrsSharedPtrMessage::can_decode()
1159 { 1150 {
1160 - return true; 1151 + return false;
1161 } 1152 }
1162 1153
1163 -int SrsSharedPtrMessage::initialize(SrsMessageHeader* header, char* payload, int size, int perfer_cid) 1154 +int SrsSharedPtrMessage::initialize(ISrsMessage* msg, char* payload, int size)
1164 { 1155 {
1165 int ret = ERROR_SUCCESS; 1156 int ret = ERROR_SUCCESS;
1166 1157
1167 - super::header = *header;  
1168 - 1158 + srs_assert(msg != NULL);
1169 if (ptr) { 1159 if (ptr) {
1170 ret = ERROR_SYSTEM_ASSERT_FAILED; 1160 ret = ERROR_SYSTEM_ASSERT_FAILED;
1171 srs_error("should not set the payload twice. ret=%d", ret); 1161 srs_error("should not set the payload twice. ret=%d", ret);
@@ -1174,10 +1164,13 @@ int SrsSharedPtrMessage::initialize(SrsMessageHeader* header, char* payload, int @@ -1174,10 +1164,13 @@ int SrsSharedPtrMessage::initialize(SrsMessageHeader* header, char* payload, int
1174 return ret; 1164 return ret;
1175 } 1165 }
1176 1166
  1167 + header = msg->header;
  1168 + header.payload_length = size;
  1169 +
1177 ptr = new SrsSharedPtr(); 1170 ptr = new SrsSharedPtr();
1178 ptr->payload = payload; 1171 ptr->payload = payload;
1179 ptr->size = size; 1172 ptr->size = size;
1180 - ptr->perfer_cid = perfer_cid; 1173 + ptr->perfer_cid = msg->get_perfer_cid();
1181 1174
1182 super::payload = (int8_t*)ptr->payload; 1175 super::payload = (int8_t*)ptr->payload;
1183 super::size = ptr->size; 1176 super::size = ptr->size;
@@ -1194,6 +1187,9 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy() @@ -1194,6 +1187,9 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
1194 } 1187 }
1195 1188
1196 SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); 1189 SrsSharedPtrMessage* copy = new SrsSharedPtrMessage();
  1190 +
  1191 + copy->header = header;
  1192 +
1197 copy->ptr = ptr; 1193 copy->ptr = ptr;
1198 ptr->shared_count++; 1194 ptr->shared_count++;
1199 1195
@@ -2070,12 +2066,33 @@ int SrsOnMetaDataPacket::decode(SrsStream* stream) @@ -2070,12 +2066,33 @@ int SrsOnMetaDataPacket::decode(SrsStream* stream)
2070 2066
2071 srs_verbose("decode metadata name success. name=%s", name.c_str()); 2067 srs_verbose("decode metadata name success. name=%s", name.c_str());
2072 2068
2073 - if ((ret = srs_amf0_read_object(stream, metadata)) != ERROR_SUCCESS) { 2069 + // the metadata maybe object or ecma array
  2070 + SrsAmf0Any* any = NULL;
  2071 + if ((ret = srs_amf0_read_any(stream, any)) != ERROR_SUCCESS) {
2074 srs_error("decode metadata metadata failed. ret=%d", ret); 2072 srs_error("decode metadata metadata failed. ret=%d", ret);
2075 return ret; 2073 return ret;
2076 } 2074 }
2077 2075
2078 - srs_info("decode metadata success"); 2076 + if (any->is_object()) {
  2077 + srs_freep(metadata);
  2078 + metadata = srs_amf0_convert<SrsAmf0Object>(any);
  2079 + srs_info("decode metadata object success");
  2080 + return ret;
  2081 + }
  2082 +
  2083 + SrsASrsAmf0EcmaArray* arr = dynamic_cast<SrsASrsAmf0EcmaArray*>(any);
  2084 + if (!arr) {
  2085 + ret = ERROR_RTMP_AMF0_DECODE;
  2086 + srs_error("decode metadata array failed. ret=%d", ret);
  2087 + srs_freep(any);
  2088 + return ret;
  2089 + }
  2090 +
  2091 + for (int i = 0; i < arr->size(); i++) {
  2092 + metadata->set(arr->key_at(i), arr->value_at(i));
  2093 + }
  2094 + arr->clear();
  2095 + srs_info("decode metadata array success");
2079 2096
2080 return ret; 2097 return ret;
2081 } 2098 }
@@ -253,8 +253,6 @@ public: @@ -253,8 +253,6 @@ public:
253 public: 253 public:
254 ISrsMessage(); 254 ISrsMessage();
255 virtual ~ISrsMessage(); 255 virtual ~ISrsMessage();
256 -protected:  
257 - virtual void free_payload();  
258 public: 256 public:
259 /** 257 /**
260 * whether message canbe decoded. 258 * whether message canbe decoded.
@@ -351,15 +349,13 @@ private: @@ -351,15 +349,13 @@ private:
351 public: 349 public:
352 SrsSharedPtrMessage(); 350 SrsSharedPtrMessage();
353 virtual ~SrsSharedPtrMessage(); 351 virtual ~SrsSharedPtrMessage();
354 -protected:  
355 - virtual void free_payload();  
356 public: 352 public:
357 virtual bool can_decode(); 353 virtual bool can_decode();
358 public: 354 public:
359 /** 355 /**
360 * set the shared payload. 356 * set the shared payload.
361 */ 357 */
362 - virtual int initialize(SrsMessageHeader* header, char* payload, int size, int perfer_cid); 358 + virtual int initialize(ISrsMessage* msg, char* payload, int size);
363 virtual SrsSharedPtrMessage* copy(); 359 virtual SrsSharedPtrMessage* copy();
364 public: 360 public:
365 /** 361 /**
@@ -114,9 +114,13 @@ int SrsRequest::discovery_app() @@ -114,9 +114,13 @@ int SrsRequest::discovery_app()
114 114
115 std::string SrsRequest::get_stream_url() 115 std::string SrsRequest::get_stream_url()
116 { 116 {
117 - std::string url = vhost; 117 + std::string url = "";
118 118
  119 + //url += vhost;
  120 +
  121 + url += "/";
119 url += app; 122 url += app;
  123 + url += "/";
120 url += stream; 124 url += stream;
121 125
122 return url; 126 return url;
@@ -152,7 +156,7 @@ int SrsRtmp::can_read(int timeout_ms, bool& ready) @@ -152,7 +156,7 @@ int SrsRtmp::can_read(int timeout_ms, bool& ready)
152 return protocol->can_read(timeout_ms, ready); 156 return protocol->can_read(timeout_ms, ready);
153 } 157 }
154 158
155 -int SrsRtmp::send_message(SrsCommonMessage* msg) 159 +int SrsRtmp::send_message(ISrsMessage* msg)
156 { 160 {
157 return protocol->send_message(msg); 161 return protocol->send_message(msg);
158 } 162 }
@@ -233,7 +237,7 @@ int SrsRtmp::connect_app(SrsRequest* req) @@ -233,7 +237,7 @@ int SrsRtmp::connect_app(SrsRequest* req)
233 req->swfUrl = srs_amf0_convert<SrsAmf0String>(prop)->value; 237 req->swfUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;
234 } 238 }
235 239
236 - if ((prop = pkt->command_object->ensure_property_string("objectEncoding")) != NULL) { 240 + if ((prop = pkt->command_object->ensure_property_number("objectEncoding")) != NULL) {
237 req->objectEncoding = srs_amf0_convert<SrsAmf0Number>(prop)->value; 241 req->objectEncoding = srs_amf0_convert<SrsAmf0Number>(prop)->value;
238 } 242 }
239 243
@@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 #include <st.h> 35 #include <st.h>
36 36
37 class SrsProtocol; 37 class SrsProtocol;
  38 +class ISrsMessage;
38 class SrsCommonMessage; 39 class SrsCommonMessage;
39 class SrsCreateStreamPacket; 40 class SrsCreateStreamPacket;
40 class SrsFMLEStartPacket; 41 class SrsFMLEStartPacket;
@@ -102,7 +103,7 @@ public: @@ -102,7 +103,7 @@ public:
102 public: 103 public:
103 virtual int recv_message(SrsCommonMessage** pmsg); 104 virtual int recv_message(SrsCommonMessage** pmsg);
104 virtual int can_read(int timeout_ms, bool& ready); 105 virtual int can_read(int timeout_ms, bool& ready);
105 - virtual int send_message(SrsCommonMessage* msg); 106 + virtual int send_message(ISrsMessage* msg);
106 public: 107 public:
107 virtual int handshake(); 108 virtual int handshake();
108 virtual int connect_app(SrsRequest* req); 109 virtual int connect_app(SrsRequest* req);
@@ -46,27 +46,54 @@ SrsConsumer::SrsConsumer() @@ -46,27 +46,54 @@ SrsConsumer::SrsConsumer()
46 46
47 SrsConsumer::~SrsConsumer() 47 SrsConsumer::~SrsConsumer()
48 { 48 {
  49 + std::vector<SrsSharedPtrMessage*>::iterator it;
  50 + for (it = msgs.begin(); it != msgs.end(); ++it) {
  51 + SrsSharedPtrMessage* msg = *it;
  52 + srs_freep(msg);
  53 + }
  54 + msgs.clear();
49 } 55 }
50 56
51 int SrsConsumer::enqueue(SrsSharedPtrMessage* msg) 57 int SrsConsumer::enqueue(SrsSharedPtrMessage* msg)
52 { 58 {
53 int ret = ERROR_SUCCESS; 59 int ret = ERROR_SUCCESS;
  60 + msgs.push_back(msg);
54 return ret; 61 return ret;
55 } 62 }
56 63
57 -int SrsConsumer::get_packets(int max_count, SrsCommonMessage**& msgs, int& count) 64 +int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
58 { 65 {
59 - msgs = NULL;  
60 - count = 0;  
61 -  
62 int ret = ERROR_SUCCESS; 66 int ret = ERROR_SUCCESS;
  67 +
  68 + if (msgs.empty()) {
  69 + return ret;
  70 + }
  71 +
  72 + if (max_count == 0) {
  73 + count = (int)msgs.size();
  74 + } else {
  75 + count = srs_min(max_count, (int)msgs.size());
  76 + }
  77 +
  78 + pmsgs = new SrsSharedPtrMessage*[count];
  79 +
  80 + for (int i = 0; i < count; i++) {
  81 + pmsgs[i] = msgs[i];
  82 + }
  83 +
  84 + if (count == (int)msgs.size()) {
  85 + msgs.clear();
  86 + } else {
  87 + msgs.erase(msgs.begin(), msgs.begin() + count);
  88 + }
  89 +
63 return ret; 90 return ret;
64 } 91 }
65 92
66 SrsSource::SrsSource(std::string _stream_url) 93 SrsSource::SrsSource(std::string _stream_url)
67 { 94 {
68 stream_url = _stream_url; 95 stream_url = _stream_url;
69 - cache_metadata = new SrsSharedPtrMessage(); 96 + cache_metadata = NULL;
70 } 97 }
71 98
72 SrsSource::~SrsSource() 99 SrsSource::~SrsSource()
@@ -110,7 +137,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -110,7 +137,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
110 cache_metadata = new SrsSharedPtrMessage(); 137 cache_metadata = new SrsSharedPtrMessage();
111 138
112 // dump message to shared ptr message. 139 // dump message to shared ptr message.
113 - if ((ret = cache_metadata->initialize(&msg->header, payload, size, msg->get_perfer_cid())) != ERROR_SUCCESS) { 140 + if ((ret = cache_metadata->initialize(msg, payload, size)) != ERROR_SUCCESS) {
114 srs_error("initialize the cache metadata failed. ret=%d", ret); 141 srs_error("initialize the cache metadata failed. ret=%d", ret);
115 return ret; 142 return ret;
116 } 143 }
@@ -136,7 +163,7 @@ int SrsSource::on_audio(SrsCommonMessage* audio) @@ -136,7 +163,7 @@ int SrsSource::on_audio(SrsCommonMessage* audio)
136 163
137 SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); 164 SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
138 SrsAutoFree(SrsSharedPtrMessage, msg, false); 165 SrsAutoFree(SrsSharedPtrMessage, msg, false);
139 - if ((ret = msg->initialize(&audio->header, (char*)audio->payload, audio->size, audio->get_perfer_cid())) != ERROR_SUCCESS) { 166 + if ((ret = msg->initialize(audio, (char*)audio->payload, audio->size)) != ERROR_SUCCESS) {
140 srs_error("initialize the audio failed. ret=%d", ret); 167 srs_error("initialize the audio failed. ret=%d", ret);
141 return ret; 168 return ret;
142 } 169 }
@@ -166,7 +193,7 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -166,7 +193,7 @@ int SrsSource::on_video(SrsCommonMessage* video)
166 193
167 SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); 194 SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
168 SrsAutoFree(SrsSharedPtrMessage, msg, false); 195 SrsAutoFree(SrsSharedPtrMessage, msg, false);
169 - if ((ret = msg->initialize(&video->header, (char*)video->payload, video->size, video->get_perfer_cid())) != ERROR_SUCCESS) { 196 + if ((ret = msg->initialize(video, (char*)video->payload, video->size)) != ERROR_SUCCESS) {
170 srs_error("initialize the video failed. ret=%d", ret); 197 srs_error("initialize the video failed. ret=%d", ret);
171 return ret; 198 return ret;
172 } 199 }
@@ -190,10 +217,16 @@ int SrsSource::on_video(SrsCommonMessage* video) @@ -190,10 +217,16 @@ int SrsSource::on_video(SrsCommonMessage* video)
190 return ret; 217 return ret;
191 } 218 }
192 219
193 -SrsConsumer* SrsSource::create_consumer() 220 + int SrsSource::create_consumer(SrsConsumer*& consumer)
194 { 221 {
195 - SrsConsumer* consumer = new SrsConsumer(); 222 + consumer = new SrsConsumer();
196 consumers.push_back(consumer); 223 consumers.push_back(consumer);
197 - return consumer; 224 +
  225 + if (!cache_metadata) {
  226 + srs_info("no metadata found.");
  227 + return ERROR_SUCCESS;
  228 + }
  229 +
  230 + return consumer->enqueue(cache_metadata->copy());
198 } 231 }
199 232
@@ -43,6 +43,8 @@ class SrsSharedPtrMessage; @@ -43,6 +43,8 @@ class SrsSharedPtrMessage;
43 */ 43 */
44 class SrsConsumer 44 class SrsConsumer
45 { 45 {
  46 +private:
  47 + std::vector<SrsSharedPtrMessage*> msgs;
46 public: 48 public:
47 SrsConsumer(); 49 SrsConsumer();
48 virtual ~SrsConsumer(); 50 virtual ~SrsConsumer();
@@ -53,11 +55,11 @@ public: @@ -53,11 +55,11 @@ public:
53 virtual int enqueue(SrsSharedPtrMessage* msg); 55 virtual int enqueue(SrsSharedPtrMessage* msg);
54 /** 56 /**
55 * get packets in consumer queue. 57 * get packets in consumer queue.
56 - * @msgs SrsMessages*[], output the prt array. 58 + * @pmsgs SrsMessages*[], output the prt array.
57 * @count the count in array. 59 * @count the count in array.
58 * @max_count the max count to dequeue, 0 to dequeue all. 60 * @max_count the max count to dequeue, 0 to dequeue all.
59 */ 61 */
60 - virtual int get_packets(int max_count, SrsCommonMessage**& msgs, int& count); 62 + virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count);
61 }; 63 };
62 64
63 /** 65 /**
@@ -88,7 +90,7 @@ public: @@ -88,7 +90,7 @@ public:
88 virtual int on_audio(SrsCommonMessage* audio); 90 virtual int on_audio(SrsCommonMessage* audio);
89 virtual int on_video(SrsCommonMessage* video); 91 virtual int on_video(SrsCommonMessage* video);
90 public: 92 public:
91 - virtual SrsConsumer* create_consumer(); 93 + virtual int create_consumer(SrsConsumer*& consumer);
92 }; 94 };
93 95
94 #endif 96 #endif