winlin

implements basic edge(play and publish), with bug

@@ -166,7 +166,7 @@ int SrsEdgeIngester::ingest() @@ -166,7 +166,7 @@ int SrsEdgeIngester::ingest()
166 // read from client. 166 // read from client.
167 SrsCommonMessage* msg = NULL; 167 SrsCommonMessage* msg = NULL;
168 if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { 168 if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
169 - srs_error("recv origin server message failed. ret=%d", ret); 169 + srs_error("ingest recv origin server message failed. ret=%d", ret);
170 return ret; 170 return ret;
171 } 171 }
172 srs_verbose("edge loop recv message. ret=%d", ret); 172 srs_verbose("edge loop recv message. ret=%d", ret);
@@ -312,10 +312,13 @@ SrsEdgeProxyContext::SrsEdgeProxyContext() @@ -312,10 +312,13 @@ SrsEdgeProxyContext::SrsEdgeProxyContext()
312 edge_stream_id = 0; 312 edge_stream_id = 0;
313 edge_io = NULL; 313 edge_io = NULL;
314 edge_rtmp = NULL; 314 edge_rtmp = NULL;
  315 + edge_stfd = NULL;
  316 + edge_got_message = false;
315 317
316 origin_stream_id = 0; 318 origin_stream_id = 0;
317 origin_io = NULL; 319 origin_io = NULL;
318 origin_rtmp = NULL; 320 origin_rtmp = NULL;
  321 + origin_stfd = NULL;
319 } 322 }
320 323
321 SrsEdgeProxyContext::~SrsEdgeProxyContext() 324 SrsEdgeProxyContext::~SrsEdgeProxyContext()
@@ -400,11 +403,22 @@ int SrsEdgeForwarder::proxy(SrsEdgeProxyContext* context) @@ -400,11 +403,22 @@ int SrsEdgeForwarder::proxy(SrsEdgeProxyContext* context)
400 context->origin_io = io; 403 context->origin_io = io;
401 context->origin_rtmp = client; 404 context->origin_rtmp = client;
402 context->origin_stream_id = stream_id; 405 context->origin_stream_id = stream_id;
  406 + context->origin_stfd = stfd;
403 407
404 - client->set_recv_timeout(SRS_PULSE_TIMEOUT_US); 408 + context->origin_rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US);
  409 + context->edge_rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US);
  410 +
  411 + context->edge_got_message = false;
405 412
406 SrsPithyPrint pithy_print(SRS_STAGE_EDGE); 413 SrsPithyPrint pithy_print(SRS_STAGE_EDGE);
407 414
  415 + pollfd fds[2];
  416 + fds[0].fd = st_netfd_fileno(context->edge_stfd);
  417 + fds[0].events = POLLIN;
  418 +
  419 + fds[1].fd = st_netfd_fileno(context->origin_stfd);
  420 + fds[1].events = POLLIN;
  421 +
408 while (true) { 422 while (true) {
409 // switch to other st-threads. 423 // switch to other st-threads.
410 st_usleep(0); 424 st_usleep(0);
@@ -417,59 +431,102 @@ int SrsEdgeForwarder::proxy(SrsEdgeProxyContext* context) @@ -417,59 +431,102 @@ int SrsEdgeForwarder::proxy(SrsEdgeProxyContext* context)
417 pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); 431 pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps());
418 } 432 }
419 433
420 - if ((ret = proxy_message(context)) != ERROR_SUCCESS) { 434 + fds[0].revents = 0;
  435 + fds[1].revents = 0;
  436 +
  437 + // Upon successful completion, a non-negative value is returned.
  438 + // A positive value indicates the total number of OS file descriptors in pds that have events.
  439 + // A value of 0 indicates that the call timed out.
  440 + // Upon failure, a value of -1 is returned and errno is set to indicate the error
  441 + if(st_poll(fds, 2, ST_UTIME_NO_TIMEOUT) <= 0){
  442 + ret = ERROR_RTMP_EDGE_PROXY_PULL;
  443 + srs_error("edge wait for st_poll error. ret=%d", ret);
  444 + return ret;
  445 + }
  446 +
  447 + // edge active
  448 + if(fds[0].revents & POLLIN){
  449 + if((ret = proxy_edge_message(context)) != ERROR_SUCCESS){
  450 + return ret;
  451 + }
  452 + }
  453 +
  454 + // origin active
  455 + if(fds[1].revents & POLLIN){
  456 + if((ret = proxy_origin_message(context)) != ERROR_SUCCESS){
421 return ret; 457 return ret;
422 } 458 }
423 } 459 }
  460 + }
