SRS流媒体服务器源码分析(一)

栏目: 服务器 · 发布时间: 6年前

内容简介:srs使用了state-threads协程库,是单线程多协程模型。这个协程的概念类似于lua的协程,都是单线程中可以创建多个协程。而golang中的goroutine协程是多线程并发的,goroutine有可能运行在同一个线程也可能在不同线程,这样就有了线程安全问题,所以需要chan通信或者mutex加锁共享资源。而srs因为是单线程多协程所以不用考虑线程安全,数据不用加锁。

线程模型

srs使用了state-threads协程库,是单线程多协程模型。

这个协程的概念类似于 lua 的协程,都是单线程中可以创建多个协程。而golang中的goroutine协程是多线程并发的,goroutine有可能运行在同一个线程也可能在不同线程,这样就有了线程安全问题,所以需要chan通信或者mutex加锁共享资源。

而srs因为是单线程多协程所以不用考虑线程安全,数据不用加锁。

主流程分析

撇掉程序启动的一些初始化和设置,直接进入:

int SrsServer::listen()
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_http_api()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
}

先看看 listen_rtmp() :

int SrsServer::listen_rtmp()
{
    int ret = ERROR_SUCCESS;
    
    // stream service port.
    std::vector<std::string> ip_ports = _srs_config->get_listens();
    srs_assert((int)ip_ports.size() > 0);
    
    close_listeners(SrsListenerRtmpStream);
    
    for (int i = 0; i < (int)ip_ports.size(); i++) {
        SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);
        listeners.push_back(listener);
        
        std::string ip;
        int port;
        srs_parse_endpoint(ip_ports[i], ip, port);
        
        if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
            srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
            return ret;
        }
    }
    
    return ret;
}

创建了 SrsStreamListener ,在 SrsStreamListener::listen 中又创建了 SrsTcpListener 进行 listen

SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
{
    handler = h;
    ip = i;
    port = p;

    _fd = -1;
    _stfd = NULL;

    pthread = new SrsReusableThread("tcp", this);
}

SrsTcpListener 中创建了 pthread: SrsReusableThread

int SrsTcpListener::listen() 中调用了 pthread->start() ,协程会回调到 int SrsTcpListener::cycle()

int SrsTcpListener::cycle()
{
    int ret = ERROR_SUCCESS;
    
    st_netfd_t client_stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
    
    if(client_stfd == NULL){
        // ignore error.
        if (errno != EINTR) {
            srs_error("ignore accept thread stoppped for accept client error");
        }
        return ret;
    }
    srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
    
    if ((ret = handler->on_tcp_client(client_stfd)) != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
        return ret;
    }
    
    return ret;
}

accept 连接后,回调到 on_tcp_client

也就是 SrsStreamListener::on_tcp_client

int SrsStreamListener::on_tcp_client(st_netfd_t stfd)
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = server->accept_client(type, stfd)) != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
        return ret;
    }

    return ret;
}
int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd)
{
...
    SrsConnection* conn = NULL;
    if (type == SrsListenerRtmpStream) {
        conn = new SrsRtmpConn(this, client_stfd);
    } else if (type == SrsListenerHttpApi) {
#ifdef SRS_AUTO_HTTP_API
        conn = new SrsHttpApi(this, client_stfd, http_api_mux);
#else
        srs_warn("close http client for server not support http-api");
        srs_close_stfd(client_stfd);
        return ret;
#endif
    } else if (type == SrsListenerHttpStream) {
#ifdef SRS_AUTO_HTTP_SERVER
        conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server);
#else
        srs_warn("close http client for server not support http-server");
        srs_close_stfd(client_stfd);
        return ret;
#endif
    } else {
        // TODO: FIXME: handler others
    }
    srs_assert(conn);
    
    // directly enqueue, the cycle thread will remove the client.
    conns.push_back(conn);
    srs_verbose("add conn to vector.");
    
    // cycle will start process thread and when finished remove the client.
    // @remark never use the conn, for it maybe destroyed.
    if ((ret = conn->start()) != ERROR_SUCCESS) {
        return ret;
    }
    srs_verbose("conn started success.");

    srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret);
    
    return ret;
}

