winlin

support decode user control message. response ping automatically

@@ -449,6 +449,47 @@ int SrsProtocol::send_message(ISrsMessage* msg) @@ -449,6 +449,47 @@ int SrsProtocol::send_message(ISrsMessage* msg)
449 return ret; 449 return ret;
450 } 450 }
451 451
  452 +int SrsProtocol::response_acknowledgement_message()
  453 +{
  454 + int ret = ERROR_SUCCESS;
  455 +
  456 + SrsCommonMessage* msg = new SrsCommonMessage();
  457 + SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket();
  458 +
  459 + in_ack_size.acked_size = pkt->sequence_number = skt->get_recv_bytes();
  460 + msg->set_packet(pkt, 0);
  461 +
  462 + if ((ret = send_message(msg)) != ERROR_SUCCESS) {
  463 + srs_error("send acknowledgement failed. ret=%d", ret);
  464 + return ret;
  465 + }
  466 + srs_verbose("send acknowledgement success.");
  467 +
  468 + return ret;
  469 +}
  470 +
  471 +int SrsProtocol::response_ping_message(int32_t timestamp)
  472 +{
  473 + int ret = ERROR_SUCCESS;
  474 +
  475 + srs_trace("get a ping request, response it. timestamp=%d", timestamp);
  476 +
  477 + SrsCommonMessage* msg = new SrsCommonMessage();
  478 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
  479 +
  480 + pkt->event_type = SrcPCUCPingResponse;
  481 + pkt->event_data = timestamp;
  482 + msg->set_packet(pkt, 0);
  483 +
  484 + if ((ret = send_message(msg)) != ERROR_SUCCESS) {
  485 + srs_error("send ping response failed. ret=%d", ret);
  486 + return ret;
  487 + }
  488 + srs_verbose("send ping response success.");
  489 +
  490 + return ret;
  491 +}
  492 +
452 int SrsProtocol::on_recv_message(SrsCommonMessage* msg) 493 int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
453 { 494 {
454 int ret = ERROR_SUCCESS; 495 int ret = ERROR_SUCCESS;
@@ -457,21 +498,14 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) @@ -457,21 +498,14 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
457 498
458 // acknowledgement 499 // acknowledgement
459 if (skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size) { 500 if (skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size) {
460 - SrsCommonMessage* ack = new SrsCommonMessage();  
461 - SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket();  
462 -  
463 - in_ack_size.acked_size = pkt->sequence_number = skt->get_recv_bytes();  
464 - ack->set_packet(pkt, 0);  
465 -  
466 - if ((ret = send_message(ack)) != ERROR_SUCCESS) {  
467 - srs_error("send acknowledgement failed. ret=%d", ret); 501 + if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) {
468 return ret; 502 return ret;
469 } 503 }
470 - srs_verbose("send acknowledgement success.");  
471 } 504 }
472 505
473 switch (msg->header.message_type) { 506 switch (msg->header.message_type) {
474 case RTMP_MSG_SetChunkSize: 507 case RTMP_MSG_SetChunkSize:
  508 + case RTMP_MSG_UserControlMessage:
475 case RTMP_MSG_WindowAcknowledgementSize: 509 case RTMP_MSG_WindowAcknowledgementSize:
476 if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { 510 if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
477 srs_error("decode packet from message payload failed. ret=%d", ret); 511 srs_error("decode packet from message payload failed. ret=%d", ret);
@@ -503,6 +537,20 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) @@ -503,6 +537,20 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
503 srs_trace("set input chunk size to %d", pkt->chunk_size); 537 srs_trace("set input chunk size to %d", pkt->chunk_size);
504 break; 538 break;
505 } 539 }
  540 + case RTMP_MSG_UserControlMessage: {
  541 + SrsUserControlPacket* pkt = dynamic_cast<SrsUserControlPacket*>(msg->get_packet());
  542 + srs_assert(pkt != NULL);
  543 +
  544 + if (pkt->event_type == SrcPCUCSetBufferLength) {
  545 + srs_trace("ignored. set buffer length to %d", pkt->extra_data);
  546 + }
  547 + if (pkt->event_type == SrcPCUCPingRequest) {
  548 + if ((ret = response_ping_message(pkt->event_data)) != ERROR_SUCCESS) {
  549 + return ret;
  550 + }
  551 + }
  552 + break;
  553 + }
506 } 554 }
507 555
508 return ret; 556 return ret;
@@ -963,6 +1011,11 @@ bool SrsMessageHeader::is_set_chunk_size() @@ -963,6 +1011,11 @@ bool SrsMessageHeader::is_set_chunk_size()
963 return message_type == RTMP_MSG_SetChunkSize; 1011 return message_type == RTMP_MSG_SetChunkSize;
964 } 1012 }
965 1013
  1014 +bool SrsMessageHeader::is_user_control_message()
  1015 +{
  1016 + return message_type == RTMP_MSG_UserControlMessage;
  1017 +}
  1018 +
