winlin

dispatch video/audio/data to consumers

@@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 25
26 #include <srs_core_log.hpp> 26 #include <srs_core_log.hpp>
27 #include <srs_core_protocol.hpp> 27 #include <srs_core_protocol.hpp>
  28 +#include <srs_core_auto_free.hpp>
28 #include <srs_core_amf0.hpp> 29 #include <srs_core_amf0.hpp>
29 30
30 std::map<std::string, SrsSource*> SrsSource::pool; 31 std::map<std::string, SrsSource*> SrsSource::pool;
@@ -47,6 +48,12 @@ SrsConsumer::~SrsConsumer() @@ -47,6 +48,12 @@ SrsConsumer::~SrsConsumer()
47 { 48 {
48 } 49 }
49 50
  51 +int SrsConsumer::enqueue(SrsSharedPtrMessage* msg)
  52 +{
  53 + int ret = ERROR_SUCCESS;
  54 + return ret;
  55 +}
  56 +
50 int SrsConsumer::get_packets(int max_count, SrsCommonMessage**& msgs, int& count) 57 int SrsConsumer::get_packets(int max_count, SrsCommonMessage**& msgs, int& count)
51 { 58 {
52 msgs = NULL; 59 msgs = NULL;
@@ -59,6 +66,7 @@ int SrsConsumer::get_packets(int max_count, SrsCommonMessage**& msgs, int& count @@ -59,6 +66,7 @@ int SrsConsumer::get_packets(int max_count, SrsCommonMessage**& msgs, int& count
59 SrsSource::SrsSource(std::string _stream_url) 66 SrsSource::SrsSource(std::string _stream_url)
60 { 67 {
61 stream_url = _stream_url; 68 stream_url = _stream_url;
  69 + cache_metadata = new SrsSharedPtrMessage();
62 } 70 }
63 71
64 SrsSource::~SrsSource() 72 SrsSource::~SrsSource()
@@ -69,6 +77,8 @@ SrsSource::~SrsSource() @@ -69,6 +77,8 @@ SrsSource::~SrsSource()
69 srs_freep(consumer); 77 srs_freep(consumer);
70 } 78 }
71 consumers.clear(); 79 consumers.clear();
  80 +
  81 + srs_freep(cache_metadata);
72 } 82 }
73 83
74 int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata) 84 int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
@@ -78,18 +88,105 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -78,18 +88,105 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
78 metadata->metadata->set("server", 88 metadata->metadata->set("server",
79 new SrsAmf0String(RTMP_SIG_SRS_NAME""RTMP_SIG_SRS_VERSION)); 89 new SrsAmf0String(RTMP_SIG_SRS_NAME""RTMP_SIG_SRS_VERSION));
80 90
  91 + // encode the metadata to payload
  92 + int size = metadata->get_payload_length();
  93 + if (size <= 0) {
  94 + srs_warn("ignore the invalid metadata. size=%d", size);
  95 + return ret;
  96 + }
  97 + srs_verbose("get metadata size success.");
  98 +
  99 + char* payload = new char[size];
  100 + memset(payload, 0, size);
  101 + if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
  102 + srs_error("encode metadata error. ret=%d", ret);
  103 + srs_freepa(payload);
  104 + return ret;
  105 + }
  106 + srs_verbose("encode metadata success.");
  107 +
  108 + // create a shared ptr message.
  109 + srs_freep(cache_metadata);
  110 + cache_metadata = new SrsSharedPtrMessage();
  111 +
  112 + // dump message to shared ptr message.
  113 + if ((ret = cache_metadata->initialize(&msg->header, payload, size, msg->get_perfer_cid())) != ERROR_SUCCESS) {
  114 + srs_error("initialize the cache metadata failed. ret=%d", ret);
  115 + return ret;
  116 + }
  117 + srs_verbose("initialize shared ptr metadata success.");
  118 +
  119 + // copy to all consumer
  120 + std::vector<SrsConsumer*>::iterator it;
  121 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  122 + SrsConsumer* consumer = *it;
  123 + if ((ret = consumer->enqueue(cache_metadata->copy())) != ERROR_SUCCESS) {
  124 + srs_error("dispatch the metadata failed. ret=%d", ret);
  125 + return ret;
  126 + }
  127 + }
  128 + srs_trace("dispatch metadata success.");
  129 +
