winlin

add ingest flv to RTMP over srs-librtmp

@@ -28,9 +28,262 @@ gcc srs_ingest_flv.c ../../objs/lib/srs_librtmp.a -g -O0 -lstdc++ -o srs_ingest_ @@ -28,9 +28,262 @@ gcc srs_ingest_flv.c ../../objs/lib/srs_librtmp.a -g -O0 -lstdc++ -o srs_ingest_
28 #include <stdlib.h> 28 #include <stdlib.h>
29 #include <unistd.h> 29 #include <unistd.h>
30 30
  31 +#include <sys/types.h>
  32 +#include <sys/stat.h>
  33 +#include <fcntl.h>
  34 +
31 #include "../../objs/include/srs_librtmp.h" 35 #include "../../objs/include/srs_librtmp.h"
32 36
  37 +#define trace(msg, ...) printf(msg, ##__VA_ARGS__);printf("\n")
  38 +#define verbose(msg, ...) printf(msg, ##__VA_ARGS__);printf("\n")
  39 +#if 1
  40 +#undef verbose
  41 +#define verbose(msg, ...) (void)0
  42 +#endif
  43 +
  44 +int proxy(int flv_fd, srs_rtmp_t ortmp);
  45 +int connect_oc(srs_rtmp_t ortmp);
  46 +
  47 +int open_flv_file(char* in_flv_file);
  48 +void close_flv_file(int flv_fd);
  49 +int flv_open_ic(int flv_fd);
  50 +int flv_read_packet(int flv_fd, int* type, u_int32_t* timestamp, char** data, int* size);
  51 +
33 int main(int argc, char** argv) 52 int main(int argc, char** argv)
34 { 53 {
  54 + int ret = 0;
  55 +
  56 + // user option parse index.
  57 + int opt = 0;
  58 + // user options.
  59 + char* in_flv_file; char* out_rtmp_url;
  60 + // rtmp handler
  61 + srs_rtmp_t ortmp;
  62 + // flv handler
  63 + int flv_fd;
  64 +
  65 + if (argc <= 2) {
  66 + printf("ingest flv file and publish to RTMP server\n"
  67 + "Usage: %s <-i in_flv_file> <-y out_rtmp_url>\n"
  68 + " in_flv_file input flv file, ingest from this file.\n"
  69 + " out_rtmp_url output rtmp url, publish to this url.\n"
  70 + "For example:\n"
  71 + " %s -i ../../doc/source.200kbps.768x320.flv -y rtmp://127.0.0.1/live/demo\n",
  72 + argv[0]);
  73 + ret = 1;
  74 + exit(ret);
  75 + return ret;
  76 + }
  77 +
  78 + // parse options in FFMPEG format.
  79 + while ((opt = getopt(argc, argv, "i:y:")) != -1) {
  80 + switch (opt) {
  81 + case 'i':
  82 + in_flv_file = optarg;
  83 + break;
  84 + case 'y':
  85 + out_rtmp_url = optarg;
  86 + break;
  87 + default:
  88 + break;
  89 + }
  90 + }
  91 +
  92 + trace("ingest flv file and publish to RTMP server like FFMPEG.");
  93 + trace("srs(simple-rtmp-server) client librtmp library.");
  94 + trace("version: %d.%d.%d", srs_version_major(), srs_version_minor(), srs_version_revision());
  95 + trace("input: %s", in_flv_file);
  96 + trace("output: %s", out_rtmp_url);
  97 +
  98 + flv_fd = open_flv_file(in_flv_file);
  99 + if (flv_fd <= 0) {
  100 + ret = 2;
  101 + trace("open flv file failed. ret=%d", ret);
  102 + return ret;
  103 + }
  104 +
  105 + ortmp = srs_rtmp_create(out_rtmp_url);
  106 +
  107 + ret = proxy(flv_fd, ortmp);
  108 + trace("proxy completed");
  109 +
  110 + srs_rtmp_destroy(ortmp);
  111 + close_flv_file(flv_fd);
  112 +
  113 + return ret;
  114 +}
  115 +
  116 +int64_t re_create()
  117 +{
35 return 0; 118 return 0;
36 } 119 }
  120 +int64_t re_update(int64_t re, u_int32_t time)
  121 +{
  122 + if (time - re > 500) {
  123 + usleep((time - re) * 1000);
  124 + return time;
  125 + }
  126 +
  127 + return re;
  128 +}
  129 +
  130 +int proxy(int flv_fd, srs_rtmp_t ortmp)
  131 +{
  132 + int ret = 0;
  133 +
  134 + // packet data
  135 + int type, size;
  136 + u_int32_t timestamp = 0;
  137 + char* data = NULL;
  138 + // re
  139 + int64_t re = re_create();
  140 +
  141 + if ((ret = flv_open_ic(flv_fd)) != 0) {
  142 + return ret;
  143 + }
  144 + if ((ret = connect_oc(ortmp)) != 0) {
  145 + return ret;
  146 + }
  147 +
  148 + trace("start proxy RTMP stream");
  149 + for (;;) {
  150 + if ((ret = flv_read_packet(flv_fd, &type, &timestamp, &data, &size)) != 0) {
  151 + trace("irtmp get packet failed. ret=%d", ret);
  152 + return ret;
  153 + }
  154 + verbose("irtmp got packet: type=%s, time=%d, size=%d",
  155 + srs_type2string(type), timestamp, size);
  156 +
  157 + if ((ret = srs_write_packet(ortmp, type, timestamp, data, size)) != 0) {
  158 + trace("irtmp get packet failed. ret=%d", ret);
  159 + return ret;
  160 + }
  161 + verbose("ortmp sent packet: type=%s, time=%d, size=%d",
  162 + srs_type2string(type), timestamp, size);
  163 +
  164 + re = re_update(re, timestamp);
  165 + }
  166 +
  167 + return ret;
  168 +}
  169 +
  170 +int connect_oc(srs_rtmp_t ortmp)
  171 +{
  172 + int ret = 0;
  173 +
  174 + if ((ret = srs_simple_handshake(ortmp)) != 0) {
  175 + trace("ortmp simple handshake failed. ret=%d", ret);
  176 + return ret;
  177 + }
  178 + trace("ortmp simple handshake success");
  179 +
  180 + if ((ret = srs_connect_app(ortmp)) != 0) {
  181 + trace("ortmp connect vhost/app failed. ret=%d", ret);
  182 + return ret;
  183 + }
  184 + trace("ortmp connect vhost/app success");
  185 +
  186 + if ((ret = srs_publish_stream(ortmp)) != 0) {
  187 + trace("ortmp publish stream failed. ret=%d", ret);
  188 + return ret;
  189 + }
  190 + trace("ortmp publish stream success");
  191 +
  192 + return ret;
  193 +}
  194 +
  195 +int open_flv_file(char* in_flv_file)
  196 +{
  197 + return open(in_flv_file, O_RDONLY);
  198 +}
  199 +
  200 +void close_flv_file(int fd)
  201 +{
  202 + if (fd > 0) {
  203 + close(fd);
  204 + }
  205 +}
  206 +
  207 +int flv_open_ic(int flv_fd)
  208 +{
  209 + int ret = 0;
  210 +
  211 + char h[13]; // 9+4
  212 +
  213 + if (read(flv_fd, h, sizeof(h)) != sizeof(h)) {
  214 + ret = -1;
  215 + trace("read flv header failed. ret=%d", ret);
  216 + return ret;
  217 + }
  218 +
  219 + if (h[0] != 'F' || h[1] != 'L' || h[2] != 'V') {
  220 + ret = -1;
  221 + trace("input is not a flv file. ret=%d", ret);
  222 + return ret;
  223 + }
  224 +
  225 + return ret;
  226 +}
  227 +
  228 +int flv_read_packet(int flv_fd, int* type, u_int32_t* timestamp, char** data, int* size)
  229 +{
  230 + int ret = 0;
  231 +
  232 + char th[11]; // tag header
  233 + char ts[4]; // tag size
  234 +
  235 + u_int32_t data_size;
  236 + u_int32_t time;
  237 +
  238 + char* pp;
  239 +
  240 + // read tag header
  241 + if (read(flv_fd, th, sizeof(th)) != sizeof(th)) {
  242 + ret = -1;
  243 + trace("read flv tag header failed. ret=%d", ret);
  244 + return ret;
  245 + }
  246 +
  247 + // Reserved UB [2]
  248 + // Filter UB [1]
  249 + // TagType UB [5]
  250 + *type = (int)(th[0] & 0x1F);
  251 +
  252 + // DataSize UI24
  253 + pp = (char*)&data_size;
  254 + pp[2] = th[1];
  255 + pp[1] = th[2];
  256 + pp[0] = th[3];
  257 +
  258 + // Timestamp UI24
  259 + pp = (char*)&time;
  260 + pp[2] = th[4];
  261 + pp[1] = th[5];
  262 + pp[0] = th[6];
  263 +
  264 + // TimestampExtended UI8
  265 + pp[3] = th[7];
  266 +
  267 + *timestamp = time;
  268 +
  269 + if (data_size > 0) {
  270 + *size = data_size;
  271 + *data = (char*)malloc(data_size);
  272 +
  273 + // read tag data
  274 + if (read(flv_fd, *data, data_size) != data_size) {
  275 + ret = -1;
  276 + trace("read flv tag data failed. size=%d, ret=%d", data_size, ret);
  277 + return ret;
  278 + }
  279 + }
  280 +
  281 + // ignore 4bytes tag size.
  282 + if (read(flv_fd, ts, sizeof(ts)) != sizeof(ts)) {
  283 + ret = -1;
  284 + trace("read flv tag size failed. ret=%d", ret);
  285 + return ret;
  286 + }
  287 +
  288 + return ret;
  289 +}
@@ -44,6 +44,7 @@ int proxy(srs_rtmp_t irtmp, srs_rtmp_t ortmp); @@ -44,6 +44,7 @@ int proxy(srs_rtmp_t irtmp, srs_rtmp_t ortmp);
44 int main(int argc, char** argv) 44 int main(int argc, char** argv)
45 { 45 {
46 int ret = 0; 46 int ret = 0;
  47 +
47 // user option parse index. 48 // user option parse index.
48 int opt = 0; 49 int opt = 0;
49 // user options. 50 // user options.