winlin

send set peer bandwidth packet.

@@ -225,7 +225,7 @@ int srs_amf0_read_string(SrsStream* stream, std::string& value) @@ -225,7 +225,7 @@ int srs_amf0_read_string(SrsStream* stream, std::string& value)
225 return ret; 225 return ret;
226 } 226 }
227 227
228 - char marker = stream->read_char(); 228 + char marker = stream->read_1bytes();
229 if (marker != RTMP_AMF0_String) { 229 if (marker != RTMP_AMF0_String) {
230 ret = ERROR_RTMP_AMF0_DECODE; 230 ret = ERROR_RTMP_AMF0_DECODE;
231 srs_error("amf0 check string marker failed. " 231 srs_error("amf0 check string marker failed. "
@@ -248,7 +248,7 @@ int srs_amf0_read_boolean(SrsStream* stream, bool& value) @@ -248,7 +248,7 @@ int srs_amf0_read_boolean(SrsStream* stream, bool& value)
248 return ret; 248 return ret;
249 } 249 }
250 250
251 - char marker = stream->read_char(); 251 + char marker = stream->read_1bytes();
252 if (marker != RTMP_AMF0_Boolean) { 252 if (marker != RTMP_AMF0_Boolean) {
253 ret = ERROR_RTMP_AMF0_DECODE; 253 ret = ERROR_RTMP_AMF0_DECODE;
254 srs_error("amf0 check bool marker failed. " 254 srs_error("amf0 check bool marker failed. "
@@ -264,7 +264,7 @@ int srs_amf0_read_boolean(SrsStream* stream, bool& value) @@ -264,7 +264,7 @@ int srs_amf0_read_boolean(SrsStream* stream, bool& value)
264 return ret; 264 return ret;
265 } 265 }
266 266
267 - if (stream->read_char() == 0) { 267 + if (stream->read_1bytes() == 0) {
268 value = false; 268 value = false;
269 } else { 269 } else {
270 value = true; 270 value = true;
@@ -286,7 +286,7 @@ int srs_amf0_read_number(SrsStream* stream, double& value) @@ -286,7 +286,7 @@ int srs_amf0_read_number(SrsStream* stream, double& value)
286 return ret; 286 return ret;
287 } 287 }
288 288
289 - char marker = stream->read_char(); 289 + char marker = stream->read_1bytes();
290 if (marker != RTMP_AMF0_Number) { 290 if (marker != RTMP_AMF0_Number) {
291 ret = ERROR_RTMP_AMF0_DECODE; 291 ret = ERROR_RTMP_AMF0_DECODE;
292 srs_error("amf0 check number marker failed. " 292 srs_error("amf0 check number marker failed. "
@@ -321,7 +321,7 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) @@ -321,7 +321,7 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value)
321 return ret; 321 return ret;
322 } 322 }
323 323
324 - char marker = stream->read_char(); 324 + char marker = stream->read_1bytes();
325 srs_verbose("amf0 any marker success"); 325 srs_verbose("amf0 any marker success");
326 326
327 // backward the 1byte marker. 327 // backward the 1byte marker.
@@ -393,7 +393,7 @@ int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*& value) @@ -393,7 +393,7 @@ int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*& value)
393 return ret; 393 return ret;
394 } 394 }
395 395
396 - char marker = stream->read_char(); 396 + char marker = stream->read_1bytes();
397 if (marker != RTMP_AMF0_ObjectEnd) { 397 if (marker != RTMP_AMF0_ObjectEnd) {
398 ret = ERROR_RTMP_AMF0_DECODE; 398 ret = ERROR_RTMP_AMF0_DECODE;
399 srs_error("amf0 check object eof marker failed. " 399 srs_error("amf0 check object eof marker failed. "
@@ -420,7 +420,7 @@ int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value) @@ -420,7 +420,7 @@ int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value)
420 return ret; 420 return ret;
421 } 421 }
422 422
423 - char marker = stream->read_char(); 423 + char marker = stream->read_1bytes();
424 if (marker != RTMP_AMF0_Object) { 424 if (marker != RTMP_AMF0_Object) {
425 ret = ERROR_RTMP_AMF0_DECODE; 425 ret = ERROR_RTMP_AMF0_DECODE;
426 srs_error("amf0 check object marker failed. " 426 srs_error("amf0 check object marker failed. "
@@ -86,6 +86,12 @@ int SrsClient::do_cycle() @@ -86,6 +86,12 @@ int SrsClient::do_cycle()
86 return ret; 86 return ret;
87 } 87 }
88 srs_verbose("set window acknowledgement size success"); 88 srs_verbose("set window acknowledgement size success");
  89 +
  90 + if ((ret = rtmp->set_peer_bandwidth(2.5 * 1000 * 1000, 2)) != ERROR_SUCCESS) {
  91 + srs_error("set peer bandwidth failed. ret=%d", ret);
  92 + return ret;
  93 + }
  94 + srs_verbose("set peer bandwidth success");
89 95
90 return ret; 96 return ret;
91 } 97 }
@@ -923,9 +923,16 @@ SrsPacket::~SrsPacket() @@ -923,9 +923,16 @@ SrsPacket::~SrsPacket()
923 { 923 {
924 } 924 }
925 925
926 -int SrsPacket::decode(SrsStream* /*stream*/) 926 +int SrsPacket::decode(SrsStream* stream)
927 { 927 {
928 int ret = ERROR_SUCCESS; 928 int ret = ERROR_SUCCESS;
  929 +
  930 + srs_assert(stream != NULL);
  931 +
  932 + ret = ERROR_SYSTEM_PACKET_INVALID;
  933 + srs_error("current packet is not support to decode. "
  934 + "paket=%s, ret=%d", get_class_name(), ret);
  935 +
929 return ret; 936 return ret;
930 } 937 }
931 938
@@ -988,7 +995,7 @@ int SrsPacket::encode_packet(SrsStream* stream) @@ -988,7 +995,7 @@ int SrsPacket::encode_packet(SrsStream* stream)
988 srs_assert(stream != NULL); 995 srs_assert(stream != NULL);
989 996
990 ret = ERROR_SYSTEM_PACKET_INVALID; 997 ret = ERROR_SYSTEM_PACKET_INVALID;
991 - srs_error("current packet is not support to sendout. " 998 + srs_error("current packet is not support to encode. "
992 "paket=%s, ret=%d", get_class_name(), ret); 999 "paket=%s, ret=%d", get_class_name(), ret);
993 1000
994 return ret; 1001 return ret;
@@ -1008,10 +1015,6 @@ SrsConnectAppPacket::~SrsConnectAppPacket() @@ -1008,10 +1015,6 @@ SrsConnectAppPacket::~SrsConnectAppPacket()
1008 int SrsConnectAppPacket::decode(SrsStream* stream) 1015 int SrsConnectAppPacket::decode(SrsStream* stream)
1009 { 1016 {
1010 int ret = ERROR_SUCCESS; 1017 int ret = ERROR_SUCCESS;
1011 -  
1012 - if ((ret = super::decode(stream)) != ERROR_SUCCESS) {  
1013 - return ret;  
1014 - }  
1015 1018
1016 if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { 1019 if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
1017 srs_error("amf0 decode connect command_name failed. ret=%d", ret); 1020 srs_error("amf0 decode connect command_name failed. ret=%d", ret);
@@ -1059,26 +1062,6 @@ SrsSetWindowAckSizePacket::~SrsSetWindowAckSizePacket() @@ -1059,26 +1062,6 @@ SrsSetWindowAckSizePacket::~SrsSetWindowAckSizePacket()
1059 { 1062 {
1060 } 1063 }
1061 1064
1062 -int SrsSetWindowAckSizePacket::decode(SrsStream* stream)  
1063 -{  
1064 - int ret = ERROR_SUCCESS;  
1065 -  
1066 - if ((ret = super::decode(stream)) != ERROR_SUCCESS) {  
1067 - return ret;  
1068 - }  
1069 -  
1070 - if (!stream->require(4)) {  
1071 - ret = ERROR_RTMP_MESSAGE_DECODE;  
1072 - srs_error("set window ack size failed. ret=%d", ret);  
1073 - return ret;  
1074 - }  
1075 -  
1076 - ackowledgement_window_size = stream->read_4bytes();  
1077 - srs_info("decode window ack size success. ack_size=%d", ackowledgement_window_size);  
1078 -  
1079 - return ret;  
1080 -}  
1081 -  
1082 int SrsSetWindowAckSizePacket::get_perfer_cid() 1065 int SrsSetWindowAckSizePacket::get_perfer_cid()
1083 { 1066 {
1084 return RTMP_CID_ProtocolControl; 1067 return RTMP_CID_ProtocolControl;
@@ -1112,3 +1095,47 @@ int SrsSetWindowAckSizePacket::encode_packet(SrsStream* stream) @@ -1112,3 +1095,47 @@ int SrsSetWindowAckSizePacket::encode_packet(SrsStream* stream)
1112 return ret; 1095 return ret;
1113 } 1096 }
1114 1097
  1098 +SrsSetPeerBandwidthPacket::SrsSetPeerBandwidthPacket()
  1099 +{
  1100 + bandwidth = 0;
  1101 + type = 2;
  1102 +}
  1103 +
  1104 +SrsSetPeerBandwidthPacket::~SrsSetPeerBandwidthPacket()
  1105 +{
  1106 +}
  1107 +
  1108 +int SrsSetPeerBandwidthPacket::get_perfer_cid()
  1109 +{
  1110 + return RTMP_CID_ProtocolControl;
  1111 +}
  1112 +
  1113 +int SrsSetPeerBandwidthPacket::get_message_type()
  1114 +{
  1115 + return RTMP_MSG_SetPeerBandwidth;
  1116 +}
  1117 +
  1118 +int SrsSetPeerBandwidthPacket::get_size()
  1119 +{
  1120 + return 5;
  1121 +}
  1122 +
  1123 +int SrsSetPeerBandwidthPacket::encode_packet(SrsStream* stream)
  1124 +{
  1125 + int ret = ERROR_SUCCESS;
  1126 +
  1127 + if (!stream->require(5)) {
  1128 + ret = ERROR_RTMP_MESSAGE_ENCODE;
  1129 + srs_error("encode set bandwidth packet failed. ret=%d", ret);
  1130 + return ret;
  1131 + }
  1132 +
  1133 + stream->write_4bytes(bandwidth);
  1134 + stream->write_1bytes(type);
  1135 +
  1136 + srs_verbose("encode set bandwidth packet "
  1137 + "success. bandwidth=%d, type=%d", bandwidth, type);
  1138 +
  1139 + return ret;
  1140 +}
  1141 +
@@ -256,10 +256,22 @@ public: @@ -256,10 +256,22 @@ public:
256 */ 256 */
257 class SrsPacket 257 class SrsPacket
258 { 258 {
  259 +protected:
  260 + /**
  261 + * subpacket must override to provide the right class name.
  262 + */
  263 + virtual const char* get_class_name()
  264 + {
  265 + return CLASS_NAME_STRING(SrsPacket);
  266 + }
259 public: 267 public:
260 SrsPacket(); 268 SrsPacket();
261 virtual ~SrsPacket(); 269 virtual ~SrsPacket();
262 public: 270 public:
  271 + /**
  272 + * subpacket must override to decode packet from stream.
  273 + * @remark never invoke the super.decode, it always failed.
  274 + */
263 virtual int decode(SrsStream* stream); 275 virtual int decode(SrsStream* stream);
264 public: 276 public:
265 virtual int get_perfer_cid(); 277 virtual int get_perfer_cid();
@@ -283,16 +295,9 @@ protected: @@ -283,16 +295,9 @@ protected:
283 virtual int get_size(); 295 virtual int get_size();
284 /** 296 /**
285 * subpacket can override to encode the payload to stream. 297 * subpacket can override to encode the payload to stream.
  298 + * @remark never invoke the super.encode_packet, it always failed.
286 */ 299 */
287 virtual int encode_packet(SrsStream* stream); 300 virtual int encode_packet(SrsStream* stream);
288 -protected:  
289 - /**  
290 - * subpacket must override to provide the right class name.  
291 - */  
292 - virtual const char* get_class_name()  
293 - {  
294 - return CLASS_NAME_STRING(SrsPacket);  
295 - }  
296 }; 301 };
297 302
298 /** 303 /**
@@ -304,6 +309,11 @@ class SrsConnectAppPacket : public SrsPacket @@ -304,6 +309,11 @@ class SrsConnectAppPacket : public SrsPacket
304 { 309 {
305 private: 310 private:
306 typedef SrsPacket super; 311 typedef SrsPacket super;
  312 +protected:
  313 + virtual const char* get_class_name()
  314 + {
  315 + return CLASS_NAME_STRING(SrsConnectAppPacket);
  316 + }
307 public: 317 public:
308 std::string command_name; 318 std::string command_name;
309 double transaction_id; 319 double transaction_id;
@@ -313,11 +323,6 @@ public: @@ -313,11 +323,6 @@ public:
313 virtual ~SrsConnectAppPacket(); 323 virtual ~SrsConnectAppPacket();
314 public: 324 public:
315 virtual int decode(SrsStream* stream); 325 virtual int decode(SrsStream* stream);
316 -protected:  
317 - virtual const char* get_class_name()  
318 - {  
319 - return CLASS_NAME_STRING(SrsConnectAppPacket);  
320 - }  
321 }; 326 };
322 327
323 /** 328 /**
@@ -329,25 +334,52 @@ class SrsSetWindowAckSizePacket : public SrsPacket @@ -329,25 +334,52 @@ class SrsSetWindowAckSizePacket : public SrsPacket
329 { 334 {
330 private: 335 private:
331 typedef SrsPacket super; 336 typedef SrsPacket super;
  337 +protected:
  338 + virtual const char* get_class_name()
  339 + {
  340 + return CLASS_NAME_STRING(SrsSetWindowAckSizePacket);
  341 + }
332 public: 342 public:
333 int32_t ackowledgement_window_size; 343 int32_t ackowledgement_window_size;
334 public: 344 public:
335 SrsSetWindowAckSizePacket(); 345 SrsSetWindowAckSizePacket();
336 virtual ~SrsSetWindowAckSizePacket(); 346 virtual ~SrsSetWindowAckSizePacket();
337 public: 347 public:
338 - virtual int decode(SrsStream* stream);  
339 -public:  
340 virtual int get_perfer_cid(); 348 virtual int get_perfer_cid();
341 public: 349 public:
342 virtual int get_message_type(); 350 virtual int get_message_type();
343 protected: 351 protected:
344 virtual int get_size(); 352 virtual int get_size();
345 virtual int encode_packet(SrsStream* stream); 353 virtual int encode_packet(SrsStream* stream);
  354 +};
  355 +
  356 +/**
  357 +* 5.6. Set Peer Bandwidth (6)
  358 +* The client or the server sends this message to update the output
  359 +* bandwidth of the peer.
  360 +*/
  361 +class SrsSetPeerBandwidthPacket : public SrsPacket
  362 +{
  363 +private:
  364 + typedef SrsPacket super;
346 protected: 365 protected:
347 virtual const char* get_class_name() 366 virtual const char* get_class_name()
348 { 367 {
349 - return CLASS_NAME_STRING(SrsSetWindowAckSizePacket); 368 + return CLASS_NAME_STRING(SrsSetPeerBandwidthPacket);
350 } 369 }
  370 +public:
  371 + int32_t bandwidth;
  372 + int8_t type;
  373 +public:
  374 + SrsSetPeerBandwidthPacket();
  375 + virtual ~SrsSetPeerBandwidthPacket();
  376 +public:
  377 + virtual int get_perfer_cid();
  378 +public:
  379 + virtual int get_message_type();
  380 +protected:
  381 + virtual int get_size();
  382 + virtual int encode_packet(SrsStream* stream);
351 }; 383 };
352 384
353 /** 385 /**
@@ -184,3 +184,24 @@ int SrsRtmp::set_window_ack_size(int ack_size) @@ -184,3 +184,24 @@ int SrsRtmp::set_window_ack_size(int ack_size)
184 return ret; 184 return ret;
185 } 185 }
186 186
  187 +int SrsRtmp::set_peer_bandwidth(int bandwidth, int type)
  188 +{
  189 + int ret = ERROR_SUCCESS;
  190 +
  191 + SrsMessage* msg = new SrsMessage();
  192 + SrsSetPeerBandwidthPacket* pkt = new SrsSetPeerBandwidthPacket();
  193 +
  194 + pkt->bandwidth = bandwidth;
  195 + pkt->type = type;
  196 + msg->set_packet(pkt);
  197 +
  198 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  199 + srs_error("send set bandwidth message failed. ret=%d", ret);
  200 + return ret;
  201 + }
  202 + srs_info("send set bandwidth message "
  203 + "success. bandwidth=%d, type=%d", bandwidth, type);
  204 +
  205 + return ret;
  206 +}
  207 +
@@ -74,6 +74,11 @@ public: @@ -74,6 +74,11 @@ public:
74 virtual int handshake(); 74 virtual int handshake();
75 virtual int connect_app(SrsRequest* req); 75 virtual int connect_app(SrsRequest* req);
76 virtual int set_window_ack_size(int ack_size); 76 virtual int set_window_ack_size(int ack_size);
  77 + /**
  78 + * @type: The sender can mark this message hard (0), soft (1), or dynamic (2)
  79 + * using the Limit type field.
  80 + */
  81 + virtual int set_peer_bandwidth(int bandwidth, int type);
77 }; 82 };
78 83
79 #endif 84 #endif
@@ -78,11 +78,11 @@ void SrsStream::skip(int size) @@ -78,11 +78,11 @@ void SrsStream::skip(int size)
78 p += size; 78 p += size;
79 } 79 }
80 80
81 -char SrsStream::read_char() 81 +int8_t SrsStream::read_1bytes()
82 { 82 {
83 srs_assert(require(1)); 83 srs_assert(require(1));
84 84
85 - return *p++; 85 + return (int8_t)*p++;
86 } 86 }
87 87
88 int16_t SrsStream::read_2bytes() 88 int16_t SrsStream::read_2bytes()
@@ -152,3 +152,10 @@ void SrsStream::write_4bytes(int32_t value) @@ -152,3 +152,10 @@ void SrsStream::write_4bytes(int32_t value)
152 *p++ = pp[0]; 152 *p++ = pp[0];
153 } 153 }
154 154
  155 +void SrsStream::write_1bytes(int8_t value)
  156 +{
  157 + srs_assert(require(1));
  158 +
  159 + *p++ = value;
  160 +}
  161 +
