winlin

add rtmp client

... ... @@ -15,7 +15,7 @@ vhost __defaultVhost__ {
hls_path ./objs/nginx/html;
hls_fragment 5;
hls_window 30;
forward 127.0.0.1:1936;
forward 192.168.1.50;
}
# the vhost which forward publish streams.
vhost forward.vhost.com {
... ...
... ... @@ -39,8 +39,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_hls.hpp>
#define SRS_PULSE_TIMEOUT_MS 100
#define SRS_SEND_TIMEOUT_MS 5000000L
#define SRS_RECV_TIMEOUT_MS SRS_SEND_TIMEOUT_MS
#define SRS_SEND_TIMEOUT_US 5000000L
#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US
#define SRS_STREAM_BUSY_SLEEP_MS 2000
SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
... ... @@ -72,10 +72,10 @@ int SrsClient::do_cycle()
return ret;
}
srs_trace("get peer ip success. ip=%s, send_to=%"PRId64", recv_to=%"PRId64"",
ip, SRS_SEND_TIMEOUT_MS, SRS_RECV_TIMEOUT_MS);
ip, SRS_SEND_TIMEOUT_US, SRS_RECV_TIMEOUT_US);
rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_MS * 1000);
rtmp->set_send_timeout(SRS_SEND_TIMEOUT_MS * 1000);
rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US);
rtmp->set_send_timeout(SRS_SEND_TIMEOUT_US);
if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
srs_error("rtmp handshake failed. ret=%d", ret);
... ... @@ -400,7 +400,7 @@ int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg,
// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {
srs_error("decode onMetaData message failed. ret=%d", ret);
return ret;
}
... ... @@ -422,7 +422,7 @@ int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg,
// process UnPublish event.
if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {
srs_error("decode unpublish message failed. ret=%d", ret);
return ret;
}
... ... @@ -496,7 +496,7 @@ int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage*
return ret;
}
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) {
srs_error("decode the amf0/amf3 command packet failed. ret=%d", ret);
return ret;
}
... ...
... ... @@ -37,6 +37,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_ST_OPEN_SOCKET 102
#define ERROR_ST_CREATE_LISTEN_THREAD 103
#define ERROR_ST_CREATE_CYCLE_THREAD 104
#define ERROR_ST_CREATE_FORWARD_THREAD 105
#define ERROR_ST_CONNECT 106
#define ERROR_SOCKET_CREATE 200
#define ERROR_SOCKET_SETREUSE 201
... ... @@ -67,6 +69,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_RTMP_PACKET_SIZE 313
#define ERROR_RTMP_VHOST_NOT_FOUND 314
#define ERROR_RTMP_ACCESS_DENIED 315
#define ERROR_RTMP_HANDSHAKE 316
#define ERROR_RTMP_NO_REQUEST 317
#define ERROR_SYSTEM_STREAM_INIT 400
#define ERROR_SYSTEM_PACKET_INVALID 401
... ... @@ -79,6 +83,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_SYSTEM_CONFIG_BLOCK_END 408
#define ERROR_SYSTEM_CONFIG_EOF 409
#define ERROR_SYSTEM_STREAM_BUSY 410
#define ERROR_SYSTEM_IP_INVALID 411
// see librtmp.
// failed when open ssl create the dh
... ...
... ... @@ -24,42 +24,47 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_forward.hpp>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <srs_core_error.hpp>
#include <srs_core_rtmp.hpp>
#include <srs_core_log.hpp>
#define SRS_FORWARDER_SLEEP_MS 2000
#define SRS_SEND_TIMEOUT_US 3000000L
#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US
SrsForwarder::SrsForwarder()
{
client = new SrsRtmpClient();
client = NULL;
tid = NULL;
stfd = NULL;
loop = false;
stream_id = 0;
}
SrsForwarder::~SrsForwarder()
{
if (tid) {
loop = false;
st_thread_interrupt(tid);
st_thread_join(tid, NULL);
tid = NULL;
}
srs_freep(client);
on_unpublish();
}
int SrsForwarder::on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server)
int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server)
{
int ret = ERROR_SUCCESS;
std::string tc_url = "rtmp://";
app = _app;
tc_url = "rtmp://";
tc_url += vhost;
tc_url += "/";
tc_url += app;
std::string stream_name = stream;
std::string server = forward_server;
int port = 1935;
stream_name = stream;
server = forward_server;
port = 1935;
size_t pos = forward_server.find(":");
if (pos != std::string::npos) {
... ... @@ -67,14 +72,40 @@ int SrsForwarder::on_publish(std::string vhost, std::string app, std::string str
server = forward_server.substr(0, pos);
}
srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
if ((ret = open_socket()) != ERROR_SUCCESS) {
return ret;
}
srs_assert(!tid);
if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){
ret = ERROR_ST_CREATE_FORWARD_THREAD;
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
return ret;
}
void SrsForwarder::on_unpublish()
{
if (tid) {
loop = false;
st_thread_interrupt(tid);
st_thread_join(tid, NULL);
tid = NULL;
}
if (stfd) {
int fd = st_netfd_fileno(stfd);
st_netfd_close(stfd);
stfd = NULL;
// st does not close it sometimes,
// close it manually.
close(fd);
}
srs_freep(client);
}
int SrsForwarder::on_meta_data(SrsOnMetaDataPacket* metadata)
... ... @@ -95,3 +126,147 @@ int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
return ret;
}
int SrsForwarder::open_socket()
{
int ret = ERROR_SUCCESS;
srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d",
stream_name.c_str(), tc_url.c_str(), server.c_str(), port);
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock == -1){
ret = ERROR_SOCKET_CREATE;
srs_error("create socket error. ret=%d", ret);
return ret;
}
stfd = st_netfd_open_socket(sock);
if(stfd == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket failed. ret=%d", ret);
return ret;
}
srs_freep(client);
client = new SrsRtmpClient(stfd);
return ret;
}
int SrsForwarder::connect_server()
{
int ret = ERROR_SUCCESS;
std::string ip = parse_server(server);
if (ip.empty()) {
ret = ERROR_SYSTEM_IP_INVALID;
srs_error("dns resolve server error, ip empty. ret=%d", ret);
return ret;
}
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){
ret = ERROR_ST_CONNECT;
srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret);
return ret;
}
srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port);
return ret;
}
std::string SrsForwarder::parse_server(std::string host)
{
if (inet_addr(host.c_str()) != INADDR_NONE) {
return host;
}
hostent* answer = gethostbyname(host.c_str());
if (answer == NULL) {
srs_error("dns resolve host %s error.", host.c_str());
return "";
}
char ipv4[16];
memset(ipv4, 0, sizeof(ipv4));
for (int i = 0; i < answer->h_length; i++) {
inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));
srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);
break;
}
return ipv4;
}
int SrsForwarder::forward_cycle_imp()
{
int ret = ERROR_SUCCESS;
client->set_recv_timeout(SRS_RECV_TIMEOUT_US);
client->set_send_timeout(SRS_SEND_TIMEOUT_US);
if ((ret = connect_server()) != ERROR_SUCCESS) {
return ret;
}
srs_assert(client);
if ((ret = client->handshake()) != ERROR_SUCCESS) {
srs_error("handshake with server failed. ret=%d", ret);
return ret;
}
if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) {
srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret);
return ret;
}
if ((ret = client->play_stream(stream_name, stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream_name=%s. ret=%d", stream_name.c_str(), ret);
return ret;
}
return ret;
}
void SrsForwarder::forward_cycle()
{
int ret = ERROR_SUCCESS;
log_context->generate_id();
srs_trace("forward cycle start");
while (loop) {
if ((ret = forward_cycle_imp()) != ERROR_SUCCESS) {
srs_warn("forward cycle failed, ignored and retry, ret=%d", ret);
} else {
srs_info("forward cycle success, retry");
}
if (!loop) {
break;
}
st_usleep(SRS_FORWARDER_SLEEP_MS * 1000);
if ((ret = open_socket()) != ERROR_SUCCESS) {
srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret);
} else {
srs_info("forward cycle reopen success");
}
}
srs_trace("forward cycle finished");
}
void* SrsForwarder::forward_thread(void* arg)
{
SrsForwarder* obj = (SrsForwarder*)arg;
srs_assert(obj != NULL);
obj->loop = true;
obj->forward_cycle();
return NULL;
}
... ...
... ... @@ -43,11 +43,14 @@ class SrsRtmpClient;
class SrsForwarder
{
private:
std::string app;
std::string tc_url;
std::string stream_name;
int stream_id;
std::string server;
int port;
private:
st_netfd_t stfd;
st_thread_t tid;
bool loop;
private:
... ... @@ -61,6 +64,14 @@ public:
virtual int on_meta_data(SrsOnMetaDataPacket* metadata);
virtual int on_audio(SrsSharedPtrMessage* msg);
virtual int on_video(SrsSharedPtrMessage* msg);
private:
virtual int open_socket();
virtual int connect_server();
std::string parse_server(std::string host);
private:
virtual int forward_cycle_imp();
virtual void forward_cycle();
static void* forward_thread(void* arg);
};
#endif
... ...
... ... @@ -1067,7 +1067,7 @@ SrsSimpleHandshake::~SrsSimpleHandshake()
{
}
int SrsSimpleHandshake::handshake(SrsSocket& skt, SrsComplexHandshake& complex_hs)
int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshake& complex_hs)
{
int ret = ERROR_SUCCESS;
... ... @@ -1090,7 +1090,7 @@ int SrsSimpleHandshake::handshake(SrsSocket& skt, SrsComplexHandshake& complex_h
srs_verbose("check c0 success, required plain text.");
// try complex handshake
ret = complex_hs.handshake(skt, c0c1 + 1);
ret = complex_hs.handshake_with_client(skt, c0c1 + 1);
if (ret == ERROR_SUCCESS) {
srs_trace("complex handshake success.");
return ret;
... ... @@ -1125,6 +1125,67 @@ int SrsSimpleHandshake::handshake(SrsSocket& skt, SrsComplexHandshake& complex_h
return ret;
}
int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshake& complex_hs)
{
int ret = ERROR_SUCCESS;
// try complex handshake
ret = complex_hs.handshake_with_server(skt);
if (ret == ERROR_SUCCESS) {
srs_trace("complex handshake success.");
return ret;
}
if (ret != ERROR_RTMP_TRY_SIMPLE_HS) {
srs_error("complex handshake failed. ret=%d", ret);
return ret;
}
srs_info("rollback complex to simple handshake. ret=%d", ret);
// simple handshake
ssize_t nsize;
char* c0c1 = new char[1537];
SrsAutoFree(char, c0c1, true);
srs_random_generate(c0c1, 1537);
// plain text required.
c0c1[0] = 0x03;
if ((ret = skt.write(c0c1, 1537, &nsize)) != ERROR_SUCCESS) {
srs_warn("write c0c1 failed. ret=%d", ret);
return ret;
}
srs_verbose("write c0c1 success.");
char* s0s1s2 = new char[3073];
SrsAutoFree(char, s0s1s2, true);
if ((ret = skt.read_fully(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) {
srs_warn("simple handshake recv s0s1s2 failed. ret=%d", ret);
return ret;
}
srs_verbose("simple handshake recv s0s1s2 success.");
// plain text required.
if (s0s1s2[0] != 0x03) {
ret = ERROR_RTMP_HANDSHAKE;
srs_warn("handshake failed, plain text required. ret=%d", ret);
return ret;
}
char* c2 = new char[1536];
SrsAutoFree(char, c2, true);
srs_random_generate(c2, 1536);
if ((ret = skt.write(c2, 1536, &nsize)) != ERROR_SUCCESS) {
srs_warn("simple handshake write c2 failed. ret=%d", ret);
return ret;
}
srs_verbose("simple handshake write c2 success.");
srs_trace("simple handshake success.");
return ret;
}
SrsComplexHandshake::SrsComplexHandshake()
{
}
... ... @@ -1134,12 +1195,12 @@ SrsComplexHandshake::~SrsComplexHandshake()
}
#ifndef SRS_SSL
int SrsComplexHandshake::handshake(SrsSocket& /*skt*/, char* /*_c1*/)
int SrsComplexHandshake::handshake_with_client(SrsSocket& /*skt*/, char* /*_c1*/)
{
return ERROR_RTMP_TRY_SIMPLE_HS;
}
#else
int SrsComplexHandshake::handshake(SrsSocket& skt, char* _c1)
int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1)
{
int ret = ERROR_SUCCESS;
... ... @@ -1216,3 +1277,20 @@ int SrsComplexHandshake::handshake(SrsSocket& skt, char* _c1)
}
#endif
#ifndef SRS_SSL
int SrsComplexHandshake::handshake_with_server(SrsSocket& /*skt*/)
{
return ERROR_RTMP_TRY_SIMPLE_HS;
}
#else
int SrsComplexHandshake::handshake_with_server(SrsSocket& /*skt*/)
{
int ret = ERROR_SUCCESS;
// TODO: implements complex handshake.
ret = ERROR_RTMP_TRY_SIMPLE_HS;
return ret;
}
#endif
... ...
... ... @@ -47,7 +47,8 @@ public:
* @param complex_hs, try complex handshake first,
* if failed, rollback to simple handshake.
*/
virtual int handshake(SrsSocket& skt, SrsComplexHandshake& complex_hs);
virtual int handshake_with_client(SrsSocket& skt, SrsComplexHandshake& complex_hs);
virtual int handshake_with_server(SrsSocket& skt, SrsComplexHandshake& complex_hs);
};
/**
... ... @@ -70,7 +71,8 @@ public:
* try simple handshake if error is ERROR_RTMP_TRY_SIMPLE_HS,
* otherwise, disconnect
*/
virtual int handshake(SrsSocket& skt, char* _c1);
virtual int handshake_with_client(SrsSocket& skt, char* _c1);
virtual int handshake_with_server(SrsSocket& skt);
};
#endif
\ No newline at end of file
... ...
... ... @@ -199,6 +199,7 @@ messages.
#define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone"
#define RTMP_AMF0_COMMAND_ON_STATUS "onStatus"
#define RTMP_AMF0_COMMAND_RESULT "_result"
#define RTMP_AMF0_COMMAND_ERROR "_error"
#define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream"
#define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish"
#define RTMP_AMF0_COMMAND_UNPUBLISH "FCUnpublish"
... ... @@ -282,6 +283,15 @@ SrsProtocol::~SrsProtocol()
srs_freep(skt);
}
std::string SrsProtocol::get_request_name(double transcationId)
{
if (requests.find(transcationId) == requests.end()) {
return "";
}
return requests[transcationId];
}
void SrsProtocol::set_recv_timeout(int64_t timeout_us)
{
return skt->set_recv_timeout(timeout_us);
... ... @@ -548,7 +558,7 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
case RTMP_MSG_SetChunkSize:
case RTMP_MSG_UserControlMessage:
case RTMP_MSG_WindowAcknowledgementSize:
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
if ((ret = msg->decode_packet(this)) != ERROR_SUCCESS) {
srs_error("decode packet from message payload failed. ret=%d", ret);
return ret;
}
... ... @@ -624,6 +634,17 @@ int SrsProtocol::on_send_message(ISrsMessage* msg)
srs_trace("set output chunk size to %d", pkt->chunk_size);
break;
}
case RTMP_MSG_AMF0CommandMessage:
case RTMP_MSG_AMF3CommandMessage: {
if (true) {
SrsConnectAppPacket* pkt = NULL;
pkt = dynamic_cast<SrsConnectAppPacket*>(common_msg->get_packet());
if (pkt) {
requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CONNECT;
}
}
break;
}
}
return ret;
... ... @@ -1157,7 +1178,7 @@ bool SrsCommonMessage::can_decode()
return true;
}
int SrsCommonMessage::decode_packet()
int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
{
int ret = ERROR_SUCCESS;
... ... @@ -1201,6 +1222,39 @@ int SrsCommonMessage::decode_packet()
}
srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str());
// result/error packet
if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) {
double transactionId = 0.0;
if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) {
srs_error("decode AMF0/AMF3 transcationId failed. ret=%d", ret);
return ret;
}
srs_verbose("AMF0/AMF3 command id, transcationId=%.2f", transactionId);
// reset stream, for header read completed.
stream->reset();
std::string request_name = protocol->get_request_name(transactionId);
if (request_name.empty()) {
ret = ERROR_RTMP_NO_REQUEST;
srs_error("decode AMF0/AMF3 request failed. ret=%d", ret);
return ret;
}
srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str());
if (request_name == RTMP_AMF0_COMMAND_CONNECT) {
srs_info("decode the AMF0/AMF3 response command(connect vhost/app message).");
packet = new SrsConnectAppResPacket();
return packet->decode(stream);
} else {
ret = ERROR_RTMP_NO_REQUEST;
srs_error("decode AMF0/AMF3 request failed. "
"request_name=%s, transactionId=%.2f, ret=%d",
request_name.c_str(), transactionId, ret);
return ret;
}
}
// reset to zero(amf3 to 1) to restart decode.
stream->reset();
if (header.is_amf3_command()) {
... ... @@ -1319,7 +1373,13 @@ int SrsCommonMessage::encode_packet()
size = 0;
srs_freepa(payload);
return packet->encode(size, (char*&)payload);
if ((ret = packet->encode(size, (char*&)payload)) != ERROR_SUCCESS) {
return ret;
}
header.payload_length = size;
return ret;
}
SrsSharedPtrMessage::SrsSharedPtr::SrsSharedPtr()
... ... @@ -1582,6 +1642,49 @@ int SrsConnectAppPacket::decode(SrsStream* stream)
return ret;
}
int SrsConnectAppPacket::get_perfer_cid()
{
return RTMP_CID_OverConnection;
}
int SrsConnectAppPacket::get_message_type()
{
return RTMP_MSG_AMF0CommandMessage;
}
int SrsConnectAppPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_object_size(command_object);
}
int SrsConnectAppPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("encode transaction_id failed. ret=%d", ret);
return ret;
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_object(stream, command_object)) != ERROR_SUCCESS) {
srs_error("encode command_object failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_object success.");
srs_info("encode connect app request packet success.");
return ret;
}
SrsConnectAppResPacket::SrsConnectAppResPacket()
{
command_name = RTMP_AMF0_COMMAND_RESULT;
... ... @@ -1596,6 +1699,57 @@ SrsConnectAppResPacket::~SrsConnectAppResPacket()
srs_freep(info);
}
int SrsConnectAppResPacket::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode connect command_name failed. ret=%d", ret);
return ret;
}
if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_RESULT) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode connect command_name failed. "
"command_name=%s, ret=%d", command_name.c_str(), ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode connect transaction_id failed. ret=%d", ret);
return ret;
}
if (transaction_id != 1.0) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode connect transaction_id failed. "
"required=%.1f, actual=%.1f, ret=%d", 1.0, transaction_id, ret);
return ret;
}
if ((ret = srs_amf0_read_object(stream, props)) != ERROR_SUCCESS) {
srs_error("amf0 decode connect props failed. ret=%d", ret);
return ret;
}
if (props == NULL) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode connect props failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_object(stream, info)) != ERROR_SUCCESS) {
srs_error("amf0 decode connect info failed. ret=%d", ret);
return ret;
}
if (info == NULL) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode connect info failed. ret=%d", ret);
return ret;
}
srs_info("amf0 decode connect response packet success");
return ret;
}
int SrsConnectAppResPacket::get_perfer_cid()
{
return RTMP_CID_OverConnection;
... ...
... ... @@ -88,6 +88,12 @@ private:
st_netfd_t stfd;
SrsSocket* skt;
char* pp;
/**
* requests sent out, used to build the response.
* key: transactionId
* value: the request command name
*/
std::map<double, std::string> requests;
// peer in
private:
std::map<int, SrsChunkStream*> chunk_streams;
... ... @@ -103,6 +109,7 @@ public:
SrsProtocol(st_netfd_t client_stfd);
virtual ~SrsProtocol();
public:
std::string get_request_name(double transcationId);
/**
* set the timeout in us.
* if timeout, recv/send message return ERROR_SOCKET_TIMEOUT.
... ... @@ -319,7 +326,7 @@ public:
/**
* decode packet from message payload.
*/
virtual int decode_packet();
virtual int decode_packet(SrsProtocol* protocol);
/**
* get the decoded packet which decoded by decode_packet().
* @remark, user never free the pkt, the message will auto free it.
... ... @@ -481,6 +488,13 @@ public:
virtual ~SrsConnectAppPacket();
public:
virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
};
/**
* response for SrsConnectAppPacket.
... ... @@ -503,6 +517,8 @@ public:
SrsConnectAppResPacket();
virtual ~SrsConnectAppResPacket();
public:
virtual int decode(SrsStream* stream);
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
... ... @@ -1076,7 +1092,7 @@ int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T**
}
srs_verbose("recv message success.");
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {
delete msg;
srs_error("decode message failed. ret=%d", ret);
return ret;
... ...
... ... @@ -23,11 +23,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_rtmp.hpp>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <srs_core_log.hpp>
#include <srs_core_error.hpp>
#include <srs_core_socket.hpp>
... ... @@ -168,50 +163,104 @@ SrsResponse::~SrsResponse()
{
}
SrsRtmpClient::SrsRtmpClient()
SrsRtmpClient::SrsRtmpClient(st_netfd_t _stfd)
{
stfd = NULL;
stfd = _stfd;
protocol = new SrsProtocol(stfd);
}
SrsRtmpClient::~SrsRtmpClient()
{
if (stfd) {
int fd = st_netfd_fileno(stfd);
st_netfd_close(stfd);
stfd = NULL;
srs_freep(protocol);
}
// st does not close it sometimes,
// close it manually.
close(fd);
}
void SrsRtmpClient::set_recv_timeout(int64_t timeout_us)
{
protocol->set_recv_timeout(timeout_us);
}
int SrsRtmpClient::connect_to(std::string server, int port)
void SrsRtmpClient::set_send_timeout(int64_t timeout_us)
{
protocol->set_send_timeout(timeout_us);
}
int SrsRtmpClient::handshake()
{
int ret = ERROR_SUCCESS;
SrsSocket skt(stfd);
SrsComplexHandshake complex_hs;
SrsSimpleHandshake simple_hs;
if ((ret = simple_hs.handshake_with_server(skt, complex_hs)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
std::string SrsRtmpClient::parse_server(std::string host){
if(inet_addr(host.c_str()) != INADDR_NONE){
return host;
int SrsRtmpClient::connect_app(std::string app, std::string tc_url)
{
int ret = ERROR_SUCCESS;
// Connect(vhost, app)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsConnectAppPacket* pkt = new SrsConnectAppPacket();
msg->set_packet(pkt, 0);
pkt->command_object = new SrsAmf0Object();
pkt->command_object->set("app", new SrsAmf0String(app.c_str()));
pkt->command_object->set("swfUrl", new SrsAmf0String());
pkt->command_object->set("tcUrl", new SrsAmf0String(tc_url.c_str()));
pkt->command_object->set("fpad", new SrsAmf0Boolean(false));
pkt->command_object->set("capabilities", new SrsAmf0Number(239));
pkt->command_object->set("audioCodecs", new SrsAmf0Number(3575));
pkt->command_object->set("videoCodecs", new SrsAmf0Number(252));
pkt->command_object->set("videoFunction", new SrsAmf0Number(1));
pkt->command_object->set("pageUrl", new SrsAmf0String());
pkt->command_object->set("objectEncoding", new SrsAmf0Number(0));
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
return ret;
}
}
// Set Window Acknowledgement size(2500000)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
pkt->ackowledgement_window_size = 2500000;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
return ret;
}
}
hostent* answer = gethostbyname(host.c_str());
if(answer == NULL){
srs_error("dns resolve host %s error.", host.c_str());
return "";
// expect connect _result
SrsCommonMessage* msg = NULL;
SrsConnectAppResPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsConnectAppResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect connect app response message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get connect app response message");
char ipv4[16];
memset(ipv4, 0, sizeof(ipv4));
for(int i = 0; i < answer->h_length; i++){
inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4));
srs_info("dns resolve host %s to %s.", host.c_str(), ipv4);
break;
return ret;
}
int SrsRtmpClient::play_stream(std::string stream, int& stream_id)
{
int ret = ERROR_SUCCESS;
// CreateStream
if (true) {
}
return ipv4;
return ret;
}
SrsRtmp::SrsRtmp(st_netfd_t client_stfd)
... ... @@ -225,9 +274,14 @@ SrsRtmp::~SrsRtmp()
srs_freep(protocol);
}
SrsProtocol* SrsRtmp::get_protocol()
{
return protocol;
}
void SrsRtmp::set_recv_timeout(int64_t timeout_us)
{
return protocol->set_recv_timeout(timeout_us);
protocol->set_recv_timeout(timeout_us);
}
int64_t SrsRtmp::get_recv_timeout()
... ... @@ -237,7 +291,7 @@ int64_t SrsRtmp::get_recv_timeout()
void SrsRtmp::set_send_timeout(int64_t timeout_us)
{
return protocol->set_send_timeout(timeout_us);
protocol->set_send_timeout(timeout_us);
}
int64_t SrsRtmp::get_recv_bytes()
... ... @@ -278,7 +332,7 @@ int SrsRtmp::handshake()
SrsComplexHandshake complex_hs;
SrsSimpleHandshake simple_hs;
if ((ret = simple_hs.handshake(skt, complex_hs)) != ERROR_SUCCESS) {
if ((ret = simple_hs.handshake_with_client(skt, complex_hs)) != ERROR_SUCCESS) {
return ret;
}
... ... @@ -441,7 +495,7 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st
continue;
}
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {
srs_error("identify decode message failed. ret=%d", ret);
return ret;
}
... ... @@ -884,7 +938,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea
continue;
}
if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {
srs_error("identify decode message failed. ret=%d", ret);
return ret;
}
... ...
... ... @@ -100,13 +100,18 @@ enum SrsClientType
class SrsRtmpClient
{
private:
SrsProtocol* protocol;
st_netfd_t stfd;
public:
SrsRtmpClient();
SrsRtmpClient(st_netfd_t _stfd);
virtual ~SrsRtmpClient();
private:
virtual int connect_to(std::string server, int port);
std::string parse_server(std::string host);
public:
virtual void set_recv_timeout(int64_t timeout_us);
virtual void set_send_timeout(int64_t timeout_us);
public:
virtual int handshake();
virtual int connect_app(std::string app, std::string tc_url);
virtual int play_stream(std::string stream, int& stream_id);
};
/**
... ... @@ -123,6 +128,7 @@ public:
SrsRtmp(st_netfd_t client_stfd);
virtual ~SrsRtmp();
public:
virtual SrsProtocol* get_protocol();
virtual void set_recv_timeout(int64_t timeout_us);
virtual int64_t get_recv_timeout();
virtual void set_send_timeout(int64_t timeout_us);
... ...