winlin

for #250, use buffer to cache bytes, for system will split the udp packet.

@@ -35,6 +35,7 @@ using namespace std; @@ -35,6 +35,7 @@ using namespace std;
35 #include <srs_kernel_ts.hpp> 35 #include <srs_kernel_ts.hpp>
36 #include <srs_kernel_stream.hpp> 36 #include <srs_kernel_stream.hpp>
37 #include <srs_kernel_ts.hpp> 37 #include <srs_kernel_ts.hpp>
  38 +#include <srs_kernel_buffer.hpp>
38 39
39 #ifdef SRS_AUTO_STREAM_CASTER 40 #ifdef SRS_AUTO_STREAM_CASTER
40 41
@@ -42,11 +43,13 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c) @@ -42,11 +43,13 @@ SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
42 { 43 {
43 stream = new SrsStream(); 44 stream = new SrsStream();
44 context = new SrsTsContext(); 45 context = new SrsTsContext();
  46 + buffer = new SrsSimpleBuffer();
45 output = _srs_config->get_stream_caster_output(c); 47 output = _srs_config->get_stream_caster_output(c);
46 } 48 }
47 49
48 SrsMpegtsOverUdp::~SrsMpegtsOverUdp() 50 SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
49 { 51 {
  52 + srs_freep(buffer);
50 srs_freep(stream); 53 srs_freep(stream);
51 srs_freep(context); 54 srs_freep(context);
52 } 55 }
@@ -58,16 +61,36 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) @@ -58,16 +61,36 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
58 std::string peer_ip = inet_ntoa(from->sin_addr); 61 std::string peer_ip = inet_ntoa(from->sin_addr);
59 int peer_port = ntohs(from->sin_port); 62 int peer_port = ntohs(from->sin_port);
60 63
  64 + // append to buffer.
  65 + buffer->append(buf, nb_buf);
  66 +
  67 + // find the sync byte of mpegts.
  68 + char* p = buffer->bytes();
  69 + for (int i = 0; i < buffer->length(); i++) {
  70 + if (p[i] != 0x47) {
  71 + continue;
  72 + }
  73 +
  74 + if (i > 0) {
  75 + buffer->erase(i);
  76 + }
  77 + break;
  78 + }
  79 +
61 // drop ts packet when size not modulus by 188 80 // drop ts packet when size not modulus by 188
62 - if (nb_buf < SRS_TS_PACKET_SIZE || (nb_buf % SRS_TS_PACKET_SIZE) != 0) {  
63 - srs_warn("udp: drop %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf); 81 + if (buffer->length() < SRS_TS_PACKET_SIZE) {
  82 + srs_info("udp: wait %s:%d packet %d/%d bytes",
  83 + peer_ip.c_str(), peer_port, nb_buf, buffer->length());
64 return ret; 84 return ret;
65 } 85 }
66 - srs_info("udp: got %s:%d packet %d bytes", peer_ip.c_str(), peer_port, nb_buf); 86 + srs_info("udp: got %s:%d packet %d/%d bytes",
  87 + peer_ip.c_str(), peer_port, nb_buf, buffer->length());
67 88
68 // use stream to parse ts packet. 89 // use stream to parse ts packet.
69 - for (int i = 0; i < nb_buf; i += SRS_TS_PACKET_SIZE) {  
70 - if ((ret = stream->initialize(buf + i, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) { 90 + int nb_packet = buffer->length() / SRS_TS_PACKET_SIZE;
  91 + for (int i = 0; i < nb_packet; i++) {
  92 + char* p = buffer->bytes() + (i * SRS_TS_PACKET_SIZE);
  93 + if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
71 return ret; 94 return ret;
72 } 95 }
73 96
@@ -80,6 +103,11 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) @@ -80,6 +103,11 @@ int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
80 } 103 }
81 srs_info("mpegts: parse udp packet completed"); 104 srs_info("mpegts: parse udp packet completed");
82 105
  106 + // erase consumed bytes
  107 + if (nb_packet > 0) {
  108 + buffer->erase(nb_packet * SRS_TS_PACKET_SIZE);
  109 + }
  110 +
83 return ret; 111 return ret;
84 } 112 }
85 113
@@ -36,6 +36,7 @@ class sockaddr_in; @@ -36,6 +36,7 @@ class sockaddr_in;
36 class SrsStream; 36 class SrsStream;
37 class SrsTsContext; 37 class SrsTsContext;
38 class SrsConfDirective; 38 class SrsConfDirective;
  39 +class SrsSimpleBuffer;
39 40
40 #ifdef SRS_AUTO_STREAM_CASTER 41 #ifdef SRS_AUTO_STREAM_CASTER
41 42
@@ -49,6 +50,7 @@ class SrsMpegtsOverUdp : public ISrsTsHandler @@ -49,6 +50,7 @@ class SrsMpegtsOverUdp : public ISrsTsHandler
49 private: 50 private:
50 SrsStream* stream; 51 SrsStream* stream;
51 SrsTsContext* context; 52 SrsTsContext* context;
  53 + SrsSimpleBuffer* buffer;
52 std::string output; 54 std::string output;
53 public: 55 public:
54 SrsMpegtsOverUdp(SrsConfDirective* c); 56 SrsMpegtsOverUdp(SrsConfDirective* c);