winlin

fix the log cid display error, merge the publish recv thread log to publish connection.

@@ -57,6 +57,20 @@ int SrsThreadContext::get_id() @@ -57,6 +57,20 @@ int SrsThreadContext::get_id()
57 return cache[st_thread_self()]; 57 return cache[st_thread_self()];
58 } 58 }
59 59
  60 +int SrsThreadContext::set_id(int v)
  61 +{
  62 + st_thread_t self = st_thread_self();
  63 +
  64 + int ov = 0;
  65 + if (cache.find(self) != cache.end()) {
  66 + ov = cache[self];
  67 + }
  68 +
  69 + cache[self] = v;
  70 +
  71 + return ov;
  72 +}
  73 +
60 // the max size of a line of log. 74 // the max size of a line of log.
61 #define LOG_MAX_SIZE 4096 75 #define LOG_MAX_SIZE 4096
62 76
@@ -53,6 +53,7 @@ public: @@ -53,6 +53,7 @@ public:
53 public: 53 public:
54 virtual int generate_id(); 54 virtual int generate_id();
55 virtual int get_id(); 55 virtual int get_id();
  56 + virtual int set_id(int v);
56 }; 57 };
57 58
58 /** 59 /**
@@ -62,6 +62,11 @@ SrsRecvThread::~SrsRecvThread() @@ -62,6 +62,11 @@ SrsRecvThread::~SrsRecvThread()
62 srs_freep(trd); 62 srs_freep(trd);
63 } 63 }
64 64
  65 +int SrsRecvThread::cid()
  66 +{
  67 + return trd->cid();
  68 +}
  69 +
65 int SrsRecvThread::start() 70 int SrsRecvThread::start()
66 { 71 {
67 return trd->start(); 72 return trd->start();
@@ -253,6 +258,7 @@ SrsPublishRecvThread::SrsPublishRecvThread( @@ -253,6 +258,7 @@ SrsPublishRecvThread::SrsPublishRecvThread(
253 recv_error_code = ERROR_SUCCESS; 258 recv_error_code = ERROR_SUCCESS;
254 _nb_msgs = 0; 259 _nb_msgs = 0;
255 error = st_cond_new(); 260 error = st_cond_new();
  261 + ncid = cid = 0;
256 262
257 req = _req; 263 req = _req;
258 mr_fd = mr_sock_fd; 264 mr_fd = mr_sock_fd;
@@ -297,9 +303,21 @@ int SrsPublishRecvThread::error_code() @@ -297,9 +303,21 @@ int SrsPublishRecvThread::error_code()
297 return recv_error_code; 303 return recv_error_code;
298 } 304 }
299 305
  306 +void SrsPublishRecvThread::set_cid(int v)
  307 +{
  308 + ncid = v;
  309 +}
  310 +
  311 +int SrsPublishRecvThread::get_cid()
  312 +{
  313 + return ncid;
  314 +}
  315 +
300 int SrsPublishRecvThread::start() 316 int SrsPublishRecvThread::start()
301 { 317 {
302 - return trd.start(); 318 + int ret = trd.start();
  319 + ncid = cid = trd.cid();
  320 + return ret;
303 } 321 }
304 322
305 void SrsPublishRecvThread::stop() 323 void SrsPublishRecvThread::stop()
@@ -352,6 +370,12 @@ int SrsPublishRecvThread::handle(SrsCommonMessage* msg) @@ -352,6 +370,12 @@ int SrsPublishRecvThread::handle(SrsCommonMessage* msg)
352 { 370 {
353 int ret = ERROR_SUCCESS; 371 int ret = ERROR_SUCCESS;
354 372
  373 + // when cid changed, change it.
  374 + if (ncid != cid) {
  375 + _srs_context->set_id(ncid);
  376 + cid = ncid;
  377 + }
  378 +
355 _nb_msgs++; 379 _nb_msgs++;
356 380
357 // log to show the time of recv thread. 381 // log to show the time of recv thread.
@@ -91,6 +91,8 @@ public: @@ -91,6 +91,8 @@ public:
91 SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int timeout_ms); 91 SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int timeout_ms);
92 virtual ~SrsRecvThread(); 92 virtual ~SrsRecvThread();
93 public: 93 public:
  94 + virtual int cid();
  95 +public:
94 virtual int start(); 96 virtual int start();
95 virtual void stop(); 97 virtual void stop();
96 virtual void stop_loop(); 98 virtual void stop_loop();
@@ -170,6 +172,9 @@ private: @@ -170,6 +172,9 @@ private:
170 // the error timeout cond 172 // the error timeout cond
171 // @see https://github.com/simple-rtmp-server/srs/issues/244 173 // @see https://github.com/simple-rtmp-server/srs/issues/244
172 st_cond_t error; 174 st_cond_t error;
  175 + // merged context id.
  176 + int cid;
  177 + int ncid;
173 public: 178 public:
174 SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, 179 SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk,
175 SrsRequest* _req, int mr_sock_fd, int timeout_ms, 180 SrsRequest* _req, int mr_sock_fd, int timeout_ms,
@@ -182,6 +187,8 @@ public: @@ -182,6 +187,8 @@ public:
182 virtual int wait(int timeout_ms); 187 virtual int wait(int timeout_ms);
183 virtual int64_t nb_msgs(); 188 virtual int64_t nb_msgs();
184 virtual int error_code(); 189 virtual int error_code();
  190 + virtual void set_cid(int v);
  191 + virtual int get_cid();
185 public: 192 public:
186 virtual int start(); 193 virtual int start();
187 virtual void stop(); 194 virtual void stop();
@@ -851,6 +851,11 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -851,6 +851,11 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
851 return ret; 851 return ret;
852 } 852 }
853 853
  854 + // change the isolate recv thread context id,
  855 + // merge its log to current thread.
  856 + int receive_thread_cid = trd->get_cid();
  857 + trd->set_cid(_srs_context->get_id());
  858 +
854 // initialize the publish timeout. 859 // initialize the publish timeout.
855 publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); 860 publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
856 publish_normal_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); 861 publish_normal_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
@@ -861,8 +866,8 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) @@ -861,8 +866,8 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
861 if (true) { 866 if (true) {
862 bool mr = _srs_config->get_mr_enabled(req->vhost); 867 bool mr = _srs_config->get_mr_enabled(req->vhost);
863 int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); 868 int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
864 - srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d",  
865 - mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout, tcp_nodelay); 869 + srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d, rtcid=%d",
  870 + mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout, tcp_nodelay, receive_thread_cid);
866 } 871 }
867 872
868 int64_t nb_msgs = 0; 873 int64_t nb_msgs = 0;
@@ -1109,7 +1114,9 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag @@ -1109,7 +1114,9 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag
1109 res->command_object = SrsAmf0Any::null(); 1114 res->command_object = SrsAmf0Any::null();
1110 res->response = SrsAmf0Any::null(); 1115 res->response = SrsAmf0Any::null();
1111 if ((ret = rtmp->send_and_free_packet(res, 0)) != ERROR_SUCCESS) { 1116 if ((ret = rtmp->send_and_free_packet(res, 0)) != ERROR_SUCCESS) {
  1117 + if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
1112 srs_warn("response call failed. ret=%d", ret); 1118 srs_warn("response call failed. ret=%d", ret);
  1119 + }
1113 return ret; 1120 return ret;
1114 } 1121 }
1115 } 1122 }
@@ -1415,6 +1415,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -1415,6 +1415,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
1415 if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) { 1415 if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) {
1416 ss << ", acodec=" << (int)prop->to_number(); 1416 ss << ", acodec=" << (int)prop->to_number();
1417 } 1417 }
  1418 + srs_trace("got metadata%s", ss.str().c_str());
1418 1419
1419 // add server info to metadata 1420 // add server info to metadata
1420 metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER)); 1421 metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
@@ -1479,7 +1480,6 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata @@ -1479,7 +1480,6 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata
1479 return ret; 1480 return ret;
1480 } 1481 }
1481 } 1482 }
1482 - srs_trace("got metadata%s", ss.str().c_str());  
1483 } 1483 }
1484 1484
1485 // copy to all forwarders 1485 // copy to all forwarders
@@ -76,4 +76,9 @@ int ISrsThreadContext::get_id() @@ -76,4 +76,9 @@ int ISrsThreadContext::get_id()
76 return 0; 76 return 0;
77 } 77 }
78 78
  79 +int ISrsThreadContext::set_id(int /*v*/)
  80 +{
  81 + return 0;
  82 +}
  83 +
