winlin

refine mpegts code, use simple rtmp client

@@ -214,8 +214,13 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) @@ -214,8 +214,13 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
214 return ret; 214 return ret;
215 } 215 }
216 216
  217 + SrsSharedPtrMessage* msg = NULL;
  218 + if ((ret = sdk->rtmp_create_msg(type, time, data, size, &msg)) != ERROR_SUCCESS) {
  219 + return ret;
  220 + }
  221 +
217 // TODO: FIXME: for post flv, reconnect when error. 222 // TODO: FIXME: for post flv, reconnect when error.
218 - if ((ret = sdk->rtmp_write_packet(type, time, data, size)) != ERROR_SUCCESS) { 223 + if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) {
219 if (!srs_is_client_gracefully_close(ret)) { 224 if (!srs_is_client_gracefully_close(ret)) {
220 srs_error("flv: proxy rtmp packet failed. ret=%d", ret); 225 srs_error("flv: proxy rtmp packet failed. ret=%d", ret);
221 } 226 }
@@ -49,6 +49,7 @@ using namespace std; @@ -49,6 +49,7 @@ using namespace std;
49 #include <srs_protocol_amf0.hpp> 49 #include <srs_protocol_amf0.hpp>
50 #include <srs_raw_avc.hpp> 50 #include <srs_raw_avc.hpp>
51 #include <srs_app_pithy_print.hpp> 51 #include <srs_app_pithy_print.hpp>
  52 +#include <srs_app_rtmp_conn.hpp>
52 53
53 SrsMpegtsQueue::SrsMpegtsQueue() 54 SrsMpegtsQueue::SrsMpegtsQueue()
54 { 55 {
@@ -132,9 +133,7 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) @@ -132,9 +133,7 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
132 output = _srs_config->get_stream_caster_output(c); 133 output = _srs_config->get_stream_caster_output(c);
133 134
134 req = NULL; 135 req = NULL;
135 - client = NULL;  
136 - transport = new SrsTcpClient();  
137 - stream_id = 0; 136 + sdk = new SrsSimpleRtmpClient();
138 137
139 avc = new SrsRawH264Stream(); 138 avc = new SrsRawH264Stream();
140 aac = new SrsRawAacStream(); 139 aac = new SrsRawAacStream();
@@ -149,7 +148,7 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp() @@ -149,7 +148,7 @@ SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
149 { 148 {
150 close(); 149 close();
151 150
152 - srs_freep(transport); 151 + srs_freep(sdk);
153 srs_freep(buffer); 152 srs_freep(buffer);
154 srs_freep(stream); 153 srs_freep(stream);
155 srs_freep(context); 154 srs_freep(context);
@@ -565,8 +564,8 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da @@ -565,8 +564,8 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da
565 int ret = ERROR_SUCCESS; 564 int ret = ERROR_SUCCESS;
566 565
567 SrsSharedPtrMessage* msg = NULL; 566 SrsSharedPtrMessage* msg = NULL;
568 -  
569 - if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) { 567 +
  568 + if ((ret = sdk->rtmp_create_msg(type, timestamp, data, size, &msg)) != ERROR_SUCCESS) {
570 srs_error("mpegts: create shared ptr msg failed. ret=%d", ret); 569 srs_error("mpegts: create shared ptr msg failed. ret=%d", ret);
571 return ret; 570 return ret;
572 } 571 }
@@ -590,7 +589,7 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da @@ -590,7 +589,7 @@ int SrsMpegtsOverUdp::rtmp_write_packet(char type, u_int32_t timestamp, char* da
590 } 589 }
591 590
592 // send out encoded msg. 591 // send out encoded msg.
593 - if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) { 592 + if ((ret = sdk->send_and_free_message(msg)) != ERROR_SUCCESS) {
594 return ret; 593 return ret;
595 } 594 }
596 } 595 }
@@ -604,110 +603,29 @@ int SrsMpegtsOverUdp::connect() @@ -604,110 +603,29 @@ int SrsMpegtsOverUdp::connect()
604 603
605 // when ok, ignore. 604 // when ok, ignore.
606 // TODO: FIXME: should reconnect when disconnected. 605 // TODO: FIXME: should reconnect when disconnected.
607 - if (transport->connected()) {  
608 - return ret;  
609 - }  
610 -  
611 - // parse uri  
612 - if (!req) {  
613 - req = new SrsRequest();  
614 - srs_parse_rtmp_url(output, req->tcUrl, req->stream);  
615 - srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param);  
616 - }  
617 -  
618 - // connect host.  
619 - if ((ret = transport->connect(req->host, req->port, ST_UTIME_NO_TIMEOUT)) != ERROR_SUCCESS) {  
620 - srs_error("mpegts: connect server %s:%d failed. ret=%d", req->host.c_str(), req->port, ret); 606 + if (sdk->connected()) {
621 return ret; 607 return ret;
622 } 608 }
623 609
624 - srs_freep(client);  
625 - client = new SrsRtmpClient(transport);  
626 -  
627 - client->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);  
628 - client->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT_US);  
629 -  
630 - // connect to vhost/app  
631 - if ((ret = client->handshake()) != ERROR_SUCCESS) {  
632 - srs_error("mpegts: handshake with server failed. ret=%d", ret);  
633 - return ret;  
634 - }  
635 - if ((ret = connect_app(req->host, req->port)) != ERROR_SUCCESS) {  
636 - srs_error("mpegts: connect with server failed. ret=%d", ret);  
637 - return ret;  
638 - }  
639 - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {  
640 - srs_error("mpegts: connect with server failed, stream_id=%d. ret=%d", stream_id, ret); 610 + int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
  611 + int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
  612 + if ((ret = sdk->connect(output, cto, sto)) != ERROR_SUCCESS) {
  613 + srs_error("mpegts: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret);
641 return ret; 614 return ret;
642 } 615 }
643 616
644 - // publish.  
645 - if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) {  
646 - srs_error("mpegts: publish failed, stream=%s, stream_id=%d. ret=%d",  
647 - req->stream.c_str(), stream_id, ret); 617 + if ((ret = sdk->publish()) != ERROR_SUCCESS) {
  618 + srs_error("mpegts: publish failed. ret=%d", ret);
648 return ret; 619 return ret;
649 } 620 }
650 621
651 return ret; 622 return ret;
652 } 623 }
653 624
654 -// TODO: FIXME: refine the connect_app.  
655 -int SrsMpegtsOverUdp::connect_app(string ep_server, int ep_port)  
656 -{  
657 - int ret = ERROR_SUCCESS;  
658 -  
659 - // args of request takes the srs info.  
660 - if (req->args == NULL) {  
661 - req->args = SrsAmf0Any::object();  
662 - }  
663 -  
664 - // notify server the edge identity,  
665 - // @see https://github.com/simple-rtmp-server/srs/issues/147  
666 - SrsAmf0Object* data = req->args;  
667 - data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY));  
668 - data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_KEY" "RTMP_SIG_SRS_VERSION" ("RTMP_SIG_SRS_URL_SHORT")"));  
669 - data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE));  
670 - data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE));  
671 - data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL));  
672 - data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));  
673 - data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB));  
674 - data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL));  
675 - data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT));  
676 - data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));  
677 - data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));  
678 - // for edge to directly get the id of client.  
679 - data->set("srs_pid", SrsAmf0Any::number(getpid()));  
680 - data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id()));  
681 -  
682 - // local ip of edge  
683 - std::vector<std::string> ips = srs_get_local_ipv4_ips();  
684 - assert(_srs_config->get_stats_network() < (int)ips.size());  
685 - std::string local_ip = ips[_srs_config->get_stats_network()];  
686 - data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str()));  
687 -  
688 - // generate the tcUrl  
689 - std::string param = "";  
690 - std::string tc_url = srs_generate_tc_url(ep_server, req->vhost, req->app, ep_port, param);  
691 -  
692 - // upnode server identity will show in the connect_app of client.  
693 - // @see https://github.com/simple-rtmp-server/srs/issues/160  
694 - // the debug_srs_upnode is config in vhost and default to true.  
695 - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost);  
696 - if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) {  
697 - srs_error("mpegts: connect with server failed, tcUrl=%s, dsu=%d. ret=%d",  
698 - tc_url.c_str(), debug_srs_upnode, ret);  
699 - return ret;  
700 - }  
701 -  
702 - return ret;  
703 -}  
704 -  
705 void SrsMpegtsOverUdp::close() 625 void SrsMpegtsOverUdp::close()
706 { 626 {
707 - srs_freep(client);  
708 srs_freep(req); 627 srs_freep(req);
709 -  
710 - transport->close(); 628 + sdk->close();
711 } 629 }
712 630
713 #endif 631 #endif
@@ -48,6 +48,7 @@ class SrsSharedPtrMessage; @@ -48,6 +48,7 @@ class SrsSharedPtrMessage;
48 class SrsRawAacStream; 48 class SrsRawAacStream;
49 struct SrsRawAacStreamCodec; 49 struct SrsRawAacStreamCodec;
50 class SrsPithyPrint; 50 class SrsPithyPrint;
  51 +class SrsSimpleRtmpClient;
