正在显示
6 个修改的文件
包含
102 行增加
和
202 行删除
| @@ -179,8 +179,10 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) | @@ -179,8 +179,10 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec) | ||
| 179 | { | 179 | { |
| 180 | int ret = ERROR_SUCCESS; | 180 | int ret = ERROR_SUCCESS; |
| 181 | 181 | ||
| 182 | - if ((ret = sdk->connect(output, SRS_CONSTS_RTMP_RECV_TIMEOUT_US)) != ERROR_SUCCESS) { | ||
| 183 | - srs_error("flv: connect %s failed. ret=%d", output.c_str(), ret); | 182 | + int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US; |
| 183 | + int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; | ||
| 184 | + if ((ret = sdk->connect(output, cto, sto)) != ERROR_SUCCESS) { | ||
| 185 | + srs_error("flv: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret); | ||
| 184 | return ret; | 186 | return ret; |
| 185 | } | 187 | } |
| 186 | 188 |
| @@ -125,7 +125,7 @@ int SrsEdgeIngester::cycle() | @@ -125,7 +125,7 @@ int SrsEdgeIngester::cycle() | ||
| 125 | 125 | ||
| 126 | source->on_source_id_changed(_srs_context->get_id()); | 126 | source->on_source_id_changed(_srs_context->get_id()); |
| 127 | 127 | ||
| 128 | - std::string url, vhost; | 128 | + std::string url; |
| 129 | if (true) { | 129 | if (true) { |
| 130 | SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); | 130 | SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); |
| 131 | 131 | ||
| @@ -139,22 +139,22 @@ int SrsEdgeIngester::cycle() | @@ -139,22 +139,22 @@ int SrsEdgeIngester::cycle() | ||
| 139 | } | 139 | } |
| 140 | 140 | ||
| 141 | // select the origin. | 141 | // select the origin. |
| 142 | - if (true) { | ||
| 143 | - std::string server = lb->select(conf->args); | ||
| 144 | - int port = SRS_CONSTS_RTMP_DEFAULT_PORT; | ||
| 145 | - srs_parse_hostport(server, server, port); | ||
| 146 | - | ||
| 147 | - url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream); | ||
| 148 | - } | 142 | + std::string server = lb->select(conf->args); |
| 143 | + int port = SRS_CONSTS_RTMP_DEFAULT_PORT; | ||
| 144 | + srs_parse_hostport(server, server, port); | ||
| 149 | 145 | ||
| 150 | // support vhost tranform for edge, | 146 | // support vhost tranform for edge, |
| 151 | // @see https://github.com/simple-rtmp-server/srs/issues/372 | 147 | // @see https://github.com/simple-rtmp-server/srs/issues/372 |
| 152 | - vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); | 148 | + std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); |
| 153 | vhost = srs_string_replace(vhost, "[vhost]", req->vhost); | 149 | vhost = srs_string_replace(vhost, "[vhost]", req->vhost); |
| 150 | + | ||
| 151 | + url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream); | ||
| 154 | } | 152 | } |
| 155 | 153 | ||
| 156 | - if ((ret = sdk->connect(url, vhost, SRS_CONSTS_RTMP_TIMEOUT_US)) != ERROR_SUCCESS) { | ||
| 157 | - srs_error("edge pull %s failed. ret=%d", url.c_str(), ret); | 154 | + int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US; |
| 155 | + int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US; | ||
| 156 | + if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { | ||
| 157 | + srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); | ||
| 158 | return ret; | 158 | return ret; |
| 159 | } | 159 | } |
| 160 | 160 | ||
| @@ -180,8 +180,6 @@ int SrsEdgeIngester::ingest() | @@ -180,8 +180,6 @@ int SrsEdgeIngester::ingest() | ||
| 180 | { | 180 | { |
| 181 | int ret = ERROR_SUCCESS; | 181 | int ret = ERROR_SUCCESS; |
| 182 | 182 | ||
| 183 | - sdk->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US); | ||
| 184 | - | ||
| 185 | SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); | 183 | SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); |
| 186 | SrsAutoFree(SrsPithyPrint, pprint); | 184 | SrsAutoFree(SrsPithyPrint, pprint); |
| 187 | 185 | ||
| @@ -271,27 +269,24 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) | @@ -271,27 +269,24 @@ int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) | ||
| 271 | 269 | ||
| 272 | SrsEdgeForwarder::SrsEdgeForwarder() | 270 | SrsEdgeForwarder::SrsEdgeForwarder() |
| 273 | { | 271 | { |
| 274 | - transport = new SrsTcpClient(); | ||
| 275 | - kbps = new SrsKbps(); | ||
| 276 | - client = NULL; | ||
| 277 | - _edge = NULL; | ||
| 278 | - _req = NULL; | 272 | + edge = NULL; |
| 273 | + req = NULL; | ||
| 274 | + send_error_code = ERROR_SUCCESS; | ||
| 275 | + | ||
| 276 | + sdk = new SrsSimpleRtmpClient(); | ||
| 279 | lb = new SrsLbRoundRobin(); | 277 | lb = new SrsLbRoundRobin(); |
| 280 | - stream_id = 0; | ||
| 281 | pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); | 278 | pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US); |
| 282 | queue = new SrsMessageQueue(); | 279 | queue = new SrsMessageQueue(); |
| 283 | - send_error_code = ERROR_SUCCESS; | ||
| 284 | } | 280 | } |
| 285 | 281 | ||
| 286 | SrsEdgeForwarder::~SrsEdgeForwarder() | 282 | SrsEdgeForwarder::~SrsEdgeForwarder() |
| 287 | { | 283 | { |
| 288 | stop(); | 284 | stop(); |
| 289 | 285 | ||
| 290 | - srs_freep(transport); | 286 | + srs_freep(sdk); |
| 291 | srs_freep(lb); | 287 | srs_freep(lb); |
| 292 | srs_freep(pthread); | 288 | srs_freep(pthread); |
| 293 | srs_freep(queue); | 289 | srs_freep(queue); |
| 294 | - srs_freep(kbps); | ||
| 295 | } | 290 | } |
| 296 | 291 | ||
| 297 | void SrsEdgeForwarder::set_queue_size(double queue_size) | 292 | void SrsEdgeForwarder::set_queue_size(double queue_size) |
| @@ -299,13 +294,13 @@ void SrsEdgeForwarder::set_queue_size(double queue_size) | @@ -299,13 +294,13 @@ void SrsEdgeForwarder::set_queue_size(double queue_size) | ||
| 299 | return queue->set_queue_size(queue_size); | 294 | return queue->set_queue_size(queue_size); |
| 300 | } | 295 | } |
| 301 | 296 | ||
| 302 | -int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req) | 297 | +int SrsEdgeForwarder::initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r) |
| 303 | { | 298 | { |
| 304 | int ret = ERROR_SUCCESS; | 299 | int ret = ERROR_SUCCESS; |
| 305 | 300 | ||
| 306 | - _source = source; | ||
| 307 | - _edge = edge; | ||
| 308 | - _req = req; | 301 | + source = s; |
| 302 | + edge = e; | ||
| 303 | + req = r; | ||
| 309 | 304 | ||
| 310 | return ret; | 305 | return ret; |
| 311 | } | 306 | } |
| @@ -314,36 +309,37 @@ int SrsEdgeForwarder::start() | @@ -314,36 +309,37 @@ int SrsEdgeForwarder::start() | ||
| 314 | { | 309 | { |
| 315 | int ret = ERROR_SUCCESS; | 310 | int ret = ERROR_SUCCESS; |
| 316 | 311 | ||
| 312 | + // reset the error code. | ||
| 317 | send_error_code = ERROR_SUCCESS; | 313 | send_error_code = ERROR_SUCCESS; |
| 318 | 314 | ||
| 319 | - std::string ep_server; | ||
| 320 | - int ep_port; | ||
| 321 | - if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) { | ||
| 322 | - return ret; | 315 | + std::string url; |
| 316 | + if (true) { | ||
| 317 | + SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost); | ||
| 318 | + srs_assert(conf); | ||
| 319 | + | ||
| 320 | + // select the origin. | ||
| 321 | + std::string server = lb->select(conf->args); | ||
| 322 | + int port = SRS_CONSTS_RTMP_DEFAULT_PORT; | ||
| 323 | + srs_parse_hostport(server, server, port); | ||
| 324 | + | ||
| 325 | + // support vhost tranform for edge, | ||
| 326 | + // @see https://github.com/simple-rtmp-server/srs/issues/372 | ||
| 327 | + std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); | ||
| 328 | + vhost = srs_string_replace(vhost, "[vhost]", req->vhost); | ||
| 329 | + | ||
| 330 | + url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream); | ||
| 323 | } | 331 | } |
| 324 | - srs_assert(client); | ||
| 325 | - | ||
| 326 | - client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); | ||
| 327 | - client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); | ||
| 328 | - | ||
| 329 | - SrsRequest* req = _req; | ||
| 330 | 332 | ||
| 331 | - if ((ret = client->handshake()) != ERROR_SUCCESS) { | ||
| 332 | - srs_error("handshake with server failed. ret=%d", ret); | ||
| 333 | - return ret; | ||
| 334 | - } | ||
| 335 | - if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) { | ||
| 336 | - srs_error("connect with server failed. ret=%d", ret); | ||
| 337 | - return ret; | ||
| 338 | - } | ||
| 339 | - if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { | ||
| 340 | - srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); | 333 | + // open socket. |
| 334 | + int64_t cto = SRS_EDGE_FORWARDER_TIMEOUT_US; | ||
| 335 | + int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US; | ||
| 336 | + if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) { | ||
| 337 | + srs_warn("edge push %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret); | ||
| 341 | return ret; | 338 | return ret; |
| 342 | } | 339 | } |
| 343 | 340 | ||
| 344 | - if ((ret = client->publish(req->stream, stream_id)) != ERROR_SUCCESS) { | ||
| 345 | - srs_error("publish failed, stream=%s, stream_id=%d. ret=%d", | ||
| 346 | - req->stream.c_str(), stream_id, ret); | 341 | + if ((ret = sdk->publish()) != ERROR_SUCCESS) { |
| 342 | + srs_error("edge push publish failed. ret=%d", ret); | ||
| 347 | return ret; | 343 | return ret; |
| 348 | } | 344 | } |
| 349 | 345 | ||
| @@ -353,12 +349,8 @@ int SrsEdgeForwarder::start() | @@ -353,12 +349,8 @@ int SrsEdgeForwarder::start() | ||
| 353 | void SrsEdgeForwarder::stop() | 349 | void SrsEdgeForwarder::stop() |
| 354 | { | 350 | { |
| 355 | pthread->stop(); | 351 | pthread->stop(); |
| 356 | - transport->close(); | ||
| 357 | - | 352 | + sdk->close(); |
| 358 | queue->clear(); | 353 | queue->clear(); |
| 359 | - | ||
| 360 | - srs_freep(client); | ||
| 361 | - kbps->set_io(NULL, NULL); | ||
| 362 | } | 354 | } |
| 363 | 355 | ||
| 364 | #define SYS_MAX_EDGE_SEND_MSGS 128 | 356 | #define SYS_MAX_EDGE_SEND_MSGS 128 |
| @@ -366,7 +358,7 @@ int SrsEdgeForwarder::cycle() | @@ -366,7 +358,7 @@ int SrsEdgeForwarder::cycle() | ||
| 366 | { | 358 | { |
| 367 | int ret = ERROR_SUCCESS; | 359 | int ret = ERROR_SUCCESS; |
| 368 | 360 | ||
| 369 | - client->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); | 361 | + sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); |
| 370 | 362 | ||
| 371 | SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); | 363 | SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); |
| 372 | SrsAutoFree(SrsPithyPrint, pprint); | 364 | SrsAutoFree(SrsPithyPrint, pprint); |
| @@ -382,7 +374,7 @@ int SrsEdgeForwarder::cycle() | @@ -382,7 +374,7 @@ int SrsEdgeForwarder::cycle() | ||
| 382 | // read from client. | 374 | // read from client. |
| 383 | if (true) { | 375 | if (true) { |
| 384 | SrsCommonMessage* msg = NULL; | 376 | SrsCommonMessage* msg = NULL; |
| 385 | - ret = client->recv_message(&msg); | 377 | + ret = sdk->recv_message(&msg); |
| 386 | 378 | ||
| 387 | srs_verbose("edge loop recv message. ret=%d", ret); | 379 | srs_verbose("edge loop recv message. ret=%d", ret); |
| 388 | if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { | 380 | if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { |
| @@ -406,22 +398,17 @@ int SrsEdgeForwarder::cycle() | @@ -406,22 +398,17 @@ int SrsEdgeForwarder::cycle() | ||
| 406 | 398 | ||
| 407 | // pithy print | 399 | // pithy print |
| 408 | if (pprint->can_print()) { | 400 | if (pprint->can_print()) { |
| 409 | - kbps->sample(); | ||
| 410 | - srs_trace("-> "SRS_CONSTS_LOG_EDGE_PUBLISH | ||
| 411 | - " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", | ||
| 412 | - pprint->age(), count, | ||
| 413 | - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), | ||
| 414 | - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); | 401 | + sdk->kbps_sample(SRS_CONSTS_LOG_EDGE_PUBLISH, pprint->age(), count); |
| 415 | } | 402 | } |
| 416 | 403 | ||
| 417 | // ignore when no messages. | 404 | // ignore when no messages. |
| 418 | if (count <= 0) { | 405 | if (count <= 0) { |
| 419 | - srs_verbose("no packets to push."); | 406 | + srs_verbose("edge no packets to push."); |
| 420 | continue; | 407 | continue; |
| 421 | } | 408 | } |
| 422 | 409 | ||
| 423 | // sendout messages, all messages are freed by send_and_free_messages(). | 410 | // sendout messages, all messages are freed by send_and_free_messages(). |
| 424 | - if ((ret = client->send_and_free_messages(msgs.msgs, count, stream_id)) != ERROR_SUCCESS) { | 411 | + if ((ret = sdk->send_and_free_messages(msgs.msgs, count)) != ERROR_SUCCESS) { |
| 425 | srs_error("edge publish push message to server failed. ret=%d", ret); | 412 | srs_error("edge publish push message to server failed. ret=%d", ret); |
| 426 | return ret; | 413 | return ret; |
| 427 | } | 414 | } |
| @@ -456,7 +443,7 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) | @@ -456,7 +443,7 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) | ||
| 456 | } | 443 | } |
| 457 | srs_verbose("initialize shared ptr msg success."); | 444 | srs_verbose("initialize shared ptr msg success."); |
| 458 | 445 | ||
| 459 | - copy.stream_id = stream_id; | 446 | + copy.stream_id = sdk->sid(); |
| 460 | if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) { | 447 | if ((ret = queue->enqueue(copy.copy())) != ERROR_SUCCESS) { |
| 461 | srs_error("enqueue edge publish msg failed. ret=%d", ret); | 448 | srs_error("enqueue edge publish msg failed. ret=%d", ret); |
| 462 | } | 449 | } |
| @@ -464,105 +451,6 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) | @@ -464,105 +451,6 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) | ||
| 464 | return ret; | 451 | return ret; |
| 465 | } | 452 | } |
| 466 | 453 | ||
| 467 | -int SrsEdgeForwarder::connect_server(string& ep_server, int& ep_port) | ||
| 468 | -{ | ||
| 469 | - int ret = ERROR_SUCCESS; | ||
| 470 | - | ||
| 471 | - // reopen | ||
| 472 | - transport->close(); | ||
| 473 | - | ||
| 474 | - SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(_req->vhost); | ||
| 475 | - srs_assert(conf); | ||
| 476 | - | ||
| 477 | - // select the origin. | ||
| 478 | - if (true) { | ||
| 479 | - std::string server = lb->select(conf->args); | ||
| 480 | - ep_port = SRS_CONSTS_RTMP_DEFAULT_PORT; | ||
| 481 | - srs_parse_hostport(server, ep_server, ep_port); | ||
| 482 | - } | ||
| 483 | - | ||
| 484 | - // open socket. | ||
| 485 | - int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US; | ||
| 486 | - if ((ret = transport->connect(ep_server, ep_port, timeout)) != ERROR_SUCCESS) { | ||
| 487 | - srs_warn("edge push failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", | ||
| 488 | - _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port, timeout, ret); | ||
| 489 | - return ret; | ||
| 490 | - } | ||
| 491 | - | ||
| 492 | - srs_freep(client); | ||
| 493 | - client = new SrsRtmpClient(transport); | ||
| 494 | - | ||
| 495 | - kbps->set_io(transport, transport); | ||
| 496 | - | ||
| 497 | - // open socket. | ||
| 498 | - srs_trace("edge push connected, stream=%s, tcUrl=%s to server=%s, port=%d", | ||
| 499 | - _req->stream.c_str(), _req->tcUrl.c_str(), ep_server.c_str(), ep_port); | ||
| 500 | - | ||
| 501 | - return ret; | ||
| 502 | -} | ||
| 503 | - | ||
| 504 | -// TODO: FIXME: refine the connect_app. | ||
| 505 | -int SrsEdgeForwarder::connect_app(string ep_server, int ep_port) | ||
| 506 | -{ | ||
| 507 | - int ret = ERROR_SUCCESS; | ||
| 508 | - | ||
| 509 | - SrsRequest* req = _req; | ||
| 510 | - | ||
| 511 | - // args of request takes the srs info. | ||
| 512 | - if (req->args == NULL) { | ||
| 513 | - req->args = SrsAmf0Any::object(); | ||
| 514 | - } | ||
| 515 | - | ||
| 516 | - // notify server the edge identity, | ||
| 517 | - // @see https://github.com/simple-rtmp-server/srs/issues/147 | ||
| 518 | - SrsAmf0Object* data = req->args; | ||
| 519 | - data->set("srs_sig", SrsAmf0Any::str(RTMP_SIG_SRS_KEY)); | ||
| 520 | - data->set("srs_server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER)); | ||
| 521 | - data->set("srs_license", SrsAmf0Any::str(RTMP_SIG_SRS_LICENSE)); | ||
| 522 | - data->set("srs_role", SrsAmf0Any::str(RTMP_SIG_SRS_ROLE)); | ||
| 523 | - data->set("srs_url", SrsAmf0Any::str(RTMP_SIG_SRS_URL)); | ||
| 524 | - data->set("srs_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); | ||
| 525 | - data->set("srs_site", SrsAmf0Any::str(RTMP_SIG_SRS_WEB)); | ||
| 526 | - data->set("srs_email", SrsAmf0Any::str(RTMP_SIG_SRS_EMAIL)); | ||
| 527 | - data->set("srs_copyright", SrsAmf0Any::str(RTMP_SIG_SRS_COPYRIGHT)); | ||
| 528 | - data->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); | ||
| 529 | - data->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); | ||
| 530 | - // for edge to directly get the id of client. | ||
| 531 | - data->set("srs_pid", SrsAmf0Any::number(getpid())); | ||
| 532 | - data->set("srs_id", SrsAmf0Any::number(_srs_context->get_id())); | ||
| 533 | - | ||
| 534 | - // local ip of edge | ||
| 535 | - std::vector<std::string> ips = srs_get_local_ipv4_ips(); | ||
| 536 | - assert(_srs_config->get_stats_network() < (int)ips.size()); | ||
| 537 | - std::string local_ip = ips[_srs_config->get_stats_network()]; | ||
| 538 | - data->set("srs_server_ip", SrsAmf0Any::str(local_ip.c_str())); | ||
| 539 | - | ||
| 540 | - // support vhost tranform for edge, | ||
| 541 | - // @see https://github.com/simple-rtmp-server/srs/issues/372 | ||
| 542 | - std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); | ||
| 543 | - vhost = srs_string_replace(vhost, "[vhost]", req->vhost); | ||
| 544 | - // generate the tcUrl | ||
| 545 | - std::string param = ""; | ||
| 546 | - std::string tc_url = srs_generate_tc_url(ep_server, vhost, req->app, ep_port, param); | ||
| 547 | - srs_trace("edge forward to %s:%d at %s", ep_server.c_str(), ep_port, tc_url.c_str()); | ||
| 548 | - | ||
| 549 | - // replace the tcUrl in request, | ||
| 550 | - // which will replace the tc_url in client.connect_app(). | ||
| 551 | - req->tcUrl = tc_url; | ||
| 552 | - | ||
| 553 | - // upnode server identity will show in the connect_app of client. | ||
| 554 | - // @see https://github.com/simple-rtmp-server/srs/issues/160 | ||
| 555 | - // the debug_srs_upnode is config in vhost and default to true. | ||
| 556 | - bool debug_srs_upnode = _srs_config->get_debug_srs_upnode(req->vhost); | ||
| 557 | - if ((ret = client->connect_app(req->app, tc_url, req, debug_srs_upnode)) != ERROR_SUCCESS) { | ||
| 558 | - srs_error("connect with server failed, tcUrl=%s, dsu=%d. ret=%d", | ||
| 559 | - tc_url.c_str(), debug_srs_upnode, ret); | ||
| 560 | - return ret; | ||
| 561 | - } | ||
| 562 | - | ||
| 563 | - return ret; | ||
| 564 | -} | ||
| 565 | - | ||
| 566 | SrsPlayEdge::SrsPlayEdge() | 454 | SrsPlayEdge::SrsPlayEdge() |
| 567 | { | 455 | { |
| 568 | state = SrsEdgeStateInit; | 456 | state = SrsEdgeStateInit; |
| @@ -109,15 +109,11 @@ private: | @@ -109,15 +109,11 @@ private: | ||
| 109 | class SrsEdgeForwarder : public ISrsReusableThread2Handler | 109 | class SrsEdgeForwarder : public ISrsReusableThread2Handler |
| 110 | { | 110 | { |
| 111 | private: | 111 | private: |
| 112 | - int stream_id; | ||
| 113 | -private: | ||
| 114 | - SrsSource* _source; | ||
| 115 | - SrsPublishEdge* _edge; | ||
| 116 | - SrsRequest* _req; | 112 | + SrsSource* source; |
| 113 | + SrsPublishEdge* edge; | ||
| 114 | + SrsRequest* req; | ||
| 117 | SrsReusableThread2* pthread; | 115 | SrsReusableThread2* pthread; |
| 118 | - SrsTcpClient* transport; | ||
| 119 | - SrsKbps* kbps; | ||
| 120 | - SrsRtmpClient* client; | 116 | + SrsSimpleRtmpClient* sdk; |
| 121 | SrsLbRoundRobin* lb; | 117 | SrsLbRoundRobin* lb; |
| 122 | /** | 118 | /** |
| 123 | * we must ensure one thread one fd principle, | 119 | * we must ensure one thread one fd principle, |
| @@ -136,7 +132,7 @@ public: | @@ -136,7 +132,7 @@ public: | ||
| 136 | public: | 132 | public: |
| 137 | virtual void set_queue_size(double queue_size); | 133 | virtual void set_queue_size(double queue_size); |
| 138 | public: | 134 | public: |
| 139 | - virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); | 135 | + virtual int initialize(SrsSource* s, SrsPublishEdge* e, SrsRequest* r); |
| 140 | virtual int start(); | 136 | virtual int start(); |
| 141 | virtual void stop(); | 137 | virtual void stop(); |
| 142 | // interface ISrsReusableThread2Handler | 138 | // interface ISrsReusableThread2Handler |
| @@ -144,9 +140,6 @@ public: | @@ -144,9 +140,6 @@ public: | ||
| 144 | virtual int cycle(); | 140 | virtual int cycle(); |
| 145 | public: | 141 | public: |
| 146 | virtual int proxy(SrsCommonMessage* msg); | 142 | virtual int proxy(SrsCommonMessage* msg); |
| 147 | -private: | ||
| 148 | - virtual int connect_server(std::string& ep_server, int& ep_port); | ||
| 149 | - virtual int connect_app(std::string ep_server, int ep_port); | ||
| 150 | }; | 143 | }; |
| 151 | 144 | ||
| 152 | /** | 145 | /** |
| @@ -97,12 +97,7 @@ SrsSimpleRtmpClient::~SrsSimpleRtmpClient() | @@ -97,12 +97,7 @@ SrsSimpleRtmpClient::~SrsSimpleRtmpClient() | ||
| 97 | kbps->set_io(NULL, NULL); | 97 | kbps->set_io(NULL, NULL); |
| 98 | } | 98 | } |
| 99 | 99 | ||
| 100 | -int SrsSimpleRtmpClient::connect(string url, int64_t timeout) | ||
| 101 | -{ | ||
| 102 | - return connect(url, "", timeout); | ||
| 103 | -} | ||
| 104 | - | ||
| 105 | -int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) | 100 | +int SrsSimpleRtmpClient::connect(string url, int64_t connect_timeout, int64_t stream_timeout) |
| 106 | { | 101 | { |
| 107 | int ret = ERROR_SUCCESS; | 102 | int ret = ERROR_SUCCESS; |
| 108 | 103 | ||
| @@ -113,14 +108,13 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) | @@ -113,14 +108,13 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) | ||
| 113 | } | 108 | } |
| 114 | 109 | ||
| 115 | // parse uri | 110 | // parse uri |
| 116 | - if (!req) { | ||
| 117 | - req = new SrsRequest(); | ||
| 118 | - srs_parse_rtmp_url(url, req->tcUrl, req->stream); | ||
| 119 | - srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param); | ||
| 120 | - } | 111 | + srs_freep(req); |
| 112 | + req = new SrsRequest(); | ||
| 113 | + srs_parse_rtmp_url(url, req->tcUrl, req->stream); | ||
| 114 | + srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->port, req->param); | ||
| 121 | 115 | ||
| 122 | // connect host. | 116 | // connect host. |
| 123 | - if ((ret = transport->connect(req->host, req->port, timeout)) != ERROR_SUCCESS) { | 117 | + if ((ret = transport->connect(req->host, req->port, connect_timeout)) != ERROR_SUCCESS) { |
| 124 | return ret; | 118 | return ret; |
| 125 | } | 119 | } |
| 126 | 120 | ||
| @@ -129,15 +123,15 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) | @@ -129,15 +123,15 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) | ||
| 129 | 123 | ||
| 130 | kbps->set_io(transport, transport); | 124 | kbps->set_io(transport, transport); |
| 131 | 125 | ||
| 132 | - client->set_recv_timeout(timeout); | ||
| 133 | - client->set_send_timeout(timeout); | 126 | + client->set_recv_timeout(stream_timeout); |
| 127 | + client->set_send_timeout(stream_timeout); | ||
| 134 | 128 | ||
| 135 | // connect to vhost/app | 129 | // connect to vhost/app |
| 136 | if ((ret = client->handshake()) != ERROR_SUCCESS) { | 130 | if ((ret = client->handshake()) != ERROR_SUCCESS) { |
| 137 | srs_error("sdk: handshake with server failed. ret=%d", ret); | 131 | srs_error("sdk: handshake with server failed. ret=%d", ret); |
| 138 | return ret; | 132 | return ret; |
| 139 | } | 133 | } |
| 140 | - if ((ret = connect_app(vhost)) != ERROR_SUCCESS) { | 134 | + if ((ret = connect_app()) != ERROR_SUCCESS) { |
| 141 | srs_error("sdk: connect with server failed. ret=%d", ret); | 135 | srs_error("sdk: connect with server failed. ret=%d", ret); |
| 142 | return ret; | 136 | return ret; |
| 143 | } | 137 | } |
| @@ -149,7 +143,7 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) | @@ -149,7 +143,7 @@ int SrsSimpleRtmpClient::connect(string url, string vhost, int64_t timeout) | ||
| 149 | return ret; | 143 | return ret; |
| 150 | } | 144 | } |
| 151 | 145 | ||
| 152 | -int SrsSimpleRtmpClient::connect_app(string vhost) | 146 | +int SrsSimpleRtmpClient::connect_app() |
| 153 | { | 147 | { |
| 154 | int ret = ERROR_SUCCESS; | 148 | int ret = ERROR_SUCCESS; |
| 155 | 149 | ||
| @@ -185,10 +179,7 @@ int SrsSimpleRtmpClient::connect_app(string vhost) | @@ -185,10 +179,7 @@ int SrsSimpleRtmpClient::connect_app(string vhost) | ||
| 185 | // generate the tcUrl | 179 | // generate the tcUrl |
| 186 | std::string param = ""; | 180 | std::string param = ""; |
| 187 | std::string target_vhost = req->vhost; | 181 | std::string target_vhost = req->vhost; |
| 188 | - if (vhost.empty()) { | ||
| 189 | - target_vhost = vhost; | ||
| 190 | - } | ||
| 191 | - std::string tc_url = srs_generate_tc_url(req->host, target_vhost, req->app, req->port, param); | 182 | + std::string tc_url = srs_generate_tc_url(req->host, req->vhost, req->app, req->port, param); |
| 192 | 183 | ||
| 193 | // replace the tcUrl in request, | 184 | // replace the tcUrl in request, |
| 194 | // which will replace the tc_url in client.connect_app(). | 185 | // which will replace the tc_url in client.connect_app(). |
| @@ -256,6 +247,25 @@ void SrsSimpleRtmpClient::kbps_sample(const char* label, int64_t age) | @@ -256,6 +247,25 @@ void SrsSimpleRtmpClient::kbps_sample(const char* label, int64_t age) | ||
| 256 | srs_trace("<- %s time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", age, sr, sr30s, sr5m, rr, rr30s, rr5m); | 247 | srs_trace("<- %s time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", age, sr, sr30s, sr5m, rr, rr30s, rr5m); |
| 257 | } | 248 | } |
| 258 | 249 | ||
| 250 | +void SrsSimpleRtmpClient::kbps_sample(const char* label, int64_t age, int msgs) | ||
| 251 | +{ | ||
| 252 | + kbps->sample(); | ||
| 253 | + | ||
| 254 | + int sr = kbps->get_send_kbps(); | ||
| 255 | + int sr30s = kbps->get_send_kbps_30s(); | ||
| 256 | + int sr5m = kbps->get_send_kbps_5m(); | ||
| 257 | + int rr = kbps->get_recv_kbps(); | ||
| 258 | + int rr30s = kbps->get_recv_kbps_30s(); | ||
| 259 | + int rr5m = kbps->get_recv_kbps_5m(); | ||
| 260 | + | ||
| 261 | + srs_trace("<- %s time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", age, msgs, sr, sr30s, sr5m, rr, rr30s, rr5m); | ||
| 262 | +} | ||
| 263 | + | ||
| 264 | +int SrsSimpleRtmpClient::sid() | ||
| 265 | +{ | ||
| 266 | + return stream_id; | ||
| 267 | +} | ||
| 268 | + | ||
| 259 | int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size) | 269 | int SrsSimpleRtmpClient::rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size) |
| 260 | { | 270 | { |
| 261 | int ret = ERROR_SUCCESS; | 271 | int ret = ERROR_SUCCESS; |
| @@ -286,6 +296,11 @@ int SrsSimpleRtmpClient::decode_message(SrsCommonMessage* msg, SrsPacket** ppack | @@ -286,6 +296,11 @@ int SrsSimpleRtmpClient::decode_message(SrsCommonMessage* msg, SrsPacket** ppack | ||
| 286 | return client->decode_message(msg, ppacket); | 296 | return client->decode_message(msg, ppacket); |
| 287 | } | 297 | } |
| 288 | 298 | ||
| 299 | +int SrsSimpleRtmpClient::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs) | ||
| 300 | +{ | ||
| 301 | + return client->send_and_free_messages(msgs, nb_msgs, stream_id); | ||
| 302 | +} | ||
| 303 | + | ||
| 289 | void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) | 304 | void SrsSimpleRtmpClient::set_recv_timeout(int64_t timeout) |
| 290 | { | 305 | { |
| 291 | transport->set_recv_timeout(timeout); | 306 | transport->set_recv_timeout(timeout); |
| @@ -74,20 +74,22 @@ public: | @@ -74,20 +74,22 @@ public: | ||
| 74 | SrsSimpleRtmpClient(); | 74 | SrsSimpleRtmpClient(); |
| 75 | virtual ~SrsSimpleRtmpClient(); | 75 | virtual ~SrsSimpleRtmpClient(); |
| 76 | public: | 76 | public: |
| 77 | - virtual int connect(std::string url, int64_t timeout); | ||
| 78 | - virtual int connect(std::string url, std::string vhost, int64_t timeout); | 77 | + virtual int connect(std::string url, int64_t connect_timeout, int64_t stream_timeout); |
| 79 | private: | 78 | private: |
| 80 | - virtual int connect_app(std::string vhost); | 79 | + virtual int connect_app(); |
| 81 | public: | 80 | public: |
| 82 | virtual void close(); | 81 | virtual void close(); |
| 83 | public: | 82 | public: |
| 84 | virtual int publish(); | 83 | virtual int publish(); |
| 85 | virtual int play(); | 84 | virtual int play(); |
| 86 | virtual void kbps_sample(const char* label, int64_t age); | 85 | virtual void kbps_sample(const char* label, int64_t age); |
| 86 | + virtual void kbps_sample(const char* label, int64_t age, int msgs); | ||
| 87 | + virtual int sid(); | ||
| 87 | public: | 88 | public: |
| 88 | virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); | 89 | virtual int rtmp_write_packet(char type, u_int32_t timestamp, char* data, int size); |
| 89 | virtual int recv_message(SrsCommonMessage** pmsg); | 90 | virtual int recv_message(SrsCommonMessage** pmsg); |
| 90 | virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); | 91 | virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket); |
| 92 | + virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs); | ||
| 91 | public: | 93 | public: |
| 92 | virtual void set_recv_timeout(int64_t timeout); | 94 | virtual void set_recv_timeout(int64_t timeout); |
| 93 | }; | 95 | }; |
| @@ -71,10 +71,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | @@ -71,10 +71,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
| 71 | // the common io timeout, for both recv and send. | 71 | // the common io timeout, for both recv and send. |
| 72 | #define SRS_CONSTS_RTMP_TIMEOUT_US (int64_t)(30*1000*1000LL) | 72 | #define SRS_CONSTS_RTMP_TIMEOUT_US (int64_t)(30*1000*1000LL) |
| 73 | 73 | ||
| 74 | +// TODO: FIXME: remove following two macros. | ||
| 74 | // the timeout to send data to client, | 75 | // the timeout to send data to client, |
| 75 | // if timeout, close the connection. | 76 | // if timeout, close the connection. |
| 76 | #define SRS_CONSTS_RTMP_SEND_TIMEOUT_US (int64_t)(30*1000*1000LL) | 77 | #define SRS_CONSTS_RTMP_SEND_TIMEOUT_US (int64_t)(30*1000*1000LL) |
| 77 | - | ||
| 78 | // the timeout to wait client data, | 78 | // the timeout to wait client data, |
| 79 | // if timeout, close the connection. | 79 | // if timeout, close the connection. |
| 80 | #define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL) | 80 | #define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL) |
-
请 注册 或 登录 后发表评论