winlin

for #293, support rtmp remux to http flv live stream.

@@ -42,6 +42,8 @@ using namespace std; @@ -42,6 +42,8 @@ using namespace std;
42 #include <srs_kernel_file.hpp> 42 #include <srs_kernel_file.hpp>
43 #include <srs_kernel_flv.hpp> 43 #include <srs_kernel_flv.hpp>
44 #include <srs_protocol_rtmp.hpp> 44 #include <srs_protocol_rtmp.hpp>
  45 +#include <srs_app_source.hpp>
  46 +#include <srs_protocol_msg_array.hpp>
45 47
46 SrsVodStream::SrsVodStream(string root_dir) 48 SrsVodStream::SrsVodStream(string root_dir)
47 : SrsGoHttpFileServer(root_dir) 49 : SrsGoHttpFileServer(root_dir)
@@ -136,6 +138,42 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage* @@ -136,6 +138,42 @@ int SrsVodStream::serve_flv_stream(ISrsGoHttpResponseWriter* w, SrsHttpMessage*
136 return ret; 138 return ret;
137 } 139 }
138 140
  141 +SrsFlvStreamWriter::SrsFlvStreamWriter(ISrsGoHttpResponseWriter* w)
  142 +{
  143 + writer = w;
  144 +}
  145 +
  146 +SrsFlvStreamWriter::~SrsFlvStreamWriter()
  147 +{
  148 +}
  149 +
  150 +int SrsFlvStreamWriter::open(std::string /*file*/)
  151 +{
  152 + return ERROR_SUCCESS;
  153 +}
  154 +
  155 +void SrsFlvStreamWriter::close()
  156 +{
  157 +}
  158 +
  159 +bool SrsFlvStreamWriter::is_open()
  160 +{
  161 + return true;
  162 +}
  163 +
  164 +int64_t SrsFlvStreamWriter::tellg()
  165 +{
  166 + return 0;
  167 +}
  168 +
  169 +int SrsFlvStreamWriter::write(void* buf, size_t count, ssize_t* pnwrite)
  170 +{
  171 + if (pnwrite) {
  172 + *pnwrite = count;
  173 + }
  174 + return writer->write((char*)buf, (int)count);
  175 +}
  176 +
