winlin

fix bug of resolve vhost, must strip then get from config.

1 -/*  
2 -The MIT License (MIT)  
3 -  
4 -Copyright (c) 2013 winlin  
5 -  
6 -Permission is hereby granted, free of charge, to any person obtaining a copy of  
7 -this software and associated documentation files (the "Software"), to deal in  
8 -the Software without restriction, including without limitation the rights to  
9 -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of  
10 -the Software, and to permit persons to whom the Software is furnished to do so,  
11 -subject to the following conditions:  
12 -  
13 -The above copyright notice and this permission notice shall be included in all  
14 -copies or substantial portions of the Software.  
15 -  
16 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR  
17 -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS  
18 -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR  
19 -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER  
20 -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN  
21 -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
22 -*/  
23 -  
24 -#include <srs_core_rtmp.hpp>  
25 -  
26 -#include <srs_core_log.hpp>  
27 -#include <srs_core_error.hpp>  
28 -#include <srs_core_socket.hpp>  
29 -#include <srs_core_protocol.hpp>  
30 -#include <srs_core_autofree.hpp>  
31 -#include <srs_core_amf0.hpp>  
32 -#include <srs_core_handshake.hpp>  
33 -#include <srs_core_config.hpp>  
34 -  
35 -using namespace std;  
36 -  
37 -/**  
38 -* the signature for packets to client.  
39 -*/  
40 -#define RTMP_SIG_FMS_VER "3,5,3,888"  
41 -#define RTMP_SIG_AMF0_VER 0  
42 -#define RTMP_SIG_CLIENT_ID "ASAICiss"  
43 -  
44 -/**  
45 -* onStatus consts.  
46 -*/  
47 -#define StatusLevel "level"  
48 -#define StatusCode "code"  
49 -#define StatusDescription "description"  
50 -#define StatusDetails "details"  
51 -#define StatusClientId "clientid"  
52 -// status value  
53 -#define StatusLevelStatus "status"  
54 -// code value  
55 -#define StatusCodeConnectSuccess "NetConnection.Connect.Success"  
56 -#define StatusCodeStreamReset "NetStream.Play.Reset"  
57 -#define StatusCodeStreamStart "NetStream.Play.Start"  
58 -#define StatusCodeStreamPause "NetStream.Pause.Notify"  
59 -#define StatusCodeStreamUnpause "NetStream.Unpause.Notify"  
60 -#define StatusCodePublishStart "NetStream.Publish.Start"  
61 -#define StatusCodeDataStart "NetStream.Data.Start"  
62 -#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success"  
63 -  
64 -// FMLE  
65 -#define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish"  
66 -#define RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH "onFCUnpublish"  
67 -  
68 -// default stream id for response the createStream request.  
69 -#define SRS_DEFAULT_SID 1  
70 -  
71 -SrsRequest::SrsRequest()  
72 -{  
73 - objectEncoding = RTMP_SIG_AMF0_VER;  
74 -}  
75 -  
76 -SrsRequest::~SrsRequest()  
77 -{  
78 -}  
79 -  
80 -SrsRequest* SrsRequest::copy()  
81 -{  
82 - SrsRequest* cp = new SrsRequest();  
83 -  
84 - cp->app = app;  
85 - cp->objectEncoding = objectEncoding;  
86 - cp->pageUrl = pageUrl;  
87 - cp->port = port;  
88 - cp->schema = schema;  
89 - cp->stream = stream;  
90 - cp->swfUrl = swfUrl;  
91 - cp->tcUrl = tcUrl;  
92 - cp->vhost = vhost;  
93 -  
94 - return cp;  
95 -}  
96 -  
97 -int SrsRequest::discovery_app()  
98 -{  
99 - int ret = ERROR_SUCCESS;  
100 -  
101 - size_t pos = std::string::npos;  
102 - std::string url = tcUrl;  
103 -  
104 - if ((pos = url.find("://")) != std::string::npos) {  
105 - schema = url.substr(0, pos);  
106 - url = url.substr(schema.length() + 3);  
107 - srs_verbose("discovery schema=%s", schema.c_str());  
108 - }  
109 -  
110 - if ((pos = url.find("/")) != std::string::npos) {  
111 - vhost = url.substr(0, pos);  
112 - url = url.substr(vhost.length() + 1);  
113 - srs_verbose("discovery vhost=%s", vhost.c_str());  
114 - }  
115 -  
116 - port = RTMP_DEFAULT_PORTS;  
117 - if ((pos = vhost.find(":")) != std::string::npos) {  
118 - port = vhost.substr(pos + 1);  
119 - vhost = vhost.substr(0, pos);  
120 - srs_verbose("discovery vhost=%s, port=%s", vhost.c_str(), port.c_str());  
121 - }  
122 -  
123 - app = url;  
124 - srs_vhost_resolve(vhost, app);  
125 -  
126 - // resolve the vhost from config  
127 - SrsConfDirective* parsed_vhost = config->get_vhost(vhost);  
128 - if (parsed_vhost) {  
129 - vhost = parsed_vhost->arg0();  
130 - }  
131 -  
132 - // TODO: discovery the params of vhost.  
133 -  
134 - srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",  
135 - schema.c_str(), vhost.c_str(), port.c_str(), app.c_str());  
136 -  
137 - if (schema.empty() || vhost.empty() || port.empty() || app.empty()) {  
138 - ret = ERROR_RTMP_REQ_TCURL;  
139 - srs_error("discovery tcUrl failed. "  
140 - "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",  
141 - tcUrl.c_str(), schema.c_str(), vhost.c_str(), port.c_str(), app.c_str(), ret);  
142 - return ret;  
143 - }  
144 -  
145 - strip();  
146 -  
147 - return ret;  
148 -}  
149 -  
150 -string SrsRequest::get_stream_url()  
151 -{  
152 - std::string url = "";  
153 -  
154 - url += vhost;  
155 - url += "/";  
156 - url += app;  
157 - url += "/";  
158 - url += stream;  
159 -  
160 - return url;  
161 -}  
162 -  
163 -void SrsRequest::strip()  
164 -{  
165 - trim(vhost, "/ \n\r\t");  
166 - trim(app, "/ \n\r\t");  
167 - trim(stream, "/ \n\r\t");  
168 -}  
169 -  
170 -std::string& SrsRequest::trim(string& str, string chs)  
171 -{  
172 - for (int i = 0; i < (int)chs.length(); i++) {  
173 - char ch = chs.at(i);  
174 -  
175 - for (std::string::iterator it = str.begin(); it != str.end();) {  
176 - if (ch == *it) {  
177 - it = str.erase(it);  
178 - } else {  
179 - ++it;  
180 - }  
181 - }  
182 - }  
183 -  
184 - return str;  
185 -}  
186 -  
187 -SrsResponse::SrsResponse()  
188 -{  
189 - stream_id = SRS_DEFAULT_SID;  
190 -}  
191 -  
192 -SrsResponse::~SrsResponse()  
193 -{  
194 -}  
195 -  
196 -SrsRtmpClient::SrsRtmpClient(st_netfd_t _stfd)  
197 -{  
198 - stfd = _stfd;  
199 - protocol = new SrsProtocol(stfd);  
200 -}  
201 -  
202 -SrsRtmpClient::~SrsRtmpClient()  
203 -{  
204 - srs_freep(protocol);  
205 -}  
206 -  
207 -void SrsRtmpClient::set_recv_timeout(int64_t timeout_us)  
208 -{  
209 - protocol->set_recv_timeout(timeout_us);  
210 -}  
211 -  
212 -void SrsRtmpClient::set_send_timeout(int64_t timeout_us)  
213 -{  
214 - protocol->set_send_timeout(timeout_us);  
215 -}  
216 -  
217 -int64_t SrsRtmpClient::get_recv_bytes()  
218 -{  
219 - return protocol->get_recv_bytes();  
220 -}  
221 -  
222 -int64_t SrsRtmpClient::get_send_bytes()  
223 -{  
224 - return protocol->get_send_bytes();  
225 -}  
226 -  
227 -int SrsRtmpClient::get_recv_kbps()  
228 -{  
229 - return protocol->get_recv_kbps();  
230 -}  
231 -  
232 -int SrsRtmpClient::get_send_kbps()  
233 -{  
234 - return protocol->get_send_kbps();  
235 -}  
236 -  
237 -int SrsRtmpClient::recv_message(SrsCommonMessage** pmsg)  
238 -{  
239 - return protocol->recv_message(pmsg);  
240 -}  
241 -  
242 -int SrsRtmpClient::send_message(ISrsMessage* msg)  
243 -{  
244 - return protocol->send_message(msg);  
245 -}  
246 -  
247 -int SrsRtmpClient::handshake()  
248 -{  
249 - int ret = ERROR_SUCCESS;  
250 -  
251 - SrsSocket skt(stfd);  
252 -  
253 - skt.set_recv_timeout(protocol->get_recv_timeout());  
254 - skt.set_send_timeout(protocol->get_send_timeout());  
255 -  
256 - SrsComplexHandshake complex_hs;  
257 - SrsSimpleHandshake simple_hs;  
258 - if ((ret = simple_hs.handshake_with_server(skt, complex_hs)) != ERROR_SUCCESS) {  
259 - return ret;  
260 - }  
261 -  
262 - return ret;  
263 -}  
264 -  
265 -int SrsRtmpClient::connect_app(string app, string tc_url)  
266 -{  
267 - int ret = ERROR_SUCCESS;  
268 -  
269 - // Connect(vhost, app)  
270 - if (true) {  
271 - SrsCommonMessage* msg = new SrsCommonMessage();  
272 - SrsConnectAppPacket* pkt = new SrsConnectAppPacket();  
273 - msg->set_packet(pkt, 0);  
274 -  
275 - pkt->command_object = new SrsAmf0Object();  
276 - pkt->command_object->set("app", new SrsAmf0String(app.c_str()));  
277 - pkt->command_object->set("swfUrl", new SrsAmf0String());  
278 - pkt->command_object->set("tcUrl", new SrsAmf0String(tc_url.c_str()));  
279 - pkt->command_object->set("fpad", new SrsAmf0Boolean(false));  
280 - pkt->command_object->set("capabilities", new SrsAmf0Number(239));  
281 - pkt->command_object->set("audioCodecs", new SrsAmf0Number(3575));  
282 - pkt->command_object->set("videoCodecs", new SrsAmf0Number(252));  
283 - pkt->command_object->set("videoFunction", new SrsAmf0Number(1));  
284 - pkt->command_object->set("pageUrl", new SrsAmf0String());  
285 - pkt->command_object->set("objectEncoding", new SrsAmf0Number(0));  
286 -  
287 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
288 - return ret;  
289 - }  
290 - }  
291 -  
292 - // Set Window Acknowledgement size(2500000)  
293 - if (true) {  
294 - SrsCommonMessage* msg = new SrsCommonMessage();  
295 - SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();  
296 -  
297 - pkt->ackowledgement_window_size = 2500000;  
298 - msg->set_packet(pkt, 0);  
299 -  
300 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
301 - return ret;  
302 - }  
303 - }  
304 -  
305 - // expect connect _result  
306 - SrsCommonMessage* msg = NULL;  
307 - SrsConnectAppResPacket* pkt = NULL;  
308 - if ((ret = srs_rtmp_expect_message<SrsConnectAppResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
309 - srs_error("expect connect app response message failed. ret=%d", ret);  
310 - return ret;  
311 - }  
312 - SrsAutoFree(SrsCommonMessage, msg, false);  
313 - srs_info("get connect app response message");  
314 -  
315 - return ret;  
316 -}  
317 -  
318 -int SrsRtmpClient::create_stream(int& stream_id)  
319 -{  
320 - int ret = ERROR_SUCCESS;  
321 -  
322 - // CreateStream  
323 - if (true) {  
324 - SrsCommonMessage* msg = new SrsCommonMessage();  
325 - SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket();  
326 -  
327 - msg->set_packet(pkt, 0);  
328 -  
329 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
330 - return ret;  
331 - }  
332 - }  
333 -  
334 - // CreateStream _result.  
335 - if (true) {  
336 - SrsCommonMessage* msg = NULL;  
337 - SrsCreateStreamResPacket* pkt = NULL;  
338 - if ((ret = srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
339 - srs_error("expect create stream response message failed. ret=%d", ret);  
340 - return ret;  
341 - }  
342 - SrsAutoFree(SrsCommonMessage, msg, false);  
343 - srs_info("get create stream response message");  
344 -  
345 - stream_id = (int)pkt->stream_id;  
346 - }  
347 -  
348 - return ret;  
349 -}  
350 -  
351 -int SrsRtmpClient::play(string stream, int stream_id)  
352 -{  
353 - int ret = ERROR_SUCCESS;  
354 -  
355 - // Play(stream)  
356 - if (true) {  
357 - SrsCommonMessage* msg = new SrsCommonMessage();  
358 - SrsPlayPacket* pkt = new SrsPlayPacket();  
359 -  
360 - pkt->stream_name = stream;  
361 - msg->set_packet(pkt, stream_id);  
362 -  
363 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
364 - srs_error("send play stream failed. "  
365 - "stream=%s, stream_id=%d, ret=%d",  
366 - stream.c_str(), stream_id, ret);  
367 - return ret;  
368 - }  
369 - }  
370 -  
371 - // SetBufferLength(1000ms)  
372 - int buffer_length_ms = 1000;  
373 - if (true) {  
374 - SrsCommonMessage* msg = new SrsCommonMessage();  
375 - SrsUserControlPacket* pkt = new SrsUserControlPacket();  
376 -  
377 - pkt->event_type = SrcPCUCSetBufferLength;  
378 - pkt->event_data = stream_id;  
379 - pkt->extra_data = buffer_length_ms;  
380 - msg->set_packet(pkt, 0);  
381 -  
382 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
383 - srs_error("send set buffer length failed. "  
384 - "stream=%s, stream_id=%d, bufferLength=%d, ret=%d",  
385 - stream.c_str(), stream_id, buffer_length_ms, ret);  
386 - return ret;  
387 - }  
388 - }  
389 -  
390 - return ret;  
391 -}  
392 -  
393 -int SrsRtmpClient::publish(string stream, int stream_id)  
394 -{  
395 - int ret = ERROR_SUCCESS;  
396 -  
397 - // publish(stream)  
398 - if (true) {  
399 - SrsCommonMessage* msg = new SrsCommonMessage();  
400 - SrsPublishPacket* pkt = new SrsPublishPacket();  
401 -  
402 - pkt->stream_name = stream;  
403 - msg->set_packet(pkt, stream_id);  
404 -  
405 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
406 - srs_error("send publish message failed. "  
407 - "stream=%s, stream_id=%d, ret=%d",  
408 - stream.c_str(), stream_id, ret);  
409 - return ret;  
410 - }  
411 - }  
412 -  
413 - return ret;  
414 -}  
415 -  
416 -SrsRtmp::SrsRtmp(st_netfd_t client_stfd)  
417 -{  
418 - protocol = new SrsProtocol(client_stfd);  
419 - stfd = client_stfd;  
420 -}  
421 -  
422 -SrsRtmp::~SrsRtmp()  
423 -{  
424 - srs_freep(protocol);  
425 -}  
426 -  
427 -SrsProtocol* SrsRtmp::get_protocol()  
428 -{  
429 - return protocol;  
430 -}  
431 -  
432 -void SrsRtmp::set_recv_timeout(int64_t timeout_us)  
433 -{  
434 - protocol->set_recv_timeout(timeout_us);  
435 -}  
436 -  
437 -int64_t SrsRtmp::get_recv_timeout()  
438 -{  
439 - return protocol->get_recv_timeout();  
440 -}  
441 -  
442 -void SrsRtmp::set_send_timeout(int64_t timeout_us)  
443 -{  
444 - protocol->set_send_timeout(timeout_us);  
445 -}  
446 -  
447 -int64_t SrsRtmp::get_send_timeout()  
448 -{  
449 - return protocol->get_send_timeout();  
450 -}  
451 -  
452 -int64_t SrsRtmp::get_recv_bytes()  
453 -{  
454 - return protocol->get_recv_bytes();  
455 -}  
456 -  
457 -int64_t SrsRtmp::get_send_bytes()  
458 -{  
459 - return protocol->get_send_bytes();  
460 -}  
461 -  
462 -int SrsRtmp::get_recv_kbps()  
463 -{  
464 - return protocol->get_recv_kbps();  
465 -}  
466 -  
467 -int SrsRtmp::get_send_kbps()  
468 -{  
469 - return protocol->get_send_kbps();  
470 -}  
471 -  
472 -int SrsRtmp::recv_message(SrsCommonMessage** pmsg)  
473 -{  
474 - return protocol->recv_message(pmsg);  
475 -}  
476 -  
477 -int SrsRtmp::send_message(ISrsMessage* msg)  
478 -{  
479 - return protocol->send_message(msg);  
480 -}  
481 -  
482 -int SrsRtmp::handshake()  
483 -{  
484 - int ret = ERROR_SUCCESS;  
485 -  
486 - SrsSocket skt(stfd);  
487 -  
488 - skt.set_recv_timeout(protocol->get_recv_timeout());  
489 - skt.set_send_timeout(protocol->get_send_timeout());  
490 -  
491 - SrsComplexHandshake complex_hs;  
492 - SrsSimpleHandshake simple_hs;  
493 - if ((ret = simple_hs.handshake_with_client(skt, complex_hs)) != ERROR_SUCCESS) {  
494 - return ret;  
495 - }  
496 -  
497 - return ret;  
498 -}  
499 -  
500 -int SrsRtmp::connect_app(SrsRequest* req)  
501 -{  
502 - int ret = ERROR_SUCCESS;  
503 -  
504 - SrsCommonMessage* msg = NULL;  
505 - SrsConnectAppPacket* pkt = NULL;  
506 - if ((ret = srs_rtmp_expect_message<SrsConnectAppPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
507 - srs_error("expect connect app message failed. ret=%d", ret);  
508 - return ret;  
509 - }  
510 - SrsAutoFree(SrsCommonMessage, msg, false);  
511 - srs_info("get connect app message");  
512 -  
513 - SrsAmf0Any* prop = NULL;  
514 -  
515 - if ((prop = pkt->command_object->ensure_property_string("tcUrl")) == NULL) {  
516 - ret = ERROR_RTMP_REQ_CONNECT;  
517 - srs_error("invalid request, must specifies the tcUrl. ret=%d", ret);  
518 - return ret;  
519 - }  
520 - req->tcUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;  
521 -  
522 - if ((prop = pkt->command_object->ensure_property_string("pageUrl")) != NULL) {  
523 - req->pageUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;  
524 - }  
525 -  
526 - if ((prop = pkt->command_object->ensure_property_string("swfUrl")) != NULL) {  
527 - req->swfUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;  
528 - }  
529 -  
530 - if ((prop = pkt->command_object->ensure_property_number("objectEncoding")) != NULL) {  
531 - req->objectEncoding = srs_amf0_convert<SrsAmf0Number>(prop)->value;  
532 - }  
533 -  
534 - srs_info("get connect app message params success.");  
535 -  
536 - return req->discovery_app();  
537 -}  
538 -  
539 -int SrsRtmp::set_window_ack_size(int ack_size)  
540 -{  
541 - int ret = ERROR_SUCCESS;  
542 -  
543 - SrsCommonMessage* msg = new SrsCommonMessage();  
544 - SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();  
545 -  
546 - pkt->ackowledgement_window_size = ack_size;  
547 - msg->set_packet(pkt, 0);  
548 -  
549 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
550 - srs_error("send ack size message failed. ret=%d", ret);  
551 - return ret;  
552 - }  
553 - srs_info("send ack size message success. ack_size=%d", ack_size);  
554 -  
555 - return ret;  
556 -}  
557 -  
558 -int SrsRtmp::set_peer_bandwidth(int bandwidth, int type)  
559 -{  
560 - int ret = ERROR_SUCCESS;  
561 -  
562 - SrsCommonMessage* msg = new SrsCommonMessage();  
563 - SrsSetPeerBandwidthPacket* pkt = new SrsSetPeerBandwidthPacket();  
564 -  
565 - pkt->bandwidth = bandwidth;  
566 - pkt->type = type;  
567 - msg->set_packet(pkt, 0);  
568 -  
569 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
570 - srs_error("send set bandwidth message failed. ret=%d", ret);  
571 - return ret;  
572 - }  
573 - srs_info("send set bandwidth message "  
574 - "success. bandwidth=%d, type=%d", bandwidth, type);  
575 -  
576 - return ret;  
577 -}  
578 -  
579 -int SrsRtmp::response_connect_app(SrsRequest* req)  
580 -{  
581 - int ret = ERROR_SUCCESS;  
582 -  
583 - SrsCommonMessage* msg = new SrsCommonMessage();  
584 - SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket();  
585 -  
586 - pkt->props->set("fmsVer", new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER));  
587 - pkt->props->set("capabilities", new SrsAmf0Number(127));  
588 - pkt->props->set("mode", new SrsAmf0Number(1));  
589 -  
590 - pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
591 - pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess));  
592 - pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded"));  
593 - pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding));  
594 - SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray();  
595 - pkt->info->set("data", data);  
596 -  
597 - data->set("srs_version", new SrsAmf0String(RTMP_SIG_FMS_VER));  
598 - data->set("srs_server", new SrsAmf0String(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));  
599 - data->set("srs_license", new SrsAmf0String(RTMP_SIG_SRS_LICENSE));  
600 - data->set("srs_role", new SrsAmf0String(RTMP_SIG_SRS_ROLE));  
601 - data->set("srs_url", new SrsAmf0String(RTMP_SIG_SRS_URL));  
602 - data->set("srs_version", new SrsAmf0String(RTMP_SIG_SRS_VERSION));  
603 - data->set("srs_site", new SrsAmf0String(RTMP_SIG_SRS_WEB));  
604 - data->set("srs_email", new SrsAmf0String(RTMP_SIG_SRS_EMAIL));  
605 - data->set("srs_copyright", new SrsAmf0String(RTMP_SIG_SRS_COPYRIGHT));  
606 - data->set("srs_contributor", new SrsAmf0String(RTMP_SIG_SRS_CONTRIBUTOR));  
607 -  
608 - msg->set_packet(pkt, 0);  
609 -  
610 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
611 - srs_error("send connect app response message failed. ret=%d", ret);  
612 - return ret;  
613 - }  
614 - srs_info("send connect app response message success.");  
615 -  
616 - return ret;  
617 -}  
618 -  
619 -int SrsRtmp::on_bw_done()  
620 -{  
621 - int ret = ERROR_SUCCESS;  
622 -  
623 - SrsCommonMessage* msg = new SrsCommonMessage();  
624 - SrsOnBWDonePacket* pkt = new SrsOnBWDonePacket();  
625 -  
626 - msg->set_packet(pkt, 0);  
627 -  
628 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
629 - srs_error("send onBWDone message failed. ret=%d", ret);  
630 - return ret;  
631 - }  
632 - srs_info("send onBWDone message success.");  
633 -  
634 - return ret;  
635 -}  
636 -  
637 -int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& stream_name)  
638 -{  
639 - type = SrsClientUnknown;  
640 - int ret = ERROR_SUCCESS;  
641 -  
642 - while (true) {  
643 - SrsCommonMessage* msg = NULL;  
644 - if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {  
645 - srs_error("recv identify client message failed. ret=%d", ret);  
646 - return ret;  
647 - }  
648 -  
649 - SrsAutoFree(SrsCommonMessage, msg, false);  
650 -  
651 - if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {  
652 - srs_trace("identify ignore messages except "  
653 - "AMF0/AMF3 command message. type=%#x", msg->header.message_type);  
654 - continue;  
655 - }  
656 -  
657 - if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {  
658 - srs_error("identify decode message failed. ret=%d", ret);  
659 - return ret;  
660 - }  
661 -  
662 - SrsPacket* pkt = msg->get_packet();  
663 - if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {  
664 - srs_info("identify client by create stream, play or flash publish.");  
665 - return identify_create_stream_client(  
666 - dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, type, stream_name);  
667 - }  
668 - if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {  
669 - srs_info("identify client by releaseStream, fmle publish.");  
670 - return identify_fmle_publish_client(  
671 - dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name);  
672 - }  
673 -  
674 - srs_trace("ignore AMF0/AMF3 command message.");  
675 - }  
676 -  
677 - return ret;  
678 -}  
679 -  
680 -int SrsRtmp::set_chunk_size(int chunk_size)  
681 -{  
682 - int ret = ERROR_SUCCESS;  
683 -  
684 - SrsCommonMessage* msg = new SrsCommonMessage();  
685 - SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();  
686 -  
687 - pkt->chunk_size = chunk_size;  
688 - msg->set_packet(pkt, 0);  
689 -  
690 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
691 - srs_error("send set chunk size message failed. ret=%d", ret);  
692 - return ret;  
693 - }  
694 - srs_info("send set chunk size message success. chunk_size=%d", chunk_size);  
695 -  
696 - return ret;  
697 -}  
698 -  
699 -int SrsRtmp::start_play(int stream_id)  
700 -{  
701 - int ret = ERROR_SUCCESS;  
702 -  
703 - // StreamBegin  
704 - if (true) {  
705 - SrsCommonMessage* msg = new SrsCommonMessage();  
706 - SrsUserControlPacket* pkt = new SrsUserControlPacket();  
707 -  
708 - pkt->event_type = SrcPCUCStreamBegin;  
709 - pkt->event_data = stream_id;  
710 - msg->set_packet(pkt, 0);  
711 -  
712 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
713 - srs_error("send PCUC(StreamBegin) message failed. ret=%d", ret);  
714 - return ret;  
715 - }  
716 - srs_info("send PCUC(StreamBegin) message success.");  
717 - }  
718 -  
719 - // onStatus(NetStream.Play.Reset)  
720 - if (true) {  
721 - SrsCommonMessage* msg = new SrsCommonMessage();  
722 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
723 -  
724 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
725 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamReset));  
726 - pkt->data->set(StatusDescription, new SrsAmf0String("Playing and resetting stream."));  
727 - pkt->data->set(StatusDetails, new SrsAmf0String("stream"));  
728 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
729 -  
730 - msg->set_packet(pkt, stream_id);  
731 -  
732 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
733 - srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);  
734 - return ret;  
735 - }  
736 - srs_info("send onStatus(NetStream.Play.Reset) message success.");  
737 - }  
738 -  
739 - // onStatus(NetStream.Play.Start)  
740 - if (true) {  
741 - SrsCommonMessage* msg = new SrsCommonMessage();  
742 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
743 -  
744 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
745 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamStart));  
746 - pkt->data->set(StatusDescription, new SrsAmf0String("Started playing stream."));  
747 - pkt->data->set(StatusDetails, new SrsAmf0String("stream"));  
748 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
749 -  
750 - msg->set_packet(pkt, stream_id);  
751 -  
752 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
753 - srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);  
754 - return ret;  
755 - }  
756 - srs_info("send onStatus(NetStream.Play.Reset) message success.");  
757 - }  
758 -  
759 - // |RtmpSampleAccess(false, false)  
760 - if (true) {  
761 - SrsCommonMessage* msg = new SrsCommonMessage();  
762 - SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();  
763 -  
764 - msg->set_packet(pkt, stream_id);  
765 -  
766 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
767 - srs_error("send |RtmpSampleAccess(false, false) message failed. ret=%d", ret);  
768 - return ret;  
769 - }  
770 - srs_info("send |RtmpSampleAccess(false, false) message success.");  
771 - }  
772 -  
773 - // onStatus(NetStream.Data.Start)  
774 - if (true) {  
775 - SrsCommonMessage* msg = new SrsCommonMessage();  
776 - SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();  
777 -  
778 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeDataStart));  
779 -  
780 - msg->set_packet(pkt, stream_id);  
781 -  
782 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
783 - srs_error("send onStatus(NetStream.Data.Start) message failed. ret=%d", ret);  
784 - return ret;  
785 - }  
786 - srs_info("send onStatus(NetStream.Data.Start) message success.");  
787 - }  
788 -  
789 - srs_info("start play success.");  
790 -  
791 - return ret;  
792 -}  
793 -  
794 -int SrsRtmp::on_play_client_pause(int stream_id, bool is_pause)  
795 -{  
796 - int ret = ERROR_SUCCESS;  
797 -  
798 - if (is_pause) {  
799 - // onStatus(NetStream.Pause.Notify)  
800 - if (true) {  
801 - SrsCommonMessage* msg = new SrsCommonMessage();  
802 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
803 -  
804 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
805 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamPause));  
806 - pkt->data->set(StatusDescription, new SrsAmf0String("Paused stream."));  
807 -  
808 - msg->set_packet(pkt, stream_id);  
809 -  
810 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
811 - srs_error("send onStatus(NetStream.Pause.Notify) message failed. ret=%d", ret);  
812 - return ret;  
813 - }  
814 - srs_info("send onStatus(NetStream.Pause.Notify) message success.");  
815 - }  
816 - // StreamEOF  
817 - if (true) {  
818 - SrsCommonMessage* msg = new SrsCommonMessage();  
819 - SrsUserControlPacket* pkt = new SrsUserControlPacket();  
820 -  
821 - pkt->event_type = SrcPCUCStreamEOF;  
822 - pkt->event_data = stream_id;  
823 - msg->set_packet(pkt, 0);  
824 -  
825 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
826 - srs_error("send PCUC(StreamEOF) message failed. ret=%d", ret);  
827 - return ret;  
828 - }  
829 - srs_info("send PCUC(StreamEOF) message success.");  
830 - }  
831 - } else {  
832 - // onStatus(NetStream.Unpause.Notify)  
833 - if (true) {  
834 - SrsCommonMessage* msg = new SrsCommonMessage();  
835 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
836 -  
837 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
838 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamUnpause));  
839 - pkt->data->set(StatusDescription, new SrsAmf0String("Unpaused stream."));  
840 -  
841 - msg->set_packet(pkt, stream_id);  
842 -  
843 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
844 - srs_error("send onStatus(NetStream.Unpause.Notify) message failed. ret=%d", ret);  
845 - return ret;  
846 - }  
847 - srs_info("send onStatus(NetStream.Unpause.Notify) message success.");  
848 - }  
849 - // StreanBegin  
850 - if (true) {  
851 - SrsCommonMessage* msg = new SrsCommonMessage();  
852 - SrsUserControlPacket* pkt = new SrsUserControlPacket();  
853 -  
854 - pkt->event_type = SrcPCUCStreamBegin;  
855 - pkt->event_data = stream_id;  
856 - msg->set_packet(pkt, 0);  
857 -  
858 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
859 - srs_error("send PCUC(StreanBegin) message failed. ret=%d", ret);  
860 - return ret;  
861 - }  
862 - srs_info("send PCUC(StreanBegin) message success.");  
863 - }  
864 - }  
865 -  
866 - return ret;  
867 -}  
868 -  
869 -int SrsRtmp::start_fmle_publish(int stream_id)  
870 -{  
871 - int ret = ERROR_SUCCESS;  
872 -  
873 - // FCPublish  
874 - double fc_publish_tid = 0;  
875 - if (true) {  
876 - SrsCommonMessage* msg = NULL;  
877 - SrsFMLEStartPacket* pkt = NULL;  
878 - if ((ret = srs_rtmp_expect_message<SrsFMLEStartPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
879 - srs_error("recv FCPublish message failed. ret=%d", ret);  
880 - return ret;  
881 - }  
882 - srs_info("recv FCPublish request message success.");  
883 -  
884 - SrsAutoFree(SrsCommonMessage, msg, false);  
885 - fc_publish_tid = pkt->transaction_id;  
886 - }  
887 - // FCPublish response  
888 - if (true) {  
889 - SrsCommonMessage* msg = new SrsCommonMessage();  
890 - SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);  
891 -  
892 - msg->set_packet(pkt, 0);  
893 -  
894 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
895 - srs_error("send FCPublish response message failed. ret=%d", ret);  
896 - return ret;  
897 - }  
898 - srs_info("send FCPublish response message success.");  
899 - }  
900 -  
901 - // createStream  
902 - double create_stream_tid = 0;  
903 - if (true) {  
904 - SrsCommonMessage* msg = NULL;  
905 - SrsCreateStreamPacket* pkt = NULL;  
906 - if ((ret = srs_rtmp_expect_message<SrsCreateStreamPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
907 - srs_error("recv createStream message failed. ret=%d", ret);  
908 - return ret;  
909 - }  
910 - srs_info("recv createStream request message success.");  
911 -  
912 - SrsAutoFree(SrsCommonMessage, msg, false);  
913 - create_stream_tid = pkt->transaction_id;  
914 - }  
915 - // createStream response  
916 - if (true) {  
917 - SrsCommonMessage* msg = new SrsCommonMessage();  
918 - SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id);  
919 -  
920 - msg->set_packet(pkt, 0);  
921 -  
922 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
923 - srs_error("send createStream response message failed. ret=%d", ret);  
924 - return ret;  
925 - }  
926 - srs_info("send createStream response message success.");  
927 - }  
928 -  
929 - // publish  
930 - if (true) {  
931 - SrsCommonMessage* msg = NULL;  
932 - SrsPublishPacket* pkt = NULL;  
933 - if ((ret = srs_rtmp_expect_message<SrsPublishPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {  
934 - srs_error("recv publish message failed. ret=%d", ret);  
935 - return ret;  
936 - }  
937 - srs_info("recv publish request message success.");  
938 -  
939 - SrsAutoFree(SrsCommonMessage, msg, false);  
940 - }  
941 - // publish response onFCPublish(NetStream.Publish.Start)  
942 - if (true) {  
943 - SrsCommonMessage* msg = new SrsCommonMessage();  
944 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
945 -  
946 - pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;  
947 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));  
948 - pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));  
949 -  
950 - msg->set_packet(pkt, stream_id);  
951 -  
952 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
953 - srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", ret);  
954 - return ret;  
955 - }  
956 - srs_info("send onFCPublish(NetStream.Publish.Start) message success.");  
957 - }  
958 - // publish response onStatus(NetStream.Publish.Start)  
959 - if (true) {  
960 - SrsCommonMessage* msg = new SrsCommonMessage();  
961 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
962 -  
963 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
964 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));  
965 - pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));  
966 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
967 -  
968 - msg->set_packet(pkt, stream_id);  
969 -  
970 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
971 - srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret);  
972 - return ret;  
973 - }  
974 - srs_info("send onStatus(NetStream.Publish.Start) message success.");  
975 - }  
976 -  
977 - srs_info("FMLE publish success.");  
978 -  
979 - return ret;  
980 -}  
981 -  
982 -int SrsRtmp::fmle_unpublish(int stream_id, double unpublish_tid)  
983 -{  
984 - int ret = ERROR_SUCCESS;  
985 -  
986 - // publish response onFCUnpublish(NetStream.unpublish.Success)  
987 - if (true) {  
988 - SrsCommonMessage* msg = new SrsCommonMessage();  
989 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
990 -  
991 - pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH;  
992 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));  
993 - pkt->data->set(StatusDescription, new SrsAmf0String("Stop publishing stream."));  
994 -  
995 - msg->set_packet(pkt, stream_id);  
996 -  
997 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
998 - srs_error("send onFCUnpublish(NetStream.unpublish.Success) message failed. ret=%d", ret);  
999 - return ret;  
1000 - }  
1001 - srs_info("send onFCUnpublish(NetStream.unpublish.Success) message success.");  
1002 - }  
1003 - // FCUnpublish response  
1004 - if (true) {  
1005 - SrsCommonMessage* msg = new SrsCommonMessage();  
1006 - SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(unpublish_tid);  
1007 -  
1008 - msg->set_packet(pkt, stream_id);  
1009 -  
1010 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
1011 - srs_error("send FCUnpublish response message failed. ret=%d", ret);  
1012 - return ret;  
1013 - }  
1014 - srs_info("send FCUnpublish response message success.");  
1015 - }  
1016 - // publish response onStatus(NetStream.Unpublish.Success)  
1017 - if (true) {  
1018 - SrsCommonMessage* msg = new SrsCommonMessage();  
1019 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
1020 -  
1021 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
1022 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));  
1023 - pkt->data->set(StatusDescription, new SrsAmf0String("Stream is now unpublished"));  
1024 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
1025 -  
1026 - msg->set_packet(pkt, stream_id);  
1027 -  
1028 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
1029 - srs_error("send onStatus(NetStream.Unpublish.Success) message failed. ret=%d", ret);  
1030 - return ret;  
1031 - }  
1032 - srs_info("send onStatus(NetStream.Unpublish.Success) message success.");  
1033 - }  
1034 -  
1035 - srs_info("FMLE unpublish success.");  
1036 -  
1037 - return ret;  
1038 -}  
1039 -  
1040 -int SrsRtmp::start_flash_publish(int stream_id)  
1041 -{  
1042 - int ret = ERROR_SUCCESS;  
1043 -  
1044 - // publish response onStatus(NetStream.Publish.Start)  
1045 - if (true) {  
1046 - SrsCommonMessage* msg = new SrsCommonMessage();  
1047 - SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();  
1048 -  
1049 - pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));  
1050 - pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));  
1051 - pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));  
1052 - pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));  
1053 -  
1054 - msg->set_packet(pkt, stream_id);  
1055 -  
1056 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
1057 - srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret);  
1058 - return ret;  
1059 - }  
1060 - srs_info("send onStatus(NetStream.Publish.Start) message success.");  
1061 - }  
1062 -  
1063 - srs_info("flash publish success.");  
1064 -  
1065 - return ret;  
1066 -}  
1067 -  
1068 -int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, string& stream_name)  
1069 -{  
1070 - int ret = ERROR_SUCCESS;  
1071 -  
1072 - if (true) {  
1073 - SrsCommonMessage* msg = new SrsCommonMessage();  
1074 - SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id);  
1075 -  
1076 - msg->set_packet(pkt, 0);  
1077 -  
1078 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
1079 - srs_error("send createStream response message failed. ret=%d", ret);  
1080 - return ret;  
1081 - }  
1082 - srs_info("send createStream response message success.");  
1083 - }  
1084 -  
1085 - while (true) {  
1086 - SrsCommonMessage* msg = NULL;  
1087 - if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {  
1088 - srs_error("recv identify client message failed. ret=%d", ret);  
1089 - return ret;  
1090 - }  
1091 -  
1092 - SrsAutoFree(SrsCommonMessage, msg, false);  
1093 -  
1094 - if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {  
1095 - srs_trace("identify ignore messages except "  
1096 - "AMF0/AMF3 command message. type=%#x", msg->header.message_type);  
1097 - continue;  
1098 - }  
1099 -  
1100 - if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {  
1101 - srs_error("identify decode message failed. ret=%d", ret);  
1102 - return ret;  
1103 - }  
1104 -  
1105 - SrsPacket* pkt = msg->get_packet();  
1106 - if (dynamic_cast<SrsPlayPacket*>(pkt)) {  
1107 - SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt);  
1108 - type = SrsClientPlay;  
1109 - stream_name = play->stream_name;  
1110 - srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());  
1111 - return ret;  
1112 - }  
1113 - if (dynamic_cast<SrsPublishPacket*>(pkt)) {  
1114 - srs_info("identify client by publish, falsh publish.");  
1115 - return identify_flash_publish_client(  
1116 - dynamic_cast<SrsPublishPacket*>(pkt), type, stream_name);  
1117 - }  
1118 -  
1119 - srs_trace("ignore AMF0/AMF3 command message.");  
1120 - }  
1121 -  
1122 - return ret;  
1123 -}  
1124 -  
1125 -int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, string& stream_name)  
1126 -{  
1127 - int ret = ERROR_SUCCESS;  
1128 -  
1129 - type = SrsClientFMLEPublish;  
1130 - stream_name = req->stream_name;  
1131 -  
1132 - // releaseStream response  
1133 - if (true) {  
1134 - SrsCommonMessage* msg = new SrsCommonMessage();  
1135 - SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);  
1136 -  
1137 - msg->set_packet(pkt, 0);  
1138 -  
1139 - if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {  
1140 - srs_error("send releaseStream response message failed. ret=%d", ret);  
1141 - return ret;  
1142 - }  
1143 - srs_info("send releaseStream response message success.");  
1144 - }  
1145 -  
1146 - return ret;  
1147 -}  
1148 -  
1149 -int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, string& stream_name)  
1150 -{  
1151 - int ret = ERROR_SUCCESS;  
1152 -  
1153 - type = SrsClientFlashPublish;  
1154 - stream_name = req->stream_name;  
1155 -  
1156 - return ret;  
1157 -}  
1158 - 1 +/*
  2 +The MIT License (MIT)
  3 +
  4 +Copyright (c) 2013 winlin
  5 +
  6 +Permission is hereby granted, free of charge, to any person obtaining a copy of
  7 +this software and associated documentation files (the "Software"), to deal in
  8 +the Software without restriction, including without limitation the rights to
  9 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
  10 +the Software, and to permit persons to whom the Software is furnished to do so,
  11 +subject to the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be included in all
  14 +copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
  18 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
  19 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
  20 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  22 +*/
  23 +
  24 +#include <srs_core_rtmp.hpp>
  25 +
  26 +#include <srs_core_log.hpp>
  27 +#include <srs_core_error.hpp>
  28 +#include <srs_core_socket.hpp>
  29 +#include <srs_core_protocol.hpp>
  30 +#include <srs_core_autofree.hpp>
  31 +#include <srs_core_amf0.hpp>
  32 +#include <srs_core_handshake.hpp>
  33 +#include <srs_core_config.hpp>
  34 +
  35 +using namespace std;
  36 +
  37 +/**
  38 +* the signature for packets to client.
  39 +*/
  40 +#define RTMP_SIG_FMS_VER "3,5,3,888"
  41 +#define RTMP_SIG_AMF0_VER 0
  42 +#define RTMP_SIG_CLIENT_ID "ASAICiss"
  43 +
  44 +/**
  45 +* onStatus consts.
  46 +*/
  47 +#define StatusLevel "level"
  48 +#define StatusCode "code"
  49 +#define StatusDescription "description"
  50 +#define StatusDetails "details"
  51 +#define StatusClientId "clientid"
  52 +// status value
  53 +#define StatusLevelStatus "status"
  54 +// code value
  55 +#define StatusCodeConnectSuccess "NetConnection.Connect.Success"
  56 +#define StatusCodeStreamReset "NetStream.Play.Reset"
  57 +#define StatusCodeStreamStart "NetStream.Play.Start"
  58 +#define StatusCodeStreamPause "NetStream.Pause.Notify"
  59 +#define StatusCodeStreamUnpause "NetStream.Unpause.Notify"
  60 +#define StatusCodePublishStart "NetStream.Publish.Start"
  61 +#define StatusCodeDataStart "NetStream.Data.Start"
  62 +#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success"
  63 +
  64 +// FMLE
  65 +#define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish"
  66 +#define RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH "onFCUnpublish"
  67 +
  68 +// default stream id for response the createStream request.
  69 +#define SRS_DEFAULT_SID 1
  70 +
  71 +SrsRequest::SrsRequest()
  72 +{
  73 + objectEncoding = RTMP_SIG_AMF0_VER;
  74 +}
  75 +
  76 +SrsRequest::~SrsRequest()
  77 +{
  78 +}
  79 +
  80 +SrsRequest* SrsRequest::copy()
  81 +{
  82 + SrsRequest* cp = new SrsRequest();
  83 +
  84 + cp->app = app;
  85 + cp->objectEncoding = objectEncoding;
  86 + cp->pageUrl = pageUrl;
  87 + cp->port = port;
  88 + cp->schema = schema;
  89 + cp->stream = stream;
  90 + cp->swfUrl = swfUrl;
  91 + cp->tcUrl = tcUrl;
  92 + cp->vhost = vhost;
  93 +
  94 + return cp;
  95 +}
  96 +
  97 +int SrsRequest::discovery_app()
  98 +{
  99 + int ret = ERROR_SUCCESS;
  100 +
  101 + size_t pos = std::string::npos;
  102 + std::string url = tcUrl;
  103 +
  104 + if ((pos = url.find("://")) != std::string::npos) {
  105 + schema = url.substr(0, pos);
  106 + url = url.substr(schema.length() + 3);
  107 + srs_verbose("discovery schema=%s", schema.c_str());
  108 + }
  109 +
  110 + if ((pos = url.find("/")) != std::string::npos) {
  111 + vhost = url.substr(0, pos);
  112 + url = url.substr(vhost.length() + 1);
  113 + srs_verbose("discovery vhost=%s", vhost.c_str());
  114 + }
  115 +
  116 + port = RTMP_DEFAULT_PORTS;
  117 + if ((pos = vhost.find(":")) != std::string::npos) {
  118 + port = vhost.substr(pos + 1);
  119 + vhost = vhost.substr(0, pos);
  120 + srs_verbose("discovery vhost=%s, port=%s", vhost.c_str(), port.c_str());
  121 + }
  122 +
  123 + app = url;
  124 + srs_vhost_resolve(vhost, app);
  125 + strip();
  126 +
  127 + // resolve the vhost from config
  128 + SrsConfDirective* parsed_vhost = config->get_vhost(vhost);
  129 + if (parsed_vhost) {
  130 + vhost = parsed_vhost->arg0();
  131 + }
  132 +
  133 + // TODO: discovery the params of vhost.
  134 +
  135 + srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
  136 + schema.c_str(), vhost.c_str(), port.c_str(), app.c_str());
  137 +
  138 + if (schema.empty() || vhost.empty() || port.empty() || app.empty()) {
  139 + ret = ERROR_RTMP_REQ_TCURL;
  140 + srs_error("discovery tcUrl failed. "
  141 + "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
  142 + tcUrl.c_str(), schema.c_str(), vhost.c_str(), port.c_str(), app.c_str(), ret);
  143 + return ret;
  144 + }
  145 +
  146 + strip();
  147 +
  148 + return ret;
  149 +}
  150 +
  151 +string SrsRequest::get_stream_url()
  152 +{
  153 + std::string url = "";
  154 +
  155 + url += vhost;
  156 + url += "/";
  157 + url += app;
  158 + url += "/";
  159 + url += stream;
  160 +
  161 + return url;
  162 +}
  163 +
  164 +void SrsRequest::strip()
  165 +{
  166 + trim(vhost, "/ \n\r\t");
  167 + trim(app, "/ \n\r\t");
  168 + trim(stream, "/ \n\r\t");
  169 +}
  170 +
  171 +std::string& SrsRequest::trim(string& str, string chs)
  172 +{
  173 + for (int i = 0; i < (int)chs.length(); i++) {
  174 + char ch = chs.at(i);
  175 +
  176 + for (std::string::iterator it = str.begin(); it != str.end();) {
  177 + if (ch == *it) {
  178 + it = str.erase(it);
  179 + } else {
  180 + ++it;
  181 + }
  182 + }
  183 + }
  184 +
  185 + return str;
  186 +}
  187 +
  188 +SrsResponse::SrsResponse()
  189 +{
  190 + stream_id = SRS_DEFAULT_SID;
  191 +}
  192 +
  193 +SrsResponse::~SrsResponse()
  194 +{
  195 +}
  196 +
  197 +SrsRtmpClient::SrsRtmpClient(st_netfd_t _stfd)
  198 +{
  199 + stfd = _stfd;
  200 + protocol = new SrsProtocol(stfd);
  201 +}
  202 +
  203 +SrsRtmpClient::~SrsRtmpClient()
  204 +{
  205 + srs_freep(protocol);
  206 +}
  207 +
  208 +void SrsRtmpClient::set_recv_timeout(int64_t timeout_us)
  209 +{
  210 + protocol->set_recv_timeout(timeout_us);
  211 +}
  212 +
  213 +void SrsRtmpClient::set_send_timeout(int64_t timeout_us)
  214 +{
  215 + protocol->set_send_timeout(timeout_us);
  216 +}
  217 +
  218 +int64_t SrsRtmpClient::get_recv_bytes()
  219 +{
  220 + return protocol->get_recv_bytes();
  221 +}
  222 +
  223 +int64_t SrsRtmpClient::get_send_bytes()
  224 +{
  225 + return protocol->get_send_bytes();
  226 +}
  227 +
  228 +int SrsRtmpClient::get_recv_kbps()
  229 +{
  230 + return protocol->get_recv_kbps();
  231 +}
  232 +
  233 +int SrsRtmpClient::get_send_kbps()
  234 +{
  235 + return protocol->get_send_kbps();
  236 +}
  237 +
  238 +int SrsRtmpClient::recv_message(SrsCommonMessage** pmsg)
  239 +{
  240 + return protocol->recv_message(pmsg);
  241 +}
  242 +
  243 +int SrsRtmpClient::send_message(ISrsMessage* msg)
  244 +{
  245 + return protocol->send_message(msg);
  246 +}
  247 +
  248 +int SrsRtmpClient::handshake()
  249 +{
  250 + int ret = ERROR_SUCCESS;
  251 +
  252 + SrsSocket skt(stfd);
  253 +
  254 + skt.set_recv_timeout(protocol->get_recv_timeout());
  255 + skt.set_send_timeout(protocol->get_send_timeout());
  256 +
  257 + SrsComplexHandshake complex_hs;
  258 + SrsSimpleHandshake simple_hs;
  259 + if ((ret = simple_hs.handshake_with_server(skt, complex_hs)) != ERROR_SUCCESS) {
  260 + return ret;
  261 + }
  262 +
  263 + return ret;
  264 +}
  265 +
  266 +int SrsRtmpClient::connect_app(string app, string tc_url)
  267 +{
  268 + int ret = ERROR_SUCCESS;
  269 +
  270 + // Connect(vhost, app)
  271 + if (true) {
  272 + SrsCommonMessage* msg = new SrsCommonMessage();
  273 + SrsConnectAppPacket* pkt = new SrsConnectAppPacket();
  274 + msg->set_packet(pkt, 0);
  275 +
  276 + pkt->command_object = new SrsAmf0Object();
  277 + pkt->command_object->set("app", new SrsAmf0String(app.c_str()));
  278 + pkt->command_object->set("swfUrl", new SrsAmf0String());
  279 + pkt->command_object->set("tcUrl", new SrsAmf0String(tc_url.c_str()));
  280 + pkt->command_object->set("fpad", new SrsAmf0Boolean(false));
  281 + pkt->command_object->set("capabilities", new SrsAmf0Number(239));
  282 + pkt->command_object->set("audioCodecs", new SrsAmf0Number(3575));
  283 + pkt->command_object->set("videoCodecs", new SrsAmf0Number(252));
  284 + pkt->command_object->set("videoFunction", new SrsAmf0Number(1));
  285 + pkt->command_object->set("pageUrl", new SrsAmf0String());
  286 + pkt->command_object->set("objectEncoding", new SrsAmf0Number(0));
  287 +
  288 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  289 + return ret;
  290 + }
  291 + }
  292 +
  293 + // Set Window Acknowledgement size(2500000)
  294 + if (true) {
  295 + SrsCommonMessage* msg = new SrsCommonMessage();
  296 + SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
  297 +
  298 + pkt->ackowledgement_window_size = 2500000;
  299 + msg->set_packet(pkt, 0);
  300 +
  301 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  302 + return ret;
  303 + }
  304 + }
  305 +
  306 + // expect connect _result
  307 + SrsCommonMessage* msg = NULL;
  308 + SrsConnectAppResPacket* pkt = NULL;
  309 + if ((ret = srs_rtmp_expect_message<SrsConnectAppResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  310 + srs_error("expect connect app response message failed. ret=%d", ret);
  311 + return ret;
  312 + }
  313 + SrsAutoFree(SrsCommonMessage, msg, false);
  314 + srs_info("get connect app response message");
  315 +
  316 + return ret;
  317 +}
  318 +
  319 +int SrsRtmpClient::create_stream(int& stream_id)
  320 +{
  321 + int ret = ERROR_SUCCESS;
  322 +
  323 + // CreateStream
  324 + if (true) {
  325 + SrsCommonMessage* msg = new SrsCommonMessage();
  326 + SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket();
  327 +
  328 + msg->set_packet(pkt, 0);
  329 +
  330 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  331 + return ret;
  332 + }
  333 + }
  334 +
  335 + // CreateStream _result.
  336 + if (true) {
  337 + SrsCommonMessage* msg = NULL;
  338 + SrsCreateStreamResPacket* pkt = NULL;
  339 + if ((ret = srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  340 + srs_error("expect create stream response message failed. ret=%d", ret);
  341 + return ret;
  342 + }
  343 + SrsAutoFree(SrsCommonMessage, msg, false);
  344 + srs_info("get create stream response message");
  345 +
  346 + stream_id = (int)pkt->stream_id;
  347 + }
  348 +
  349 + return ret;
  350 +}
  351 +
  352 +int SrsRtmpClient::play(string stream, int stream_id)
  353 +{
  354 + int ret = ERROR_SUCCESS;
  355 +
  356 + // Play(stream)
  357 + if (true) {
  358 + SrsCommonMessage* msg = new SrsCommonMessage();
  359 + SrsPlayPacket* pkt = new SrsPlayPacket();
  360 +
  361 + pkt->stream_name = stream;
  362 + msg->set_packet(pkt, stream_id);
  363 +
  364 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  365 + srs_error("send play stream failed. "
  366 + "stream=%s, stream_id=%d, ret=%d",
  367 + stream.c_str(), stream_id, ret);
  368 + return ret;
  369 + }
  370 + }
  371 +
  372 + // SetBufferLength(1000ms)
  373 + int buffer_length_ms = 1000;
  374 + if (true) {
  375 + SrsCommonMessage* msg = new SrsCommonMessage();
  376 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
  377 +
  378 + pkt->event_type = SrcPCUCSetBufferLength;
  379 + pkt->event_data = stream_id;
  380 + pkt->extra_data = buffer_length_ms;
  381 + msg->set_packet(pkt, 0);
  382 +
  383 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  384 + srs_error("send set buffer length failed. "
  385 + "stream=%s, stream_id=%d, bufferLength=%d, ret=%d",
  386 + stream.c_str(), stream_id, buffer_length_ms, ret);
  387 + return ret;
  388 + }
  389 + }
  390 +
  391 + return ret;
  392 +}
  393 +
  394 +int SrsRtmpClient::publish(string stream, int stream_id)
  395 +{
  396 + int ret = ERROR_SUCCESS;
  397 +
  398 + // publish(stream)
  399 + if (true) {
  400 + SrsCommonMessage* msg = new SrsCommonMessage();
  401 + SrsPublishPacket* pkt = new SrsPublishPacket();
  402 +
  403 + pkt->stream_name = stream;
  404 + msg->set_packet(pkt, stream_id);
  405 +
  406 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  407 + srs_error("send publish message failed. "
  408 + "stream=%s, stream_id=%d, ret=%d",
  409 + stream.c_str(), stream_id, ret);
  410 + return ret;
  411 + }
  412 + }
  413 +
  414 + return ret;
  415 +}
  416 +
  417 +SrsRtmp::SrsRtmp(st_netfd_t client_stfd)
  418 +{
  419 + protocol = new SrsProtocol(client_stfd);
  420 + stfd = client_stfd;
  421 +}
  422 +
  423 +SrsRtmp::~SrsRtmp()
  424 +{
  425 + srs_freep(protocol);
  426 +}
  427 +
  428 +SrsProtocol* SrsRtmp::get_protocol()
  429 +{
  430 + return protocol;
  431 +}
  432 +
  433 +void SrsRtmp::set_recv_timeout(int64_t timeout_us)
  434 +{
  435 + protocol->set_recv_timeout(timeout_us);
  436 +}
  437 +
  438 +int64_t SrsRtmp::get_recv_timeout()
  439 +{
  440 + return protocol->get_recv_timeout();
  441 +}
  442 +
  443 +void SrsRtmp::set_send_timeout(int64_t timeout_us)
  444 +{
  445 + protocol->set_send_timeout(timeout_us);
  446 +}
  447 +
  448 +int64_t SrsRtmp::get_send_timeout()
  449 +{
  450 + return protocol->get_send_timeout();
  451 +}
  452 +
  453 +int64_t SrsRtmp::get_recv_bytes()
  454 +{
  455 + return protocol->get_recv_bytes();
  456 +}
  457 +
  458 +int64_t SrsRtmp::get_send_bytes()
  459 +{
  460 + return protocol->get_send_bytes();
  461 +}
  462 +
  463 +int SrsRtmp::get_recv_kbps()
  464 +{
  465 + return protocol->get_recv_kbps();
  466 +}
  467 +
  468 +int SrsRtmp::get_send_kbps()
  469 +{
  470 + return protocol->get_send_kbps();
  471 +}
  472 +
  473 +int SrsRtmp::recv_message(SrsCommonMessage** pmsg)
  474 +{
  475 + return protocol->recv_message(pmsg);
  476 +}
  477 +
  478 +int SrsRtmp::send_message(ISrsMessage* msg)
  479 +{
  480 + return protocol->send_message(msg);
  481 +}
  482 +
  483 +int SrsRtmp::handshake()
  484 +{
  485 + int ret = ERROR_SUCCESS;
  486 +
  487 + SrsSocket skt(stfd);
  488 +
  489 + skt.set_recv_timeout(protocol->get_recv_timeout());
  490 + skt.set_send_timeout(protocol->get_send_timeout());
  491 +
  492 + SrsComplexHandshake complex_hs;
  493 + SrsSimpleHandshake simple_hs;
  494 + if ((ret = simple_hs.handshake_with_client(skt, complex_hs)) != ERROR_SUCCESS) {
  495 + return ret;
  496 + }
  497 +
  498 + return ret;
  499 +}
  500 +
  501 +int SrsRtmp::connect_app(SrsRequest* req)
  502 +{
  503 + int ret = ERROR_SUCCESS;
  504 +
  505 + SrsCommonMessage* msg = NULL;
  506 + SrsConnectAppPacket* pkt = NULL;
  507 + if ((ret = srs_rtmp_expect_message<SrsConnectAppPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  508 + srs_error("expect connect app message failed. ret=%d", ret);
  509 + return ret;
  510 + }
  511 + SrsAutoFree(SrsCommonMessage, msg, false);
  512 + srs_info("get connect app message");
  513 +
  514 + SrsAmf0Any* prop = NULL;
  515 +
  516 + if ((prop = pkt->command_object->ensure_property_string("tcUrl")) == NULL) {
  517 + ret = ERROR_RTMP_REQ_CONNECT;
  518 + srs_error("invalid request, must specifies the tcUrl. ret=%d", ret);
  519 + return ret;
  520 + }
  521 + req->tcUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;
  522 +
  523 + if ((prop = pkt->command_object->ensure_property_string("pageUrl")) != NULL) {
  524 + req->pageUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;
  525 + }
  526 +
  527 + if ((prop = pkt->command_object->ensure_property_string("swfUrl")) != NULL) {
  528 + req->swfUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;
  529 + }
  530 +
  531 + if ((prop = pkt->command_object->ensure_property_number("objectEncoding")) != NULL) {
  532 + req->objectEncoding = srs_amf0_convert<SrsAmf0Number>(prop)->value;
  533 + }
  534 +
  535 + srs_info("get connect app message params success.");
  536 +
  537 + return req->discovery_app();
  538 +}
  539 +
  540 +int SrsRtmp::set_window_ack_size(int ack_size)
  541 +{
  542 + int ret = ERROR_SUCCESS;
  543 +
  544 + SrsCommonMessage* msg = new SrsCommonMessage();
  545 + SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
  546 +
  547 + pkt->ackowledgement_window_size = ack_size;
  548 + msg->set_packet(pkt, 0);
  549 +
  550 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  551 + srs_error("send ack size message failed. ret=%d", ret);
  552 + return ret;
  553 + }
  554 + srs_info("send ack size message success. ack_size=%d", ack_size);
  555 +
  556 + return ret;
  557 +}
  558 +
  559 +int SrsRtmp::set_peer_bandwidth(int bandwidth, int type)
  560 +{
  561 + int ret = ERROR_SUCCESS;
  562 +
  563 + SrsCommonMessage* msg = new SrsCommonMessage();
  564 + SrsSetPeerBandwidthPacket* pkt = new SrsSetPeerBandwidthPacket();
  565 +
  566 + pkt->bandwidth = bandwidth;
  567 + pkt->type = type;
  568 + msg->set_packet(pkt, 0);
  569 +
  570 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  571 + srs_error("send set bandwidth message failed. ret=%d", ret);
  572 + return ret;
  573 + }
  574 + srs_info("send set bandwidth message "
  575 + "success. bandwidth=%d, type=%d", bandwidth, type);
  576 +
  577 + return ret;
  578 +}
  579 +
  580 +int SrsRtmp::response_connect_app(SrsRequest* req)
  581 +{
  582 + int ret = ERROR_SUCCESS;
  583 +
  584 + SrsCommonMessage* msg = new SrsCommonMessage();
  585 + SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket();
  586 +
  587 + pkt->props->set("fmsVer", new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER));
  588 + pkt->props->set("capabilities", new SrsAmf0Number(127));
  589 + pkt->props->set("mode", new SrsAmf0Number(1));
  590 +
  591 + pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  592 + pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess));
  593 + pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded"));
  594 + pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding));
  595 + SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray();
  596 + pkt->info->set("data", data);
  597 +
  598 + data->set("srs_version", new SrsAmf0String(RTMP_SIG_FMS_VER));
  599 + data->set("srs_server", new SrsAmf0String(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));
  600 + data->set("srs_license", new SrsAmf0String(RTMP_SIG_SRS_LICENSE));
  601 + data->set("srs_role", new SrsAmf0String(RTMP_SIG_SRS_ROLE));
  602 + data->set("srs_url", new SrsAmf0String(RTMP_SIG_SRS_URL));
  603 + data->set("srs_version", new SrsAmf0String(RTMP_SIG_SRS_VERSION));
  604 + data->set("srs_site", new SrsAmf0String(RTMP_SIG_SRS_WEB));
  605 + data->set("srs_email", new SrsAmf0String(RTMP_SIG_SRS_EMAIL));
  606 + data->set("srs_copyright", new SrsAmf0String(RTMP_SIG_SRS_COPYRIGHT));
  607 + data->set("srs_contributor", new SrsAmf0String(RTMP_SIG_SRS_CONTRIBUTOR));
  608 +
  609 + msg->set_packet(pkt, 0);
  610 +
  611 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  612 + srs_error("send connect app response message failed. ret=%d", ret);
  613 + return ret;
  614 + }
  615 + srs_info("send connect app response message success.");
  616 +
  617 + return ret;
  618 +}
  619 +
  620 +int SrsRtmp::on_bw_done()
  621 +{
  622 + int ret = ERROR_SUCCESS;
  623 +
  624 + SrsCommonMessage* msg = new SrsCommonMessage();
  625 + SrsOnBWDonePacket* pkt = new SrsOnBWDonePacket();
  626 +
  627 + msg->set_packet(pkt, 0);
  628 +
  629 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  630 + srs_error("send onBWDone message failed. ret=%d", ret);
  631 + return ret;
  632 + }
  633 + srs_info("send onBWDone message success.");
  634 +
  635 + return ret;
  636 +}
  637 +
  638 +int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& stream_name)
  639 +{
  640 + type = SrsClientUnknown;
  641 + int ret = ERROR_SUCCESS;
  642 +
  643 + while (true) {
  644 + SrsCommonMessage* msg = NULL;
  645 + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
  646 + srs_error("recv identify client message failed. ret=%d", ret);
  647 + return ret;
  648 + }
  649 +
  650 + SrsAutoFree(SrsCommonMessage, msg, false);
  651 +
  652 + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
  653 + srs_trace("identify ignore messages except "
  654 + "AMF0/AMF3 command message. type=%#x", msg->header.message_type);
  655 + continue;
  656 + }
  657 +
  658 + if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {
  659 + srs_error("identify decode message failed. ret=%d", ret);
  660 + return ret;
  661 + }
  662 +
  663 + SrsPacket* pkt = msg->get_packet();
  664 + if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
  665 + srs_info("identify client by create stream, play or flash publish.");
  666 + return identify_create_stream_client(
  667 + dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, type, stream_name);
  668 + }
  669 + if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
  670 + srs_info("identify client by releaseStream, fmle publish.");
  671 + return identify_fmle_publish_client(
  672 + dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name);
  673 + }
  674 +
  675 + srs_trace("ignore AMF0/AMF3 command message.");
  676 + }
  677 +
  678 + return ret;
  679 +}
  680 +
  681 +int SrsRtmp::set_chunk_size(int chunk_size)
  682 +{
  683 + int ret = ERROR_SUCCESS;
  684 +
  685 + SrsCommonMessage* msg = new SrsCommonMessage();
  686 + SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
  687 +
  688 + pkt->chunk_size = chunk_size;
  689 + msg->set_packet(pkt, 0);
  690 +
  691 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  692 + srs_error("send set chunk size message failed. ret=%d", ret);
  693 + return ret;
  694 + }
  695 + srs_info("send set chunk size message success. chunk_size=%d", chunk_size);
  696 +
  697 + return ret;
  698 +}
  699 +
  700 +int SrsRtmp::start_play(int stream_id)
  701 +{
  702 + int ret = ERROR_SUCCESS;
  703 +
  704 + // StreamBegin
  705 + if (true) {
  706 + SrsCommonMessage* msg = new SrsCommonMessage();
  707 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
  708 +
  709 + pkt->event_type = SrcPCUCStreamBegin;
  710 + pkt->event_data = stream_id;
  711 + msg->set_packet(pkt, 0);
  712 +
  713 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  714 + srs_error("send PCUC(StreamBegin) message failed. ret=%d", ret);
  715 + return ret;
  716 + }
  717 + srs_info("send PCUC(StreamBegin) message success.");
  718 + }
  719 +
  720 + // onStatus(NetStream.Play.Reset)
  721 + if (true) {
  722 + SrsCommonMessage* msg = new SrsCommonMessage();
  723 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  724 +
  725 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  726 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamReset));
  727 + pkt->data->set(StatusDescription, new SrsAmf0String("Playing and resetting stream."));
  728 + pkt->data->set(StatusDetails, new SrsAmf0String("stream"));
  729 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  730 +
  731 + msg->set_packet(pkt, stream_id);
  732 +
  733 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  734 + srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);
  735 + return ret;
  736 + }
  737 + srs_info("send onStatus(NetStream.Play.Reset) message success.");
  738 + }
  739 +
  740 + // onStatus(NetStream.Play.Start)
  741 + if (true) {
  742 + SrsCommonMessage* msg = new SrsCommonMessage();
  743 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  744 +
  745 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  746 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamStart));
  747 + pkt->data->set(StatusDescription, new SrsAmf0String("Started playing stream."));
  748 + pkt->data->set(StatusDetails, new SrsAmf0String("stream"));
  749 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  750 +
  751 + msg->set_packet(pkt, stream_id);
  752 +
  753 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  754 + srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);
  755 + return ret;
  756 + }
  757 + srs_info("send onStatus(NetStream.Play.Reset) message success.");
  758 + }
  759 +
  760 + // |RtmpSampleAccess(false, false)
  761 + if (true) {
  762 + SrsCommonMessage* msg = new SrsCommonMessage();
  763 + SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();
  764 +
  765 + msg->set_packet(pkt, stream_id);
  766 +
  767 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  768 + srs_error("send |RtmpSampleAccess(false, false) message failed. ret=%d", ret);
  769 + return ret;
  770 + }
  771 + srs_info("send |RtmpSampleAccess(false, false) message success.");
  772 + }
  773 +
  774 + // onStatus(NetStream.Data.Start)
  775 + if (true) {
  776 + SrsCommonMessage* msg = new SrsCommonMessage();
  777 + SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();
  778 +
  779 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeDataStart));
  780 +
  781 + msg->set_packet(pkt, stream_id);
  782 +
  783 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  784 + srs_error("send onStatus(NetStream.Data.Start) message failed. ret=%d", ret);
  785 + return ret;
  786 + }
  787 + srs_info("send onStatus(NetStream.Data.Start) message success.");
  788 + }
  789 +
  790 + srs_info("start play success.");
  791 +
  792 + return ret;
  793 +}
  794 +
  795 +int SrsRtmp::on_play_client_pause(int stream_id, bool is_pause)
  796 +{
  797 + int ret = ERROR_SUCCESS;
  798 +
  799 + if (is_pause) {
  800 + // onStatus(NetStream.Pause.Notify)
  801 + if (true) {
  802 + SrsCommonMessage* msg = new SrsCommonMessage();
  803 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  804 +
  805 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  806 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamPause));
  807 + pkt->data->set(StatusDescription, new SrsAmf0String("Paused stream."));
  808 +
  809 + msg->set_packet(pkt, stream_id);
  810 +
  811 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  812 + srs_error("send onStatus(NetStream.Pause.Notify) message failed. ret=%d", ret);
  813 + return ret;
  814 + }
  815 + srs_info("send onStatus(NetStream.Pause.Notify) message success.");
  816 + }
  817 + // StreamEOF
  818 + if (true) {
  819 + SrsCommonMessage* msg = new SrsCommonMessage();
  820 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
  821 +
  822 + pkt->event_type = SrcPCUCStreamEOF;
  823 + pkt->event_data = stream_id;
  824 + msg->set_packet(pkt, 0);
  825 +
  826 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  827 + srs_error("send PCUC(StreamEOF) message failed. ret=%d", ret);
  828 + return ret;
  829 + }
  830 + srs_info("send PCUC(StreamEOF) message success.");
  831 + }
  832 + } else {
  833 + // onStatus(NetStream.Unpause.Notify)
  834 + if (true) {
  835 + SrsCommonMessage* msg = new SrsCommonMessage();
  836 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  837 +
  838 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  839 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamUnpause));
  840 + pkt->data->set(StatusDescription, new SrsAmf0String("Unpaused stream."));
  841 +
  842 + msg->set_packet(pkt, stream_id);
  843 +
  844 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  845 + srs_error("send onStatus(NetStream.Unpause.Notify) message failed. ret=%d", ret);
  846 + return ret;
  847 + }
  848 + srs_info("send onStatus(NetStream.Unpause.Notify) message success.");
  849 + }
  850 + // StreanBegin
  851 + if (true) {
  852 + SrsCommonMessage* msg = new SrsCommonMessage();
  853 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
  854 +
  855 + pkt->event_type = SrcPCUCStreamBegin;
  856 + pkt->event_data = stream_id;
  857 + msg->set_packet(pkt, 0);
  858 +
  859 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  860 + srs_error("send PCUC(StreanBegin) message failed. ret=%d", ret);
  861 + return ret;
  862 + }
  863 + srs_info("send PCUC(StreanBegin) message success.");
  864 + }
  865 + }
  866 +
  867 + return ret;
  868 +}
  869 +
  870 +int SrsRtmp::start_fmle_publish(int stream_id)
  871 +{
  872 + int ret = ERROR_SUCCESS;
  873 +
  874 + // FCPublish
  875 + double fc_publish_tid = 0;
  876 + if (true) {
  877 + SrsCommonMessage* msg = NULL;
  878 + SrsFMLEStartPacket* pkt = NULL;
  879 + if ((ret = srs_rtmp_expect_message<SrsFMLEStartPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  880 + srs_error("recv FCPublish message failed. ret=%d", ret);
  881 + return ret;
  882 + }
  883 + srs_info("recv FCPublish request message success.");
  884 +
  885 + SrsAutoFree(SrsCommonMessage, msg, false);
  886 + fc_publish_tid = pkt->transaction_id;
  887 + }
  888 + // FCPublish response
  889 + if (true) {
  890 + SrsCommonMessage* msg = new SrsCommonMessage();
  891 + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);
  892 +
  893 + msg->set_packet(pkt, 0);
  894 +
  895 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  896 + srs_error("send FCPublish response message failed. ret=%d", ret);
  897 + return ret;
  898 + }
  899 + srs_info("send FCPublish response message success.");
  900 + }
  901 +
  902 + // createStream
  903 + double create_stream_tid = 0;
  904 + if (true) {
  905 + SrsCommonMessage* msg = NULL;
  906 + SrsCreateStreamPacket* pkt = NULL;
  907 + if ((ret = srs_rtmp_expect_message<SrsCreateStreamPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  908 + srs_error("recv createStream message failed. ret=%d", ret);
  909 + return ret;
  910 + }
  911 + srs_info("recv createStream request message success.");
  912 +
  913 + SrsAutoFree(SrsCommonMessage, msg, false);
  914 + create_stream_tid = pkt->transaction_id;
  915 + }
  916 + // createStream response
  917 + if (true) {
  918 + SrsCommonMessage* msg = new SrsCommonMessage();
  919 + SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id);
  920 +
  921 + msg->set_packet(pkt, 0);
  922 +
  923 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  924 + srs_error("send createStream response message failed. ret=%d", ret);
  925 + return ret;
  926 + }
  927 + srs_info("send createStream response message success.");
  928 + }
  929 +
  930 + // publish
  931 + if (true) {
  932 + SrsCommonMessage* msg = NULL;
  933 + SrsPublishPacket* pkt = NULL;
  934 + if ((ret = srs_rtmp_expect_message<SrsPublishPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
  935 + srs_error("recv publish message failed. ret=%d", ret);
  936 + return ret;
  937 + }
  938 + srs_info("recv publish request message success.");
  939 +
  940 + SrsAutoFree(SrsCommonMessage, msg, false);
  941 + }
  942 + // publish response onFCPublish(NetStream.Publish.Start)
  943 + if (true) {
  944 + SrsCommonMessage* msg = new SrsCommonMessage();
  945 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  946 +
  947 + pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;
  948 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));
  949 + pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));
  950 +
  951 + msg->set_packet(pkt, stream_id);
  952 +
  953 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  954 + srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", ret);
  955 + return ret;
  956 + }
  957 + srs_info("send onFCPublish(NetStream.Publish.Start) message success.");
  958 + }
  959 + // publish response onStatus(NetStream.Publish.Start)
  960 + if (true) {
  961 + SrsCommonMessage* msg = new SrsCommonMessage();
  962 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  963 +
  964 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  965 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));
  966 + pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));
  967 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  968 +
  969 + msg->set_packet(pkt, stream_id);
  970 +
  971 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  972 + srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret);
  973 + return ret;
  974 + }
  975 + srs_info("send onStatus(NetStream.Publish.Start) message success.");
  976 + }
  977 +
  978 + srs_info("FMLE publish success.");
  979 +
  980 + return ret;
  981 +}
  982 +
  983 +int SrsRtmp::fmle_unpublish(int stream_id, double unpublish_tid)
  984 +{
  985 + int ret = ERROR_SUCCESS;
  986 +
  987 + // publish response onFCUnpublish(NetStream.unpublish.Success)
  988 + if (true) {
  989 + SrsCommonMessage* msg = new SrsCommonMessage();
  990 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  991 +
  992 + pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH;
  993 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));
  994 + pkt->data->set(StatusDescription, new SrsAmf0String("Stop publishing stream."));
  995 +
  996 + msg->set_packet(pkt, stream_id);
  997 +
  998 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  999 + srs_error("send onFCUnpublish(NetStream.unpublish.Success) message failed. ret=%d", ret);
  1000 + return ret;
  1001 + }
  1002 + srs_info("send onFCUnpublish(NetStream.unpublish.Success) message success.");
  1003 + }
  1004 + // FCUnpublish response
  1005 + if (true) {
  1006 + SrsCommonMessage* msg = new SrsCommonMessage();
  1007 + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(unpublish_tid);
  1008 +
  1009 + msg->set_packet(pkt, stream_id);
  1010 +
  1011 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  1012 + srs_error("send FCUnpublish response message failed. ret=%d", ret);
  1013 + return ret;
  1014 + }
  1015 + srs_info("send FCUnpublish response message success.");
  1016 + }
  1017 + // publish response onStatus(NetStream.Unpublish.Success)
  1018 + if (true) {
  1019 + SrsCommonMessage* msg = new SrsCommonMessage();
  1020 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  1021 +
  1022 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  1023 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeUnpublishSuccess));
  1024 + pkt->data->set(StatusDescription, new SrsAmf0String("Stream is now unpublished"));
  1025 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  1026 +
  1027 + msg->set_packet(pkt, stream_id);
  1028 +
  1029 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  1030 + srs_error("send onStatus(NetStream.Unpublish.Success) message failed. ret=%d", ret);
  1031 + return ret;
  1032 + }
  1033 + srs_info("send onStatus(NetStream.Unpublish.Success) message success.");
  1034 + }
  1035 +
  1036 + srs_info("FMLE unpublish success.");
  1037 +
  1038 + return ret;
  1039 +}
  1040 +
  1041 +int SrsRtmp::start_flash_publish(int stream_id)
  1042 +{
  1043 + int ret = ERROR_SUCCESS;
  1044 +
  1045 + // publish response onStatus(NetStream.Publish.Start)
  1046 + if (true) {
  1047 + SrsCommonMessage* msg = new SrsCommonMessage();
  1048 + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
  1049 +
  1050 + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
  1051 + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));
  1052 + pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));
  1053 + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
  1054 +
  1055 + msg->set_packet(pkt, stream_id);
  1056 +
  1057 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  1058 + srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret);
  1059 + return ret;
  1060 + }
  1061 + srs_info("send onStatus(NetStream.Publish.Start) message success.");
  1062 + }
  1063 +
  1064 + srs_info("flash publish success.");
  1065 +
  1066 + return ret;
  1067 +}
  1068 +
  1069 +int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, string& stream_name)
  1070 +{
  1071 + int ret = ERROR_SUCCESS;
  1072 +
  1073 + if (true) {
  1074 + SrsCommonMessage* msg = new SrsCommonMessage();
  1075 + SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id);
  1076 +
  1077 + msg->set_packet(pkt, 0);
  1078 +
  1079 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  1080 + srs_error("send createStream response message failed. ret=%d", ret);
  1081 + return ret;
  1082 + }
  1083 + srs_info("send createStream response message success.");
  1084 + }
  1085 +
  1086 + while (true) {
  1087 + SrsCommonMessage* msg = NULL;
  1088 + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
  1089 + srs_error("recv identify client message failed. ret=%d", ret);
  1090 + return ret;
  1091 + }
  1092 +
  1093 + SrsAutoFree(SrsCommonMessage, msg, false);
  1094 +
  1095 + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
  1096 + srs_trace("identify ignore messages except "
  1097 + "AMF0/AMF3 command message. type=%#x", msg->header.message_type);
  1098 + continue;
  1099 + }
  1100 +
  1101 + if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {
  1102 + srs_error("identify decode message failed. ret=%d", ret);
  1103 + return ret;
  1104 + }
  1105 +
  1106 + SrsPacket* pkt = msg->get_packet();
  1107 + if (dynamic_cast<SrsPlayPacket*>(pkt)) {
  1108 + SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt);
  1109 + type = SrsClientPlay;
  1110 + stream_name = play->stream_name;
  1111 + srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());
  1112 + return ret;
  1113 + }
  1114 + if (dynamic_cast<SrsPublishPacket*>(pkt)) {
  1115 + srs_info("identify client by publish, falsh publish.");
  1116 + return identify_flash_publish_client(
  1117 + dynamic_cast<SrsPublishPacket*>(pkt), type, stream_name);
  1118 + }
  1119 +
  1120 + srs_trace("ignore AMF0/AMF3 command message.");
  1121 + }
  1122 +
  1123 + return ret;
  1124 +}
  1125 +
  1126 +int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, string& stream_name)
  1127 +{
  1128 + int ret = ERROR_SUCCESS;
  1129 +
  1130 + type = SrsClientFMLEPublish;
  1131 + stream_name = req->stream_name;
  1132 +
  1133 + // releaseStream response
  1134 + if (true) {
  1135 + SrsCommonMessage* msg = new SrsCommonMessage();
  1136 + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
  1137 +
  1138 + msg->set_packet(pkt, 0);
  1139 +
  1140 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  1141 + srs_error("send releaseStream response message failed. ret=%d", ret);
  1142 + return ret;
  1143 + }
  1144 + srs_info("send releaseStream response message success.");
  1145 + }
  1146 +
  1147 + return ret;
  1148 +}
  1149 +
  1150 +int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, string& stream_name)
  1151 +{
  1152 + int ret = ERROR_SUCCESS;
  1153 +
  1154 + type = SrsClientFlashPublish;
  1155 + stream_name = req->stream_name;
  1156 +
  1157 + return ret;
  1158 +}
  1159 +