在上面根据type创建不同的 SrsConnectionRtmp 创建了 SrsRtmpConn ,并且加入到 std::vector<SrsConnection*> conns; 中,然后执行 conn->start()

SrsConnection 基类创建了一个协程 pthread: SrsOneCycleThread ,上面的 conn->start() 。实际上是 pthread->start()

SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c)
{
    id = 0;
    manager = cm;
    stfd = c;
    disposed = false;
    expired = false;
    
    // the client thread should reap itself, 
    // so we never use joinable.
    // TODO: FIXME: maybe other thread need to stop it.
    // @see: https://github.com/ossrs/srs/issues/78
    pthread = new SrsOneCycleThread("conn", this);
}

int SrsConnection::start()
{
    return pthread->start();
}

int SrsConnection::cycle() 调用了 do_cycle() ,派生类实现了这个方法。

int SrsRtmpConn::do_cycle()
{
    int ret = ERROR_SUCCESS;
    
    srs_trace("RTMP client ip=%s", ip.c_str());

    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
    
    //正式进入rtmp握手。
    if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
        srs_error("rtmp handshake failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp handshake success");
    
    if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
        srs_error("rtmp connect vhost/app failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp connect app success");
    
    // set client ip to request.
    req->ip = ip;
    
    srs_trace("connect app, "
        "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s", 
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), 
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
        req->app.c_str(), (req->args? "(obj)":"null"));
    
    // show client identity
    if(req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;
        
        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }
        
        srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
            srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }
    
    ret = service_cycle();
    
    http_hooks_on_close();

    return ret;
}

在这儿正式进入rtmp协议处理阶段。先进行握手: rtmp->handshake() 等操作,然后进入 service_cycle();

int SrsRtmpConn::service_cycle()
{    
  ...
    while (!disposed) {
        ret = stream_service_cycle();
        
        // stream service must terminated with error, never success.
        // when terminated with success, it's user required to stop.
        if (ret == ERROR_SUCCESS) {
            continue;
        }
        
        // when not system control error, fatal error, return.
        if (!srs_is_system_control_error(ret)) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("stream service cycle failed. ret=%d", ret);
            }
            return ret;
        }
        
        // for republish, continue service
        if (ret == ERROR_CONTROL_REPUBLISH) {
            // set timeout to a larger value, wait for encoder to republish.
            rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);
            rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);
            
            srs_trace("control message(unpublish) accept, retry stream service.");
            continue;
        }
        
        // for "some" system control error, 
        // logical accept and retry stream service.
        if (ret == ERROR_CONTROL_RTMP_CLOSE) {
            // TODO: FIXME: use ping message to anti-death of socket.
            // @see: https://github.com/ossrs/srs/issues/39
            // set timeout to a larger value, for user paused.
            rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);
            rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);
            
            srs_trace("control message(close) accept, retry stream service.");
            continue;
        }
        
        // for other system control message, fatal error.
        srs_error("control message(%d) reject as error. ret=%d", ret, ret);
        return ret;
    }
    
    return ret;
}

stream_service_cycle :

