Java分布式环境下并发编程实践

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 Tair(兼容Redis),内存型 2GB
简介: Java分布式环境下并发编程实践

在Java中,我们可以使用线程来实现并发编程,但是在多线程编程中,我们需要考虑线程安全、锁、死锁等问题。本文将介绍Java中的并发编程,包括线程安全、锁、死锁等内容,同时提供实际的代码案例,让读者更容易理解和掌握。


随着分布式系统越来越普及,分布式系统中的并发编程成为了一个重要的话题。Java作为一种高级编程语言,其并发编程能力得到了广泛的认可。但在分布式系统中,Java并发编程面临着一些新的挑战。本文将介绍在分布式系统下Java并发编程的一些技术和实际案例。


569045c644f3aadcc792c479da213462.png



一、线程安全


多线程编程中,线程安全是一个重要的问题。如果多个线程同时访问同一个共享资源,就会出现线程安全问题。例如,在银行账户转账时,如果多个线程同时对同一个账户进行操作,就会出现线程安全问题。


解决线程安全问题的方法之一是使用synchronized关键字。synchronized关键字可以将代码块或方法锁定,保证同一时间只有一个线程可以执行该代码块或方法。

下面是一个使用synchronized关键字的示例:

public class Counter {
    private int count;
    public synchronized void increment() {
        count++;
    }
    public synchronized void decrement() {
        count--;
    }
    public int getCount() {
        return count;
    }
}


在这个示例中,Counter类有两个方法increment()和decrement(),它们都是使用synchronized关键字来保证线程安全。这样,同一时间只有一个线程可以执行increment()和decrement()方法。


二、锁


在Java中,锁是一种同步机制,可以用于控制多个线程对共享资源的访问。Java中的锁有两种类型:内置锁和显式锁。

内置锁是Java中的一个特殊对象,每个对象都有一个内置锁。可以使用synchronized关键字来获取内置锁。例如:

public synchronized void increment() {
    count++;
}


在这个示例中,synchronized关键字获取了Counter对象的内置锁。这样,在同一时间只有一个线程可以访问increment()方法。


显式锁是Java中的另一种锁类型,可以使用java.util.concurrent.locks包中的Lock接口来实现。与内置锁不同,显式锁提供了更多的灵活性和控制。例如:

public class Counter {
    private int count;
    private Lock lock = new ReentrantLock();
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
    public void decrement() {
        lock.lock();
        try {
            count--;
        } finally {
            lock.unlock();
        }
    }
    public int getCount() {
        return count;
    }
}

在这个示例中,Counter类使用ReentrantLock类来创建一个显式锁。increment()和decrement()方法获取锁并释放锁。这样,在同一时间只有一个线程可以访问increment()和decrement()方法。


三、死锁


image-20230409205211771.png

死锁是多线程编程中的一种问题,它发生在两个或多个线程互相等待对方释放锁的情况下。例如:

public class DeadlockExample {
    private final Object lock1 = new Object();
    private final Object lock2 = new Object();
    public void method1() {
        synchronized (lock1) {
            // do something
            synchronized (lock2) {
                // do something
            }
        }
    }
    public void method2() {
        synchronized (lock2) {
            // do something
            synchronized (lock1) {
                // do something
            }
        }
    }
}

在这个示例中,DeadlockExample类有两个方法method1()和method2(),它们都使用两个锁lock1和lock2。如果一个线程调用method1()方法并获取了lock1锁,另一个线程调用method2()方法并获取了lock2锁,那么两个线程都无法继续执行,因为它们都在等待对方释放锁。这就是死锁。


避免死锁的方法之一是使用定时锁。定时锁可以在一定时间内自动释放锁,避免死锁。例如:

