Kong 的缓存更新很多依赖于事件,而事件看起来是相对来说比较复杂、也是最有趣的一部分。
worker 模型
假设我们对 Kong 做了一个更改的请求,这个请求通常是通过 admin_api 这个路由处理的。也就是说最终执行数据库操作的动作是在一个 Nginx worker 进程里。因为操作了数据库所以我们需要刷新这个 Kong 节点的所有 worker 的缓存,而且要把事件分发给其他 Kong 节点,让其他 Kong 节点刷新所有 worker 的缓存。
这就涉及到两部分:
- Kong 节点之间的消息通信,这是使用serf来实现的
- Kong 每个节点内部,也就是 Nginx worker 之间的通信,这是使用lua-resty-worker-events来进行。
发布订阅模式
发布订阅是实现事件的一种经典设计模式,主要需要有两类操作:
- 发布消息
- 订阅消息,收到消息后触发指定的函数。
Kong 使用的是一个叫作 mediator_lua,mediator 中文意思为”中间人”,很符合项目的意思。可以看到 kong/core/events.lua 里面实现如下:
function Events:subscribe(event_name, fn)
if fn then
self._mediator:subscribe({event_name}, function(message_t)
fn(message_t)
return nil, true -- Required to tell mediator to continue processing other events
end)
end
end
function Events:publish(event_name, message_t)
if event_name then
self._mediator:publish({string.upper(event_name)}, message_t)
end
end
Kong.init 初始化的时候会调用一个叫做 attach_hooks 的函数:
attach_hooks(events, require "kong.core.hooks")
在 load 插件的时候也会把插件对应 hooks 绑定上:
-- Attaching hooks
local ok, hooks = utils.load_module_if_exists("kong.plugins." .. plugin .. ".hooks")
if ok then
attach_hooks(events, hooks)
end
事件的来源
上面说过,Kong 节点之间通信是通过serf发送的。我们来看看事件是如何触发发出通知的。
事件来于源数据库的修改,那就应该在数据库代码部分有触发事件的代码,查看 dao/dao.lua 这个文件里的代码,我们可以看到在 insert、update、insert 执行的时候都调用了一行代码
event(self, event_types.ENTITY_DELETED, k, v.schema, entity)
这个函数的实现如下,这里做了数据的序列化,然后发布了一种叫做 CLUSTER_PROGATE 类型的消息:
local function event(self, type, table, schema, data_t)
if self.events_handler then
..... 执行数据序列化
self.events_handler:publish(self.events_handler.TYPES.CLUSTER_PROPAGATE, payload)
end
end
在 core/hooks.lua 接受消息部分,events.TYPES.CLUSTER_PROPAGATE 对应的处理部分是 singletons.serf:event(message_t),所以我们看 serf.lua 这个源文件,最终 event 调用了 invoke_signal,这个函数会运行一个 serf 命令,类似于这样:
serf event -coalesce=false -rpc-addr=127.0.0.1:7373 kong '{"type":"ENTITY_UPDATED","primary_key":["id"],"collection":"apis","entity":{"id":"94acca76-d61a-429e-86a9-5abf2c61ee31"}}'
这就出发了一个 serf event,其他 Kong 节点会收到此消息。
serf: Kong 节点之间通信
那么 Kong 节点收到消息之后是如何处理的呢?Kong 在启动的时候会在后台执行一个 serf 进程,类似这样:
serf agent -profile wan -bind 0.0.0.0:7946 -log-level err -rpc-addr 127.0.0.1:7373 -event-handler member-join,member-leave,member-failed,member-update,member-reap,user:kong=/usr/local/kong/serf/serf_event.sh -node Kang.local_0.0.0.0:7946_be3b9352808e4839a272f30ca6025650
可以看看 serf_event.sh 这个脚本,内容如下:
PAYLOAD=`cat` # Read from stdin
if [ "$SERF_EVENT" != "user" ]; then
PAYLOAD="{\"type\":\"${SERF_EVENT}\",\"entity\": \"${PAYLOAD}\"}"
fi
CMD="\
local http = require 'resty.http' \
local client = http.new() \
client:set_timeout(5000) \
client:connect('127.0.0.1', 8001) \
client:request { \
method = 'POST', \
path = '/cluster/events/', \
body = [=[${PAYLOAD}]=], \
headers = { \
['content-type'] = 'application/json' \
} \
}"
/usr/local/openresty/bin/resty -e "$CMD"
可以看到 serf 收到消息后会触发这个脚本,然后把消息发送到本节点的/cluster/events 这个路由。api/routes/cluster.lua 这个文件里有收到消息后的处理代码,其中最关键的是:
-- Trigger event in the node
ev.post(constants.CACHE.CLUSTER, message_t.type, message_t)
就是通过 resty.worker.events publish 出收到的消息,本节点的 worker 会处理这些消息。
worker 刷新缓存
假设当前 Kong 节点收到一个消息,这个消息是如何分发给各个 worker 的?从代码看出,在 Kong 初始化的时候有调用一个叫做 kong.lua 里面的 Kong.init_worker() 函数,其中有一段代码注册了 event handler:
local worker_events = require "resty.worker.events"
local handler = function(data, event, source, pid)
if data and data.collection == "apis" then
assert(core.build_router())
elseif source and source == constants.CACHE.CLUSTER then
singletons.events:publish(event, data)
end
end
worker_events.register(handler)
可以从上面的 handler 代码看到,一个 worker 接收到消息之后执行的是:
singletons.events:publish(event, data)
也就是通过 mediator_lua 再把消息 publish。之前初始化的时候已经 attach_hooks 了各种 handler,这时候那些注册的函数才会被最终执行,比如核心的刷新缓存部分代码在 core/hooks.lua 的 invalidate 函数里面。
回顾
总的来说 Kong 事件部分的代码相当精妙,也很统一。比如当前 worker 做了修改,这个事件会发送给各个节点,包括当前自己所在的节点。通过发布订阅模式,写代码的时候只需关心消息发送、接受消息索要处理的逻辑。