winlin

fix the rtmp dump, parse the aggregate msg.

@@ -272,10 +272,18 @@ int main(int argc, char** argv) @@ -272,10 +272,18 @@ int main(int argc, char** argv)
272 goto rtmp_destroy; 272 goto rtmp_destroy;
273 } 273 }
274 274
  275 + // we only write some types of messages to flv file.
  276 + int is_flv_msg = type == SRS_RTMP_TYPE_AUDIO
  277 + || type == SRS_RTMP_TYPE_VIDEO || type == SRS_RTMP_TYPE_SCRIPT;
  278 +
275 if (flv) { 279 if (flv) {
276 - if (srs_flv_write_tag(flv, type, timestamp, data, size) != 0) {  
277 - srs_human_trace("dump rtmp packet failed.");  
278 - goto rtmp_destroy; 280 + if (is_flv_msg) {
  281 + if (srs_flv_write_tag(flv, type, timestamp, data, size) != 0) {
  282 + srs_human_trace("dump rtmp packet failed.");
  283 + goto rtmp_destroy;
  284 + }
  285 + } else {
  286 + srs_human_trace("drop message size=%dB", size);
279 } 287 }
280 } 288 }
281 289
@@ -70,6 +70,12 @@ struct Context @@ -70,6 +70,12 @@ struct Context
70 // extra request object for connect to server, NULL to ignore. 70 // extra request object for connect to server, NULL to ignore.
71 SrsRequest* req; 71 SrsRequest* req;
72 72
  73 + // the message received cache,
  74 + // for example, when got aggregate message,
  75 + // the context will parse to videos/audios,
  76 + // and return one by one.
  77 + std::vector<SrsCommonMessage*> msgs;
  78 +
73 SrsRtmpClient* rtmp; 79 SrsRtmpClient* rtmp;
74 SimpleSocketStream* skt; 80 SimpleSocketStream* skt;
75 int stream_id; 81 int stream_id;
@@ -106,6 +112,13 @@ struct Context @@ -106,6 +112,13 @@ struct Context
106 srs_freep(req); 112 srs_freep(req);
107 srs_freep(rtmp); 113 srs_freep(rtmp);
108 srs_freep(skt); 114 srs_freep(skt);
  115 +
  116 + std::vector<SrsCommonMessage*>::iterator it;
  117 + for (it = msgs.begin(); it != msgs.end(); ++it) {
  118 + SrsCommonMessage* msg = *it;
  119 + srs_freep(msg);
  120 + }
  121 + msgs.clear();