public class DeadlockExample {
    private final Object lock1 = new Object();
    private final Object lock2 = new Object();
    private final Lock timedLock1 = new ReentrantLock();
    private final Lock timedLock2 = new ReentrantLock();
    public void method1() {
        timedLock1.lock();
        try {
            // do something
            if (timedLock2.tryLock(500, TimeUnit.MILLISECONDS)) {
                try {
                    // do something
                } finally {
                    timedLock2.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timedLock1.unlock();
        }
    }
    public void method2() {
        timedLock2.lock();
        try {
            // do something
            if (timedLock1.tryLock(500, TimeUnit.MILLISECONDS)) {
                try {
                    // do something
                } finally {
                    timedLock1.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timedLock2.unlock();
        }
    }
}


在这个示例中,DeadlockExample类使用定时锁timedLock1和timedLock2来避免死锁。如果一个线程调用method1()方法并获取了timedLock1锁,另一个线程调用method2()方法并获取了timedLock2锁,那么它们会等待一段时间,如果在这段时间内无法获取到对方的锁,就会自动释放自己的锁,避免死锁。


四、分布式系统下的并发编程挑战


image-20230409205403525.png


在分布式系统中,由于不同的节点之间通过网络进行通信,因此会带来以下一些挑战:


1.网络延迟

在分布式系统中,由于节点之间通过网络进行通信,因此会存在网络延迟。这会导致节点之间的通信变慢,从而影响并发编程的效率。为了解决这个问题,可以采用异步编程模型,即通过回调函数的方式来处理网络通信。


2.数据一致性

在分布式系统中,由于数据分布在不同的节点上,因此会存在数据一致性的问题。如果不同节点上的数据不一致,就会导致系统出现异常。为了解决这个问题,可以采用分布式锁或者分布式事务来保证数据一致性。


3.容错性

在分布式系统中,由于节点之间存在网络通信,因此会存在节点宕机的情况。为了保证系统的容错性,需要采用一些容错机制,例如备份节点、自动故障转移等。


五、分布式锁的实现


在分布式系统中,为了保证数据一致性,需要采用分布式锁来控制对共享资源的访问。下面介绍一种基于Redis实现的分布式锁。

image-20230409205625391.png

笑小枫友情链接,感兴趣的进

更详细的分布式锁可以参考文章分布式锁的 3 种实现方案


1. Redis实现分布式锁的原理

Redis是一个高性能的键值存储系统,支持多种数据结构,例如字符串、哈希表、列表等。Redis提供了一种原子性的操作,可以实现分布式锁。


实现分布式锁的原理如下:


1)客户端向Redis发送一个SETNX命令,尝试去设置一个key的值为1,如果这个key不存在,则设置成功,否则设置失败。


2)客户端设置了这个key的值为1之后,就拥有了这个锁。


3)其他客户端也可以向Redis发送SETNX命令,尝试去设置这个key的值为1,但是由于这个key已经存在了,因此设置失败。


4)当客户端完成了对共享资源的访问之后,需要将这个key删除,以便其他客户端可以获得这个锁。


2. Redis实现分布式锁的代码实现

下面是基于Redis实现分布式锁的代码实现:

public class RedisDistributedLock {
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    private static final Long RELEASE_SUCCESS = 1L;
    private static final String LOCK_PREFIX = "lock:";
    private JedisPool jedisPool;
    public RedisDistributedLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }
    public boolean tryLock(String key, String requestId, int expireTime) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String lockKey = LOCK_PREFIX + key;
            String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return false;
    }
    public boolean releaseLock(String key, String requestId) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String lockKey = LOCK_PREFIX + key;
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            if (RELEASE_SUCCESS.equals(result)) {
                return true;
            }
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return false;
    }
}


3. Redisson实现分布式锁的代码实现


当然,这里介绍的是比较原生的方式,我们也可以直接使用Redisson框架封装的分布式锁。


Redisson是一个基于Redis的Java客户端,提供了丰富的分布式数据结构和服务。其中就包括分布式锁的实现,下面介绍一下如何使用Redisson实现分布式锁。


  • 引入Redisson依赖
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.15.5</version>
</dependency>


  • 使用分布式锁

好的,下面提供一个更详细的代码示例:

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;
public class DistributedLockDemo {
    public static void main(String[] args) {
        // 创建Redisson客户端
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        // 获取分布式锁
        RLock lock = redisson.getLock("mylock");
        try {
            // 尝试获取锁,等待时间为10秒,锁的有效期为5秒
            boolean isLocked = lock.tryLock(10, 5, TimeUnit.SECONDS);
            if (isLocked) {
                // 获取锁成功,执行业务逻辑
                System.out.println("获取锁成功,执行业务逻辑...");
                Thread.sleep(3000); // 模拟业务逻辑耗时
            } else {
                // 获取锁失败,处理异常情况
                System.out.println("获取锁失败,处理异常情况...");
            }
        } catch (Exception e) {
            // 处理异常情况
            System.out.println("处理异常情况...");
        } finally {
            // 释放锁
            lock.unlock();
            System.out.println("释放锁...");
        }
        // 关闭Redisson客户端
        redisson.shutdown();
    }
}

在上面的代码中,我们首先创建了一个RedissonClient对象,然后通过该对象获取一个RLock对象。在try...catch...finally代码块中,我们调用tryLock方法尝试获取锁,如果获取成功就执行业务逻辑;否则就处理异常情况。最后,在finally代码块中释放锁,并关闭RedissonClient对象。


