winlin

srs-librtmp: implements the simple socket stream.

@@ -30,10 +30,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -30,10 +30,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
30 #include <sys/socket.h> 30 #include <sys/socket.h>
31 #include <netinet/in.h> 31 #include <netinet/in.h>
32 #include <arpa/inet.h> 32 #include <arpa/inet.h>
  33 +#include <errno.h>
  34 +#include <sys/uio.h>
  35 +
  36 +#ifndef ST_UTIME_NO_TIMEOUT
  37 + #define ST_UTIME_NO_TIMEOUT -1
  38 +#endif
33 39
34 SimpleSocketStream::SimpleSocketStream() 40 SimpleSocketStream::SimpleSocketStream()
35 { 41 {
36 fd = -1; 42 fd = -1;
  43 + send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT;
  44 + recv_bytes = send_bytes = 0;
  45 +
  46 + srs_update_system_time_ms();
  47 + start_time_ms = srs_get_system_time_ms();
37 } 48 }
38 49
39 SimpleSocketStream::~SimpleSocketStream() 50 SimpleSocketStream::~SimpleSocketStream()
@@ -71,70 +82,149 @@ int SimpleSocketStream::connect(const char* server_ip, int port) @@ -71,70 +82,149 @@ int SimpleSocketStream::connect(const char* server_ip, int port)
71 int SimpleSocketStream::read(const void* buf, size_t size, ssize_t* nread) 82 int SimpleSocketStream::read(const void* buf, size_t size, ssize_t* nread)
72 { 83 {
73 int ret = ERROR_SUCCESS; 84 int ret = ERROR_SUCCESS;
  85 +
  86 + *nread = ::recv(fd, (void*)buf, size, 0);
  87 +
  88 + // On success a non-negative integer indicating the number of bytes actually read is returned
  89 + // (a value of 0 means the network connection is closed or end of file is reached).
  90 + if (*nread <= 0) {
  91 + if (errno == ETIME) {
  92 + return ERROR_SOCKET_TIMEOUT;
  93 + }
  94 +
  95 + if (*nread == 0) {
  96 + errno = ECONNRESET;
  97 + }
  98 +
  99 + return ERROR_SOCKET_READ;
  100 + }
  101 +
  102 + recv_bytes += *nread;
  103 +
74 return ret; 104 return ret;
75 } 105 }
76 106
77 // ISrsProtocolReader 107 // ISrsProtocolReader
78 void SimpleSocketStream::set_recv_timeout(int64_t timeout_us) 108 void SimpleSocketStream::set_recv_timeout(int64_t timeout_us)
79 { 109 {
  110 + recv_timeout = timeout_us;
80 } 111 }
81 112
82 int64_t SimpleSocketStream::get_recv_timeout() 113 int64_t SimpleSocketStream::get_recv_timeout()
83 { 114 {
84 - return -1; 115 + return recv_timeout;
85 } 116 }
86 117
87 int64_t SimpleSocketStream::get_recv_bytes() 118 int64_t SimpleSocketStream::get_recv_bytes()
88 { 119 {
89 - return 0; 120 + return recv_bytes;
90 } 121 }
91 122
92 int SimpleSocketStream::get_recv_kbps() 123 int SimpleSocketStream::get_recv_kbps()
93 { 124 {
94 - return 0; 125 + srs_update_system_time_ms();
  126 + int64_t diff_ms = srs_get_system_time_ms() - start_time_ms;
  127 +
  128 + if (diff_ms <= 0) {
  129 + return 0;
  130 + }
  131 +
  132 + return recv_bytes * 8 / diff_ms;
95 } 133 }
96 134
97 // ISrsProtocolWriter 135 // ISrsProtocolWriter
98 void SimpleSocketStream::set_send_timeout(int64_t timeout_us) 136 void SimpleSocketStream::set_send_timeout(int64_t timeout_us)
99 { 137 {
  138 + send_timeout = timeout_us;
100 } 139 }
101 140
102 int64_t SimpleSocketStream::get_send_timeout() 141 int64_t SimpleSocketStream::get_send_timeout()
103 { 142 {
104 - return -1; 143 + return send_timeout;
105 } 144 }
106 145
107 int64_t SimpleSocketStream::get_send_bytes() 146 int64_t SimpleSocketStream::get_send_bytes()
108 { 147 {
109 - return 0; 148 + return send_bytes;
110 } 149 }
111 150
112 int SimpleSocketStream::get_send_kbps() 151 int SimpleSocketStream::get_send_kbps()
113 { 152 {
114 - return 0; 153 + srs_update_system_time_ms();
  154 + int64_t diff_ms = srs_get_system_time_ms() - start_time_ms;
  155 +
  156 + if (diff_ms <= 0) {
  157 + return 0;
  158 + }
  159 +
  160 + return send_bytes * 8 / diff_ms;
115 } 161 }
116 162
117 int SimpleSocketStream::writev(const iovec *iov, int iov_size, ssize_t* nwrite) 163 int SimpleSocketStream::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
118 { 164 {
119 int ret = ERROR_SUCCESS; 165 int ret = ERROR_SUCCESS;
  166 +
  167 + *nwrite = ::writev(fd, iov, iov_size);
  168 +
  169 + if (*nwrite <= 0) {
  170 + if (errno == ETIME) {
  171 + return ERROR_SOCKET_TIMEOUT;
  172 + }
  173 +
  174 + return ERROR_SOCKET_WRITE;
  175 + }
  176 +
  177 + send_bytes += *nwrite;
  178 +
120 return ret; 179 return ret;
121 } 180 }
122 181
123 // ISrsProtocolReaderWriter 182 // ISrsProtocolReaderWriter
124 bool SimpleSocketStream::is_never_timeout(int64_t timeout_us) 183 bool SimpleSocketStream::is_never_timeout(int64_t timeout_us)
125 { 184 {
126 - return true; 185 + return timeout_us == (int64_t)ST_UTIME_NO_TIMEOUT;
127 } 186 }
128 187
129 int SimpleSocketStream::read_fully(const void* buf, size_t size, ssize_t* nread) 188 int SimpleSocketStream::read_fully(const void* buf, size_t size, ssize_t* nread)
130 { 189 {
131 int ret = ERROR_SUCCESS; 190 int ret = ERROR_SUCCESS;
  191 +
  192 + size_t left = size;
  193 + *nread = 0;
  194 +
  195 + while (left > 0) {
  196 + char* this_buf = (char*)buf + *nread;
  197 + ssize_t this_nread;
  198 +
  199 + if ((ret = this->read(this_buf, left, &this_nread)) != ERROR_SUCCESS) {
  200 + return ret;
  201 + }
  202 +
  203 + *nread += this_nread;
  204 + left -= this_nread;
  205 + }
  206 +
  207 + recv_bytes += *nread;
  208 +
132 return ret; 209 return ret;
133 } 210 }
134 211
135 int SimpleSocketStream::write(const void* buf, size_t size, ssize_t* nwrite) 212 int SimpleSocketStream::write(const void* buf, size_t size, ssize_t* nwrite)
136 { 213 {
137 int ret = ERROR_SUCCESS; 214 int ret = ERROR_SUCCESS;
  215 +
  216 + *nwrite = ::send(fd, (void*)buf, size, 0);
  217 +
  218 + if (*nwrite <= 0) {
  219 + if (errno == ETIME) {
  220 + return ERROR_SOCKET_TIMEOUT;
  221 + }
  222 +
  223 + return ERROR_SOCKET_WRITE;
  224 + }
  225 +
  226 + send_bytes += *nwrite;
  227 +
138 return ret; 228 return ret;
139 } 229 }
140 230
@@ -39,6 +39,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -39,6 +39,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
39 class SimpleSocketStream : public ISrsProtocolReaderWriter 39 class SimpleSocketStream : public ISrsProtocolReaderWriter
40 { 40 {
41 private: 41 private:
  42 + int64_t start_time_ms;
  43 + int64_t recv_timeout;
  44 + int64_t send_timeout;
  45 + int64_t recv_bytes;
  46 + int64_t send_bytes;
42 int fd; 47 int fd;
43 public: 48 public:
44 SimpleSocketStream(); 49 SimpleSocketStream();