利用 Postgres 的 Listen/Notify 机制打造轻量级发布订阅系统

· 代码经验

PostgreSQL 自带的 LISTENNOTIFY 功能可以让不同的数据库连接之间进行高效又简单的异步通信,因此特别适合用来搭建轻量级的发布-订阅(Pub/Sub)系统;相比轮询或者引入外部消息中间件的做法,它直接依靠数据库内核来发送通知,既不会让系统变得更复杂,又能快速对数据变动这类重要事件做出反应。

核心工作机制

实践代码演示

1. 纯 SQL 操作示例

先在一个 psql 会话里执行监听命令:

-- 会话 A:作为订阅方
LISTEN my_channel;

然后在另一个会话里发出通知:

-- 会话 B:作为发布方
NOTIFY my_channel, 'Hello from SQL!';

回到第一个会话,随便运行一条语句(比如 SELECT 1;),就能看到类似下面这样的提示信息:

Asynchronous notification "my_channel" with payload "Hello from SQL!" received from server process with PID 12345.

2. Python 应用集成示例

使用 psycopg2 这个库,我们可以在 Python 程序中轻松处理监听逻辑。

发布端 (publisher.py):

import psycopg2

conn = psycopg2.connect("dbname=test user=postgres")
cur = conn.cursor()

# 发送一条通知
cur.execute("NOTIFY my_channel, %s;", ("New order received!",))
conn.commit()

cur.close()
conn.close()

订阅端 (subscriber.py):

import psycopg2
import select

def start_listening():
    conn = psycopg2.connect("dbname=test user=postgres")
    conn.autocommit = True
    cur = conn.cursor()

    # 开始监听通道
    cur.execute("LISTEN my_channel;")
    print("Now listening on 'my_channel'...")

    while True:
        ready, _, _ = select.select([conn], [], [], 5)
        if not ready:
            print("No message within timeout period")
        else:
            conn.poll()
            while conn.notifies:
                msg = conn.notifies.pop(0)
                print(f"Received NOTIFY: PID={msg.pid}, Channel={msg.channel}, Payload={msg.payload}")

if __name__ == "__main__":
    start_listening()

3. 配合触发器实现数据变更实时推送

我们可以写一个触发器函数,在表里的数据被插入时自动调用 NOTIFY 把消息发出去。

-- 创建测试表
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    product TEXT NOT NULL,
    amount INT NOT NULL
);

-- 定义触发器函数
CREATE OR REPLACE FUNCTION notify_on_order_insert()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('order_channel', json_build_object(
        'id', NEW.id,
        'product', NEW.product,
        'amount', NEW.amount
    )::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- 绑定触发器
CREATE TRIGGER trigger_on_order_insert
AFTER INSERT ON orders
FOR EACH ROW EXECUTE FUNCTION notify_on_order_insert();

这样一来,每次往 orders 表里加一条新记录,所有正在监听 order_channel 的程序都会立刻收到包含完整订单信息的 JSON 字符串。

小结

PostgreSQL 提供的 LISTEN/NOTIFY 功能把发布-订阅这种模式直接嵌入到了数据库内部;虽然它不适合用来处理超高吞吐量的消息场景,但在很多需要快速响应、基于数据库事件驱动的小型或中等规模应用中,它是一个既简单又管用的选择。只要搭配触发器和主流编程语言的数据库驱动,开发者就能很快搭出结构清晰、反应灵敏的实时数据交互功能。