winlin

support encode amf0 packet, connect app response packet

... ... @@ -53,6 +53,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// User defined
#define RTMP_AMF0_Invalid 0x3F
int srs_amf0_get_object_eof_size();
int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*&);
int srs_amf0_write_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*);
int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value);
int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value);
SrsAmf0Any::SrsAmf0Any()
{
marker = RTMP_AMF0_Invalid;
... ... @@ -167,8 +173,6 @@ SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name)
return prop;
}
int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*&);
int srs_amf0_read_utf8(SrsStream* stream, std::string& value)
{
int ret = ERROR_SUCCESS;
... ... @@ -213,6 +217,36 @@ int srs_amf0_read_utf8(SrsStream* stream, std::string& value)
return ret;
}
int srs_amf0_write_utf8(SrsStream* stream, std::string value)
{
int ret = ERROR_SUCCESS;
// len
if (!stream->require(2)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write string length failed. ret=%d", ret);
return ret;
}
stream->write_2bytes(value.length());
srs_verbose("amf0 write string length success. len=%d", (int)value.length());
// empty string
if (value.length() <= 0) {
srs_verbose("amf0 write empty string. ret=%d", ret);
return ret;
}
// data
if (!stream->require(value.length())) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write string data failed. ret=%d", ret);
return ret;
}
stream->write_string(value);
srs_verbose("amf0 write string data success. str=%s", value.c_str());
return ret;
}
int srs_amf0_read_string(SrsStream* stream, std::string& value)
{
... ... @@ -237,6 +271,23 @@ int srs_amf0_read_string(SrsStream* stream, std::string& value)
return srs_amf0_read_utf8(stream, value);
}
int srs_amf0_write_string(SrsStream* stream, std::string value)
{
int ret = ERROR_SUCCESS;
// marker
if (!stream->require(1)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write string marker failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(RTMP_AMF0_String);
srs_verbose("amf0 write string marker success");
return srs_amf0_write_utf8(stream, value);
}
int srs_amf0_read_boolean(SrsStream* stream, bool& value)
{
int ret = ERROR_SUCCESS;
... ... @@ -274,6 +325,36 @@ int srs_amf0_read_boolean(SrsStream* stream, bool& value)
return ret;
}
int srs_amf0_write_boolean(SrsStream* stream, bool value)
{
int ret = ERROR_SUCCESS;
// marker
if (!stream->require(1)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write bool marker failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(RTMP_AMF0_Boolean);
srs_verbose("amf0 write bool marker success");
// value
if (!stream->require(1)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write bool value failed. ret=%d", ret);
return ret;
}
if (value) {
stream->write_1bytes(0x01);
} else {
stream->write_1bytes(0x00);
}
srs_verbose("amf0 write bool value success. value=%d", value);
return ret;
}
int srs_amf0_read_number(SrsStream* stream, double& value)
{
... ... @@ -309,6 +390,35 @@ int srs_amf0_read_number(SrsStream* stream, double& value)
return ret;
}
int srs_amf0_write_number(SrsStream* stream, double value)
{
int ret = ERROR_SUCCESS;
// marker
if (!stream->require(1)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write number marker failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(RTMP_AMF0_Number);
srs_verbose("amf0 write number marker success");
// value
if (!stream->require(8)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write number value failed. ret=%d", ret);
return ret;
}
int64_t temp = 0x00;
memcpy(&temp, &value, 8);
stream->write_8bytes(temp);
srs_verbose("amf0 write number value success. value=%.2f", value);
return ret;
}
int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value)
{
... ... @@ -381,11 +491,66 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value)
return ret;
}
int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value)
{
int ret = ERROR_SUCCESS;
srs_assert(value != NULL);
switch (value->marker) {
case RTMP_AMF0_String: {
std::string data = srs_amf0_convert<SrsAmf0String>(value)->value;
return srs_amf0_write_string(stream, data);
}
case RTMP_AMF0_Boolean: {
bool data = srs_amf0_convert<SrsAmf0Boolean>(value)->value;
return srs_amf0_write_boolean(stream, data);
}
case RTMP_AMF0_Number: {
double data = srs_amf0_convert<SrsAmf0Number>(value)->value;
return srs_amf0_write_number(stream, data);
}
case RTMP_AMF0_ObjectEnd: {
SrsAmf0ObjectEOF* p = srs_amf0_convert<SrsAmf0ObjectEOF>(value);
return srs_amf0_write_object_eof(stream, p);
}
case RTMP_AMF0_Object: {
SrsAmf0Object* p = srs_amf0_convert<SrsAmf0Object>(value);
return srs_amf0_write_object(stream, p);
}
case RTMP_AMF0_Invalid:
default: {
ret = ERROR_RTMP_AMF0_INVALID;
srs_error("invalid amf0 message type. marker=%#x, ret=%d", value->marker, ret);
return ret;
}
}
return ret;
}
int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*& value)
{
int ret = ERROR_SUCCESS;
// auto skip -2 to read the object eof.
srs_assert(stream->pos() >= 2);
stream->skip(-2);
// value
if (!stream->require(2)) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 read object eof value failed. ret=%d", ret);
return ret;
}
int16_t temp = stream->read_2bytes();
if (temp != 0x00) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 read object eof value check failed. "
"must be 0x00, actual is %#x, ret=%d", temp, ret);
return ret;
}
// marker
if (!stream->require(1)) {
ret = ERROR_RTMP_AMF0_DECODE;
... ... @@ -402,9 +567,36 @@ int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*& value)
}
srs_verbose("amf0 read object eof marker success");
// value
value = new SrsAmf0ObjectEOF();
srs_verbose("amf0 read object eof marker success");
srs_verbose("amf0 read object eof success");
return ret;
}
int srs_amf0_write_object_eof(SrsStream* stream, SrsAmf0ObjectEOF* value)
{
int ret = ERROR_SUCCESS;
srs_assert(value != NULL);
// value
if (!stream->require(2)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write object eof value failed. ret=%d", ret);
return ret;
}
stream->write_2bytes(0x00);
srs_verbose("amf0 write object eof value success");
// marker
if (!stream->require(1)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write object eof marker failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(RTMP_AMF0_ObjectEnd);
srs_verbose("amf0 read object eof success");
return ret;
}
... ... @@ -462,3 +654,108 @@ int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value)
return ret;
}
int srs_amf0_write_object(SrsStream* stream, SrsAmf0Object* value)
{
int ret = ERROR_SUCCESS;
srs_assert(value != NULL);
// marker
if (!stream->require(1)) {
ret = ERROR_RTMP_AMF0_ENCODE;
srs_error("amf0 write object marker failed. ret=%d", ret);
return ret;
}
stream->write_1bytes(RTMP_AMF0_Object);
srs_verbose("amf0 write object marker success");
// value
std::map<std::string, SrsAmf0Any*>::iterator it;
for (it = value->properties.begin(); it != value->properties.end(); ++it) {
std::string name = it->first;
SrsAmf0Any* any = it->second;
if ((ret = srs_amf0_write_utf8(stream, name)) != ERROR_SUCCESS) {
srs_error("write object property name failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_write_any(stream, any)) != ERROR_SUCCESS) {
srs_error("write object property value failed. ret=%d", ret);
return ret;
}
srs_verbose("write amf0 property success. name=%s", name.c_str());
}
if ((ret = srs_amf0_write_object_eof(stream, &value->eof)) != ERROR_SUCCESS) {
srs_error("write object eof failed. ret=%d", ret);
return ret;
}
srs_verbose("write amf0 object success.");
return ret;
}
int srs_amf0_get_utf8_size(std::string value)
{
return 2 + value.length();
}
int srs_amf0_get_string_size(std::string value)
{
return 1 + srs_amf0_get_utf8_size(value);
}
int srs_amf0_get_number_size()
{
return 1 + 8;
}
int srs_amf0_get_boolean_size()
{
return 1 + 1;
}
int srs_amf0_get_object_size(SrsAmf0Object* obj)
{
if (!obj) {
return 0;
}
int size = 1;
std::map<std::string, SrsAmf0Any*>::iterator it;
for (it = obj->properties.begin(); it != obj->properties.end(); ++it) {
std::string name = it->first;
SrsAmf0Any* value = it->second;
size += srs_amf0_get_utf8_size(name);
if (value->is_boolean()) {
size += srs_amf0_get_boolean_size();
} else if (value->is_number()) {
size += srs_amf0_get_number_size();
} else if (value->is_string()) {
SrsAmf0String* p = srs_amf0_convert<SrsAmf0String>(value);
size += srs_amf0_get_string_size(p->value);
} else if (value->is_object()) {
SrsAmf0Object* p = srs_amf0_convert<SrsAmf0Object>(value);
size += srs_amf0_get_object_size(p);
} else {
// TOOD: other AMF0 types.
srs_warn("ignore unkown AMF0 type size.");
}
}
size += srs_amf0_get_object_eof_size();
return size;
}
int srs_amf0_get_object_eof_size()
{
return 2 + 1;
}
... ...
... ... @@ -140,6 +140,7 @@ struct SrsAmf0Object : public SrsAmf0Any
* @remark only support UTF8-1 char.
*/
extern int srs_amf0_read_utf8(SrsStream* stream, std::string& value);
extern int srs_amf0_write_utf8(SrsStream* stream, std::string value);
/**
* read amf0 string from stream.
... ... @@ -147,6 +148,7 @@ extern int srs_amf0_read_utf8(SrsStream* stream, std::string& value);
* string-type = string-marker UTF-8
*/
extern int srs_amf0_read_string(SrsStream* stream, std::string& value);
extern int srs_amf0_write_string(SrsStream* stream, std::string value);
/**
* read amf0 boolean from stream.
... ... @@ -155,6 +157,7 @@ extern int srs_amf0_read_string(SrsStream* stream, std::string& value);
* 0 is false, <> 0 is true
*/
extern int srs_amf0_read_boolean(SrsStream* stream, bool& value);
extern int srs_amf0_write_boolean(SrsStream* stream, bool value);
/**
* read amf0 number from stream.
... ... @@ -162,6 +165,7 @@ extern int srs_amf0_read_boolean(SrsStream* stream, bool& value);
* number-type = number-marker DOUBLE
*/
extern int srs_amf0_read_number(SrsStream* stream, double& value);
extern int srs_amf0_write_number(SrsStream* stream, double value);
/**
* read amf0 object from stream.
... ... @@ -170,6 +174,16 @@ extern int srs_amf0_read_number(SrsStream* stream, double& value);
* object-property = (UTF-8 value-type) | (UTF-8-empty object-end-marker)
*/
extern int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value);
extern int srs_amf0_write_object(SrsStream* stream, SrsAmf0Object* value);
/**
* get amf0 objects size.
*/
extern int srs_amf0_get_utf8_size(std::string value);
extern int srs_amf0_get_string_size(std::string value);
extern int srs_amf0_get_number_size();
extern int srs_amf0_get_boolean_size();
extern int srs_amf0_get_object_size(SrsAmf0Object* obj);
/**
* convert the any to specified object.
... ...
... ... @@ -58,6 +58,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_RTMP_REQ_TCURL 306
#define ERROR_RTMP_MESSAGE_DECODE 307
#define ERROR_RTMP_MESSAGE_ENCODE 308
#define ERROR_RTMP_AMF0_ENCODE 309
#define ERROR_SYSTEM_STREAM_INIT 400
#define ERROR_SYSTEM_PACKET_INVALID 401
... ...
... ... @@ -1010,6 +1010,10 @@ SrsConnectAppPacket::SrsConnectAppPacket()
SrsConnectAppPacket::~SrsConnectAppPacket()
{
if (command_object) {
delete command_object;
command_object = NULL;
}
}
int SrsConnectAppPacket::decode(SrsStream* stream)
... ... @@ -1053,6 +1057,77 @@ int SrsConnectAppPacket::decode(SrsStream* stream)
return ret;
}
SrsConnectAppResPacket::SrsConnectAppResPacket()
{
command_name = RTMP_AMF0_COMMAND_CONNECT;
transaction_id = 1;
props = NULL;
info = NULL;
}
SrsConnectAppResPacket::~SrsConnectAppResPacket()
{
if (props) {
delete props;
props = NULL;
}
if (info) {
delete info;
info = NULL;
}
}
int SrsConnectAppResPacket::get_perfer_cid()
{
return RTMP_CID_OverConnection;
}
int SrsConnectAppResPacket::get_message_type()
{
return RTMP_MSG_AMF0CommandMessage;
}
int SrsConnectAppResPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_object_size(props)+ srs_amf0_get_object_size(info);
}
int SrsConnectAppResPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("encode transaction_id failed. ret=%d", ret);
return ret;
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) {
srs_error("encode props failed. ret=%d", ret);
return ret;
}
srs_verbose("encode props success.");
if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) {
srs_error("encode info failed. ret=%d", ret);
return ret;
}
srs_verbose("encode info success.");
srs_info("encode connect app response packet success.");
return ret;
}
SrsSetWindowAckSizePacket::SrsSetWindowAckSizePacket()
{
ackowledgement_window_size = 0;
... ...
... ... @@ -324,6 +324,34 @@ public:
public:
virtual int decode(SrsStream* stream);
};
/**
* response for SrsConnectAppPacket.
*/
class SrsConnectAppResPacket : public SrsPacket
{
private:
typedef SrsPacket super;
protected:
virtual const char* get_class_name()
{
return CLASS_NAME_STRING(SrsConnectAppResPacket);
}
public:
std::string command_name;
double transaction_id;
SrsAmf0Object* props;
SrsAmf0Object* info;
public:
SrsConnectAppResPacket();
virtual ~SrsConnectAppResPacket();
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
};
/**
* 5.5. Window Acknowledgement Size (5)
... ...
... ... @@ -78,6 +78,15 @@ void SrsStream::skip(int size)
p += size;
}
int SrsStream::pos()
{
if (empty()) {
return 0;
}
return p - bytes;
}
int8_t SrsStream::read_1bytes()
{
srs_assert(require(1));
... ... @@ -141,6 +150,22 @@ std::string SrsStream::read_string(int len)
return value;
}
void SrsStream::write_1bytes(int8_t value)
{
srs_assert(require(1));
*p++ = value;
}
void SrsStream::write_2bytes(int16_t value)
{
srs_assert(require(2));
pp = (char*)&value;
*p++ = pp[1];
*p++ = pp[0];
}
void SrsStream::write_4bytes(int32_t value)
{
srs_assert(require(4));
... ... @@ -152,10 +177,26 @@ void SrsStream::write_4bytes(int32_t value)
*p++ = pp[0];
}
void SrsStream::write_1bytes(int8_t value)
void SrsStream::write_8bytes(int64_t value)
{
srs_assert(require(1));
srs_assert(require(8));
*p++ = value;
pp = (char*)&value;
*p++ = pp[7];
*p++ = pp[6];
*p++ = pp[5];
*p++ = pp[4];
*p++ = pp[3];
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
}
void SrsStream::write_string(std::string value)
{
srs_assert(require(value.length()));
memcpy(p, value.data(), value.length());
p += value.length();
}
... ...
... ... @@ -35,7 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsStream
{
protected:
private:
char* p;
char* pp;
char* bytes;
... ... @@ -70,6 +70,10 @@ public:
* @size can be any value. positive to forward; nagetive to backward.
*/
virtual void skip(int size);
/**
* tell the current pos.
*/
virtual int pos();
public:
/**
* get 1bytes char from stream.
... ... @@ -93,13 +97,25 @@ public:
virtual std::string read_string(int len);
public:
/**
* write 1bytes char to stream.
*/
virtual void write_1bytes(int8_t value);
/**
* write 2bytes int to stream.
*/
virtual void write_2bytes(int16_t value);
/**
* write 4bytes int to stream.
*/
virtual void write_4bytes(int32_t value);
/**
* write 1bytes char to stream.
* write 8bytes int to stream.
*/
virtual void write_1bytes(int8_t value);
virtual void write_8bytes(int64_t value);
/**
* write string to stream
*/
virtual void write_string(std::string value);
};
#endif
\ No newline at end of file
... ...