English 简体中文 繁體中文 한국 사람 日本語 Deutsch русский بالعربية TÜRKÇE português คนไทย french
查看: 6|回复: 0

分布式事务之2PC两阶段提交

[复制链接]
查看: 6|回复: 0

分布式事务之2PC两阶段提交

[复制链接]
查看: 6|回复: 0

387

主题

0

回帖

1171

积分

金牌会员

积分
1171
zxc

387

主题

0

回帖

1171

积分

金牌会员

积分
1171
2025-2-6 23:00:59 | 显示全部楼层 |阅读模式
1. 分布式事务概述

1.1 问题背景

在分布式系统中,业务操作可能跨越多个服务或数据库(如订单服务、库存服务、支付服务),传统单机事务(ACID)无法满足跨网络节点的数据一致性需求。

  • 网络不可靠:服务间调用可能失败或超时。
  • 数据一致性:不同节点间的状态需最终一致。
  • 性能与可用性:避免长时间锁资源导致系统阻塞。
分布式事务的核心目标是确保 跨服务/数据库的操作要么全部成功,要么全部回滚
2. 两阶段提交(2PC)

原理


  • 阶段一(Prepare):协调者询问所有参与者是否可提交,参与者锁定资源并返回“同意”或“拒绝”。
  • 阶段二(Commit/Rollback):若所有参与者同意,协调者发送提交命令;否则发送回滚命令。
以下是一个简化的 Java 两阶段提交(2PC) 具体实现示例,包含协调者(Coordinator)和参与者(Participant)的核心逻辑。代码通过模拟数据库操作展示2PC的关键流程:
<hr>1. 参与者(Participant)实现

每个参与者代表一个独立的数据库或服务,需支持准备(Prepare)、提交(Commit)、回滚(Rollback)操作。
import java.util.concurrent.atomic.AtomicBoolean;/** * 参与者(如数据库或服务) */public class Participant {    private String name;          // 参与者名称(如"DB1")    private AtomicBoolean prepared = new AtomicBoolean(false);  // 准备状态    private AtomicBoolean committed = new AtomicBoolean(false); // 提交状态    public Participant(String name) {        this.name = name;    }    /**     * 阶段一:准备操作(锁定资源)     * @return true表示准备成功,false表示失败     */    public boolean prepare() {        try {            // 模拟资源锁定,实际可能为操作数据库            System.out.println(name + ": Trying to prepare...");            Thread.sleep(100);  // 模拟网络延迟            boolean success = Math.random() > 0.2;  // 80%概率成功            if (success) {                prepared.set(true);                System.out.println(name + ": Prepared successfully.");                return true;            } else {                System.out.println(name + ": Prepare failed.");                return false;            }        } catch (InterruptedException e) {            Thread.currentThread().interrupt();            return false;        }    }    /**     * 阶段二:提交操作     */    public void commit() {        if (prepared.get()) {            // 实际提交事务(如更新数据库)            committed.set(true);            System.out.println(name + ": Committed.");        } else {            System.out.println(name + ": Cannot commit without preparation.");        }    }    /**     * 阶段二:回滚操作     */    public void rollback() {        if (prepared.get()) {            // 实际回滚事务(如恢复数据)            prepared.set(false);            System.out.println(name + ": Rolled back.");        } else {            System.out.println(name + ": No need to rollback.");        }    }    // 检查是否已提交    public boolean isCommitted() {        return committed.get();    }}<hr>2. 协调者(Coordinator)实现

协调者负责管理所有参与者,驱动两阶段提交流程。
import java.util.List;/** * 协调者(事务管理器) */public class Coordinator {    private List<Participant> participants;    public Coordinator(List<Participant> participants) {        this.participants = participants;    }    /**     * 执行两阶段提交事务     * @return true表示事务成功提交,false表示失败     */    public boolean executeTransaction() {        System.out.println("===== Phase 1: Prepare =====");        boolean allPrepared = participants.stream()                .allMatch(Participant::prepare);        System.out.println("===== Phase 2: Commit/Rollback =====");        if (allPrepared) {            participants.forEach(Participant::commit);            System.out.println("Transaction committed successfully.");            return true;        } else {            participants.forEach(Participant::rollback);            System.out.println("Transaction rolled back due to failures.");            return false;        }    }}<hr>3. 客户端测试代码

模拟包含两个参与者的分布式事务场景。
import java.util.Arrays;public class TwoPhaseCommitDemo {    public static void main(String[] args) {        // 创建两个参与者(如数据库DB1和DB2)        Participant db1 = new Participant("DB1");        Participant db2 = new Participant("DB2");        // 创建协调者并关联参与者        Coordinator coordinator = new Coordinator(Arrays.asList(db1, db2));        // 执行两阶段提交事务        boolean success = coordinator.executeTransaction();        // 输出最终状态        System.out.println("\nFinal Status:");        System.out.println("DB1 Committed: " + db1.isCommitted());        System.out.println("DB2 Committed: " + db2.isCommitted());        System.out.println("Transaction Result: " + (success ? "SUCCESS" : "FAILURE"));    }}<hr>4. 运行结果示例

成功场景(所有参与者准备成功)

===== Phase 1: Prepare =====DB1: Trying to prepare...DB1: Prepared successfully.DB2: Trying to prepare...DB2: Prepared successfully.===== Phase 2: Commit/Rollback =====DB1: Committed.DB2: Committed.Transaction committed successfully.Final Status:DB1 Committed: trueDB2 Committed: trueTransaction Result: SUCCESS失败场景(某一参与者准备失败)

===== Phase 1: Prepare =====DB1: Trying to prepare...DB1: Prepared successfully.DB2: Trying to prepare...DB2: Prepare failed.===== Phase 2: Commit/Rollback =====DB1: Rolled back.DB2: No need to rollback.Transaction rolled back due to failures.Final Status:DB1 Committed: falseDB2 Committed: falseTransaction Result: FAILURE<hr>5. 关键点说明


  • 阶段一(Prepare)

    • 协调者询问所有参与者是否可以提交。
    • 参与者锁定资源并记录操作日志。
    • 任一参与者失败则整个事务回滚。

  • 阶段二(Commit/Rollback)

    • 若所有参与者准备成功,协调者发送提交命令。
    • 若任一参与者失败,协调者发送回滚命令。

  • 代码简化说明

    • 实际应用中需处理网络超时、重试和持久化日志。
    • 分布式场景下需使用RPC或HTTP替代本地方法调用。
    • 生产环境建议使用成熟的XA协议实现(如Atomikos、Narayana)。

<hr>6. 2PC的局限性


  • 同步阻塞:参与者在Prepare阶段后需阻塞等待协调者指令。
  • 单点故障:协调者宕机可能导致事务悬挂。
  • 数据不一致:协调者与参与者在Commit阶段同时宕机时,可能部分提交。
<hr>关注微信公众号,查看更多技术文章。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

387

主题

0

回帖

1171

积分

金牌会员

积分
1171

QQ|智能设备 | 粤ICP备2024353841号-1

GMT+8, 2025-3-10 15:53 , Processed in 2.246573 second(s), 30 queries .

Powered by 智能设备

©2025

|网站地图