109 } 122 }
110 }; 123 };
111 124
@@ -799,6 +812,156 @@ int srs_rtmp_bandwidth_check(srs_rtmp_t rtmp, @@ -799,6 +812,156 @@ int srs_rtmp_bandwidth_check(srs_rtmp_t rtmp,
799 return ret; 812 return ret;
800 } 813 }
801 814
  815 +
  816 +int __srs_rtmp_on_aggregate(Context* context, SrsCommonMessage* msg)
  817 +{
  818 + int ret = ERROR_SUCCESS;
  819 +
  820 + SrsStream aggregate_stream;
  821 + SrsStream* stream = &aggregate_stream;
  822 + if ((ret = stream->initialize(msg->payload, msg->size)) != ERROR_SUCCESS) {
  823 + return ret;
  824 + }
  825 +
  826 + int delta = -1;
  827 + while (!stream->empty()) {
  828 + if (!stream->require(1)) {
  829 + ret = ERROR_RTMP_AGGREGATE;
  830 + srs_error("invalid aggregate message type. ret=%d", ret);
  831 + return ret;
  832 + }
  833 + int8_t type = stream->read_1bytes();
  834 +
  835 + if (!stream->require(3)) {
  836 + ret = ERROR_RTMP_AGGREGATE;
  837 + srs_error("invalid aggregate message size. ret=%d", ret);
  838 + return ret;
  839 + }
  840 + int32_t data_size = stream->read_3bytes();
  841 +
  842 + if (data_size < 0) {
  843 + ret = ERROR_RTMP_AGGREGATE;
  844 + srs_error("invalid aggregate message size(negative). ret=%d", ret);
  845 + return ret;
  846 + }
  847 +
  848 + if (!stream->require(3)) {
  849 + ret = ERROR_RTMP_AGGREGATE;
  850 + srs_error("invalid aggregate message time. ret=%d", ret);
  851 + return ret;
  852 + }
  853 + int32_t timestamp = stream->read_3bytes();
  854 +
  855 + if (!stream->require(1)) {
  856 + ret = ERROR_RTMP_AGGREGATE;
  857 + srs_error("invalid aggregate message time(high). ret=%d", ret);
  858 + return ret;
  859 + }
  860 + int32_t time_h = stream->read_1bytes();
  861 +
  862 + timestamp |= time_h<<24;
  863 + timestamp &= 0x7FFFFFFF;
  864 +
  865 + // adjust timestamp.
  866 + if (delta < 0) {
  867 + delta = (int)msg->header.timestamp - (int)timestamp;
  868 + }
  869 + timestamp += delta;
  870 +
  871 + if (!stream->require(3)) {
  872 + ret = ERROR_RTMP_AGGREGATE;
  873 + srs_error("invalid aggregate message stream_id. ret=%d", ret);
  874 + return ret;
  875 + }
  876 + int32_t stream_id = stream->read_3bytes();
  877 +
  878 + if (data_size > 0 && !stream->require(data_size)) {
  879 + ret = ERROR_RTMP_AGGREGATE;
  880 + srs_error("invalid aggregate message data. ret=%d", ret);
  881 + return ret;
  882 + }
  883 +
  884 + // to common message.
  885 + SrsCommonMessage __o;
  886 + SrsCommonMessage& o = __o;
  887 +
  888 + o.header.message_type = type;
  889 + o.header.payload_length = data_size;
  890 + o.header.timestamp_delta = timestamp;
  891 + o.header.timestamp = timestamp;
  892 + o.header.stream_id = stream_id;
  893 + o.header.perfer_cid = msg->header.perfer_cid;
  894 +
  895 + if (data_size > 0) {
  896 + o.size = data_size;
  897 + o.payload = new char[o.size];
  898 + stream->read_bytes(o.payload, o.size);
  899 + }
  900 +
  901 + if (!stream->require(4)) {
  902 + ret = ERROR_RTMP_AGGREGATE;
  903 + srs_error("invalid aggregate message previous tag size. ret=%d", ret);
  904 + return ret;
  905 + }
  906 + stream->read_4bytes();
  907 +
  908 + // process parsed message
  909 + SrsCommonMessage* parsed_msg = new SrsCommonMessage();
  910 + parsed_msg->header = o.header;
  911 + parsed_msg->payload = o.payload;
  912 + parsed_msg->size = o.size;
  913 + o.payload = NULL;
  914 + context->msgs.push_back(parsed_msg);
  915 + }
  916 +
  917 + return ret;
  918 +}
  919 +
  920 +int __srs_rtmp_go_packet(Context* context, SrsCommonMessage* msg,
  921 + char* type, u_int32_t* timestamp, char** data, int* size,
  922 + bool* got_msg
  923 +) {
  924 + int ret = ERROR_SUCCESS;
  925 +
  926 + // generally we got a message.
  927 + *got_msg = true;
  928 +
  929 + if (msg->header.is_audio()) {
  930 + *type = SRS_RTMP_TYPE_AUDIO;
  931 + *timestamp = (u_int32_t)msg->header.timestamp;
  932 + *data = (char*)msg->payload;
  933 + *size = (int)msg->size;
  934 + // detach bytes from packet.
  935 + msg->payload = NULL;
  936 + } else if (msg->header.is_video()) {
  937 + *type = SRS_RTMP_TYPE_VIDEO;
  938 + *timestamp = (u_int32_t)msg->header.timestamp;
  939 + *data = (char*)msg->payload;
  940 + *size = (int)msg->size;
  941 + // detach bytes from packet.
  942 + msg->payload = NULL;
  943 + } else if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
  944 + *type = SRS_RTMP_TYPE_SCRIPT;
  945 + *data = (char*)msg->payload;
  946 + *size = (int)msg->size;
  947 + // detach bytes from packet.
  948 + msg->payload = NULL;
  949 + } else if (msg->header.is_aggregate()) {
  950 + if ((ret = __srs_rtmp_on_aggregate(context, msg)) != ERROR_SUCCESS) {
  951 + return ret;
  952 + }
  953 + *got_msg = false;
  954 + } else {
  955 + *type = msg->header.message_type;
  956 + *data = (char*)msg->payload;
  957 + *size = (int)msg->size;
  958 + // detach bytes from packet.
  959 + msg->payload = NULL;
  960 + }
  961 +
  962 + return ret;
  963 +}
  964 +