需要注意的是,在实际应用中,我们需要将上面的代码封装成一个可重入的分布式锁工具类,方便各个业务模块使用。


六、分布式事务的实现


在分布式系统中,为了保证数据一致性,需要采用分布式事务来控制对共享资源的访问。下面介绍一种基于XA协议实现的分布式事务。


image-20230409205723358.png


笑小枫友情链接,感兴趣的进

具体事务相关可以参考文章七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)


1. XA协议的原理

XA协议是一种分布式事务协议,可以用于协调多个数据库的事务。XA协议的原理如下:


1)事务管理器向数据库发送XA START命令,开始一个分布式事务。


2)事务管理器向数据库发送XA END命令,结束一个分布式事务。


3)事务管理器向数据库发送XA PREPARE命令,准备提交一个分布式事务。


4)如果所有数据库都准备好提交事务,则事务管理器向数据库发送XA COMMIT命令,提交分布式事务。


5)如果有任何一个数据库无法提交事务,则事务管理器向所有数据库发送XA ROLLBACK命令,回滚分布式事务。


2. XA协议的代码实现


下面是基于XA协议实现分布式事务的代码实现:

public class XADistributedTransaction {
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String DB_URL = "jdbc:mysql://localhost:3306/test";
    private static final String USER = "root";
    private static final String PASSWORD = "root";
    private static final String XA_DATASOURCE_CLASSNAME = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource";
    private static final String XA_DATASOURCE_URL = "jdbc:mysql://localhost:3306/test";
    private static final String XA_DATASOURCE_USER = "root";
    private static final String XA_DATASOURCE_PASSWORD = "root";
    private static final String XID_PREFIX = "xa_";
    private static final String TABLE_NAME = "account";
    private static final String TABLE_SCHEMA = "CREATE TABLE account (id INT PRIMARY KEY, balance INT)";
    private static final String INSERT_SQL = "INSERT INTO account (id, balance) VALUES (?, ?)";
    private static final String UPDATE_SQL = "UPDATE account SET balance = ? WHERE id = ?";
    public void transferMoney(int fromId, int toId, int amount) throws SQLException {
        XADataSource xaDataSource = getXADataSource();
        Connection connection = xaDataSource.getXAConnection().getConnection();
        Xid xid = createXid();
        try {
            connection.setAutoCommit(false);
            XAResource xaResource = getXAResource(connection);
            xaResource.start(xid, XAResource.TMNOFLAGS);
            try (PreparedStatement preparedStatement = connection.prepareStatement(UPDATE_SQL)) {
                preparedStatement.setInt(1, getBalance(connection, fromId) - amount);
                preparedStatement.setInt(2, fromId);
                preparedStatement.executeUpdate();
            }
            xaResource.end(xid, XAResource.TMSUCCESS);
            xaResource.start(xid, XAResource.TMNOFLAGS);
            try (PreparedStatement preparedStatement = connection.prepareStatement(UPDATE_SQL)) {
                preparedStatement.setInt(1, getBalance(connection, toId) + amount);
                preparedStatement.setInt(2, toId);
                preparedStatement.executeUpdate();
            }
            xaResource.end(xid, XAResource.TMSUCCESS);
            int prepare = xaResource.prepare(xid);
            if (prepare == XAResource.XA_OK) {
                xaResource.commit(xid, false);
            } else {
                xaResource.rollback(xid);
            }
            connection.commit();
        } catch (SQLException | XAException e) {
            connection.rollback();
            throw e;
        } finally {
            connection.close();
        }
    }
    private XADataSource getXADataSource() throws SQLException {
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setUrl(XA_DATASOURCE_URL);
        xaDataSource.setUser(XA_DATASOURCE_USER);
        xaDataSource.setPassword(XA_DATASOURCE_PASSWORD);
        return xaDataSource;
    }
    private Xid createXid() throws XAException {
        byte[] gtrid = new byte[10];
        byte[] bqual = new byte[10];
        Arrays.fill(gtrid, (byte) 9);
        Arrays.fill(bqual, (byte) 9);
        return new XidImpl(0x1234, gtrid, bqual);
    }
    private XAResource getXAResource(Connection connection) throws SQLException {
        return connection.unwrap(XAResource.class);
    }
    private int getBalance(Connection connection, int id) throws SQLException {
        try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT balance FROM account WHERE id = ?")) {
            preparedStatement.setInt(1, id);
            try (ResultSet resultSet = preparedStatement.executeQuery()) {
                if (resultSet.next()) {
                    return resultSet.getInt("balance");
                }
            }
        }
        throw new RuntimeException("Account not found: " + id);
    }
    public void init() throws SQLException {
        try (Connection connection = DriverManager.getConnection(DB_URL, USER, PASSWORD)) {
            try (Statement statement = connection.createStatement()) {
                statement.executeUpdate("DROP TABLE IF EXISTS account");
                statement.executeUpdate(TABLE_SCHEMA);
                try (PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL)) {
                    preparedStatement.setInt(1, 1);
                    preparedStatement.setInt(2, 1000);
                    preparedStatement.executeUpdate();
                }
                try (PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL)) {
                    preparedStatement.setInt(1, 2);
                    preparedStatement.setInt(2, 1000);
                    preparedStatement.executeUpdate();
                }
            }
        }
    }
}