79 84
@@ -95,6 +95,13 @@ public: @@ -95,6 +95,13 @@ public:
95 virtual void error(const char* tag, int context_id, const char* fmt, ...); 95 virtual void error(const char* tag, int context_id, const char* fmt, ...);
96 }; 96 };
97 97
  98 +/**
  99 + * the context id manager to identify context, for instance, the green-thread.
  100 + * usage:
  101 + * _srs_context->generate_id(); // when thread start.
  102 + * _srs_context->get_id(); // get current generated id.
  103 + * int old_id = _srs_context->set_id(1000); // set context id if need to merge thread context.
  104 + */
98 // the context for multiple clients. 105 // the context for multiple clients.
99 class ISrsThreadContext 106 class ISrsThreadContext
100 { 107 {
@@ -102,8 +109,19 @@ public: @@ -102,8 +109,19 @@ public:
102 ISrsThreadContext(); 109 ISrsThreadContext();
103 virtual ~ISrsThreadContext(); 110 virtual ~ISrsThreadContext();
104 public: 111 public:
  112 + /**
  113 + * generate the id for current context.
  114 + */
105 virtual int generate_id(); 115 virtual int generate_id();
  116 + /**
  117 + * get the generated id of current context.
  118 + */
106 virtual int get_id(); 119 virtual int get_id();
  120 + /**
  121 + * set the id of current context.
  122 + * @return the previous id value; 0 if no context.
  123 + */
  124 + virtual int set_id(int v);
107 }; 125 };
108 126
109 // user must provides a log object 127 // user must provides a log object
@@ -2633,7 +2633,9 @@ int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& @@ -2633,7 +2633,9 @@ int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string&
2633 res->command_object = SrsAmf0Any::null(); 2633 res->command_object = SrsAmf0Any::null();
2634 res->response = SrsAmf0Any::null(); 2634 res->response = SrsAmf0Any::null();
2635 if ((ret = protocol->send_and_free_packet(res, 0)) != ERROR_SUCCESS) { 2635 if ((ret = protocol->send_and_free_packet(res, 0)) != ERROR_SUCCESS) {
  2636 + if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
2636 srs_warn("response call failed. ret=%d", ret); 2637 srs_warn("response call failed. ret=%d", ret);
  2638 + }
2637 return ret; 2639 return ret;
2638 } 2640 }
2639 continue; 2641 continue;