winlin

support send acknowledgement when recv message.

@@ -78,7 +78,7 @@ extern ILogContext* log_context; @@ -78,7 +78,7 @@ extern ILogContext* log_context;
78 #undef srs_verbose 78 #undef srs_verbose
79 #define srs_verbose(msg, ...) (void)0 79 #define srs_verbose(msg, ...) (void)0
80 #endif 80 #endif
81 -#if 0 81 +#if 1
82 #undef srs_info 82 #undef srs_info
83 #define srs_info(msg, ...) (void)0 83 #define srs_info(msg, ...) (void)0
84 #endif 84 #endif
@@ -249,6 +249,10 @@ messages. @@ -249,6 +249,10 @@ messages.
249 /**************************************************************************** 249 /****************************************************************************
250 ***************************************************************************** 250 *****************************************************************************
251 ****************************************************************************/ 251 ****************************************************************************/
  252 +SrsProtocol::AckWindowSize::AckWindowSize()
  253 +{
  254 + ack_window_size = acked_size = 0;
  255 +}
252 256
253 SrsProtocol::SrsProtocol(st_netfd_t client_stfd) 257 SrsProtocol::SrsProtocol(st_netfd_t client_stfd)
254 { 258 {
@@ -450,6 +454,21 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) @@ -450,6 +454,21 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
450 int ret = ERROR_SUCCESS; 454 int ret = ERROR_SUCCESS;
451 455
452 srs_assert(msg != NULL); 456 srs_assert(msg != NULL);
  457 +
  458 + // acknowledgement
  459 + if (skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size) {
  460 + SrsCommonMessage* ack = new SrsCommonMessage();
  461 + SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket();
  462 +
  463 + in_ack_size.acked_size = pkt->sequence_number = skt->get_recv_bytes();
  464 + ack->set_packet(pkt, 0);
  465 +
  466 + if ((ret = send_message(ack)) != ERROR_SUCCESS) {
  467 + srs_error("send acknowledgement failed. ret=%d", ret);
  468 + return ret;
  469 + }
  470 + srs_verbose("send acknowledgement success.");
  471 + }
453 472
454 switch (msg->header.message_type) { 473 switch (msg->header.message_type) {
455 case RTMP_MSG_SetChunkSize: 474 case RTMP_MSG_SetChunkSize:
@@ -466,8 +485,13 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) @@ -466,8 +485,13 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
466 case RTMP_MSG_WindowAcknowledgementSize: { 485 case RTMP_MSG_WindowAcknowledgementSize: {
467 SrsSetWindowAckSizePacket* pkt = dynamic_cast<SrsSetWindowAckSizePacket*>(msg->get_packet()); 486 SrsSetWindowAckSizePacket* pkt = dynamic_cast<SrsSetWindowAckSizePacket*>(msg->get_packet());
468 srs_assert(pkt != NULL); 487 srs_assert(pkt != NULL);
469 - // TODO: take effect.  
470 - srs_trace("set ack window size to %d", pkt->ackowledgement_window_size); 488 +
  489 + if (pkt->ackowledgement_window_size > 0) {
  490 + in_ack_size.ack_window_size = pkt->ackowledgement_window_size;
  491 + srs_trace("set ack window size to %d", pkt->ackowledgement_window_size);
  492 + } else {
  493 + srs_warn("ignored. set ack window size is %d", pkt->ackowledgement_window_size);
  494 + }
471 break; 495 break;
472 } 496 }
473 case RTMP_MSG_SetChunkSize: { 497 case RTMP_MSG_SetChunkSize: {
@@ -2222,6 +2246,48 @@ int SrsSetWindowAckSizePacket::encode_packet(SrsStream* stream) @@ -2222,6 +2246,48 @@ int SrsSetWindowAckSizePacket::encode_packet(SrsStream* stream)
2222 return ret; 2246 return ret;
2223 } 2247 }
2224 2248
  2249 +SrsAcknowledgementPacket::SrsAcknowledgementPacket()
  2250 +{
  2251 + sequence_number = 0;
  2252 +}
  2253 +
  2254 +SrsAcknowledgementPacket::~SrsAcknowledgementPacket()
  2255 +{
  2256 +}
  2257 +
  2258 +int SrsAcknowledgementPacket::get_perfer_cid()
  2259 +{
  2260 + return RTMP_CID_ProtocolControl;
  2261 +}
  2262 +
  2263 +int SrsAcknowledgementPacket::get_message_type()
  2264 +{
  2265 + return RTMP_MSG_Acknowledgement;
  2266 +}
  2267 +
  2268 +int SrsAcknowledgementPacket::get_size()
  2269 +{
  2270 + return 4;
  2271 +}
  2272 +
  2273 +int SrsAcknowledgementPacket::encode_packet(SrsStream* stream)
  2274 +{
  2275 + int ret = ERROR_SUCCESS;
  2276 +
  2277 + if (!stream->require(4)) {
  2278 + ret = ERROR_RTMP_MESSAGE_ENCODE;
  2279 + srs_error("encode acknowledgement packet failed. ret=%d", ret);
  2280 + return ret;
  2281 + }
  2282 +
  2283 + stream->write_4bytes(sequence_number);
  2284 +
  2285 + srs_verbose("encode acknowledgement packet "
  2286 + "success. sequence_number=%d", sequence_number);
  2287 +
  2288 + return ret;
  2289 +}
  2290 +
2225 SrsSetChunkSizePacket::SrsSetChunkSizePacket() 2291 SrsSetChunkSizePacket::SrsSetChunkSizePacket()
2226 { 2292 {
2227 chunk_size = RTMP_DEFAULT_CHUNK_SIZE; 2293 chunk_size = RTMP_DEFAULT_CHUNK_SIZE;
@@ -75,6 +75,14 @@ class ISrsMessage; @@ -75,6 +75,14 @@ class ISrsMessage;
75 */ 75 */
76 class SrsProtocol 76 class SrsProtocol
77 { 77 {
  78 +private:
  79 + struct AckWindowSize
  80 + {
  81 + int ack_window_size;
  82 + int64_t acked_size;
  83 +
  84 + AckWindowSize();
  85 + };
78 // peer in/out 86 // peer in/out
79 private: 87 private:
80 st_netfd_t stfd; 88 st_netfd_t stfd;
@@ -85,6 +93,7 @@ private: @@ -85,6 +93,7 @@ private:
85 std::map<int, SrsChunkStream*> chunk_streams; 93 std::map<int, SrsChunkStream*> chunk_streams;
86 SrsBuffer* buffer; 94 SrsBuffer* buffer;
87 int32_t in_chunk_size; 95 int32_t in_chunk_size;
  96 + AckWindowSize in_ack_size;
88 // peer out 97 // peer out
89 private: 98 private:
90 char out_header_fmt0[RTMP_MAX_FMT0_HEADER_SIZE]; 99 char out_header_fmt0[RTMP_MAX_FMT0_HEADER_SIZE];
@@ -850,6 +859,34 @@ protected: @@ -850,6 +859,34 @@ protected:
850 }; 859 };
851 860
852 /** 861 /**
  862 +* 5.3. Acknowledgement (3)
  863 +* The client or the server sends the acknowledgment to the peer after
  864 +* receiving bytes equal to the window size.
  865 +*/
  866 +class SrsAcknowledgementPacket : public SrsPacket
  867 +{
  868 +private:
  869 + typedef SrsPacket super;
  870 +protected:
  871 + virtual const char* get_class_name()
  872 + {
  873 + return CLASS_NAME_STRING(SrsAcknowledgementPacket);
  874 + }
  875 +public:
  876 + int32_t sequence_number;
  877 +public:
  878 + SrsAcknowledgementPacket();
  879 + virtual ~SrsAcknowledgementPacket();
  880 +public:
  881 + virtual int get_perfer_cid();
  882 +public:
  883 + virtual int get_message_type();
  884 +protected:
  885 + virtual int get_size();
  886 + virtual int encode_packet(SrsStream* stream);
  887 +};
  888 +
  889 +/**
853 * 7.1. Set Chunk Size 890 * 7.1. Set Chunk Size
854 * Protocol control message 1, Set Chunk Size, is used to notify the 891 * Protocol control message 1, Set Chunk Size, is used to notify the
855 * peer about the new maximum chunk size. 892 * peer about the new maximum chunk size.
@@ -28,8 +28,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -28,8 +28,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28 SrsSocket::SrsSocket(st_netfd_t client_stfd) 28 SrsSocket::SrsSocket(st_netfd_t client_stfd)
29 { 29 {
30 stfd = client_stfd; 30 stfd = client_stfd;
31 - recv_timeout = ST_UTIME_NO_TIMEOUT;  
32 - send_timeout = ST_UTIME_NO_TIMEOUT; 31 + send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT;
  32 + recv_bytes = send_bytes = 0;
33 } 33 }
34 34
35 SrsSocket::~SrsSocket() 35 SrsSocket::~SrsSocket()
@@ -46,6 +46,16 @@ void SrsSocket::set_send_timeout(int timeout_ms) @@ -46,6 +46,16 @@ void SrsSocket::set_send_timeout(int timeout_ms)
46 send_timeout = timeout_ms * 1000; 46 send_timeout = timeout_ms * 1000;
47 } 47 }
48 48
  49 +int64_t SrsSocket::get_recv_bytes()
  50 +{
  51 + return recv_bytes;
  52 +}
  53 +
  54 +int64_t SrsSocket::get_send_bytes()
  55 +{
  56 + return send_bytes;
  57 +}
  58 +
49 int SrsSocket::read(const void* buf, size_t size, ssize_t* nread) 59 int SrsSocket::read(const void* buf, size_t size, ssize_t* nread)
50 { 60 {
51 int ret = ERROR_SUCCESS; 61 int ret = ERROR_SUCCESS;
@@ -63,8 +73,10 @@ int SrsSocket::read(const void* buf, size_t size, ssize_t* nread) @@ -63,8 +73,10 @@ int SrsSocket::read(const void* buf, size_t size, ssize_t* nread)
63 errno = ECONNRESET; 73 errno = ECONNRESET;
64 } 74 }
65 75
66 - ret = ERROR_SOCKET_READ; 76 + return ERROR_SOCKET_READ;
67 } 77 }
  78 +
  79 + recv_bytes += *nread;
68 80
69 return ret; 81 return ret;
70 } 82 }
@@ -86,9 +98,11 @@ int SrsSocket::read_fully(const void* buf, size_t size, ssize_t* nread) @@ -86,9 +98,11 @@ int SrsSocket::read_fully(const void* buf, size_t size, ssize_t* nread)
86 errno = ECONNRESET; 98 errno = ECONNRESET;
87 } 99 }
88 100
89 - ret = ERROR_SOCKET_READ_FULLY; 101 + return ERROR_SOCKET_READ_FULLY;
90 } 102 }
91 103
  104 + recv_bytes += *nread;
  105 +
92 return ret; 106 return ret;
93 } 107 }
94 108
@@ -103,8 +117,10 @@ int SrsSocket::write(const void* buf, size_t size, ssize_t* nwrite) @@ -103,8 +117,10 @@ int SrsSocket::write(const void* buf, size_t size, ssize_t* nwrite)
103 return ERROR_SOCKET_TIMEOUT; 117 return ERROR_SOCKET_TIMEOUT;
104 } 118 }
105 119
106 - ret = ERROR_SOCKET_WRITE; 120 + return ERROR_SOCKET_WRITE;
107 } 121 }
  122 +
  123 + send_bytes += *nwrite;
108 124
109 return ret; 125 return ret;
110 } 126 }
@@ -120,9 +136,11 @@ int SrsSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) @@ -120,9 +136,11 @@ int SrsSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
120 return ERROR_SOCKET_TIMEOUT; 136 return ERROR_SOCKET_TIMEOUT;
121 } 137 }
122 138
123 - ret = ERROR_SOCKET_WRITE; 139 + return ERROR_SOCKET_WRITE;
124 } 140 }
125 141
  142 + send_bytes += *nwrite;
  143 +
126 return ret; 144 return ret;
127 } 145 }
128 146
@@ -41,6 +41,8 @@ class SrsSocket @@ -41,6 +41,8 @@ class SrsSocket
41 private: 41 private:
42 int64_t recv_timeout; 42 int64_t recv_timeout;
43 int64_t send_timeout; 43 int64_t send_timeout;
  44 + int64_t recv_bytes;
  45 + int64_t send_bytes;
44 st_netfd_t stfd; 46 st_netfd_t stfd;
45 public: 47 public:
46 SrsSocket(st_netfd_t client_stfd); 48 SrsSocket(st_netfd_t client_stfd);
@@ -48,6 +50,9 @@ public: @@ -48,6 +50,9 @@ public:
48 public: 50 public:
49 virtual void set_recv_timeout(int timeout_ms); 51 virtual void set_recv_timeout(int timeout_ms);
50 virtual void set_send_timeout(int timeout_ms); 52 virtual void set_send_timeout(int timeout_ms);
  53 + virtual int64_t get_recv_bytes();
  54 + virtual int64_t get_send_bytes();
  55 +public:
51 virtual int read(const void* buf, size_t size, ssize_t* nread); 56 virtual int read(const void* buf, size_t size, ssize_t* nread);
52 virtual int read_fully(const void* buf, size_t size, ssize_t* nread); 57 virtual int read_fully(const void* buf, size_t size, ssize_t* nread);
53 virtual int write(const void* buf, size_t size, ssize_t* nwrite); 58 virtual int write(const void* buf, size_t size, ssize_t* nwrite);