@@ -74,7 +74,7 @@ public: @@ -74,7 +74,7 @@ public:
74 /** 74 /**
75 * get 1bytes char from stream. 75 * get 1bytes char from stream.
76 */ 76 */
77 - virtual char read_char(); 77 + virtual int8_t read_1bytes();
78 /** 78 /**
79 * get 2bytes int from stream. 79 * get 2bytes int from stream.
80 */ 80 */
@@ -96,6 +96,10 @@ public: @@ -96,6 +96,10 @@ public:
96 * write 4bytes int to stream. 96 * write 4bytes int to stream.
97 */ 97 */
98 virtual void write_4bytes(int32_t value); 98 virtual void write_4bytes(int32_t value);
  99 + /**
  100 + * write 1bytes char to stream.
  101 + */
  102 + virtual void write_1bytes(int8_t value);
99 }; 103 };
100 104
101 #endif 105 #endif
@@ -34,7 +34,7 @@ int main(int /*argc*/, char** /*argv*/){ @@ -34,7 +34,7 @@ int main(int /*argc*/, char** /*argv*/){
34 return ret; 34 return ret;
35 } 35 }
36 36
37 - if ((ret = server.listen(19350)) != ERROR_SUCCESS) { 37 + if ((ret = server.listen(1935)) != ERROR_SUCCESS) {
38 return ret; 38 return ret;
39 } 39 }
40 40