802 int srs_rtmp_read_packet(srs_rtmp_t rtmp, char* type, u_int32_t* timestamp, char** data, int* size) 965 int srs_rtmp_read_packet(srs_rtmp_t rtmp, char* type, u_int32_t* timestamp, char** data, int* size)
803 { 966 {
804 *type = 0; 967 *type = 0;
@@ -813,45 +976,36 @@ int srs_rtmp_read_packet(srs_rtmp_t rtmp, char* type, u_int32_t* timestamp, char @@ -813,45 +976,36 @@ int srs_rtmp_read_packet(srs_rtmp_t rtmp, char* type, u_int32_t* timestamp, char
813 976
814 for (;;) { 977 for (;;) {
815 SrsCommonMessage* msg = NULL; 978 SrsCommonMessage* msg = NULL;
816 - if ((ret = context->rtmp->recv_message(&msg)) != ERROR_SUCCESS) { 979 +
  980 + // read from cache first.
  981 + if (!context->msgs.empty()) {
  982 + std::vector<SrsCommonMessage*>::iterator it = context->msgs.begin();
  983 + msg = *it;
  984 + context->msgs.erase(it);
  985 + }
  986 +
  987 + // read from protocol sdk.
  988 + if (!msg && (ret = context->rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
817 return ret; 989 return ret;
818 } 990 }
  991 +
  992 + // no msg, try again.
819 if (!msg) { 993 if (!msg) {
820 continue; 994 continue;
821 } 995 }
822 996
823 SrsAutoFree(SrsCommonMessage, msg); 997 SrsAutoFree(SrsCommonMessage, msg);
824 998
825 - if (msg->header.is_audio()) {  
826 - *type = SRS_RTMP_TYPE_AUDIO;  
827 - *timestamp = (u_int32_t)msg->header.timestamp;  
828 - *data = (char*)msg->payload;  
829 - *size = (int)msg->size;  
830 - // detach bytes from packet.  
831 - msg->payload = NULL;  
832 - } else if (msg->header.is_video()) {  
833 - *type = SRS_RTMP_TYPE_VIDEO;  
834 - *timestamp = (u_int32_t)msg->header.timestamp;  
835 - *data = (char*)msg->payload;  
836 - *size = (int)msg->size;  
837 - // detach bytes from packet.  
838 - msg->payload = NULL;  
839 - } else if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {  
840 - *type = SRS_RTMP_TYPE_SCRIPT;  
841 - *data = (char*)msg->payload;  
842 - *size = (int)msg->size;  
843 - // detach bytes from packet.  
844 - msg->payload = NULL;  
845 - } else {  
846 - *type = msg->header.message_type;  
847 - *data = (char*)msg->payload;  
848 - *size = (int)msg->size;  
849 - // detach bytes from packet.  
850 - msg->payload = NULL; 999 + // process the got packet, if nothing, try again.
  1000 + bool got_msg;
  1001 + if ((ret = __srs_rtmp_go_packet(context, msg, type, timestamp, data, size, &got_msg)) != ERROR_SUCCESS) {
  1002 + return ret;
851 } 1003 }
852 1004
853 // got expected message. 1005 // got expected message.
854 - break; 1006 + if (got_msg) {
  1007 + break;
  1008 + }
855 } 1009 }
856 1010
857 return ret; 1011 return ret;