winlin

for #250, the mpegts over udp stream caster framework.

@@ -51,6 +51,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -51,6 +51,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
51 // nginx also set to 512 51 // nginx also set to 512
52 #define SERVER_LISTEN_BACKLOG 512 52 #define SERVER_LISTEN_BACKLOG 512
53 53
  54 +// sleep in ms for udp recv packet.
  55 +#define SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS 0
  56 +
  57 +// set the max packet size.
  58 +#define SRS_UDP_MAX_PACKET_SIZE 65535
  59 +
54 // system interval in ms, 60 // system interval in ms,
55 // all resolution times should be times togother, 61 // all resolution times should be times togother,
56 // for example, system-interval is x=1s(1000ms), 62 // for example, system-interval is x=1s(1000ms),
@@ -222,6 +228,8 @@ int SrsListener::cycle() @@ -222,6 +228,8 @@ int SrsListener::cycle()
222 228
223 SrsUdpListener::SrsUdpListener(SrsServer* server, SrsListenerType type) : SrsListener(server, type) 229 SrsUdpListener::SrsUdpListener(SrsServer* server, SrsListenerType type) : SrsListener(server, type)
224 { 230 {
  231 + nb_buf = SRS_UDP_MAX_PACKET_SIZE;
  232 + buf = new char[nb_buf];
225 } 233 }
226 234
227 SrsUdpListener::~SrsUdpListener() 235 SrsUdpListener::~SrsUdpListener()
@@ -294,6 +302,27 @@ int SrsUdpListener::cycle() @@ -294,6 +302,27 @@ int SrsUdpListener::cycle()
294 // we just assert here for unknown stream caster. 302 // we just assert here for unknown stream caster.
295 srs_assert(_type == SrsListenerMpegTsOverUdp); 303 srs_assert(_type == SrsListenerMpegTsOverUdp);
296 304
  305 + for (;;) {
  306 + // TODO: FIXME: support ipv6, @see man 7 ipv6
  307 + sockaddr_in from;
  308 + int nb_from = sizeof(sockaddr_in);
  309 + int nread = 0;
  310 +
  311 + if ((nread = st_recvfrom(stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) {
  312 + srs_warn("ignore recv udp packet failed, nread=%d", nread);
  313 + continue;
  314 + }
  315 +
  316 + if ((ret = _server->on_udp_packet(_type, &from, buf, nread)) != ERROR_SUCCESS) {
  317 + srs_warn("handle udp packet failed. ret=%d", ret);
  318 + continue;
  319 + }
  320 +
  321 + if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) {
  322 + st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000);
  323 + }
  324 + }
  325 +
297 // TODO: FIXME: recv udp packet. 326 // TODO: FIXME: recv udp packet.
298 st_sleep(1); 327 st_sleep(1);
299 328
@@ -1112,6 +1141,19 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) @@ -1112,6 +1141,19 @@ int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
1112 return ret; 1141 return ret;
1113 } 1142 }
1114 1143
  1144 +int SrsServer::on_udp_packet(SrsListenerType type, sockaddr_in* from, char* buf, int nb_buf)
  1145 +{
  1146 + int ret = ERROR_SUCCESS;
  1147 +
  1148 + std::string peer_ip = inet_ntoa(from->sin_addr);
  1149 + int peer_port = ntohs(from->sin_port);
  1150 +
  1151 + // TODO: FIXME: implements it.
  1152 + srs_warn("udp: drop %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf);
  1153 +
  1154 + return ret;
  1155 +}
  1156 +
1115 int SrsServer::on_reload_listen() 1157 int SrsServer::on_reload_listen()
1116 { 1158 {
1117 return listen(); 1159 return listen();
@@ -44,6 +44,7 @@ class SrsHttpServer; @@ -44,6 +44,7 @@ class SrsHttpServer;
44 class SrsIngester; 44 class SrsIngester;
45 class SrsHttpHeartbeat; 45 class SrsHttpHeartbeat;
46 class SrsKbps; 46 class SrsKbps;
  47 +class sockaddr_in;
47 48
48 // listener type for server to identify the connection, 49 // listener type for server to identify the connection,
49 // that is, use different type to process the connection. 50 // that is, use different type to process the connection.
@@ -88,6 +89,9 @@ public: @@ -88,6 +89,9 @@ public:
88 */ 89 */
89 class SrsUdpListener : public SrsListener 90 class SrsUdpListener : public SrsListener
90 { 91 {
  92 +private:
  93 + char* buf;
  94 + int nb_buf;
91 public: 95 public:
92 SrsUdpListener(SrsServer* server, SrsListenerType type); 96 SrsUdpListener(SrsServer* server, SrsListenerType type);
93 virtual ~SrsUdpListener(); 97 virtual ~SrsUdpListener();
@@ -252,6 +256,16 @@ public: @@ -252,6 +256,16 @@ public:
252 * @param client_stfd, the client fd in st boxed, the underlayer fd. 256 * @param client_stfd, the client fd in st boxed, the underlayer fd.
253 */ 257 */
254 virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd); 258 virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd);
  259 + /**
  260 + * when udp listener got a udp packet, notice server to process it.
  261 + * @param type, the client type, used to create concrete connection,
  262 + * for instance RTMP connection to serve client.
  263 + * @param from, the udp packet from address.
  264 + * @param buf, the udp packet bytes, user should copy if need to use.
  265 + * @param nb_buf, the size of udp packet bytes.
  266 + * @remark user should never use the buf, for it's a shared memory bytes.
  267 + */
  268 + virtual int on_udp_packet(SrsListenerType type, sockaddr_in* from, char* buf, int nb_buf);
255 // interface ISrsThreadHandler. 269 // interface ISrsThreadHandler.
256 public: 270 public:
257 virtual int on_reload_listen(); 271 virtual int on_reload_listen();