winlin

refine RTMP protocol stack.

... ... @@ -189,22 +189,22 @@ int SrsBandwidth::do_bandwidth_check()
pkt->data->set("publish_bytes", SrsAmf0Any::number(publish_bytes));
pkt->data->set("publish_time", SrsAmf0Any::number(publish_actual_duration_ms));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check finish message failed. ret=%d", ret);
return ret;
}
// if flash, we notice the result, and expect a final packet.
while (true) {
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
// info level to ignore and return success.
srs_info("expect final message failed. ret=%d", ret);
return ERROR_SUCCESS;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get final message success.");
if (pkt->is_flash_final()) {
... ... @@ -233,8 +233,7 @@ int SrsBandwidth::check_play(
pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms));
pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start play message failed. ret=%d", ret);
return ret;
}
... ... @@ -243,13 +242,14 @@ int SrsBandwidth::check_play(
while (true) {
// recv client's starting play response
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandwidth message succes.");
if (pkt->is_starting_play()) {
... ... @@ -284,8 +284,7 @@ int SrsBandwidth::check_play(
// TODO: FIXME: get length from the rtmp protocol stack.
play_bytes += pkt->get_payload_length();
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check play messages failed. ret=%d", ret);
return ret;
}
... ... @@ -309,13 +308,13 @@ int SrsBandwidth::check_play(
if (true) {
// notify client to stop play
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_stop_play();
pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms));
pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms));
pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms));
pkt->data->set("bytes_delta", SrsAmf0Any::number(play_bytes));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop play message failed. ret=%d", ret);
return ret;
}
... ... @@ -324,13 +323,14 @@ int SrsBandwidth::check_play(
while (true) {
// recv client's stop play response.
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandwidth message succes.");
if (pkt->is_stopped_play()) {
... ... @@ -357,8 +357,7 @@ int SrsBandwidth::check_publish(
pkt->data->set("duration_ms", SrsAmf0Any::number(duration_ms));
pkt->data->set("interval_ms", SrsAmf0Any::number(interval_ms));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start publish message failed. ret=%d", ret);
return ret;
}
... ... @@ -367,13 +366,14 @@ int SrsBandwidth::check_publish(
while (true) {
// read client's notification of starting publish
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandwidth message succes.");
if (pkt->is_starting_publish()) {
... ... @@ -387,12 +387,12 @@ int SrsBandwidth::check_publish(
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) {
st_usleep(0);
SrsCommonMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
__SrsMessage* msg = NULL;
if ((ret = rtmp->__recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
// TODO: FIXME.
publish_bytes += msg->header.payload_length;
... ... @@ -420,8 +420,7 @@ int SrsBandwidth::check_publish(
pkt->data->set("duration_delta", SrsAmf0Any::number(actual_duration_ms));
pkt->data->set("bytes_delta", SrsAmf0Any::number(publish_bytes));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = rtmp->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop publish message failed. ret=%d", ret);
return ret;
}
... ... @@ -436,13 +435,14 @@ int SrsBandwidth::check_publish(
// TODO: FIXME: check whether flash client.
while (false) {
// recv client's stop publish response.
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandwidth message succes.");
if (pkt->is_stopped_publish()) {
... ...
... ... @@ -315,8 +315,8 @@ int SrsForwarder::forward()
// read from client.
if (true) {
SrsCommonMessage* msg = NULL;
ret = client->recv_message(&msg);
__SrsMessage* msg = NULL;
ret = client->__recv_message(&msg);
srs_verbose("play loop recv message. ret=%d", ret);
if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
... ...
... ... @@ -713,6 +713,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, __SrsMessage* msg, b
srs_error("source process audio message failed. ret=%d", ret);
return ret;
}
return ret;
}
// process video packet
if (msg->header.is_video()) {
... ... @@ -720,6 +721,7 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, __SrsMessage* msg, b
srs_error("source process video message failed. ret=%d", ret);
return ret;
}
return ret;
}
// process onMetaData
... ...
... ... @@ -283,15 +283,15 @@ int srs_read_packet(srs_rtmp_t rtmp, int* type, u_int32_t* timestamp, char** dat
Context* context = (Context*)rtmp;
for (;;) {
SrsCommonMessage* msg = NULL;
if ((ret = context->rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
__SrsMessage* msg = NULL;
if ((ret = context->rtmp->__recv_message(&msg)) != ERROR_SUCCESS) {
return ret;
}
if (!msg) {
continue;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
if (msg->header.is_audio()) {
*type = SRS_RTMP_TYPE_AUDIO;
... ... @@ -332,13 +332,13 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data,
srs_assert(rtmp != NULL);
Context* context = (Context*)rtmp;
SrsSharedPtrMessage* msg = NULL;
__SrsSharedPtrMessage* msg = NULL;
if (type == SRS_RTMP_TYPE_AUDIO) {
SrsMessageHeader header;
header.initialize_audio(size, timestamp, context->stream_id);
msg = new SrsSharedPtrMessage();
msg = new __SrsSharedPtrMessage();
if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) {
srs_freepa(data);
return ret;
... ... @@ -347,7 +347,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data,
SrsMessageHeader header;
header.initialize_video(size, timestamp, context->stream_id);
msg = new SrsSharedPtrMessage();
msg = new __SrsSharedPtrMessage();
if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) {
srs_freepa(data);
return ret;
... ... @@ -356,7 +356,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data,
SrsMessageHeader header;
header.initialize_amf0_script(size, context->stream_id);
msg = new SrsSharedPtrMessage();
msg = new __SrsSharedPtrMessage();
if ((ret = msg->initialize(&header, data, size)) != ERROR_SUCCESS) {
srs_freepa(data);
return ret;
... ... @@ -365,7 +365,7 @@ int srs_write_packet(srs_rtmp_t rtmp, int type, u_int32_t timestamp, char* data,
if (msg) {
// send out encoded msg.
if ((ret = context->rtmp->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = context->rtmp->__send_and_free_message(msg)) != ERROR_SUCCESS) {
return ret;
}
} else {
... ...
... ... @@ -341,13 +341,14 @@ int SrsBandCheckClient::expect_start_play()
int ret = ERROR_SUCCESS;
// expect connect _result
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandcheck start play message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandcheck start play message");
if (pkt->command_name != SRS_BW_CHECK_START_PLAY) {
... ... @@ -362,12 +363,9 @@ int SrsBandCheckClient::send_starting_play()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_STARTING_PLAY;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send starting play msg failed. ret=%d", ret);
return ret;
}
... ... @@ -380,13 +378,14 @@ int SrsBandCheckClient::expect_stop_play()
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect stop play message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandcheck stop play message");
if (pkt->command_name == SRS_BW_CHECK_STOP_PLAY) {
... ... @@ -401,12 +400,9 @@ int SrsBandCheckClient::send_stopped_play()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_STOPPED_PLAY;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send stopped play msg failed. ret=%d", ret);
return ret;
}
... ... @@ -419,13 +415,14 @@ int SrsBandCheckClient::expect_start_pub()
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect start pub message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandcheck start pub message");
if (pkt->command_name == SRS_BW_CHECK_START_PUBLISH) {
... ... @@ -440,12 +437,9 @@ int SrsBandCheckClient::send_starting_pub()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_STARTING_PUBLISH;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send starting play msg failed. ret=%d", ret);
return ret;
}
... ... @@ -460,10 +454,8 @@ int SrsBandCheckClient::send_pub_data()
int data_count = 100;
while (true) {
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_PUBLISHING;
msg->set_packet(pkt, 0);
for (int i = 0; i < data_count; ++i) {
std::stringstream seq;
... ... @@ -473,7 +465,7 @@ int SrsBandCheckClient::send_pub_data()
}
data_count += 100;
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send publish message failed.ret=%d", ret);
return ret;
}
... ... @@ -493,11 +485,13 @@ int SrsBandCheckClient::expect_stop_pub()
this->set_recv_timeout(1000 * 1000);
this->set_send_timeout(1000 * 1000);
SrsCommonMessage* msg;
__SrsMessage* msg;
SrsBandwidthPacket* pkt;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(this->protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(this->protocol, &msg, &pkt)) != ERROR_SUCCESS) {
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
if (pkt->command_name == SRS_BW_CHECK_STOP_PUBLISH) {
return ret;
}
... ... @@ -510,13 +504,14 @@ int SrsBandCheckClient::expect_finished()
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect finished message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsBandwidthPacket, pkt, false);
srs_info("get bandcheck finished message");
if (pkt->command_name == SRS_BW_CHECK_FINISHED) {
... ... @@ -596,12 +591,9 @@ int SrsBandCheckClient::send_stopped_pub()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_STOPPED_PUBLISH;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send stopped pub msg failed. ret=%d", ret);
return ret;
}
... ... @@ -614,12 +606,9 @@ int SrsBandCheckClient::send_final()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage;
SrsBandwidthPacket* pkt = new SrsBandwidthPacket;
pkt->command_name = SRS_BW_CHECK_FLASH_FINAL;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send final msg failed. ret=%d", ret);
return ret;
}
... ...
... ... @@ -372,16 +372,6 @@ int SrsRtmpClient::get_send_kbps()
return protocol->get_send_kbps();
}
int SrsRtmpClient::recv_message(SrsCommonMessage** pmsg)
{
return protocol->recv_message(pmsg);
}
int SrsRtmpClient::send_message(ISrsMessage* msg)
{
return protocol->send_message(msg);
}
int SrsRtmpClient::__recv_message(__SrsMessage** pmsg)
{
return protocol->__recv_message(pmsg);
... ... @@ -462,9 +452,7 @@ int SrsRtmpClient::connect_app(string app, string tc_url)
// Connect(vhost, app)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsConnectAppPacket* pkt = new SrsConnectAppPacket();
msg->set_packet(pkt, 0);
pkt->command_object = SrsAmf0Any::object();
pkt->command_object->set("app", SrsAmf0Any::str(app.c_str()));
... ... @@ -478,32 +466,29 @@ int SrsRtmpClient::connect_app(string app, string tc_url)
pkt->command_object->set("pageUrl", SrsAmf0Any::str());
pkt->command_object->set("objectEncoding", SrsAmf0Any::number(0));
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
return ret;
}
}
// Set Window Acknowledgement size(2500000)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
pkt->ackowledgement_window_size = 2500000;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
return ret;
}
}
// expect connect _result
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsConnectAppResPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsConnectAppResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsConnectAppResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect connect app response message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsConnectAppResPacket, pkt, false);
srs_info("get connect app response message");
return ret;
... ... @@ -515,25 +500,22 @@ int SrsRtmpClient::create_stream(int& stream_id)
// CreateStream
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket();
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
return ret;
}
}
// CreateStream _result.
if (true) {
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsCreateStreamResPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect create stream response message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsCreateStreamResPacket, pkt, false);
srs_info("get create stream response message");
stream_id = (int)pkt->stream_id;
... ... @@ -548,13 +530,9 @@ int SrsRtmpClient::play(string stream, int stream_id)
// Play(stream)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsPlayPacket* pkt = new SrsPlayPacket();
pkt->stream_name = stream;
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
srs_error("send play stream failed. "
"stream=%s, stream_id=%d, ret=%d",
stream.c_str(), stream_id, ret);
... ... @@ -565,15 +543,13 @@ int SrsRtmpClient::play(string stream, int stream_id)
// SetBufferLength(1000ms)
int buffer_length_ms = 1000;
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsUserControlPacket* pkt = new SrsUserControlPacket();
pkt->event_type = SrcPCUCSetBufferLength;
pkt->event_data = stream_id;
pkt->extra_data = buffer_length_ms;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send set buffer length failed. "
"stream=%s, stream_id=%d, bufferLength=%d, ret=%d",
stream.c_str(), stream_id, buffer_length_ms, ret);
... ... @@ -583,13 +559,9 @@ int SrsRtmpClient::play(string stream, int stream_id)
// SetChunkSize
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
pkt->chunk_size = SRS_CONF_DEFAULT_CHUNK_SIZE;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send set chunk size failed. "
"stream=%s, chunk_size=%d, ret=%d",
stream.c_str(), SRS_CONF_DEFAULT_CHUNK_SIZE, ret);
... ... @@ -606,13 +578,9 @@ int SrsRtmpClient::publish(string stream, int stream_id)
// SetChunkSize
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
pkt->chunk_size = SRS_CONF_DEFAULT_CHUNK_SIZE;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send set chunk size failed. "
"stream=%s, chunk_size=%d, ret=%d",
stream.c_str(), SRS_CONF_DEFAULT_CHUNK_SIZE, ret);
... ... @@ -622,13 +590,9 @@ int SrsRtmpClient::publish(string stream, int stream_id)
// publish(stream)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsPublishPacket* pkt = new SrsPublishPacket();
pkt->stream_name = stream;
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
srs_error("send publish message failed. "
"stream=%s, stream_id=%d, ret=%d",
stream.c_str(), stream_id, ret);
... ... @@ -647,12 +611,8 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id)
// SrsFMLEStartPacket
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsFMLEStartPacket* pkt = SrsFMLEStartPacket::create_release_stream(stream);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send FMLE publish "
"release stream failed. stream=%s, ret=%d", stream.c_str(), ret);
return ret;
... ... @@ -661,12 +621,8 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id)
// FCPublish
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsFMLEStartPacket* pkt = SrsFMLEStartPacket::create_FC_publish(stream);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send FMLE publish "
"FCPublish failed. stream=%s, ret=%d", stream.c_str(), ret);
return ret;
... ... @@ -675,13 +631,9 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id)
// CreateStream
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsCreateStreamPacket* pkt = new SrsCreateStreamPacket();
pkt->transaction_id = 4;
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send FMLE publish "
"createStream failed. stream=%s, ret=%d", stream.c_str(), ret);
return ret;
... ... @@ -690,13 +642,14 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id)
// expect result of CreateStream
if (true) {
SrsCommonMessage* msg = NULL;
__SrsMessage* msg = NULL;
SrsCreateStreamResPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
if ((ret = __srs_rtmp_expect_message<SrsCreateStreamResPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect create stream response message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsCreateStreamResPacket, pkt, false);
srs_info("get create stream response message");
stream_id = (int)pkt->stream_id;
... ... @@ -704,13 +657,9 @@ int SrsRtmpClient::fmle_publish(string stream, int& stream_id)
// publish(stream)
if (true) {
SrsCommonMessage* msg = new SrsCommonMessage();
SrsPublishPacket* pkt = new SrsPublishPacket();
pkt->stream_name = stream;
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
srs_error("send FMLE publish publish failed. "
"stream=%s, stream_id=%d, ret=%d", stream.c_str(), stream_id, ret);
return ret;
... ... @@ -778,16 +727,6 @@ int SrsRtmpServer::get_send_kbps()
return protocol->get_send_kbps();
}
int SrsRtmpServer::recv_message(SrsCommonMessage** pmsg)
{
return protocol->recv_message(pmsg);
}
int SrsRtmpServer::send_message(ISrsMessage* msg)
{
return protocol->send_message(msg);
}
int SrsRtmpServer::__recv_message(__SrsMessage** pmsg)
{
return protocol->__recv_message(pmsg);
... ... @@ -841,6 +780,7 @@ int SrsRtmpServer::connect_app(SrsRequest* req)
return ret;
}
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsConnectAppPacket, pkt, false);
srs_info("get connect app message");
SrsAmf0Any* prop = NULL;
... ... @@ -954,8 +894,7 @@ void SrsRtmpServer::response_connect_reject(SrsRequest *req, const char* desc)
pkt->props->set(StatusDescription, SrsAmf0Any::str(desc));
//pkt->props->set("objectEncoding", SrsAmf0Any::number(req->objectEncoding));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
if ((ret = protocol->__send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send connect app response rejected message failed. ret=%d", ret);
return;
}
... ... @@ -1197,6 +1136,8 @@ int SrsRtmpServer::start_fmle_publish(int stream_id)
srs_info("recv FCPublish request message success.");
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsFMLEStartPacket, pkt, false);
fc_publish_tid = pkt->transaction_id;
}
// FCPublish response
... ... @@ -1221,6 +1162,8 @@ int SrsRtmpServer::start_fmle_publish(int stream_id)
srs_info("recv createStream request message success.");
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsCreateStreamPacket, pkt, false);
create_stream_tid = pkt->transaction_id;
}
// createStream response
... ... @@ -1244,6 +1187,7 @@ int SrsRtmpServer::start_fmle_publish(int stream_id)
srs_info("recv publish request message success.");
SrsAutoFree(__SrsMessage, msg, false);
SrsAutoFree(SrsPublishPacket, pkt, false);
}
// publish response onFCPublish(NetStream.Publish.Start)
if (true) {
... ...
... ... @@ -164,8 +164,6 @@ public:
virtual int64_t get_send_bytes();
virtual int get_recv_kbps();
virtual int get_send_kbps();
virtual int recv_message(SrsCommonMessage** pmsg);
virtual int send_message(ISrsMessage* msg);
virtual int __recv_message(__SrsMessage** pmsg);
virtual int __decode_message(__SrsMessage* msg, SrsPacket** ppacket);
virtual int __send_and_free_message(__SrsMessage* msg);
... ... @@ -213,8 +211,6 @@ public:
virtual int64_t get_send_bytes();
virtual int get_recv_kbps();
virtual int get_send_kbps();
virtual int recv_message(SrsCommonMessage** pmsg);
virtual int send_message(ISrsMessage* msg);
virtual int __recv_message(__SrsMessage** pmsg);
virtual int __decode_message(__SrsMessage* msg, SrsPacket** ppacket);
virtual int __send_and_free_message(__SrsMessage* msg);
... ...
... ... @@ -306,16 +306,6 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
SrsProtocol::~SrsProtocol()
{
if (true) {
std::map<int, SrsChunkStream*>::iterator it;
for (it = chunk_streams.begin(); it != chunk_streams.end(); ++it) {
SrsChunkStream* stream = it->second;
srs_freep(stream);
}
chunk_streams.clear();
}
if (true) {
std::map<int, __SrsChunkStream*>::iterator it;
for (it = __chunk_streams.begin(); it != __chunk_streams.end(); ++it) {
... ... @@ -819,6 +809,9 @@ int SrsProtocol::__recv_interlaced_message(__SrsMessage** pmsg)
if (__chunk_streams.find(cid) == __chunk_streams.end()) {
chunk = __chunk_streams[cid] = new __SrsChunkStream(cid);
// set the perfer cid of chunk,
// which will copy to the message received.
chunk->header.perfer_cid = cid;
srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
} else {
chunk = __chunk_streams[cid];
... ... @@ -979,7 +972,6 @@ int SrsProtocol::__read_message_header(__SrsChunkStream* chunk, char fmt, int bh
// create msg when new chunk stream start
if (!chunk->msg) {
chunk->msg = new __SrsMessage();
chunk->msg->header.perfer_cid = chunk->cid;
srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid);
}
... ... @@ -1355,187 +1347,13 @@ int SrsProtocol::__on_send_message(__SrsMessage* msg, SrsPacket* packet)
return ret;
}
int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
{
*pmsg = NULL;
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("recv interlaced message failed. ret=%d", ret);
}
return ret;
}
srs_verbose("entire msg received");
if (!msg) {
continue;
}
if (msg->size <= 0 || msg->header.payload_length <= 0) {
srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).",
msg->header.message_type, msg->header.payload_length,
msg->header.timestamp, msg->header.stream_id);
srs_freep(msg);
continue;
}
if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
srs_error("hook the received msg failed. ret=%d", ret);
srs_freep(msg);
return ret;
}
srs_verbose("get a msg with raw/undecoded payload");
*pmsg = msg;
break;
}
return ret;
}
int SrsProtocol::send_message(ISrsMessage* msg)
{
int ret = ERROR_SUCCESS;
// free msg whatever return value.
SrsAutoFree(ISrsMessage, msg, false);
if ((ret = msg->encode_packet()) != ERROR_SUCCESS) {
srs_error("encode packet to message payload failed. ret=%d", ret);
return ret;
}
srs_info("encode packet to message payload success");
// p set to current write position,
// it's ok when payload is NULL and size is 0.
char* p = (char*)msg->payload;
// always write the header event payload is empty.
do {
// generate the header.
char* pheader = out_header_cache;
if (p == (char*)msg->payload) {
// write new chunk stream header, fmt is 0
*pheader++ = 0x00 | (msg->get_perfer_cid() & 0x3F);
// chunk message header, 11 bytes
// timestamp, 3bytes, big-endian
u_int32_t timestamp = (u_int32_t)msg->header.timestamp;
if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
*pheader++ = 0xFF;
*pheader++ = 0xFF;
*pheader++ = 0xFF;
} else {
pp = (char*)&timestamp;
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
}
// message_length, 3bytes, big-endian
pp = (char*)&msg->header.payload_length;
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
// message_type, 1bytes
*pheader++ = msg->header.message_type;
// message_length, 3bytes, little-endian
pp = (char*)&msg->header.stream_id;
*pheader++ = pp[0];
*pheader++ = pp[1];
*pheader++ = pp[2];
*pheader++ = pp[3];
// chunk extended timestamp header, 0 or 4 bytes, big-endian
if(timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&timestamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
}
} else {
// write no message header chunk stream, fmt is 3
*pheader++ = 0xC0 | (msg->get_perfer_cid() & 0x3F);
// chunk extended timestamp header, 0 or 4 bytes, big-endian
// 6.1.3. Extended Timestamp
// This field is transmitted only when the normal time stamp in the
// chunk message header is set to 0x00ffffff. If normal time stamp is
// set to any value less than 0x00ffffff, this field MUST NOT be
// present. This field MUST NOT be present if the timestamp field is not
// present. Type 3 chunks MUST NOT have this field.
// adobe changed for Type3 chunk:
// FMLE always sendout the extended-timestamp,
// must send the extended-timestamp to FMS,
// must send the extended-timestamp to flash-player.
// @see: ngx_rtmp_prepare_message
// @see: http://blog.csdn.net/win_lin/article/details/13363699
u_int32_t timestamp = (u_int32_t)msg->header.timestamp;
if(timestamp >= RTMP_EXTENDED_TIMESTAMP){
pp = (char*)&timestamp;
*pheader++ = pp[3];
*pheader++ = pp[2];
*pheader++ = pp[1];
*pheader++ = pp[0];
}
}
// sendout header and payload by writev.
// decrease the sys invoke count to get higher performance.
int payload_size = msg->size - (p - (char*)msg->payload);
payload_size = srs_min(payload_size, out_chunk_size);
// always has header
int header_size = pheader - out_header_cache;
srs_assert(header_size > 0);
// send by writev
iovec iov[2];
iov[0].iov_base = out_header_cache;
iov[0].iov_len = header_size;
iov[1].iov_base = p;
iov[1].iov_len = payload_size;
ssize_t nwrite;
if ((ret = skt->writev(iov, 2, &nwrite)) != ERROR_SUCCESS) {
srs_error("send with writev failed. ret=%d", ret);
return ret;
}
// consume sendout bytes when not empty packet.
if (msg->payload && msg->size > 0) {
p += payload_size;
}
} while (p < (char*)msg->payload + msg->size);
if ((ret = on_send_message(msg)) != ERROR_SUCCESS) {
srs_error("hook the send message failed. ret=%d", ret);
return ret;
}
return ret;
}
int SrsProtocol::response_acknowledgement_message()
{
int ret = ERROR_SUCCESS;
SrsCommonMessage* msg = new SrsCommonMessage();
SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket();
in_ack_size.acked_size = pkt->sequence_number = skt->get_recv_bytes();
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send acknowledgement failed. ret=%d", ret);
return ret;
}
... ... @@ -1550,14 +1368,12 @@ int SrsProtocol::response_ping_message(int32_t timestamp)
srs_trace("get a ping request, response it. timestamp=%d", timestamp);
SrsCommonMessage* msg = new SrsCommonMessage();
SrsUserControlPacket* pkt = new SrsUserControlPacket();
pkt->event_type = SrcPCUCPingResponse;
pkt->event_data = timestamp;
msg->set_packet(pkt, 0);
if ((ret = send_message(msg)) != ERROR_SUCCESS) {
if ((ret = __send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
srs_error("send ping response failed. ret=%d", ret);
return ret;
}
... ... @@ -1566,1108 +1382,140 @@ int SrsProtocol::response_ping_message(int32_t timestamp)
return ret;
}
int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
SrsMessageHeader::SrsMessageHeader()
{
int ret = ERROR_SUCCESS;
message_type = 0;
payload_length = 0;
timestamp_delta = 0;
stream_id = 0;
srs_assert(msg != NULL);
timestamp = 0;
perfer_cid = RTMP_CID_ProtocolControl;
}
// acknowledgement
if (in_ack_size.ack_window_size > 0 && skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size) {
if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) {
return ret;
}
}
SrsMessageHeader::~SrsMessageHeader()
{
}
switch (msg->header.message_type) {
case RTMP_MSG_SetChunkSize:
case RTMP_MSG_UserControlMessage:
case RTMP_MSG_WindowAcknowledgementSize:
if ((ret = msg->decode_packet(this)) != ERROR_SUCCESS) {
srs_error("decode packet from message payload failed. ret=%d", ret);
return ret;
}
srs_verbose("decode packet from message payload success.");
break;
}
bool SrsMessageHeader::is_audio()
{
return message_type == RTMP_MSG_AudioMessage;
}
switch (msg->header.message_type) {
case RTMP_MSG_WindowAcknowledgementSize: {
SrsSetWindowAckSizePacket* pkt = dynamic_cast<SrsSetWindowAckSizePacket*>(msg->get_packet());
srs_assert(pkt != NULL);
bool SrsMessageHeader::is_video()
{
return message_type == RTMP_MSG_VideoMessage;
}
if (pkt->ackowledgement_window_size > 0) {
in_ack_size.ack_window_size = pkt->ackowledgement_window_size;
srs_trace("set ack window size to %d", pkt->ackowledgement_window_size);
} else {
srs_warn("ignored. set ack window size is %d", pkt->ackowledgement_window_size);
}
break;
}
case RTMP_MSG_SetChunkSize: {
SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(msg->get_packet());
srs_assert(pkt != NULL);
bool SrsMessageHeader::is_amf0_command()
{
return message_type == RTMP_MSG_AMF0CommandMessage;
}
in_chunk_size = pkt->chunk_size;
bool SrsMessageHeader::is_amf0_data()
{
return message_type == RTMP_MSG_AMF0DataMessage;
}
srs_trace("set input chunk size to %d", pkt->chunk_size);
break;
}
case RTMP_MSG_UserControlMessage: {
SrsUserControlPacket* pkt = dynamic_cast<SrsUserControlPacket*>(msg->get_packet());
srs_assert(pkt != NULL);
bool SrsMessageHeader::is_amf3_command()
{
return message_type == RTMP_MSG_AMF3CommandMessage;
}
if (pkt->event_type == SrcPCUCSetBufferLength) {
srs_trace("ignored. set buffer length to %d", pkt->extra_data);
}
if (pkt->event_type == SrcPCUCPingRequest) {
if ((ret = response_ping_message(pkt->event_data)) != ERROR_SUCCESS) {
return ret;
}
}
break;
}
}
bool SrsMessageHeader::is_amf3_data()
{
return message_type == RTMP_MSG_AMF3DataMessage;
}
return ret;
bool SrsMessageHeader::is_window_ackledgement_size()
{
return message_type == RTMP_MSG_WindowAcknowledgementSize;
}
int SrsProtocol::on_send_message(ISrsMessage* msg)
bool SrsMessageHeader::is_ackledgement()
{
int ret = ERROR_SUCCESS;
return message_type == RTMP_MSG_Acknowledgement;
}
if (!msg->can_decode()) {
srs_verbose("ignore the un-decodable message.");
return ret;
}
SrsCommonMessage* common_msg = dynamic_cast<SrsCommonMessage*>(msg);
if (!common_msg) {
srs_verbose("ignore the shared ptr message.");
return ret;
}
// for proxy, the common msg is not decoded, ignore.
if (!common_msg->has_packet()) {
srs_verbose("ignore the proxy common message.");
return ret;
}
srs_assert(common_msg != NULL);
switch (common_msg->header.message_type) {
case RTMP_MSG_SetChunkSize: {
SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(common_msg->get_packet());
srs_assert(pkt != NULL);
out_chunk_size = pkt->chunk_size;
srs_trace("set output chunk size to %d", pkt->chunk_size);
break;
}
case RTMP_MSG_AMF0CommandMessage:
case RTMP_MSG_AMF3CommandMessage: {
if (true) {
SrsConnectAppPacket* pkt = NULL;
pkt = dynamic_cast<SrsConnectAppPacket*>(common_msg->get_packet());
if (pkt) {
requests[pkt->transaction_id] = pkt->command_name;
break;
}
}
if (true) {
SrsCreateStreamPacket* pkt = NULL;
pkt = dynamic_cast<SrsCreateStreamPacket*>(common_msg->get_packet());
if (pkt) {
requests[pkt->transaction_id] = pkt->command_name;
break;
}
}
if (true) {
SrsFMLEStartPacket* pkt = NULL;
pkt = dynamic_cast<SrsFMLEStartPacket*>(common_msg->get_packet());
if (pkt) {
requests[pkt->transaction_id] = pkt->command_name;
break;
}
}
break;
}
}
return ret;
}
int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
{
int ret = ERROR_SUCCESS;
// chunk stream basic header.
char fmt = 0;
int cid = 0;
int bh_size = 0;
if ((ret = read_basic_header(fmt, cid, bh_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read basic header failed. ret=%d", ret);
}
return ret;
}
srs_verbose("read basic header success. fmt=%d, cid=%d, bh_size=%d", fmt, cid, bh_size);
// once we got the chunk message header,
// that is there is a real message in cache,
// increase the timeout to got it.
// For example, in the play loop, we set timeout to 100ms,
// when we got a chunk header, we should increase the timeout,
// or we maybe timeout and disconnect the client.
int64_t timeout_us = skt->get_recv_timeout();
if (!skt->is_never_timeout(timeout_us)) {
int64_t pkt_timeout_us = srs_max(timeout_us, SRS_MIN_RECV_TIMEOUT_US);
skt->set_recv_timeout(pkt_timeout_us);
srs_verbose("change recv timeout_us "
"from %"PRId64" to %"PRId64"", timeout_us, pkt_timeout_us);
}
// get the cached chunk stream.
SrsChunkStream* chunk = NULL;
if (chunk_streams.find(cid) == chunk_streams.end()) {
chunk = chunk_streams[cid] = new SrsChunkStream(cid);
srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
} else {
chunk = chunk_streams[cid];
srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
}
// chunk stream message header
int mh_size = 0;
if ((ret = read_message_header(chunk, fmt, bh_size, mh_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read message header failed. ret=%d", ret);
}
return ret;
}
srs_verbose("read message header success. "
"fmt=%d, mh_size=%d, ext_time=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
fmt, mh_size, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), chunk->header.message_type,
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
// read msg payload from chunk stream.
SrsCommonMessage* msg = NULL;
int payload_size = 0;
if ((ret = read_message_payload(chunk, bh_size, mh_size, payload_size, &msg)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read message payload failed. ret=%d", ret);
}
return ret;
}
// reset the recv timeout
if (!skt->is_never_timeout(timeout_us)) {
skt->set_recv_timeout(timeout_us);
srs_verbose("reset recv timeout_us to %"PRId64"", timeout_us);
}
// not got an entire RTMP message, try next chunk.
if (!msg) {
srs_verbose("get partial message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
*pmsg = msg;
srs_info("get entire message success. chunk_payload_size=%d, size=%d, message(type=%d, size=%d, time=%"PRId64", sid=%d)",
payload_size, (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
int SrsProtocol::read_basic_header(char& fmt, int& cid, int& bh_size)
{
int ret = ERROR_SUCCESS;
int required_size = 1;
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read 1bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
}
return ret;
}
char* p = buffer->bytes();
fmt = (*p >> 6) & 0x03;
cid = *p & 0x3f;
bh_size = 1;
if (cid > 1) {
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
return ret;
}
if (cid == 0) {
required_size = 2;
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read 2bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
}
return ret;
}
cid = 64;
cid += *(++p);
bh_size = 2;
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
} else if (cid == 1) {
required_size = 3;
if ((ret = buffer->ensure_buffer_bytes(skt, 3)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read 3bytes basic header failed. required_size=%d, ret=%d", required_size, ret);
}
return ret;
}
cid = 64;
cid += *(++p);
cid += *(++p) * 256;
bh_size = 3;
srs_verbose("%dbytes basic header parsed. fmt=%d, cid=%d", bh_size, fmt, cid);
} else {
srs_error("invalid path, impossible basic header.");
srs_assert(false);
}
return ret;
}
int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size)
{
int ret = ERROR_SUCCESS;
/**
* we should not assert anything about fmt, for the first packet.
* (when first packet, the chunk->msg is NULL).
* the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet.
* the previous packet is:
* 04 // fmt=0, cid=4
* 00 00 1a // timestamp=26
* 00 00 9d // payload_length=157
* 08 // message_type=8(audio)
* 01 00 00 00 // stream_id=1
* the current packet maybe:
* c4 // fmt=3, cid=4
* it's ok, for the packet is audio, and timestamp delta is 26.
* the current packet must be parsed as:
* fmt=0, cid=4
* timestamp=26+26=52
* payload_length=157
* message_type=8(audio)
* stream_id=1
* so we must update the timestamp even fmt=3 for first packet.
*/
// fresh packet used to update the timestamp even fmt=3 for first packet.
bool is_fresh_packet = !chunk->msg;
// but, we can ensure that when a chunk stream is fresh,
// the fmt must be 0, a new stream.
if (chunk->msg_count == 0 && fmt != RTMP_FMT_TYPE0) {
ret = ERROR_RTMP_CHUNK_START;
srs_error("chunk stream is fresh, "
"fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
return ret;
}
// when exists cache msg, means got an partial message,
// the fmt must not be type0 which means new message.
if (chunk->msg && fmt == RTMP_FMT_TYPE0) {
ret = ERROR_RTMP_CHUNK_START;
srs_error("chunk stream exists, "
"fmt must not be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
return ret;
}
// create msg when new chunk stream start
if (!chunk->msg) {
chunk->msg = new SrsCommonMessage();
srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid);
}
// read message header from socket to buffer.
static char mh_sizes[] = {11, 7, 3, 0};
mh_size = mh_sizes[(int)fmt];
srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
int required_size = bh_size + mh_size;
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
}
return ret;
}
char* p = buffer->bytes() + bh_size;
// parse the message header.
// see also: ngx_rtmp_recv
if (fmt <= RTMP_FMT_TYPE2) {
char* pp = (char*)&chunk->header.timestamp_delta;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
pp[3] = 0;
// fmt: 0
// timestamp: 3 bytes
// If the timestamp is greater than or equal to 16777215
// (hexadecimal 0x00ffffff), this value MUST be 16777215, and the
// ‘extended timestamp header’ MUST be present. Otherwise, this value
// SHOULD be the entire timestamp.
//
// fmt: 1 or 2
// timestamp delta: 3 bytes
// If the delta is greater than or equal to 16777215 (hexadecimal
// 0x00ffffff), this value MUST be 16777215, and the ‘extended
// timestamp header’ MUST be present. Otherwise, this value SHOULD be
// the entire delta.
chunk->extended_timestamp = (chunk->header.timestamp_delta >= RTMP_EXTENDED_TIMESTAMP);
if (chunk->extended_timestamp) {
// Extended timestamp: 0 or 4 bytes
// This field MUST be sent when the normal timsestamp is set to
// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
// anything else. So for values less than 0xffffff the normal
// timestamp field SHOULD be used in which case the extended timestamp
// MUST NOT be present. For values greater than or equal to 0xffffff
// the normal timestamp field MUST NOT be used and MUST be set to
// 0xffffff and the extended timestamp MUST be sent.
//
// if extended timestamp, the timestamp must >= RTMP_EXTENDED_TIMESTAMP
// we set the timestamp to RTMP_EXTENDED_TIMESTAMP to identify we
// got an extended timestamp.
chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP;
} else {
if (fmt == RTMP_FMT_TYPE0) {
// 6.1.2.1. Type 0
// For a type-0 chunk, the absolute timestamp of the message is sent
// here.
chunk->header.timestamp = chunk->header.timestamp_delta;
} else {
// 6.1.2.2. Type 1
// 6.1.2.3. Type 2
// For a type-1 or type-2 chunk, the difference between the previous
// chunk's timestamp and the current chunk's timestamp is sent here.
chunk->header.timestamp += chunk->header.timestamp_delta;
}
}
if (fmt <= RTMP_FMT_TYPE1) {
pp = (char*)&chunk->header.payload_length;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
pp[3] = 0;
// if msg exists in cache, the size must not changed.
if (chunk->msg->size > 0 && chunk->msg->size != chunk->header.payload_length) {
ret = ERROR_RTMP_PACKET_SIZE;
srs_error("msg exists in chunk cache, "
"size=%d cannot change to %d, ret=%d",
chunk->msg->size, chunk->header.payload_length, ret);
return ret;
}
chunk->header.message_type = *p++;
if (fmt == RTMP_FMT_TYPE0) {
pp = (char*)&chunk->header.stream_id;
pp[0] = *p++;
pp[1] = *p++;
pp[2] = *p++;
pp[3] = *p++;
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d, sid=%d",
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
chunk->header.message_type, chunk->header.stream_id);
} else {
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64", payload=%d, type=%d",
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp, chunk->header.payload_length,
chunk->header.message_type);
}
} else {
srs_verbose("header read completed. fmt=%d, mh_size=%d, ext_time=%d, time=%"PRId64"",
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp);
}
} else {
// update the timestamp even fmt=3 for first stream
if (is_fresh_packet && !chunk->extended_timestamp) {
chunk->header.timestamp += chunk->header.timestamp_delta;
}
srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d",
fmt, mh_size, chunk->extended_timestamp);
}
if (chunk->extended_timestamp) {
mh_size += 4;
required_size = bh_size + mh_size;
srs_verbose("read header ext time. fmt=%d, ext_time=%d, mh_size=%d", fmt, chunk->extended_timestamp, mh_size);
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read %dbytes message header failed. required_size=%d, ret=%d", mh_size, required_size, ret);
}
return ret;
}
u_int32_t timestamp = 0x00;
char* pp = (char*)&timestamp;
pp[3] = *p++;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
// ffmpeg/librtmp may donot send this filed, need to detect the value.
// @see also: http://blog.csdn.net/win_lin/article/details/13363699
// compare to the chunk timestamp, which is set by chunk message header
// type 0,1 or 2.
u_int32_t chunk_timestamp = chunk->header.timestamp;
if (chunk_timestamp > RTMP_EXTENDED_TIMESTAMP && chunk_timestamp != timestamp) {
mh_size -= 4;
srs_verbose("ignore the 4bytes extended timestamp. mh_size=%d", mh_size);
} else {
chunk->header.timestamp = timestamp;
}
srs_verbose("header read ext_time completed. time=%"PRId64"", chunk->header.timestamp);
}
// the extended-timestamp must be unsigned-int,
// 24bits timestamp: 0xffffff = 16777215ms = 16777.215s = 4.66h
// 32bits timestamp: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d
// because the rtmp protocol says the 32bits timestamp is about "50 days":
// 3. Byte Order, Alignment, and Time Format
// Because timestamps are generally only 32 bits long, they will roll
// over after fewer than 50 days.
//
// but, its sample says the timestamp is 31bits:
// An application could assume, for example, that all
// adjacent timestamps are within 2^31 milliseconds of each other, so
// 10000 comes after 4000000000, while 3000000000 comes before
// 4000000000.
// and flv specification says timestamp is 31bits:
// Extension of the Timestamp field to form a SI32 value. This
// field represents the upper 8 bits, while the previous
// Timestamp field represents the lower 24 bits of the time in
// milliseconds.
// in a word, 31bits timestamp is ok.
// convert extended timestamp to 31bits.
chunk->header.timestamp &= 0x7fffffff;
// valid message
if (chunk->header.payload_length < 0) {
ret = ERROR_RTMP_MSG_INVLIAD_SIZE;
srs_error("RTMP message size must not be negative. size=%d, ret=%d",
chunk->header.payload_length, ret);
return ret;
}
// copy header to msg
chunk->msg->header = chunk->header;
// increase the msg count, the chunk stream can accept fmt=1/2/3 message now.
chunk->msg_count++;
return ret;
}
int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsCommonMessage** pmsg)
{
int ret = ERROR_SUCCESS;
// empty message
if (chunk->header.payload_length <= 0) {
// need erase the header in buffer.
buffer->erase(bh_size + mh_size);
srs_trace("get an empty RTMP "
"message(type=%d, size=%d, time=%"PRId64", sid=%d)", chunk->header.message_type,
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
*pmsg = chunk->msg;
chunk->msg = NULL;
return ret;
}
srs_assert(chunk->header.payload_length > 0);
// the chunk payload size.
payload_size = chunk->header.payload_length - chunk->msg->size;
payload_size = srs_min(payload_size, in_chunk_size);
srs_verbose("chunk payload size is %d, message_size=%d, received_size=%d, in_chunk_size=%d",
payload_size, chunk->header.payload_length, chunk->msg->size, in_chunk_size);
// create msg payload if not initialized
if (!chunk->msg->payload) {
chunk->msg->payload = new int8_t[chunk->header.payload_length];
memset(chunk->msg->payload, 0, chunk->header.payload_length);
srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length);
}
// read payload to buffer
int required_size = bh_size + mh_size + payload_size;
if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret);
}
return ret;
}
memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size);
buffer->erase(bh_size + mh_size + payload_size);
chunk->msg->size += payload_size;
srs_verbose("chunk payload read completed. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size);
// got entire RTMP message?
if (chunk->header.payload_length == chunk->msg->size) {
*pmsg = chunk->msg;
chunk->msg = NULL;
srs_verbose("get entire RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d)",
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
srs_verbose("get partial RTMP message(type=%d, size=%d, time=%"PRId64", sid=%d), partial size=%d",
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id,
chunk->msg->size);
return ret;
}
SrsMessageHeader::SrsMessageHeader()
{
message_type = 0;
payload_length = 0;
timestamp_delta = 0;
stream_id = 0;
timestamp = 0;
perfer_cid = RTMP_CID_ProtocolControl;
}
SrsMessageHeader::~SrsMessageHeader()
{
}
bool SrsMessageHeader::is_audio()
{
return message_type == RTMP_MSG_AudioMessage;
}
bool SrsMessageHeader::is_video()
{
return message_type == RTMP_MSG_VideoMessage;
}
bool SrsMessageHeader::is_amf0_command()
{
return message_type == RTMP_MSG_AMF0CommandMessage;
}
bool SrsMessageHeader::is_amf0_data()
{
return message_type == RTMP_MSG_AMF0DataMessage;
}
bool SrsMessageHeader::is_amf3_command()
{
return message_type == RTMP_MSG_AMF3CommandMessage;
}
bool SrsMessageHeader::is_amf3_data()
{
return message_type == RTMP_MSG_AMF3DataMessage;
}
bool SrsMessageHeader::is_window_ackledgement_size()
{
return message_type == RTMP_MSG_WindowAcknowledgementSize;
}
bool SrsMessageHeader::is_ackledgement()
{
return message_type == RTMP_MSG_Acknowledgement;
}
bool SrsMessageHeader::is_set_chunk_size()
{
return message_type == RTMP_MSG_SetChunkSize;
}
bool SrsMessageHeader::is_user_control_message()
{
return message_type == RTMP_MSG_UserControlMessage;
}
void SrsMessageHeader::initialize_amf0_script(int size, int stream)
{
message_type = RTMP_MSG_AMF0DataMessage;
payload_length = (int32_t)size;
timestamp_delta = (int32_t)0;
timestamp = (int64_t)0;
stream_id = (int32_t)stream;
}
void SrsMessageHeader::initialize_audio(int size, u_int32_t time, int stream)
{
message_type = RTMP_MSG_AudioMessage;
payload_length = (int32_t)size;
timestamp_delta = (int32_t)time;
timestamp = (int64_t)time;
stream_id = (int32_t)stream;
}
void SrsMessageHeader::initialize_video(int size, u_int32_t time, int stream)
{
message_type = RTMP_MSG_VideoMessage;
payload_length = (int32_t)size;
timestamp_delta = (int32_t)time;
timestamp = (int64_t)time;
stream_id = (int32_t)stream;
}
SrsChunkStream::SrsChunkStream(int _cid)
{
fmt = 0;
cid = _cid;
extended_timestamp = false;
msg = NULL;
msg_count = 0;
}
SrsChunkStream::~SrsChunkStream()
{
srs_freep(msg);
}
__SrsChunkStream::__SrsChunkStream(int _cid)
{
fmt = 0;
cid = _cid;
extended_timestamp = false;
msg = NULL;
msg_count = 0;
}
__SrsChunkStream::~__SrsChunkStream()
{
srs_freep(msg);
}
__SrsMessage::__SrsMessage()
{
payload = NULL;
size = 0;
}
__SrsMessage::~__SrsMessage()
{
}
__SrsSharedPtrMessage::__SrsSharedPtr::__SrsSharedPtr()
{
payload = NULL;
size = 0;
shared_count = 0;
}
__SrsSharedPtrMessage::__SrsSharedPtr::~__SrsSharedPtr()
{
srs_freepa(payload);
}
__SrsSharedPtrMessage::__SrsSharedPtrMessage()
{
ptr = NULL;
}
__SrsSharedPtrMessage::~__SrsSharedPtrMessage()
{
if (ptr) {
if (ptr->shared_count == 0) {
srs_freep(ptr);
} else {
ptr->shared_count--;
}
}
}
int __SrsSharedPtrMessage::initialize(__SrsMessage* source)
{
int ret = ERROR_SUCCESS;
if ((ret = initialize(&source->header, (char*)source->payload, source->size)) != ERROR_SUCCESS) {
return ret;
}
// detach the payload from source
source->payload = NULL;
source->size = 0;
return ret;
}
int __SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int size)
{
int ret = ERROR_SUCCESS;
srs_assert(source != NULL);
if (ptr) {
ret = ERROR_SYSTEM_ASSERT_FAILED;
srs_error("should not set the payload twice. ret=%d", ret);
srs_assert(false);
return ret;
}
header = *source;
header.payload_length = size;
ptr = new __SrsSharedPtr();
// direct attach the data of common message.
ptr->payload = payload;
ptr->size = size;
__SrsMessage::payload = (int8_t*)ptr->payload;
__SrsMessage::size = ptr->size;
return ret;
}
__SrsSharedPtrMessage* __SrsSharedPtrMessage::copy()
{
if (!ptr) {
srs_error("invoke initialize to initialize the ptr.");
srs_assert(false);
return NULL;
}
__SrsSharedPtrMessage* copy = new __SrsSharedPtrMessage();
copy->header = header;
copy->ptr = ptr;
ptr->shared_count++;
copy->payload = (int8_t*)ptr->payload;
copy->size = ptr->size;
return copy;
}
ISrsMessage::ISrsMessage()
{
payload = NULL;
size = 0;
}
ISrsMessage::~ISrsMessage()
{
}
SrsCommonMessage::SrsCommonMessage()
bool SrsMessageHeader::is_set_chunk_size()
{
stream = NULL;
packet = NULL;
return message_type == RTMP_MSG_SetChunkSize;
}
SrsCommonMessage::~SrsCommonMessage()
bool SrsMessageHeader::is_user_control_message()
{
// we must directly free the ptrs,
// nevery use the virtual functions to delete,
// for in the destructor, the virtual functions is disabled.
srs_freepa(payload);
srs_freep(packet);
srs_freep(stream);
return message_type == RTMP_MSG_UserControlMessage;
}
bool SrsCommonMessage::can_decode()
void SrsMessageHeader::initialize_amf0_script(int size, int stream)
{
return true;
message_type = RTMP_MSG_AMF0DataMessage;
payload_length = (int32_t)size;
timestamp_delta = (int32_t)0;
timestamp = (int64_t)0;
stream_id = (int32_t)stream;
}
int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
void SrsMessageHeader::initialize_audio(int size, u_int32_t time, int stream)
{
int ret = ERROR_SUCCESS;
srs_assert(payload != NULL);
srs_assert(size > 0);
if (packet) {
srs_verbose("msg already decoded");
return ret;
}
if (!stream) {
srs_verbose("create decode stream for message.");
stream = new SrsStream();
}
// initialize the decode stream for all message,
// it's ok for the initialize if fast and without memory copy.
if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) {
srs_error("initialize stream failed. ret=%d", ret);
return ret;
}
srs_verbose("decode stream initialized success");
// decode specified packet type
if (header.is_amf0_command() || header.is_amf3_command() || header.is_amf0_data() || header.is_amf3_data()) {
srs_verbose("start to decode AMF0/AMF3 command message.");
// skip 1bytes to decode the amf3 command.
if (header.is_amf3_command() && stream->require(1)) {
srs_verbose("skip 1bytes to decode AMF3 command");
stream->skip(1);
}
// amf0 command message.
// need to read the command name.
std::string command;
if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {
srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret);
return ret;
}
srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str());
// result/error packet
if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) {
double transactionId = 0.0;
if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) {
srs_error("decode AMF0/AMF3 transcationId failed. ret=%d", ret);
return ret;
}
srs_verbose("AMF0/AMF3 command id, transcationId=%.2f", transactionId);
// reset stream, for header read completed.
stream->reset();
if (header.is_amf3_command()) {
stream->skip(1);
}
std::string request_name = protocol->get_request_name(transactionId);
if (request_name.empty()) {
ret = ERROR_RTMP_NO_REQUEST;
srs_error("decode AMF0/AMF3 request failed. ret=%d", ret);
return ret;
}
srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str());
if (request_name == RTMP_AMF0_COMMAND_CONNECT) {
srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str());
packet = new SrsConnectAppResPacket();
return packet->decode(stream);
} else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) {
srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str());
packet = new SrsCreateStreamResPacket(0, 0);
return packet->decode(stream);
} else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
|| request_name == RTMP_AMF0_COMMAND_FC_PUBLISH
|| request_name == RTMP_AMF0_COMMAND_UNPUBLISH) {
srs_info("decode the AMF0/AMF3 response command(%s message).", request_name.c_str());
packet = new SrsFMLEStartResPacket(0);
return packet->decode(stream);
} else {
ret = ERROR_RTMP_NO_REQUEST;
srs_error("decode AMF0/AMF3 request failed. "
"request_name=%s, transactionId=%.2f, ret=%d",
request_name.c_str(), transactionId, ret);
return ret;
}
}
// reset to zero(amf3 to 1) to restart decode.
stream->reset();
if (header.is_amf3_command()) {
stream->skip(1);
}
// decode command object.
if (command == RTMP_AMF0_COMMAND_CONNECT) {
srs_info("decode the AMF0/AMF3 command(connect vhost/app message).");
packet = new SrsConnectAppPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_CREATE_STREAM) {
srs_info("decode the AMF0/AMF3 command(createStream message).");
packet = new SrsCreateStreamPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_PLAY) {
srs_info("decode the AMF0/AMF3 command(paly message).");
packet = new SrsPlayPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_PAUSE) {
srs_info("decode the AMF0/AMF3 command(pause message).");
packet = new SrsPausePacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {
srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message).");
packet = new SrsFMLEStartPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
packet = new SrsFMLEStartPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_PUBLISH) {
srs_info("decode the AMF0/AMF3 command(publish message).");
packet = new SrsPublishPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_UNPUBLISH) {
srs_info("decode the AMF0/AMF3 command(unpublish message).");
packet = new SrsFMLEStartPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_DATA_SET_DATAFRAME || command == RTMP_AMF0_DATA_ON_METADATA) {
srs_info("decode the AMF0/AMF3 data(onMetaData message).");
packet = new SrsOnMetaDataPacket();
return packet->decode(stream);
} else if(command == SRS_BW_CHECK_FINISHED
|| command == SRS_BW_CHECK_PLAYING
|| command == SRS_BW_CHECK_PUBLISHING
|| command == SRS_BW_CHECK_STARTING_PLAY
|| command == SRS_BW_CHECK_STARTING_PUBLISH
|| command == SRS_BW_CHECK_START_PLAY
|| command == SRS_BW_CHECK_START_PUBLISH
|| command == SRS_BW_CHECK_STOPPED_PLAY
|| command == SRS_BW_CHECK_STOP_PLAY
|| command == SRS_BW_CHECK_STOP_PUBLISH
|| command == SRS_BW_CHECK_STOPPED_PUBLISH
|| command == SRS_BW_CHECK_FLASH_FINAL)
{
srs_info("decode the AMF0/AMF3 band width check message.");
packet = new SrsBandwidthPacket();
return packet->decode(stream);
} else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) {
srs_info("decode the AMF0/AMF3 closeStream message.");
packet = new SrsCloseStreamPacket();
return packet->decode(stream);
}
// default packet to drop message.
srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
packet = new SrsPacket();
return ret;
} else if(header.is_user_control_message()) {
srs_verbose("start to decode user control message.");
packet = new SrsUserControlPacket();
return packet->decode(stream);
} else if(header.is_window_ackledgement_size()) {
srs_verbose("start to decode set ack window size message.");
packet = new SrsSetWindowAckSizePacket();
return packet->decode(stream);
} else if(header.is_set_chunk_size()) {
srs_verbose("start to decode set chunk size message.");
packet = new SrsSetChunkSizePacket();
return packet->decode(stream);
} else {
// default packet to drop message.
srs_trace("drop the unknown message, type=%d", header.message_type);
packet = new SrsPacket();
}
return ret;
message_type = RTMP_MSG_AudioMessage;
payload_length = (int32_t)size;
timestamp_delta = (int32_t)time;
timestamp = (int64_t)time;
stream_id = (int32_t)stream;
}
bool SrsCommonMessage::has_packet()
void SrsMessageHeader::initialize_video(int size, u_int32_t time, int stream)
{
return packet != NULL;
message_type = RTMP_MSG_VideoMessage;
payload_length = (int32_t)size;
timestamp_delta = (int32_t)time;
timestamp = (int64_t)time;
stream_id = (int32_t)stream;
}
SrsPacket* SrsCommonMessage::get_packet()
__SrsChunkStream::__SrsChunkStream(int _cid)
{
if (!packet) {
srs_error("the payload is raw/undecoded, invoke decode_packet to decode it.");
}
srs_assert(packet != NULL);
return packet;
fmt = 0;
cid = _cid;
extended_timestamp = false;
msg = NULL;
msg_count = 0;
}
int SrsCommonMessage::get_perfer_cid()
__SrsChunkStream::~__SrsChunkStream()
{
if (!packet) {
return RTMP_CID_ProtocolControl;
}
// we donot use the complex basic header,
// ensure the basic header is 1bytes.
if (packet->get_perfer_cid() < 2) {
return packet->get_perfer_cid();
}
return packet->get_perfer_cid();
srs_freep(msg);
}
SrsCommonMessage* SrsCommonMessage::set_packet(SrsPacket* pkt, int stream_id)
__SrsMessage::__SrsMessage()
{
srs_freep(packet);
packet = pkt;
header.message_type = packet->get_message_type();
header.payload_length = packet->get_payload_length();
header.stream_id = stream_id;
return this;
payload = NULL;
size = 0;
}
int SrsCommonMessage::encode_packet()
__SrsMessage::~__SrsMessage()
{
int ret = ERROR_SUCCESS;
// sometimes, for example, the edge proxy,
// the payload is not decoded, so directly sent out.
if (payload != NULL) {
header.payload_length = size;
return ret;
}
// encode packet to payload and size.
if (packet == NULL) {
srs_warn("packet is empty, send out empty message.");
return ret;
}
// realloc the payload.
size = 0;
srs_freepa(payload);
if ((ret = packet->encode(size, (char*&)payload)) != ERROR_SUCCESS) {
return ret;
}
header.payload_length = size;
return ret;
}
SrsSharedPtrMessage::SrsSharedPtr::SrsSharedPtr()
__SrsSharedPtrMessage::__SrsSharedPtr::__SrsSharedPtr()
{
payload = NULL;
size = 0;
perfer_cid = 0;
shared_count = 0;
}
SrsSharedPtrMessage::SrsSharedPtr::~SrsSharedPtr()
__SrsSharedPtrMessage::__SrsSharedPtr::~__SrsSharedPtr()
{
srs_freepa(payload);
}
SrsSharedPtrMessage::SrsSharedPtrMessage()
__SrsSharedPtrMessage::__SrsSharedPtrMessage()
{
ptr = NULL;
}
SrsSharedPtrMessage::~SrsSharedPtrMessage()
__SrsSharedPtrMessage::~__SrsSharedPtrMessage()
{
if (ptr) {
if (ptr->shared_count == 0) {
... ... @@ -2678,12 +1526,7 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage()
}
}
bool SrsSharedPtrMessage::can_decode()
{
return false;
}
int SrsSharedPtrMessage::initialize(SrsCommonMessage* source)
int __SrsSharedPtrMessage::initialize(__SrsMessage* source)
{
int ret = ERROR_SUCCESS;
... ... @@ -2698,7 +1541,7 @@ int SrsSharedPtrMessage::initialize(SrsCommonMessage* source)
return ret;
}
int SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int size)
int __SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int size)
{
int ret = ERROR_SUCCESS;
... ... @@ -2714,27 +1557,19 @@ int SrsSharedPtrMessage::initialize(SrsMessageHeader* source, char* payload, int
header = *source;
header.payload_length = size;
ptr = new SrsSharedPtr();
ptr = new __SrsSharedPtr();
// direct attach the data of common message.
ptr->payload = payload;
ptr->size = size;
if (source->is_video()) {
ptr->perfer_cid = RTMP_CID_Video;
} else if (source->is_audio()) {
ptr->perfer_cid = RTMP_CID_Audio;
} else {
ptr->perfer_cid = RTMP_CID_OverConnection2;
}
ISrsMessage::payload = (int8_t*)ptr->payload;
ISrsMessage::size = ptr->size;
__SrsMessage::payload = (int8_t*)ptr->payload;
__SrsMessage::size = ptr->size;
return ret;
}
SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
__SrsSharedPtrMessage* __SrsSharedPtrMessage::copy()
{
if (!ptr) {
srs_error("invoke initialize to initialize the ptr.");
... ... @@ -2742,7 +1577,7 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
return NULL;
}
SrsSharedPtrMessage* copy = new SrsSharedPtrMessage();
__SrsSharedPtrMessage* copy = new __SrsSharedPtrMessage();
copy->header = header;
... ... @@ -2755,21 +1590,6 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
return copy;
}
int SrsSharedPtrMessage::get_perfer_cid()
{
if (!ptr) {
return 0;
}
return ptr->perfer_cid;
}
int SrsSharedPtrMessage::encode_packet()
{
srs_verbose("shared message ignore the encode method.");
return ERROR_SUCCESS;
}
SrsPacket::SrsPacket()
{
}
... ...
... ... @@ -40,11 +40,8 @@ class ISrsProtocolReaderWriter;
class SrsBuffer;
class SrsPacket;
class SrsStream;
class SrsCommonMessage;
class SrsChunkStream;
class SrsAmf0Object;
class SrsAmf0Any;
class ISrsMessage;
class SrsMessageHeader;
class __SrsMessage;
class __SrsChunkStream;
... ... @@ -112,7 +109,6 @@ private:
std::map<double, std::string> requests;
// peer in
private:
std::map<int, SrsChunkStream*> chunk_streams;
// TODO: FIXME: rename to chunk_streams
std::map<int, __SrsChunkStream*> __chunk_streams;
SrsStream* decode_stream;
... ... @@ -220,58 +216,9 @@ private:
* when message sentout, update the context.
*/
virtual int __on_send_message(__SrsMessage* msg, SrsPacket* packet);
public:
/**
* recv a message with raw/undecoded payload from peer.
* the payload is not decoded, use srs_rtmp_expect_message<T> if requires
* specifies message.
* @pmsg, user must free it. NULL if not success.
* @remark, only when success, user can use and must free the pmsg.
*/
virtual int recv_message(SrsCommonMessage** pmsg);
/**
* send out message with encoded payload to peer.
* use the message encode method to encode to payload,
* then sendout over socket.
* @msg this method will free it whatever return value.
*/
virtual int send_message(ISrsMessage* msg);
private:
/**
* when recv message, update the context.
*/
virtual int on_recv_message(SrsCommonMessage* msg);
virtual int response_acknowledgement_message();
virtual int response_ping_message(int32_t timestamp);
/**
* when message sentout, update the context.
*/
virtual int on_send_message(ISrsMessage* msg);
/**
* try to recv interlaced message from peer,
* return error if error occur and nerver set the pmsg,
* return success and pmsg set to NULL if no entire message got,
* return success and pmsg set to entire message if got one.
*/
virtual int recv_interlaced_message(SrsCommonMessage** pmsg);
/**
* read the chunk basic header(fmt, cid) from chunk stream.
* user can discovery a SrsChunkStream by cid.
* @bh_size return the chunk basic header size, to remove the used bytes when finished.
*/
virtual int read_basic_header(char& fmt, int& cid, int& bh_size);
/**
* read the chunk message header(timestamp, payload_length, message_type, stream_id)
* from chunk stream and save to SrsChunkStream.
* @mh_size return the chunk message header size, to remove the used bytes when finished.
*/
virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);
/**
* read the chunk payload, remove the used bytes in buffer,
* if got entire message, set the pmsg.
* @payload_size read size in this roundtrip, generally a chunk size or left message size.
*/
virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsCommonMessage** pmsg);
};
/**
... ... @@ -342,44 +289,6 @@ public:
* incoming chunk stream maybe interlaced,
* use the chunk stream to cache the input RTMP chunk streams.
*/
class SrsChunkStream
{
public:
/**
* represents the basic header fmt,
* which used to identify the variant message header type.
*/
char fmt;
/**
* represents the basic header cid,
* which is the chunk stream id.
*/
int cid;
/**
* cached message header
*/
SrsMessageHeader header;
/**
* whether the chunk message header has extended timestamp.
*/
bool extended_timestamp;
/**
* partially read message.
*/
SrsCommonMessage* msg;
/**
* decoded msg count, to identify whether the chunk stream is fresh.
*/
int64_t msg_count;
public:
SrsChunkStream(int _cid);
virtual ~SrsChunkStream();
};
/**
* incoming chunk stream maybe interlaced,
* use the chunk stream to cache the input RTMP chunk streams.
*/
class __SrsChunkStream
{
public:
... ... @@ -480,156 +389,6 @@ public:
};
/**
* message to output.
*/
class ISrsMessage
{
// 4.1. Message Header
public:
SrsMessageHeader header;
// 4.2. Message Payload
public:
/**
* The other part which is the payload is the actual data that is
* contained in the message. For example, it could be some audio samples
* or compressed video data. The payload format and interpretation are
* beyond the scope of this document.
*/
int32_t size;
int8_t* payload;
public:
ISrsMessage();
virtual ~ISrsMessage();
public:
/**
* whether message canbe decoded.
* only update the context when message canbe decoded.
*/
virtual bool can_decode() = 0;
/**
* encode functions.
*/
public:
/**
* get the perfered cid(chunk stream id) which sendout over.
*/
virtual int get_perfer_cid() = 0;
/**
* encode the packet to message payload bytes.
* @remark there exists empty packet, so maybe the payload is NULL.
*/
virtual int encode_packet() = 0;
};
/**
* common RTMP message defines in rtmp.part2.Message-Formats.pdf.
* cannbe parse and decode.
*/
class SrsCommonMessage : public ISrsMessage
{
private:
disable_default_copy(SrsCommonMessage);
// decoded message payload.
private:
SrsStream* stream;
SrsPacket* packet;
public:
SrsCommonMessage();
virtual ~SrsCommonMessage();
public:
virtual bool can_decode();
/**
* decode functions.
*/
public:
/**
* decode packet from message payload.
*/
// TODO: use protocol to decode it.
virtual int decode_packet(SrsProtocol* protocol);
/**
* whether msg has decoded packet.
*/
virtual bool has_packet();
/**
* get the decoded packet which decoded by decode_packet().
* @remark, user never free the pkt, the message will auto free it.
*/
virtual SrsPacket* get_packet();
/**
* encode functions.
*/
public:
/**
* get the perfered cid(chunk stream id) which sendout over.
*/
virtual int get_perfer_cid();
/**
* set the encoded packet to encode_packet() to payload.
* @stream_id, the id of stream which is created by createStream.
* @remark, user never free the pkt, the message will auto free it.
* @return message itself.
*/
// TODO: refine the send methods.
virtual SrsCommonMessage* set_packet(SrsPacket* pkt, int stream_id);
/**
* encode the packet to message payload bytes.
* @remark there exists empty packet, so maybe the payload is NULL.
*/
virtual int encode_packet();
};
/**
* shared ptr message.
* for audio/video/data message that need less memory copy.
* and only for output.
*/
class SrsSharedPtrMessage : public ISrsMessage
{
private:
struct SrsSharedPtr
{
char* payload;
int size;
int perfer_cid;
int shared_count;
SrsSharedPtr();
virtual ~SrsSharedPtr();
};
SrsSharedPtr* ptr;
public:
SrsSharedPtrMessage();
virtual ~SrsSharedPtrMessage();
public:
virtual bool can_decode();
public:
/**
* set the shared payload.
* we will detach the payload of source,
* so ensure donot use it before.
*/
virtual int initialize(SrsCommonMessage* source);
/**
* set the shared payload.
* use source header, and specified param payload.
*/
virtual int initialize(SrsMessageHeader* source, char* payload, int size);
virtual SrsSharedPtrMessage* copy();
public:
/**
* get the perfered cid(chunk stream id) which sendout over.
*/
virtual int get_perfer_cid();
/**
* ignored.
* for shared message, nothing should be done.
* use initialize() to set the data.
*/
virtual int encode_packet();
};
/**
* the decoded message payload.
* @remark we seperate the packet from message,
* for the packet focus on logic and domain data,
... ... @@ -1374,44 +1133,6 @@ protected:
* if need to set timeout, use set timeout of SrsProtocol.
*/
template<class T>
int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T** ppacket)
{
*pmsg = NULL;
*ppacket = NULL;
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv message failed. ret=%d", ret);
return ret;
}
srs_verbose("recv message success.");
if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) {
delete msg;
srs_error("decode message failed. ret=%d", ret);
return ret;
}
T* pkt = dynamic_cast<T*>(msg->get_packet());
if (!pkt) {
delete msg;
srs_trace("drop message(type=%d, size=%d, time=%"PRId64", sid=%d).",
msg->header.message_type, msg->header.payload_length,
msg->header.timestamp, msg->header.stream_id);
continue;
}
*pmsg = msg;
*ppacket = pkt;
break;
}
return ret;
}
template<class T>
int __srs_rtmp_expect_message(SrsProtocol* protocol, __SrsMessage** pmsg, T** ppacket)
{
*pmsg = NULL;
... ...