int SrsRtmpConn::stream_service_cycle()
{
    int ret = ERROR_SUCCESS;
        
    SrsRtmpConnType type;
    if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {
        if (!srs_is_client_gracefully_close(ret)) {
            srs_error("identify client failed. ret=%d", ret);
        }
        return ret;
    }
    
    srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
    req->strip();
    srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f, param=%s",
        srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration, req->param.c_str());
    
    // discovery vhost, resolve the vhost from config
    SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
    if (parsed_vhost) {
        req->vhost = parsed_vhost->arg0();
    }
    
    if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {
        ret = ERROR_RTMP_REQ_TCURL;
        srs_error("discovery tcUrl failed. "
                  "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
                  req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);
        return ret;
    }
    
    if ((ret = check_vhost()) != ERROR_SUCCESS) {
        srs_error("check vhost failed. ret=%d", ret);
        return ret;
    }
    
    srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, stream=%s, param=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
        req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null"));
    
    // do token traverse before serve it.
    // @see https://github.com/ossrs/srs/pull/239
    if (true) {
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
        if (vhost_is_edge && edge_traverse) {
            if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
                srs_warn("token auth failed, ret=%d", ret);
                return ret;
            }
        }
    }
    
    // security check
    if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
        srs_error("security check failed. ret=%d", ret);
        return ret;
    }
    srs_info("security check ok");
    
    // Never allow the empty stream name, for HLS may write to a file with empty name.
    // @see https://github.com/ossrs/srs/issues/834
    if (req->stream.empty()) {
        ret = ERROR_RTMP_STREAM_NAME_EMPTY;
        srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);
        return ret;
    }

    // client is identified, set the timeout to service timeout.
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
    
    // find a source to serve.
    SrsSource* source = NULL;
    if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) {
        return ret;
    }
    srs_assert(source != NULL);
    
    // update the statistic when source disconveried.
    SrsStatistic* stat = SrsStatistic::instance();
    if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) {
        srs_error("stat client failed. ret=%d", ret);
        return ret;
    }

    bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
    bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
    srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]",
        req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, 
        source->source_id(), source->source_id());
    source->set_cache(enabled_cache);
    
    client_type = type;
    //根据客户端类型进入不同分支
    switch (type) {
        case SrsRtmpConnPlay: {
            srs_verbose("start to play stream %s.", req->stream.c_str());
            
            // response connection start play
            if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to play stream failed. ret=%d", ret);
                return ret;
            }
            if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
                srs_error("http hook on_play failed. ret=%d", ret);
                return ret;
            }
            
            srs_info("start to play stream %s success", req->stream.c_str());
            ret = playing(source);
            http_hooks_on_stop();
            
            return ret;
        }
        case SrsRtmpConnFMLEPublish: {
            srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
            
            if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to publish stream failed. ret=%d", ret);
                return ret;
            }
            
            return publishing(source);
        }
        case SrsRtmpConnHaivisionPublish: {
            srs_verbose("Haivision start to publish stream %s.", req->stream.c_str());
            
            if ((ret = rtmp->start_haivision_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("start to publish stream failed. ret=%d", ret);
                return ret;
            }
            
            return publishing(source);
        }
        case SrsRtmpConnFlashPublish: {
            srs_verbose("flash start to publish stream %s.", req->stream.c_str());
            
            if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {
                srs_error("flash start to publish stream failed. ret=%d", ret);
                return ret;
            }
            
            return publishing(source);
        }
        default: {
            ret = ERROR_SYSTEM_CLIENT_INVALID;
            srs_info("invalid client type=%d. ret=%d", type, ret);
            return ret;
        }
    }

    return ret;
}

先进行 tmp->identify_client 客户端身份识别。

然后根据根据客户端类型( type )进入不同分支。

SrsRtmpConnPlay 是客户端播流。

SrsRtmpConnFMLEPublish 是Rtmp推流到服务器。

SrsRtmpConnHaivisionPublish 应该是海康威视推流到服务器?

SrsRtmpConnFlashPublish 是Flash推流到服务器。

这儿只看 SrsRtmpConnFMLEPublish

进入 int SrsRtmpConn::publishing(SrsSource* source) ,然后 int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)

int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
{
...
    // start isolate recv thread.
    if ((ret = trd->start()) != ERROR_SUCCESS) {
        srs_error("start isolate recv thread failed. ret=%d", ret);
        return ret;
    }
    ...
}

trd 协程运行,协程循环:执行 rtmp->recv_message(&msg) 后调用 int SrsPublishRecvThread::handle(SrsCommonMessage* msg)

