ffplay数据读取线程

简介: ffplay播放器实现原理源码走读

在前面我们介绍了ffplay的总体架构和一些关键的数据结构。今天我们还是从这张图开始,主要介绍ffplay的读取线程部分。

ffplay总体架构

图导出的可能有点模糊,再加上上传图床后不知道有没有更加模糊了,想要高清大图的可以后台留言,加v信索取。

从ffplay的main函数入口开始阅读源码,发现是在函数stream_open创建了资源读取线程,读取线程执行的函数是read_thread,所以要分析读取线程的工作内容,我们只需读懂函数read_thread即可。

下面是我加了注释的read_thread函数:

/**
 * 读取线程工作内容
 * @param arg
 * @return
 */
static int read_thread(void *arg)
{
    VideoState *is = arg;
    AVFormatContext *ic = NULL;
    int err, i, ret;
    int st_index[AVMEDIA_TYPE_NB];
    AVPacket *pkt = NULL;
    int64_t stream_start_time;
    int pkt_in_play_range = 0;
    const AVDictionaryEntry *t;
    // 互斥量,读取线程总不能一直读取吧,播放消费队列消费比不上读取的速度,队列满了就要陷入等待唤醒
    SDL_mutex *wait_mutex = SDL_CreateMutex();
    int scan_all_pmts_set = 0;
    int64_t pkt_ts;

    if (!wait_mutex) {
        av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
        ret = AVERROR(ENOMEM);
        goto fail;
    }
    // 所有流的索引都设置成-1
    memset(st_index, -1, sizeof(st_index));
    is->eof = 0;
    // 分配pack
    pkt = av_packet_alloc();
    if (!pkt) {
        av_log(NULL, AV_LOG_FATAL, "Could not allocate packet.\n");
        ret = AVERROR(ENOMEM);
        goto fail;
    }
    // 解封装上下文
    ic = avformat_alloc_context();
    if (!ic) {
        av_log(NULL, AV_LOG_FATAL, "Could not allocate context.\n");
        ret = AVERROR(ENOMEM);
        goto fail;
    }
    // 回调函数,防止读取过程中阻塞时间过长
    ic->interrupt_callback.callback = decode_interrupt_cb;
    ic->interrupt_callback.opaque = is;
    if (!av_dict_get(format_opts, "scan_all_pmts", NULL, AV_DICT_MATCH_CASE)) {
        av_dict_set(&format_opts, "scan_all_pmts", "1", AV_DICT_DONT_OVERWRITE);
        scan_all_pmts_set = 1;
    }
    // 打开
    err = avformat_open_input(&ic, is->filename, is->iformat, &format_opts);
    if (err < 0) {
        print_error(is->filename, err);
        ret = -1;
        goto fail;
    }
    if (scan_all_pmts_set)
        av_dict_set(&format_opts, "scan_all_pmts", NULL, AV_DICT_MATCH_CASE);

    if ((t = av_dict_get(format_opts, "", NULL, AV_DICT_IGNORE_SUFFIX))) {
        av_log(NULL, AV_LOG_ERROR, "Option %s not found.\n", t->key);
        ret = AVERROR_OPTION_NOT_FOUND;
        goto fail;
    }
    is->ic = ic;

    if (genpts)
        ic->flags |= AVFMT_FLAG_GENPTS;

    // 将其注入成一个变量
    av_format_inject_global_side_data(ic);

    if (find_stream_info) {
        AVDictionary **opts = setup_find_stream_info_opts(ic, codec_opts);
        int orig_nb_streams = ic->nb_streams;

        /*
         * 探测媒体类型,可得到当前文件的封装格式,音视频编码参数等信息
         * 调用该函数后得多的参数信息会比只调用avformat_open_input更为详细,
         * 其本质上是去做了decdoe packet获取信息的工作
         */
        err = avformat_find_stream_info(ic, opts);

        for (i = 0; i < orig_nb_streams; i++)
            av_dict_free(&opts[i]);
        av_freep(&opts);

        if (err < 0) {
            av_log(NULL, AV_LOG_WARNING,
                   "%s: could not find codec parameters\n", is->filename);
            ret = -1;
            goto fail;
        }
    }

    if (ic->pb)
        ic->pb->eof_reached = 0; // FIXME hack, ffplay maybe should not use avio_feof() to test for the end

    if (seek_by_bytes < 0)
        seek_by_bytes = !!(ic->iformat->flags & AVFMT_TS_DISCONT) && strcmp("ogg", ic->iformat->name);

    is->max_frame_duration = (ic->iformat->flags & AVFMT_TS_DISCONT) ? 10.0 : 3600.0;

    if (!window_title && (t = av_dict_get(ic->metadata, "title", NULL, 0)))
        window_title = av_asprintf("%s - %s", t->value, input_filename);

    /* if seeking requested, we execute it */
    // 起始播放时间处理
    if (start_time != AV_NOPTS_VALUE) {
        int64_t timestamp;

        timestamp = start_time;
        /* add the stream start time */
        if (ic->start_time != AV_NOPTS_VALUE)
            timestamp += ic->start_time;
        // 跳到播放位置
        ret = avformat_seek_file(ic, -1, INT64_MIN, timestamp, INT64_MAX, 0);
        if (ret < 0) {
            av_log(NULL, AV_LOG_WARNING, "%s: could not seek to position %0.3f\n",
                    is->filename, (double)timestamp / AV_TIME_BASE);
        }
    }

    // 是否是时时流
    is->realtime = is_realtime(ic);

    if (show_status)
        // 打印媒体信息
        av_dump_format(ic, 0, is->filename, 0);

    // 流索引相关
    for (i = 0; i < ic->nb_streams; i++) {
        AVStream *st = ic->streams[i];
        enum AVMediaType type = st->codecpar->codec_type;
        st->discard = AVDISCARD_ALL;
        if (type >= 0 && wanted_stream_spec[type] && st_index[type] == -1)
            if (avformat_match_stream_specifier(ic, st, wanted_stream_spec[type]) > 0)
                st_index[type] = i;
    }
    for (i = 0; i < AVMEDIA_TYPE_NB; i++) {
        if (wanted_stream_spec[i] && st_index[i] == -1) {
            av_log(NULL, AV_LOG_ERROR, "Stream specifier %s does not match any %s stream\n", wanted_stream_spec[i], av_get_media_type_string(i));
            st_index[i] = INT_MAX;
        }
    }

    if (!video_disable)
        // 查找视频流索引
        st_index[AVMEDIA_TYPE_VIDEO] =
            av_find_best_stream(ic, AVMEDIA_TYPE_VIDEO,
                                st_index[AVMEDIA_TYPE_VIDEO], -1, NULL, 0);
    if (!audio_disable)
        st_index[AVMEDIA_TYPE_AUDIO] =
            av_find_best_stream(ic, AVMEDIA_TYPE_AUDIO,
                                st_index[AVMEDIA_TYPE_AUDIO],
                                st_index[AVMEDIA_TYPE_VIDEO],
                                NULL, 0);
    if (!video_disable && !subtitle_disable)
        st_index[AVMEDIA_TYPE_SUBTITLE] =
            av_find_best_stream(ic, AVMEDIA_TYPE_SUBTITLE,
                                st_index[AVMEDIA_TYPE_SUBTITLE],
                                (st_index[AVMEDIA_TYPE_AUDIO] >= 0 ?
                                 st_index[AVMEDIA_TYPE_AUDIO] :
                                 st_index[AVMEDIA_TYPE_VIDEO]),
                                NULL, 0);

    // 更新视频信息初始化窗口参数
    is->show_mode = show_mode;
    if (st_index[AVMEDIA_TYPE_VIDEO] >= 0) {
        AVStream *st = ic->streams[st_index[AVMEDIA_TYPE_VIDEO]];
        AVCodecParameters *codecpar = st->codecpar;
        AVRational sar = av_guess_sample_aspect_ratio(ic, st, NULL);
        if (codecpar->width)
            set_default_window_size(codecpar->width, codecpar->height, sar);
    }

    /* open the streams */
    // 打开视频流,内不创建了解码线程
    if (st_index[AVMEDIA_TYPE_AUDIO] >= 0) {
        stream_component_open(is, st_index[AVMEDIA_TYPE_AUDIO]);
    }

    ret = -1;
    // 打开音频流
    if (st_index[AVMEDIA_TYPE_VIDEO] >= 0) {
        ret = stream_component_open(is, st_index[AVMEDIA_TYPE_VIDEO]);
    }
    if (is->show_mode == SHOW_MODE_NONE)
        is->show_mode = ret >= 0 ? SHOW_MODE_VIDEO : SHOW_MODE_RDFT;

    // 打开字幕流
    if (st_index[AVMEDIA_TYPE_SUBTITLE] >= 0) {
        stream_component_open(is, st_index[AVMEDIA_TYPE_SUBTITLE]);
    }

    if (is->video_stream < 0 && is->audio_stream < 0) {
        av_log(NULL, AV_LOG_FATAL, "Failed to open file '%s' or configure filtergraph\n",
               is->filename);
        ret = -1;
        goto fail;
    }

    if (infinite_buffer < 0 && is->realtime)
        infinite_buffer = 1;

    // 循环读取
    for (;;) {
        // 是否退出了
        if (is->abort_request)
            break;
        // 更新是否暂停了
        if (is->paused != is->last_paused) {
            is->last_paused = is->paused;
            if (is->paused)
                is->read_pause_return = av_read_pause(ic);
            else
                av_read_play(ic);
        }
#if CONFIG_RTSP_DEMUXER || CONFIG_MMSH_PROTOCOL
        if (is->paused &&
                (!strcmp(ic->iformat->name, "rtsp") ||
                 (ic->pb && !strncmp(input_filename, "mmsh:", 5)))) {
            /* wait 10 ms to avoid trying to get another packet */
            /* XXX: horrible */
            SDL_Delay(10);
            continue;
        }
#endif
        // 是否seek了
        if (is->seek_req) {
            int64_t seek_target = is->seek_pos;
            int64_t seek_min    = is->seek_rel > 0 ? seek_target - is->seek_rel + 2: INT64_MIN;
            int64_t seek_max    = is->seek_rel < 0 ? seek_target - is->seek_rel - 2: INT64_MAX;
// FIXME the +-2 is due to rounding being not done in the correct direction in generation
//      of the seek_pos/seek_rel variables

            ret = avformat_seek_file(is->ic, -1, seek_min, seek_target, seek_max, is->seek_flags);
            if (ret < 0) {
                av_log(NULL, AV_LOG_ERROR,
                       "%s: error while seeking\n", is->ic->url);
            } else {
                // seek了要刷新待解码包队列
                if (is->audio_stream >= 0)
                    packet_queue_flush(&is->audioq);
                if (is->subtitle_stream >= 0)
                    packet_queue_flush(&is->subtitleq);
                if (is->video_stream >= 0)
                    packet_queue_flush(&is->videoq);
                if (is->seek_flags & AVSEEK_FLAG_BYTE) {
                   set_clock(&is->extclk, NAN, 0);
                } else {
                   set_clock(&is->extclk, seek_target / (double)AV_TIME_BASE, 0);
                }
            }
            // seek复位
            is->seek_req = 0;
            is->queue_attachments_req = 1;
            is->eof = 0;
            if (is->paused)
                step_to_next_frame(is);
        }
        // 检测video是否为attached_pic
        if (is->queue_attachments_req) {
            // attached_pic 附带的图片。比如说一些MP3,AAC音频文件附带的专辑封面,所以需要注意的是音频文件不一定只存在音频流本身
            if (is->video_st && is->video_st->disposition & AV_DISPOSITION_ATTACHED_PIC) {
                if ((ret = av_packet_ref(pkt, &is->video_st->attached_pic)) < 0)
                    goto fail;
                packet_queue_put(&is->videoq, pkt);
                packet_queue_put_nullpacket(&is->videoq, pkt, is->video_stream);
            }
            is->queue_attachments_req = 0;
        }

        /* if the queue are full, no need to read more */
        // 队列是否有足够的包
        if (infinite_buffer<1 &&
              (is->audioq.size + is->videoq.size + is->subtitleq.size > MAX_QUEUE_SIZE
            || (stream_has_enough_packets(is->audio_st, is->audio_stream, &is->audioq) &&
                stream_has_enough_packets(is->video_st, is->video_stream, &is->videoq) &&
                stream_has_enough_packets(is->subtitle_st, is->subtitle_stream, &is->subtitleq)))) {
            /* wait 10 ms */
            SDL_LockMutex(wait_mutex);
            SDL_CondWaitTimeout(is->continue_read_thread, wait_mutex, 10);
            SDL_UnlockMutex(wait_mutex);
            continue;
        }
        // 播放完了,看是否需要重播
        if (!is->paused &&
            (!is->audio_st || (is->auddec.finished == is->audioq.serial && frame_queue_nb_remaining(&is->sampq) == 0)) &&
            (!is->video_st || (is->viddec.finished == is->videoq.serial && frame_queue_nb_remaining(&is->pictq) == 0))) {
            if (loop != 1 && (!loop || --loop)) {
                // 这里的重播就不用再次从新走一次逻辑了,直接seek就好了
                stream_seek(is, start_time != AV_NOPTS_VALUE ? start_time : 0, 0, 0);
            } else if (autoexit) {
                ret = AVERROR_EOF;
                goto fail;
            }
        }
        // 读取一个包
        ret = av_read_frame(ic, pkt);
        if (ret < 0) {
            // 读到末尾了,放入空包,以便后续冲刷解码器
            if ((ret == AVERROR_EOF || avio_feof(ic->pb)) && !is->eof) {
                if (is->video_stream >= 0)
                    packet_queue_put_nullpacket(&is->videoq, pkt, is->video_stream);
                if (is->audio_stream >= 0)
                    packet_queue_put_nullpacket(&is->audioq, pkt, is->audio_stream);
                if (is->subtitle_stream >= 0)
                    packet_queue_put_nullpacket(&is->subtitleq, pkt, is->subtitle_stream);
                is->eof = 1;
            }
            if (ic->pb && ic->pb->error) {
                if (autoexit)
                    goto fail;
                else
                    break;
            }
            // 休息一会,但是可以被唤醒,有需求你就去干
            SDL_LockMutex(wait_mutex);
            SDL_CondWaitTimeout(is->continue_read_thread, wait_mutex, 10);
            SDL_UnlockMutex(wait_mutex);
            continue;
        } else {
            is->eof = 0;
        }
        // 将包放入队列前先检查这个包是否在播放的时间之内
        /* check if packet is in play range specified by user, then queue, otherwise discard */
        stream_start_time = ic->streams[pkt->stream_index]->start_time;
        pkt_ts = pkt->pts == AV_NOPTS_VALUE ? pkt->dts : pkt->pts;
        pkt_in_play_range = duration == AV_NOPTS_VALUE ||
                (pkt_ts - (stream_start_time != AV_NOPTS_VALUE ? stream_start_time : 0)) *
                av_q2d(ic->streams[pkt->stream_index]->time_base) -
                (double)(start_time != AV_NOPTS_VALUE ? start_time : 0) / 1000000
                <= ((double)duration / 1000000);
        // 将包放入队列
        if (pkt->stream_index == is->audio_stream && pkt_in_play_range) {
            packet_queue_put(&is->audioq, pkt);
        } else if (pkt->stream_index == is->video_stream && pkt_in_play_range
                   && !(is->video_st->disposition & AV_DISPOSITION_ATTACHED_PIC)) {
            packet_queue_put(&is->videoq, pkt);
        } else if (pkt->stream_index == is->subtitle_stream && pkt_in_play_range) {
            packet_queue_put(&is->subtitleq, pkt);
        } else {
            av_packet_unref(pkt);
        }
    }

    ret = 0;
 fail:
    if (ic && !is->ic)
        avformat_close_input(&ic);

    av_packet_free(&pkt);
    if (ret != 0) {
        SDL_Event event;

        event.type = FF_QUIT_EVENT;
        event.user.data1 = is;
        SDL_PushEvent(&event);
    }
    SDL_DestroyMutex(wait_mutex);
    return 0;
}

