Blame view

trunk/src/app/srs_app_source.cpp 33.6 KB
1 2 3
/*
The MIT License (MIT)
4
Copyright (c) 2013-2014 winlin
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
24
#include <srs_app_source.hpp>
25 26

#include <algorithm>
winlin authored
27
using namespace std;
28
29
#include <srs_kernel_log.hpp>
30
#include <srs_protocol_rtmp_stack.hpp>
31
#include <srs_core_autofree.hpp>
32
#include <srs_protocol_amf0.hpp>
33 34 35 36 37
#include <srs_app_codec.hpp>
#include <srs_app_hls.hpp>
#include <srs_app_forward.hpp>
#include <srs_app_config.hpp>
#include <srs_app_encoder.hpp>
38
#include <srs_protocol_rtmp.hpp>
winlin authored
39
#include <srs_app_dvr.hpp>
winlin authored
40
#include <srs_kernel_stream.hpp>
41
#include <srs_app_edge.hpp>
42
43 44
// the max delta, in ms, between two packets that SrsRtmpJitter::correct accepts
// before it treats the delta as jitter.
#define CONST_MAX_JITTER_MS         500
// the delta, in ms, SrsRtmpJitter::correct falls back to when the delta is jitter.
#define DEFAULT_FRAME_TIME_MS         40
45 46 47

/**
* create a jitter corrector whose original and corrected
* packet clocks both start at zero.
*/
SrsRtmpJitter::SrsRtmpJitter()
{
    last_pkt_time = 0;
    last_pkt_correct_time = last_pkt_time;
}

/**
* nothing to release, the constructor only sets the two timestamps.
*/
SrsRtmpJitter::~SrsRtmpJitter()
{
}
55
/**
* correct the timestamp of an audio/video message, so that the time
* written to the message only moves forward and never jumps far.
* two clocks are kept between calls:
* 1. last_pkt_time: the original timestamp of the previous packet,
*     used to compute the delta of the next packet.
* 2. last_pkt_correct_time: the corrected timestamp of the previous packet,
*     the accepted delta is added to it for every packet.
* a delta is jitter when it is negative or greater than CONST_MAX_JITTER_MS;
* it is then rescaled by the time base and, when still out of range,
* replaced by DEFAULT_FRAME_TIME_MS.
* @param msg the message to correct, its header.timestamp is overwritten.
*       a message which is neither audio nor video gets timestamp 0.
* @param tba the time base of audio, used as the audio sample rate.
* @param tbv the time base of video, used as the video frame rate.
* @return always ERROR_SUCCESS.
*/
int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv)
{
    int ret = ERROR_SUCCESS;

    // neither audio nor video, for instance metadata: always stamped 0.
    if (!(msg->header.is_video() || msg->header.is_audio())) {
        msg->header.timestamp = 0;
        return ret;
    }

    int64_t time = msg->header.timestamp;
    int64_t delta = time - last_pkt_time;

    if (0 <= delta && delta <= CONST_MAX_JITTER_MS) {
        // the delta is acceptable, use it directly.
        srs_verbose("timestamp no jitter. time=%"PRId64", last_pkt=%"PRId64", correct_to=%"PRId64"", 
            time, last_pkt_time, last_pkt_correct_time + delta);
    } else {
        // jitter detected, rescale the delta by the time base of the message.
        if (msg->header.is_audio() && tba > 0) {
            delta = (int64_t)(delta * 1000.0 / tba);
        } else if (msg->header.is_video() && tbv > 0) {
            delta = (int64_t)(delta * 1.0 / tbv);
        } else {
            delta = DEFAULT_FRAME_TIME_MS;
        }

        // the time may be absolute time, so the rescaled delta can
        // still be out of range; use the default frame time then.
        if (delta < 0 || delta > CONST_MAX_JITTER_MS) {
            delta = DEFAULT_FRAME_TIME_MS;
        }

        srs_info("jitter detected, last_pts=%"PRId64", pts=%"PRId64", diff=%"PRId64", last_time=%"PRId64", time=%"PRId64", diff=%"PRId64"",
            last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta);
    }

    // advance the corrected clock, never below zero.
    last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);

    // remember the original time to detect the jitter of the next packet.
    last_pkt_time = time;
    msg->header.timestamp = last_pkt_correct_time;

    return ret;
}

