winlin

fix bug of edge, refine state to user state.

... ... @@ -309,6 +309,7 @@ int SrsEdgeIngester::connect_server()
SrsEdge::SrsEdge()
{
state = SrsEdgeStateInit;
user_state = SrsEdgeUserStateInit;
ingester = new SrsEdgeIngester();
}
... ... @@ -333,9 +334,10 @@ int SrsEdge::on_client_play()
int ret = ERROR_SUCCESS;
// error state.
if (state == SrsEdgeStateAborting || state == SrsEdgeStateReloading) {
if (user_state != SrsEdgeUserStateInit) {
ret = ERROR_RTMP_EDGE_PLAY_STATE;
srs_error("invalid state for client to play stream on edge. state=%d, ret=%d", state, ret);
srs_error("invalid state for client to play stream on edge. "
"state=%d, user_state=%d, ret=%d", state, user_state, ret);
return ret;
}
... ... @@ -350,22 +352,32 @@ int SrsEdge::on_client_play()
void SrsEdge::on_all_client_stop()
{
if (state == SrsEdgeStateIngestConnected) {
// when all client disconnected,
// and edge is ingesting origin stream, abort it.
if (state == SrsEdgeStatePlay || state == SrsEdgeStateIngestConnected) {
ingester->stop();
}
SrsEdgeState pstate = state;
state = SrsEdgeStateInit;
srs_trace("edge change from %d to state %d (init).", pstate, state);
return;
}
}
int SrsEdge::on_ingest_play()
{
int ret = ERROR_SUCCESS;
// when already connected(for instance, reconnect for error), ignore.
if (state == SrsEdgeStateIngestConnected) {
return ret;
}
srs_assert(state == SrsEdgeStatePlay);
SrsEdgeState pstate = state;
state = SrsEdgeStateIngestConnected;
srs_trace("edge change from %d to state %d (ingest connected).", pstate, state);
return ret;
... ...
... ... @@ -41,7 +41,7 @@ class SrsCommonMessage;
class ISrsProtocolReaderWriter;
/**
* the state of edge
* the state of edge, auto machine
*/
enum SrsEdgeState
{
... ... @@ -52,8 +52,15 @@ enum SrsEdgeState
SrsEdgeStateIngestConnected,
// publish stream to edge, forward to origin
SrsEdgeStateForwardConnected,
SrsEdgeStateAborting,
SrsEdgeStateReloading,
};
/**
* the state of edge from user, manual machine
*/
enum SrsEdgeUserState
{
SrsEdgeUserStateInit = 0,
SrsEdgeUserStateReloading = 100,
};
/**
... ... @@ -96,6 +103,7 @@ class SrsEdge
{
private:
SrsEdgeState state;
SrsEdgeUserState user_state;
SrsEdgeIngester* ingester;
public:
SrsEdge();
... ...
... ... @@ -89,6 +89,9 @@ int SrsThread::start()
return ret;
}
// we set to loop to true for thread to run.
loop = true;
// wait for cid to ready, for parent thread to get the cid.
while (_cid < 0) {
st_usleep(10 * SRS_TIME_MILLISECONDS);
... ... @@ -130,7 +133,6 @@ void SrsThread::thread_cycle()
srs_assert(handler);
handler->on_thread_start();
loop = true;
while (loop) {
if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
srs_warn("thread on before cycle failed, ignored and retry, ret=%d", ret);
... ...
... ... @@ -109,6 +109,7 @@ public:
* user stop the thread.
* @remark ignore any error of cycle of handler.
* @remark user can start multiple times, ignore if already started.
* @remark wait for the cid is set by thread pfn.
*/
virtual int start();
/**
... ...