winlin

support createStream and play, identity the play client

@@ -84,6 +84,11 @@ bool SrsAmf0Any::is_number() @@ -84,6 +84,11 @@ bool SrsAmf0Any::is_number()
84 return marker == RTMP_AMF0_Number; 84 return marker == RTMP_AMF0_Number;
85 } 85 }
86 86
  87 +bool SrsAmf0Any::is_null()
  88 +{
  89 + return marker == RTMP_AMF0_Null;
  90 +}
  91 +
87 bool SrsAmf0Any::is_object() 92 bool SrsAmf0Any::is_object()
88 { 93 {
89 return marker == RTMP_AMF0_Object; 94 return marker == RTMP_AMF0_Object;
@@ -131,6 +136,15 @@ SrsAmf0Number::~SrsAmf0Number() @@ -131,6 +136,15 @@ SrsAmf0Number::~SrsAmf0Number()
131 { 136 {
132 } 137 }
133 138
  139 +SrsAmf0Null::SrsAmf0Null()
  140 +{
  141 + marker = RTMP_AMF0_Null;
  142 +}
  143 +
  144 +SrsAmf0Null::~SrsAmf0Null()
  145 +{
  146 +}
  147 +
134 SrsAmf0ObjectEOF::SrsAmf0ObjectEOF() 148 SrsAmf0ObjectEOF::SrsAmf0ObjectEOF()
135 { 149 {
136 marker = RTMP_AMF0_ObjectEnd; 150 marker = RTMP_AMF0_ObjectEnd;
@@ -470,6 +484,45 @@ int srs_amf0_write_number(SrsStream* stream, double value) @@ -470,6 +484,45 @@ int srs_amf0_write_number(SrsStream* stream, double value)
470 return ret; 484 return ret;
471 } 485 }
472 486
  487 +int srs_amf0_read_null(SrsStream* stream)
  488 +{
  489 + int ret = ERROR_SUCCESS;
  490 +
  491 + // marker
  492 + if (!stream->require(1)) {
  493 + ret = ERROR_RTMP_AMF0_DECODE;
  494 + srs_error("amf0 read null marker failed. ret=%d", ret);
  495 + return ret;
  496 + }
  497 +
  498 + char marker = stream->read_1bytes();
  499 + if (marker != RTMP_AMF0_Null) {
  500 + ret = ERROR_RTMP_AMF0_DECODE;
  501 + srs_error("amf0 check null marker failed. "
  502 + "marker=%#x, required=%#x, ret=%d", marker, RTMP_AMF0_Null, ret);
  503 + return ret;
  504 + }
  505 + srs_verbose("amf0 read null success");
  506 +
  507 + return ret;
  508 +}
  509 +int srs_amf0_write_null(SrsStream* stream)
  510 +{
  511 + int ret = ERROR_SUCCESS;
  512 +
  513 + // marker
  514 + if (!stream->require(1)) {
  515 + ret = ERROR_RTMP_AMF0_ENCODE;
  516 + srs_error("amf0 write null marker failed. ret=%d", ret);
  517 + return ret;
  518 + }
  519 +
  520 + stream->write_1bytes(RTMP_AMF0_Null);
  521 + srs_verbose("amf0 write null marker success");
  522 +
  523 + return ret;
  524 +}
  525 +
473 int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) 526 int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value)
474 { 527 {
475 int ret = ERROR_SUCCESS; 528 int ret = ERROR_SUCCESS;
@@ -515,6 +568,10 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) @@ -515,6 +568,10 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value)
515 srs_amf0_convert<SrsAmf0Number>(value)->value = data; 568 srs_amf0_convert<SrsAmf0Number>(value)->value = data;
516 return ret; 569 return ret;
517 } 570 }
  571 + case RTMP_AMF0_Null: {
  572 + value = new SrsAmf0Null();
  573 + return ret;
  574 + }
518 case RTMP_AMF0_ObjectEnd: { 575 case RTMP_AMF0_ObjectEnd: {
519 SrsAmf0ObjectEOF* p = NULL; 576 SrsAmf0ObjectEOF* p = NULL;
520 if ((ret = srs_amf0_read_object_eof(stream, p)) != ERROR_SUCCESS) { 577 if ((ret = srs_amf0_read_object_eof(stream, p)) != ERROR_SUCCESS) {
@@ -568,6 +625,9 @@ int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value) @@ -568,6 +625,9 @@ int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value)
568 double data = srs_amf0_convert<SrsAmf0Number>(value)->value; 625 double data = srs_amf0_convert<SrsAmf0Number>(value)->value;
569 return srs_amf0_write_number(stream, data); 626 return srs_amf0_write_number(stream, data);
570 } 627 }
  628 + case RTMP_AMF0_Null: {
  629 + return srs_amf0_write_null(stream);
  630 + }
