基于多线程的方式优化 FLink 程序

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 Tair(兼容Redis),内存型 2GB
简介: 这篇内容介绍了线程的基本概念和重要性。线程是程序执行的最小单位,比进程更细粒度,常用于提高程序响应性和性能。多线程可以实现并发处理,利用多核处理器,实现资源共享和复杂逻辑。文章还讨论了线程的五种状态(NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING和TERMINATED)以及如何在Java中创建和停止线程。最后提到了两种停止线程的方法:使用标识和中断机制。

一、前言

线程算是相对较高级的内容,主要的原因不是说他难,而是它不可见。最近基于多线程的方式优化了一些 FLink 程序,所以这一系列,我们聊聊多线程

二 线程

2.1 进程和线程关系

进程是计算机系统进行资源分配和调度的最小单位,换句话说我们平时双击那些后缀为 .exe的文件时都会产生一个进程。

进程可以产生若干个线程,是程序执行的最小单位,换句话说,进程就是房子,线程就是房子内一个个干活的人

2.2 为什么需要线程

线程在计算机编程中扮演着重要角色,其重要性主要体现在以下几个方面:

  1. 提高程序响应性:通过多线程处理,程序可以变得更加灵活和响应更加及时。在一个单线程程序中,如果有一个耗时的操作,会导致整个程序阻塞,影响用户体验;而多线程可以使程序保持活跃,允许其他线程继续执行,从而提高程序的响应性。
  2. 提高程序性能:多线程可以充分利用多核处理器的优势,实现并发执行多个任务,加快程序运行速度,提高系统整体性能。通过并行执行,程序可以更有效地利用计算资源,加快任务完成的速度。
  3. 实现并发处理:多线程允许程序同时执行多个任务,这对于需要同时处理多个事件或任务的应用程序至关重要。例如,在服务器端应用中,多线程可以同时处理多个客户端请求。
  4. 资源共享:多个线程可以共享进程的资源,如内存空间、文件句柄等,这种资源共享有助于简化程序设计,并提高效率。不同线程之间可以相互通信、共享数据,协同工作来完成复杂任务。
  5. 实现复杂逻辑:有些程序需要同时进行多项任务,通过多线程可以更好地组织和管理复杂的逻辑,提高程序的可维护性和可拓展性。
  6. 实现异步编程:多线程可以实现异步操作和事件驱动,允许程序在等待某些操作完成时继续执行其他操作,提高程序的效率和灵活性。异步编程模型通过非阻塞方式进行任务处理,可以有效提升程序的吞吐量和性能。 综合以上原因,线程在计算机编程中是不可或缺的,它提供了一种有效的机制来实现并发处理、提高程序的响应性和性能、实现资源共享以及管理复杂的程序逻辑。因此,了解和灵活运用线程是提升程序效率和优化系统性能的重要手段。

2.3 线程的状态

java

复制代码

public enum State {
    /**
     * Thread state for a thread which has not yet started.
     */
    NEW,

    /**
     * Thread state for a runnable thread.  A thread in the runnable
     * state is executing in the Java virtual machine but it may
     * be waiting for other resources from the operating system
     * such as processor.
     */
    RUNNABLE,

    /**
     * Thread state for a thread blocked waiting for a monitor lock.
     * A thread in the blocked state is waiting for a monitor lock
     * to enter a synchronized block/method or
     * reenter a synchronized block/method after calling
     * {@link Object#wait() Object.wait}.
     */
    BLOCKED,

    /**
     * Thread state for a waiting thread.
     * A thread is in the waiting state due to calling one of the
     * following methods:
     * <ul>
     *   <li>{@link Object#wait() Object.wait} with no timeout</li>
     *   <li>{@link #join() Thread.join} with no timeout</li>
     *   <li>{@link LockSupport#park() LockSupport.park}</li>
     * </ul>
     *
     * <p>A thread in the waiting state is waiting for another thread to
     * perform a particular action.
     *
     * For example, a thread that has called <tt>Object.wait()</tt>
     * on an object is waiting for another thread to call
     * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
     * that object. A thread that has called <tt>Thread.join()</tt>
     * is waiting for a specified thread to terminate.
     */
    WAITING,

    /**
     * Thread state for a waiting thread with a specified waiting time.
     * A thread is in the timed waiting state due to calling one of
     * the following methods with a specified positive waiting time:
     * <ul>
     *   <li>{@link #sleep Thread.sleep}</li>
     *   <li>{@link Object#wait(long) Object.wait} with timeout</li>
     *   <li>{@link #join(long) Thread.join} with timeout</li>
     *   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
     *   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
     * </ul>
     */
    TIMED_WAITING,

    /**
     * Thread state for a terminated thread.
     * The thread has completed execution.
     */
    TERMINATED;
}

2.4 如何创建线程

java

复制代码

Thread thread = new Thread();
thread.start();

在Java中,当调用线程对象的start()方法时,实际上是调用了start0()方法,该方法会启动一个新的本地操作系统线程, 然后调用Java中的run()方法来执行线程的任务。所有 线程的主要工作的方法就是 run 方法,那么怎么样来丰富 run 方法的内容呢?