81 return ret; 130 return ret;
82 } 131 }
83 132
84 int SrsSource::on_audio(SrsCommonMessage* audio) 133 int SrsSource::on_audio(SrsCommonMessage* audio)
85 { 134 {
86 int ret = ERROR_SUCCESS; 135 int ret = ERROR_SUCCESS;
  136 +
  137 + SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
  138 + SrsAutoFree(SrsSharedPtrMessage, msg, false);
  139 + if ((ret = msg->initialize(&audio->header, (char*)audio->payload, audio->size, audio->get_perfer_cid())) != ERROR_SUCCESS) {
  140 + srs_error("initialize the audio failed. ret=%d", ret);
  141 + return ret;
  142 + }
  143 + srs_verbose("initialize shared ptr audio success.");
  144 +
  145 + // detach the original audio
  146 + audio->payload = NULL;
  147 + audio->size = 0;
  148 +
  149 + // copy to all consumer
  150 + std::vector<SrsConsumer*>::iterator it;
  151 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  152 + SrsConsumer* consumer = *it;
  153 + if ((ret = consumer->enqueue(msg->copy())) != ERROR_SUCCESS) {
  154 + srs_error("dispatch the audio failed. ret=%d", ret);
  155 + return ret;
  156 + }
  157 + }
  158 + srs_info("dispatch audio success.");
  159 +
87 return ret; 160 return ret;
88 } 161 }
89 162
90 -int SrsSource::on_video(SrsCommonMessage* audio) 163 +int SrsSource::on_video(SrsCommonMessage* video)
91 { 164 {
92 int ret = ERROR_SUCCESS; 165 int ret = ERROR_SUCCESS;
  166 +
  167 + SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
  168 + SrsAutoFree(SrsSharedPtrMessage, msg, false);
  169 + if ((ret = msg->initialize(&video->header, (char*)video->payload, video->size, video->get_perfer_cid())) != ERROR_SUCCESS) {
  170 + srs_error("initialize the video failed. ret=%d", ret);
  171 + return ret;
  172 + }
  173 + srs_verbose("initialize shared ptr video success.");
  174 +
  175 + // detach the original audio
  176 + video->payload = NULL;
  177 + video->size = 0;
  178 +
  179 + // copy to all consumer
  180 + std::vector<SrsConsumer*>::iterator it;
  181 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  182 + SrsConsumer* consumer = *it;
  183 + if ((ret = consumer->enqueue(msg->copy())) != ERROR_SUCCESS) {
  184 + srs_error("dispatch the video failed. ret=%d", ret);
  185 + return ret;
  186 + }
  187 + }
  188 + srs_info("dispatch video success.");
  189 +
93 return ret; 190 return ret;
94 } 191 }
95 192
@@ -31,10 +31,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,10 +31,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 #include <srs_core.hpp> 31 #include <srs_core.hpp>
32 32
33 #include <map> 33 #include <map>
  34 +#include <vector>
34 #include <string> 35 #include <string>
35 36
36 class SrsCommonMessage; 37 class SrsCommonMessage;
37 class SrsOnMetaDataPacket; 38 class SrsOnMetaDataPacket;
  39 +class SrsSharedPtrMessage;
38 40
39 /** 41 /**
40 * the consumer for SrsSource, that is a play client. 42 * the consumer for SrsSource, that is a play client.
@@ -46,6 +48,10 @@ public: @@ -46,6 +48,10 @@ public:
46 virtual ~SrsConsumer(); 48 virtual ~SrsConsumer();
47 public: 49 public:
48 /** 50 /**
  51 + * enqueue an shared ptr message.
  52 + */
  53 + virtual int enqueue(SrsSharedPtrMessage* msg);
  54 + /**
49 * get packets in consumer queue. 55 * get packets in consumer queue.
50 * @msgs SrsMessages*[], output the prt array. 56 * @msgs SrsMessages*[], output the prt array.
51 * @count the count in array. 57 * @count the count in array.
@@ -72,6 +78,8 @@ public: @@ -72,6 +78,8 @@ public:
72 private: 78 private:
73 std::string stream_url; 79 std::string stream_url;
74 std::vector<SrsConsumer*> consumers; 80 std::vector<SrsConsumer*> consumers;
  81 +private:
  82 + SrsSharedPtrMessage* cache_metadata;
75 public: 83 public:
76 SrsSource(std::string _stream_url); 84 SrsSource(std::string _stream_url);
77 virtual ~SrsSource(); 85 virtual ~SrsSource();