571 case RTMP_AMF0_ObjectEnd: { 631 case RTMP_AMF0_ObjectEnd: {
572 SrsAmf0ObjectEOF* p = srs_amf0_convert<SrsAmf0ObjectEOF>(value); 632 SrsAmf0ObjectEOF* p = srs_amf0_convert<SrsAmf0ObjectEOF>(value);
573 return srs_amf0_write_object_eof(stream, p); 633 return srs_amf0_write_object_eof(stream, p);
@@ -590,7 +650,6 @@ int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value) @@ -590,7 +650,6 @@ int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value)
590 650
591 return ret; 651 return ret;
592 } 652 }
593 -  
594 int srs_amf0_get_any_size(SrsAmf0Any* value) 653 int srs_amf0_get_any_size(SrsAmf0Any* value)
595 { 654 {
596 if (!value) { 655 if (!value) {
@@ -613,6 +672,10 @@ int srs_amf0_get_any_size(SrsAmf0Any* value) @@ -613,6 +672,10 @@ int srs_amf0_get_any_size(SrsAmf0Any* value)
613 size += srs_amf0_get_number_size(); 672 size += srs_amf0_get_number_size();
614 break; 673 break;
615 } 674 }
  675 + case RTMP_AMF0_Null: {
  676 + size += srs_amf0_get_null_size();
  677 + break;
  678 + }
616 case RTMP_AMF0_ObjectEnd: { 679 case RTMP_AMF0_ObjectEnd: {
617 size += srs_amf0_get_object_eof_size(); 680 size += srs_amf0_get_object_eof_size();
618 break; 681 break;
@@ -941,6 +1004,11 @@ int srs_amf0_get_number_size() @@ -941,6 +1004,11 @@ int srs_amf0_get_number_size()
941 return 1 + 8; 1004 return 1 + 8;
942 } 1005 }
943 1006
  1007 +int srs_amf0_get_null_size()
  1008 +{
  1009 + return 1;
  1010 +}
  1011 +
944 int srs_amf0_get_boolean_size() 1012 int srs_amf0_get_boolean_size()
945 { 1013 {
946 return 1 + 1; 1014 return 1 + 1;
@@ -54,6 +54,7 @@ struct SrsAmf0Any @@ -54,6 +54,7 @@ struct SrsAmf0Any
54 virtual bool is_string(); 54 virtual bool is_string();
55 virtual bool is_boolean(); 55 virtual bool is_boolean();
56 virtual bool is_number(); 56 virtual bool is_number();
  57 + virtual bool is_null();
57 virtual bool is_object(); 58 virtual bool is_object();
58 virtual bool is_object_eof(); 59 virtual bool is_object_eof();
59 virtual bool is_ecma_array(); 60 virtual bool is_ecma_array();
@@ -103,6 +104,17 @@ struct SrsAmf0Number : public SrsAmf0Any @@ -103,6 +104,17 @@ struct SrsAmf0Number : public SrsAmf0Any
103 }; 104 };
104 105
105 /** 106 /**
  107 +* read amf0 null from stream.
  108 +* 2.7 null Type
  109 +* null-type = null-marker
  110 +*/
  111 +struct SrsAmf0Null : public SrsAmf0Any
  112 +{
  113 + SrsAmf0Null();
  114 + virtual ~SrsAmf0Null();
  115 +};
  116 +
  117 +/**
106 * 2.11 Object End Type 118 * 2.11 Object End Type
107 * object-end-type = UTF-8-empty object-end-marker 119 * object-end-type = UTF-8-empty object-end-marker
108 * 0x00 0x00 0x09 120 * 0x00 0x00 0x09
@@ -188,6 +200,14 @@ extern int srs_amf0_read_number(SrsStream* stream, double& value); @@ -188,6 +200,14 @@ extern int srs_amf0_read_number(SrsStream* stream, double& value);
188 extern int srs_amf0_write_number(SrsStream* stream, double value); 200 extern int srs_amf0_write_number(SrsStream* stream, double value);
189 201
190 /** 202 /**
  203 +* read amf0 null from stream.
  204 +* 2.7 null Type
  205 +* null-type = null-marker
  206 +*/
  207 +extern int srs_amf0_read_null(SrsStream* stream);
  208 +extern int srs_amf0_write_null(SrsStream* stream);
  209 +
  210 +/**
191 * read amf0 object from stream. 211 * read amf0 object from stream.
192 * 2.5 Object Type 212 * 2.5 Object Type
193 * anonymous-object-type = object-marker *(object-property) 213 * anonymous-object-type = object-marker *(object-property)
@@ -212,6 +232,7 @@ extern int srs_amf0_write_ecma_array(SrsStream* stream, SrsASrsAmf0EcmaArray* va @@ -212,6 +232,7 @@ extern int srs_amf0_write_ecma_array(SrsStream* stream, SrsASrsAmf0EcmaArray* va
212 extern int srs_amf0_get_utf8_size(std::string value); 232 extern int srs_amf0_get_utf8_size(std::string value);
213 extern int srs_amf0_get_string_size(std::string value); 233 extern int srs_amf0_get_string_size(std::string value);
214 extern int srs_amf0_get_number_size(); 234 extern int srs_amf0_get_number_size();
  235 +extern int srs_amf0_get_null_size();
215 extern int srs_amf0_get_boolean_size(); 236 extern int srs_amf0_get_boolean_size();
216 extern int srs_amf0_get_object_size(SrsAmf0Object* obj); 237 extern int srs_amf0_get_object_size(SrsAmf0Object* obj);
217 extern int srs_amf0_get_ecma_array_size(SrsASrsAmf0EcmaArray* arr); 238 extern int srs_amf0_get_ecma_array_size(SrsASrsAmf0EcmaArray* arr);
@@ -29,6 +29,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -29,6 +29,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 #include <srs_core_log.hpp> 29 #include <srs_core_log.hpp>
30 #include <srs_core_rtmp.hpp> 30 #include <srs_core_rtmp.hpp>
31 31
  32 +// default stream id for response the createStream request.
  33 +#define SRS_DEFAULT_SID 1
  34 +
32 SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) 35 SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
33 : SrsConnection(srs_server, client_stfd) 36 : SrsConnection(srs_server, client_stfd)
34 { 37 {
@@ -98,6 +101,20 @@ int SrsClient::do_cycle() @@ -98,6 +101,20 @@ int SrsClient::do_cycle()
98 return ret; 101 return ret;
99 } 102 }
100 srs_verbose("response connect app success"); 103 srs_verbose("response connect app success");
  104 +
  105 + if ((ret = rtmp->on_bw_done()) != ERROR_SUCCESS) {
  106 + srs_error("on_bw_done failed. ret=%d", ret);
  107 + return ret;
  108 + }
  109 + srs_verbose("on_bw_done success");
  110 +
  111 + SrsClientType type;
  112 + std::string stream_name;
  113 + if ((ret = rtmp->identify_client(SRS_DEFAULT_SID, type, stream_name)) != ERROR_SUCCESS) {
  114 + srs_error("identify client failed. ret=%d", ret);
  115 + return ret;
  116 + }
  117 + srs_verbose("identify client success. type=%d", type);
101 118
102 return ret; 119 return ret;
103 } 120 }
@@ -202,9 +202,13 @@ messages. @@ -202,9 +202,13 @@ messages.
202 ***************************************************************************** 202 *****************************************************************************
203 ****************************************************************************/ 203 ****************************************************************************/
204 /** 204 /**
205 -* amf0 command message, command name: "connect" 205 +* amf0 command message, command name macros
206 */ 206 */
207 #define RTMP_AMF0_COMMAND_CONNECT "connect" 207 #define RTMP_AMF0_COMMAND_CONNECT "connect"
  208 +#define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream"
  209 +#define RTMP_AMF0_COMMAND_PLAY "play"
  210 +#define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone"
  211 +#define RTMP_AMF0_COMMAND_RESULT "_result"
208 212
209 /**************************************************************************** 213 /****************************************************************************
210 ***************************************************************************** 214 *****************************************************************************
@@ -585,11 +589,12 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -585,11 +589,12 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
585 int ret = ERROR_SUCCESS; 589 int ret = ERROR_SUCCESS;
586 590
587 // when not exists cached msg, means get an new message, 591 // when not exists cached msg, means get an new message,
588 - // the fmt must be type0 which means new message.  
589 - if (!chunk->msg && fmt != RTMP_FMT_TYPE0) { 592 + // the fmt must be type0/type1 which means new message.
  593 + if (!chunk->msg && fmt != RTMP_FMT_TYPE0 && fmt != RTMP_FMT_TYPE1) {
590 ret = ERROR_RTMP_CHUNK_START; 594 ret = ERROR_RTMP_CHUNK_START;
591 srs_error("chunk stream start, " 595 srs_error("chunk stream start, "
592 - "fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret); 596 + "fmt must be %d or %d, actual is %d. ret=%d",
  597 + RTMP_FMT_TYPE0, RTMP_FMT_TYPE1, fmt, ret);
593 return ret; 598 return ret;
594 } 599 }
595 600
@@ -604,7 +609,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz @@ -604,7 +609,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
604 609
605 // create msg when new chunk stream start 610 // create msg when new chunk stream start
606 if (!chunk->msg) { 611 if (!chunk->msg) {
607 - srs_assert(fmt == RTMP_FMT_TYPE0); 612 + srs_assert(fmt == RTMP_FMT_TYPE0 || fmt == RTMP_FMT_TYPE1);
608 chunk->msg = new SrsMessage(); 613 chunk->msg = new SrsMessage();
609 srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid); 614 srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid);
610 } 615 }
@@ -802,6 +807,21 @@ SrsMessageHeader::~SrsMessageHeader() @@ -802,6 +807,21 @@ SrsMessageHeader::~SrsMessageHeader()
802 { 807 {
803 } 808 }
804 809
  810 +bool SrsMessageHeader::is_amf0_command()
  811 +{
  812 + return message_type == RTMP_MSG_AMF0CommandMessage;
  813 +}
  814 +
  815 +bool SrsMessageHeader::is_amf3_command()
  816 +{
  817 + return message_type == RTMP_MSG_AMF3CommandMessage;
  818 +}
  819 +
  820 +bool SrsMessageHeader::is_window_ackledgement_size()
  821 +{
  822 + return message_type == RTMP_MSG_WindowAcknowledgementSize;
  823 +}
  824 +
805 SrsChunkStream::SrsChunkStream(int _cid) 825 SrsChunkStream::SrsChunkStream(int _cid)
806 { 826 {
807 fmt = 0; 827 fmt = 0;
@@ -870,30 +890,50 @@ int SrsMessage::decode_packet() @@ -870,30 +890,50 @@ int SrsMessage::decode_packet()
870 srs_verbose("decode stream initialized success"); 890 srs_verbose("decode stream initialized success");
871 891
872 // decode specified packet type 892 // decode specified packet type
873 - if (header.message_type == RTMP_MSG_AMF0CommandMessage) {  
874 - srs_verbose("start to decode AMF0 command message."); 893 + if (header.is_amf0_command() || header.is_amf3_command()) {
  894 + srs_verbose("start to decode AMF0/AMF3 command message.");
  895 +
  896 + // skip 1bytes to decode the amf3 command.
  897 + if (header.is_amf3_command() && stream->require(1)) {
  898 + srs_verbose("skip 1bytes to decode AMF3 command");
  899 + stream->skip(1);
  900 + }
875 901
876 // amf0 command message. 902 // amf0 command message.
877 // need to read the command name. 903 // need to read the command name.
878 std::string command; 904 std::string command;
879 if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) { 905 if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {
880 - srs_error("decode AMF0 command name failed. ret=%d", ret); 906 + srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret);
881 return ret; 907 return ret;
882 } 908 }
883 - srs_verbose("AMF0 command message, command_name=%s", command.c_str()); 909 + srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str());
884 910
  911 + // reset to zero(amf3 to 1) to restart decode.
885 stream->reset(); 912 stream->reset();
  913 + if (header.is_amf3_command()) {
  914 + stream->skip(1);
  915 + }
  916 +
  917 + // decode command object.
886 if (command == RTMP_AMF0_COMMAND_CONNECT) { 918 if (command == RTMP_AMF0_COMMAND_CONNECT) {
887 - srs_info("decode the AMF0 command(connect vhost/app message)."); 919 + srs_info("decode the AMF0/AMF3 command(connect vhost/app message).");
888 packet = new SrsConnectAppPacket(); 920 packet = new SrsConnectAppPacket();
889 return packet->decode(stream); 921 return packet->decode(stream);
  922 + } else if(command == RTMP_AMF0_COMMAND_CREATE_STREAM) {
  923 + srs_info("decode the AMF0/AMF3 command(createStream message).");
  924 + packet = new SrsCreateStreamPacket();
  925 + return packet->decode(stream);
  926 + } else if(command == RTMP_AMF0_COMMAND_PLAY) {
  927 + srs_info("decode the AMF0/AMF3 command(paly message).");
  928 + packet = new SrsPlayPacket();
  929 + return packet->decode(stream);
890 } 930 }
891 931
892 // default packet to drop message. 932 // default packet to drop message.
893 - srs_trace("drop the AMF0 command message, command_name=%s", command.c_str()); 933 + srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
894 packet = new SrsPacket(); 934 packet = new SrsPacket();
895 return ret; 935 return ret;
896 - } else if(header.message_type == RTMP_MSG_WindowAcknowledgementSize) { 936 + } else if(header.is_window_ackledgement_size()) {
897 srs_verbose("start to decode set ack window size message."); 937 srs_verbose("start to decode set ack window size message.");
898 packet = new SrsSetWindowAckSizePacket(); 938 packet = new SrsSetWindowAckSizePacket();
899 return packet->decode(stream); 939 return packet->decode(stream);
@@ -1106,7 +1146,7 @@ int SrsConnectAppPacket::decode(SrsStream* stream) @@ -1106,7 +1146,7 @@ int SrsConnectAppPacket::decode(SrsStream* stream)
1106 1146
1107 SrsConnectAppResPacket::SrsConnectAppResPacket() 1147 SrsConnectAppResPacket::SrsConnectAppResPacket()
1108 { 1148 {
1109 - command_name = RTMP_AMF0_COMMAND_CONNECT; 1149 + command_name = RTMP_AMF0_COMMAND_RESULT;
1110 transaction_id = 1; 1150 transaction_id = 1;
1111 props = new SrsAmf0Object(); 1151 props = new SrsAmf0Object();
1112 info = new SrsAmf0Object(); 1152 info = new SrsAmf0Object();
@@ -1175,6 +1215,313 @@ int SrsConnectAppResPacket::encode_packet(SrsStream* stream) @@ -1175,6 +1215,313 @@ int SrsConnectAppResPacket::encode_packet(SrsStream* stream)
1175 return ret; 1215 return ret;
1176 } 1216 }
1177 1217
  1218 +SrsCreateStreamPacket::SrsCreateStreamPacket()
  1219 +{
  1220 + command_name = RTMP_AMF0_COMMAND_CREATE_STREAM;
  1221 + transaction_id = 2;
  1222 + command_object = new SrsAmf0Null();
  1223 +}
  1224 +
  1225 +SrsCreateStreamPacket::~SrsCreateStreamPacket()
  1226 +{
  1227 + if (command_object) {
  1228 + delete command_object;
  1229 + command_object = NULL;
  1230 + }
  1231 +}
  1232 +
  1233 +int SrsCreateStreamPacket::decode(SrsStream* stream)
  1234 +{
  1235 + int ret = ERROR_SUCCESS;
  1236 +
  1237 + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
  1238 + srs_error("amf0 decode createStream command_name failed. ret=%d", ret);
  1239 + return ret;
  1240 + }
  1241 + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_CREATE_STREAM) {
  1242 + ret = ERROR_RTMP_AMF0_DECODE;
  1243 + srs_error("amf0 decode createStream command_name failed. "
  1244 + "command_name=%s, ret=%d", command_name.c_str(), ret);
  1245 + return ret;
  1246 + }
  1247 +
  1248 + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
  1249 + srs_error("amf0 decode createStream transaction_id failed. ret=%d", ret);
  1250 + return ret;
  1251 + }
  1252 +
  1253 + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
  1254 + srs_error("amf0 decode createStream command_object failed. ret=%d", ret);
  1255 + return ret;
  1256 + }
  1257 +
  1258 + srs_info("amf0 decode createStream packet success");
  1259 +
  1260 + return ret;
  1261 +}
  1262 +
  1263 +SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, double _stream_id)
  1264 +{
  1265 + command_name = RTMP_AMF0_COMMAND_RESULT;
  1266 + transaction_id = _transaction_id;
  1267 + command_object = new SrsAmf0Null();
  1268 + stream_id = _stream_id;
  1269 +}
  1270 +
  1271 +SrsCreateStreamResPacket::~SrsCreateStreamResPacket()
  1272 +{
  1273 + if (command_object) {
  1274 + delete command_object;
  1275 + command_object = NULL;
  1276 + }
  1277 +}
  1278 +
  1279 +int SrsCreateStreamResPacket::get_perfer_cid()
  1280 +{
  1281 + return RTMP_CID_OverConnection;
  1282 +}
  1283 +
  1284 +int SrsCreateStreamResPacket::get_message_type()
  1285 +{
  1286 + return RTMP_MSG_AMF0CommandMessage;
  1287 +}
  1288 +
  1289 +int SrsCreateStreamResPacket::get_size()
  1290 +{
  1291 + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
  1292 + + srs_amf0_get_null_size() + srs_amf0_get_number_size();
  1293 +}
  1294 +
  1295 +int SrsCreateStreamResPacket::encode_packet(SrsStream* stream)
  1296 +{
  1297 + int ret = ERROR_SUCCESS;
  1298 +
  1299 + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
  1300 + srs_error("encode command_name failed. ret=%d", ret);
  1301 + return ret;
  1302 + }
  1303 + srs_verbose("encode command_name success.");
  1304 +
  1305 + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
  1306 + srs_error("encode transaction_id failed. ret=%d", ret);
  1307 + return ret;
  1308 + }
  1309 + srs_verbose("encode transaction_id success.");
  1310 +
  1311 + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
  1312 + srs_error("encode command_object failed. ret=%d", ret);
  1313 + return ret;
  1314 + }
  1315 + srs_verbose("encode command_object success.");
  1316 +
  1317 + if ((ret = srs_amf0_write_number(stream, stream_id)) != ERROR_SUCCESS) {
  1318 + srs_error("encode stream_id failed. ret=%d", ret);
  1319 + return ret;
  1320 + }
  1321 + srs_verbose("encode stream_id success.");
  1322 +
  1323 +
  1324 + srs_info("encode createStream response packet success.");
  1325 +
  1326 + return ret;
  1327 +}
  1328 +
  1329 +SrsPlayPacket::SrsPlayPacket()
  1330 +{
  1331 + command_name = RTMP_AMF0_COMMAND_PLAY;
  1332 + transaction_id = 0;
  1333 + command_object = new SrsAmf0Null();
  1334 +
  1335 + start = -2;
  1336 + duration = -1;
  1337 + reset = true;
  1338 +}
  1339 +
  1340 +SrsPlayPacket::~SrsPlayPacket()
  1341 +{
  1342 + if (command_object) {
  1343 + delete command_object;
  1344 + command_object = NULL;
  1345 + }
  1346 +}
  1347 +
  1348 +int SrsPlayPacket::decode(SrsStream* stream)
  1349 +{
  1350 + int ret = ERROR_SUCCESS;
  1351 +
  1352 + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
  1353 + srs_error("amf0 decode play command_name failed. ret=%d", ret);
  1354 + return ret;
  1355 + }
  1356 + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_PLAY) {
  1357 + ret = ERROR_RTMP_AMF0_DECODE;
  1358 + srs_error("amf0 decode play command_name failed. "
  1359 + "command_name=%s, ret=%d", command_name.c_str(), ret);
  1360 + return ret;
  1361 + }
  1362 +
  1363 + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
  1364 + srs_error("amf0 decode play transaction_id failed. ret=%d", ret);
  1365 + return ret;
  1366 + }
  1367 +
  1368 + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
  1369 + srs_error("amf0 decode play command_object failed. ret=%d", ret);
  1370 + return ret;
  1371 + }
  1372 +
  1373 + if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
  1374 + srs_error("amf0 decode play stream_name failed. ret=%d", ret);
  1375 + return ret;
  1376 + }
  1377 +
  1378 + if (!stream->empty() && (ret = srs_amf0_read_number(stream, start)) != ERROR_SUCCESS) {
  1379 + srs_error("amf0 decode play start failed. ret=%d", ret);
  1380 + return ret;
  1381 + }
  1382 + if (!stream->empty() && (ret = srs_amf0_read_number(stream, duration)) != ERROR_SUCCESS) {
  1383 + srs_error("amf0 decode play duration failed. ret=%d", ret);
  1384 + return ret;
  1385 + }
  1386 + if (!stream->empty() && (ret = srs_amf0_read_boolean(stream, reset)) != ERROR_SUCCESS) {
  1387 + srs_error("amf0 decode play reset failed. ret=%d", ret);
  1388 + return ret;
  1389 + }
  1390 +
  1391 + srs_info("amf0 decode play packet success");
  1392 +
  1393 + return ret;
  1394 +}
  1395 +
  1396 +SrsPlayResPacket::SrsPlayResPacket()
  1397 +{
  1398 + command_name = RTMP_AMF0_COMMAND_RESULT;
  1399 + transaction_id = 0;
  1400 + command_object = new SrsAmf0Null();
  1401 + desc = new SrsAmf0Object();
  1402 +}
  1403 +
  1404 +SrsPlayResPacket::~SrsPlayResPacket()
  1405 +{
  1406 + if (command_object) {
  1407 + delete command_object;
  1408 + command_object = NULL;
  1409 + }
  1410 +
  1411 + if (desc) {
  1412 + delete desc;
  1413 + desc = NULL;
  1414 + }
  1415 +}
  1416 +
  1417 +int SrsPlayResPacket::get_perfer_cid()
  1418 +{
  1419 + return RTMP_CID_OverStream;
  1420 +}
  1421 +
  1422 +int SrsPlayResPacket::get_message_type()
  1423 +{
  1424 + return RTMP_MSG_AMF0CommandMessage;
  1425 +}
  1426 +
  1427 +int SrsPlayResPacket::get_size()
  1428 +{
  1429 + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
  1430 + + srs_amf0_get_null_size() + srs_amf0_get_object_size(desc);
  1431 +}
  1432 +
  1433 +int SrsPlayResPacket::encode_packet(SrsStream* stream)
  1434 +{
  1435 + int ret = ERROR_SUCCESS;
  1436 +
  1437 + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
  1438 + srs_error("encode command_name failed. ret=%d", ret);
  1439 + return ret;
  1440 + }
  1441 + srs_verbose("encode command_name success.");
  1442 +
  1443 + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
  1444 + srs_error("encode transaction_id failed. ret=%d", ret);
  1445 + return ret;
  1446 + }
  1447 + srs_verbose("encode transaction_id success.");
  1448 +
  1449 + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
  1450 + srs_error("encode command_object failed. ret=%d", ret);
  1451 + return ret;
  1452 + }
  1453 + srs_verbose("encode command_object success.");
  1454 +
  1455 + if ((ret = srs_amf0_write_object(stream, desc)) != ERROR_SUCCESS) {
  1456 + srs_error("encode desc failed. ret=%d", ret);
  1457 + return ret;
  1458 + }
  1459 + srs_verbose("encode desc success.");
  1460 +
  1461 +
  1462 + srs_info("encode play response packet success.");
  1463 +
  1464 + return ret;
  1465 +}
  1466 +
  1467 +SrsOnBWDonePacket::SrsOnBWDonePacket()
  1468 +{
  1469 + command_name = RTMP_AMF0_COMMAND_ON_BW_DONE;
  1470 + transaction_id = 0;
  1471 + args = new SrsAmf0Null();
  1472 +}
  1473 +
  1474 +SrsOnBWDonePacket::~SrsOnBWDonePacket()
  1475 +{
  1476 + if (args) {
  1477 + delete args;
  1478 + args = NULL;
  1479 + }
  1480 +}
  1481 +
  1482 +int SrsOnBWDonePacket::get_perfer_cid()
  1483 +{
  1484 + return RTMP_CID_OverConnection;
  1485 +}
  1486 +
  1487 +int SrsOnBWDonePacket::get_message_type()
  1488 +{
  1489 + return RTMP_MSG_AMF0CommandMessage;
  1490 +}
  1491 +
  1492 +int SrsOnBWDonePacket::get_size()
  1493 +{
  1494 + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
  1495 + + srs_amf0_get_null_size();
  1496 +}
  1497 +
  1498 +int SrsOnBWDonePacket::encode_packet(SrsStream* stream)
  1499 +{
  1500 + int ret = ERROR_SUCCESS;
  1501 +
  1502 + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
  1503 + srs_error("encode command_name failed. ret=%d", ret);
  1504 + return ret;
  1505 + }
  1506 + srs_verbose("encode command_name success.");
  1507 +
  1508 + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
  1509 + srs_error("encode transaction_id failed. ret=%d", ret);
  1510 + return ret;
  1511 + }
  1512 + srs_verbose("encode transaction_id success.");
  1513 +
  1514 + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
  1515 + srs_error("encode args failed. ret=%d", ret);
  1516 + return ret;
  1517 + }
  1518 + srs_verbose("encode args success.");
  1519 +
  1520 + srs_info("encode onBWDone packet success.");
  1521 +
  1522 + return ret;
  1523 +}
  1524 +
1178 SrsSetWindowAckSizePacket::SrsSetWindowAckSizePacket() 1525 SrsSetWindowAckSizePacket::SrsSetWindowAckSizePacket()
1179 { 1526 {
1180 ackowledgement_window_size = 0; 1527 ackowledgement_window_size = 0;
@@ -45,6 +45,7 @@ class SrsStream; @@ -45,6 +45,7 @@ class SrsStream;
45 class SrsMessage; 45 class SrsMessage;
46 class SrsChunkStream; 46 class SrsChunkStream;
47 class SrsAmf0Object; 47 class SrsAmf0Object;
  48 +class SrsAmf0Null;
48 49
49 // convert class name to string. 50 // convert class name to string.
50 #define CLASS_NAME_STRING(className) #className 51 #define CLASS_NAME_STRING(className) #className
@@ -166,6 +167,10 @@ struct SrsMessageHeader @@ -166,6 +167,10 @@ struct SrsMessageHeader
166 167
167 SrsMessageHeader(); 168 SrsMessageHeader();
168 virtual ~SrsMessageHeader(); 169 virtual ~SrsMessageHeader();
  170 +
  171 + bool is_amf0_command();
  172 + bool is_amf3_command();
  173 + bool is_window_ackledgement_size();
169 }; 174 };
170 175
171 /** 176 /**
@@ -358,6 +363,146 @@ protected: @@ -358,6 +363,146 @@ protected:
358 }; 363 };
359 364
360 /** 365 /**
  366 +* 4.1.3. createStream
  367 +* The client sends this command to the server to create a logical
  368 +* channel for message communication The publishing of audio, video, and
  369 +* metadata is carried out over stream channel created using the
  370 +* createStream command.
  371 +*/
  372 +class SrsCreateStreamPacket : public SrsPacket
  373 +{
  374 +private:
  375 + typedef SrsPacket super;
  376 +protected:
  377 + virtual const char* get_class_name()
  378 + {
  379 + return CLASS_NAME_STRING(SrsCreateStreamPacket);
  380 + }
  381 +public:
  382 + std::string command_name;
  383 + double transaction_id;
  384 + SrsAmf0Null* command_object;
  385 +public:
  386 + SrsCreateStreamPacket();
  387 + virtual ~SrsCreateStreamPacket();
  388 +public:
  389 + virtual int decode(SrsStream* stream);
  390 +};
  391 +/**
  392 +* response for SrsCreateStreamPacket.
  393 +*/
  394 +class SrsCreateStreamResPacket : public SrsPacket
  395 +{
  396 +private:
  397 + typedef SrsPacket super;
  398 +protected:
  399 + virtual const char* get_class_name()
  400 + {
  401 + return CLASS_NAME_STRING(SrsCreateStreamResPacket);
  402 + }
  403 +public:
  404 + std::string command_name;
  405 + double transaction_id;
  406 + SrsAmf0Null* command_object;
  407 + double stream_id;
  408 +public:
  409 + SrsCreateStreamResPacket(double _transaction_id, double _stream_id);
  410 + virtual ~SrsCreateStreamResPacket();
  411 +public:
  412 + virtual int get_perfer_cid();
  413 +public:
  414 + virtual int get_message_type();
  415 +protected:
  416 + virtual int get_size();
  417 + virtual int encode_packet(SrsStream* stream);
  418 +};
  419 +
  420 +/**
  421 +* 4.2.1. play
  422 +* The client sends this command to the server to play a stream.
  423 +*/
  424 +class SrsPlayPacket : public SrsPacket
  425 +{
  426 +private:
  427 + typedef SrsPacket super;
  428 +protected:
  429 + virtual const char* get_class_name()
  430 + {
  431 + return CLASS_NAME_STRING(SrsPlayPacket);
  432 + }
  433 +public:
  434 + std::string command_name;
  435 + double transaction_id;
  436 + SrsAmf0Null* command_object;
  437 + std::string stream_name;
  438 + double start;
  439 + double duration;
  440 + bool reset;
  441 +public:
  442 + SrsPlayPacket();
  443 + virtual ~SrsPlayPacket();
  444 +public:
  445 + virtual int decode(SrsStream* stream);
  446 +};
  447 +/**
  448 +* response for SrsPlayPacket.
  449 +* @remark, user must set the stream_id in header.
  450 +*/
  451 +class SrsPlayResPacket : public SrsPacket
  452 +{
  453 +private:
  454 + typedef SrsPacket super;
  455 +protected:
  456 + virtual const char* get_class_name()
  457 + {
  458 + return CLASS_NAME_STRING(SrsPlayResPacket);
  459 + }
  460 +public:
  461 + std::string command_name;
  462 + double transaction_id;
  463 + SrsAmf0Null* command_object;
  464 + SrsAmf0Object* desc;
  465 +public:
  466 + SrsPlayResPacket();
  467 + virtual ~SrsPlayResPacket();
  468 +public:
  469 + virtual int get_perfer_cid();
  470 +public:
  471 + virtual int get_message_type();
  472 +protected:
  473 + virtual int get_size();
  474 + virtual int encode_packet(SrsStream* stream);
  475 +};
  476 +
  477 +/**
  478 +* when bandwidth test done, notice client.
  479 +*/
  480 +class SrsOnBWDonePacket : public SrsPacket
  481 +{
  482 +private:
  483 + typedef SrsPacket super;
  484 +protected:
  485 + virtual const char* get_class_name()
  486 + {
  487 + return CLASS_NAME_STRING(SrsOnBWDonePacket);
  488 + }
  489 +public:
  490 + std::string command_name;
  491 + double transaction_id;
  492 + SrsAmf0Null* args;
  493 +public:
  494 + SrsOnBWDonePacket();
  495 + virtual ~SrsOnBWDonePacket();
  496 +public:
  497 + virtual int get_perfer_cid();
  498 +public:
  499 + virtual int get_message_type();
  500 +protected:
  501 + virtual int get_size();
  502 + virtual int encode_packet(SrsStream* stream);
  503 +};
  504 +
  505 +/**
361 * 5.5. Window Acknowledgement Size (5) 506 * 5.5. Window Acknowledgement Size (5)
362 * The client or the server sends this message to inform the peer which 507 * The client or the server sends this message to inform the peer which
363 * window size to use when sending acknowledgment. 508 * window size to use when sending acknowledgment.
@@ -221,8 +221,6 @@ int SrsRtmp::response_connect_app() @@ -221,8 +221,6 @@ int SrsRtmp::response_connect_app()
221 SrsMessage* msg = new SrsMessage(); 221 SrsMessage* msg = new SrsMessage();
222 SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket(); 222 SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket();
223 223
224 - pkt->command_name = "_result";  
225 -  
226 pkt->props->properties["fmsVer"] = new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER); 224 pkt->props->properties["fmsVer"] = new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER);
227 pkt->props->properties["capabilities"] = new SrsAmf0Number(123); 225 pkt->props->properties["capabilities"] = new SrsAmf0Number(123);
228 pkt->props->properties["mode"] = new SrsAmf0Number(1); 226 pkt->props->properties["mode"] = new SrsAmf0Number(1);
@@ -250,3 +248,110 @@ int SrsRtmp::response_connect_app() @@ -250,3 +248,110 @@ int SrsRtmp::response_connect_app()
250 return ret; 248 return ret;
251 } 249 }
252 250
  251 +int SrsRtmp::on_bw_done()
  252 +{
  253 + int ret = ERROR_SUCCESS;
  254 +
  255 + SrsMessage* msg = new SrsMessage();
  256 + SrsOnBWDonePacket* pkt = new SrsOnBWDonePacket();
  257 +
  258 + msg->set_packet(pkt);
  259 +
  260 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  261 + srs_error("send onBWDone message failed. ret=%d", ret);
  262 + return ret;
  263 + }
  264 + srs_info("send onBWDone message success.");
  265 +
  266 + return ret;
  267 +}
  268 +
  269 +int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& stream_name)
  270 +{
  271 + type = SrsClientUnknown;
  272 + int ret = ERROR_SUCCESS;
  273 +
  274 + while (true) {
  275 + SrsMessage* msg = NULL;
  276 + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
  277 + srs_error("recv identify client message failed. ret=%d", ret);
  278 + return ret;
  279 + }
  280 +
  281 + SrsAutoFree(SrsMessage, msg, false);
  282 +
  283 + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
  284 + srs_trace("identify ignore messages except "
  285 + "AMF0/AMF3 command message. type=%#x", msg->header.message_type);
  286 + continue;
  287 + }
  288 +
  289 + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
  290 + srs_error("identify decode message failed. ret=%d", ret);
  291 + return ret;
  292 + }
  293 +
  294 + SrsPacket* pkt = msg->get_packet();
  295 + if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
  296 + return identify_create_stream_client(
  297 + dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, type, stream_name);
  298 + }
  299 +
  300 + srs_trace("ignore AMF0/AMF3 command message.");
  301 + }
  302 +
  303 + return ret;
  304 +}
  305 +
  306 +int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name)
  307 +{
  308 + int ret = ERROR_SUCCESS;
  309 +
  310 + if (true) {
  311 + SrsMessage* msg = new SrsMessage();
  312 + SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id);
  313 +
  314 + msg->set_packet(pkt);
  315 +
  316 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  317 + srs_error("send createStream response message failed. ret=%d", ret);
  318 + return ret;
  319 + }
  320 + srs_info("send createStream response message success.");
  321 + }
  322 +
  323 + while (true) {
  324 + SrsMessage* msg = NULL;
  325 + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
  326 + srs_error("recv identify client message failed. ret=%d", ret);
  327 + return ret;
  328 + }
  329 +
  330 + SrsAutoFree(SrsMessage, msg, false);
  331 +
  332 + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
  333 + srs_trace("identify ignore messages except "
  334 + "AMF0/AMF3 command message. type=%#x", msg->header.message_type);
  335 + continue;
  336 + }
  337 +
  338 + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
  339 + srs_error("identify decode message failed. ret=%d", ret);
  340 + return ret;
  341 + }
  342 +
  343 + SrsPacket* pkt = msg->get_packet();
  344 + if (dynamic_cast<SrsPlayPacket*>(pkt)) {
  345 + SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt);
  346 + type = SrsClientPublish;
  347 + stream_name = play->stream_name;
  348 + srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());
  349 + return ret;
  350 + }
  351 +
  352 + srs_trace("ignore AMF0/AMF3 command message.");
  353 + }
  354 +
  355 + return ret;
  356 +}
  357 +
@@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
35 #include <st.h> 35 #include <st.h>
36 36
37 class SrsProtocol; 37 class SrsProtocol;
  38 +class SrsCreateStreamPacket;
38 39
39 /** 40 /**
40 * the original request from client. 41 * the original request from client.
@@ -58,6 +59,16 @@ struct SrsRequest @@ -58,6 +59,16 @@ struct SrsRequest
58 }; 59 };
59 60
60 /** 61 /**
  62 +* the rtmp client type.
  63 +*/
  64 +enum SrsClientType
  65 +{
  66 + SrsClientUnknown,
  67 + SrsClientPlay,
  68 + SrsClientPublish,
  69 +};
  70 +
  71 +/**
61 * the rtmp provices rtmp-command-protocol services, 72 * the rtmp provices rtmp-command-protocol services,
62 * a high level protocol, media stream oriented services, 73 * a high level protocol, media stream oriented services,
63 * such as connect to vhost/app, play stream, get audio/video data. 74 * such as connect to vhost/app, play stream, get audio/video data.
@@ -80,6 +91,16 @@ public: @@ -80,6 +91,16 @@ public:
80 */ 91 */
81 virtual int set_peer_bandwidth(int bandwidth, int type); 92 virtual int set_peer_bandwidth(int bandwidth, int type);
82 virtual int response_connect_app(); 93 virtual int response_connect_app();
  94 + virtual int on_bw_done();
  95 + /**
  96 + * recv some message to identify the client.
  97 + * @stream_id, client will createStream to play or publish by flash,
  98 + * the stream_id used to response the createStream request.
  99 + * @type, output the client type.
  100 + */
  101 + virtual int identify_client(int stream_id, SrsClientType& type, std::string& stream_name);
  102 +private:
  103 + virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
83 }; 104 };
84 105
85 #endif 106 #endif