int SrsRtmpJitter::get_time()
{
114
    return (int)last_pkt_correct_time;
115 116
}
winlin authored
117
/**
* create an empty queue: the size limit is 0 ms and the
* start/end time are -1, which means no audio/video was queued.
*/
SrsMessageQueue::SrsMessageQueue()
{
    av_end_time = -1;
    av_start_time = av_end_time;
    queue_size_ms = 0;
}
winlin authored
123
/**
* free all messages which are still in the queue.
*/
SrsMessageQueue::~SrsMessageQueue()
{
    // clear() frees each queued message and resets the time range.
    clear();
}
winlin authored
128
void SrsMessageQueue::set_queue_size(double queue_size)
129
{
130
    queue_size_ms = (int)(queue_size * 1000);
131 132
}
133
/**
* append a message to the queue, then drop old messages
* while the queued duration is greater than the queue size.
* @param msg the message to append; the queue stores the pointer
*       and frees it later in shrink() or clear().
* @return always ERROR_SUCCESS.
*/
int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
{
    int ret = ERROR_SUCCESS;

    // only audio and video update the time range of the queue.
    bool is_av = msg->header.is_video() || msg->header.is_audio();
    if (is_av) {
        // -1 means no audio/video was queued yet.
        if (av_start_time == -1) {
            av_start_time = msg->header.timestamp;
        }
        av_end_time = msg->header.timestamp;
    }

    msgs.push_back(msg);

    // each shrink() moves the start time forward or clears the queue.
    for (;;) {
        if (!(av_end_time - av_start_time > queue_size_ms)) {
            break;
        }
        shrink();
    }

    return ret;
}
154
int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
155
{
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
    int ret = ERROR_SUCCESS;
    
    if (msgs.empty()) {
        return ret;
    }
    
    if (max_count == 0) {
        count = (int)msgs.size();
    } else {
        count = srs_min(max_count, (int)msgs.size());
    }

    if (count <= 0) {
        return ret;
    }
    
172
    pmsgs = new SrsSharedPtrMessage*[count];
173 174 175 176 177
    
    for (int i = 0; i < count; i++) {
        pmsgs[i] = msgs[i];
    }
    
178
    SrsSharedPtrMessage* last = msgs[count - 1];
179 180 181 182 183 184 185 186 187
    av_start_time = last->header.timestamp;
    
    if (count == (int)msgs.size()) {
        msgs.clear();
    } else {
        msgs.erase(msgs.begin(), msgs.begin() + count);
    }
    
    return ret;
188 189
}
winlin authored
190
void SrsMessageQueue::shrink()
191
{
192 193 194 195 196 197 198
    int iframe_index = -1;
    
    // issue the first iframe.
    // skip the first frame, whatever the type of it,
    // for when we shrinked, the first is the iframe,
    // we will directly remove the gop next time.
    for (int i = 1; i < (int)msgs.size(); i++) {
199
        SrsSharedPtrMessage* msg = msgs[i];
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
        
        if (msg->header.is_video()) {
            if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) {
                // the max frame index to remove.
                iframe_index = i;
                
                // set the start time, we will remove until this frame.
                av_start_time = msg->header.timestamp;
                
                break;
            }
        }
    }
    
    // no iframe, clear the queue.
    if (iframe_index < 0) {
        clear();
        return;
    }
    
    srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f", 
        (int)msgs.size(), iframe_index, queue_size_ms / 1000.0);
    
    // remove the first gop from the front
    for (int i = 0; i < iframe_index; i++) {
225
        SrsSharedPtrMessage* msg = msgs[i];
226 227 228
        srs_freep(msg);
    }
    msgs.erase(msgs.begin(), msgs.begin() + iframe_index);
229 230
}
winlin authored
231
void SrsMessageQueue::clear()
232
{
233
    std::vector<SrsSharedPtrMessage*>::iterator it;
winlin authored
234
235
    for (it = msgs.begin(); it != msgs.end(); ++it) {
236
        SrsSharedPtrMessage* msg = *it;
237 238
        srs_freep(msg);
    }
winlin authored
239
240 241 242
    msgs.clear();
    
    av_start_time = av_end_time = -1;
winlin authored
243 244 245 246
}

/**
* create a consumer of the source, not paused,
* with its own jitter corrector and message queue.
* @param _source the source to consume; the pointer is stored, not copied.
*/
SrsConsumer::SrsConsumer(SrsSource* _source)
{
    paused = false;
    source = _source;

    jitter = new SrsRtmpJitter();
    queue = new SrsMessageQueue();
}

/**
* notify the source that this consumer is going away,
* then free the jitter corrector and the queue.
*/
SrsConsumer::~SrsConsumer()
{
    // notify the source first, while jitter and queue are still alive.
    source->on_consumer_destroy(this);

    srs_freep(jitter);
    srs_freep(queue);
}

void SrsConsumer::set_queue_size(double queue_size)
{
262
    queue->set_queue_size(queue_size);
winlin authored
263 264 265 266
}

int SrsConsumer::get_time()
{
267
    return jitter->get_time();
winlin authored
268 269
}
270
/**
* put a message to the queue of this consumer.
* the timestamp is corrected by the jitter corrector first,
* unless the source is in atc mode.
* @param msg the message to queue. it is freed here when the
*       jitter correction fails, otherwise it is handed to the queue.
* @param tba the time base of audio, passed to the jitter corrector.
* @param tbv the time base of video, passed to the jitter corrector.
* @return the error of the jitter corrector, or the result of the queue.
*/
int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
{
    int ret = ERROR_SUCCESS;

    // atc keeps the original timestamp, so skip the correction.
    if (!source->is_atc()) {
        ret = jitter->correct(msg, tba, tbv);
        if (ret != ERROR_SUCCESS) {
            srs_freep(msg);
            return ret;
        }
    }

    ret = queue->enqueue(msg);
    return ret;
}
288
int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
winlin authored
289
{
290 291 292 293 294 295
    // paused, return nothing.
    if (paused) {
        return ERROR_SUCCESS;
    }
    
    return queue->get_packets(max_count, pmsgs, count);
winlin authored
296 297 298 299
}

int SrsConsumer::on_play_client_pause(bool is_pause)
{
300 301 302 303 304 305
    int ret = ERROR_SUCCESS;
    
    srs_trace("stream consumer change pause state %d=>%d", paused, is_pause);
    paused = is_pause;
    
    return ret;
306 307 308 309
}

/**
* create an empty gop cache, enabled, with no video cached.
*/
SrsGopCache::SrsGopCache()
{
    enable_gop_cache = true;
    cached_video_count = 0;
}

/**
* free all cached messages.
*/
SrsGopCache::~SrsGopCache()
{
    // clear() frees each cached message and resets the video count.
    clear();
}

void SrsGopCache::set(bool enabled)
{
321 322 323 324 325 326 327 328 329
    enable_gop_cache = enabled;
    
    if (!enabled) {
        srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size());
        clear();
        return;
    }
    
    srs_info("enable gop cache");
