winlin

extract method to process publish message

@@ -351,62 +351,74 @@ int SrsClient::publish(SrsSource* source, bool is_fmle) @@ -351,62 +351,74 @@ int SrsClient::publish(SrsSource* source, bool is_fmle)
351 srs_trace("<- clock=%u, time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", 351 srs_trace("<- clock=%u, time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d",
352 (int)(srs_get_system_time_ms()/1000), pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); 352 (int)(srs_get_system_time_ms()/1000), pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps());
353 } 353 }
354 -  
355 - // process audio packet  
356 - if (msg->header.is_audio() && ((ret = source->on_audio(msg)) != ERROR_SUCCESS)) {  
357 - srs_error("process audio message failed. ret=%d", ret); 354 +
  355 + if ((ret = process_publish_message(source, hls, msg, is_fmle)) != ERROR_SUCCESS) {
  356 + srs_error("process publish message failed. ret=%d", ret);
358 return ret; 357 return ret;
359 } 358 }
360 - // process video packet  
361 - if (msg->header.is_video() && ((ret = source->on_video(msg)) != ERROR_SUCCESS)) {  
362 - srs_error("process video message failed. ret=%d", ret); 359 + }
  360 +
  361 + return ret;
  362 +}
  363 +
  364 +int SrsClient::process_publish_message(SrsSource* source, SrsHLS* hls, SrsCommonMessage* msg, bool is_fmle)
  365 +{
  366 + int ret = ERROR_SUCCESS;
  367 +
  368 + // process audio packet
  369 + if (msg->header.is_audio() && ((ret = source->on_audio(msg)) != ERROR_SUCCESS)) {
  370 + srs_error("process audio message failed. ret=%d", ret);
  371 + return ret;
  372 + }
  373 + // process video packet
  374 + if (msg->header.is_video() && ((ret = source->on_video(msg)) != ERROR_SUCCESS)) {
  375 + srs_error("process video message failed. ret=%d", ret);
  376 + return ret;
  377 + }
  378 +
  379 + // process onMetaData
  380 + if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
  381 + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
  382 + srs_error("decode onMetaData message failed. ret=%d", ret);
363 return ret; 383 return ret;
364 } 384 }
365 -  
366 - // process onMetaData  
367 - if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {  
368 - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {  
369 - srs_error("decode onMetaData message failed. ret=%d", ret); 385 +
  386 + SrsPacket* pkt = msg->get_packet();
  387 + if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
  388 + SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
  389 + if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
  390 + srs_error("process onMetaData message failed. ret=%d", ret);
370 return ret; 391 return ret;
371 } 392 }
372 -  
373 - SrsPacket* pkt = msg->get_packet();  
374 - if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {  
375 - SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);  
376 - if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {  
377 - srs_error("process onMetaData message failed. ret=%d", ret);  
378 - return ret;  
379 - }  
380 - srs_trace("process onMetaData message success.");  
381 - continue;  
382 - }  
383 -  
384 - srs_trace("ignore AMF0/AMF3 data message.");  
385 - continue; 393 + srs_trace("process onMetaData message success.");
  394 + return ret;
386 } 395 }
387 396
388 - // process UnPublish event.  
389 - if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {  
390 - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {  
391 - srs_error("decode unpublish message failed. ret=%d", ret);  
392 - return ret;  
393 - }  
394 -  
395 - // flash unpublish.  
396 - if (!is_fmle) {  
397 - srs_trace("flash publish finished.");  
398 - return ret;  
399 - } 397 + srs_trace("ignore AMF0/AMF3 data message.");
  398 + return ret;
  399 + }
  400 +
  401 + // process UnPublish event.
  402 + if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
  403 + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
  404 + srs_error("decode unpublish message failed. ret=%d", ret);
  405 + return ret;
  406 + }
400 407
401 - SrsPacket* pkt = msg->get_packet();  
402 - if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {  
403 - SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);  
404 - return rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id);  
405 - }  
406 -  
407 - srs_trace("ignore AMF0/AMF3 command message.");  
408 - continue; 408 + // flash unpublish.
  409 + if (!is_fmle) {
  410 + srs_trace("flash publish finished.");
  411 + return ret;
  412 + }
  413 +
  414 + SrsPacket* pkt = msg->get_packet();
  415 + if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
  416 + SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
  417 + return rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id);
409 } 418 }
  419 +
  420 + srs_trace("ignore AMF0/AMF3 command message.");
  421 + return ret;
410 } 422 }
411 423
412 return ret; 424 return ret;
@@ -39,6 +39,7 @@ class SrsSource; @@ -39,6 +39,7 @@ class SrsSource;
39 class SrsRefer; 39 class SrsRefer;
40 class SrsConsumer; 40 class SrsConsumer;
41 class SrsCommonMessage; 41 class SrsCommonMessage;
  42 +class SrsHLS;
42 43
43 /** 44 /**
44 * the client provides the main logic control for RTMP clients. 45 * the client provides the main logic control for RTMP clients.
@@ -60,6 +61,7 @@ private: @@ -60,6 +61,7 @@ private:
60 virtual int check_vhost(); 61 virtual int check_vhost();
61 virtual int playing(SrsSource* source); 62 virtual int playing(SrsSource* source);
62 virtual int publish(SrsSource* source, bool is_fmle); 63 virtual int publish(SrsSource* source, bool is_fmle);
  64 + virtual int process_publish_message(SrsSource* source, SrsHLS* hls, SrsCommonMessage* msg, bool is_fmle);
63 virtual int get_peer_ip(); 65 virtual int get_peer_ip();
64 virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); 66 virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg);
65 }; 67 };