424 461
425 return ret; 462 return ret;
426 } 463 }
427 464
428 -int SrsEdgeForwarder::proxy_message(SrsEdgeProxyContext* context) 465 +int SrsEdgeForwarder::proxy_origin_message(SrsEdgeProxyContext* context)
429 { 466 {
430 int ret = ERROR_SUCCESS; 467 int ret = ERROR_SUCCESS;
431 468
432 SrsCommonMessage* msg = NULL; 469 SrsCommonMessage* msg = NULL;
433 470
434 - // proxy origin message to client  
435 - msg = NULL; 471 + // process origin message.
436 ret = context->origin_rtmp->recv_message(&msg); 472 ret = context->origin_rtmp->recv_message(&msg);
437 if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { 473 if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
438 - srs_error("recv origin server message failed. ret=%d", ret); 474 + srs_error("forward recv origin server message failed. ret=%d", ret);
439 return ret; 475 return ret;
440 } 476 }
441 477
442 - if (msg) {  
443 - if (msg->size <= 0) { 478 + srs_assert(msg);
  479 +
  480 + if (msg->size <= 0
  481 + || !context->edge_got_message
  482 + || msg->header.is_set_chunk_size()
  483 + || msg->header.is_window_ackledgement_size()
  484 + || msg->header.is_ackledgement()
  485 + ) {
444 srs_freep(msg); 486 srs_freep(msg);
445 - } else { 487 + return ret;
  488 + }
  489 +
446 msg->header.stream_id = context->edge_stream_id; 490 msg->header.stream_id = context->edge_stream_id;
447 if ((ret = context->edge_rtmp->send_message(msg)) != ERROR_SUCCESS) { 491 if ((ret = context->edge_rtmp->send_message(msg)) != ERROR_SUCCESS) {
448 srs_error("send origin message to client failed. ret=%d", ret); 492 srs_error("send origin message to client failed. ret=%d", ret);
449 return ret; 493 return ret;
450 } 494 }
451 - }  
452 - } 495 +
  496 + return ret;
  497 +}
  498 +
  499 +int SrsEdgeForwarder::proxy_edge_message(SrsEdgeProxyContext* context)
  500 +{
  501 + int ret = ERROR_SUCCESS;
  502 +
  503 + SrsCommonMessage* msg = NULL;
453 504
454 // proxy client message to origin 505 // proxy client message to origin
455 - msg = NULL;  
456 ret = context->edge_rtmp->recv_message(&msg); 506 ret = context->edge_rtmp->recv_message(&msg);
457 if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { 507 if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
458 srs_error("recv client message failed. ret=%d", ret); 508 srs_error("recv client message failed. ret=%d", ret);
459 return ret; 509 return ret;
460 } 510 }
461 511
462 - if (msg) {  
463 - if (msg->size <= 0) { 512 + srs_assert(msg);
  513 +
  514 + context->edge_got_message = true;
  515 +
  516 + if (msg->size <= 0
  517 + || msg->header.is_set_chunk_size()
  518 + || msg->header.is_window_ackledgement_size()
  519 + || msg->header.is_ackledgement()
  520 + ) {
464 srs_freep(msg); 521 srs_freep(msg);
465 - } else { 522 + return ret;
  523 + }
  524 +
466 msg->header.stream_id = context->origin_stream_id; 525 msg->header.stream_id = context->origin_stream_id;
467 if ((ret = context->origin_rtmp->send_message(msg)) != ERROR_SUCCESS) { 526 if ((ret = context->origin_rtmp->send_message(msg)) != ERROR_SUCCESS) {
468 srs_error("send client message to origin failed. ret=%d", ret); 527 srs_error("send client message to origin failed. ret=%d", ret);
469 return ret; 528 return ret;
470 } 529 }
471 - }  
472 - }  
473 530
474 return ret; 531 return ret;
475 } 532 }
@@ -105,10 +105,13 @@ class SrsEdgeProxyContext @@ -105,10 +105,13 @@ class SrsEdgeProxyContext
105 { 105 {
106 public: 106 public:
107 int edge_stream_id; 107 int edge_stream_id;
  108 + st_netfd_t edge_stfd;
108 ISrsProtocolReaderWriter* edge_io; 109 ISrsProtocolReaderWriter* edge_io;
109 SrsRtmpServer* edge_rtmp; 110 SrsRtmpServer* edge_rtmp;
  111 + bool edge_got_message;
110 public: 112 public:
111 int origin_stream_id; 113 int origin_stream_id;
  114 + st_netfd_t origin_stfd;
112 ISrsProtocolReaderWriter* origin_io; 115 ISrsProtocolReaderWriter* origin_io;
113 SrsRtmpClient* origin_rtmp; 116 SrsRtmpClient* origin_rtmp;
114 public: 117 public:
@@ -141,7 +144,8 @@ public: @@ -141,7 +144,8 @@ public:
141 public: 144 public:
142 virtual int proxy(SrsEdgeProxyContext* context); 145 virtual int proxy(SrsEdgeProxyContext* context);
143 private: 146 private:
144 - virtual int proxy_message(SrsEdgeProxyContext* context); 147 + virtual int proxy_origin_message(SrsEdgeProxyContext* context);
  148 + virtual int proxy_edge_message(SrsEdgeProxyContext* context);
145 virtual void close_underlayer_socket(); 149 virtual void close_underlayer_socket();
146 virtual int connect_server(); 150 virtual int connect_server();
147 }; 151 };
@@ -277,7 +277,11 @@ int SrsRtmpConn::stream_service_cycle() @@ -277,7 +277,11 @@ int SrsRtmpConn::stream_service_cycle()
277 } 277 }
278 srs_assert(source != NULL); 278 srs_assert(source != NULL);
279 279
280 - // check publish available. 280 + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
  281 +
  282 + // check publish available
  283 + // for edge, never check it, for edge use proxy mode.
  284 + if (!vhost_is_edge) {
281 if (type != SrsRtmpConnPlay && !source->can_publish()) { 285 if (type != SrsRtmpConnPlay && !source->can_publish()) {
282 ret = ERROR_SYSTEM_STREAM_BUSY; 286 ret = ERROR_SYSTEM_STREAM_BUSY;
283 srs_warn("stream %s is already publishing. ret=%d", 287 srs_warn("stream %s is already publishing. ret=%d",
@@ -286,8 +290,8 @@ int SrsRtmpConn::stream_service_cycle() @@ -286,8 +290,8 @@ int SrsRtmpConn::stream_service_cycle()
286 st_usleep(SRS_STREAM_BUSY_SLEEP_US); 290 st_usleep(SRS_STREAM_BUSY_SLEEP_US);
287 return ret; 291 return ret;
288 } 292 }
  293 + }
289 294
290 - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);  
291 bool enabled_cache = _srs_config->get_gop_cache(req->vhost); 295 bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
292 srs_trace("source found, url=%s, enabled_cache=%d, edge=%d", 296 srs_trace("source found, url=%s, enabled_cache=%d, edge=%d",
293 req->get_stream_url().c_str(), enabled_cache, vhost_is_edge); 297 req->get_stream_url().c_str(), enabled_cache, vhost_is_edge);
@@ -338,6 +342,7 @@ int SrsRtmpConn::stream_service_cycle() @@ -338,6 +342,7 @@ int SrsRtmpConn::stream_service_cycle()
338 context.edge_io = skt; 342 context.edge_io = skt;
339 context.edge_stream_id = res->stream_id; 343 context.edge_stream_id = res->stream_id;
340 context.edge_rtmp = rtmp; 344 context.edge_rtmp = rtmp;
  345 + context.edge_stfd = stfd;
341 if (vhost_is_edge) { 346 if (vhost_is_edge) {
342 return source->on_edge_proxy_publish(&context); 347 return source->on_edge_proxy_publish(&context);
343 } 348 }
@@ -371,6 +376,7 @@ int SrsRtmpConn::stream_service_cycle() @@ -371,6 +376,7 @@ int SrsRtmpConn::stream_service_cycle()
371 context.edge_io = skt; 376 context.edge_io = skt;
372 context.edge_stream_id = res->stream_id; 377 context.edge_stream_id = res->stream_id;
373 context.edge_rtmp = rtmp; 378 context.edge_rtmp = rtmp;
  379 + context.edge_stfd = stfd;
374 if (vhost_is_edge) { 380 if (vhost_is_edge) {
375 return source->on_edge_proxy_publish(&context); 381 return source->on_edge_proxy_publish(&context);
376 } 382 }
@@ -81,6 +81,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -81,6 +81,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
81 #define ERROR_RTMP_EDGE_PLAY_STATE 320 81 #define ERROR_RTMP_EDGE_PLAY_STATE 320
82 // invalid state for client to publish edge stream. 82 // invalid state for client to publish edge stream.
83 #define ERROR_RTMP_EDGE_PUBLISH_STATE 321 83 #define ERROR_RTMP_EDGE_PUBLISH_STATE 321
  84 +#define ERROR_RTMP_EDGE_PROXY_PULL 322
84 85
85 #define ERROR_SYSTEM_STREAM_INIT 400 86 #define ERROR_SYSTEM_STREAM_INIT 400
86 #define ERROR_SYSTEM_PACKET_INVALID 401 87 #define ERROR_SYSTEM_PACKET_INVALID 401
@@ -1202,6 +1202,11 @@ bool SrsMessageHeader::is_window_ackledgement_size() @@ -1202,6 +1202,11 @@ bool SrsMessageHeader::is_window_ackledgement_size()
1202 return message_type == RTMP_MSG_WindowAcknowledgementSize; 1202 return message_type == RTMP_MSG_WindowAcknowledgementSize;
1203 } 1203 }
1204 1204
  1205 +bool SrsMessageHeader::is_ackledgement()
  1206 +{
  1207 + return message_type == RTMP_MSG_Acknowledgement;
  1208 +}
  1209 +
1205 bool SrsMessageHeader::is_set_chunk_size() 1210 bool SrsMessageHeader::is_set_chunk_size()
1206 { 1211 {
1207 return message_type == RTMP_MSG_SetChunkSize; 1212 return message_type == RTMP_MSG_SetChunkSize;
@@ -236,6 +236,7 @@ struct SrsMessageHeader @@ -236,6 +236,7 @@ struct SrsMessageHeader
236 bool is_amf3_command(); 236 bool is_amf3_command();
237 bool is_amf3_data(); 237 bool is_amf3_data();
238 bool is_window_ackledgement_size(); 238 bool is_window_ackledgement_size();
  239 + bool is_ackledgement();
239 bool is_set_chunk_size(); 240 bool is_set_chunk_size();
240 bool is_user_control_message(); 241 bool is_user_control_message();
241 242