51 52
52 #include <srs_app_st.hpp> 53 #include <srs_app_st.hpp>
53 #include <srs_kernel_ts.hpp> 54 #include <srs_kernel_ts.hpp>
@@ -86,9 +87,7 @@ private: @@ -86,9 +87,7 @@ private:
86 std::string output; 87 std::string output;
87 private: 88 private:
88 SrsRequest* req; 89 SrsRequest* req;
89 - SrsTcpClient* transport;  
90 - SrsRtmpClient* client;  
91 - int stream_id; 90 + SrsSimpleRtmpClient* sdk;
92 private: 91 private:
93 SrsRawH264Stream* avc; 92 SrsRawH264Stream* avc;
94 std::string h264_sps; 93 std::string h264_sps;
@@ -125,7 +124,6 @@ private: @@ -125,7 +124,6 @@ private:
125 // connect to rtmp output url. 124 // connect to rtmp output url.
126 // @remark ignore when not connected, reconnect when disconnected. 125 // @remark ignore when not connected, reconnect when disconnected.
127 virtual int connect(); 126 virtual int connect();
128 - virtual int connect_app(std::string ep_server, int ep_port);  
129 // close the connected io and rtmp to ready to be re-connect. 127 // close the connected io and rtmp to ready to be re-connect.
130 virtual void close(); 128 virtual void close();
131 }; 129 };
@@ -198,6 +198,11 @@ int SrsSimpleRtmpClient::connect_app() @@ -198,6 +198,11 @@ int SrsSimpleRtmpClient::connect_app()
198 return ret; 198 return ret;
199 } 199 }
200 200
  201 +bool SrsSimpleRtmpClient::connected()
  202 +{
  203 + return transport->connected();
  204 +}
  205 +
