winlin

add todo for the bug of forwarder

@@ -255,6 +255,9 @@ int SrsForwarder::cycle() @@ -255,6 +255,9 @@ int SrsForwarder::cycle()
255 srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); 255 srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
256 return ret; 256 return ret;
257 } 257 }
  258 +
  259 + // TODO: FIXME: need to cache the metadata and sequence header when reconnect.
  260 +
258 if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) { 261 if ((ret = client->publish(stream_name, stream_id)) != ERROR_SUCCESS) {
259 srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d", 262 srs_error("connect with server failed, stream_name=%s, stream_id=%d. ret=%d",
260 stream_name.c_str(), stream_id, ret); 263 stream_name.c_str(), stream_id, ret);
1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_rtmp.hpp>  
25 -  
26 -#include <srs_core_log.hpp>  
27 -#include <srs_core_error.hpp>  
28 -#include <srs_core_socket.hpp>  
29 -#include <srs_core_protocol.hpp>  
30 -#include <srs_core_autofree.hpp>  
31 -#include <srs_core_amf0.hpp>  
32 -#include <srs_core_handshake.hpp>  
33 -#include <srs_core_config.hpp>  
34 -  
35 -/**  
36 -* the signature for packets to client.  
37 -*/  
38 -#define RTMP_SIG_FMS_VER "3,5,3,888"  
39 -#define RTMP_SIG_AMF0_VER 0  
40 -#define RTMP_SIG_CLIENT_ID "ASAICiss"  
41 -  
42 -/**  
43 -* onStatus consts.  
44 -*/  
45 -#define StatusLevel "level"  
46 -#define StatusCode "code"  
47 -#define StatusDescription "description"  
48 -#define StatusDetails "details"  
49 -#define StatusClientId "clientid"  
50 -// status value  
51 -#define StatusLevelStatus "status"  
52 -// code value  
53 -#define StatusCodeConnectSuccess "NetConnection.Connect.Success"  
54 -#define StatusCodeStreamReset "NetStream.Play.Reset"  
55 -#define StatusCodeStreamStart "NetStream.Play.Start"  
56 -#define StatusCodeStreamPause "NetStream.Pause.Notify"  
57 -#define StatusCodeStreamUnpause "NetStream.Unpause.Notify"  
58 -#define StatusCodePublishStart "NetStream.Publish.Start"  
59 -#define StatusCodeDataStart "NetStream.Data.Start"  
60 -#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success"  
61 -  
62 -// FMLE  
63 -#define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish"  
64 -#define RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH "onFCUnpublish"  
65 -  
66 -// default stream id for response the createStream request.  
67 -#define SRS_DEFAULT_SID 1  
68 -  
69 -SrsRequest::SrsRequest()  
70 -{  
71 - objectEncoding = RTMP_SIG_AMF0_VER;  
72 -}  
73 -  
74 -SrsRequest::~SrsRequest()  
75 -{  
76 -}  
77 -  
78 -int SrsRequest::discovery_app()  
79 -{  
80 - int ret = ERROR_SUCCESS;  
81 -  
82 - size_t pos = std::string::npos;  
83 - std::string url = tcUrl;  
84 -  
85 - if ((pos = url.find("://")) != std::string::npos) {  
86 - schema = url.substr(0, pos);  
87 - url = url.substr(schema.length() + 3);  
88 - srs_verbose("discovery schema=%s", schema.c_str());  
89 - }  
90 -  
91 - if ((pos = url.find("/")) != std::string::npos) {  
92 - vhost = url.substr(0, pos);  
93 - url = url.substr(vhost.length() + 1);  
94 - srs_verbose("discovery vhost=%s", vhost.c_str());  
95 - }  
96 -  
97 - port = RTMP_DEFAULT_PORTS;  
98 - if ((pos = vhost.find(":")) != std::string::npos) {  
99 - port = vhost.substr(pos + 1);  
100 - vhost = vhost.substr(0, pos);  
101 - srs_verbose("discovery vhost=%s, port=%s", vhost.c_str(), port.c_str());  
102 - }  
103 -  
104 - app = url;  
105 - srs_vhost_resolve(vhost, app);  
106 -  
107 - // resolve the vhost from config  
108 - SrsConfDirective* parsed_vhost = config->get_vhost(vhost);  
109 - if (parsed_vhost) {  
110 - vhost = parsed_vhost->arg0();  
111 - }  
112 -  
113 - srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",  
114 - schema.c_str(), vhost.c_str(), port.c_str(), app.c_str());  
115 -  
116 - if (schema.empty() || vhost.empty() || port.empty() || app.empty()) {  
117 - ret = ERROR_RTMP_REQ_TCURL;  
118 - srs_error("discovery tcUrl failed. "  
119 - "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",  
120 - tcUrl.c_str(), schema.c_str(), vhost.c_str(), port.c_str(), app.c_str(), ret);  
121 - return ret;  
122 - }  
123 -  
124 - strip();  
125 -  
126 - return ret;  
127 -}  
128 -  
129 -std::string SrsRequest::get_stream_url()  
130 -{  
131 - std::string url = "";  
132 -  
133 - url += vhost;  
134 - url += "/";  
135 - url += app;  
136 - url += "/";  
137 - url += stream;  
138 -  
139 - return url;  
140 -}  
141 -  
142 -void SrsRequest::strip()  
143 -{  
144 - trim(vhost, "/ \n\r\t");  
145 - trim(app, "/ \n\r\t");  
146 - trim(stream, "/ \n\r\t");  
147 -}  
148 -  
149 -std::string& SrsRequest::trim(std::string& str, std::string chs)  
150 -{  
151 - for (int i = 0; i < (int)chs.length(); i++) {  
152 - char ch = chs.at(i);  
153 -  
154 - for (std::string::iterator it = str.begin(); it != str.end();) {  
155 - if (ch == *it) {  
156 - it = str.erase(it);  
157 - } else {  
158 - ++it;  
159 - }  
160 - }  
161 - }  
162 -  
163 - return str;  
164 -}  
165 -  
166 -SrsResponse::SrsResponse()  
167 -{  
168 - stream_id = SRS_DEFAULT_SID;  
169 -}  
170 -  
171 -SrsResponse::~SrsResponse()  
172 -{  
173 -}  
174 -  
175 -SrsRtmpClient::SrsRtmpClient(st_netfd_t _stfd)  
176 -{  
177 - stfd = _stfd;  
178 - protocol = new SrsProtocol(stfd);  
179 -}  
180 -  
181 -SrsRtmpClient::~SrsRtmpClient()  
182 -{  
183 - srs_freep(protocol);  
184 -}  
185 -  
186 -void SrsRtmpClient::set_recv_timeout(int64_t timeout_us)  
187 -{  
188 - protocol->set_recv_timeout(timeout_us);  
189 -}  
190 -  
191 -void SrsRtmpClient::set_send_timeout(int64_t timeout_us)  
192 -{  
193 - protocol->set_send_timeout(timeout_us);  
194 -}  
195 -  
196 -int64_t SrsRtmpClient::get_recv_bytes()  
197 -{  
198 - return protocol->get_recv_bytes();  
199 -}  
200 -  
201 -int64_t SrsRtmpClient::get_send_bytes()  
202 -{  
203 - return protocol->get_send_bytes();  
204 -}  
205 -  
206 -int SrsRtmpClient::get_recv_kbps()  
207 -{  
208 - return protocol->get_recv_kbps();  
209 -}  
210 -  
211 -int SrsRtmpClient::get_send_kbps()  
212 -{  
213 - return protocol->get_send_kbps();  
214 -}  
215 -  
216 -int SrsRtmpClient::recv_message(SrsCommonMessage** pmsg)  
217 -{  
218 - return protocol->recv_message(pmsg);  
219 -}  
220 -  
221 -int SrsRtmpClient::send_message(ISrsMessage* msg)  
222 -{  
223 - return protocol->send_message(msg);  
224 -}  
225 -  
226 -int SrsRtmpClient::handshake()  
227 -{  
228 - int ret = ERROR_SUCCESS;  
229 -  
230 - SrsSocket skt(stfd);  
231 -  
232 - SrsComplexHandshake complex_hs;  
233 - SrsSimpleHandshake simple_hs;  
234 - if ((ret = simple_hs.handshake_with_server(skt, complex_hs)) != ERROR_SUCCESS) {  
235 - return ret;  
236 - }  
237 -  
238 - return ret;  
239 -}  
240 -  
241 -int SrsRtmpClient::connect_app(std::string app, std::string tc_url)  
242 -{  
243 - int ret = ERROR_SUCCESS;  
244 -  
245 - // Connect(vhost, app)  
246 - if (true) {  
247 - SrsCommonMessage* msg = new SrsCommonMessage();  
248 - SrsConnectAppPacket* pkt = new SrsConnectAppPacket();  
249 - msg->set_packet(pkt, 0);  
250 -  
251 - pkt->command_object = new SrsAmf0Object();  
252 - pkt->command_object->set("app", new SrsAmf0String(app.c_str()));  
253 - pkt->command_object->set("swfUrl", new SrsAmf0String());  
254 - pkt->command_object->set("tcUrl", new SrsAmf0String(tc_url.c_str()));  
255 - pkt->command_object->set("fpad", new SrsAmf0Boolean(false));  
256 - pkt->command_object->set("capabilities", new SrsAmf0Number(239));  
257 - pkt->command_object->set("audioCodecs", new SrsAmf0Number(3575));  
258 - pkt->command_object->set("videoCodecs", new SrsAmf0Number(252));  
259 - pkt->command_object->set("videoFunction", new SrsAmf0Number(1));  
260 - pkt->command_object->set("pageUrl", new SrsAmf0String());  
261 - pkt->command_object->set("objectEncoding", new SrsAmf0Number(0));  
262 -  
263 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
264 - return ret;  
265 - }  
266 - }  
267 -  
268 - // Set Window Acknowledgement size(2500000)  
269 - if (true) {  
270 - SrsCommonMessage* msg = new SrsCommonMessage();  
271 - SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();  
272 -  
273 - pkt->ackowledgement_window_size = 2500000;  
274 - msg->set_packet(pkt, 0);  
275 -  
276 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
277 - return ret;  
278 - }  
279 - }  
280 -  
281 - // expect connect _result  
282 - SrsCommonMessage* msg = NULL;  
283 - SrsConnectAppResPacket* pkt = NULL;  
284 - if ((ret = srs_rtmp_expect_message<SrsConnectAppResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
285 - srs_error("expect connect app response message failed. ret=%d", ret);  
286 - return ret;  
287 - }  
288 - SrsAutoFree(SrsCommonMessage, msg, false);  
289 - srs_info("get connect app response message");  
290 -  
291 - return ret;  
292 -}  
293 -  
294 -int SrsRtmpClient::create_stream(int& stream_id)  
295 -{  
296 - int ret = ERROR_SUCCESS;  
297 -  
298 - // CreateStream  
299 - if (true) {  
300 - SrsCommonMessage* msg = new SrsCommonMessage();  
301 - SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket();  
302 -  
303 - msg->set_packet(pkt, 0);  
304 -  
305 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
306 - return ret;  
307 - }  
308 - }  
309 -  
310 - // CreateStream _result.  
311 - if (true) {  
312 - SrsCommonMessage* msg = NULL;  
313 - SrsCreateStreamResPacket* pkt = NULL;  
314 - if ((ret = srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
315 - srs_error("expect create stream response message failed. ret=%d", ret);  
316 - return ret;  
317 - }  
318 - SrsAutoFree(SrsCommonMessage, msg, false);  
319 - srs_info("get create stream response message");  
320 -  
321 - stream_id = (int)pkt->stream_id;  
322 - }  
323 -  
324 - return ret;  
325 -}  
326 -  
327 -int SrsRtmpClient::play(std::string stream, int stream_id)  
328 -{  
329 - int ret = ERROR_SUCCESS;  
330 -  
331 - // Play(stream)  
332 - if (true) {  
333 - SrsCommonMessage* msg = new SrsCommonMessage();  
334 - SrsPlayPacket* pkt = new SrsPlayPacket();  
335 -  
336 - pkt->stream_name = stream;  
337 - msg->set_packet(pkt, stream_id);  
338 -  
339 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
340 - srs_error("send play stream failed. "  
341 - "stream=%s, stream_id=%d, ret=%d",  
342 - stream.c_str(), stream_id, ret);  
343 - return ret;  
344 - }  
345 - }  
346 -  
347 - // SetBufferLength(1000ms)  
348 - int buffer_length_ms = 1000;  
349 - if (true) {  
350 - SrsCommonMessage* msg = new SrsCommonMessage();  
351 - SrsUserControlPacket* pkt = new SrsUserControlPacket();  
352 -  
353 - pkt->event_type = SrcPCUCSetBufferLength;  
354 - pkt->event_data = stream_id;  
355 - pkt->extra_data = buffer_length_ms;  
356 - msg->set_packet(pkt, 0);  
357 -  
358 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
359 - srs_error("send set buffer length failed. "  
360 - "stream=%s, stream_id=%d, bufferLength=%d, ret=%d",  
361 - stream.c_str(), stream_id, buffer_length_ms, ret);  
362 - return ret;  
363 - }  
364 - }  
365 -  
366 - return ret;  
367 -}  
368 -  
369 -int SrsRtmpClient::publish(std::string stream, int stream_id)  
370 -{  
371 - int ret = ERROR_SUCCESS;  
372 -  
373 - // publish(stream)  
374 - if (true) {  
375 - SrsCommonMessage* msg = new SrsCommonMessage();  
376 - SrsPublishPacket* pkt = new SrsPublishPacket();  
377 -  
378 - pkt->stream_name = stream;  
379 - msg->set_packet(pkt, stream_id);  
380 -  
381 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
382 - srs_error("send publish message failed. "  
383 - "stream=%s, stream_id=%d, ret=%d",  
384 - stream.c_str(), stream_id, ret);  
385 - return ret;  
386 - }  
387 - }  
388 -  
389 - return ret;  
390 -}  
391 -  
392 -SrsRtmp::SrsRtmp(st_netfd_t client_stfd)  
393 -{  
394 - protocol = new SrsProtocol(client_stfd);  
395 - stfd = client_stfd;  
396 -}  
397 -  
398 -SrsRtmp::~SrsRtmp()  
399 -{  
400 - srs_freep(protocol);  
401 -}  
402 -  
403 -SrsProtocol* SrsRtmp::get_protocol()  
404 -{  
405 - return protocol;  
406 -}  
407 -  
408 -void SrsRtmp::set_recv_timeout(int64_t timeout_us)  
409 -{  
410 - protocol->set_recv_timeout(timeout_us);  
411 -}  
412 -  
413 -int64_t SrsRtmp::get_recv_timeout()  
414 -{  
415 - return protocol->get_recv_timeout();  
416 -}  
417 -  
418 -void SrsRtmp::set_send_timeout(int64_t timeout_us)  
419 -{  
420 - protocol->set_send_timeout(timeout_us);  
421 -}  
422 -  
423 -int64_t SrsRtmp::get_recv_bytes()  
424 -{  
425 - return protocol->get_recv_bytes();  
426 -}  
427 -  
428 -int64_t SrsRtmp::get_send_bytes()  
429 -{  
430 - return protocol->get_send_bytes();  
431 -}  
432 -  
433 -int SrsRtmp::get_recv_kbps()  
434 -{  
435 - return protocol->get_recv_kbps();  
436 -}  
437 -  
438 -int SrsRtmp::get_send_kbps()  
439 -{  
440 - return protocol->get_send_kbps();  
441 -}  
442 -  
443 -int SrsRtmp::recv_message(SrsCommonMessage** pmsg)  
444 -{  
445 - return protocol->recv_message(pmsg);  
446 -}  
447 -  
448 -int SrsRtmp::send_message(ISrsMessage* msg)  
449 -{  
450 - return protocol->send_message(msg);  
451 -}  
452 -  
453 -int SrsRtmp::handshake()  
454 -{  
455 - int ret = ERROR_SUCCESS;  
456 -  
457 - SrsSocket skt(stfd);  
458 -  
459 - SrsComplexHandshake complex_hs;  
460 - SrsSimpleHandshake simple_hs;  
461 - if ((ret = simple_hs.handshake_with_client(skt, complex_hs)) != ERROR_SUCCESS) {  
462 - return ret;  
463 - }  
464 -  
465 - return ret;  
466 -}  
467 -  
468 -int SrsRtmp::connect_app(SrsRequest* req)  
469 -{  
470 - int ret = ERROR_SUCCESS;  
471 -  
472 - SrsCommonMessage* msg = NULL;  
473 - SrsConnectAppPacket* pkt = NULL;  
474 - if ((ret = srs_rtmp_expect_message<SrsConnectAppPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
475 - srs_error("expect connect app message failed. ret=%d", ret);  
476 - return ret;  
477 - }  
478 - SrsAutoFree(SrsCommonMessage, msg, false);  
479 - srs_info("get connect app message");  
480 -  
481 - SrsAmf0Any* prop = NULL;  
482 -  
483 - if ((prop = pkt->command_object->ensure_property_string("tcUrl")) == NULL) {  
484 - ret = ERROR_RTMP_REQ_CONNECT;  
485 - srs_error("invalid request, must specifies the tcUrl. ret=%d", ret);  
486 - return ret;  
487 - }  
488 - req->tcUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;  
489 -  
490 - if ((prop = pkt->command_object->ensure_property_string("pageUrl")) != NULL) {  
491 - req->pageUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;  
492 - }  
493 -  
494 - if ((prop = pkt->command_object->ensure_property_string("swfUrl")) != NULL) {  
495 - req->swfUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;  
496 - }  
497 -  
498 - if ((prop = pkt->command_object->ensure_property_number("objectEncoding")) != NULL) {  
499 - req->objectEncoding = srs_amf0_convert<SrsAmf0Number>(prop)->value;  
500 - }  
501 -  
502 - srs_info("get connect app message params success.");  
503 -  
504 - return req->discovery_app();  
505 -}  
506 -  
507 -int SrsRtmp::set_window_ack_size(int ack_size)  
508 -{  
509 - int ret = ERROR_SUCCESS;  
510 -  
511 - SrsCommonMessage* msg = new SrsCommonMessage();  
512 - SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();  
513 -  
514 - pkt->ackowledgement_window_size = ack_size;  
515 - msg->set_packet(pkt, 0);  
516 -  
517 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
518 - srs_error("send ack size message failed. ret=%d", ret);  
519 - return ret;  
520 - }  
521 - srs_info("send ack size message success. ack_size=%d", ack_size);  
522 -  
523 - return ret;  
524 -}  
525 -  
526 -int SrsRtmp::set_peer_bandwidth(int bandwidth, int type)  
527 -{  
528 - int ret = ERROR_SUCCESS;  
529 -  
530 - SrsCommonMessage* msg = new SrsCommonMessage();  
531 - SrsSetPeerBandwidthPacket* pkt = new SrsSetPeerBandwidthPacket();  
532 -  
533 - pkt->bandwidth = bandwidth;  
534 - pkt->type = type;  
535 - msg->set_packet(pkt, 0);  
536 -  
537 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
538 - srs_error("send set bandwidth message failed. ret=%d", ret);  
539 - return ret;  
540 - }  
541 - srs_info("send set bandwidth message "  
542 - "success. bandwidth=%d, type=%d", bandwidth, type);  
543 -  
544 - return ret;  
545 -}  
546 -  
547 -int SrsRtmp::response_connect_app(SrsRequest* req)  
548 -{  
549 - int ret = ERROR_SUCCESS;  
550 -  
551 - SrsCommonMessage* msg = new SrsCommonMessage();  
552 - SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket();  
553 -  
554 - pkt->props->set("fmsVer", new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER));  
555 - pkt->props->set("capabilities", new SrsAmf0Number(127));  
556 - pkt->props->set("mode", new SrsAmf0Number(1));  
557 -  
558 - pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
559 - pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess));  
560 - pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded"));  
561 - pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding));  
562 - SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray();  
563 - pkt->info->set("data", data);  
564 -  
565 - data->set("srs_version", new SrsAmf0String(RTMP_SIG_FMS_VER));  
566 - data->set("srs_server", new SrsAmf0String(RTMP_SIG_SRS_NAME));  
567 - data->set("srs_license", new SrsAmf0String(RTMP_SIG_SRS_LICENSE));  
568 - data->set("srs_role", new SrsAmf0String(RTMP_SIG_SRS_ROLE));  
569 - data->set("srs_url", new SrsAmf0String(RTMP_SIG_SRS_URL));  
570 - data->set("srs_version", new SrsAmf0String(RTMP_SIG_SRS_VERSION));  
571 - data->set("srs_site", new SrsAmf0String(RTMP_SIG_SRS_WEB));  
572 - data->set("srs_email", new SrsAmf0String(RTMP_SIG_SRS_EMAIL));  
573 - data->set("srs_copyright", new SrsAmf0String(RTMP_SIG_SRS_COPYRIGHT));  
574 -  
575 - msg->set_packet(pkt, 0);  
576 -  
577 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
578 - srs_error("send connect app response message failed. ret=%d", ret);  
579 - return ret;  
580 - }  
581 - srs_info("send connect app response message success.");  
582 -  
583 - return ret;  
584 -}  
585 -  
586 -int SrsRtmp::on_bw_done()  
587 -{  
588 - int ret = ERROR_SUCCESS;  
589 -  
590 - SrsCommonMessage* msg = new SrsCommonMessage();  
591 - SrsOnBWDonePacket* pkt = new SrsOnBWDonePacket();  
592 -  
593 - msg->set_packet(pkt, 0);  
594 -  
595 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
596 - srs_error("send onBWDone message failed. ret=%d", ret);  
597 - return ret;  
598 - }  
599 - srs_info("send onBWDone message success.");  
600 -  
601 - return ret;  
602 -}  
603 -  
604 -int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& stream_name)  
605 -{  
606 - type = SrsClientUnknown;  
607 - int ret = ERROR_SUCCESS;  
608 -  
609 - while (true) {  
610 - SrsCommonMessage* msg = NULL;  
611 - if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {  
612 - srs_error("recv identify client message failed. ret=%d", ret);  
613 - return ret;  
614 - }  
615 -  
616 - SrsAutoFree(SrsCommonMessage, msg, false);  
617 -  
618 - if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {  
619 - srs_trace("identify ignore messages except "  
620 - "AMF0/AMF3 command message. type=%#x", msg->header.message_type);  
621 - continue;  
622 - }  
623 -  
624 - if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {  
625 - srs_error("identify decode message failed. ret=%d", ret);  
626 - return ret;  
627 - }  
628 -  
629 - SrsPacket* pkt = msg->get_packet();  
630 - if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {  
631 - srs_info("identify client by create stream, play or flash publish.");  
632 - return identify_create_stream_client(  
633 - dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, type, stream_name);  
634 - }  
635 - if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {  
636 - srs_info("identify client by releaseStream, fmle publish.");  
637 - return identify_fmle_publish_client(  
638 - dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name);  
639 - }  
640 -  
641 - srs_trace("ignore AMF0/AMF3 command message.");  
642 - }  
643 -  
644 - return ret;  
645 -}  
646 -  
647 -int SrsRtmp::set_chunk_size(int chunk_size)  
648 -{  
649 - int ret = ERROR_SUCCESS;  
650 -  
651 - SrsCommonMessage* msg = new SrsCommonMessage();  
652 - SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();  
653 -  
654 - pkt->chunk_size = chunk_size;  
655 - msg->set_packet(pkt, 0);  
656 -  
657 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
658 - srs_error("send set chunk size message failed. ret=%d", ret);  
659 - return ret;  
660 - }  
661 - srs_info("send set chunk size message success. chunk_size=%d", chunk_size);  
662 -  
663 - return ret;  
664 -}  
665 -  
666 -int SrsRtmp::start_play(int stream_id)  
667 -{  
668 - int ret = ERROR_SUCCESS;  
669 -  
670 - // StreamBegin  
671 - if (true) {  
672 - SrsCommonMessage* msg = new SrsCommonMessage();  
673 - SrsUserControlPacket* pkt = new SrsUserControlPacket();  
674 -  
675 - pkt->event_type = SrcPCUCStreamBegin;  
676 - pkt->event_data = stream_id;  
677 - msg->set_packet(pkt, 0);  
678 -  
679 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
680 - srs_error("send PCUC(StreamBegin) message failed. ret=%d", ret);  
681 - return ret;  
682 - }  
683 - srs_info("send PCUC(StreamBegin) message success.");  
684 - }  
685 -  
686 - // onStatus(NetStream.Play.Reset)  
687 - if (true) {  
688 - SrsCommonMessage* msg = new SrsCommonMessage();  
689 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
690 -  
691 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
692 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamReset));  
693 - pkt->data->set(StatusDescription, new SrsAmf0String("Playing and resetting stream."));  
694 - pkt->data->set(StatusDetails, new SrsAmf0String("stream"));  
695 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
696 -  
697 - msg->set_packet(pkt, stream_id);  
698 -  
699 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
700 - srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);  
701 - return ret;  
702 - }  
703 - srs_info("send onStatus(NetStream.Play.Reset) message success.");  
704 - }  
705 -  
706 - // onStatus(NetStream.Play.Start)  
707 - if (true) {  
708 - SrsCommonMessage* msg = new SrsCommonMessage();  
709 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
710 -  
711 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
712 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamStart));  
713 - pkt->data->set(StatusDescription, new SrsAmf0String("Started playing stream."));  
714 - pkt->data->set(StatusDetails, new SrsAmf0String("stream"));  
715 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
716 -  
717 - msg->set_packet(pkt, stream_id);  
718 -  
719 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
720 - srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);  
721 - return ret;  
722 - }  
723 - srs_info("send onStatus(NetStream.Play.Reset) message success.");  
724 - }  
725 -  
726 - // |RtmpSampleAccess(false, false)  
727 - if (true) {  
728 - SrsCommonMessage* msg = new SrsCommonMessage();  
729 - SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();  
730 -  
731 - msg->set_packet(pkt, stream_id);  
732 -  
733 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
734 - srs_error("send |RtmpSampleAccess(false, false) message failed. ret=%d", ret);  
735 - return ret;  
736 - }  
737 - srs_info("send |RtmpSampleAccess(false, false) message success.");  
738 - }  
739 -  
740 - // onStatus(NetStream.Data.Start)  
741 - if (true) {  
742 - SrsCommonMessage* msg = new SrsCommonMessage();  
743 - SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();  
744 -  
745 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeDataStart));  
746 -  
747 - msg->set_packet(pkt, stream_id);  
748 -  
749 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
750 - srs_error("send onStatus(NetStream.Data.Start) message failed. ret=%d", ret);  
751 - return ret;  
752 - }  
753 - srs_info("send onStatus(NetStream.Data.Start) message success.");  
754 - }  
755 -  
756 - srs_info("start play success.");  
757 -  
758 - return ret;  
759 -}  
760 -  
761 -int SrsRtmp::on_play_client_pause(int stream_id, bool is_pause)  
762 -{  
763 - int ret = ERROR_SUCCESS;  
764 -  
765 - if (is_pause) {  
766 - // onStatus(NetStream.Pause.Notify)  
767 - if (true) {  
768 - SrsCommonMessage* msg = new SrsCommonMessage();  
769 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
770 -  
771 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
772 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamPause));  
773 - pkt->data->set(StatusDescription, new SrsAmf0String("Paused stream."));  
774 -  
775 - msg->set_packet(pkt, stream_id);  
776 -  
777 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
778 - srs_error("send onStatus(NetStream.Pause.Notify) message failed. ret=%d", ret);  
779 - return ret;  
780 - }  
781 - srs_info("send onStatus(NetStream.Pause.Notify) message success.");  
782 - }  
783 - // StreamEOF  
784 - if (true) {  
785 - SrsCommonMessage* msg = new SrsCommonMessage();  
786 - SrsUserControlPacket* pkt = new SrsUserControlPacket();  
787 -  
788 - pkt->event_type = SrcPCUCStreamEOF;  
789 - pkt->event_data = stream_id;  
790 - msg->set_packet(pkt, 0);  
791 -  
792 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
793 - srs_error("send PCUC(StreamEOF) message failed. ret=%d", ret);  
794 - return ret;  
795 - }  
796 - srs_info("send PCUC(StreamEOF) message success.");  
797 - }  
798 - } else {  
799 - // onStatus(NetStream.Unpause.Notify)  
800 - if (true) {  
801 - SrsCommonMessage* msg = new SrsCommonMessage();  
802 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
803 -  
804 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
805 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamUnpause));  
806 - pkt->data->set(StatusDescription, new SrsAmf0String("Unpaused stream."));  
807 -  
808 - msg->set_packet(pkt, stream_id);  
809 -  
810 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
811 - srs_error("send onStatus(NetStream.Unpause.Notify) message failed. ret=%d", ret);  
812 - return ret;  
813 - }  
814 - srs_info("send onStatus(NetStream.Unpause.Notify) message success.");  
815 - }  
816 - // StreanBegin  
817 - if (true) {  
818 - SrsCommonMessage* msg = new SrsCommonMessage();  
819 - SrsUserControlPacket* pkt = new SrsUserControlPacket();  
820 -  
821 - pkt->event_type = SrcPCUCStreamBegin;  
822 - pkt->event_data = stream_id;  
823 - msg->set_packet(pkt, 0);  
824 -  
825 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
826 - srs_error("send PCUC(StreanBegin) message failed. ret=%d", ret);  
827 - return ret;  
828 - }  
829 - srs_info("send PCUC(StreanBegin) message success.");  
830 - }  
831 - }  
832 -  
833 - return ret;  
834 -}  
835 -  
836 -int SrsRtmp::start_fmle_publish(int stream_id)  
837 -{  
838 - int ret = ERROR_SUCCESS;  
839 -  
840 - // FCPublish  
841 - double fc_publish_tid = 0;  
842 - if (true) {  
843 - SrsCommonMessage* msg = NULL;  
844 - SrsFMLEStartPacket* pkt = NULL;  
845 - if ((ret = srs_rtmp_expect_message<SrsFMLEStartPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
846 - srs_error("recv FCPublish message failed. ret=%d", ret);  
847 - return ret;  
848 - }  
849 - srs_info("recv FCPublish request message success.");  
850 -  
851 - SrsAutoFree(SrsCommonMessage, msg, false);  
852 - fc_publish_tid = pkt->transaction_id;  
853 - }  
854 - // FCPublish response  
855 - if (true) {  
856 - SrsCommonMessage* msg = new SrsCommonMessage();  
857 - SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);  
858 -  
859 - msg->set_packet(pkt, 0);  
860 -  
861 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
862 - srs_error("send FCPublish response message failed. ret=%d", ret);  
863 - return ret;  
864 - }  
865 - srs_info("send FCPublish response message success.");  
866 - }  
867 -  
868 - // createStream  
869 - double create_stream_tid = 0;  
870 - if (true) {  
871 - SrsCommonMessage* msg = NULL;  
872 - SrsCreateStreamPacket* pkt = NULL;  
873 - if ((ret = srs_rtmp_expect_message<SrsCreateStreamPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
874 - srs_error("recv createStream message failed. ret=%d", ret);  
875 - return ret;  
876 - }  
877 - srs_info("recv createStream request message success.");  
878 -  
879 - SrsAutoFree(SrsCommonMessage, msg, false);  
880 - create_stream_tid = pkt->transaction_id;  
881 - }  
882 - // createStream response  
883 - if (true) {  
884 - SrsCommonMessage* msg = new SrsCommonMessage();  
885 - SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id);  
886 -  
887 - msg->set_packet(pkt, 0);  
888 -  
889 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
890 - srs_error("send createStream response message failed. ret=%d", ret);  
891 - return ret;  
892 - }  
893 - srs_info("send createStream response message success.");  
894 - }  
895 -  
896 - // publish  
897 - if (true) {  
898 - SrsCommonMessage* msg = NULL;  
899 - SrsPublishPacket* pkt = NULL;  
900 - if ((ret = srs_rtmp_expect_message<SrsPublishPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
901 - srs_error("recv publish message failed. ret=%d", ret);  
902 - return ret;  
903 - }  
904 - srs_info("recv publish request message success.");  
905 -  
906 - SrsAutoFree(SrsCommonMessage, msg, false);  
907 - }  
908 - // publish response onFCPublish(NetStream.Publish.Start)  
909 - if (true) {  
910 - SrsCommonMessage* msg = new SrsCommonMessage();  
911 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
912 -  
913 - pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;  
914 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));  
915 - pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));  
916 -  
917 - msg->set_packet(pkt, stream_id);  
918 -  
919 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
920 - srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", ret);  
921 - return ret;  
922 - }  
923 - srs_info("send onFCPublish(NetStream.Publish.Start) message success.");  
924 - }  
925 - // publish response onStatus(NetStream.Publish.Start)  
926 - if (true) {  
927 - SrsCommonMessage* msg = new SrsCommonMessage();  
928 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
929 -  
930 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
931 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));  
932 - pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));  
933 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
934 -  
935 - msg->set_packet(pkt, stream_id);  
936 -  
937 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
938 - srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret);  
939 - return ret;  
940 - }  
941 - srs_info("send onStatus(NetStream.Publish.Start) message success.");  
942 - }  
943 -  
944 - srs_info("FMLE publish success.");  
945 -  
946 - return ret;  
947 -}  
948 -  
949 -int SrsRtmp::fmle_unpublish(int stream_id, double unpublish_tid)  
950 -{  
951 - int ret = ERROR_SUCCESS;  
952 -  
953 - // publish response onFCUnpublish(NetStream.unpublish.Success)  
954 - if (true) {  
955 - SrsCommonMessage* msg = new SrsCommonMessage();  
956 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
957 -  
958 - pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH;  
959 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));  
960 - pkt->data->set(StatusDescription, new SrsAmf0String("Stop publishing stream."));  
961 -  
962 - msg->set_packet(pkt, stream_id);  
963 -  
964 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
965 - srs_error("send onFCUnpublish(NetStream.unpublish.Success) message failed. ret=%d", ret);  
966 - return ret;  
967 - }  
968 - srs_info("send onFCUnpublish(NetStream.unpublish.Success) message success.");  
969 - }  
970 - // FCUnpublish response  
971 - if (true) {  
972 - SrsCommonMessage* msg = new SrsCommonMessage();  
973 - SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(unpublish_tid);  
974 -  
975 - msg->set_packet(pkt, stream_id);  
976 -  
977 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
978 - srs_error("send FCUnpublish response message failed. ret=%d", ret);  
979 - return ret;  
980 - }  
981 - srs_info("send FCUnpublish response message success.");  
982 - }  
983 - // publish response onStatus(NetStream.Unpublish.Success)  
984 - if (true) {  
985 - SrsCommonMessage* msg = new SrsCommonMessage();  
986 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
987 -  
988 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
989 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));  
990 - pkt->data->set(StatusDescription, new SrsAmf0String("Stream is now unpublished"));  
991 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
992 -  
993 - msg->set_packet(pkt, stream_id);  
994 -  
995 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
996 - srs_error("send onStatus(NetStream.Unpublish.Success) message failed. ret=%d", ret);  
997 - return ret;  
998 - }  
999 - srs_info("send onStatus(NetStream.Unpublish.Success) message success.");  
1000 - }  
1001 -  
1002 - srs_info("FMLE unpublish success.");  
1003 -  
1004 - return ret;  
1005 -}  
1006 -  
1007 -int SrsRtmp::start_flash_publish(int stream_id)  
1008 -{  
1009 - int ret = ERROR_SUCCESS;  
1010 -  
1011 - // publish response onStatus(NetStream.Publish.Start)  
1012 - if (true) {  
1013 - SrsCommonMessage* msg = new SrsCommonMessage();  
1014 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
1015 -  
1016 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
1017 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));  
1018 - pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));  
1019 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
1020 -  
1021 - msg->set_packet(pkt, stream_id);  
1022 -  
1023 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
1024 - srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret);  
1025 - return ret;  
1026 - }  
1027 - srs_info("send onStatus(NetStream.Publish.Start) message success.");  
1028 - }  
1029 -  
1030 - srs_info("flash publish success.");  
1031 -  
1032 - return ret;  
1033 -}  
1034 -  
1035 -int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name)  
1036 -{  
1037 - int ret = ERROR_SUCCESS;  
1038 -  
1039 - if (true) {  
1040 - SrsCommonMessage* msg = new SrsCommonMessage();  
1041 - SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id);  
1042 -  
1043 - msg->set_packet(pkt, 0);  
1044 -  
1045 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
1046 - srs_error("send createStream response message failed. ret=%d", ret);  
1047 - return ret;  
1048 - }  
1049 - srs_info("send createStream response message success.");  
1050 - }  
1051 -  
1052 - while (true) {  
1053 - SrsCommonMessage* msg = NULL;  
1054 - if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {  
1055 - srs_error("recv identify client message failed. ret=%d", ret);  
1056 - return ret;  
1057 - }  
1058 -  
1059 - SrsAutoFree(SrsCommonMessage, msg, false);  
1060 -  
1061 - if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {  
1062 - srs_trace("identify ignore messages except "  
1063 - "AMF0/AMF3 command message. type=%#x", msg->header.message_type);  
1064 - continue;  
1065 - }  
1066 -  
1067 - if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {  
1068 - srs_error("identify decode message failed. ret=%d", ret);  
1069 - return ret;  
1070 - }  
1071 -  
1072 - SrsPacket* pkt = msg->get_packet();  
1073 - if (dynamic_cast<SrsPlayPacket*>(pkt)) {  
1074 - SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt);  
1075 - type = SrsClientPlay;  
1076 - stream_name = play->stream_name;  
1077 - srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());  
1078 - return ret;  
1079 - }  
1080 - if (dynamic_cast<SrsPublishPacket*>(pkt)) {  
1081 - srs_info("identify client by publish, falsh publish.");  
1082 - return identify_flash_publish_client(  
1083 - dynamic_cast<SrsPublishPacket*>(pkt), type, stream_name);  
1084 - }  
1085 -  
1086 - srs_trace("ignore AMF0/AMF3 command message.");  
1087 - }  
1088 -  
1089 - return ret;  
1090 -}  
1091 -  
1092 -int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name)  
1093 -{  
1094 - int ret = ERROR_SUCCESS;  
1095 -  
1096 - type = SrsClientFMLEPublish;  
1097 - stream_name = req->stream_name;  
1098 -  
1099 - // releaseStream response  
1100 - if (true) {  
1101 - SrsCommonMessage* msg = new SrsCommonMessage();  
1102 - SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);  
1103 -  
1104 - msg->set_packet(pkt, 0);  
1105 -  
1106 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
1107 - srs_error("send releaseStream response message failed. ret=%d", ret);  
1108 - return ret;  
1109 - }  
1110 - srs_info("send releaseStream response message success.");  
1111 - }  
1112 -  
1113 - return ret;  
1114 -}  
1115 -  
1116 -int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name)  
1117 -{  
1118 - int ret = ERROR_SUCCESS;  
1119 -  
1120 - type = SrsClientFlashPublish;  
1121 - stream_name = req->stream_name;  
1122 -  
1123 - return ret;  
1124 -}  
1125 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_rtmp.hpp>
  25 +
  26 +#include <srs_core_log.hpp>
  27 +#include <srs_core_error.hpp>
  28 +#include <srs_core_socket.hpp>
  29 +#include <srs_core_protocol.hpp>
  30 +#include <srs_core_autofree.hpp>
  31 +#include <srs_core_amf0.hpp>
  32 +#include <srs_core_handshake.hpp>
  33 +#include <srs_core_config.hpp>
  34 +
  35 +/**
  36 +* the signature for packets to client.
  37 +*/
  38 +#define RTMP_SIG_FMS_VER "3,5,3,888"
  39 +#define RTMP_SIG_AMF0_VER 0
  40 +#define RTMP_SIG_CLIENT_ID "ASAICiss"
  41 +
  42 +/**
  43 +* onStatus consts.
  44 +*/
  45 +#define StatusLevel "level"
  46 +#define StatusCode "code"
  47 +#define StatusDescription "description"
  48 +#define StatusDetails "details"
  49 +#define StatusClientId "clientid"
  50 +// status value
  51 +#define StatusLevelStatus "status"
  52 +// code value
  53 +#define StatusCodeConnectSuccess "NetConnection.Connect.Success"
  54 +#define StatusCodeStreamReset "NetStream.Play.Reset"
  55 +#define StatusCodeStreamStart "NetStream.Play.Start"
  56 +#define StatusCodeStreamPause "NetStream.Pause.Notify"
  57 +#define StatusCodeStreamUnpause "NetStream.Unpause.Notify"
  58 +#define StatusCodePublishStart "NetStream.Publish.Start"
  59 +#define StatusCodeDataStart "NetStream.Data.Start"
  60 +#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success"
  61 +
  62 +// FMLE
  63 +#define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish"
  64 +#define RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH "onFCUnpublish"
  65 +
  66 +// default stream id for response the createStream request.
  67 +#define SRS_DEFAULT_SID 1
  68 +
  69 +SrsRequest::SrsRequest()
  70 +{
  71 + objectEncoding = RTMP_SIG_AMF0_VER;
  72 +}
  73 +
  74 +SrsRequest::~SrsRequest()
  75 +{
  76 +}
  77 +
  78 +int SrsRequest::discovery_app()
  79 +{
  80 + int ret = ERROR_SUCCESS;
  81 +
  82 + size_t pos = std::string::npos;
  83 + std::string url = tcUrl;
  84 +
  85 + if ((pos = url.find("://")) != std::string::npos) {
  86 + schema = url.substr(0, pos);
  87 + url = url.substr(schema.length() + 3);
  88 + srs_verbose("discovery schema=%s", schema.c_str());
  89 + }
  90 +
  91 + if ((pos = url.find("/")) != std::string::npos) {
  92 + vhost = url.substr(0, pos);
  93 + url = url.substr(vhost.length() + 1);
  94 + srs_verbose("discovery vhost=%s", vhost.c_str());
  95 + }
  96 +
  97 + port = RTMP_DEFAULT_PORTS;
  98 + if ((pos = vhost.find(":")) != std::string::npos) {
  99 + port = vhost.substr(pos + 1);
  100 + vhost = vhost.substr(0, pos);
  101 + srs_verbose("discovery vhost=%s, port=%s", vhost.c_str(), port.c_str());
  102 + }
  103 +
  104 + app = url;
  105 + srs_vhost_resolve(vhost, app);
  106 +
  107 + // resolve the vhost from config
  108 + SrsConfDirective* parsed_vhost = config->get_vhost(vhost);
  109 + if (parsed_vhost) {
  110 + vhost = parsed_vhost->arg0();
  111 + }
  112 +
  113 + // TODO: discovery the params of vhost.
  114 +
  115 + srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
  116 + schema.c_str(), vhost.c_str(), port.c_str(), app.c_str());
  117 +
  118 + if (schema.empty() || vhost.empty() || port.empty() || app.empty()) {
  119 + ret = ERROR_RTMP_REQ_TCURL;
  120 + srs_error("discovery tcUrl failed. "
  121 + "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
  122 + tcUrl.c_str(), schema.c_str(), vhost.c_str(), port.c_str(), app.c_str(), ret);
  123 + return ret;
  124 + }
  125 +
  126 + strip();
  127 +
  128 + return ret;
  129 +}
  130 +
  131 +std::string SrsRequest::get_stream_url()
  132 +{
  133 + std::string url = "";
  134 +
  135 + url += vhost;
  136 + url += "/";
  137 + url += app;
  138 + url += "/";
  139 + url += stream;
  140 +
  141 + return url;
  142 +}
  143 +
  144 +void SrsRequest::strip()
  145 +{
  146 + trim(vhost, "/ \n\r\t");
  147 + trim(app, "/ \n\r\t");
  148 + trim(stream, "/ \n\r\t");
  149 +}
  150 +
  151 +std::string& SrsRequest::trim(std::string& str, std::string chs)
  152 +{
  153 + for (int i = 0; i < (int)chs.length(); i++) {
  154 + char ch = chs.at(i);
  155 +
  156 + for (std::string::iterator it = str.begin(); it != str.end();) {
  157 + if (ch == *it) {
  158 + it = str.erase(it);
  159 + } else {
  160 + ++it;
  161 + }
  162 + }
  163 + }
  164 +
  165 + return str;
  166 +}
  167 +
  168 +SrsResponse::SrsResponse()
  169 +{
  170 + stream_id = SRS_DEFAULT_SID;
  171 +}
  172 +
  173 +SrsResponse::~SrsResponse()
  174 +{
  175 +}
  176 +
  177 +SrsRtmpClient::SrsRtmpClient(st_netfd_t _stfd)
  178 +{
  179 + stfd = _stfd;
  180 + protocol = new SrsProtocol(stfd);
  181 +}
  182 +
  183 +SrsRtmpClient::~SrsRtmpClient()
  184 +{
  185 + srs_freep(protocol);
  186 +}
  187 +
  188 +void SrsRtmpClient::set_recv_timeout(int64_t timeout_us)
  189 +{
  190 + protocol->set_recv_timeout(timeout_us);
  191 +}
  192 +
  193 +void SrsRtmpClient::set_send_timeout(int64_t timeout_us)
  194 +{
  195 + protocol->set_send_timeout(timeout_us);
  196 +}
  197 +
  198 +int64_t SrsRtmpClient::get_recv_bytes()
  199 +{
  200 + return protocol->get_recv_bytes();
  201 +}
  202 +
  203 +int64_t SrsRtmpClient::get_send_bytes()
  204 +{
  205 + return protocol->get_send_bytes();
  206 +}
  207 +
  208 +int SrsRtmpClient::get_recv_kbps()
  209 +{
  210 + return protocol->get_recv_kbps();
  211 +}
  212 +
  213 +int SrsRtmpClient::get_send_kbps()
  214 +{
  215 + return protocol->get_send_kbps();
  216 +}
  217 +
  218 +int SrsRtmpClient::recv_message(SrsCommonMessage** pmsg)
  219 +{
  220 + return protocol->recv_message(pmsg);
  221 +}
  222 +
  223 +int SrsRtmpClient::send_message(ISrsMessage* msg)
  224 +{
  225 + return protocol->send_message(msg);
  226 +}
  227 +
  228 +int SrsRtmpClient::handshake()
  229 +{
  230 + int ret = ERROR_SUCCESS;
  231 +
  232 + SrsSocket skt(stfd);
  233 +
  234 + SrsComplexHandshake complex_hs;
  235 + SrsSimpleHandshake simple_hs;
  236 + if ((ret = simple_hs.handshake_with_server(skt, complex_hs)) != ERROR_SUCCESS) {
  237 + return ret;
  238 + }
  239 +
  240 + return ret;
  241 +}
  242 +
  243 +int SrsRtmpClient::connect_app(std::string app, std::string tc_url)
  244 +{
  245 + int ret = ERROR_SUCCESS;
  246 +
  247 + // Connect(vhost, app)
  248 + if (true) {
  249 + SrsCommonMessage* msg = new SrsCommonMessage();
  250 + SrsConnectAppPacket* pkt = new SrsConnectAppPacket();
  251 + msg->set_packet(pkt, 0);
  252 +
  253 + pkt->command_object = new SrsAmf0Object();
  254 + pkt->command_object->set("app", new SrsAmf0String(app.c_str()));
  255 + pkt->command_object->set("swfUrl", new SrsAmf0String());
  256 + pkt->command_object->set("tcUrl", new SrsAmf0String(tc_url.c_str()));
  257 + pkt->command_object->set("fpad", new SrsAmf0Boolean(false));
  258 + pkt->command_object->set("capabilities", new SrsAmf0Number(239));
  259 + pkt->command_object->set("audioCodecs", new SrsAmf0Number(3575));
  260 + pkt->command_object->set("videoCodecs", new SrsAmf0Number(252));
  261 + pkt->command_object->set("videoFunction", new SrsAmf0Number(1));
  262 + pkt->command_object->set("pageUrl", new SrsAmf0String());
  263 + pkt->command_object->set("objectEncoding", new SrsAmf0Number(0));
  264 +
  265 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  266 + return ret;
  267 + }
  268 + }
  269 +
  270 + // Set Window Acknowledgement size(2500000)
  271 + if (true) {
  272 + SrsCommonMessage* msg = new SrsCommonMessage();
  273 + SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
  274 +
  275 + pkt->ackowledgement_window_size = 2500000;
  276 + msg->set_packet(pkt, 0);
  277 +
  278 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  279 + return ret;
  280 + }
  281 + }
  282 +
  283 + // expect connect _result
  284 + SrsCommonMessage* msg = NULL;
  285 + SrsConnectAppResPacket* pkt = NULL;
  286 + if ((ret = srs_rtmp_expect_message<SrsConnectAppResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  287 + srs_error("expect connect app response message failed. ret=%d", ret);
  288 + return ret;
  289 + }
  290 + SrsAutoFree(SrsCommonMessage, msg, false);
  291 + srs_info("get connect app response message");
  292 +
  293 + return ret;
  294 +}
  295 +
  296 +int SrsRtmpClient::create_stream(int& stream_id)
  297 +{
  298 + int ret = ERROR_SUCCESS;
  299 +
  300 + // CreateStream
  301 + if (true) {
  302 + SrsCommonMessage* msg = new SrsCommonMessage();
  303 + SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket();
  304 +
  305 + msg->set_packet(pkt, 0);
  306 +
  307 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  308 + return ret;
  309 + }
  310 + }
  311 +
  312 + // CreateStream _result.
  313 + if (true) {
  314 + SrsCommonMessage* msg = NULL;
  315 + SrsCreateStreamResPacket* pkt = NULL;
  316 + if ((ret = srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  317 + srs_error("expect create stream response message failed. ret=%d", ret);
  318 + return ret;
  319 + }
  320 + SrsAutoFree(SrsCommonMessage, msg, false);
  321 + srs_info("get create stream response message");
  322 +
  323 + stream_id = (int)pkt->stream_id;
  324 + }
  325 +
  326 + return ret;
  327 +}
  328 +
  329 +int SrsRtmpClient::play(std::string stream, int stream_id)
  330 +{
  331 + int ret = ERROR_SUCCESS;
  332 +
  333 + // Play(stream)
  334 + if (true) {
  335 + SrsCommonMessage* msg = new SrsCommonMessage();
  336 + SrsPlayPacket* pkt = new SrsPlayPacket();
  337 +
  338 + pkt->stream_name = stream;
  339 + msg->set_packet(pkt, stream_id);
  340 +
  341 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  342 + srs_error("send play stream failed. "
  343 + "stream=%s, stream_id=%d, ret=%d",
  344 + stream.c_str(), stream_id, ret);
  345 + return ret;
  346 + }
  347 + }
  348 +
  349 + // SetBufferLength(1000ms)
  350 + int buffer_length_ms = 1000;
  351 + if (true) {
  352 + SrsCommonMessage* msg = new SrsCommonMessage();
  353 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
  354 +
  355 + pkt->event_type = SrcPCUCSetBufferLength;
  356 + pkt->event_data = stream_id;
  357 + pkt->extra_data = buffer_length_ms;
  358 + msg->set_packet(pkt, 0);
  359 +
  360 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  361 + srs_error("send set buffer length failed. "
  362 + "stream=%s, stream_id=%d, bufferLength=%d, ret=%d",
  363 + stream.c_str(), stream_id, buffer_length_ms, ret);
  364 + return ret;
  365 + }
  366 + }
  367 +
  368 + return ret;
  369 +}
  370 +
  371 +int SrsRtmpClient::publish(std::string stream, int stream_id)
  372 +{
  373 + int ret = ERROR_SUCCESS;
  374 +
  375 + // publish(stream)
  376 + if (true) {
  377 + SrsCommonMessage* msg = new SrsCommonMessage();
  378 + SrsPublishPacket* pkt = new SrsPublishPacket();
  379 +
  380 + pkt->stream_name = stream;
  381 + msg->set_packet(pkt, stream_id);
  382 +
  383 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  384 + srs_error("send publish message failed. "
  385 + "stream=%s, stream_id=%d, ret=%d",
  386 + stream.c_str(), stream_id, ret);
  387 + return ret;
  388 + }
  389 + }
  390 +
  391 + return ret;
  392 +}
  393 +
  394 +SrsRtmp::SrsRtmp(st_netfd_t client_stfd)
  395 +{
  396 + protocol = new SrsProtocol(client_stfd);
  397 + stfd = client_stfd;
  398 +}
  399 +
  400 +SrsRtmp::~SrsRtmp()
  401 +{
  402 + srs_freep(protocol);
  403 +}
  404 +
  405 +SrsProtocol* SrsRtmp::get_protocol()
  406 +{
  407 + return protocol;
  408 +}
  409 +
  410 +void SrsRtmp::set_recv_timeout(int64_t timeout_us)
  411 +{
  412 + protocol->set_recv_timeout(timeout_us);
  413 +}
  414 +
  415 +int64_t SrsRtmp::get_recv_timeout()
  416 +{
  417 + return protocol->get_recv_timeout();
  418 +}
  419 +
  420 +void SrsRtmp::set_send_timeout(int64_t timeout_us)
  421 +{
  422 + protocol->set_send_timeout(timeout_us);
  423 +}
  424 +
  425 +int64_t SrsRtmp::get_recv_bytes()
  426 +{
  427 + return protocol->get_recv_bytes();
  428 +}
  429 +
  430 +int64_t SrsRtmp::get_send_bytes()
  431 +{
  432 + return protocol->get_send_bytes();
  433 +}
  434 +
  435 +int SrsRtmp::get_recv_kbps()
  436 +{
  437 + return protocol->get_recv_kbps();
  438 +}
  439 +
  440 +int SrsRtmp::get_send_kbps()
  441 +{
  442 + return protocol->get_send_kbps();
  443 +}
  444 +
  445 +int SrsRtmp::recv_message(SrsCommonMessage** pmsg)
  446 +{
  447 + return protocol->recv_message(pmsg);
  448 +}
  449 +
  450 +int SrsRtmp::send_message(ISrsMessage* msg)
  451 +{
  452 + return protocol->send_message(msg);
  453 +}
  454 +
  455 +int SrsRtmp::handshake()
  456 +{
  457 + int ret = ERROR_SUCCESS;
  458 +
  459 + SrsSocket skt(stfd);
  460 +
  461 + SrsComplexHandshake complex_hs;
  462 + SrsSimpleHandshake simple_hs;
  463 + if ((ret = simple_hs.handshake_with_client(skt, complex_hs)) != ERROR_SUCCESS) {
  464 + return ret;
  465 + }
  466 +
  467 + return ret;
  468 +}
  469 +
  470 +int SrsRtmp::connect_app(SrsRequest* req)
  471 +{
  472 + int ret = ERROR_SUCCESS;
  473 +
  474 + SrsCommonMessage* msg = NULL;
  475 + SrsConnectAppPacket* pkt = NULL;
  476 + if ((ret = srs_rtmp_expect_message<SrsConnectAppPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  477 + srs_error("expect connect app message failed. ret=%d", ret);
  478 + return ret;
  479 + }
  480 + SrsAutoFree(SrsCommonMessage, msg, false);
  481 + srs_info("get connect app message");
  482 +
  483 + SrsAmf0Any* prop = NULL;
  484 +
  485 + if ((prop = pkt->command_object->ensure_property_string("tcUrl")) == NULL) {
  486 + ret = ERROR_RTMP_REQ_CONNECT;
  487 + srs_error("invalid request, must specifies the tcUrl. ret=%d", ret);
  488 + return ret;
  489 + }
  490 + req->tcUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;
  491 +
  492 + if ((prop = pkt->command_object->ensure_property_string("pageUrl")) != NULL) {
  493 + req->pageUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;
  494 + }
  495 +
  496 + if ((prop = pkt->command_object->ensure_property_string("swfUrl")) != NULL) {
  497 + req->swfUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;
  498 + }
  499 +
  500 + if ((prop = pkt->command_object->ensure_property_number("objectEncoding")) != NULL) {
  501 + req->objectEncoding = srs_amf0_convert<SrsAmf0Number>(prop)->value;
  502 + }
  503 +
  504 + srs_info("get connect app message params success.");
  505 +
  506 + return req->discovery_app();
  507 +}
  508 +
  509 +int SrsRtmp::set_window_ack_size(int ack_size)
  510 +{
  511 + int ret = ERROR_SUCCESS;
  512 +
  513 + SrsCommonMessage* msg = new SrsCommonMessage();
  514 + SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
  515 +
  516 + pkt->ackowledgement_window_size = ack_size;
  517 + msg->set_packet(pkt, 0);
  518 +
  519 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  520 + srs_error("send ack size message failed. ret=%d", ret);
  521 + return ret;
  522 + }
  523 + srs_info("send ack size message success. ack_size=%d", ack_size);
  524 +
  525 + return ret;
  526 +}
  527 +
  528 +int SrsRtmp::set_peer_bandwidth(int bandwidth, int type)
  529 +{
  530 + int ret = ERROR_SUCCESS;
  531 +
  532 + SrsCommonMessage* msg = new SrsCommonMessage();
  533 + SrsSetPeerBandwidthPacket* pkt = new SrsSetPeerBandwidthPacket();
  534 +
  535 + pkt->bandwidth = bandwidth;
  536 + pkt->type = type;
  537 + msg->set_packet(pkt, 0);
  538 +
  539 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  540 + srs_error("send set bandwidth message failed. ret=%d", ret);
  541 + return ret;
  542 + }
  543 + srs_info("send set bandwidth message "
  544 + "success. bandwidth=%d, type=%d", bandwidth, type);
  545 +
  546 + return ret;
  547 +}
  548 +
  549 +int SrsRtmp::response_connect_app(SrsRequest* req)
  550 +{
  551 + int ret = ERROR_SUCCESS;
  552 +
  553 + SrsCommonMessage* msg = new SrsCommonMessage();
  554 + SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket();
  555 +
  556 + pkt->props->set("fmsVer", new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER));
  557 + pkt->props->set("capabilities", new SrsAmf0Number(127));
  558 + pkt->props->set("mode", new SrsAmf0Number(1));
  559 +
  560 + pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  561 + pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess));
  562 + pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded"));
  563 + pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding));
  564 + SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray();
  565 + pkt->info->set("data", data);
  566 +
  567 + data->set("srs_version", new SrsAmf0String(RTMP_SIG_FMS_VER));
  568 + data->set("srs_server", new SrsAmf0String(RTMP_SIG_SRS_NAME));
  569 + data->set("srs_license", new SrsAmf0String(RTMP_SIG_SRS_LICENSE));
  570 + data->set("srs_role", new SrsAmf0String(RTMP_SIG_SRS_ROLE));
  571 + data->set("srs_url", new SrsAmf0String(RTMP_SIG_SRS_URL));
  572 + data->set("srs_version", new SrsAmf0String(RTMP_SIG_SRS_VERSION));
  573 + data->set("srs_site", new SrsAmf0String(RTMP_SIG_SRS_WEB));
  574 + data->set("srs_email", new SrsAmf0String(RTMP_SIG_SRS_EMAIL));
  575 + data->set("srs_copyright", new SrsAmf0String(RTMP_SIG_SRS_COPYRIGHT));
  576 +
  577 + msg->set_packet(pkt, 0);
  578 +
  579 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  580 + srs_error("send connect app response message failed. ret=%d", ret);
  581 + return ret;
  582 + }
  583 + srs_info("send connect app response message success.");
  584 +
  585 + return ret;
  586 +}
  587 +
  588 +int SrsRtmp::on_bw_done()
  589 +{
  590 + int ret = ERROR_SUCCESS;
  591 +
  592 + SrsCommonMessage* msg = new SrsCommonMessage();
  593 + SrsOnBWDonePacket* pkt = new SrsOnBWDonePacket();
  594 +
  595 + msg->set_packet(pkt, 0);
  596 +
  597 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  598 + srs_error("send onBWDone message failed. ret=%d", ret);
  599 + return ret;
  600 + }
  601 + srs_info("send onBWDone message success.");
  602 +
  603 + return ret;
  604 +}
  605 +
  606 +int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& stream_name)
  607 +{
  608 + type = SrsClientUnknown;
  609 + int ret = ERROR_SUCCESS;
  610 +
  611 + while (true) {
  612 + SrsCommonMessage* msg = NULL;
  613 + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
  614 + srs_error("recv identify client message failed. ret=%d", ret);
  615 + return ret;
  616 + }
  617 +
  618 + SrsAutoFree(SrsCommonMessage, msg, false);
  619 +
  620 + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
  621 + srs_trace("identify ignore messages except "
  622 + "AMF0/AMF3 command message. type=%#x", msg->header.message_type);
  623 + continue;
  624 + }
  625 +
  626 + if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {
  627 + srs_error("identify decode message failed. ret=%d", ret);
  628 + return ret;
  629 + }
  630 +
  631 + SrsPacket* pkt = msg->get_packet();
  632 + if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
  633 + srs_info("identify client by create stream, play or flash publish.");
  634 + return identify_create_stream_client(
  635 + dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, type, stream_name);
  636 + }
  637 + if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
  638 + srs_info("identify client by releaseStream, fmle publish.");
  639 + return identify_fmle_publish_client(
  640 + dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name);
  641 + }
  642 +
  643 + srs_trace("ignore AMF0/AMF3 command message.");
  644 + }
  645 +
  646 + return ret;
  647 +}
  648 +
  649 +int SrsRtmp::set_chunk_size(int chunk_size)
  650 +{
  651 + int ret = ERROR_SUCCESS;
  652 +
  653 + SrsCommonMessage* msg = new SrsCommonMessage();
  654 + SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
  655 +
  656 + pkt->chunk_size = chunk_size;
  657 + msg->set_packet(pkt, 0);
  658 +
  659 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  660 + srs_error("send set chunk size message failed. ret=%d", ret);
  661 + return ret;
  662 + }
  663 + srs_info("send set chunk size message success. chunk_size=%d", chunk_size);
  664 +
  665 + return ret;
  666 +}
  667 +
  668 +int SrsRtmp::start_play(int stream_id)
  669 +{
  670 + int ret = ERROR_SUCCESS;
  671 +
  672 + // StreamBegin
  673 + if (true) {
  674 + SrsCommonMessage* msg = new SrsCommonMessage();
  675 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
  676 +
  677 + pkt->event_type = SrcPCUCStreamBegin;
  678 + pkt->event_data = stream_id;
  679 + msg->set_packet(pkt, 0);
  680 +
  681 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  682 + srs_error("send PCUC(StreamBegin) message failed. ret=%d", ret);
  683 + return ret;
  684 + }
  685 + srs_info("send PCUC(StreamBegin) message success.");
  686 + }
  687 +
  688 + // onStatus(NetStream.Play.Reset)
  689 + if (true) {
  690 + SrsCommonMessage* msg = new SrsCommonMessage();
  691 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  692 +
  693 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  694 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamReset));
  695 + pkt->data->set(StatusDescription, new SrsAmf0String("Playing and resetting stream."));
  696 + pkt->data->set(StatusDetails, new SrsAmf0String("stream"));
  697 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  698 +
  699 + msg->set_packet(pkt, stream_id);
  700 +
  701 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  702 + srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);
  703 + return ret;
  704 + }
  705 + srs_info("send onStatus(NetStream.Play.Reset) message success.");
  706 + }
  707 +
  708 + // onStatus(NetStream.Play.Start)
  709 + if (true) {
  710 + SrsCommonMessage* msg = new SrsCommonMessage();
  711 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  712 +
  713 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  714 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamStart));
  715 + pkt->data->set(StatusDescription, new SrsAmf0String("Started playing stream."));
  716 + pkt->data->set(StatusDetails, new SrsAmf0String("stream"));
  717 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  718 +
  719 + msg->set_packet(pkt, stream_id);
  720 +
  721 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  722 + srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);
  723 + return ret;
  724 + }
  725 + srs_info("send onStatus(NetStream.Play.Reset) message success.");
  726 + }
  727 +
  728 + // |RtmpSampleAccess(false, false)
  729 + if (true) {
  730 + SrsCommonMessage* msg = new SrsCommonMessage();
  731 + SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();
  732 +
  733 + msg->set_packet(pkt, stream_id);
  734 +
  735 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  736 + srs_error("send |RtmpSampleAccess(false, false) message failed. ret=%d", ret);
  737 + return ret;
  738 + }
  739 + srs_info("send |RtmpSampleAccess(false, false) message success.");
  740 + }
  741 +
  742 + // onStatus(NetStream.Data.Start)
  743 + if (true) {
  744 + SrsCommonMessage* msg = new SrsCommonMessage();
  745 + SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();
  746 +
  747 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeDataStart));
  748 +
  749 + msg->set_packet(pkt, stream_id);
  750 +
  751 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  752 + srs_error("send onStatus(NetStream.Data.Start) message failed. ret=%d", ret);
  753 + return ret;
  754 + }
  755 + srs_info("send onStatus(NetStream.Data.Start) message success.");
  756 + }
  757 +
  758 + srs_info("start play success.");
  759 +
  760 + return ret;
  761 +}
  762 +
  763 +int SrsRtmp::on_play_client_pause(int stream_id, bool is_pause)
  764 +{
  765 + int ret = ERROR_SUCCESS;
  766 +
  767 + if (is_pause) {
  768 + // onStatus(NetStream.Pause.Notify)
  769 + if (true) {
  770 + SrsCommonMessage* msg = new SrsCommonMessage();
  771 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  772 +
  773 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  774 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamPause));
  775 + pkt->data->set(StatusDescription, new SrsAmf0String("Paused stream."));
  776 +
  777 + msg->set_packet(pkt, stream_id);
  778 +
  779 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  780 + srs_error("send onStatus(NetStream.Pause.Notify) message failed. ret=%d", ret);
  781 + return ret;
  782 + }
  783 + srs_info("send onStatus(NetStream.Pause.Notify) message success.");
  784 + }
  785 + // StreamEOF
  786 + if (true) {
  787 + SrsCommonMessage* msg = new SrsCommonMessage();
  788 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
  789 +
  790 + pkt->event_type = SrcPCUCStreamEOF;
  791 + pkt->event_data = stream_id;
  792 + msg->set_packet(pkt, 0);
  793 +
  794 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  795 + srs_error("send PCUC(StreamEOF) message failed. ret=%d", ret);
  796 + return ret;
  797 + }
  798 + srs_info("send PCUC(StreamEOF) message success.");
  799 + }
  800 + } else {
  801 + // onStatus(NetStream.Unpause.Notify)
  802 + if (true) {
  803 + SrsCommonMessage* msg = new SrsCommonMessage();
  804 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  805 +
  806 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  807 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamUnpause));
  808 + pkt->data->set(StatusDescription, new SrsAmf0String("Unpaused stream."));
  809 +
  810 + msg->set_packet(pkt, stream_id);
  811 +
  812 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  813 + srs_error("send onStatus(NetStream.Unpause.Notify) message failed. ret=%d", ret);
  814 + return ret;
  815 + }
  816 + srs_info("send onStatus(NetStream.Unpause.Notify) message success.");
  817 + }
  818 + // StreanBegin
  819 + if (true) {
  820 + SrsCommonMessage* msg = new SrsCommonMessage();
  821 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
  822 +
  823 + pkt->event_type = SrcPCUCStreamBegin;
  824 + pkt->event_data = stream_id;
  825 + msg->set_packet(pkt, 0);
  826 +
  827 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  828 + srs_error("send PCUC(StreanBegin) message failed. ret=%d", ret);
  829 + return ret;
  830 + }
  831 + srs_info("send PCUC(StreanBegin) message success.");
  832 + }
  833 + }
  834 +
  835 + return ret;
  836 +}
  837 +
  838 +int SrsRtmp::start_fmle_publish(int stream_id)
  839 +{
  840 + int ret = ERROR_SUCCESS;
  841 +
  842 + // FCPublish
  843 + double fc_publish_tid = 0;
  844 + if (true) {
  845 + SrsCommonMessage* msg = NULL;
  846 + SrsFMLEStartPacket* pkt = NULL;
  847 + if ((ret = srs_rtmp_expect_message<SrsFMLEStartPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  848 + srs_error("recv FCPublish message failed. ret=%d", ret);
  849 + return ret;
  850 + }
  851 + srs_info("recv FCPublish request message success.");
  852 +
  853 + SrsAutoFree(SrsCommonMessage, msg, false);
  854 + fc_publish_tid = pkt->transaction_id;
  855 + }
  856 + // FCPublish response
  857 + if (true) {
  858 + SrsCommonMessage* msg = new SrsCommonMessage();
  859 + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);
  860 +
  861 + msg->set_packet(pkt, 0);
  862 +
  863 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  864 + srs_error("send FCPublish response message failed. ret=%d", ret);
  865 + return ret;
  866 + }
  867 + srs_info("send FCPublish response message success.");
  868 + }
  869 +
  870 + // createStream
  871 + double create_stream_tid = 0;
  872 + if (true) {
  873 + SrsCommonMessage* msg = NULL;
  874 + SrsCreateStreamPacket* pkt = NULL;
  875 + if ((ret = srs_rtmp_expect_message<SrsCreateStreamPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  876 + srs_error("recv createStream message failed. ret=%d", ret);
  877 + return ret;
  878 + }
  879 + srs_info("recv createStream request message success.");
  880 +
  881 + SrsAutoFree(SrsCommonMessage, msg, false);
  882 + create_stream_tid = pkt->transaction_id;
  883 + }
  884 + // createStream response
  885 + if (true) {
  886 + SrsCommonMessage* msg = new SrsCommonMessage();
  887 + SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id);
  888 +
  889 + msg->set_packet(pkt, 0);
  890 +
  891 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  892 + srs_error("send createStream response message failed. ret=%d", ret);
  893 + return ret;
  894 + }
  895 + srs_info("send createStream response message success.");
  896 + }
  897 +
  898 + // publish
  899 + if (true) {
  900 + SrsCommonMessage* msg = NULL;
  901 + SrsPublishPacket* pkt = NULL;
  902 + if ((ret = srs_rtmp_expect_message<SrsPublishPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  903 + srs_error("recv publish message failed. ret=%d", ret);
  904 + return ret;
  905 + }
  906 + srs_info("recv publish request message success.");
  907 +
  908 + SrsAutoFree(SrsCommonMessage, msg, false);
  909 + }
  910 + // publish response onFCPublish(NetStream.Publish.Start)
  911 + if (true) {
  912 + SrsCommonMessage* msg = new SrsCommonMessage();
  913 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  914 +
  915 + pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;
  916 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));
  917 + pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));
  918 +
  919 + msg->set_packet(pkt, stream_id);
  920 +
  921 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  922 + srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", ret);
  923 + return ret;
  924 + }
  925 + srs_info("send onFCPublish(NetStream.Publish.Start) message success.");
  926 + }
  927 + // publish response onStatus(NetStream.Publish.Start)
  928 + if (true) {
  929 + SrsCommonMessage* msg = new SrsCommonMessage();
  930 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  931 +
  932 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  933 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));
  934 + pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));
  935 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  936 +
  937 + msg->set_packet(pkt, stream_id);
  938 +
  939 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  940 + srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret);
  941 + return ret;
  942 + }
  943 + srs_info("send onStatus(NetStream.Publish.Start) message success.");
  944 + }
  945 +
  946 + srs_info("FMLE publish success.");
  947 +
  948 + return ret;
  949 +}
  950 +
  951 +int SrsRtmp::fmle_unpublish(int stream_id, double unpublish_tid)
  952 +{
  953 + int ret = ERROR_SUCCESS;
  954 +
  955 + // publish response onFCUnpublish(NetStream.unpublish.Success)
  956 + if (true) {
  957 + SrsCommonMessage* msg = new SrsCommonMessage();
  958 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  959 +
  960 + pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH;
  961 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));
  962 + pkt->data->set(StatusDescription, new SrsAmf0String("Stop publishing stream."));
  963 +
  964 + msg->set_packet(pkt, stream_id);
  965 +
  966 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  967 + srs_error("send onFCUnpublish(NetStream.unpublish.Success) message failed. ret=%d", ret);
  968 + return ret;
  969 + }
  970 + srs_info("send onFCUnpublish(NetStream.unpublish.Success) message success.");
  971 + }
  972 + // FCUnpublish response
  973 + if (true) {
  974 + SrsCommonMessage* msg = new SrsCommonMessage();
  975 + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(unpublish_tid);
  976 +
  977 + msg->set_packet(pkt, stream_id);
  978 +
  979 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  980 + srs_error("send FCUnpublish response message failed. ret=%d", ret);
  981 + return ret;
  982 + }
  983 + srs_info("send FCUnpublish response message success.");
  984 + }
  985 + // publish response onStatus(NetStream.Unpublish.Success)
  986 + if (true) {
  987 + SrsCommonMessage* msg = new SrsCommonMessage();
  988 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  989 +
  990 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  991 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));
  992 + pkt->data->set(StatusDescription, new SrsAmf0String("Stream is now unpublished"));
  993 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  994 +
  995 + msg->set_packet(pkt, stream_id);
  996 +
  997 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  998 + srs_error("send onStatus(NetStream.Unpublish.Success) message failed. ret=%d", ret);
  999 + return ret;
  1000 + }
  1001 + srs_info("send onStatus(NetStream.Unpublish.Success) message success.");
  1002 + }
  1003 +
  1004 + srs_info("FMLE unpublish success.");
  1005 +
  1006 + return ret;
  1007 +}
  1008 +
  1009 +int SrsRtmp::start_flash_publish(int stream_id)
  1010 +{
  1011 + int ret = ERROR_SUCCESS;
  1012 +
  1013 + // publish response onStatus(NetStream.Publish.Start)
  1014 + if (true) {
  1015 + SrsCommonMessage* msg = new SrsCommonMessage();
  1016 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  1017 +
  1018 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  1019 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));
  1020 + pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));
  1021 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  1022 +
  1023 + msg->set_packet(pkt, stream_id);
  1024 +
  1025 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  1026 + srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret);
  1027 + return ret;
  1028 + }
  1029 + srs_info("send onStatus(NetStream.Publish.Start) message success.");
  1030 + }
  1031 +
  1032 + srs_info("flash publish success.");
  1033 +
  1034 + return ret;
  1035 +}
  1036 +
  1037 +int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name)
  1038 +{
  1039 + int ret = ERROR_SUCCESS;
  1040 +
  1041 + if (true) {
  1042 + SrsCommonMessage* msg = new SrsCommonMessage();
  1043 + SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id);
  1044 +
  1045 + msg->set_packet(pkt, 0);
  1046 +
  1047 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  1048 + srs_error("send createStream response message failed. ret=%d", ret);
  1049 + return ret;
  1050 + }
  1051 + srs_info("send createStream response message success.");
  1052 + }
  1053 +
  1054 + while (true) {
  1055 + SrsCommonMessage* msg = NULL;
  1056 + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
  1057 + srs_error("recv identify client message failed. ret=%d", ret);
  1058 + return ret;
  1059 + }
  1060 +
  1061 + SrsAutoFree(SrsCommonMessage, msg, false);
  1062 +
  1063 + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
  1064 + srs_trace("identify ignore messages except "
  1065 + "AMF0/AMF3 command message. type=%#x", msg->header.message_type);
  1066 + continue;
  1067 + }
  1068 +
  1069 + if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {
  1070 + srs_error("identify decode message failed. ret=%d", ret);
  1071 + return ret;
  1072 + }
  1073 +
  1074 + SrsPacket* pkt = msg->get_packet();
  1075 + if (dynamic_cast<SrsPlayPacket*>(pkt)) {
  1076 + SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt);
  1077 + type = SrsClientPlay;
  1078 + stream_name = play->stream_name;
  1079 + srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());
  1080 + return ret;
  1081 + }
  1082 + if (dynamic_cast<SrsPublishPacket*>(pkt)) {
  1083 + srs_info("identify client by publish, falsh publish.");
  1084 + return identify_flash_publish_client(
  1085 + dynamic_cast<SrsPublishPacket*>(pkt), type, stream_name);
  1086 + }
  1087 +
  1088 + srs_trace("ignore AMF0/AMF3 command message.");
  1089 + }
  1090 +
  1091 + return ret;
  1092 +}
  1093 +
  1094 +int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name)
  1095 +{
  1096 + int ret = ERROR_SUCCESS;
  1097 +
  1098 + type = SrsClientFMLEPublish;
  1099 + stream_name = req->stream_name;
  1100 +
  1101 + // releaseStream response
  1102 + if (true) {
  1103 + SrsCommonMessage* msg = new SrsCommonMessage();
  1104 + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
  1105 +
  1106 + msg->set_packet(pkt, 0);
  1107 +
  1108 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  1109 + srs_error("send releaseStream response message failed. ret=%d", ret);
  1110 + return ret;
  1111 + }
  1112 + srs_info("send releaseStream response message success.");
  1113 + }
  1114 +
  1115 + return ret;
  1116 +}
  1117 +
  1118 +int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name)
  1119 +{
  1120 + int ret = ERROR_SUCCESS;
  1121 +
  1122 + type = SrsClientFlashPublish;
  1123 + stream_name = req->stream_name;
  1124 +
  1125 + return ret;
  1126 +}
  1127 +