966 SrsChunkStream::SrsChunkStream(int _cid) 1019 SrsChunkStream::SrsChunkStream(int _cid)
967 { 1020 {
968 fmt = 0; 1021 fmt = 0;
@@ -1098,6 +1151,10 @@ int SrsCommonMessage::decode_packet() @@ -1098,6 +1151,10 @@ int SrsCommonMessage::decode_packet()
1098 srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str()); 1151 srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
1099 packet = new SrsPacket(); 1152 packet = new SrsPacket();
1100 return ret; 1153 return ret;
  1154 + } else if(header.is_user_control_message()) {
  1155 + srs_verbose("start to decode user control message.");
  1156 + packet = new SrsUserControlPacket();
  1157 + return packet->decode(stream);
1101 } else if(header.is_window_ackledgement_size()) { 1158 } else if(header.is_window_ackledgement_size()) {
1102 srs_verbose("start to decode set ack window size message."); 1159 srs_verbose("start to decode set ack window size message.");
1103 packet = new SrsSetWindowAckSizePacket(); 1160 packet = new SrsSetWindowAckSizePacket();
@@ -2396,45 +2453,86 @@ int SrsSetPeerBandwidthPacket::encode_packet(SrsStream* stream) @@ -2396,45 +2453,86 @@ int SrsSetPeerBandwidthPacket::encode_packet(SrsStream* stream)
2396 return ret; 2453 return ret;
2397 } 2454 }
2398 2455
2399 -SrsPCUC4BytesPacket::SrsPCUC4BytesPacket() 2456 +SrsUserControlPacket::SrsUserControlPacket()
2400 { 2457 {
2401 event_type = 0; 2458 event_type = 0;
2402 event_data = 0; 2459 event_data = 0;
  2460 + extra_data = 0;
2403 } 2461 }
2404 2462
2405 -SrsPCUC4BytesPacket::~SrsPCUC4BytesPacket() 2463 +SrsUserControlPacket::~SrsUserControlPacket()
2406 { 2464 {
2407 } 2465 }
2408 2466
2409 -int SrsPCUC4BytesPacket::get_perfer_cid() 2467 +int SrsUserControlPacket::decode(SrsStream* stream)
  2468 +{
  2469 + int ret = ERROR_SUCCESS;
  2470 +
  2471 + if (!stream->require(6)) {
  2472 + ret = ERROR_RTMP_MESSAGE_DECODE;
  2473 + srs_error("decode user control failed. ret=%d", ret);
  2474 + return ret;
  2475 + }
  2476 +
  2477 + event_type = stream->read_2bytes();
  2478 + event_data = stream->read_4bytes();
  2479 +
  2480 + if (event_type == SrcPCUCSetBufferLength) {
  2481 + if (!stream->require(2)) {
  2482 + ret = ERROR_RTMP_MESSAGE_ENCODE;
  2483 + srs_error("decode user control packet failed. ret=%d", ret);
  2484 + return ret;
  2485 + }
  2486 + extra_data = stream->read_4bytes();
  2487 + }
  2488 +
  2489 + srs_info("decode user control success. "
  2490 + "event_type=%d, event_data=%d, extra_data=%d",
  2491 + event_type, event_data, extra_data);
  2492 +
  2493 + return ret;
  2494 +}
  2495 +
  2496 +int SrsUserControlPacket::get_perfer_cid()
