winlin

add todo for source

1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_source.hpp>  
25 -  
26 -#include <algorithm>  
27 -  
28 -#include <srs_core_log.hpp>  
29 -#include <srs_core_protocol.hpp>  
30 -#include <srs_core_autofree.hpp>  
31 -#include <srs_core_amf0.hpp>  
32 -#include <srs_core_codec.hpp>  
33 -#include <srs_core_hls.hpp>  
34 -#include <srs_core_forward.hpp>  
35 -#include <srs_core_config.hpp>  
36 -#include <srs_core_encoder.hpp>  
37 -#include <srs_core_rtmp.hpp>  
38 -  
39 -#define CONST_MAX_JITTER_MS 500  
40 -#define DEFAULT_FRAME_TIME_MS 10  
41 -#define PAUSED_SHRINK_SIZE 250  
42 -  
43 -SrsRtmpJitter::SrsRtmpJitter()  
44 -{  
45 - last_pkt_correct_time = last_pkt_time = 0;  
46 -}  
47 -  
48 -SrsRtmpJitter::~SrsRtmpJitter()  
49 -{  
50 -}  
51 -  
52 -int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time)  
53 -{  
54 - int ret = ERROR_SUCCESS;  
55 -  
56 - int sample_rate = tba;  
57 - int frame_rate = tbv;  
58 -  
59 - /**  
60 - * we use a very simple time jitter detect/correct algorithm:  
61 - * 1. delta: ensure the delta is positive and valid,  
62 - * we set the delta to DEFAULT_FRAME_TIME_MS,  
63 - * if the delta of time is nagative or greater than CONST_MAX_JITTER_MS.  
64 - * 2. last_pkt_time: specifies the original packet time,  
65 - * is used to detect next jitter.  
66 - * 3. last_pkt_correct_time: simply add the positive delta,  
67 - * and enforce the time monotonically.  
68 - */  
69 - u_int32_t time = msg->header.timestamp;  
70 - int32_t delta = time - last_pkt_time;  
71 -  
72 - // if jitter detected, reset the delta.  
73 - if (delta < 0 || delta > CONST_MAX_JITTER_MS) {  
74 - // calc the right diff by audio sample rate  
75 - if (msg->header.is_audio() && sample_rate > 0) {  
76 - delta = (int32_t)(delta * 1000.0 / sample_rate);  
77 - } else if (msg->header.is_video() && frame_rate > 0) {  
78 - delta = (int32_t)(delta * 1.0 / frame_rate);  
79 - } else {  
80 - delta = DEFAULT_FRAME_TIME_MS;  
81 - }  
82 -  
83 - // sometimes, the time is absolute time, so correct it again.  
84 - if (delta < 0 || delta > CONST_MAX_JITTER_MS) {  
85 - delta = DEFAULT_FRAME_TIME_MS;  
86 - }  
87 -  
88 - srs_info("jitter detected, last_pts=%d, pts=%d, diff=%d, last_time=%d, time=%d, diff=%d",  
89 - last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta);  
90 - } else {  
91 - srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d",  
92 - time, last_pkt_time, last_pkt_correct_time + delta);  
93 - }  
94 -  
95 - last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);  
96 -  
97 - if (corrected_time) {  
98 - *corrected_time = last_pkt_correct_time;  
99 - }  
100 - msg->header.timestamp = last_pkt_correct_time;  
101 -  
102 - last_pkt_time = time;  
103 -  
104 - return ret;  
105 -}  
106 -  
107 -int SrsRtmpJitter::get_time()  
108 -{  
109 - return (int)last_pkt_correct_time;  
110 -}  
111 -  
112 -SrsConsumer::SrsConsumer(SrsSource* _source)  
113 -{  
114 - source = _source;  
115 - paused = false;  
116 - jitter = new SrsRtmpJitter();  
117 -}  
118 -  
119 -SrsConsumer::~SrsConsumer()  
120 -{  
121 - clear();  
122 -  
123 - source->on_consumer_destroy(this);  
124 - srs_freep(jitter);  
125 -}  
126 -  
127 -int SrsConsumer::get_time()  
128 -{  
129 - return jitter->get_time();  
130 -}  
131 -  
132 -int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)  
133 -{  
134 - int ret = ERROR_SUCCESS;  
135 -  
136 - if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {  
137 - srs_freep(msg);  
138 - return ret;  
139 - }  
140 -  
141 - // TODO: check the queue size and drop packets if overflow.  
142 - msgs.push_back(msg);  
143 -  
144 - return ret;  
145 -}  
146 -  
147 -int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)  
148 -{  
149 - int ret = ERROR_SUCCESS;  
150 -  
151 - if (msgs.empty()) {  
152 - return ret;  
153 - }  
154 -  
155 - if (paused) {  
156 - if ((int)msgs.size() >= PAUSED_SHRINK_SIZE) {  
157 - shrink();  
158 - }  
159 - return ret;  
160 - }  
161 -  
162 - if (max_count == 0) {  
163 - count = (int)msgs.size();  
164 - } else {  
165 - count = srs_min(max_count, (int)msgs.size());  
166 - }  
167 -  
168 - pmsgs = new SrsSharedPtrMessage*[count];  
169 -  
170 - for (int i = 0; i < count; i++) {  
171 - pmsgs[i] = msgs[i];  
172 - }  
173 -  
174 - if (count == (int)msgs.size()) {  
175 - msgs.clear();  
176 - } else {  
177 - msgs.erase(msgs.begin(), msgs.begin() + count);  
178 - }  
179 -  
180 - return ret;  
181 -}  
182 -  
183 -int SrsConsumer::on_play_client_pause(bool is_pause)  
184 -{  
185 - int ret = ERROR_SUCCESS;  
186 -  
187 - srs_trace("stream consumer change pause state %d=>%d", paused, is_pause);  
188 - paused = is_pause;  
189 -  
190 - return ret;  
191 -}  
192 -  
193 -void SrsConsumer::shrink()  
194 -{  
195 - int i = 0;  
196 - std::vector<SrsSharedPtrMessage*>::iterator it;  
197 -  
198 - // issue the last video iframe.  
199 - bool has_video = false;  
200 - int frame_to_remove = 0;  
201 - std::vector<SrsSharedPtrMessage*>::iterator iframe = msgs.end();  
202 - for (i = 0, it = msgs.begin(); it != msgs.end(); ++it, i++) {  
203 - SrsSharedPtrMessage* msg = *it;  
204 - if (msg->header.is_video()) {  
205 - has_video = true;  
206 - if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) {  
207 - iframe = it;  
208 - frame_to_remove = i + 1;  
209 - }  
210 - }  
211 - }  
212 -  
213 - // last iframe is the first elem, ignore it.  
214 - if (iframe == msgs.begin()) {  
215 - return;  
216 - }  
217 -  
218 - // recalc the frame to remove  
219 - if (iframe == msgs.end()) {  
220 - frame_to_remove = 0;  
221 - }  
222 - if (!has_video) {  
223 - frame_to_remove = (int)msgs.size();  
224 - }  
225 -  
226 - srs_trace("shrink the cache queue, has_video=%d, has_iframe=%d, size=%d, removed=%d",  
227 - has_video, iframe != msgs.end(), (int)msgs.size(), frame_to_remove);  
228 -  
229 - // if no video, remove all audio.  
230 - if (!has_video) {  
231 - clear();  
232 - return;  
233 - }  
234 -  
235 - // if exists video Iframe, remove the frames before it.  
236 - if (iframe != msgs.end()) {  
237 - for (it = msgs.begin(); it != iframe; ++it) {  
238 - SrsSharedPtrMessage* msg = *it;  
239 - srs_freep(msg);  
240 - }  
241 - msgs.erase(msgs.begin(), iframe);  
242 - }  
243 -}  
244 -  
245 -void SrsConsumer::clear()  
246 -{  
247 - std::vector<SrsSharedPtrMessage*>::iterator it;  
248 - for (it = msgs.begin(); it != msgs.end(); ++it) {  
249 - SrsSharedPtrMessage* msg = *it;  
250 - srs_freep(msg);  
251 - }  
252 - msgs.clear();  
253 -}  
254 -  
255 -SrsGopCache::SrsGopCache()  
256 -{  
257 - cached_video_count = 0;  
258 - enable_gop_cache = true;  
259 -}  
260 -  
261 -SrsGopCache::~SrsGopCache()  
262 -{  
263 - clear();  
264 -}  
265 -  
266 -void SrsGopCache::set(bool enabled)  
267 -{  
268 - enable_gop_cache = enabled;  
269 -  
270 - if (!enabled) {  
271 - srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size());  
272 - clear();  
273 - return;  
274 - }  
275 -  
276 - srs_info("enable gop cache");  
277 -}  
278 -  
279 -int SrsGopCache::cache(SrsSharedPtrMessage* msg)  
280 -{  
281 - int ret = ERROR_SUCCESS;  
282 -  
283 - if (!enable_gop_cache) {  
284 - srs_verbose("gop cache is disabled.");  
285 - return ret;  
286 - }  
287 -  
288 - // got video, update the video count if acceptable  
289 - if (msg->header.is_video()) {  
290 - cached_video_count++;  
291 - }  
292 -  
293 - // no acceptable video or pure audio, disable the cache.  
294 - if (cached_video_count == 0) {  
295 - srs_verbose("ignore any frame util got a h264 video frame.");  
296 - return ret;  
297 - }  
298 -  
299 - // clear gop cache when got key frame  
300 - if (msg->header.is_video() && SrsCodec::video_is_keyframe(msg->payload, msg->size)) {  
301 - srs_info("clear gop cache when got keyframe. vcount=%d, count=%d",  
302 - cached_video_count, (int)gop_cache.size());  
303 -  
304 - clear();  
305 -  
306 - // curent msg is video frame, so we set to 1.  
307 - cached_video_count = 1;  
308 - }  
309 -  
310 - // cache the frame.  
311 - gop_cache.push_back(msg->copy());  
312 -  
313 - return ret;  
314 -}  
315 -  
316 -void SrsGopCache::clear()  
317 -{  
318 - std::vector<SrsSharedPtrMessage*>::iterator it;  
319 - for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {  
320 - SrsSharedPtrMessage* msg = *it;  
321 - srs_freep(msg);  
322 - }  
323 - gop_cache.clear();  
324 -  
325 - cached_video_count = 0;  
326 -}  
327 -  
328 -int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv)  
329 -{  
330 - int ret = ERROR_SUCCESS;  
331 -  
332 - std::vector<SrsSharedPtrMessage*>::iterator it;  
333 - for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {  
334 - SrsSharedPtrMessage* msg = *it;  
335 - if ((ret = consumer->enqueue(msg->copy(), tba, tbv)) != ERROR_SUCCESS) {  
336 - srs_error("dispatch cached gop failed. ret=%d", ret);  
337 - return ret;  
338 - }  
339 - }  
340 - srs_trace("dispatch cached gop success. count=%d, duration=%d", (int)gop_cache.size(), consumer->get_time());  
341 -  
342 - return ret;  
343 -}  
344 -  
345 -std::map<std::string, SrsSource*> SrsSource::pool;  
346 -  
347 -SrsSource* SrsSource::find(std::string stream_url)  
348 -{  
349 - if (pool.find(stream_url) == pool.end()) {  
350 - pool[stream_url] = new SrsSource(stream_url);  
351 - srs_verbose("create new source for url=%s", stream_url.c_str());  
352 - }  
353 -  
354 - return pool[stream_url];  
355 -}  
356 -  
357 -SrsSource::SrsSource(std::string _stream_url)  
358 -{  
359 - stream_url = _stream_url;  
360 -  
361 -#ifdef SRS_HLS  
362 - hls = new SrsHls();  
363 -#endif  
364 -#ifdef SRS_FFMPEG  
365 - encoder = new SrsEncoder();  
366 -#endif  
367 -  
368 - cache_metadata = cache_sh_video = cache_sh_audio = NULL;  
369 -  
370 - frame_rate = sample_rate = 0;  
371 - _can_publish = true;  
372 -  
373 - gop_cache = new SrsGopCache();  
374 -}  
375 -  
376 -SrsSource::~SrsSource()  
377 -{  
378 - if (true) {  
379 - std::vector<SrsConsumer*>::iterator it;  
380 - for (it = consumers.begin(); it != consumers.end(); ++it) {  
381 - SrsConsumer* consumer = *it;  
382 - srs_freep(consumer);  
383 - }  
384 - consumers.clear();  
385 - }  
386 -  
387 - if (true) {  
388 - std::vector<SrsForwarder*>::iterator it;  
389 - for (it = forwarders.begin(); it != forwarders.end(); ++it) {  
390 - SrsForwarder* forwarder = *it;  
391 - srs_freep(forwarder);  
392 - }  
393 - forwarders.clear();  
394 - }  
395 -  
396 - srs_freep(cache_metadata);  
397 - srs_freep(cache_sh_video);  
398 - srs_freep(cache_sh_audio);  
399 -  
400 - srs_freep(gop_cache);  
401 -  
402 -#ifdef SRS_HLS  
403 - srs_freep(hls);  
404 -#endif  
405 -#ifdef SRS_FFMPEG  
406 - srs_freep(encoder);  
407 -#endif  
408 -}  
409 -  
410 -bool SrsSource::can_publish()  
411 -{  
412 - return _can_publish;  
413 -}  
414 -  
415 -int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)  
416 -{  
417 - int ret = ERROR_SUCCESS;  
418 -  
419 -#ifdef SRS_HLS  
420 - if ((ret = hls->on_meta_data(metadata)) != ERROR_SUCCESS) {  
421 - srs_error("hls process onMetaData message failed. ret=%d", ret);  
422 - return ret;  
423 - }  
424 -#endif  
425 -  
426 - metadata->metadata->set("server", new SrsAmf0String(  
427 - RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));  
428 -  
429 - SrsAmf0Any* prop = NULL;  
430 - if ((prop = metadata->metadata->get_property("audiosamplerate")) != NULL) {  
431 - if (prop->is_number()) {  
432 - sample_rate = (int)(srs_amf0_convert<SrsAmf0Number>(prop)->value);  
433 - }  
434 - }  
435 - if ((prop = metadata->metadata->get_property("framerate")) != NULL) {  
436 - if (prop->is_number()) {  
437 - frame_rate = (int)(srs_amf0_convert<SrsAmf0Number>(prop)->value);  
438 - }  
439 - }  
440 -  
441 - // encode the metadata to payload  
442 - int size = metadata->get_payload_length();  
443 - if (size <= 0) {  
444 - srs_warn("ignore the invalid metadata. size=%d", size);  
445 - return ret;  
446 - }  
447 - srs_verbose("get metadata size success.");  
448 -  
449 - char* payload = new char[size];  
450 - memset(payload, 0, size);  
451 - if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {  
452 - srs_error("encode metadata error. ret=%d", ret);  
453 - srs_freepa(payload);  
454 - return ret;  
455 - }  
456 - srs_verbose("encode metadata success.");  
457 -  
458 - // create a shared ptr message.  
459 - srs_freep(cache_metadata);  
460 - cache_metadata = new SrsSharedPtrMessage();  
461 -  
462 - // dump message to shared ptr message.  
463 - if ((ret = cache_metadata->initialize(msg, payload, size)) != ERROR_SUCCESS) {  
464 - srs_error("initialize the cache metadata failed. ret=%d", ret);  
465 - return ret;  
466 - }  
467 - srs_verbose("initialize shared ptr metadata success.");  
468 -  
469 - // copy to all consumer  
470 - if (true) {  
471 - std::vector<SrsConsumer*>::iterator it;  
472 - for (it = consumers.begin(); it != consumers.end(); ++it) {  
473 - SrsConsumer* consumer = *it;  
474 - if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {  
475 - srs_error("dispatch the metadata failed. ret=%d", ret);  
476 - return ret;  
477 - }  
478 - }  
479 - srs_trace("dispatch metadata success.");  
480 - }  
481 -  
482 - // copy to all forwarders  
483 - if (true) {  
484 - std::vector<SrsForwarder*>::iterator it;  
485 - for (it = forwarders.begin(); it != forwarders.end(); ++it) {  
486 - SrsForwarder* forwarder = *it;  
487 - if ((ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) {  
488 - srs_error("forwarder process onMetaData message failed. ret=%d", ret);  
489 - return ret;  
490 - }  
491 - }  
492 - }  
493 -  
494 - return ret;  
495 -}  
496 -  
497 -int SrsSource::on_audio(SrsCommonMessage* audio)  
498 -{  
499 - int ret = ERROR_SUCCESS;  
500 -  
501 - SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();  
502 - SrsAutoFree(SrsSharedPtrMessage, msg, false);  
503 - if ((ret = msg->initialize(audio)) != ERROR_SUCCESS) {  
504 - srs_error("initialize the audio failed. ret=%d", ret);  
505 - return ret;  
506 - }  
507 - srs_verbose("initialize shared ptr audio success.");  
508 -  
509 -#ifdef SRS_HLS  
510 - if ((ret = hls->on_audio(msg->copy())) != ERROR_SUCCESS) {  
511 - srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret);  
512 -  
513 - // unpublish, ignore ret.  
514 - hls->on_unpublish();  
515 -  
516 - // ignore.  
517 - ret = ERROR_SUCCESS;  
518 - }  
519 -#endif  
520 -  
521 - // copy to all consumer  
522 - if (true) {  
523 - std::vector<SrsConsumer*>::iterator it;  
524 - for (it = consumers.begin(); it != consumers.end(); ++it) {  
525 - SrsConsumer* consumer = *it;  
526 - if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {  
527 - srs_error("dispatch the audio failed. ret=%d", ret);  
528 - return ret;  
529 - }  
530 - }  
531 - srs_info("dispatch audio success.");  
532 - }  
533 -  
534 - // copy to all forwarders.  
535 - if (true) {  
536 - std::vector<SrsForwarder*>::iterator it;  
537 - for (it = forwarders.begin(); it != forwarders.end(); ++it) {  
538 - SrsForwarder* forwarder = *it;  
539 - if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) {  
540 - srs_error("forwarder process audio message failed. ret=%d", ret);  
541 - return ret;  
542 - }  
543 - }  
544 - }  
545 -  
546 - // cache the sequence header if h264  
547 - if (SrsCodec::audio_is_sequence_header(msg->payload, msg->size)) {  
548 - srs_freep(cache_sh_audio);  
549 - cache_sh_audio = msg->copy();  
550 - srs_trace("update audio sequence header success. size=%d", msg->header.payload_length);  
551 - return ret;  
552 - }  
553 -  
554 - // cache the last gop packets  
555 - if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {  
556 - srs_error("shrink gop cache failed. ret=%d", ret);  
557 - return ret;  
558 - }  
559 - srs_verbose("cache gop success.");  
560 -  
561 - return ret;  
562 -}  
563 -  
564 -int SrsSource::on_video(SrsCommonMessage* video)  
565 -{  
566 - int ret = ERROR_SUCCESS;  
567 -  
568 - SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();  
569 - SrsAutoFree(SrsSharedPtrMessage, msg, false);  
570 - if ((ret = msg->initialize(video)) != ERROR_SUCCESS) {  
571 - srs_error("initialize the video failed. ret=%d", ret);  
572 - return ret;  
573 - }  
574 - srs_verbose("initialize shared ptr video success.");  
575 -  
576 -#ifdef SRS_HLS  
577 - if ((ret = hls->on_video(msg->copy())) != ERROR_SUCCESS) {  
578 - srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);  
579 -  
580 - // unpublish, ignore ret.  
581 - hls->on_unpublish();  
582 -  
583 - // ignore.  
584 - ret = ERROR_SUCCESS;  
585 - }  
586 -#endif  
587 -  
588 - // copy to all consumer  
589 - if (true) {  
590 - std::vector<SrsConsumer*>::iterator it;  
591 - for (it = consumers.begin(); it != consumers.end(); ++it) {  
592 - SrsConsumer* consumer = *it;  
593 - if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {  
594 - srs_error("dispatch the video failed. ret=%d", ret);  
595 - return ret;  
596 - }  
597 - }  
598 - srs_info("dispatch video success.");  
599 - }  
600 -  
601 - // copy to all forwarders.  
602 - if (true) {  
603 - std::vector<SrsForwarder*>::iterator it;  
604 - for (it = forwarders.begin(); it != forwarders.end(); ++it) {  
605 - SrsForwarder* forwarder = *it;  
606 - if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) {  
607 - srs_error("forwarder process video message failed. ret=%d", ret);  
608 - return ret;  
609 - }  
610 - }  
611 - }  
612 -  
613 - // cache the sequence header if h264  
614 - if (SrsCodec::video_is_sequence_header(msg->payload, msg->size)) {  
615 - srs_freep(cache_sh_video);  
616 - cache_sh_video = msg->copy();  
617 - srs_trace("update video sequence header success. size=%d", msg->header.payload_length);  
618 - return ret;  
619 - }  
620 -  
621 - // cache the last gop packets  
622 - if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {  
623 - srs_error("shrink gop cache failed. ret=%d", ret);  
624 - return ret;  
625 - }  
626 - srs_verbose("cache gop success.");  
627 -  
628 - return ret;  
629 -}  
630 -  
631 -int SrsSource::on_publish(SrsRequest* req)  
632 -{  
633 - int ret = ERROR_SUCCESS;  
634 -  
635 - _can_publish = false;  
636 -  
637 - // TODO: support reload.  
638 -  
639 - // create forwarders  
640 - SrsConfDirective* conf = config->get_forward(req->vhost);  
641 - for (int i = 0; conf && i < (int)conf->args.size(); i++) {  
642 - std::string forward_server = conf->args.at(i);  
643 -  
644 - SrsForwarder* forwarder = new SrsForwarder();  
645 - forwarders.push_back(forwarder);  
646 -  
647 - if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) {  
648 - srs_error("start forwarder failed. "  
649 - "vhost=%s, app=%s, stream=%s, forward-to=%s",  
650 - req->vhost.c_str(), req->app.c_str(), req->stream.c_str(),  
651 - forward_server.c_str());  
652 - return ret;  
653 - }  
654 - }  
655 -  
656 -#ifdef SRS_FFMPEG  
657 - if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) {  
658 - return ret;  
659 - }  
660 -#endif  
661 -  
662 -#ifdef SRS_HLS  
663 - if ((ret = hls->on_publish(req)) != ERROR_SUCCESS) {  
664 - return ret;  
665 - }  
666 -#endif  
667 -  
668 - return ret;  
669 -}  
670 -  
671 -void SrsSource::on_unpublish()  
672 -{  
673 - // close all forwarders  
674 - std::vector<SrsForwarder*>::iterator it;  
675 - for (it = forwarders.begin(); it != forwarders.end(); ++it) {  
676 - SrsForwarder* forwarder = *it;  
677 - forwarder->on_unpublish();  
678 - srs_freep(forwarder);  
679 - }  
680 - forwarders.clear();  
681 -  
682 -#ifdef SRS_FFMPEG  
683 - encoder->on_unpublish();  
684 -#endif  
685 -  
686 -#ifdef SRS_HLS  
687 - hls->on_unpublish();  
688 -#endif  
689 -  
690 - gop_cache->clear();  
691 -  
692 - srs_freep(cache_metadata);  
693 - frame_rate = sample_rate = 0;  
694 -  
695 - srs_freep(cache_sh_video);  
696 - srs_freep(cache_sh_audio);  
697 -  
698 - srs_trace("clear cache/metadata/sequence-headers when unpublish.");  
699 -  
700 - _can_publish = true;  
701 -}  
702 -  
703 - int SrsSource::create_consumer(SrsConsumer*& consumer)  
704 -{  
705 - int ret = ERROR_SUCCESS;  
706 -  
707 - consumer = new SrsConsumer(this);  
708 - consumers.push_back(consumer);  
709 -  
710 - if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {  
711 - srs_error("dispatch metadata failed. ret=%d", ret);  
712 - return ret;  
713 - }  
714 - srs_info("dispatch metadata success");  
715 -  
716 - if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {  
717 - srs_error("dispatch video sequence header failed. ret=%d", ret);  
718 - return ret;  
719 - }  
720 - srs_info("dispatch video sequence header success");  
721 -  
722 - if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {  
723 - srs_error("dispatch audio sequence header failed. ret=%d", ret);  
724 - return ret;  
725 - }  
726 - srs_info("dispatch audio sequence header success");  
727 -  
728 - if ((ret = gop_cache->dump(consumer, sample_rate, frame_rate)) != ERROR_SUCCESS) {  
729 - return ret;  
730 - }  
731 -  
732 - return ret;  
733 -}  
734 -  
735 -void SrsSource::on_consumer_destroy(SrsConsumer* consumer)  
736 -{  
737 - std::vector<SrsConsumer*>::iterator it;  
738 - it = std::find(consumers.begin(), consumers.end(), consumer);  
739 - if (it != consumers.end()) {  
740 - consumers.erase(it);  
741 - }  
742 - srs_info("handle consumer destroy success.");  
743 -}  
744 -  
745 -void SrsSource::set_cache(bool enabled)  
746 -{  
747 - gop_cache->set(enabled);  
748 -}  
749 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_source.hpp>
  25 +
  26 +#include <algorithm>
  27 +
  28 +#include <srs_core_log.hpp>
  29 +#include <srs_core_protocol.hpp>
  30 +#include <srs_core_autofree.hpp>
  31 +#include <srs_core_amf0.hpp>
  32 +#include <srs_core_codec.hpp>
  33 +#include <srs_core_hls.hpp>
  34 +#include <srs_core_forward.hpp>
  35 +#include <srs_core_config.hpp>
  36 +#include <srs_core_encoder.hpp>
  37 +#include <srs_core_rtmp.hpp>
  38 +
  39 +#define CONST_MAX_JITTER_MS 500
  40 +#define DEFAULT_FRAME_TIME_MS 10
  41 +#define PAUSED_SHRINK_SIZE 250
  42 +
  43 +SrsRtmpJitter::SrsRtmpJitter()
  44 +{
  45 + last_pkt_correct_time = last_pkt_time = 0;
  46 +}
  47 +
  48 +SrsRtmpJitter::~SrsRtmpJitter()
  49 +{
  50 +}
  51 +
  52 +int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time)
  53 +{
  54 + int ret = ERROR_SUCCESS;
  55 +
  56 + int sample_rate = tba;
  57 + int frame_rate = tbv;
  58 +
  59 + /**
  60 + * we use a very simple time jitter detect/correct algorithm:
  61 + * 1. delta: ensure the delta is positive and valid,
  62 + * we set the delta to DEFAULT_FRAME_TIME_MS,
  63 + * if the delta of time is nagative or greater than CONST_MAX_JITTER_MS.
  64 + * 2. last_pkt_time: specifies the original packet time,
  65 + * is used to detect next jitter.
  66 + * 3. last_pkt_correct_time: simply add the positive delta,
  67 + * and enforce the time monotonically.
  68 + */
  69 + u_int32_t time = msg->header.timestamp;
  70 + int32_t delta = time - last_pkt_time;
  71 +
  72 + // if jitter detected, reset the delta.
  73 + if (delta < 0 || delta > CONST_MAX_JITTER_MS) {
  74 + // calc the right diff by audio sample rate
  75 + if (msg->header.is_audio() && sample_rate > 0) {
  76 + delta = (int32_t)(delta * 1000.0 / sample_rate);
  77 + } else if (msg->header.is_video() && frame_rate > 0) {
  78 + delta = (int32_t)(delta * 1.0 / frame_rate);
  79 + } else {
  80 + delta = DEFAULT_FRAME_TIME_MS;
  81 + }
  82 +
  83 + // sometimes, the time is absolute time, so correct it again.
  84 + if (delta < 0 || delta > CONST_MAX_JITTER_MS) {
  85 + delta = DEFAULT_FRAME_TIME_MS;
  86 + }
  87 +
  88 + srs_info("jitter detected, last_pts=%d, pts=%d, diff=%d, last_time=%d, time=%d, diff=%d",
  89 + last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta);
  90 + } else {
  91 + srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d",
  92 + time, last_pkt_time, last_pkt_correct_time + delta);
  93 + }
  94 +
  95 + last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);
  96 +
  97 + if (corrected_time) {
  98 + *corrected_time = last_pkt_correct_time;
  99 + }
  100 + msg->header.timestamp = last_pkt_correct_time;
  101 +
  102 + last_pkt_time = time;
  103 +
  104 + return ret;
  105 +}
  106 +
  107 +int SrsRtmpJitter::get_time()
  108 +{
  109 + return (int)last_pkt_correct_time;
  110 +}
  111 +
  112 +SrsConsumer::SrsConsumer(SrsSource* _source)
  113 +{
  114 + source = _source;
  115 + paused = false;
  116 + jitter = new SrsRtmpJitter();
  117 +}
  118 +
  119 +SrsConsumer::~SrsConsumer()
  120 +{
  121 + clear();
  122 +
  123 + source->on_consumer_destroy(this);
  124 + srs_freep(jitter);
  125 +}
  126 +
  127 +int SrsConsumer::get_time()
  128 +{
  129 + return jitter->get_time();
  130 +}
  131 +
  132 +int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
  133 +{
  134 + int ret = ERROR_SUCCESS;
  135 +
  136 + if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {
  137 + srs_freep(msg);
  138 + return ret;
  139 + }
  140 +
  141 + // TODO: check the queue size and drop packets if overflow.
  142 + msgs.push_back(msg);
  143 +
  144 + return ret;
  145 +}
  146 +
  147 +int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
  148 +{
  149 + int ret = ERROR_SUCCESS;
  150 +
  151 + if (msgs.empty()) {
  152 + return ret;
  153 + }
  154 +
  155 + if (paused) {
  156 + if ((int)msgs.size() >= PAUSED_SHRINK_SIZE) {
  157 + shrink();
  158 + }
  159 + return ret;
  160 + }
  161 +
  162 + if (max_count == 0) {
  163 + count = (int)msgs.size();
  164 + } else {
  165 + count = srs_min(max_count, (int)msgs.size());
  166 + }
  167 +
  168 + pmsgs = new SrsSharedPtrMessage*[count];
  169 +
  170 + for (int i = 0; i < count; i++) {
  171 + pmsgs[i] = msgs[i];
  172 + }
  173 +
  174 + if (count == (int)msgs.size()) {
  175 + msgs.clear();
  176 + } else {
  177 + msgs.erase(msgs.begin(), msgs.begin() + count);
  178 + }
  179 +
  180 + return ret;
  181 +}
  182 +
  183 +int SrsConsumer::on_play_client_pause(bool is_pause)
  184 +{
  185 + int ret = ERROR_SUCCESS;
  186 +
  187 + srs_trace("stream consumer change pause state %d=>%d", paused, is_pause);
  188 + paused = is_pause;
  189 +
  190 + return ret;
  191 +}
  192 +
  193 +void SrsConsumer::shrink()
  194 +{
  195 + int i = 0;
  196 + std::vector<SrsSharedPtrMessage*>::iterator it;
  197 +
  198 + // issue the last video iframe.
  199 + bool has_video = false;
  200 + int frame_to_remove = 0;
  201 + std::vector<SrsSharedPtrMessage*>::iterator iframe = msgs.end();
  202 + for (i = 0, it = msgs.begin(); it != msgs.end(); ++it, i++) {
  203 + SrsSharedPtrMessage* msg = *it;
  204 + if (msg->header.is_video()) {
  205 + has_video = true;
  206 + if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) {
  207 + iframe = it;
  208 + frame_to_remove = i + 1;
  209 + }
  210 + }
  211 + }
  212 +
  213 + // last iframe is the first elem, ignore it.
  214 + if (iframe == msgs.begin()) {
  215 + return;
  216 + }
  217 +
  218 + // recalc the frame to remove
  219 + if (iframe == msgs.end()) {
  220 + frame_to_remove = 0;
  221 + }
  222 + if (!has_video) {
  223 + frame_to_remove = (int)msgs.size();
  224 + }
  225 +
  226 + srs_trace("shrink the cache queue, has_video=%d, has_iframe=%d, size=%d, removed=%d",
  227 + has_video, iframe != msgs.end(), (int)msgs.size(), frame_to_remove);
  228 +
  229 + // if no video, remove all audio.
  230 + if (!has_video) {
  231 + clear();
  232 + return;
  233 + }
  234 +
  235 + // if exists video Iframe, remove the frames before it.
  236 + if (iframe != msgs.end()) {
  237 + for (it = msgs.begin(); it != iframe; ++it) {
  238 + SrsSharedPtrMessage* msg = *it;
  239 + srs_freep(msg);
  240 + }
  241 + msgs.erase(msgs.begin(), iframe);
  242 + }
  243 +}
  244 +
  245 +void SrsConsumer::clear()
  246 +{
  247 + std::vector<SrsSharedPtrMessage*>::iterator it;
  248 + for (it = msgs.begin(); it != msgs.end(); ++it) {
  249 + SrsSharedPtrMessage* msg = *it;
  250 + srs_freep(msg);
  251 + }
  252 + msgs.clear();
  253 +}
  254 +
  255 +SrsGopCache::SrsGopCache()
  256 +{
  257 + cached_video_count = 0;
  258 + enable_gop_cache = true;
  259 +}
  260 +
  261 +SrsGopCache::~SrsGopCache()
  262 +{
  263 + clear();
  264 +}
  265 +
  266 +void SrsGopCache::set(bool enabled)
  267 +{
  268 + enable_gop_cache = enabled;
  269 +
  270 + if (!enabled) {
  271 + srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size());
  272 + clear();
  273 + return;
  274 + }
  275 +
  276 + srs_info("enable gop cache");
  277 +}
  278 +
  279 +int SrsGopCache::cache(SrsSharedPtrMessage* msg)
  280 +{
  281 + int ret = ERROR_SUCCESS;
  282 +
  283 + if (!enable_gop_cache) {
  284 + srs_verbose("gop cache is disabled.");
  285 + return ret;
  286 + }
  287 +
  288 + // got video, update the video count if acceptable
  289 + if (msg->header.is_video()) {
  290 + cached_video_count++;
  291 + }
  292 +
  293 + // no acceptable video or pure audio, disable the cache.
  294 + if (cached_video_count == 0) {
  295 + srs_verbose("ignore any frame util got a h264 video frame.");
  296 + return ret;
  297 + }
  298 +
  299 + // clear gop cache when got key frame
  300 + if (msg->header.is_video() && SrsCodec::video_is_keyframe(msg->payload, msg->size)) {
  301 + srs_info("clear gop cache when got keyframe. vcount=%d, count=%d",
  302 + cached_video_count, (int)gop_cache.size());
  303 +
  304 + clear();
  305 +
  306 + // curent msg is video frame, so we set to 1.
  307 + cached_video_count = 1;
  308 + }
  309 +
  310 + // cache the frame.
  311 + gop_cache.push_back(msg->copy());
  312 +
  313 + return ret;
  314 +}
  315 +
  316 +void SrsGopCache::clear()
  317 +{
  318 + std::vector<SrsSharedPtrMessage*>::iterator it;
  319 + for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
  320 + SrsSharedPtrMessage* msg = *it;
  321 + srs_freep(msg);
  322 + }
  323 + gop_cache.clear();
  324 +
  325 + cached_video_count = 0;
  326 +}
  327 +
  328 +int SrsGopCache::dump(SrsConsumer* consumer, int tba, int tbv)
  329 +{
  330 + int ret = ERROR_SUCCESS;
  331 +
  332 + std::vector<SrsSharedPtrMessage*>::iterator it;
  333 + for (it = gop_cache.begin(); it != gop_cache.end(); ++it) {
  334 + SrsSharedPtrMessage* msg = *it;
  335 + if ((ret = consumer->enqueue(msg->copy(), tba, tbv)) != ERROR_SUCCESS) {
  336 + srs_error("dispatch cached gop failed. ret=%d", ret);
  337 + return ret;
  338 + }
  339 + }
  340 + srs_trace("dispatch cached gop success. count=%d, duration=%d", (int)gop_cache.size(), consumer->get_time());
  341 +
  342 + return ret;
  343 +}
  344 +
  345 +std::map<std::string, SrsSource*> SrsSource::pool;
  346 +
  347 +SrsSource* SrsSource::find(std::string stream_url)
  348 +{
  349 + if (pool.find(stream_url) == pool.end()) {
  350 + pool[stream_url] = new SrsSource(stream_url);
  351 + srs_verbose("create new source for url=%s", stream_url.c_str());
  352 + }
  353 +
  354 + return pool[stream_url];
  355 +}
  356 +
  357 +SrsSource::SrsSource(std::string _stream_url)
  358 +{
  359 + stream_url = _stream_url;
  360 +
  361 +#ifdef SRS_HLS
  362 + hls = new SrsHls();
  363 +#endif
  364 +#ifdef SRS_FFMPEG
  365 + encoder = new SrsEncoder();
  366 +#endif
  367 +
  368 + cache_metadata = cache_sh_video = cache_sh_audio = NULL;
  369 +
  370 + frame_rate = sample_rate = 0;
  371 + _can_publish = true;
  372 +
  373 + gop_cache = new SrsGopCache();
  374 +}
  375 +
  376 +SrsSource::~SrsSource()
  377 +{
  378 + if (true) {
  379 + std::vector<SrsConsumer*>::iterator it;
  380 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  381 + SrsConsumer* consumer = *it;
  382 + srs_freep(consumer);
  383 + }
  384 + consumers.clear();
  385 + }
  386 +
  387 + if (true) {
  388 + std::vector<SrsForwarder*>::iterator it;
  389 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  390 + SrsForwarder* forwarder = *it;
  391 + srs_freep(forwarder);
  392 + }
  393 + forwarders.clear();
  394 + }
  395 +
  396 + srs_freep(cache_metadata);
  397 + srs_freep(cache_sh_video);
  398 + srs_freep(cache_sh_audio);
  399 +
  400 + srs_freep(gop_cache);
  401 +
  402 +#ifdef SRS_HLS
  403 + srs_freep(hls);
  404 +#endif
  405 +#ifdef SRS_FFMPEG
  406 + srs_freep(encoder);
  407 +#endif
  408 +}
  409 +
  410 +bool SrsSource::can_publish()
  411 +{
  412 + return _can_publish;
  413 +}
  414 +
  415 +int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
  416 +{
  417 + int ret = ERROR_SUCCESS;
  418 +
  419 +#ifdef SRS_HLS
  420 + if ((ret = hls->on_meta_data(metadata)) != ERROR_SUCCESS) {
  421 + srs_error("hls process onMetaData message failed. ret=%d", ret);
  422 + return ret;
  423 + }
  424 +#endif
  425 +
  426 + metadata->metadata->set("server", new SrsAmf0String(
  427 + RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
  428 +
  429 + SrsAmf0Any* prop = NULL;
  430 + if ((prop = metadata->metadata->get_property("audiosamplerate")) != NULL) {
  431 + if (prop->is_number()) {
  432 + sample_rate = (int)(srs_amf0_convert<SrsAmf0Number>(prop)->value);
  433 + }
  434 + }
  435 + if ((prop = metadata->metadata->get_property("framerate")) != NULL) {
  436 + if (prop->is_number()) {
  437 + frame_rate = (int)(srs_amf0_convert<SrsAmf0Number>(prop)->value);
  438 + }
  439 + }
  440 +
  441 + // encode the metadata to payload
  442 + int size = metadata->get_payload_length();
  443 + if (size <= 0) {
  444 + srs_warn("ignore the invalid metadata. size=%d", size);
  445 + return ret;
  446 + }
  447 + srs_verbose("get metadata size success.");
  448 +
  449 + char* payload = new char[size];
  450 + memset(payload, 0, size);
  451 + if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
  452 + srs_error("encode metadata error. ret=%d", ret);
  453 + srs_freepa(payload);
  454 + return ret;
  455 + }
  456 + srs_verbose("encode metadata success.");
  457 +
  458 + // create a shared ptr message.
  459 + srs_freep(cache_metadata);
  460 + cache_metadata = new SrsSharedPtrMessage();
  461 +
  462 + // dump message to shared ptr message.
  463 + if ((ret = cache_metadata->initialize(msg, payload, size)) != ERROR_SUCCESS) {
  464 + srs_error("initialize the cache metadata failed. ret=%d", ret);
  465 + return ret;
  466 + }
  467 + srs_verbose("initialize shared ptr metadata success.");
  468 +
  469 + // copy to all consumer
  470 + if (true) {
  471 + std::vector<SrsConsumer*>::iterator it;
  472 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  473 + SrsConsumer* consumer = *it;
  474 + if ((ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
  475 + srs_error("dispatch the metadata failed. ret=%d", ret);
  476 + return ret;
  477 + }
  478 + }
  479 + srs_trace("dispatch metadata success.");
  480 + }
  481 +
  482 + // copy to all forwarders
  483 + if (true) {
  484 + std::vector<SrsForwarder*>::iterator it;
  485 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  486 + SrsForwarder* forwarder = *it;
  487 + if ((ret = forwarder->on_meta_data(cache_metadata->copy())) != ERROR_SUCCESS) {
  488 + srs_error("forwarder process onMetaData message failed. ret=%d", ret);
  489 + return ret;
  490 + }
  491 + }
  492 + }
  493 +
  494 + return ret;
  495 +}
  496 +
  497 +int SrsSource::on_audio(SrsCommonMessage* audio)
  498 +{
  499 + int ret = ERROR_SUCCESS;
  500 +
  501 + SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
  502 + SrsAutoFree(SrsSharedPtrMessage, msg, false);
  503 + if ((ret = msg->initialize(audio)) != ERROR_SUCCESS) {
  504 + srs_error("initialize the audio failed. ret=%d", ret);
  505 + return ret;
  506 + }
  507 + srs_verbose("initialize shared ptr audio success.");
  508 +
  509 +#ifdef SRS_HLS
  510 + if ((ret = hls->on_audio(msg->copy())) != ERROR_SUCCESS) {
  511 + srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret);
  512 +
  513 + // unpublish, ignore ret.
  514 + hls->on_unpublish();
  515 +
  516 + // ignore.
  517 + ret = ERROR_SUCCESS;
  518 + }
  519 +#endif
  520 +
  521 + // copy to all consumer
  522 + if (true) {
  523 + std::vector<SrsConsumer*>::iterator it;
  524 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  525 + SrsConsumer* consumer = *it;
  526 + if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
  527 + srs_error("dispatch the audio failed. ret=%d", ret);
  528 + return ret;
  529 + }
  530 + }
  531 + srs_info("dispatch audio success.");
  532 + }
  533 +
  534 + // copy to all forwarders.
  535 + if (true) {
  536 + std::vector<SrsForwarder*>::iterator it;
  537 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  538 + SrsForwarder* forwarder = *it;
  539 + if ((ret = forwarder->on_audio(msg->copy())) != ERROR_SUCCESS) {
  540 + srs_error("forwarder process audio message failed. ret=%d", ret);
  541 + return ret;
  542 + }
  543 + }
  544 + }
  545 +
  546 + // cache the sequence header if h264
  547 + if (SrsCodec::audio_is_sequence_header(msg->payload, msg->size)) {
  548 + srs_freep(cache_sh_audio);
  549 + cache_sh_audio = msg->copy();
  550 + srs_trace("update audio sequence header success. size=%d", msg->header.payload_length);
  551 + return ret;
  552 + }
  553 +
  554 + // cache the last gop packets
  555 + if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
  556 + srs_error("shrink gop cache failed. ret=%d", ret);
  557 + return ret;
  558 + }
  559 + srs_verbose("cache gop success.");
  560 +
  561 + return ret;
  562 +}
  563 +
  564 +int SrsSource::on_video(SrsCommonMessage* video)
  565 +{
  566 + int ret = ERROR_SUCCESS;
  567 +
  568 + SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
  569 + SrsAutoFree(SrsSharedPtrMessage, msg, false);
  570 + if ((ret = msg->initialize(video)) != ERROR_SUCCESS) {
  571 + srs_error("initialize the video failed. ret=%d", ret);
  572 + return ret;
  573 + }
  574 + srs_verbose("initialize shared ptr video success.");
  575 +
  576 +#ifdef SRS_HLS
  577 + if ((ret = hls->on_video(msg->copy())) != ERROR_SUCCESS) {
  578 + srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);
  579 +
  580 + // unpublish, ignore ret.
  581 + hls->on_unpublish();
  582 +
  583 + // ignore.
  584 + ret = ERROR_SUCCESS;
  585 + }
  586 +#endif
  587 +
  588 + // copy to all consumer
  589 + if (true) {
  590 + std::vector<SrsConsumer*>::iterator it;
  591 + for (it = consumers.begin(); it != consumers.end(); ++it) {
  592 + SrsConsumer* consumer = *it;
  593 + if ((ret = consumer->enqueue(msg->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
  594 + srs_error("dispatch the video failed. ret=%d", ret);
  595 + return ret;
  596 + }
  597 + }
  598 + srs_info("dispatch video success.");
  599 + }
  600 +
  601 + // copy to all forwarders.
  602 + if (true) {
  603 + std::vector<SrsForwarder*>::iterator it;
  604 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  605 + SrsForwarder* forwarder = *it;
  606 + if ((ret = forwarder->on_video(msg->copy())) != ERROR_SUCCESS) {
  607 + srs_error("forwarder process video message failed. ret=%d", ret);
  608 + return ret;
  609 + }
  610 + }
  611 + }
  612 +
  613 + // cache the sequence header if h264
  614 + if (SrsCodec::video_is_sequence_header(msg->payload, msg->size)) {
  615 + srs_freep(cache_sh_video);
  616 + cache_sh_video = msg->copy();
  617 + srs_trace("update video sequence header success. size=%d", msg->header.payload_length);
  618 + return ret;
  619 + }
  620 +
  621 + // cache the last gop packets
  622 + if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
  623 + srs_error("shrink gop cache failed. ret=%d", ret);
  624 + return ret;
  625 + }
  626 + srs_verbose("cache gop success.");
  627 +
  628 + return ret;
  629 +}
  630 +
  631 +int SrsSource::on_publish(SrsRequest* req)
  632 +{
  633 + int ret = ERROR_SUCCESS;
  634 +
  635 + _can_publish = false;
  636 +
  637 + // TODO: support reload.
  638 +
  639 + // create forwarders
  640 + SrsConfDirective* conf = config->get_forward(req->vhost);
  641 + for (int i = 0; conf && i < (int)conf->args.size(); i++) {
  642 + std::string forward_server = conf->args.at(i);
  643 +
  644 + SrsForwarder* forwarder = new SrsForwarder();
  645 + forwarders.push_back(forwarder);
  646 +
  647 + if ((ret = forwarder->on_publish(req, forward_server)) != ERROR_SUCCESS) {
  648 + srs_error("start forwarder failed. "
  649 + "vhost=%s, app=%s, stream=%s, forward-to=%s",
  650 + req->vhost.c_str(), req->app.c_str(), req->stream.c_str(),
  651 + forward_server.c_str());
  652 + return ret;
  653 + }
  654 + }
  655 +
  656 +#ifdef SRS_FFMPEG
  657 + if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) {
  658 + return ret;
  659 + }
  660 +#endif
  661 +
  662 +#ifdef SRS_HLS
  663 + if ((ret = hls->on_publish(req)) != ERROR_SUCCESS) {
  664 + return ret;
  665 + }
  666 +#endif
  667 +
  668 + return ret;
  669 +}
  670 +
  671 +void SrsSource::on_unpublish()
  672 +{
  673 + // close all forwarders
  674 + std::vector<SrsForwarder*>::iterator it;
  675 + for (it = forwarders.begin(); it != forwarders.end(); ++it) {
  676 + SrsForwarder* forwarder = *it;
  677 + forwarder->on_unpublish();
  678 + srs_freep(forwarder);
  679 + }
  680 + forwarders.clear();
  681 +
  682 +#ifdef SRS_FFMPEG
  683 + encoder->on_unpublish();
  684 +#endif
  685 +
  686 + // TODO: HLS should continue previous sequence and stream.
  687 +#ifdef SRS_HLS
  688 + hls->on_unpublish();
  689 +#endif
  690 +
  691 + gop_cache->clear();
  692 +
  693 + srs_freep(cache_metadata);
  694 + frame_rate = sample_rate = 0;
  695 +
  696 + srs_freep(cache_sh_video);
  697 + srs_freep(cache_sh_audio);
  698 +
  699 + srs_trace("clear cache/metadata/sequence-headers when unpublish.");
  700 +
  701 + _can_publish = true;
  702 +}
  703 +
  704 + int SrsSource::create_consumer(SrsConsumer*& consumer)
  705 +{
  706 + int ret = ERROR_SUCCESS;
  707 +
  708 + consumer = new SrsConsumer(this);
  709 + consumers.push_back(consumer);
  710 +
  711 + if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
  712 + srs_error("dispatch metadata failed. ret=%d", ret);
  713 + return ret;
  714 + }
  715 + srs_info("dispatch metadata success");
  716 +
  717 + if (cache_sh_video && (ret = consumer->enqueue(cache_sh_video->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
  718 + srs_error("dispatch video sequence header failed. ret=%d", ret);
  719 + return ret;
  720 + }
  721 + srs_info("dispatch video sequence header success");
  722 +
  723 + if (cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
  724 + srs_error("dispatch audio sequence header failed. ret=%d", ret);
  725 + return ret;
  726 + }
  727 + srs_info("dispatch audio sequence header success");
  728 +
  729 + if ((ret = gop_cache->dump(consumer, sample_rate, frame_rate)) != ERROR_SUCCESS) {
  730 + return ret;
  731 + }
  732 +
  733 + return ret;
  734 +}
  735 +
  736 +void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
  737 +{
  738 + std::vector<SrsConsumer*>::iterator it;
  739 + it = std::find(consumers.begin(), consumers.end(), consumer);
  740 + if (it != consumers.end()) {
  741 + consumers.erase(it);
  742 + }
  743 + srs_info("handle consumer destroy success.");
  744 +}
  745 +
  746 +void SrsSource::set_cache(bool enabled)
  747 +{
  748 + gop_cache->set(enabled);
  749 +}
  750 +