七、总结


本文介绍了Java中的并发编程,包括线程安全、锁、死锁等内容。在多线程编程中,线程安全是一个重要的问题,可以使用synchronized关键字或显式锁来实现。死锁是一个常见的问题,可以使用定时锁来避免。多线程编程需要仔细考虑线程安全和锁的问题,才能保证程序的正确性和性能。


并且讲解了在分布式系统下Java并发编程的一些技术和实际案例。在分布式系统中,Java并发编程需要面对网络延迟、数据一致性和容错性等挑战,需要采用一些技术和机制来解决这些问题。例如,可以采用基于Redis实现的分布式锁来控制对共享资源的访问,也可以采用基于XA协议实现的分布式事务来保证数据一致性。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
23天前
|
设计模式 安全 Java
Java编程中的单例模式:理解与实践
【10月更文挑战第31天】在Java的世界里,单例模式是一种优雅的解决方案,它确保一个类只有一个实例,并提供一个全局访问点。本文将深入探讨单例模式的实现方式、使用场景及其优缺点,同时提供代码示例以加深理解。无论你是Java新手还是有经验的开发者,掌握单例模式都将是你技能库中的宝贵财富。
33 2
|
20天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
17天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
21天前
|
SQL Java 数据库连接
从理论到实践:Hibernate与JPA在Java项目中的实际应用
本文介绍了Java持久层框架Hibernate和JPA的基本概念及其在具体项目中的应用。通过一个在线书店系统的实例,展示了如何使用@Entity注解定义实体类、通过Spring Data JPA定义仓库接口、在服务层调用方法进行数据库操作,以及使用JPQL编写自定义查询和管理事务。这些技术不仅简化了数据库操作,还显著提升了开发效率。
34 3
|
20天前
|
Java UED
Java中的多线程编程基础与实践
【10月更文挑战第35天】在Java的世界中,多线程是提升应用性能和响应性的利器。本文将深入浅出地介绍如何在Java中创建和管理线程,以及如何利用同步机制确保数据一致性。我们将从简单的“Hello, World!”线程示例出发,逐步探索线程池的高效使用,并讨论常见的多线程问题。无论你是Java新手还是希望深化理解,这篇文章都将为你打开多线程的大门。
|
25天前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
37 2
|
26天前
|
存储 NoSQL Java
Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
【10月更文挑战第29天】Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
69 1
|
26天前
|
Java 程序员 数据库连接
Java中的异常处理:理解与实践
【10月更文挑战第29天】在Java编程的世界里,异常像是不请自来的客人,它们可能在任何时候闯入我们的程序宴会。了解如何妥善处理这些意外访客,不仅能够保持我们程序的优雅和稳健,还能确保它不会因为一个小小的失误而全盘崩溃。本文将通过浅显易懂的方式,带领读者深入异常处理的核心概念,并通过实际示例展现如何在Java代码中实现有效的异常管理策略。
|
1月前
|
缓存 Java 调度
Java中的多线程编程:从基础到实践
【10月更文挑战第24天】 本文旨在为读者提供一个关于Java多线程编程的全面指南。我们将从多线程的基本概念开始,逐步深入到Java中实现多线程的方法,包括继承Thread类、实现Runnable接口以及使用Executor框架。此外,我们还将探讨多线程编程中的常见问题和最佳实践,帮助读者在实际项目中更好地应用多线程技术。
24 3
|
1月前
|
缓存 安全 Java
Java中的多线程编程:从基础到实践
【10月更文挑战第24天】 本文将深入探讨Java中的多线程编程,包括其基本原理、实现方式以及常见问题。我们将从简单的线程创建开始,逐步深入了解线程的生命周期、同步机制、并发工具类等高级主题。通过实际案例和代码示例,帮助读者掌握多线程编程的核心概念和技术,提高程序的性能和可靠性。
15 2