Blame view

trunk/src/app/srs_app_forward.cpp 10.0 KB
1 2 3
/*
The MIT License (MIT)
4
Copyright (c) 2013-2014 winlin
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
24
#include <srs_app_forward.hpp>
25 26 27 28 29 30

#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
31
#include <srs_app_source.hpp>
32
#include <srs_core_autofree.hpp>
33
#include <srs_app_socket.hpp>
34
#include <srs_kernel_error.hpp>
35
#include <srs_kernel_log.hpp>
36 37
#include <srs_app_config.hpp>
#include <srs_app_pithy_print.hpp>
38
#include <srs_protocol_rtmp.hpp>
39 40 41
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_protocol_rtmp.hpp>
42
winlin authored
43 44 45
// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
46
SrsForwarder::SrsForwarder(SrsSource* _source)
47
{
48 49 50 51 52 53
    source = _source;
    
    io = NULL;
    client = NULL;
    stfd = NULL;
    stream_id = 0;
54
55 56 57
    pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_US);
    queue = new SrsMessageQueue();
    jitter = new SrsRtmpJitter();
58 59 60 61
}

SrsForwarder::~SrsForwarder()
{
62 63 64 65 66
    on_unpublish();
    
    srs_freep(pthread);
    srs_freep(queue);
    srs_freep(jitter);
winlin authored
67 68 69 70
}

void SrsForwarder::set_queue_size(double queue_size)
{
71
    queue->set_queue_size(queue_size);
72 73 74 75
}

int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
{
76 77 78 79 80 81 82 83 84 85
    int ret = ERROR_SUCCESS;
    
    // forward app
    app = req->app;
    
    stream_name = req->stream;
    server = forward_server;
    std::string s_port = RTMP_DEFAULT_PORT;
    port = ::atoi(RTMP_DEFAULT_PORT);
    
86
    // TODO: FIXME: parse complex params
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
    size_t pos = forward_server.find(":");
    if (pos != std::string::npos) {
        s_port = forward_server.substr(pos + 1);
        server = forward_server.substr(0, pos);
    }
    // discovery vhost
    std::string vhost = req->vhost;
    srs_vhost_resolve(vhost, s_port);
    port = ::atoi(s_port.c_str());
    
    // generate tcUrl
    tc_url = "rtmp://";
    tc_url += vhost;
    tc_url += "/";
    tc_url += req->app;
    
    // dead loop check
104 105
    std::string source_ep = "rtmp://";
    source_ep += req->host;
106 107
    source_ep += ":";
    source_ep += req->port;
108 109
    source_ep += "?vhost=";
    source_ep += req->vhost;
110
    
111 112 113 114 115 116
    std::string dest_ep = "rtmp://";
    if (forward_server == "127.0.0.1") {
        dest_ep += req->host;
    } else {
        dest_ep += forward_server;
    }
117 118
    dest_ep += ":";
    dest_ep += s_port;
119 120
    dest_ep += "?vhost=";
    dest_ep += vhost;
121 122 123
    
    if (source_ep == dest_ep) {
        ret = ERROR_SYSTEM_FORWARD_LOOP;
124
        srs_warn("forward loop detected. src=%s, dest=%s, ret=%d", 
125 126 127 128 129 130 131 132
            source_ep.c_str(), dest_ep.c_str(), ret);
        return ret;
    }
    srs_trace("start forward %s to %s, stream: %s/%s", 
        source_ep.c_str(), dest_ep.c_str(), tc_url.c_str(), 
        stream_name.c_str());
    
    if ((ret = pthread->start()) != ERROR_SUCCESS) {
133 134
        srs_error("start srs thread failed. ret=%d", ret);
        return ret;
135
    }
winlin authored
136
    srs_trace("forward thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());
137 138
    
    return ret;
139 140 141 142
}

void SrsForwarder::on_unpublish()
{
143 144 145 146 147 148
    pthread->stop();
    
    close_underlayer_socket();
    
    srs_freep(client);
    srs_freep(io);
149 150
}
151
int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
152
{
153 154 155 156 157 158 159 160 161 162 163 164
    int ret = ERROR_SUCCESS;
    
    if ((ret = jitter->correct(metadata, 0, 0)) != ERROR_SUCCESS) {
        srs_freep(metadata);
        return ret;
    }
    
    if ((ret = queue->enqueue(metadata)) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
165 166
}
167
int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
168
{
169 170 171 172 173 174 175 176 177 178 179 180
    int ret = ERROR_SUCCESS;
    
    if ((ret = jitter->correct(msg, 0, 0)) != ERROR_SUCCESS) {
        srs_freep(msg);
        return ret;
    }
    
    if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
181 182
}
183
int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
184
{
185 186 187 188 189 190 191 192 193 194 195 196
    int ret = ERROR_SUCCESS;
    
    if ((ret = jitter->correct(msg, 0, 0)) != ERROR_SUCCESS) {
        srs_freep(msg);
        return ret;
    }
    
    if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
197 198 199 200
}

int SrsForwarder::cycle()
{
201 202 203 204 205 206
    int ret = ERROR_SUCCESS;
    
    if ((ret = connect_server()) != ERROR_SUCCESS) {
        return ret;
    }
    srs_assert(client);
207
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
    client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
    client->set_send_timeout(SRS_SEND_TIMEOUT_US);
    
    if ((ret = client->handshake()) != ERROR_SUCCESS) {
        srs_error("handshake with server failed. ret=%d", ret);
        return ret;
    }
    if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);
        return ret;
    }
    if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
        return ret;
    }
    
    if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
        srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", 
            stream_name.c_str(), stream_id, ret);
        return ret;
    }
    
    if ((ret = source->on_forwarder_start(this)) != ERROR_SUCCESS) {
        srs_error("callback the source to feed the sequence header failed. ret=%d", ret);
        return ret;
    }
    
    if ((ret = forward()) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
240 241 242 243
}

void SrsForwarder::close_underlayer_socket()
{
244
    srs_close_stfd(stfd);
245 246 247 248
}

int SrsForwarder::connect_server()
{
249 250 251 252 253 254 255 256
    int ret = ERROR_SUCCESS;
    
    // reopen
    close_underlayer_socket();
    
    // open socket.
    srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
        stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if(sock == -1){
        ret = ERROR_SOCKET_CREATE;
        srs_error("create socket error. ret=%d", ret);
        return ret;
    }
    
    srs_assert(!stfd);
    stfd = st_netfd_open_socket(sock);
    if(stfd == NULL){
        ret = ERROR_ST_OPEN_SOCKET;
        srs_error("st_netfd_open_socket failed. ret=%d", ret);
        return ret;
    }
272
    
273 274 275 276 277 278 279 280 281 282 283 284 285 286
    srs_freep(client);
    srs_freep(io);
    
    io = new SrsSocket(stfd);
    client = new SrsRtmpClient(io);
    
    // connect to server.
    std::string ip = srs_dns_resolve(server);
    if (ip.empty()) {
        ret = ERROR_SYSTEM_IP_INVALID;
        srs_error("dns resolve server error, ip empty. ret=%d", ret);
        return ret;
    }
    
287 288 289 290 291 292 293 294 295 296 297 298
    sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(ip.c_str());
    
    if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
        ret = ERROR_ST_CONNECT;
        srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
    
299
    return ret;
300 301 302 303
}

int SrsForwarder::forward()
{
304 305 306 307 308
    int ret = ERROR_SUCCESS;
    
    client->set_recv_timeout(SRS_PULSE_TIMEOUT_US);
    
    SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
309
310 311 312
    while (pthread->can_loop()) {
        // switch to other st-threads.
        st_usleep(0);
313
314 315
        pithy_print.elapse();
316 317
        // read from client.
        if (true) {
318 319
            SrsMessage* msg = NULL;
            ret = client->recv_message(&msg);
320 321 322 323 324 325
            
            srs_verbose("play loop recv message. ret=%d", ret);
            if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
                srs_error("recv server control message failed. ret=%d", ret);
                return ret;
            }
326 327
            
            srs_freep(msg);
328 329 330 331
        }
        
        // forward all messages.
        int count = 0;
332
        SrsSharedPtrMessage** msgs = NULL;
333 334 335 336 337
        if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
            srs_error("get message to forward failed. ret=%d", ret);
            return ret;
        }
        
338 339
        // pithy print
        if (pithy_print.can_print()) {
340 341 342 343
            srs_trace("-> "SRS_LOG_ID_FOWARDER
                " time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", 
                pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(), 
                client->get_send_kbps(), client->get_recv_kbps());
344 345
        }
        
346 347 348 349 350
        // ignore when no messages.
        if (count <= 0) {
            srs_verbose("no packets to forward.");
            continue;
        }
351
        SrsAutoFree(SrsSharedPtrMessage*, msgs, true);
352 353 354
    
        // all msgs to forward.
        for (int i = 0; i < count; i++) {
355
            SrsSharedPtrMessage* msg = msgs[i];
356 357 358 359
            
            srs_assert(msg);
            msgs[i] = NULL;
            
360
            if ((ret = client->send_and_free_message(msg)) != ERROR_SUCCESS) {
361 362 363 364 365 366 367
                srs_error("forwarder send message to server failed. ret=%d", ret);
                return ret;
            }
        }
    }
    
    return ret;
368 369
}