读取线程准备工作

1、创建读取线程互斥量SDL_CreateMutex

2、分配AVPacket,av_packet_alloc

3、分配解封装上下文avformat_alloc_context

4、设置回调函数ic->interrupt_callback.callback

5、打开媒体文件avformat_open_input

6、avformat_find_stream_info探测媒体信息

7、是否需要起始seek播放avformat_seek_file

8、获取流索引

9、stream_component_open初始化相关解码器组件

在ffplay读取线程的准备工作中,我们发现一个检查是否是实时流的方法,它是这样子判断的:

static int is_realtime(AVFormatContext *s)
{
    if(   !strcmp(s->iformat->name, "rtp")
       || !strcmp(s->iformat->name, "rtsp")
       || !strcmp(s->iformat->name, "sdp")
    )
        return 1;

    if(s->pb && (   !strncmp(s->url, "rtp:", 4)
                 || !strncmp(s->url, "udp:", 4)
                )
    )
        return 1;
    return 0;
}

for循环读取

1、首先通过大内总管VideoState判断是否请求了退出

2、根据是否暂停了控制RTSP流以及SDL显示

3、处理seek操作

在这个过程中会调用函数packet_queue_flush刷新待解码包队列,然后将队列的播放序列serial加1,这就是播放序列的妙用之处。