2410 { 2497 {
2411 return RTMP_CID_ProtocolControl; 2498 return RTMP_CID_ProtocolControl;
2412 } 2499 }
2413 2500
2414 -int SrsPCUC4BytesPacket::get_message_type() 2501 +int SrsUserControlPacket::get_message_type()
2415 { 2502 {
2416 return RTMP_MSG_UserControlMessage; 2503 return RTMP_MSG_UserControlMessage;
2417 } 2504 }
2418 2505
2419 -int SrsPCUC4BytesPacket::get_size() 2506 +int SrsUserControlPacket::get_size()
2420 { 2507 {
2421 - return 2 + 4; 2508 + if (event_type == SrcPCUCSetBufferLength) {
  2509 + return 2 + 4 + 4;
  2510 + } else {
  2511 + return 2 + 4;
  2512 + }
2422 } 2513 }
2423 2514
2424 -int SrsPCUC4BytesPacket::encode_packet(SrsStream* stream) 2515 +int SrsUserControlPacket::encode_packet(SrsStream* stream)
2425 { 2516 {
2426 int ret = ERROR_SUCCESS; 2517 int ret = ERROR_SUCCESS;
2427 2518
2428 - if (!stream->require(6)) { 2519 + if (!stream->require(get_size())) {
2429 ret = ERROR_RTMP_MESSAGE_ENCODE; 2520 ret = ERROR_RTMP_MESSAGE_ENCODE;
2430 - srs_error("encode set bandwidth packet failed. ret=%d", ret); 2521 + srs_error("encode user control packet failed. ret=%d", ret);
2431 return ret; 2522 return ret;
2432 } 2523 }
2433 2524
2434 stream->write_2bytes(event_type); 2525 stream->write_2bytes(event_type);
2435 stream->write_4bytes(event_data); 2526 stream->write_4bytes(event_data);
  2527 +
  2528 + // when event type is set buffer length,
  2529 + // read the extra buffer length.
  2530 + if (event_type == SrcPCUCSetBufferLength) {
  2531 + stream->write_2bytes(extra_data);
  2532 + srs_verbose("user control message, buffer_length=%d", extra_data);
  2533 + }
2436 2534
2437 - srs_verbose("encode PCUC packet success. " 2535 + srs_verbose("encode user control packet success. "
2438 "event_type=%d, event_data=%d", event_type, event_data); 2536 "event_type=%d, event_data=%d", event_type, event_data);
2439 2537
2440 return ret; 2538 return ret;
@@ -129,6 +129,8 @@ private: @@ -129,6 +129,8 @@ private:
129 * when recv message, update the context. 129 * when recv message, update the context.
130 */ 130 */
131 virtual int on_recv_message(SrsCommonMessage* msg); 131 virtual int on_recv_message(SrsCommonMessage* msg);
  132 + virtual int response_acknowledgement_message();
  133 + virtual int response_ping_message(int32_t timestamp);
132 /** 134 /**
133 * when message sentout, update the context. 135 * when message sentout, update the context.
134 */ 136 */
@@ -205,6 +207,7 @@ struct SrsMessageHeader @@ -205,6 +207,7 @@ struct SrsMessageHeader
205 bool is_amf3_data(); 207 bool is_amf3_data();
206 bool is_window_ackledgement_size(); 208 bool is_window_ackledgement_size();
207 bool is_set_chunk_size(); 209 bool is_set_chunk_size();
  210 + bool is_user_control_message();
208 }; 211 };
209 212
210 /** 213 /**
@@ -965,6 +968,7 @@ enum SrcPCUCEventType @@ -965,6 +968,7 @@ enum SrcPCUCEventType
965 * Stream Begin(=0) 4-bytes stream ID 968 * Stream Begin(=0) 4-bytes stream ID
966 * Stream EOF(=1) 4-bytes stream ID 969 * Stream EOF(=1) 4-bytes stream ID
967 * StreamDry(=2) 4-bytes stream ID 970 * StreamDry(=2) 4-bytes stream ID
  971 +* SetBufferLength(=3) 8-bytes 4bytes stream ID, 4bytes buffer length.
968 * StreamIsRecorded(=4) 4-bytes stream ID 972 * StreamIsRecorded(=4) 4-bytes stream ID
969 * PingRequest(=6) 4-bytes timestamp local server time 973 * PingRequest(=6) 4-bytes timestamp local server time
970 * PingResponse(=7) 4-bytes timestamp received ping request. 974 * PingResponse(=7) 4-bytes timestamp received ping request.
@@ -975,21 +979,27 @@ enum SrcPCUCEventType @@ -975,21 +979,27 @@ enum SrcPCUCEventType
975 * +------------------------------+------------------------- 979 * +------------------------------+-------------------------
976 * Figure 5 Pay load for the ‘User Control Message’. 980 * Figure 5 Pay load for the ‘User Control Message’.
977 */ 981 */
978 -class SrsPCUC4BytesPacket : public SrsPacket 982 +class SrsUserControlPacket : public SrsPacket
979 { 983 {
980 private: 984 private:
981 typedef SrsPacket super; 985 typedef SrsPacket super;
982 protected: 986 protected:
983 virtual const char* get_class_name() 987 virtual const char* get_class_name()
984 { 988 {
985 - return CLASS_NAME_STRING(SrsPCUC4BytesPacket); 989 + return CLASS_NAME_STRING(SrsUserControlPacket);
986 } 990 }
987 public: 991 public:
988 int16_t event_type; 992 int16_t event_type;
989 int32_t event_data; 993 int32_t event_data;
  994 + /**
  995 + * 4bytes if event_type is SetBufferLength; otherwise 0.
  996 + */
  997 + int32_t extra_data;
990 public: 998 public:
991 - SrsPCUC4BytesPacket();  
992 - virtual ~SrsPCUC4BytesPacket(); 999 + SrsUserControlPacket();
  1000 + virtual ~SrsUserControlPacket();
  1001 +public:
  1002 + virtual int decode(SrsStream* stream);
993 public: 1003 public:
994 virtual int get_perfer_cid(); 1004 virtual int get_perfer_cid();
995 public: 1005 public:
@@ -412,7 +412,7 @@ int SrsRtmp::start_play(int stream_id) @@ -412,7 +412,7 @@ int SrsRtmp::start_play(int stream_id)
412 // StreamBegin 412 // StreamBegin
413 if (true) { 413 if (true) {
414 SrsCommonMessage* msg = new SrsCommonMessage(); 414 SrsCommonMessage* msg = new SrsCommonMessage();
415 - SrsPCUC4BytesPacket* pkt = new SrsPCUC4BytesPacket(); 415 + SrsUserControlPacket* pkt = new SrsUserControlPacket();
416 416
417 pkt->event_type = SrcPCUCStreamBegin; 417 pkt->event_type = SrcPCUCStreamBegin;
418 pkt->event_data = stream_id; 418 pkt->event_data = stream_id;