首先通过匿名内部类来启动线程,如:

java

复制代码

Thread thread1 = new Thread(){
    @Override
    public void run() {
        while (true){
            if (Thread.currentThread().isInterrupted()){
                System.out.println("Interruted!");
                break;
            }
            Thread.yield();
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("thread1==");
        }
    }
} ;

thread1.start();

当然也可以实现 Runnable 接口来实现线程的功能

java

复制代码

public class CreateThread3 implements Runnable {
    public static void main(String[] args) {
        Thread t1 = new Thread(new CreateThread3());
        t1.start();
    }

    @Override
    public void run() {
        System.out.println("Oh,I am Runnable");
    }

}

2.5 线程停止

2.5.1 使用标识

java

复制代码

public class ControlledThread extends Thread {
    private volatile boolean shouldStop = false;

    public void stopThread() {
        shouldStop = true;
    }

    @Override
    public void run() {
        while (!shouldStop) {
            // 线程执行的任务
            System.out.println("Thread is running...");
        }
        System.out.println("Thread stopped.");
    }

    public static void main(String[] args) {
        ControlledThread thread = new ControlledThread();
        thread.start();

        // 模拟停止线程
        try {
            Thread.sleep(3000); // 等待3秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        thread.stopThread();
    }
}

需要注意的是,这里使用 volatile 变量来保证该变量对于任意线程可见,如果不用 volatile 的话,则线程可能会无法停止

2.5.2 使用 interrup()

java

复制代码

public class InterruptedThread extends Thread {

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            // 线程执行的任务
            System.out.println("Thread is running...");
        }
        System.out.println("Thread stopped.");
    }

    public static void main(String[] args) {
        InterruptedThread thread = new InterruptedThread();
        thread.start();

        // 模拟停止线程
        try {
            Thread.sleep(3000); // 等待3秒
            thread.interrupt(); // 中断线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


转载来源:https://juejin.cn/post/7376099820509642786

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
1月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
|
7月前
|
SQL 算法 调度
Flink批处理自适应执行计划优化
本文整理自阿里集团高级开发工程师孙夏在Flink Forward Asia 2024的分享,聚焦Flink自适应逻辑执行计划与Join算子优化。内容涵盖自适应批处理调度器、动态逻辑执行计划、自适应Broadcast Hash Join及Join倾斜优化等技术细节,并展望未来改进方向,如支持更多场景和智能优化策略。文章还介绍了Flink UI调整及性能优化措施,为批处理任务提供更高效、灵活的解决方案。
288 0
Flink批处理自适应执行计划优化
|
5月前
|
Kubernetes Linux Go
使用 Uber automaxprocs 正确设置 Go 程序线程数
`automaxprocs` 包就是专门用来解决此问题的,并且用法非常简单,只需要使用匿名导入的方式 `import _ "go.uber.org/automaxprocs"` 一行代码即可搞定。
269 78
|
5月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1013 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
4月前
|
数据采集 存储 Web App开发
多线程爬虫优化:快速爬取并写入CSV
多线程爬虫优化:快速爬取并写入CSV
|
4月前
|
机器学习/深度学习 监控 算法
局域网行为监控软件 C# 多线程数据包捕获算法:基于 KMP 模式匹配的内容分析优化方案探索
本文探讨了一种结合KMP算法的多线程数据包捕获与分析方案,用于局域网行为监控。通过C#实现,该系统可高效检测敏感内容、管理URL访问、分析协议及审计日志。实验表明,相较于传统算法,KMP在处理大规模网络流量时效率显著提升。未来可在算法优化、多模式匹配及机器学习等领域进一步研究。
135 0
|
7月前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
581 1
Flink CDC + Hologres高性能数据同步优化实践
|
6月前
|
数据采集 存储 网络协议
Java HttpClient 多线程爬虫优化方案
Java HttpClient 多线程爬虫优化方案
|
9月前
|
Java 调度 Python
多线程优化For循环:实战指南
本文介绍如何使用多线程优化For循环,提高程序处理大量数据或耗时操作的效率。通过并行任务处理,充分利用多核处理器性能,显著缩短执行时间。文中详细解释了多线程基础概念,如线程、进程、线程池等,并提供了Python代码示例,包括单线程、多线程和多进程实现方式。最后,还总结了使用多线程或多进程时需要注意的事项,如线程数量、任务拆分、共享资源访问及异常处理等。
289 7
|
10月前
|
并行计算 算法 安全
面试必问的多线程优化技巧与实战
多线程编程是现代软件开发中不可或缺的一部分,特别是在处理高并发场景和优化程序性能时。作为Java开发者,掌握多线程优化技巧不仅能够提升程序的执行效率,还能在面试中脱颖而出。本文将从多线程基础、线程与进程的区别、多线程的优势出发,深入探讨如何避免死锁与竞态条件、线程间的通信机制、线程池的使用优势、线程优化算法与数据结构的选择,以及硬件加速技术。通过多个Java示例,我们将揭示这些技术的底层原理与实现方法。
543 3