Skip to content

事务消息使用简介

unixliang edited this page Oct 10, 2018 · 4 revisions

支持事务消息的分支:https://github.com/Tencent/phxqueue/tree/tx

PhxQueue事务消息使用简介

事务消息作用

用类似2PC的方式执行分布式事务,不仅对主从事务解耦,保证主事务稳定,还能保证主从事务两者最终一致。

主从事务的概念如下:

主事务一般是核心逻辑,逻辑重,同步调用

从事务一般是次要逻辑,逻辑轻,异步调用

事务消息流程

Prepare/Commit/Rollback

pcr

PhxQueue通过二阶段提交保证主从事务最终一致。

使用者先通过 Prepare,将消息暂存到事件中心,该消息对外部不可见;

Prepare后,使用者执行主事务,成功就执行 Commit,PhxQueue 投递消息到从事务;失败就执行 Rollback,PhxQueue 丢弃消息不做投递。

TxQuery

default

网络分区、业务系统重启等原因,都会导致 Commit/Rollback 丢失,这时要靠反查机制来恢复整个分布式事务的上下文。

PhxQueue 先查内部状态,看使用者是否曾经执行过 Commit 或 Rollback,如果都没,才通过 TxQuery 反问业务。

有了这一套机制,业务系统仅用一个无状态的逻辑层,就能实现分布式事务,这个模式对业务非常便利。

PhxQueue事务消息架构

default

Prepare 成功后,消息会进入 Prepare队列,由 Prepare队列 来驱动反查。 Commit 成功后,消息会转而进入 Commit队列,消息从 Commit队列 出队后将会被推送给订阅者。

Demo运行

构建

中文部署手册 完成部署

配置说明

etc/topicconfig.conf

{
    "topic":
    {
        "topic_id": "1000",
        "handle_ids": ["1", "2", "3", "4"],
        "store_paxos_batch_count": "80",
        "store_paxos_batch_delay_time_ms": "30"
    },
    "queue_infos":
    [
        {
            "queue_info_id": "1",
            "ranges": ["0"]  <-- Commit队列
        },
        {
            "queue_info_id": "2",
            "ranges": ["1"], <-- Prepare队列
            "delay": "1", <-- 该队列的重试间隔
            "count": "20" <-- 该队列的重试次数;默认值 -1 表示无限重试
        }
    ],
    "pubs":
    [
        {
            "pub_id": "1", <-- 普通消息pub,事务消息不会用到,忽略
            "consumer_group_ids": ["1"],
            "queue_info_ids": ["1"],
            "sub_ids": ["1"]
        },
        {
            "pub_id": "2", <-- 事务消息pub
            "consumer_group_ids": ["1"],
            "queue_info_ids": ["1"], <-- 指定 Commit队列
            "sub_ids": ["1"], <-- 指定订阅者是谁
            "is_transaction": "1", <-- 该pub处理事务消息
            "tx_query_sub_id": "2", <-- 指定被反查方是谁(反查方其实也可以作为一个订阅者)
            "tx_query_queue_info_ids": ["2"] <-- 指定 Prepare队列
        }
    ],
    "consumer_groups":
    [
        {
            "consumer_group_id": "1",
            "use_dynamic_scale": "0",
            "skip_lock": "1"
        }
    ],
    "subs":
    [
        {
            "sub_id": "1", <-- 订阅者标识
            "sub_name": "push",
            "consumer_group_id": "1",
            "route_conf": "etc/push_routeconfig.conf" <-- 订阅者路由
        },
        {
            "sub_id": "2", <-- 被反查方标识
            "sub_name": "txquery",
            "consumer_group_id": "1",
            "route_conf": "etc/txquery_routeconfig.conf" <-- 被反查方路由
        }

    ]
}

etc/push_routeconfig.conf

{
    "general":
    {
        "conn_timeout_ms": "500",
        "uri": "/push" <-- 订阅者接收推送的HTTP POST URI
    },
    "routes":
    [
        {
            "addr":
            {
                "ip": "127.0.0.1",
                "port": "8081"
            },
            "scale": "1000"
        }
    ]
}