330 331
}
332
/**
* cache a copy of the message in the current gop.
* nothing is cached when the cache is disabled, or before the first video
* message arrives. a video keyframe starts a new gop: the cache is cleared,
* then the keyframe is cached.
* @param msg the message to cache; a copy is stored, msg itself is not kept.
* @return always ERROR_SUCCESS.
*/
int SrsGopCache::cache(SrsSharedPtrMessage* msg)
{
    int ret = ERROR_SUCCESS;

    if (!enable_gop_cache) {
        srs_verbose("gop cache is disabled.");
        return ret;
    }

    // count the video messages since the cache was last cleared.
    if (msg->header.is_video()) {
        cached_video_count++;
    }

    // no video was seen yet (or the stream is pure audio), cache nothing.
    if (cached_video_count == 0) {
        srs_verbose("ignore any frame util got a h264 video frame.");
        return ret;
    }

    // a keyframe starts a new gop, drop the previous one.
    if (msg->header.is_video()) {
        if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) {
            srs_info("clear gop cache when got keyframe. vcount=%d, count=%d",
                cached_video_count, (int)gop_cache.size());

            clear();

            // clear() reset the count, and this keyframe is a video message.
            cached_video_count = 1;
        }
    }

    SrsSharedPtrMessage* cached = msg->copy();
    gop_cache.push_back(cached);

    return ret;
}

void SrsGopCache::clear()
{
371
    std::vector<SrsSharedPtrMessage*>::iterator it;
372
    for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
373
        SrsSharedPtrMessage* msg = *it;
374 375 376
        srs_freep(msg);
    }
    gop_cache.clear();
377
378
    cached_video_count = 0;
379
}
380
    
381 382
/**
* send a copy of every cached message to the consumer, in cache order.
* @param consumer the consumer which receives the copies.
* @param tba the time base of audio, passed to the consumer.
* @param tbv the time base of video, passed to the consumer.
* @return the first error of consumer->enqueue, otherwise ERROR_SUCCESS.
*/
int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv)
{
    int ret = ERROR_SUCCESS;

    for (int i = 0; i < (int)gop_cache.size(); i++) {
        SrsSharedPtrMessage* cached = gop_cache[i];

        // the cache keeps its own message, the consumer gets a copy.
        ret = consumer->enqueue(cached->copy(), tba, tbv);
        if (ret != ERROR_SUCCESS) {
            srs_error("dispatch cached gop failed. ret=%d", ret);
            return ret;
        }
    }

    srs_trace("dispatch cached gop success. count=%d, duration=%d", (int)gop_cache.size(), consumer->get_time());

    return ret;
}
398 399 400 401 402 403 404 405 406 407 408
/**
* @return true when no message is cached.
*/
bool SrsGopCache::empty()
{
    bool nothing_cached = gop_cache.empty();
    return nothing_cached;
}

int64_t SrsGopCache::get_start_time()
{
    if (empty()) {
        return 0;
    }
    
409
    SrsSharedPtrMessage* msg = gop_cache[0];
410 411 412 413 414
    srs_assert(msg);
    
    return msg->header.timestamp;
}
415 416
// all created sources, SrsSource::find uses the stream url as the key.
std::map<std::string, SrsSource*> SrsSource::pool;
417
/**
* find the source of the stream url of the request in the pool,
* create and initialize a new source when it does not exist yet.
* @param req the request, which provides the stream url and the vhost.
* @param ppsource output, the source in the pool; not written when
*       the new source fails to initialize.
* @return the error of initialize() of a new source, otherwise ERROR_SUCCESS.
*/
int SrsSource::find(SrsRequest* req, SrsSource** ppsource)
{
    int ret = ERROR_SUCCESS;

    string vhost = req->vhost;
    string stream_url = req->get_stream_url();

    std::map<std::string, SrsSource*>::iterator it = pool.find(stream_url);
    if (it == pool.end()) {
        SrsSource* source = new SrsSource(req);

        // a source which failed to initialize never enters the pool.
        ret = source->initialize();
        if (ret != ERROR_SUCCESS) {
            srs_freep(source);
            return ret;
        }

        pool[stream_url] = source;
        srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
    }

    *ppsource = pool[stream_url];

    return ret;
}
440
/**
* create the source for the request: copy the request, create the
* optional hls/dvr/encoder, the edges and the gop cache,
* subscribe to the config and read the atc of the vhost.
* @param req the request of the stream; the source keeps its own copy.
*/
SrsSource::SrsSource(SrsRequest* req)
{
    // keep our own copy, it is freed in the destructor.
    _req = req->copy();

#ifdef SRS_AUTO_HLS
    hls = new SrsHls(this);
#endif
#ifdef SRS_AUTO_DVR
    dvr = new SrsDvr(this);
#endif
#ifdef SRS_AUTO_TRANSCODE
    encoder = new SrsEncoder();
#endif

    // nothing is cached until the publisher sends it.
    cache_sh_audio = NULL;
    cache_sh_video = NULL;
    cache_metadata = NULL;

    sample_rate = 0;
    frame_rate = 0;
    _can_publish = true;

    play_edge = new SrsPlayEdge();
    publish_edge = new SrsPublishEdge();
    gop_cache = new SrsGopCache();

    _srs_config->subscribe(this);
    atc = _srs_config->get_atc(_req->vhost);
}

/**
* unsubscribe from the config, then free the consumers, the forwarders,
* the cached messages, the edges, the gop cache,
* the optional hls/dvr/encoder and the copy of the request.
*/
SrsSource::~SrsSource()
{
    _srs_config->unsubscribe(this);

    // NOTE(review): the destructor of SrsConsumer calls
    // source->on_consumer_destroy(this) while this loop iterates consumers;
    // confirm that on_consumer_destroy does not modify consumers.
    std::vector<SrsConsumer*>::iterator cit = consumers.begin();
    while (cit != consumers.end()) {
        SrsConsumer* consumer = *cit;
        srs_freep(consumer);
        ++cit;
    }
    consumers.clear();

    std::vector<SrsForwarder*>::iterator fit = forwarders.begin();
    while (fit != forwarders.end()) {
        SrsForwarder* forwarder = *fit;
        srs_freep(forwarder);
        ++fit;
    }
    forwarders.clear();

    // the cached metadata and sequence headers.
    srs_freep(cache_metadata);
    srs_freep(cache_sh_video);
    srs_freep(cache_sh_audio);

    srs_freep(play_edge);
    srs_freep(publish_edge);
    srs_freep(gop_cache);

#ifdef SRS_AUTO_HLS
    srs_freep(hls);
#endif
#ifdef SRS_AUTO_DVR
    srs_freep(dvr);
#endif
#ifdef SRS_AUTO_TRANSCODE
    srs_freep(encoder);
#endif

    // the copy of the request created in the constructor.
    srs_freep(_req);
}
510 511 512 513 514
int SrsSource::initialize()
{
    int ret = ERROR_SUCCESS;
    
#ifdef SRS_AUTO_DVR
515
    if ((ret = dvr->initialize(_req)) != ERROR_SUCCESS) {
516 517 518
        return ret;
    }
#endif
519
winlin authored
520 521 522 523
    if ((ret = play_edge->initialize(this, _req)) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = publish_edge->initialize(this, _req)) != ERROR_SUCCESS) {
524 525
        return ret;
    }
