winlin

fix #467, support write log to kafka. 3.0.6

@@ -259,7 +259,7 @@ The `features`, `compare`, `release` and `performance` of SRS. @@ -259,7 +259,7 @@ The `features`, `compare`, `release` and `performance` of SRS.
259 1. Support NGINX-RTMP style EXEC, read [#367][bug #367]. 259 1. Support NGINX-RTMP style EXEC, read [#367][bug #367].
260 1. Support NGINX-RTMP style dvr control module, read [#459][bug #459]. 260 1. Support NGINX-RTMP style dvr control module, read [#459][bug #459].
261 1. Support HTTP Security Raw Api, read [#459][bug #459], [#470][bug #470], [#319][bug #319]. 261 1. Support HTTP Security Raw Api, read [#459][bug #459], [#470][bug #470], [#319][bug #319].
262 -1. [dev]Support Integration with Kafka/Spark Big-Data system, read [#467][bug #467]. 262 +1. Support Integration with Kafka/Spark Big-Data system, read [#467][bug #467].
263 1. [plan]Support Origin Cluster for Load Balance and Fault Tolarence, read [#464][bug #464], [RTMP 302][bug #92]. 263 1. [plan]Support Origin Cluster for Load Balance and Fault Tolarence, read [#464][bug #464], [RTMP 302][bug #92].
264 1. [plan]Support H.265, push RTMP with H.265, delivery in HLS, read [#465][bug #465]. 264 1. [plan]Support H.265, push RTMP with H.265, delivery in HLS, read [#465][bug #465].
265 1. [plan]Support MPEG-DASH, the future streaming protocol, read [#299][bug #299]. 265 1. [plan]Support MPEG-DASH, the future streaming protocol, read [#299][bug #299].
@@ -386,6 +386,7 @@ Remark: @@ -386,6 +386,7 @@ Remark:
386 386
387 ### History 387 ### History
388 388
  389 +* v3.0, 2015-10-23, fix [#467][bug #467], support write log to kafka. 3.0.6
389 * v3.0, 2015-10-20, fix [#502][bug #502], support snapshot with http-callback or transcoder. 3.0.5 390 * v3.0, 2015-10-20, fix [#502][bug #502], support snapshot with http-callback or transcoder. 3.0.5
390 * v3.0, 2015-09-19, support amf0 and json to convert with each other. 391 * v3.0, 2015-09-19, support amf0 and json to convert with each other.
391 * v3.0, 2015-09-19, json objects support dumps to string. 392 * v3.0, 2015-09-19, json objects support dumps to string.
@@ -1279,6 +1280,7 @@ Winlin @@ -1279,6 +1280,7 @@ Winlin
1279 [bug #466]: https://github.com/simple-rtmp-server/srs/issues/466 1280 [bug #466]: https://github.com/simple-rtmp-server/srs/issues/466
1280 [bug #468]: https://github.com/simple-rtmp-server/srs/issues/468 1281 [bug #468]: https://github.com/simple-rtmp-server/srs/issues/468
1281 [bug #502]: https://github.com/simple-rtmp-server/srs/issues/502 1282 [bug #502]: https://github.com/simple-rtmp-server/srs/issues/502
  1283 +[bug #467]: https://github.com/simple-rtmp-server/srs/issues/467
1282 [bug #xxxxxxx]: https://github.com/simple-rtmp-server/srs/issues/xxxxxxx 1284 [bug #xxxxxxx]: https://github.com/simple-rtmp-server/srs/issues/xxxxxxx
1283 1285
1284 [r2.0a2]: https://github.com/simple-rtmp-server/srs/releases/tag/v2.0-a2 1286 [r2.0a2]: https://github.com/simple-rtmp-server/srs/releases/tag/v2.0-a2
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 // current release version 31 // current release version
32 #define VERSION_MAJOR 3 32 #define VERSION_MAJOR 3
33 #define VERSION_MINOR 0 33 #define VERSION_MINOR 0
34 -#define VERSION_REVISION 5 34 +#define VERSION_REVISION 6
35 35
36 // server info. 36 // server info.
37 #define RTMP_SIG_SRS_KEY "SRS" 37 #define RTMP_SIG_SRS_KEY "SRS"
@@ -39,17 +39,27 @@ ISrsCodec::~ISrsCodec() @@ -39,17 +39,27 @@ ISrsCodec::~ISrsCodec()
39 39
40 SrsBuffer::SrsBuffer() 40 SrsBuffer::SrsBuffer()
41 { 41 {
42 - p = bytes = NULL;  
43 - nb_bytes = 0; 42 + set_value(NULL, 0);
  43 +}
44 44
45 - // TODO: support both little and big endian.  
46 - srs_assert(srs_is_little_endian()); 45 +SrsBuffer::SrsBuffer(char* b, int nb_b)
  46 +{
  47 + set_value(b, nb_b);
47 } 48 }
48 49
49 SrsBuffer::~SrsBuffer() 50 SrsBuffer::~SrsBuffer()
50 { 51 {
51 } 52 }
52 53
  54 +void SrsBuffer::set_value(char* b, int nb_b)
  55 +{
  56 + p = bytes = b;
  57 + nb_bytes = nb_b;
  58 +
  59 + // TODO: support both little and big endian.
  60 + srs_assert(srs_is_little_endian());
  61 +}
  62 +
53 int SrsBuffer::initialize(char* b, int nb) 63 int SrsBuffer::initialize(char* b, int nb)
54 { 64 {
55 int ret = ERROR_SUCCESS; 65 int ret = ERROR_SUCCESS;
@@ -98,7 +98,10 @@ private: @@ -98,7 +98,10 @@ private:
98 int nb_bytes; 98 int nb_bytes;
99 public: 99 public:
100 SrsBuffer(); 100 SrsBuffer();
  101 + SrsBuffer(char* b, int nb_b);
101 virtual ~SrsBuffer(); 102 virtual ~SrsBuffer();
  103 +private:
  104 + virtual void set_value(char* b, int nb_b);
102 public: 105 public:
103 /** 106 /**
104 * initialize the stream from bytes. 107 * initialize the stream from bytes.
@@ -2141,7 +2141,7 @@ int SrsTsPayloadPSI::decode(SrsBuffer* stream, SrsTsMessage** /*ppmsg*/) @@ -2141,7 +2141,7 @@ int SrsTsPayloadPSI::decode(SrsBuffer* stream, SrsTsMessage** /*ppmsg*/)
2141 CRC_32 = stream->read_4bytes(); 2141 CRC_32 = stream->read_4bytes();
2142 2142
2143 // verify crc32. 2143 // verify crc32.
2144 - int32_t crc32 = srs_crc32(ppat, stream->pos() - pat_pos - 4); 2144 + int32_t crc32 = srs_crc32_mpegts(ppat, stream->pos() - pat_pos - 4);
2145 if (crc32 != CRC_32) { 2145 if (crc32 != CRC_32) {
2146 ret = ERROR_STREAM_CASTER_TS_CRC32; 2146 ret = ERROR_STREAM_CASTER_TS_CRC32;
2147 srs_error("ts: verify PSI crc32 failed. ret=%d", ret); 2147 srs_error("ts: verify PSI crc32 failed. ret=%d", ret);
@@ -2238,7 +2238,7 @@ int SrsTsPayloadPSI::encode(SrsBuffer* stream) @@ -2238,7 +2238,7 @@ int SrsTsPayloadPSI::encode(SrsBuffer* stream)
2238 srs_error("ts: mux PSI crc32 failed. ret=%d", ret); 2238 srs_error("ts: mux PSI crc32 failed. ret=%d", ret);
2239 return ret; 2239 return ret;
2240 } 2240 }
2241 - CRC_32 = srs_crc32(ppat, stream->pos() - pat_pos); 2241 + CRC_32 = srs_crc32_mpegts(ppat, stream->pos() - pat_pos);
2242 stream->write_4bytes(CRC_32); 2242 stream->write_4bytes(CRC_32);
2243 2243
2244 return ret; 2244 return ret;
@@ -535,7 +535,10 @@ bool srs_aac_startswith_adts(SrsBuffer* stream) @@ -535,7 +535,10 @@ bool srs_aac_startswith_adts(SrsBuffer* stream)
535 return true; 535 return true;
536 } 536 }
537 537
538 -/* 538 +// @see http://www.stmc.edu.hk/~vincent/ffmpeg_0.4.9-pre1/libavformat/mpegtsenc.c
  539 +unsigned int __mpegts_crc32(const u_int8_t *data, int len)
  540 +{
  541 + /*
539 * MPEG2 transport stream (aka DVB) mux 542 * MPEG2 transport stream (aka DVB) mux
540 * Copyright (c) 2003 Fabrice Bellard. 543 * Copyright (c) 2003 Fabrice Bellard.
541 * 544 *
@@ -553,7 +556,7 @@ bool srs_aac_startswith_adts(SrsBuffer* stream) @@ -553,7 +556,7 @@ bool srs_aac_startswith_adts(SrsBuffer* stream)
553 * License along with this library; if not, write to the Free Software 556 * License along with this library; if not, write to the Free Software
554 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 557 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
555 */ 558 */
556 -static const u_int32_t crc_table[256] = { 559 + static const u_int32_t table[256] = {
557 0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9, 0x130476dc, 0x17c56b6b, 560 0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9, 0x130476dc, 0x17c56b6b,
558 0x1a864db2, 0x1e475005, 0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61, 561 0x1a864db2, 0x1e475005, 0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61,
559 0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd, 0x4c11db70, 0x48d0c6c7, 562 0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd, 0x4c11db70, 0x48d0c6c7,
@@ -597,23 +600,98 @@ static const u_int32_t crc_table[256] = { @@ -597,23 +600,98 @@ static const u_int32_t crc_table[256] = {
597 0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0, 0x9abc8bd5, 0x9e7d9662, 600 0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0, 0x9abc8bd5, 0x9e7d9662,
598 0x933eb0bb, 0x97ffad0c, 0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668, 601 0x933eb0bb, 0x97ffad0c, 0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668,
599 0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4 602 0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
600 -}; 603 + };
601 604
602 -// @see http://www.stmc.edu.hk/~vincent/ffmpeg_0.4.9-pre1/libavformat/mpegtsenc.c  
603 -unsigned int mpegts_crc32(const u_int8_t *data, int len)  
604 -{  
605 - register int i;  
606 - unsigned int crc = 0xffffffff; 605 + u_int32_t crc = 0xffffffff;
607 606
608 - for (i=0; i<len; i++)  
609 - crc = (crc << 8) ^ crc_table[((crc >> 24) ^ *data++) & 0xff]; 607 + for (int i=0; i<len; i++) {
  608 + crc = (crc << 8) ^ table[((crc >> 24) ^ *data++) & 0xff];
  609 + }
610 610
611 return crc; 611 return crc;
612 } 612 }
613 613
614 -u_int32_t srs_crc32(const void* buf, int size) 614 +// @see https://github.com/ETrun/crc32/blob/master/crc32.c
  615 +u_int32_t __crc32_ieee(u_int32_t init, const u_int8_t* buf, size_t nb_buf)
  616 +{
  617 + /*----------------------------------------------------------------------------*\
  618 + * CRC-32 version 2.0.0 by Craig Bruce, 2006-04-29.
  619 + *
  620 + * This program generates the CRC-32 values for the files named in the
  621 + * command-line arguments. These are the same CRC-32 values used by GZIP,
  622 + * PKZIP, and ZMODEM. The Crc32_ComputeBuf() can also be detached and
  623 + * used independently.
  624 + *
  625 + * THIS PROGRAM IS PUBLIC-DOMAIN SOFTWARE.
  626 + *
  627 + * Based on the byte-oriented implementation "File Verification Using CRC"
  628 + * by Mark R. Nelson in Dr. Dobb's Journal, May 1992, pp. 64-67.
  629 + *
  630 + * v1.0.0: original release.
  631 + * v1.0.1: fixed printf formats.
  632 + * v1.0.2: fixed something else.
  633 + * v1.0.3: replaced CRC constant table by generator function.
  634 + * v1.0.4: reformatted code, made ANSI C. 1994-12-05.
  635 + * v2.0.0: rewrote to use memory buffer & static table, 2006-04-29.
  636 + * v2.1.0: modified by Nico, 2013-04-20
  637 + \*----------------------------------------------------------------------------*/
  638 + static const u_int32_t table[256] = {
  639 + 0x00000000,0x77073096,0xEE0E612C,0x990951BA,0x076DC419,0x706AF48F,0xE963A535,
  640 + 0x9E6495A3,0x0EDB8832,0x79DCB8A4,0xE0D5E91E,0x97D2D988,0x09B64C2B,0x7EB17CBD,
  641 + 0xE7B82D07,0x90BF1D91,0x1DB71064,0x6AB020F2,0xF3B97148,0x84BE41DE,0x1ADAD47D,
  642 + 0x6DDDE4EB,0xF4D4B551,0x83D385C7,0x136C9856,0x646BA8C0,0xFD62F97A,0x8A65C9EC,
  643 + 0x14015C4F,0x63066CD9,0xFA0F3D63,0x8D080DF5,0x3B6E20C8,0x4C69105E,0xD56041E4,
  644 + 0xA2677172,0x3C03E4D1,0x4B04D447,0xD20D85FD,0xA50AB56B,0x35B5A8FA,0x42B2986C,
  645 + 0xDBBBC9D6,0xACBCF940,0x32D86CE3,0x45DF5C75,0xDCD60DCF,0xABD13D59,0x26D930AC,
  646 + 0x51DE003A,0xC8D75180,0xBFD06116,0x21B4F4B5,0x56B3C423,0xCFBA9599,0xB8BDA50F,
  647 + 0x2802B89E,0x5F058808,0xC60CD9B2,0xB10BE924,0x2F6F7C87,0x58684C11,0xC1611DAB,
  648 + 0xB6662D3D,0x76DC4190,0x01DB7106,0x98D220BC,0xEFD5102A,0x71B18589,0x06B6B51F,
  649 + 0x9FBFE4A5,0xE8B8D433,0x7807C9A2,0x0F00F934,0x9609A88E,0xE10E9818,0x7F6A0DBB,
  650 + 0x086D3D2D,0x91646C97,0xE6635C01,0x6B6B51F4,0x1C6C6162,0x856530D8,0xF262004E,
  651 + 0x6C0695ED,0x1B01A57B,0x8208F4C1,0xF50FC457,0x65B0D9C6,0x12B7E950,0x8BBEB8EA,
  652 + 0xFCB9887C,0x62DD1DDF,0x15DA2D49,0x8CD37CF3,0xFBD44C65,0x4DB26158,0x3AB551CE,
  653 + 0xA3BC0074,0xD4BB30E2,0x4ADFA541,0x3DD895D7,0xA4D1C46D,0xD3D6F4FB,0x4369E96A,
  654 + 0x346ED9FC,0xAD678846,0xDA60B8D0,0x44042D73,0x33031DE5,0xAA0A4C5F,0xDD0D7CC9,
  655 + 0x5005713C,0x270241AA,0xBE0B1010,0xC90C2086,0x5768B525,0x206F85B3,0xB966D409,
  656 + 0xCE61E49F,0x5EDEF90E,0x29D9C998,0xB0D09822,0xC7D7A8B4,0x59B33D17,0x2EB40D81,
  657 + 0xB7BD5C3B,0xC0BA6CAD,0xEDB88320,0x9ABFB3B6,0x03B6E20C,0x74B1D29A,0xEAD54739,
  658 + 0x9DD277AF,0x04DB2615,0x73DC1683,0xE3630B12,0x94643B84,0x0D6D6A3E,0x7A6A5AA8,
  659 + 0xE40ECF0B,0x9309FF9D,0x0A00AE27,0x7D079EB1,0xF00F9344,0x8708A3D2,0x1E01F268,
  660 + 0x6906C2FE,0xF762575D,0x806567CB,0x196C3671,0x6E6B06E7,0xFED41B76,0x89D32BE0,
  661 + 0x10DA7A5A,0x67DD4ACC,0xF9B9DF6F,0x8EBEEFF9,0x17B7BE43,0x60B08ED5,0xD6D6A3E8,
  662 + 0xA1D1937E,0x38D8C2C4,0x4FDFF252,0xD1BB67F1,0xA6BC5767,0x3FB506DD,0x48B2364B,
  663 + 0xD80D2BDA,0xAF0A1B4C,0x36034AF6,0x41047A60,0xDF60EFC3,0xA867DF55,0x316E8EEF,
  664 + 0x4669BE79,0xCB61B38C,0xBC66831A,0x256FD2A0,0x5268E236,0xCC0C7795,0xBB0B4703,
  665 + 0x220216B9,0x5505262F,0xC5BA3BBE,0xB2BD0B28,0x2BB45A92,0x5CB36A04,0xC2D7FFA7,
  666 + 0xB5D0CF31,0x2CD99E8B,0x5BDEAE1D,0x9B64C2B0,0xEC63F226,0x756AA39C,0x026D930A,
  667 + 0x9C0906A9,0xEB0E363F,0x72076785,0x05005713,0x95BF4A82,0xE2B87A14,0x7BB12BAE,
  668 + 0x0CB61B38,0x92D28E9B,0xE5D5BE0D,0x7CDCEFB7,0x0BDBDF21,0x86D3D2D4,0xF1D4E242,
  669 + 0x68DDB3F8,0x1FDA836E,0x81BE16CD,0xF6B9265B,0x6FB077E1,0x18B74777,0x88085AE6,
  670 + 0xFF0F6A70,0x66063BCA,0x11010B5C,0x8F659EFF,0xF862AE69,0x616BFFD3,0x166CCF45,
  671 + 0xA00AE278,0xD70DD2EE,0x4E048354,0x3903B3C2,0xA7672661,0xD06016F7,0x4969474D,
  672 + 0x3E6E77DB,0xAED16A4A,0xD9D65ADC,0x40DF0B66,0x37D83BF0,0xA9BCAE53,0xDEBB9EC5,
  673 + 0x47B2CF7F,0x30B5FFE9,0xBDBDF21C,0xCABAC28A,0x53B39330,0x24B4A3A6,0xBAD03605,
  674 + 0xCDD70693,0x54DE5729,0x23D967BF,0xB3667A2E,0xC4614AB8,0x5D681B02,0x2A6F2B94,
  675 + 0xB40BBE37,0xC30C8EA1,0x5A05DF1B,0x2D02EF8D
  676 + };
  677 +
  678 + u_int32_t crc = init ^ 0xFFFFFFFF;
  679 +
  680 + for (int i = 0; i < nb_buf; i++) {
  681 + crc = table[(crc ^ buf[i]) & 0xff] ^ (crc >> 8);
  682 + }
  683 +
  684 + return crc^0xFFFFFFFF;
  685 +}
  686 +
  687 +u_int32_t srs_crc32_mpegts(const void* buf, int size)
  688 +{
  689 + return __mpegts_crc32((const u_int8_t*)buf, size);
  690 +}
  691 +
  692 +u_int32_t srs_crc32_ieee(const void* buf, int size, u_int32_t previous)
615 { 693 {
616 - return mpegts_crc32((const u_int8_t*)buf, size); 694 + return __crc32_ieee(previous, (const u_int8_t*)buf, size);
617 } 695 }
618 696
619 /* 697 /*
@@ -122,9 +122,14 @@ extern bool srs_avc_startswith_annexb(SrsBuffer* stream, int* pnb_start_code = N @@ -122,9 +122,14 @@ extern bool srs_avc_startswith_annexb(SrsBuffer* stream, int* pnb_start_code = N
122 extern bool srs_aac_startswith_adts(SrsBuffer* stream); 122 extern bool srs_aac_startswith_adts(SrsBuffer* stream);
123 123
124 /** 124 /**
125 -* cacl the crc32 of bytes in buf.  
126 -*/  
127 -extern u_int32_t srs_crc32(const void* buf, int size); 125 +* cacl the crc32 of bytes in buf, for ffmpeg.
  126 + */
  127 +extern u_int32_t srs_crc32_mpegts(const void* buf, int size);
  128 +
  129 +/**
  130 + * calc the crc32 of bytes in buf by IEEE, for zip.
  131 + */
  132 +extern u_int32_t srs_crc32_ieee(const void* buf, int size, u_int32_t previous = 0);
128 133
129 /** 134 /**
130 * Decode a base64-encoded string. 135 * Decode a base64-encoded string.
@@ -158,20 +158,30 @@ int SrsKafkaString::decode(SrsBuffer* buf) @@ -158,20 +158,30 @@ int SrsKafkaString::decode(SrsBuffer* buf)
158 SrsKafkaBytes::SrsKafkaBytes() 158 SrsKafkaBytes::SrsKafkaBytes()
159 { 159 {
160 _size = -1; 160 _size = -1;
161 - data = NULL; 161 + _data = NULL;
162 } 162 }
163 163
164 SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v) 164 SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v)
165 { 165 {
166 _size = -1; 166 _size = -1;
167 - data = NULL; 167 + _data = NULL;
168 168
169 set_value(v, nb_v); 169 set_value(v, nb_v);
170 } 170 }
171 171
172 SrsKafkaBytes::~SrsKafkaBytes() 172 SrsKafkaBytes::~SrsKafkaBytes()
173 { 173 {
174 - srs_freep(data); 174 + srs_freep(_data);
  175 +}
  176 +
  177 +char* SrsKafkaBytes::data()
  178 +{
  179 + return _data;
  180 +}
  181 +
  182 +int SrsKafkaBytes::size()
  183 +{
  184 + return _size;
175 } 185 }
176 186
177 bool SrsKafkaBytes::null() 187 bool SrsKafkaBytes::null()
@@ -192,14 +202,29 @@ void SrsKafkaBytes::set_value(string v) @@ -192,14 +202,29 @@ void SrsKafkaBytes::set_value(string v)
192 void SrsKafkaBytes::set_value(const char* v, int nb_v) 202 void SrsKafkaBytes::set_value(const char* v, int nb_v)
193 { 203 {
194 // free previous data. 204 // free previous data.
195 - srs_freep(data); 205 + srs_freep(_data);
196 206
197 // copy new value to data. 207 // copy new value to data.
198 _size = (int16_t)nb_v; 208 _size = (int16_t)nb_v;
199 209
200 srs_assert(_size > 0); 210 srs_assert(_size > 0);
201 - data = new char[_size];  
202 - memcpy(data, v, _size); 211 + _data = new char[_size];
  212 + memcpy(_data, v, _size);
  213 +}
  214 +
  215 +u_int32_t SrsKafkaBytes::crc32(u_int32_t previous)
  216 +{
  217 + char bsize[4];
  218 + SrsBuffer(bsize, 4).write_4bytes(_size);
  219 +
  220 + if (_size <= 0) {
  221 + return srs_crc32_ieee(bsize, 4, previous);
  222 + }
  223 +
  224 + u_int32_t crc = srs_crc32_ieee(bsize, 4, previous);
  225 + crc = srs_crc32_ieee(_data, _size, crc);
  226 +
  227 + return crc;
203 } 228 }
204 229
205 int SrsKafkaBytes::nb_bytes() 230 int SrsKafkaBytes::nb_bytes()
@@ -227,7 +252,7 @@ int SrsKafkaBytes::encode(SrsBuffer* buf) @@ -227,7 +252,7 @@ int SrsKafkaBytes::encode(SrsBuffer* buf)
227 srs_error("kafka encode bytes data failed. ret=%d", ret); 252 srs_error("kafka encode bytes data failed. ret=%d", ret);
228 return ret; 253 return ret;
229 } 254 }
230 - buf->write_bytes(data, _size); 255 + buf->write_bytes(_data, _size);
231 256
232 return ret; 257 return ret;
233 } 258 }
@@ -255,9 +280,9 @@ int SrsKafkaBytes::decode(SrsBuffer* buf) @@ -255,9 +280,9 @@ int SrsKafkaBytes::decode(SrsBuffer* buf)
255 return ret; 280 return ret;
256 } 281 }
257 282
258 - srs_freep(data);  
259 - data = new char[_size];  
260 - buf->read_bytes(data, _size); 283 + srs_freep(_data);
  284 + _data = new char[_size];
  285 + buf->read_bytes(_data, _size);
261 286
262 return ret; 287 return ret;
263 } 288 }
@@ -523,8 +548,13 @@ int SrsKafkaRawMessage::create(SrsJsonObject* obj) @@ -523,8 +548,13 @@ int SrsKafkaRawMessage::create(SrsJsonObject* obj)
523 // dumps the json to string. 548 // dumps the json to string.
524 value->set_value(obj->dumps()); 549 value->set_value(obj->dumps());
525 550
526 - // TODO: FIXME: implements it.  
527 - crc = 0; 551 + // crc32 message.
  552 + crc = srs_crc32_ieee(&magic_byte, 1);
  553 + crc = srs_crc32_ieee(&attributes, 1, crc);
  554 + crc = key->crc32(crc);
  555 + crc = value->crc32(crc);
  556 +
  557 + srs_info("crc32 message is %#x", crc);
528 558
529 message_size = raw_message_size(); 559 message_size = raw_message_size();
530 560
@@ -1143,13 +1173,17 @@ SrsKafkaProducerRequest::~SrsKafkaProducerRequest() @@ -1143,13 +1173,17 @@ SrsKafkaProducerRequest::~SrsKafkaProducerRequest()
1143 1173
1144 int SrsKafkaProducerRequest::nb_bytes() 1174 int SrsKafkaProducerRequest::nb_bytes()
1145 { 1175 {
1146 - return 2 + 4 + topics.nb_bytes(); 1176 + return SrsKafkaRequest::nb_bytes() + 2 + 4 + topics.nb_bytes();
1147 } 1177 }
1148 1178
1149 int SrsKafkaProducerRequest::encode(SrsBuffer* buf) 1179 int SrsKafkaProducerRequest::encode(SrsBuffer* buf)
1150 { 1180 {
1151 int ret = ERROR_SUCCESS; 1181 int ret = ERROR_SUCCESS;
1152 1182
  1183 + if ((ret = SrsKafkaRequest::encode(buf)) != ERROR_SUCCESS) {
  1184 + return ret;
  1185 + }
  1186 +
1153 if (!buf->require(2 + 4)) { 1187 if (!buf->require(2 + 4)) {
1154 ret = ERROR_KAFKA_CODEC_PRODUCER; 1188 ret = ERROR_KAFKA_CODEC_PRODUCER;
1155 srs_error("kafka encode producer failed. ret=%d", ret); 1189 srs_error("kafka encode producer failed. ret=%d", ret);
@@ -1169,6 +1203,10 @@ int SrsKafkaProducerRequest::decode(SrsBuffer* buf) @@ -1169,6 +1203,10 @@ int SrsKafkaProducerRequest::decode(SrsBuffer* buf)
1169 { 1203 {
1170 int ret = ERROR_SUCCESS; 1204 int ret = ERROR_SUCCESS;
1171 1205
  1206 + if ((ret = SrsKafkaRequest::decode(buf)) != ERROR_SUCCESS) {
  1207 + return ret;
  1208 + }
  1209 +
1172 if (!buf->require(2 + 4)) { 1210 if (!buf->require(2 + 4)) {
1173 ret = ERROR_KAFKA_CODEC_PRODUCER; 1211 ret = ERROR_KAFKA_CODEC_PRODUCER;
1174 srs_error("kafka decode producer failed. ret=%d", ret); 1212 srs_error("kafka decode producer failed. ret=%d", ret);
@@ -1445,7 +1483,12 @@ int SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector< @@ -1445,7 +1483,12 @@ int SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector<
1445 1483
1446 partitions->message_set_size = partitions->messages.nb_bytes(); 1484 partitions->message_set_size = partitions->messages.nb_bytes();
1447 1485
1448 - // TODO: FIXME: implements it. 1486 + // write to kafka cluster.
  1487 + if ((ret = protocol->send_and_free_message(req)) != ERROR_SUCCESS) {
  1488 + srs_error("kafka write producer message failed. ret=%d", ret);
  1489 + return ret;
  1490 + }
  1491 +
1449 return ret; 1492 return ret;
1450 } 1493 }
1451 1494
@@ -96,16 +96,19 @@ class SrsKafkaBytes : public ISrsCodec @@ -96,16 +96,19 @@ class SrsKafkaBytes : public ISrsCodec
96 { 96 {
97 private: 97 private:
98 int32_t _size; 98 int32_t _size;
99 - char* data; 99 + char* _data;
100 public: 100 public:
101 SrsKafkaBytes(); 101 SrsKafkaBytes();
102 SrsKafkaBytes(const char* v, int nb_v); 102 SrsKafkaBytes(const char* v, int nb_v);
103 virtual ~SrsKafkaBytes(); 103 virtual ~SrsKafkaBytes();
104 public: 104 public:
  105 + virtual char* data();
  106 + virtual int size();
105 virtual bool null(); 107 virtual bool null();
106 virtual bool empty(); 108 virtual bool empty();
107 virtual void set_value(std::string v); 109 virtual void set_value(std::string v);
108 virtual void set_value(const char* v, int nb_v); 110 virtual void set_value(const char* v, int nb_v);
  111 + virtual u_int32_t crc32(u_int32_t previous);
109 // interface ISrsCodec 112 // interface ISrsCodec
110 public: 113 public:
111 virtual int nb_bytes(); 114 virtual int nb_bytes();