/* trunk/research/librtmp/srs_ingest_flv.c */
/*
The MIT License (MIT)

Copyright (c) 2013-2014 winlin

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
gcc srs_ingest_flv.c ../../objs/lib/srs_librtmp.a -g -O0 -lstdc++ -o srs_ingest_flv
*/

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include "../../objs/include/srs_librtmp.h"
winlin authored
37
/**
* read the flv header, connect and publish to the RTMP server,
* then ingest all flv tags to it.
* @return 0 for success; otherwise, the error code of the failed step.
*/
int proxy(srs_flv_t flv, srs_rtmp_t ortmp);
/**
* handshake, connect app and publish stream on the output rtmp.
* @return 0 for success; otherwise, the error code of the failed step.
*/
int connect_oc(srs_rtmp_t ortmp);

// the pulse in ms, re_update only sleeps when the stream time
// is ahead of the wall clock by more than this value.
#define RE_PULSE_MS 300
// create the re(a base time in ms) used by re_update and re_cleanup.
int64_t re_create();
// sleep when the tag time is ahead of the wall clock by more than a pulse.
void re_update(int64_t re, int32_t starttime, u_int32_t time);
// sleep for whatever time is left after the last tag.
void re_cleanup(int64_t re, int32_t starttime, u_int32_t time);

// the time in ms when main started, set from srs_utils_time_ms()
// at the beginning of main and read by re_create().
int64_t tools_main_entrance_startup_time;
46 47 48 49
int main(int argc, char** argv)
{
    int ret = 0;
    
50
    // main function
51
    tools_main_entrance_startup_time = srs_utils_time_ms();
52
    
53 54 55 56 57 58 59
    // user option parse index.
    int opt = 0;
    // user options.
    char* in_flv_file; char* out_rtmp_url;
    // rtmp handler
    srs_rtmp_t ortmp;
    // flv handler
winlin authored
60
    srs_flv_t flv;
61
    
62 63 64 65
    printf("ingest flv file and publish to RTMP server like FFMPEG.\n");
    printf("srs(simple-rtmp-server) client librtmp library.\n");
    printf("version: %d.%d.%d\n", srs_version_major(), srs_version_minor(), srs_version_revision());
    
66 67 68 69 70 71
    if (argc <= 2) {
        printf("ingest flv file and publish to RTMP server\n"
            "Usage: %s <-i in_flv_file> <-y out_rtmp_url>\n"
            "   in_flv_file     input flv file, ingest from this file.\n"
            "   out_rtmp_url    output rtmp url, publish to this url.\n"
            "For example:\n"
72 73 74 75
            "   %s -i doc/source.200kbps.768x320.flv -y rtmp://127.0.0.1/live/livestream\n"
            "   %s -i ../../doc/source.200kbps.768x320.flv -y rtmp://127.0.0.1/live/livestream\n",
            argv[0], argv[0], argv[0]);
        exit(-1);
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
    }
    
    // parse options in FFMPEG format.
    while ((opt = getopt(argc, argv, "i:y:")) != -1) {
        switch (opt) {
            case 'i':
                in_flv_file = optarg;
                break;
            case 'y':
                out_rtmp_url = optarg;
                break;
            default:
                break;
        }
    }
    
92 93
    srs_human_trace("input:  %s", in_flv_file);
    srs_human_trace("output: %s", out_rtmp_url);
94
95
    if ((flv = srs_flv_open_read(in_flv_file)) == NULL) {
96
        ret = 2;
97
        srs_human_trace("open flv file failed. ret=%d", ret);
98 99 100 101 102
        return ret;
    }
    
    ortmp = srs_rtmp_create(out_rtmp_url);
winlin authored
103
    ret = proxy(flv, ortmp);
104
    srs_human_trace("ingest flv to RTMP completed");
105 106
    
    srs_rtmp_destroy(ortmp);
winlin authored
107
    srs_flv_close(flv);
108 109 110 111
    
    return ret;
}
112
/**
* read the flv tags one by one and write them to the output rtmp,
* till the flv is EOF or any step failed.
* @param re the base time from re_create(), passed to re_update().
* @param pstarttime output the timestamp of the first tag written;
*       only set when the current value is negative.
* @param ptimestamp output the timestamp of the last tag header read.
* @return 0 when EOF or when a tag with invalid size is found;
*       otherwise, the error code of the failed step.
*/
int do_proxy(srs_flv_t flv, srs_rtmp_t ortmp, int64_t re, int32_t* pstarttime, u_int32_t* ptimestamp)
{
    int ret = 0;
    
    // packet data
    char type;
    int size;
    char* data = NULL;
    
    srs_human_trace("start ingest flv to RTMP stream");
    for (;;) {
        // tag header
        if ((ret = srs_flv_read_tag_header(flv, &type, &size, ptimestamp)) != 0) {
            if (srs_flv_is_eof(ret)) {
                srs_human_trace("parse completed.");
                return 0;
            }
            srs_human_trace("flv get packet failed. ret=%d", ret);
            return ret;
        }
        
        // stop ingesting; ret is 0 here, so the caller gets success.
        if (size <= 0) {
            srs_human_trace("invalid size=%d", size);
            break;
        }
        
        data = (char*)malloc(size);
        if (data == NULL) {
            ret = -1;
            srs_human_trace("alloc tag data failed. size=%d, ret=%d", size, ret);
            return ret;
        }
        
        // we still own data until it is handed to the rtmp writer,
        // so free it when reading or printing failed.
        if ((ret = srs_flv_read_tag_data(flv, data, size)) != 0) {
            free(data);
            return ret;
        }
        
        u_int32_t timestamp = *ptimestamp;
        
        if ((ret = srs_human_print_rtmp_packet(type, timestamp, data, size)) != 0) {
            srs_human_trace("print packet failed. ret=%d", ret);
            free(data);
            return ret;
        }
        
        // NOTE(review): data is never freed after this call, presumably
        // srs_rtmp_write_packet takes the ownership of data even when it
        // failed; confirm against srs_librtmp.h.
        if ((ret = srs_rtmp_write_packet(ortmp, type, *ptimestamp, data, size)) != 0) {
            srs_human_trace("irtmp get packet failed. ret=%d", ret);
            return ret;
        }
        
        // the first written tag defines the start time of the stream.
        if (*pstarttime < 0) {
            *pstarttime = *ptimestamp;
        }
        
        re_update(re, *pstarttime, *ptimestamp);
    }
    
    return ret;
}
winlin authored
166
/**
* read the flv header, connect and publish to the output rtmp,
* then ingest the tags by do_proxy().
* @return the error code of the flv header or the connect when they
*       failed; otherwise, the code returned by do_proxy().
*/
int proxy(srs_flv_t flv, srs_rtmp_t ortmp)
{
    // the time state filled by do_proxy and consumed by re_cleanup.
    int32_t starttime = -1;
    u_int32_t timestamp = 0;
    
    // the flv file header is read but not used.
    char header[13];
    int ret = srs_flv_read_header(flv, header);
    if (ret != 0) {
        return ret;
    }
    
    ret = connect_oc(ortmp);
    if (ret != 0) {
        return ret;
    }
    
    int64_t re = re_create();
    ret = do_proxy(flv, ortmp, re, &starttime, &timestamp);
    
    // whatever do_proxy returned, sleep for the last pulse.
    re_cleanup(re, starttime, timestamp);
    
    return ret;
}
190 191 192 193
/**
* prepare the output rtmp for publishing, in three ordered steps:
* handshake, connect app, then publish stream.
* @return 0 when all steps are ok; otherwise, the error code of the
*       first failed step, the following steps are skipped.
*/
int connect_oc(srs_rtmp_t ortmp)
{
    int ret;
    
    // step 1: handshake.
    ret = srs_rtmp_handshake(ortmp);
    if (ret != 0) {
        srs_human_trace("ortmp simple handshake failed. ret=%d", ret);
        return ret;
    }
    srs_human_trace("ortmp simple handshake success");
    
    // step 2: connect to the vhost/app.
    ret = srs_rtmp_connect_app(ortmp);
    if (ret != 0) {
        srs_human_trace("ortmp connect vhost/app failed. ret=%d", ret);
        return ret;
    }
    srs_human_trace("ortmp connect vhost/app success");
    
    // step 3: publish the stream.
    ret = srs_rtmp_publish_stream(ortmp);
    if (ret != 0) {
        srs_human_trace("ortmp publish stream failed. ret=%d", ret);
        return ret;
    }
    srs_human_trace("ortmp publish stream success");
    
    return ret;
}

