winlin

support ack size.

@@ -311,6 +311,12 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -311,6 +311,12 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
311 continue; 311 continue;
312 } 312 }
313 313
  314 + if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
  315 + srs_error("update context when received msg. ret=%d", ret);
  316 + delete msg;
  317 + return ret;
  318 + }
  319 +
314 srs_verbose("get a msg with raw/undecoded payload"); 320 srs_verbose("get a msg with raw/undecoded payload");
315 *pmsg = msg; 321 *pmsg = msg;
316 break; 322 break;
@@ -431,6 +437,35 @@ int SrsProtocol::send_message(SrsMessage* msg) @@ -431,6 +437,35 @@ int SrsProtocol::send_message(SrsMessage* msg)
431 return ret; 437 return ret;
432 } 438 }
433 439
  440 +int SrsProtocol::on_recv_message(SrsMessage* msg)
  441 +{
  442 + int ret = ERROR_SUCCESS;
  443 +
  444 + srs_assert(msg != NULL);
  445 +
  446 + switch (msg->header.message_type) {
  447 + case RTMP_MSG_WindowAcknowledgementSize:
  448 + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
  449 + srs_error("decode packet from message payload failed. ret=%d", ret);
  450 + return ret;
  451 + }
  452 + srs_verbose("decode packet from message payload success.");
  453 + break;
  454 + }
  455 +
  456 + switch (msg->header.message_type) {
  457 + case RTMP_MSG_WindowAcknowledgementSize: {
  458 + SrsSetWindowAckSizePacket* pkt = dynamic_cast<SrsSetWindowAckSizePacket*>(msg->get_packet());
  459 + srs_assert(pkt != NULL);
  460 + // TODO: take effect.
  461 + srs_trace("set ack window size to %d", pkt->ackowledgement_window_size);
  462 + break;
  463 + }
  464 + }
  465 +
  466 + return ret;
  467 +}
  468 +
434 int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg) 469 int SrsProtocol::recv_interlaced_message(SrsMessage** pmsg)
435 { 470 {
436 int ret = ERROR_SUCCESS; 471 int ret = ERROR_SUCCESS;
@@ -816,22 +851,30 @@ int SrsMessage::decode_packet() @@ -816,22 +851,30 @@ int SrsMessage::decode_packet()
816 srs_assert(payload != NULL); 851 srs_assert(payload != NULL);
817 srs_assert(size > 0); 852 srs_assert(size > 0);
818 853
  854 + if (packet) {
  855 + srs_verbose("msg already decoded");
  856 + return ret;
  857 + }
  858 +
819 if (!stream) { 859 if (!stream) {
820 srs_verbose("create decode stream for message."); 860 srs_verbose("create decode stream for message.");
821 stream = new SrsStream(); 861 stream = new SrsStream();
822 } 862 }
823 863
  864 + // initialize the decode stream for all message,
  865 + // it's ok for the initialize if fast and without memory copy.
  866 + if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) {
  867 + srs_error("initialize stream failed. ret=%d", ret);
  868 + return ret;
  869 + }
  870 + srs_verbose("decode stream initialized success");
  871 +
  872 + // decode specified packet type
824 if (header.message_type == RTMP_MSG_AMF0CommandMessage) { 873 if (header.message_type == RTMP_MSG_AMF0CommandMessage) {
825 srs_verbose("start to decode AMF0 command message."); 874 srs_verbose("start to decode AMF0 command message.");
826 875
827 // amf0 command message. 876 // amf0 command message.
828 // need to read the command name. 877 // need to read the command name.
829 - if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) {  
830 - srs_error("initialize stream failed. ret=%d", ret);  
831 - return ret;  
832 - }  
833 - srs_verbose("decode stream initialized success");  
834 -  
835 std::string command; 878 std::string command;
836 if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) { 879 if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {
837 srs_error("decode AMF0 command name failed. ret=%d", ret); 880 srs_error("decode AMF0 command name failed. ret=%d", ret);
@@ -850,12 +893,16 @@ int SrsMessage::decode_packet() @@ -850,12 +893,16 @@ int SrsMessage::decode_packet()
850 srs_trace("drop the AMF0 command message, command_name=%s", command.c_str()); 893 srs_trace("drop the AMF0 command message, command_name=%s", command.c_str());
851 packet = new SrsPacket(); 894 packet = new SrsPacket();
852 return ret; 895 return ret;
  896 + } else if(header.message_type == RTMP_MSG_WindowAcknowledgementSize) {
  897 + srs_verbose("start to decode set ack window size message.");
  898 + packet = new SrsSetWindowAckSizePacket();
  899 + return packet->decode(stream);
  900 + } else {
  901 + // default packet to drop message.
  902 + srs_trace("drop the unknown message, type=%d", header.message_type);
  903 + packet = new SrsPacket();
853 } 904 }
854 905
855 - // default packet to drop message.  
856 - srs_trace("drop the unknown message, type=%d", header.message_type);  
857 - packet = new SrsPacket();  
858 -  
859 return ret; 906 return ret;
860 } 907 }
861 908
@@ -1137,6 +1184,22 @@ SrsSetWindowAckSizePacket::~SrsSetWindowAckSizePacket() @@ -1137,6 +1184,22 @@ SrsSetWindowAckSizePacket::~SrsSetWindowAckSizePacket()
1137 { 1184 {
1138 } 1185 }
1139 1186
  1187 +int SrsSetWindowAckSizePacket::decode(SrsStream* stream)
  1188 +{
  1189 + int ret = ERROR_SUCCESS;
  1190 +
  1191 + if (!stream->require(4)) {
  1192 + ret = ERROR_RTMP_MESSAGE_DECODE;
  1193 + srs_error("decode ack window size failed. ret=%d", ret);
  1194 + return ret;
  1195 + }
  1196 +
  1197 + ackowledgement_window_size = stream->read_4bytes();
  1198 + srs_info("decode ack window size success");
  1199 +
  1200 + return ret;
  1201 +}
  1202 +
1140 int SrsSetWindowAckSizePacket::get_perfer_cid() 1203 int SrsSetWindowAckSizePacket::get_perfer_cid()
1141 { 1204 {
1142 return RTMP_CID_ProtocolControl; 1205 return RTMP_CID_ProtocolControl;
@@ -108,6 +108,10 @@ public: @@ -108,6 +108,10 @@ public:
108 virtual int send_message(SrsMessage* msg); 108 virtual int send_message(SrsMessage* msg);
109 private: 109 private:
110 /** 110 /**
  111 + * when recv message, update the context.
  112 + */
  113 + virtual int on_recv_message(SrsMessage* msg);
  114 + /**
111 * try to recv interlaced message from peer, 115 * try to recv interlaced message from peer,
112 * return error if error occur and nerver set the pmsg, 116 * return error if error occur and nerver set the pmsg,
113 * return success and pmsg set to NULL if no entire message got, 117 * return success and pmsg set to NULL if no entire message got,
@@ -373,6 +377,8 @@ public: @@ -373,6 +377,8 @@ public:
373 SrsSetWindowAckSizePacket(); 377 SrsSetWindowAckSizePacket();
374 virtual ~SrsSetWindowAckSizePacket(); 378 virtual ~SrsSetWindowAckSizePacket();
375 public: 379 public:
  380 + virtual int decode(SrsStream* stream);
  381 +public:
376 virtual int get_perfer_cid(); 382 virtual int get_perfer_cid();
377 public: 383 public:
378 virtual int get_message_type(); 384 virtual int get_message_type();