139 SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r) 177 SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r)
140 { 178 {
141 source = s; 179 source = s;
@@ -150,7 +188,97 @@ SrsLiveStream::~SrsLiveStream() @@ -150,7 +188,97 @@ SrsLiveStream::~SrsLiveStream()
150 int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) 188 int SrsLiveStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r)
151 { 189 {
152 int ret = ERROR_SUCCESS; 190 int ret = ERROR_SUCCESS;
153 - // TODO: FIMXE: implements it. 191 +
  192 + // create consumer of souce.
  193 + SrsConsumer* consumer = NULL;
  194 + if ((ret = source->create_consumer(consumer)) != ERROR_SUCCESS) {
  195 + srs_error("http: create consumer failed. ret=%d", ret);
  196 + return ret;
  197 + }
  198 + SrsAutoFree(SrsConsumer, consumer);
  199 + srs_verbose("http: consumer created success.");
  200 +
  201 + SrsMessageArray msgs(SRS_PERF_MW_MSGS);
  202 + // TODO: FIMXE: add pithy print.
  203 +
  204 + // write http header for ts.
  205 + w->header()->set_content_length((int64_t)2 * 1024 * 1024 * 1024);
  206 + w->header()->set_content_type("video/x-flv");
  207 +
  208 + // the memory writer.
  209 + SrsFlvStreamWriter writer(w);
  210 +
  211 + SrsFlvEncoder enc;
  212 + if ((ret = enc.initialize(&writer)) != ERROR_SUCCESS) {
  213 + return ret;
  214 + }
  215 +
  216 + // write flv header.
  217 + if ((ret = enc.write_header()) != ERROR_SUCCESS) {
  218 + return ret;
  219 + }
  220 +
  221 + while (true) {
  222 + // get messages from consumer.
  223 + // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
  224 + int count = 0;
  225 + if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
  226 + srs_error("get messages from consumer failed. ret=%d", ret);
  227 + return ret;
  228 + }
  229 +
  230 + if (count <= 0) {
  231 + srs_info("mw sleep %dms for no msg", mw_sleep);
  232 + // directly use sleep, donot use consumer wait.
  233 + st_usleep(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
  234 +
  235 + // ignore when nothing got.
  236 + continue;
  237 + }
  238 + srs_info("got %d msgs, min=%d, mw=%d", count,
  239 + SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TIMEOUT_US / 1000);
  240 +
  241 + // sendout all messages.
  242 + ret = send_messages(&enc, msgs.msgs, count);
  243 +
  244 + // free the messages.
  245 + for (int i = 0; i < count; i++) {
  246 + SrsSharedPtrMessage* msg = msgs.msgs[i];
  247 + srs_freep(msg);
  248 + }
  249 +
  250 + // check send error code.
  251 + if (ret != ERROR_SUCCESS) {
  252 + if (!srs_is_client_gracefully_close(ret)) {
  253 + srs_error("send messages to client failed. ret=%d", ret);
  254 + }
  255 + return ret;
  256 + }
  257 + }
  258 +
  259 + return ret;
  260 +}
  261 +
  262 +int SrsLiveStream::send_messages(SrsFlvEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs)
  263 +{
  264 + int ret = ERROR_SUCCESS;
  265 +
  266 + for (int i = 0; i < nb_msgs; i++) {
  267 + SrsSharedPtrMessage* msg = msgs[i];
  268 +
  269 + if (msg->is_audio()) {
  270 + ret = enc->write_audio(msg->timestamp, msg->payload, msg->size);
  271 + } else if (msg->is_video()) {
  272 + ret = enc->write_video(msg->timestamp, msg->payload, msg->size);
  273 + } else {
  274 + ret = enc->write_metadata(msg->timestamp, msg->payload, msg->size);
  275 + }
  276 +
  277 + if (ret != ERROR_SUCCESS) {
  278 + return ret;
  279 + }
  280 + }
  281 +
154 return ret; 282 return ret;
155 } 283 }
156 284
@@ -198,7 +326,7 @@ int SrsHttpServer::mount(SrsSource* s, SrsRequest* r) @@ -198,7 +326,7 @@ int SrsHttpServer::mount(SrsSource* s, SrsRequest* r)
198 mount = srs_string_replace(mount, "[stream]", r->stream); 326 mount = srs_string_replace(mount, "[stream]", r->stream);
199 327
200 // remove the default vhost mount 328 // remove the default vhost mount
201 - mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", ""); 329 + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
202 330
203 // mount the http flv stream. 331 // mount the http flv stream.
204 if ((ret = mux.handle(mount, new SrsLiveStream(s, r))) != ERROR_SUCCESS) { 332 if ((ret = mux.handle(mount, new SrsLiveStream(s, r))) != ERROR_SUCCESS) {
@@ -261,7 +389,7 @@ int SrsHttpServer::mount_static_file() @@ -261,7 +389,7 @@ int SrsHttpServer::mount_static_file()
261 mount = srs_string_replace(mount, "[vhost]", vhost); 389 mount = srs_string_replace(mount, "[vhost]", vhost);
262 390
263 // remove the default vhost mount 391 // remove the default vhost mount
264 - mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", ""); 392 + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
265 393
266 // the dir mount must always ends with "/" 394 // the dir mount must always ends with "/"
267 if (mount != "/" && mount.rfind("/") != mount.length() - 1) { 395 if (mount != "/" && mount.rfind("/") != mount.length() - 1) {
@@ -36,13 +36,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -36,13 +36,16 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
36 #include <srs_app_conn.hpp> 36 #include <srs_app_conn.hpp>
37 #include <srs_app_http.hpp> 37 #include <srs_app_http.hpp>
38 #include <srs_app_reload.hpp> 38 #include <srs_app_reload.hpp>
  39 +#include <srs_kernel_file.hpp>
39 40
40 class SrsSource; 41 class SrsSource;
41 class SrsRequest; 42 class SrsRequest;
42 class SrsStSocket; 43 class SrsStSocket;
  44 +class SrsFlvEncoder;
43 class SrsHttpParser; 45 class SrsHttpParser;
44 class SrsHttpMessage; 46 class SrsHttpMessage;
45 class SrsHttpHandler; 47 class SrsHttpHandler;
  48 +class SrsSharedPtrMessage;
46 49
47 /** 50 /**
48 * the flv vod stream supports flv?start=offset-bytes. 51 * the flv vod stream supports flv?start=offset-bytes.
@@ -60,6 +63,26 @@ protected: @@ -60,6 +63,26 @@ protected:
60 }; 63 };
61 64
62 /** 65 /**
  66 +* write stream to http response direclty.
  67 +*/
  68 +class SrsFlvStreamWriter : public SrsFileWriter
  69 +{
  70 +private:
  71 + ISrsGoHttpResponseWriter* writer;
  72 +public:
  73 + SrsFlvStreamWriter(ISrsGoHttpResponseWriter* w);
  74 + virtual ~SrsFlvStreamWriter();
  75 +public:
  76 + virtual int open(std::string file);
  77 + virtual void close();
  78 +public:
  79 + virtual bool is_open();
  80 + virtual int64_t tellg();
  81 +public:
  82 + virtual int write(void* buf, size_t count, ssize_t* pnwrite);
  83 +};
  84 +
  85 +/**
63 * the flv live stream supports access rtmp in flv over http. 86 * the flv live stream supports access rtmp in flv over http.
64 * srs will remux rtmp to flv streaming. 87 * srs will remux rtmp to flv streaming.
65 */ 88 */
@@ -73,6 +96,8 @@ public: @@ -73,6 +96,8 @@ public:
73 virtual ~SrsLiveStream(); 96 virtual ~SrsLiveStream();
74 public: 97 public:
75 virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r); 98 virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r);
  99 +private:
  100 + virtual int send_messages(SrsFlvEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs);
76 }; 101 };
77 102
78 /** 103 /**