526
    
527 528 529
    double queue_size = _srs_config->get_queue_length(_req->vhost);
    publish_edge->set_queue_size(queue_size);
    
530 531 532
    return ret;
}
533
/**
* reload handler: the atc config of a vhost changed.
* the gop cache is cleared when the vhost is the vhost of this source.
* the atc member is not changed here, on_meta_data reads the config again.
* @param vhost the vhost whose config changed.
* @return always ERROR_SUCCESS.
*/
int SrsSource::on_reload_vhost_atc(string vhost)
{
    int ret = ERROR_SUCCESS;

    // only handle the reload of our own vhost.
    if (_req->vhost == vhost) {
        bool enabled_atc = _srs_config->get_atc(vhost);

        srs_warn("vhost %s atc changed to %d, connected client may corrupt.", 
            vhost.c_str(), enabled_atc);

        gop_cache->clear();
    }

    return ret;
}
552
/**
* reload handler: the gop_cache config of a vhost changed.
* the new value is applied with set_cache() when the vhost
* is the vhost of this source.
* @param vhost the vhost whose config changed.
* @return always ERROR_SUCCESS.
*/
int SrsSource::on_reload_vhost_gop_cache(string vhost)
{
    int ret = ERROR_SUCCESS;

    // only handle the reload of our own vhost.
    if (_req->vhost == vhost) {
        bool enabled_cache = _srs_config->get_gop_cache(vhost);

        srs_trace("vhost %s gop_cache changed to %d, source url=%s", 
            vhost.c_str(), enabled_cache, _req->get_stream_url().c_str());

        set_cache(enabled_cache);
    }

    return ret;
}
571
/**
* reload handler: the queue_length config of a vhost changed.
* the new length is applied to all consumers, all forwarders and the
* publish edge when the vhost is the vhost of this source.
* @param vhost the vhost whose config changed.
* @return always ERROR_SUCCESS.
*/
int SrsSource::on_reload_vhost_queue_length(string vhost)
{
    int ret = ERROR_SUCCESS;

    if (_req->vhost != vhost) {
        return ret;
    }

    double queue_size = _srs_config->get_queue_length(_req->vhost);

    for (int i = 0; i < (int)consumers.size(); i++) {
        SrsConsumer* consumer = consumers[i];
        consumer->set_queue_size(queue_size);
    }
    srs_trace("consumers reload queue size success.");

    for (int i = 0; i < (int)forwarders.size(); i++) {
        SrsForwarder* forwarder = forwarders[i];
        forwarder->set_queue_size(queue_size);
    }
    srs_trace("forwarders reload queue size success.");

    publish_edge->set_queue_size(queue_size);
    srs_trace("publish_edge reload queue size success.");

    return ret;
}
611
/**
* reload handler: the forward config of a vhost changed.
* the forwarders are destroyed and created again when the vhost
* is the vhost of this source.
* @param vhost the vhost whose config changed.
* @return the error of create_forwarders(), otherwise ERROR_SUCCESS.
*/
int SrsSource::on_reload_vhost_forward(string vhost)
{
    int ret = ERROR_SUCCESS;

    if (_req->vhost != vhost) {
        return ret;
    }

    // drop the old forwarders, then create the forwarders again.
    destroy_forwarders();

    ret = create_forwarders();
    if (ret != ERROR_SUCCESS) {
        srs_error("create forwarders failed. ret=%d", ret);
        return ret;
    }

    srs_trace("vhost %s forwarders reload success", vhost.c_str());

    return ret;
}
631
/**
* reload handler: the hls config of a vhost changed.
* when hls is compiled in and the vhost is the vhost of this source,
* hls is unpublished and published again with the request.
* @param vhost the vhost whose config changed.
* @return the error of hls->on_publish(), otherwise ERROR_SUCCESS.
*/
int SrsSource::on_reload_vhost_hls(string vhost)
{
    int ret = ERROR_SUCCESS;

    if (_req->vhost != vhost) {
        return ret;
    }

#ifdef SRS_AUTO_HLS
    // restart hls: unpublish, then publish again.
    hls->on_unpublish();

    ret = hls->on_publish(_req);
    if (ret != ERROR_SUCCESS) {
        srs_error("hls publish failed. ret=%d", ret);
        return ret;
    }

    srs_trace("vhost %s hls reload success", vhost.c_str());
#endif

    return ret;
}
winlin authored
651 652 653 654
/**
* reload handler: the dvr config of a vhost changed.
* when dvr is compiled in and the vhost is the vhost of this source,
* dvr is unpublished, initialized again and published again.
* @param vhost the vhost whose config changed.
* @return the error of dvr->initialize() or dvr->on_publish(),
*       otherwise ERROR_SUCCESS.
*/
int SrsSource::on_reload_vhost_dvr(string vhost)
{
    int ret = ERROR_SUCCESS;

    if (_req->vhost != vhost) {
        return ret;
    }

#ifdef SRS_AUTO_DVR
    // stop the current dvr.
    dvr->on_unpublish();

    // initialize again to update the plan.
    ret = dvr->initialize(_req);
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    // publish with the new plan.
    ret = dvr->on_publish(_req);
    if (ret != ERROR_SUCCESS) {
        srs_error("dvr publish failed. ret=%d", ret);
        return ret;
    }

    srs_trace("vhost %s dvr reload success", vhost.c_str());
#endif

    return ret;
}
680
/**
* reload handler: the transcode config of a vhost changed.
* when transcode is compiled in and the vhost is the vhost of this source,
* the encoder is unpublished and published again with the request.
* @param vhost the vhost whose config changed.
* @return the error of encoder->on_publish(), otherwise ERROR_SUCCESS.
*/
int SrsSource::on_reload_vhost_transcode(string vhost)
{
    int ret = ERROR_SUCCESS;

    if (_req->vhost != vhost) {
        return ret;
    }

#ifdef SRS_AUTO_TRANSCODE
    // restart the encoder: unpublish, then publish again.
    encoder->on_unpublish();

    ret = encoder->on_publish(_req);
    if (ret != ERROR_SUCCESS) {
        srs_error("start encoder failed. ret=%d", ret);
        return ret;
    }

    srs_trace("vhost %s transcode reload success", vhost.c_str());
#endif

    return ret;
}
700 701
/**
* feed a started forwarder with copies of the cached metadata and the
* cached video/audio sequence headers, for instance when a reload
* enables the forwarder. a cache which is not set is skipped.
* @param forwarder the forwarder which receives the copies.
* @return the first error of the forwarder, otherwise ERROR_SUCCESS.
*/
int SrsSource::on_forwarder_start(SrsForwarder* forwarder)
{
    int ret = ERROR_SUCCESS;

    if (cache_metadata) {
        ret = forwarder->on_meta_data(cache_metadata->copy());
        if (ret != ERROR_SUCCESS) {
            srs_error("forwarder process onMetaData message failed. ret=%d", ret);
            return ret;
        }
    }

    if (cache_sh_video) {
        ret = forwarder->on_video(cache_sh_video->copy());
        if (ret != ERROR_SUCCESS) {
            srs_error("forwarder process video sequence header message failed. ret=%d", ret);
            return ret;
        }
    }

    if (cache_sh_audio) {
        ret = forwarder->on_audio(cache_sh_audio->copy());
        if (ret != ERROR_SUCCESS) {
            srs_error("forwarder process audio sequence header message failed. ret=%d", ret);
            return ret;
        }
    }

    return ret;
}
722 723
int SrsSource::on_hls_start()
{
724 725
    int ret = ERROR_SUCCESS;
    
726
#ifdef SRS_AUTO_HLS
727
    // feed the hls the metadata/sequence header,
728 729
    // when reload to start hls, hls will never get the sequence header in stream,
    // use the SrsSource.on_hls_start to push the sequence header to HLS.
730 731 732 733 734 735 736 737 738
    // TODO: maybe need to decode the metadata?
    if (cache_sh_video && (ret = hls->on_video(cache_sh_video->copy())) != ERROR_SUCCESS) {
        srs_error("hls process video sequence header message failed. ret=%d", ret);
        return ret;
    }
    if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio->copy())) != ERROR_SUCCESS) {
        srs_error("hls process audio sequence header message failed. ret=%d", ret);
        return ret;
    }
