winlin

refine kafka, simplify code.

@@ -178,41 +178,31 @@ int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc) @@ -178,41 +178,31 @@ int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc)
178 return kafka->write_messages(topic, id, *pc); 178 return kafka->write_messages(topic, id, *pc);
179 } 179 }
180 180
181 -SrsKafkaMessage::SrsKafkaMessage(int k) 181 +SrsKafkaMessage::SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j)
182 { 182 {
  183 + producer = p;
183 key = k; 184 key = k;
  185 + obj = j;
184 } 186 }
185 187
186 SrsKafkaMessage::~SrsKafkaMessage() 188 SrsKafkaMessage::~SrsKafkaMessage()
187 { 189 {
  190 + srs_freep(obj);
188 } 191 }
189 192
190 -SrsKafkaMessageOnClient::SrsKafkaMessageOnClient(SrsKafkaProducer* p, int k, SrsListenerType t, string i)  
191 - : SrsKafkaMessage(k)  
192 -{  
193 - producer = p;  
194 - type = t;  
195 - ip = i;  
196 -}  
197 -  
198 -SrsKafkaMessageOnClient::~SrsKafkaMessageOnClient()  
199 -{  
200 -}  
201 -  
202 -int SrsKafkaMessageOnClient::call() 193 +int SrsKafkaMessage::call()
203 { 194 {
204 - SrsJsonObject* obj = SrsJsonAny::object(); 195 + int ret = producer->send(key, obj);
205 196
206 - obj->set("msg", SrsJsonAny::str("accept"));  
207 - obj->set("type", SrsJsonAny::integer(type));  
208 - obj->set("ip", SrsJsonAny::str(ip.c_str())); 197 + // the obj is manged by producer now.
  198 + obj = NULL;
209 199
210 - return producer->send(key, obj); 200 + return ret;
211 } 201 }
212 202
213 -string SrsKafkaMessageOnClient::to_string() 203 +string SrsKafkaMessage::to_string()
214 { 204 {
215 - return ip; 205 + return "kafka";
216 } 206 }
217 207
218 SrsKafkaCache::SrsKafkaCache() 208 SrsKafkaCache::SrsKafkaCache()
@@ -393,7 +383,13 @@ void SrsKafkaProducer::stop() @@ -393,7 +383,13 @@ void SrsKafkaProducer::stop()
393 383
394 int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) 384 int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
395 { 385 {
396 - return worker->execute(new SrsKafkaMessageOnClient(this, key, type, ip)); 386 + SrsJsonObject* obj = SrsJsonAny::object();
  387 +
  388 + obj->set("msg", SrsJsonAny::str("accept"));
  389 + obj->set("type", SrsJsonAny::integer(type));
  390 + obj->set("ip", SrsJsonAny::str(ip.c_str()));
  391 +
  392 + return worker->execute(new SrsKafkaMessage(this, key, obj));
397 } 393 }
398 394
399 int SrsKafkaProducer::send(int key, SrsJsonObject* obj) 395 int SrsKafkaProducer::send(int key, SrsJsonObject* obj)
@@ -80,21 +80,13 @@ public: @@ -80,21 +80,13 @@ public:
80 */ 80 */
81 class SrsKafkaMessage : public ISrsAsyncCallTask 81 class SrsKafkaMessage : public ISrsAsyncCallTask
82 { 82 {
83 -protected: 83 +private:
  84 + SrsKafkaProducer* producer;
84 int key; 85 int key;
  86 + SrsJsonObject* obj;
85 public: 87 public:
86 - SrsKafkaMessage(int k); 88 + SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j);
87 virtual ~SrsKafkaMessage(); 89 virtual ~SrsKafkaMessage();
88 -};  
89 -struct SrsKafkaMessageOnClient : public SrsKafkaMessage  
90 -{  
91 -public:  
92 - SrsKafkaProducer* producer;  
93 - SrsListenerType type;  
94 - std::string ip;  
95 -public:  
96 - SrsKafkaMessageOnClient(SrsKafkaProducer* p, int k, SrsListenerType t, std::string i);  
97 - virtual ~SrsKafkaMessageOnClient();  
98 // interface ISrsAsyncCallTask 90 // interface ISrsAsyncCallTask
99 public: 91 public:
100 virtual int call(); 92 virtual int call();