4、检查队列中是否有足够的包,stream_has_enough_packets,如果队列满了则读取线程最多等待10ms,但是可以被唤醒。

5、播放完了,判断是否需要进行重播

    // 播放完了,看是否需要重播
        if (!is->paused &&
            (!is->audio_st || (is->auddec.finished == is->audioq.serial && frame_queue_nb_remaining(&is->sampq) == 0)) &&
            (!is->video_st || (is->viddec.finished == is->videoq.serial && frame_queue_nb_remaining(&is->pictq) == 0))) {
            if (loop != 1 && (!loop || --loop)) {
                // 这里的重播就不用再次从新走一次逻辑了,直接seek就好了
                stream_seek(is, start_time != AV_NOPTS_VALUE ? start_time : 0, 0, 0);
            } else if (autoexit) {
                ret = AVERROR_EOF;
                goto fail;
            }
        }

6、av_read_frame读取一个资源包

如果读到了文件末尾则增加一个空包packet_queue_put_nullpacket,以便于后续在解码线程用于冲刷解码器,这样就能将解码器内部缓存的数据全部取出。

如果读取到的不是空包,则判断该包的pts是否处于有效播放时间内,如果不在则丢弃,如果在则放入队列packet_queue_put

    // 将包放入队列前先检查这个包是否在播放的时间之内
        /* check if packet is in play range specified by user, then queue, otherwise discard */
        stream_start_time = ic->streams[pkt->stream_index]->start_time;
        pkt_ts = pkt->pts == AV_NOPTS_VALUE ? pkt->dts : pkt->pts;
        pkt_in_play_range = duration == AV_NOPTS_VALUE ||
                (pkt_ts - (stream_start_time != AV_NOPTS_VALUE ? stream_start_time : 0)) *
                av_q2d(ic->streams[pkt->stream_index]->time_base) -
                (double)(start_time != AV_NOPTS_VALUE ? start_time : 0) / 1000000
                <= ((double)duration / 1000000);
        // 将包放入队列
        if (pkt->stream_index == is->audio_stream && pkt_in_play_range) {
            packet_queue_put(&is->audioq, pkt);
        } else if (pkt->stream_index == is->video_stream && pkt_in_play_range
                   && !(is->video_st->disposition & AV_DISPOSITION_ATTACHED_PIC)) {
            packet_queue_put(&is->videoq, pkt);
        } else if (pkt->stream_index == is->subtitle_stream && pkt_in_play_range) {
            packet_queue_put(&is->subtitleq, pkt);
        } else {
            av_packet_unref(pkt);
        }