winlin authored
739 740 741 742 743
#endif
    
    return ret;
}
744
int SrsSource::on_dvr_request_sh()
winlin authored
745 746 747 748 749 750
{
    int ret = ERROR_SUCCESS;
    
#ifdef SRS_AUTO_DVR
    // feed the dvr the metadata/sequence header,
    // when reload to start dvr, dvr will never get the sequence header in stream,
751
    // use the SrsSource.on_dvr_request_sh to push the sequence header to DVR.
winlin authored
752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772
    if (cache_metadata) {
        char* payload = (char*)cache_metadata->payload;
        int size = (int)cache_metadata->size;
        
        SrsStream stream;
        if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) {
            srs_error("dvr decode metadata stream failed. ret=%d", ret);
            return ret;
        }
        
        SrsOnMetaDataPacket pkt;
        if ((ret = pkt.decode(&stream)) != ERROR_SUCCESS) {
            srs_error("dvr decode metadata packet failed.");
            return ret;
        }
        
        if ((ret = dvr->on_meta_data(&pkt)) != ERROR_SUCCESS) {
            srs_error("dvr process onMetaData message failed. ret=%d", ret);
            return ret;
        }
    }
773
    
winlin authored
774 775 776 777 778 779 780 781
    if (cache_sh_video && (ret = dvr->on_video(cache_sh_video->copy())) != ERROR_SUCCESS) {
        srs_error("dvr process video sequence header message failed. ret=%d", ret);
        return ret;
    }
    if (cache_sh_audio && (ret = dvr->on_audio(cache_sh_audio->copy())) != ERROR_SUCCESS) {
        srs_error("dvr process audio sequence header message failed. ret=%d", ret);
        return ret;
    }
782
#endif
783 784
    
    return ret;
