CatCoding

Kong 源码分析:事件

2017-07-23

Kong 的缓存更新很多依赖于事件,而事件看起来是相对来说比较复杂、也是最有趣的一部分。

worker 模型

假设我们对 Kong 做了一个更改的请求,这个请求通常是通过 admin_api 这个路由处理的。也就是说最终执行数据库操作的动作是在一个 Nginx worker 进程里。因为操作了数据库所以我们需要刷新这个 Kong 节点的所有 worker 的缓存,而且要把事件分发给其他 Kong 节点,让其他 Kong 节点刷新所有 worker 的缓存。

kong-messagekong-message

这就涉及到两部分:

  1. Kong 节点之间的消息通信,这是使用serf来实现的
  2. Kong 每个节点内部,也就是 Nginx worker 之间的通信,这是使用lua-resty-worker-events来进行。

发布订阅模式

发布订阅是实现事件的一种经典设计模式,主要需要有两类操作:

  1. 发布消息
  2. 订阅消息,收到消息后触发指定的函数。

Kong 使用的是一个叫作 mediator_lua,mediator 中文意思为”中间人”,很符合项目的意思。可以看到 kong/core/events.lua 里面实现如下:

function Events:subscribe(event_name, fn)
  if fn then
    self._mediator:subscribe({event_name}, function(message_t)
      fn(message_t)
      return nil, true -- Required to tell mediator to continue processing other events
    end)
  end
end

function Events:publish(event_name, message_t)
  if event_name then
    self._mediator:publish({string.upper(event_name)}, message_t)
  end
end

Kong.init 初始化的时候会调用一个叫做 attach_hooks 的函数:

attach_hooks(events, require "kong.core.hooks")

在 load 插件的时候也会把插件对应 hooks 绑定上:

-- Attaching hooks
local ok, hooks = utils.load_module_if_exists("kong.plugins." .. plugin .. ".hooks")
if ok then
  attach_hooks(events, hooks)
end

事件的来源

上面说过,Kong 节点之间通信是通过serf发送的。我们来看看事件是如何触发发出通知的。
事件来于源数据库的修改,那就应该在数据库代码部分有触发事件的代码,查看 dao/dao.lua 这个文件里的代码,我们可以看到在 insert、update、insert 执行的时候都调用了一行代码

event(self, event_types.ENTITY_DELETED, k, v.schema, entity)

这个函数的实现如下,这里做了数据的序列化,然后发布了一种叫做 CLUSTER_PROGATE 类型的消息:

local function event(self, type, table, schema, data_t)
  if self.events_handler then
    .....  执行数据序列化
    self.events_handler:publish(self.events_handler.TYPES.CLUSTER_PROPAGATE, payload)
  end
end

在 core/hooks.lua 接受消息部分,events.TYPES.CLUSTER_PROPAGATE 对应的处理部分是 singletons.serf:event(message_t),所以我们看 serf.lua 这个源文件,最终 event 调用了 invoke_signal,这个函数会运行一个 serf 命令,类似于这样:

serf event -coalesce=false -rpc-addr=127.0.0.1:7373  kong '{"type":"ENTITY_UPDATED","primary_key":["id"],"collection":"apis","entity":{"id":"94acca76-d61a-429e-86a9-5abf2c61ee31"}}'

这就出发了一个 serf event,其他 Kong 节点会收到此消息。

serf: Kong 节点之间通信

那么 Kong 节点收到消息之后是如何处理的呢?Kong 在启动的时候会在后台执行一个 serf 进程,类似这样:

serf agent -profile wan -bind 0.0.0.0:7946 -log-level err -rpc-addr 127.0.0.1:7373 -event-handler member-join,member-leave,member-failed,member-update,member-reap,user:kong=/usr/local/kong/serf/serf_event.sh -node Kang.local_0.0.0.0:7946_be3b9352808e4839a272f30ca6025650

可以看看 serf_event.sh 这个脚本,内容如下:

PAYLOAD=`cat` # Read from stdin
if [ "$SERF_EVENT" != "user" ]; then
  PAYLOAD="{\"type\":\"${SERF_EVENT}\",\"entity\": \"${PAYLOAD}\"}"
fi

CMD="\
local http = require 'resty.http' \
local client = http.new() \
client:set_timeout(5000) \
client:connect('127.0.0.1', 8001) \
client:request { \
  method = 'POST', \
  path = '/cluster/events/', \
  body = [=[${PAYLOAD}]=], \
  headers = { \
    ['content-type'] = 'application/json' \
  } \
}"

/usr/local/openresty/bin/resty -e "$CMD"

可以看到 serf 收到消息后会触发这个脚本,然后把消息发送到本节点的/cluster/events 这个路由。api/routes/cluster.lua 这个文件里有收到消息后的处理代码,其中最关键的是:

-- Trigger event in the node
ev.post(constants.CACHE.CLUSTER, message_t.type, message_t)

就是通过 resty.worker.events publish 出收到的消息,本节点的 worker 会处理这些消息。

worker 刷新缓存

假设当前 Kong 节点收到一个消息,这个消息是如何分发给各个 worker 的?从代码看出,在 Kong 初始化的时候有调用一个叫做 kong.lua 里面的 Kong.init_worker() 函数,其中有一段代码注册了 event handler:

local worker_events = require "resty.worker.events"

local handler = function(data, event, source, pid)  
  if data and data.collection == "apis" then    
    assert(core.build_router())

  elseif source and source == constants.CACHE.CLUSTER then    
    singletons.events:publish(event, data)
  end
end

worker_events.register(handler)

可以从上面的 handler 代码看到,一个 worker 接收到消息之后执行的是:

singletons.events:publish(event, data)

也就是通过 mediator_lua 再把消息 publish。之前初始化的时候已经 attach_hooks 了各种 handler,这时候那些注册的函数才会被最终执行,比如核心的刷新缓存部分代码在 core/hooks.lua 的 invalidate 函数里面。

回顾

总的来说 Kong 事件部分的代码相当精妙,也很统一。比如当前 worker 做了修改,这个事件会发送给各个节点,包括当前自己所在的节点。通过发布订阅模式,写代码的时候只需关心消息发送、接受消息索要处理的逻辑。

公号同步更新,欢迎关注👻
Tags: Lua