再回调到 int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge)

之后处理收到的数据:

int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge)
{
    int ret = ERROR_SUCCESS;
    
    // for edge, directly proxy message to origin.
    if (vhost_is_edge) {
        if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) {
            srs_error("edge publish proxy msg failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }
    
    // process audio packet
    if (msg->header.is_audio()) {
        if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }
    // process video packet
    if (msg->header.is_video()) {
        if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }
    
    // process aggregate packet
    if (msg->header.is_aggregate()) {
        if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
            return ret;
        }
        return ret;
    }
    
    // process onMetaData
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        SrsPacket* pkt = NULL;
        if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
            srs_error("decode onMetaData message failed. ret=%d", ret);
            return ret;
        }
        SrsAutoFree(SrsPacket, pkt);
    
        if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
            if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
                srs_error("source process onMetaData message failed. ret=%d", ret);
                return ret;
            }
            srs_info("process onMetaData message success.");
            return ret;
        }
        
        srs_info("ignore AMF0/AMF3 data message.");
        return ret;
    }
    
    return ret;
}

如果本服务器是edge边缘服务器(vhost_is_edge)直接推流回源到源服务器。

audio和video分开处理。

这儿只看一下video的处理:

int SrsSource::on_video(SrsCommonMessage* shared_video)
{
    int ret = ERROR_SUCCESS;
    
    // monotically increase detect.
    if (!mix_correct && is_monotonically_increase) {
        if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
            is_monotonically_increase = false;
            srs_warn("VIDEO: stream not monotonically increase, please open mix_correct.");
        }
    }
    last_packet_time = shared_video->header.timestamp;
    
    // drop any unknown header video.
    // @see https://github.com/ossrs/srs/issues/421
    if (!SrsFlvCodec::video_is_acceptable(shared_video->payload, shared_video->size)) {
        char b0 = 0x00;
        if (shared_video->size > 0) {
            b0 = shared_video->payload[0];
        }
        
        srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
        return ret;
    }
    
    // convert shared_video to msg, user should not use shared_video again.
    // the payload is transfer to msg, and set to NULL in shared_video.
    SrsSharedPtrMessage msg;
    if ((ret = msg.create(shared_video)) != ERROR_SUCCESS) {
        srs_error("initialize the video failed. ret=%d", ret);
        return ret;
    }
    srs_info("Video dts=%"PRId64", size=%d", msg.timestamp, msg.size);
    
    // directly process the audio message.
    if (!mix_correct) {
        return on_video_imp(&msg);
    }
    
    // insert msg to the queue.
    mix_queue->push(msg.copy());
    
    // fetch someone from mix queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return ret;
    }
    
    // consume the monotonically increase message.
    if (m->is_audio()) {
        ret = on_audio_imp(m);
    } else {
        ret = on_video_imp(m);
    }
    srs_freep(m);
    
    return ret;
}

shared_video 转换为 SrsSharedPtrMessage

调用 on_video_imp

int SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
{
    int ret = ERROR_SUCCESS;
    
    srs_info("Video dts=%"PRId64", size=%d", msg->timestamp, msg->size);
    
    bool is_sequence_header = SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size);
    
    // whether consumer should drop for the duplicated sequence header.
    bool drop_for_reduce = false;
    if (is_sequence_header && cache_sh_video && _srs_config->get_reduce_sequence_header(_req->vhost)) {
        if (cache_sh_video->size == msg->size) {
            drop_for_reduce = srs_bytes_equals(cache_sh_video->payload, msg->payload, msg->size);
            srs_warn("drop for reduce sh video, size=%d", msg->size);
        }
    }
    
    // cache the sequence header if h264
    // donot cache the sequence header to gop_cache, return here.
    if (is_sequence_header) {
        srs_freep(cache_sh_video);
        cache_sh_video = msg->copy();
        
        // parse detail audio codec
        SrsAvcAacCodec codec;
        
        // user can disable the sps parse to workaround when parse sps failed.
        // @see https://github.com/ossrs/srs/issues/474
        codec.avc_parse_sps = _srs_config->get_parse_sps(_req->vhost);
        
        SrsCodecSample sample;
        if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {
            srs_error("source codec demux video failed. ret=%d", ret);
            return ret;
        }
        
        // when got video stream info.
        SrsStatistic* stat = SrsStatistic::instance();
        if ((ret = stat->on_video_info(_req, SrsCodecVideoAVC, codec.avc_profile, codec.avc_level)) != ERROR_SUCCESS) {
            return ret;
        }
        
        srs_trace("%dB video sh,  codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %dfps, %ds)",
            msg->size, codec.video_codec_id,
            srs_codec_avc_profile2str(codec.avc_profile).c_str(),
            srs_codec_avc_level2str(codec.avc_level).c_str(), codec.width, codec.height,
            codec.video_data_rate / 1000, codec.frame_rate, codec.duration);
    }
    
#ifdef SRS_AUTO_HLS
    if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {
        // apply the error strategy for hls.
        // @see https://github.com/ossrs/srs/issues/264
        std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost);
        if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
            srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);
            
            // unpublish, ignore ret.
            hls->on_unpublish();
            
            // ignore.
            ret = ERROR_SUCCESS;
        } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
            if (srs_hls_can_continue(ret, cache_sh_video, msg)) {
                ret = ERROR_SUCCESS;
            } else {
                srs_warn("hls continue video failed. ret=%d", ret);
                return ret;
            }
        } else {
            srs_warn("hls disconnect publisher for video error. ret=%d", ret);
            return ret;
        }
    }
#endif
    
#ifdef SRS_AUTO_DVR
    if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) {
        srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);
        
        // unpublish, ignore ret.
        dvr->on_unpublish();
        
        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif

#ifdef SRS_AUTO_HDS
    if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) {
        srs_warn("hds process video message failed, ignore and disable dvr. ret=%d", ret);
        
        // unpublish, ignore ret.
        hds->on_unpublish();
        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif
    
    // copy to all consumer
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) {
                srs_error("dispatch the video failed. ret=%d", ret);
                return ret;
            }
        }
        srs_info("dispatch video success.");
    }

    // copy to all forwarders.
    if (!forwarders.empty()) {
        std::vector<SrsForwarder*>::iterator it;
        for (it = forwarders.begin(); it != forwarders.end(); ++it) {
            SrsForwarder* forwarder = *it;
            if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) {
                srs_error("forwarder process video message failed. ret=%d", ret);
                return ret;
            }
        }
    }
    
    // when sequence header, donot push to gop cache and adjust the timestamp.
    if (is_sequence_header) {
        return ret;
    }

    // cache the last gop packets
    if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
        srs_error("gop cache msg failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("cache gop success.");
    
    // if atc, update the sequence header to abs time.
    if (atc) {
        if (cache_sh_video) {
            cache_sh_video->timestamp = msg->timestamp;
        }
        if (cache_metadata) {
            cache_metadata->timestamp = msg->timestamp;
        }
    }
    
    return ret;
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

支付革命

支付革命

马梅、朱晓明、周金黄、季家友、陈宇 / 中信出版社 / 2014-2-1 / 49.00元

本书是中国首部深入探讨第三方支付的著作。 本书以电子交易方式、电子货币及电子认证技术演变的“三重奏”将决定电子支付中介的发展为主线,分析了中国第三方支付从“小支付”走向“大金融”的历史逻辑、技术逻辑和商业逻辑,揭示了第三方支付特别是创新型第三方支付机构发展对提升中国经济运行效率的作用,分析了第三方支付的未来发展趋向,并提出了相应的政策建议。 本书旨在以小见大,立足于揭示互联网与移动互联......一起来看看 《支付革命》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具