etc/push_routeconfig.conf

{
    "general":
    {
        "conn_timeout_ms": "500",
        "uri": "/push" <-- 订阅者接收推送的HTTP POST URI
    },
    "routes":
    [
        {
            "addr":
            {
                "ip": "127.0.0.1",
                "port": "8081"
            },
            "scale": "1000"
        }
    ]
}

etc/txquery_routeconfig.conf

{
    "general":
    {
        "conn_timeout_ms": "500",
        "uri": "/txquery" <-- 被反查方接收反查请求的HTTP POST URI
    },
    "routes":
    [
        {
            "addr":
            {
                "ip": "127.0.0.1",
                "port": "8081"
            },
            "scale": "1000"
        }
    ]
}

运行步骤

subscriber.py同时处理/push/txquery,其行为是:前10次/txquery返回UNCERTAIN,随即返回Commit

Case 1:Prepare + Commit

$ phxqueue_phxrpc/test/test_producer_main -f prepare
succeeded! func prepare client_id 1539090487_REHlvT3nBa buf wMrSGK5Tv1

$ phxqueue_phxrpc/test/test_producer_main -f commit -c 1539090487_REHlvT3nBa
succeeded! func commit client_id 1539090487_REHlvT3nBa buf wMrSGK5Tv1

Subscriber输出


...


----- Request Start ----->

request_path /txquery <-- 反查请求
topic_id 1000
pub_id 2
client_id 1539090487_REHlvT3nBa
count 3              <-- Subscriber 数次返回 UNCERTAIN,PhxQueue 不断反查
atime 1539090487
buffer REHlvT3nBa
127.0.0.1 - - [09/Oct/2018 13:08:11] "POST /txquery HTTP/1.0" 200 -
<----- Request End -----


----- Request Start ----->

request_path /push  <-- 执行 Commit 后,PhxQueue 随即推送消息给 Subscriber
topic_id 1000
pub_id 2
client_id 1539090487_REHlvT3nBa
count 0
atime 1539090487
buffer REHlvT3nBa
127.0.0.1 - - [09/Oct/2018 13:08:13] "POST /push HTTP/1.0" 200 -
<----- Request End -----

Case 2:Prepare + Rollback

$ phxqueue_phxrpc/test/test_producer_main -f prepare
succeeded! func prepare client_id 1538919775_hj8gqrUcq2 buf hj8gqrUcq2

$ phxqueue_phxrpc/test/test_producer_main -f rollback -c 1538919775_hj8gqrUcq2
succeeded! func rollback client_id 1538919775_hj8gqrUcq2 buf egZ7XoYsRw

Subscriber输出

...


----- Request Start ----->

request_path /txquery
topic_id 1000
pub_id 2
client_id 1538966857_MlO6436Qwe
count 5              <-- Subscriber 数次返回 UNCERTAIN,PhxQueue 不断反查
atime 1538966857
buffer MlO6436Qwe
127.0.0.1 - - [08/Oct/2018 02:47:44] "POST /txquery HTTP/1.0" 200 -
<----- Request End -----

<-- 执行Rollback之后,PhxQueue 丢弃消息,不再推送

Case 3:Prepare + 反查返回Commit

Subscriber输出

...

----- Request Start ----->

request_path /txquery
topic_id 1000
pub_id 2
client_id 1539132534_AhyzRJRVq4
count 10          <-- Subscriber 数次返回 UNCERTAIN,PhxQueue 不断反查;第10次反查,Subscribe 返回 Commit
atime 1539132535
buffer AhyzRJRVq4
127.0.0.1 - - [10/Oct/2018 00:49:08] "POST /txquery HTTP/1.0" 200 -
<----- Request End -----


----- Request Start ----->

request_path /push    <-- PhxQueue 收到 Commit,进行推送
topic_id 1000
pub_id 2
client_id 1539132534_AhyzRJRVq4
count 0
atime 1539132535
buffer AhyzRJRVq4
127.0.0.1 - - [10/Oct/2018 00:49:09] "POST /push HTTP/1.0" 200 -
<----- Request End -----