winlin

add forward st thread

@@ -23,19 +23,53 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -23,19 +23,53 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 23
24 #include <srs_core_forward.hpp> 24 #include <srs_core_forward.hpp>
25 25
  26 +#include <stdlib.h>
  27 +
26 #include <srs_core_error.hpp> 28 #include <srs_core_error.hpp>
  29 +#include <srs_core_rtmp.hpp>
  30 +#include <srs_core_log.hpp>
27 31
28 SrsForwarder::SrsForwarder() 32 SrsForwarder::SrsForwarder()
29 { 33 {
  34 + client = new SrsRtmpClient();
  35 + port = 1935;
  36 + tid = NULL;
  37 + loop = false;
30 } 38 }
31 39
32 SrsForwarder::~SrsForwarder() 40 SrsForwarder::~SrsForwarder()
33 { 41 {
  42 + srs_freep(client);
  43 +
  44 + if (tid) {
  45 + loop = false;
  46 + st_thread_interrupt(tid);
  47 + st_thread_join(tid, NULL);
  48 + tid = NULL;
  49 + }
34 } 50 }
35 51
36 int SrsForwarder::on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server) 52 int SrsForwarder::on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server)
37 { 53 {
38 int ret = ERROR_SUCCESS; 54 int ret = ERROR_SUCCESS;
  55 +
  56 + tc_url = "rtmp://";
  57 + tc_url += vhost;
  58 + tc_url += "/";
  59 + tc_url += app;
  60 +
  61 + stream_name = stream;
  62 + server = forward_server;
  63 +
  64 + size_t pos = forward_server.find(":");
  65 + if (pos != std::string::npos) {
  66 + port = ::atoi(forward_server.substr(pos + 1).c_str());
  67 + server = forward_server.substr(0, pos);
  68 + }
  69 +
  70 + srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
  71 + stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
  72 +
39 return ret; 73 return ret;
40 } 74 }
41 75
@@ -33,12 +33,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -33,12 +33,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 33
34 class SrsSharedPtrMessage; 34 class SrsSharedPtrMessage;
35 class SrsOnMetaDataPacket; 35 class SrsOnMetaDataPacket;
  36 +class SrsRtmpClient;
36 37
37 /** 38 /**
38 * forward the stream to other servers. 39 * forward the stream to other servers.
39 */ 40 */
40 class SrsForwarder 41 class SrsForwarder
41 { 42 {
  43 +private:
  44 + std::string tc_url;
  45 + std::string stream_name;
  46 + std::string server;
  47 + int port;
  48 + SrsRtmpClient* client;
  49 + st_thread_t tid;
  50 + bool loop;
42 public: 51 public:
43 SrsForwarder(); 52 SrsForwarder();
44 virtual ~SrsForwarder(); 53 virtual ~SrsForwarder();
@@ -163,6 +163,24 @@ SrsResponse::~SrsResponse() @@ -163,6 +163,24 @@ SrsResponse::~SrsResponse()
163 { 163 {
164 } 164 }
165 165
  166 +SrsRtmpClient::SrsRtmpClient()
  167 +{
  168 + stfd = NULL;
  169 +}
  170 +
  171 +SrsRtmpClient::~SrsRtmpClient()
  172 +{
  173 + if (stfd) {
  174 + int fd = st_netfd_fileno(stfd);
  175 + st_netfd_close(stfd);
  176 + stfd = NULL;
  177 +
  178 + // st does not close it sometimes,
  179 + // close it manually.
  180 + close(fd);
  181 + }
  182 +}
  183 +
166 SrsRtmp::SrsRtmp(st_netfd_t client_stfd) 184 SrsRtmp::SrsRtmp(st_netfd_t client_stfd)
167 { 185 {
168 protocol = new SrsProtocol(client_stfd); 186 protocol = new SrsProtocol(client_stfd);
@@ -93,6 +93,18 @@ enum SrsClientType @@ -93,6 +93,18 @@ enum SrsClientType
93 }; 93 };
94 94
95 /** 95 /**
  96 +* implements the client role protocol.
  97 +*/
  98 +class SrsRtmpClient
  99 +{
  100 +private:
  101 + st_netfd_t stfd;
  102 +public:
  103 + SrsRtmpClient();
  104 + virtual ~SrsRtmpClient();
  105 +};
  106 +
  107 +/**
96 * the rtmp provices rtmp-command-protocol services, 108 * the rtmp provices rtmp-command-protocol services,
97 * a high level protocol, media stream oriented services, 109 * a high level protocol, media stream oriented services,
98 * such as connect to vhost/app, play stream, get audio/video data. 110 * such as connect to vhost/app, play stream, get audio/video data.