7、失败处理,释放相关资源,SDL发送退出事件FF_QUIT_EVENT

推荐阅读

FFmpeg连载1-开发环境搭建
FFmpeg连载2-分离视频和音频
FFmpeg连载3-视频解码
FFmpeg连载4-音频解码
FFmpeg连载5-音视频编码
FFmpeg连载6-音频重采样
FFmpeg连载8-视频合并以及替换视频背景音乐实战
ffplay调试环境搭建
ffplay整体框架

关注我,一起进步,人生不止coding!!!

目录
相关文章
|
8月前
|
存储 前端开发 Java
【C++ 多线程 】C++并发编程:精细控制数据打印顺序的策略
【C++ 多线程 】C++并发编程:精细控制数据打印顺序的策略
219 1
|
4月前
|
消息中间件 监控 安全
服务Down机了,线程池中的数据如何保证不丢失?
在分布式系统与高并发应用开发中,服务的稳定性和数据的持久性是两个至关重要的考量点。当服务遭遇Down机时,如何确保线程池中处理的数据不丢失,是每一位开发者都需要深入思考的问题。以下,我将从几个关键方面分享如何在这种情况下保障数据的安全与完整性。
80 2
|
2月前
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
124 62
|
3月前
|
缓存 安全 Java
使用 Java 内存模型解决多线程中的数据竞争问题
【10月更文挑战第11天】在 Java 多线程编程中,数据竞争是一个常见问题。通过使用 `synchronized` 关键字、`volatile` 关键字、原子类、显式锁、避免共享可变数据、合理设计数据结构、遵循线程安全原则和使用线程池等方法,可以有效解决数据竞争问题,确保程序的正确性和稳定性。
65 2
|
7月前
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
540 0
|
8月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 存储 Java
服务重启了,如何保证线程池中的数据不丢失?
【8月更文挑战第30天】为确保服务重启时线程池数据不丢失,可采用数据持久化(如数据库或文件存储)、使用可靠的任务队列(如消息队列或分布式任务队列系统)、状态监测与恢复机制,以及分布式锁等方式。这些方法能有效提高系统稳定性和可靠性,需根据具体需求选择合适方案并进行测试优化。
268 5
|
5月前
处理串口线程数据的函数
【8月更文挑战第4天】处理串口线程数据的函数。
34 4
|
5月前
|
数据处理 Python
解锁Python多线程编程魔法,告别漫长等待!让数据下载如飞,感受科技带来的速度与激情!
【8月更文挑战第22天】Python以简洁的语法和强大的库支持在多个领域大放异彩。尽管存在全局解释器锁(GIL),Python仍提供多线程支持,尤其适用于I/O密集型任务。通过一个多线程下载数据的例子,展示了如何使用`threading`模块创建多线程程序,并与单线程版本进行了性能对比。实验表明,多线程能显著减少总等待时间,但在CPU密集型任务上GIL可能会限制其性能提升。此案例帮助理解Python多线程的优势及其适用场景。
52 0
|
5月前
|
NoSQL Redis
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的