int64_t re_create()
{
217
    // if not very precise, we can directly use this as re.
218
    int64_t re = srs_utils_time_ms();
219 220 221
    
    // use the starttime to get the deviation
    int64_t deviation = re - tools_main_entrance_startup_time;
222
    srs_human_trace("deviation is %d ms, pulse is %d ms", (int)(deviation), (int)(RE_PULSE_MS));
223
    
224 225 226
    // so, we adjust time to max(0, deviation)
    // because the last pulse, we already sleeped
    int adjust = (int)(deviation);
227
    if (adjust > 0) {
228
        srs_human_trace("adjust re time for %d ms", adjust);
229 230
        re -= adjust;
    } else {
231
        srs_human_trace("no need to adjust re time");
232 233 234 235
    }
    
    return re;
}
236
void re_update(int64_t re, int32_t starttime, u_int32_t time)
237
{
238
    // send by pulse algorithm.
239
    int64_t now = srs_utils_time_ms();
240
    int64_t diff = time - starttime - (now -re);
241 242 243 244
    if (diff > RE_PULSE_MS) {
        usleep(diff * 1000);
    }
}
245
/**
* for the last pulse, always sleep for the time left, whatever the pulse.
* for the virtual live encoder long time publishing.
* @param re the base time in ms, from re_create().
* @param starttime the timestamp in ms of the first tag.
* @param time the timestamp in ms of the last tag.
*/
void re_cleanup(int64_t re, int32_t starttime, u_int32_t time)
{
    int64_t now = srs_utils_time_ms();
    
    // widen to 64 bits before subtracting: time is unsigned 32 bits, so
    // (time - starttime) would wrap to about 2^32 ms when the last
    // timestamp is smaller than starttime.
    int64_t stream_elapsed = (int64_t)time - (int64_t)starttime;
    int64_t clock_elapsed = now - re;
    
    int64_t diff = stream_elapsed - clock_elapsed;
    if (diff > 0) {
        // the format uses %d, so pass int values.
        srs_human_trace("re_cleanup, diff=%d, start=%d, last=%d ms", 
            (int)diff, (int)starttime, (int)time);
        usleep(diff * 1000);
    }
}