201 void SrsSimpleRtmpClient::close() 206 void SrsSimpleRtmpClient::close()
202 { 207 {
203 transport->close(); 208 transport->close();
@@ -266,22 +271,16 @@ int SrsSimpleRtmpClient::sid() @@ -266,22 +271,16 @@ int SrsSimpleRtmpClient::sid()
266 return stream_id; 271 return stream_id;
267 } 272 }
268 273
269 -int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size) 274 +int SrsSimpleRtmpClient::rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, SrsSharedPtrMessage** pmsg)
270 { 275 {
271 - int ret = ERROR_SUCCESS; 276 + *pmsg = NULL;
272 277
273 - SrsSharedPtrMessage* msg = NULL; 278 + int ret = ERROR_SUCCESS;
274 279
275 - if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, &msg)) != ERROR_SUCCESS) { 280 + if ((ret = srs_rtmp_create_msg(type, timestamp, data, size, stream_id, pmsg)) != ERROR_SUCCESS) {
276 srs_error("sdk: create shared ptr msg failed. ret=%d", ret); 281 srs_error("sdk: create shared ptr msg failed. ret=%d", ret);
277 return ret; 282 return ret;
278 } 283 }
279 - srs_assert(msg);  
280 -  
281 - // send out encoded msg.  
282 - if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {  
283 - return ret;  
284 - }  
285 284
286 return ret; 285 return ret;
287 } 286 }
@@ -78,6 +78,7 @@ public: @@ -78,6 +78,7 @@ public:
78 private: 78 private:
79 virtual int connect_app(); 79 virtual int connect_app();
80 public: 80 public:
  81 + virtual bool connected();
81 virtual void close(); 82 virtual void close();
82 public: 83 public:
83 virtual int publish(); 84 virtual int publish();
@@ -86,7 +87,8 @@ public: @@ -86,7 +87,8 @@ public:
86 virtual void kbps_sample(const char* label, int64_t age, int msgs); 87 virtual void kbps_sample(const char* label, int64_t age, int msgs);
87 virtual int sid(); 88 virtual int sid();
88 public: 89 public:
89 - virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); 90 + virtual int rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, SrsSharedPtrMessage** pmsg);
  91 +public:
90 virtual int recv_message(SrsCommonMessage** pmsg); 92 virtual int recv_message(SrsCommonMessage** pmsg);
91 virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); 93 virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
92 virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs); 94 virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs);