winlin

for #738, add srs ingest mp4

@@ -105,7 +105,7 @@ int main(int argc, char** argv) @@ -105,7 +105,7 @@ int main(int argc, char** argv)
105 return -1; 105 return -1;
106 } 106 }
107 107
108 - srs_human_trace("input: %s", in_flv_file); 108 + srs_human_trace("input: %s", in_flv_file);
109 srs_human_trace("output: %s", out_rtmp_url); 109 srs_human_trace("output: %s", out_rtmp_url);
110 110
111 if ((flv = srs_flv_open_read(in_flv_file)) == NULL) { 111 if ((flv = srs_flv_open_read(in_flv_file)) == NULL) {
@@ -26,10 +26,9 @@ @@ -26,10 +26,9 @@
26 26
27 #include "../../objs/include/srs_librtmp.h" 27 #include "../../objs/include/srs_librtmp.h"
28 28
  29 +int proxy(srs_mp4_t mp4, srs_rtmp_t ortmp);
29 int main(int argc, char** argv) 30 int main(int argc, char** argv)
30 { 31 {
31 - int ret, opt;  
32 -  
33 printf("Ingest mp4 file and publish to RTMP server like FFMPEG.\n"); 32 printf("Ingest mp4 file and publish to RTMP server like FFMPEG.\n");
34 printf("SRS(OSSRS) client librtmp library.\n"); 33 printf("SRS(OSSRS) client librtmp library.\n");
35 printf("Version: %d.%d.%d\n", srs_version_major(), srs_version_minor(), srs_version_revision()); 34 printf("Version: %d.%d.%d\n", srs_version_major(), srs_version_minor(), srs_version_revision());
@@ -46,9 +45,210 @@ int main(int argc, char** argv) @@ -46,9 +45,210 @@ int main(int argc, char** argv)
46 exit(-1); 45 exit(-1);
47 } 46 }
48 47
49 - for (opt = 0; opt < argc; opt++) {  
50 - srs_human_trace("argv[%d]=%s", opt, argv[opt]); 48 + for (int opt = 0; opt < argc; opt++) {
  49 + srs_human_trace("The argv[%d]=%s", opt, argv[opt]);
  50 + }
  51 +
  52 + // fill the options for mac
  53 + char* in_file = NULL;
  54 + char* out_rtmp_url = NULL;
  55 + for (int opt = 0; opt < argc - 1; opt++) {
  56 + // ignore all options except -i and -y.
  57 + char* p = argv[opt];
  58 +
  59 + // only accept -x
  60 + if (p[0] != '-' || p[1] == 0 || p[2] != 0) {
  61 + continue;
  62 + }
  63 +
  64 + // parse according the option name.
  65 + switch (p[1]) {
  66 + case 'i': in_file = argv[opt + 1]; break;
  67 + case 'y': out_rtmp_url = argv[opt + 1]; break;
  68 + default: break;
  69 + }
  70 + }
  71 +
  72 + if (!in_file) {
  73 + srs_human_trace("Invalid input file, use -i <input>");
  74 + return -1;
  75 + }
  76 + if (!out_rtmp_url) {
  77 + srs_human_trace("Invalid output url, use -y <output>");
  78 + return -1;
  79 + }
  80 +
  81 + srs_human_trace("Input file: %s", in_file);
  82 + srs_human_trace("Output url: %s", out_rtmp_url);
  83 +
  84 + int ret = 0;
  85 +
  86 + srs_mp4_t mp4 = NULL;
  87 + if ((mp4 = srs_mp4_open_read(in_file)) == NULL) {
  88 + ret = 2;
  89 + srs_human_trace("open mp4 file failed. ret=%d", ret);
  90 + return ret;
  91 + }
  92 +
  93 + srs_rtmp_t ortmp = srs_rtmp_create(out_rtmp_url);
  94 +
  95 + ret = proxy(mp4, ortmp);
  96 + srs_human_trace("Ingest mp4 to RTMP ok.");
  97 +
  98 + srs_rtmp_destroy(ortmp);
  99 + srs_mp4_close(mp4);
  100 +
  101 + return ret;
  102 +}
  103 +
  104 +int connect_oc(srs_rtmp_t ortmp)
  105 +{
  106 + int ret = 0;
  107 +
  108 + if ((ret = srs_rtmp_handshake(ortmp)) != 0) {
  109 + srs_human_trace("ortmp simple handshake failed. ret=%d", ret);
  110 + return ret;
  111 + }
  112 + srs_human_trace("ortmp simple handshake success");
  113 +
  114 + if ((ret = srs_rtmp_connect_app(ortmp)) != 0) {
  115 + srs_human_trace("ortmp connect vhost/app failed. ret=%d", ret);
  116 + return ret;
51 } 117 }
  118 + srs_human_trace("ortmp connect vhost/app success");
  119 +
  120 + if ((ret = srs_rtmp_publish_stream(ortmp)) != 0) {
  121 + srs_human_trace("ortmp publish stream failed. ret=%d", ret);
  122 + return ret;
  123 + }
  124 + srs_human_trace("ortmp publish stream success");
  125 +
  126 + return ret;
  127 +}
  128 +
  129 +#define RE_PULSE_MS 300
  130 +#define RE_PULSE_JITTER_MS 3000
  131 +int64_t tools_main_entrance_startup_time;
  132 +
  133 +int64_t re_create()
  134 +{
  135 + // if not very precise, we can directly use this as re.
  136 + int64_t re = srs_utils_time_ms();
  137 +
  138 + // use the starttime to get the deviation
  139 + int64_t deviation = re - tools_main_entrance_startup_time;
  140 + srs_human_trace("deviation is %d ms, pulse is %d ms", (int)(deviation), (int)(RE_PULSE_MS));
  141 +
  142 + // so, we adjust time to max(0, deviation)
  143 + // because the last pulse, we already sleeped
  144 + int adjust = (int)(deviation);
  145 + if (adjust > 0) {
  146 + srs_human_trace("adjust re time for %d ms", adjust);
  147 + re -= adjust;
  148 + } else {
  149 + srs_human_trace("no need to adjust re time");
  150 + }
  151 +
  152 + return re;
  153 +}
  154 +
  155 +void re_update(int64_t re, int32_t starttime, uint32_t time)
  156 +{
  157 + // send by pulse algorithm.
  158 + int64_t now = srs_utils_time_ms();
  159 + int64_t diff = time - starttime - (now -re);
  160 + if (diff > RE_PULSE_MS && diff < RE_PULSE_JITTER_MS) {
  161 + usleep((useconds_t)(diff * 1000));
  162 + }
  163 +}
  164 +
  165 +void re_cleanup(int64_t re, int32_t starttime, uint32_t time)
  166 +{
  167 + // for the last pulse, always sleep.
  168 + // for the virtual live encoder long time publishing.
  169 + int64_t now = srs_utils_time_ms();
  170 + int64_t diff = time - starttime - (now -re);
  171 + if (diff > 0) {
  172 + srs_human_trace("re_cleanup, diff=%d, start=%d, last=%d ms",
  173 + (int)diff, starttime, time);
  174 + usleep((useconds_t)(diff * 1000));
  175 + }
  176 +}
  177 +
  178 +int do_proxy(srs_mp4_t mp4, srs_rtmp_t ortmp, int64_t re, int32_t* pstarttime, uint32_t* ptimestamp)
  179 +{
  180 + int ret = 0;
  181 +
  182 + // packet data
  183 + char type;
  184 + int size;
  185 + char* data = NULL;
  186 +
  187 + srs_human_trace("start ingest mp4 to RTMP stream");
  188 + for (;;) {
  189 +#if 0
  190 + // tag header
  191 + if ((ret = srs_flv_read_tag_header(flv, &type, &size, ptimestamp)) != 0) {
  192 + if (srs_flv_is_eof(ret)) {
  193 + srs_human_trace("parse completed.");
  194 + return 0;
  195 + }
  196 + srs_human_trace("flv get packet failed. ret=%d", ret);
  197 + return ret;
  198 + }
  199 +
  200 + if (size <= 0) {
  201 + srs_human_trace("invalid size=%d", size);
  202 + break;
  203 + }
  204 +
  205 + // TODO: FIXME: mem leak when error.
  206 + data = (char*)malloc(size);
  207 + if ((ret = srs_flv_read_tag_data(flv, data, size)) != 0) {
  208 + return ret;
  209 + }
  210 +#endif
  211 + uint32_t timestamp = *ptimestamp;
  212 +
  213 + if ((ret = srs_human_print_rtmp_packet(type, timestamp, data, size)) != 0) {
  214 + srs_human_trace("print packet failed. ret=%d", ret);
  215 + return ret;
  216 + }
  217 +
  218 + if ((ret = srs_rtmp_write_packet(ortmp, type, *ptimestamp, data, size)) != 0) {
  219 + srs_human_trace("irtmp get packet failed. ret=%d", ret);
  220 + return ret;
  221 + }
  222 +
  223 + if (*pstarttime < 0 && srs_utils_flv_tag_is_av(type)) {
  224 + *pstarttime = *ptimestamp;
  225 + }
  226 +
  227 + re_update(re, *pstarttime, *ptimestamp);
  228 + }
  229 +
  230 + return ret;
  231 +}
  232 +
  233 +int proxy(srs_mp4_t mp4, srs_rtmp_t ortmp)
  234 +{
  235 + int ret = 0;
  236 +
  237 + if ((ret = srs_mp4_init_demuxer(mp4)) != 0) {
  238 + return ret;
  239 + }
  240 + if ((ret = connect_oc(ortmp)) != 0) {
  241 + return ret;
  242 + }
  243 +
  244 + int64_t re = re_create();
  245 +
  246 + uint32_t timestamp = 0;
  247 + int32_t starttime = -1;
  248 + ret = do_proxy(mp4, ortmp, re, &starttime, &timestamp);
  249 +
  250 + // for the last pulse, always sleep.
  251 + re_cleanup(re, starttime, timestamp);
52 252
53 return ret; 253 return ret;
54 } 254 }
@@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 25
26 #include <string.h> 26 #include <string.h>
27 27
  28 +#include <srs_kernel_log.hpp>
28 #include <srs_kernel_error.hpp> 29 #include <srs_kernel_error.hpp>
29 30
30 SrsMp4Box::SrsMp4Box() 31 SrsMp4Box::SrsMp4Box()
@@ -434,9 +435,11 @@ SrsMp4Decoder::~SrsMp4Decoder() @@ -434,9 +435,11 @@ SrsMp4Decoder::~SrsMp4Decoder()
434 435
435 int SrsMp4Decoder::initialize(ISrsReader* r) 436 int SrsMp4Decoder::initialize(ISrsReader* r)
436 { 437 {
  438 + int ret = ERROR_SUCCESS;
  439 +
437 srs_assert(r); 440 srs_assert(r);
438 reader = r; 441 reader = r;
439 442
440 - return ERROR_SUCCESS; 443 + return ret;
441 } 444 }
442 445