785 786
}
787 788
bool SrsSource::can_publish()
{
789
    return _can_publish;
790 791
}
792
/**
* process the onMetaData packet of the publisher:
* 1. pass the metadata to hls and dvr, when compiled in.
* 2. add the server/contributor properties to the metadata.
* 3. read the audiosamplerate/framerate properties, which are
*     later passed to the consumers as the time bases.
* 4. read the atc of the vhost again; when atc_auto is enabled and the
*     metadata carries bravo_atc="true", atc is turned on.
* 5. encode the metadata, cache it, and send a copy to
*     every consumer and every forwarder.
* @param msg the original message, only its header is used for the cached message.
* @param metadata the decoded packet. a NULL packet is ignored with a warning,
*       instead of being dereferenced.
* @return the first error of hls/dvr/encode/initialize/consumer/forwarder,
*       otherwise ERROR_SUCCESS.
*/
int SrsSource::on_meta_data(SrsMessage* msg, SrsOnMetaDataPacket* metadata)
{
    int ret = ERROR_SUCCESS;
    
    // the packet is dereferenced unconditionally below,
    // so a NULL packet must be rejected before anything else.
    if (!metadata) {
        srs_warn("ignore the null metadata packet.");
        return ret;
    }
    
#ifdef SRS_AUTO_HLS
    if ((ret = hls->on_meta_data(metadata->metadata)) != ERROR_SUCCESS) {
        srs_error("hls process onMetaData message failed. ret=%d", ret);
        return ret;
    }
#endif
    
#ifdef SRS_AUTO_DVR
    if ((ret = dvr->on_meta_data(metadata)) != ERROR_SUCCESS) {
        srs_error("dvr process onMetaData message failed. ret=%d", ret);
        return ret;
    }
#endif
    
    metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
    metadata->metadata->set("contributor", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY_AUTHROS));
    
    // the time bases passed to the consumers with every message.
    SrsAmf0Any* prop = NULL;
    if ((prop = metadata->metadata->get_property("audiosamplerate")) != NULL) {
        if (prop->is_number()) {
            sample_rate = (int)prop->to_number();
        }
    }
    if ((prop = metadata->metadata->get_property("framerate")) != NULL) {
        if (prop->is_number()) {
            frame_rate = (int)prop->to_number();
        }
    }
    
    // if allow atc_auto and bravo-atc detected, open atc for vhost.
    atc = _srs_config->get_atc(_req->vhost);
    if (_srs_config->get_atc_auto(_req->vhost)) {
        if ((prop = metadata->metadata->get_property("bravo_atc")) != NULL) {
            if (prop->is_string() && prop->to_str() == "true") {
                atc = true;
            }
        }
    }
    
    // encode the metadata to payload
    int size = 0;
    char* payload = NULL;
    if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
        srs_error("encode metadata error. ret=%d", ret);
        srs_freepa(payload);
        return ret;
    }
    srs_verbose("encode metadata success.");
    
    if (size <= 0) {
        srs_warn("ignore the invalid metadata. size=%d", size);
        // the payload is not handed to anyone on this path, free it here.
        srs_freepa(payload);
        return ret;
    }
    
    // create a shared ptr message.
    srs_freep(cache_metadata);
    cache_metadata = new SrsSharedPtrMessage();
    
    // dump message to shared ptr message.
    // NOTE(review): presumably initialize() takes the ownership of payload,
    // it is never freed here after this call; confirm in SrsSharedPtrMessage.
    if ((ret = cache_metadata->initialize(&msg->header, payload, size)) != ERROR_SUCCESS) {
        srs_error("initialize the cache metadata failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("initialize shared ptr metadata success.");
    
    // copy to all consumer
    if (true) {
        std::vector<SrsConsumer*>::iterator it;
        for (it = consumers.begin(); it != consumers.end(); ++it) {
            SrsConsumer* consumer = *it;
            if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
                srs_error("dispatch the metadata failed. ret=%d", ret);
                return ret;
            }
        }
        srs_trace("dispatch metadata success.");
    }
    
    // copy to all forwarders
    if (true) {
        std::vector<SrsForwarder*>::iterator it;
        for (it = forwarders.begin(); it != forwarders.end(); ++it) {
            SrsForwarder* forwarder = *it;
            if ((ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) {
                srs_error("forwarder process onMetaData message failed. ret=%d", ret);
                return ret;
            }
        }
    }
    
    return ret;
}
889
/**
* process an audio message of the publisher:
* 1. convert it to a shared ptr message, which is freed when this function returns.
* 2. pass a copy to hls and dvr, when compiled in; a failure there only
*     unpublishes hls/dvr and is not returned to the caller.
* 3. send a copy to every consumer and every forwarder.
* 4. when the message is an audio sequence header, cache a copy and return,
*     otherwise cache the message in the gop cache.
* 5. in atc mode, set the timestamp of the cached audio sequence header
*     and of the cached metadata to the timestamp of this message.
* @param audio the audio message of the publisher.
* @return the first error of initialize/consumer/forwarder/gop cache,
*       otherwise ERROR_SUCCESS.
*/
int SrsSource::on_audio(SrsMessage* audio)
{
    int ret = ERROR_SUCCESS;
    
    SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
    SrsAutoFree(SrsSharedPtrMessage, msg, false);
    if ((ret = msg->initialize(audio)) != ERROR_SUCCESS) {
        srs_error("initialize the audio failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("initialize shared ptr audio success.");
    
#ifdef SRS_AUTO_HLS
    if ((ret = hls->on_audio(msg->copy())) != ERROR_SUCCESS) {
        srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret);
        
        // unpublish, ignore ret.
        hls->on_unpublish();
        
        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif
    
#ifdef SRS_AUTO_DVR
    if ((ret = dvr->on_audio(msg->copy())) != ERROR_SUCCESS) {
        srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret);
        
        // unpublish, ignore ret.
        dvr->on_unpublish();
        
        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif
    
    // copy to all consumer
    if (true) {
        std::vector<SrsConsumer*>::iterator it;
        for (it = consumers.begin(); it != consumers.end(); ++it) {
            SrsConsumer* consumer = *it;
            if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
                srs_error("dispatch the audio failed. ret=%d", ret);
                return ret;
            }
        }
        srs_info("dispatch audio success.");
    }

    // copy to all forwarders.
    if (true) {
        std::vector<SrsForwarder*>::iterator it;
        for (it = forwarders.begin(); it != forwarders.end(); ++it) {
            SrsForwarder* forwarder = *it;
            if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) {
                srs_error("forwarder process audio message failed. ret=%d", ret);
                return ret;
            }
        }
    }

    // cache the audio sequence header, it is not put in the gop cache.
    if (SrsCodec::audio_is_sequence_header(msg->payload, msg->size)) {
        srs_freep(cache_sh_audio);
        cache_sh_audio = msg->copy();
        srs_trace("update audio sequence header success. size=%d", msg->header.payload_length);
        return ret;
    }
    
    // cache the last gop packets
    if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
        // the failing call is cache(), not a shrink; same message as on_video.
        srs_error("gop cache msg failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("cache gop success.");
    
    // if atc, update the sequence header to abs time.
    if (atc) {
        if (cache_sh_audio) {
            cache_sh_audio->header.timestamp = msg->header.timestamp;
        }
        if (cache_metadata) {
            cache_metadata->header.timestamp = msg->header.timestamp;
        }
    }
    
    return ret;
}
978
// Handle one video message from the publisher.
// The message is wrapped in a SrsSharedPtrMessage and a copy is handed, in
// order, to hls and dvr (when compiled in), to every consumer and to every
// forwarder. Afterwards it is either kept as the video sequence header or
// pushed into the gop cache.
// @param video the incoming video message.
// @return ERROR_SUCCESS, or the error code of the first step that failed.
//      hls/dvr failures are not returned: they are logged, the component is
//      unpublished and the error is reset.
int SrsSource::on_video(SrsMessage* video)
{
    int ret = ERROR_SUCCESS;
    
    SrsSharedPtrMessage* frame = new SrsSharedPtrMessage();
    SrsAutoFree(SrsSharedPtrMessage, frame, false);
    
    ret = frame->initialize(video);
    if (ret != ERROR_SUCCESS) {
        srs_error("initialize the video failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("initialize shared ptr video success.");
    
#ifdef SRS_AUTO_HLS
    ret = hls->on_video(frame->copy());
    if (ret != ERROR_SUCCESS) {
        srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);
        
        // stop hls for this stream; its result is intentionally not checked.
        hls->on_unpublish();
        
        // the hls failure must not fail the publish.
        ret = ERROR_SUCCESS;
    }
#endif
    
#ifdef SRS_AUTO_DVR
    ret = dvr->on_video(frame->copy());
    if (ret != ERROR_SUCCESS) {
        srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);
        
        // stop dvr for this stream; its result is intentionally not checked.
        dvr->on_unpublish();
        
        // the dvr failure must not fail the publish.
        ret = ERROR_SUCCESS;
    }
#endif
    
    // every consumer receives its own copy of the message.
    for (size_t i = 0; i < consumers.size(); i++) {
        SrsConsumer* consumer = consumers[i];
        ret = consumer->enqueue(frame->copy(), sample_rate, frame_rate);
        if (ret != ERROR_SUCCESS) {
            srs_error("dispatch the video failed. ret=%d", ret);
            return ret;
        }
    }
    srs_info("dispatch video success.");

    // every forwarder receives its own copy of the message.
    for (size_t i = 0; i < forwarders.size(); i++) {
        SrsForwarder* forwarder = forwarders[i];
        ret = forwarder->on_video(frame->copy());
        if (ret != ERROR_SUCCESS) {
            srs_error("forwarder process video message failed. ret=%d", ret);
            return ret;
        }
    }

    // a sequence header replaces the cached one and is not put in the gop cache.
    if (SrsCodec::video_is_sequence_header(frame->payload, frame->size)) {
        srs_freep(cache_sh_video);
        cache_sh_video = frame->copy();
        srs_trace("update video sequence header success. size=%d", frame->header.payload_length);
        return ret;
    }

    // any other video message goes to the gop cache.
    ret = gop_cache->cache(frame);
    if (ret != ERROR_SUCCESS) {
        srs_error("gop cache msg failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("cache gop success.");
    
    // without atc the cached headers keep their own timestamps.
    if (!atc) {
        return ret;
    }
    
    // with atc, stamp the cached sequence header and metadata with the
    // timestamp of the message just handled.
    if (cache_sh_video) {
        cache_sh_video->header.timestamp = frame->header.timestamp;
    }
    if (cache_metadata) {
        cache_metadata->header.timestamp = frame->header.timestamp;
    }
    
    return ret;
}
1067
int SrsSource::on_publish()
1068
{
1069 1070 1071
    int ret = ERROR_SUCCESS;
    
    // update the request object.
1072
    srs_assert(_req);
1073 1074 1075 1076 1077 1078 1079 1080 1081
    
    _can_publish = false;
    
    // create forwarders
    if ((ret = create_forwarders()) != ERROR_SUCCESS) {
        srs_error("create forwarders failed. ret=%d", ret);
        return ret;
    }
    
1082
#ifdef SRS_AUTO_TRANSCODE
1083
    if ((ret = encoder->on_publish(_req)) != ERROR_SUCCESS) {
1084 1085 1086
        srs_error("start encoder failed. ret=%d", ret);
        return ret;
    }
1087
#endif
1088
    
1089
#ifdef SRS_AUTO_HLS
1090
    if ((ret = hls->on_publish(_req)) != ERROR_SUCCESS) {
1091 1092 1093
        srs_error("start hls failed. ret=%d", ret);
        return ret;
    }
1094
#endif
winlin authored
1095 1096
    
#ifdef SRS_AUTO_DVR
1097
    if ((ret = dvr->on_publish(_req)) != ERROR_SUCCESS) {
winlin authored
1098 1099 1100 1101
        srs_error("start dvr failed. ret=%d", ret);
        return ret;
    }
#endif
1102
1103
    return ret;
1104 1105 1106 1107
}

void SrsSource::on_unpublish()
{
1108 1109
    // destroy all forwarders
    destroy_forwarders();
1110
1111
#ifdef SRS_AUTO_TRANSCODE
1112
    encoder->on_unpublish();
1113 1114
#endif
1115
#ifdef SRS_AUTO_HLS
1116
    hls->on_unpublish();
1117
#endif
winlin authored
1118 1119 1120 1121
    
#ifdef SRS_AUTO_DVR
    dvr->on_unpublish();
#endif
1122
1123
    gop_cache->clear();
1124
1125 1126 1127 1128 1129 1130 1131 1132 1133
    srs_freep(cache_metadata);
    frame_rate = sample_rate = 0;
    
    srs_freep(cache_sh_video);
    srs_freep(cache_sh_audio);
    
    srs_trace("clear cache/metadata/sequence-headers when unpublish.");
    
    _can_publish = true;
1134 1135 1136 1137
}

 int SrsSource::create_consumer(SrsConsumer*& consumer)
{
    // Create a consumer for this source and prime it with the cached state:
    // the metadata, the video and audio sequence headers, then the gop cache.
    // @param consumer output, the newly created consumer. It is appended to
    //      consumers before anything is enqueued, so it stays registered
    //      even when a later step fails.
    // @return ERROR_SUCCESS, or the error code of the first step that failed.
    int ret = ERROR_SUCCESS;
    
    consumer = new SrsConsumer(this);
    consumers.push_back(consumer);
    
    double queue_size = _srs_config->get_queue_length(_req->vhost);
    consumer->set_queue_size(queue_size);

    // metadata first, when there is any.
    if (cache_metadata) {
        ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate);
        if (ret != ERROR_SUCCESS) {
            srs_error("dispatch metadata failed. ret=%d", ret);
            return ret;
        }
    }
    srs_info("dispatch metadata success");
    
    // with atc and a non-empty gop cache, stamp the cached sequence headers
    // with the start time of the gop cache.
    if (atc) {
        if (!gop_cache->empty()) {
            if (cache_sh_video) {
                cache_sh_video->header.timestamp = gop_cache->get_start_time();
            }
            if (cache_sh_audio) {
                cache_sh_audio->header.timestamp = gop_cache->get_start_time();
            }
        }
    }
    
    // sequence headers: video, then audio.
    if (cache_sh_video) {
        ret = consumer->enqueue(cache_sh_video->copy(), sample_rate, frame_rate);
        if (ret != ERROR_SUCCESS) {
            srs_error("dispatch video sequence header failed. ret=%d", ret);
            return ret;
        }
    }
    srs_info("dispatch video sequence header success");
    
    if (cache_sh_audio) {
        ret = consumer->enqueue(cache_sh_audio->copy(), sample_rate, frame_rate);
        if (ret != ERROR_SUCCESS) {
            srs_error("dispatch audio sequence header failed. ret=%d", ret);
            return ret;
        }
    }
    srs_info("dispatch audio sequence header success");
    
    // finally the cached gop packets.
    ret = gop_cache->dump(consumer, sample_rate, frame_rate);
    if (ret != ERROR_SUCCESS) {
        return ret;
    }
    
    srs_trace("create consumer, queue_size=%.2f, tba=%d, tbv=%d", queue_size, sample_rate, frame_rate);
    
    return ret;
}

// Unregister a consumer from this source.
// Removes the consumer from consumers when it is present; when no consumer
// is left afterwards, notifies the play edge that all clients stopped.
// @param consumer the consumer being destroyed.
void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
{
    std::vector<SrsConsumer*>::iterator pos = std::find(consumers.begin(), consumers.end(), consumer);
    if (pos != consumers.end()) {
        consumers.erase(pos);
    }
    srs_info("handle consumer destroy success.");
    
    // somebody is still playing, nothing more to do.
    if (!consumers.empty()) {
        return;
    }
    
    play_edge->on_all_client_stop();
}

void SrsSource::set_cache(bool enabled)
{
1201
    gop_cache->set(enabled);
1202 1203
}
1204 1205 1206 1207 1208
// @return the current value of the atc flag of this source.
bool SrsSource::is_atc()
{
    return atc;
}
1209
int SrsSource::on_edge_start_play()
winlin authored
1210
{
winlin authored
1211 1212 1213 1214 1215 1216
    return play_edge->on_client_play();
}

// Delegate to the publish edge's on_client_publish().
// @return whatever publish_edge->on_client_publish() returns.
int SrsSource::on_edge_start_publish()
{
    return publish_edge->on_client_publish();
}
1219
// Hand a message to the publish edge's on_proxy_publish().
// @param msg the message passed through unchanged.
// @return whatever publish_edge->on_proxy_publish() returns.
int SrsSource::on_edge_proxy_publish(SrsMessage* msg)
{
    return publish_edge->on_proxy_publish(msg);
}

// Delegate to the publish edge's on_proxy_unpublish().
void SrsSource::on_edge_proxy_unpublish()
{
    publish_edge->on_proxy_unpublish();
}
winlin authored
1229 1230
int SrsSource::create_forwarders()
{
1231 1232
    int ret = ERROR_SUCCESS;
    
1233
    SrsConfDirective* conf = _srs_config->get_forward(_req->vhost);
1234 1235 1236 1237 1238 1239
    for (int i = 0; conf && i < (int)conf->args.size(); i++) {
        std::string forward_server = conf->args.at(i);
        
        SrsForwarder* forwarder = new SrsForwarder(this);
        forwarders.push_back(forwarder);
    
1240
        double queue_size = _srs_config->get_queue_length(_req->vhost);
1241 1242
        forwarder->set_queue_size(queue_size);
        
1243
        if ((ret = forwarder->on_publish(_req, forward_server)) != ERROR_SUCCESS) {
1244 1245
            srs_error("start forwarder failed. "
                "vhost=%s, app=%s, stream=%s, forward-to=%s",
1246
                _req->vhost.c_str(), _req->app.c_str(), _req->stream.c_str(),
1247 1248 1249 1250 1251 1252
                forward_server.c_str());
            return ret;
        }
    }

    return ret;
winlin authored
1253 1254 1255 1256
}

// Stop and free every forwarder, then empty the forwarders list.
void SrsSource::destroy_forwarders()
{
    for (size_t i = 0; i < forwarders.size(); i++) {
        SrsForwarder* forwarder = forwarders[i];
        forwarder->on_unpublish();
        srs_freep(forwarder);
    }
    
    // the pointers left in the list were freed above; drop them all.
    forwarders.clear();
}