幸福和富有的过一生

APISIX's Route Matching Mechanism

Date: 2022-04-10  Categories: apisix, etcd  Word count: 3932  Reading time: 8 minutes

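APISIX uses etcd as its backend store for routes, services, plugins, upstreams, and other configuration. This post looks at how APISIX finds routes through etcd and how it is notified and updated when a route changes. APISIX also supports storing configuration in a YAML file, but this post focuses on etcd.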

APISIX talks to etcd through lua-resty-etcd, a component that the APISIX developers wrote and open-sourced themselves. The core etcd code in APISIX lives in config_etcd.lua.

Startup phase: start

At startup, etcd.init() initializes etcd from the configuration: it creates the etcd client and verifies that the connection works.

local function start(env, ...)
    ...
    init(env)
    init_etcd(env, args)   -- calls etcd.init()

    util.execute_cmd(env.openresty_args)
end

-- config_etcd.lua
function _M.init()
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end

    if table.try_read_attr(local_conf, "apisix", "disable_sync_configuration_during_start") then
        return true
    end

    local etcd_cli, err = get_etcd()
    if not etcd_cli then
        return nil, "failed to start a etcd instance: " .. err
    end

    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    local res, err = readdir(etcd_cli, prefix, create_formatter(prefix))
    if not res then
        return nil, err
    end

    return true
end

init_worker phase

In this phase APISIX calls the init_worker function of each component module, which loads routes, services, plugins, upstreams, and so on from etcd. Let's take a look.

-- http_init_worker in init.lua is called during the init_worker phase
function _M.http_init_worker()
    ...
    require("apisix.balancer").init_worker()
    load_balancer = require("apisix.balancer")
    require("apisix.admin.init").init_worker()

    require("apisix.timers").init_worker()

    require("apisix.debug").init_worker()

    ...
    plugin.init_worker()
    router.http_init_worker()  -- init_worker for routes
    require("apisix.http.service").init_worker()   -- init_worker for services
    plugin_config.init_worker()
    require("apisix.consumer").init_worker()

    apisix_upstream.init_worker()  -- init_worker for upstreams
    ...
end

Let's focus on the init_worker function for routes.

function _M.http_init_worker()
    local conf = core.config.local_conf()
    local router_http_name = "radixtree_uri"
    local router_ssl_name = "radixtree_sni"

    if conf and conf.apisix and conf.apisix.router then
        router_http_name = conf.apisix.router.http or router_http_name
        router_ssl_name = conf.apisix.router.ssl or router_ssl_name
    end

    -- router_http_name selects the router module under apisix/http/router
    local router_http = require("apisix.http.router." .. router_http_name)
    -- attach the common init_worker and routes functions to the router module;
    -- if the module has no init_worker of its own, the one in apisix/http/route.lua is used
    attach_http_router_common_methods(router_http)
    -- call the module's init_worker, which is usually the one in apisix/http/route.lua
    router_http.init_worker(filter)
    _M.router_http = router_http

    local router_ssl = require("apisix.ssl.router." .. router_ssl_name)
    router_ssl.init_worker()
    _M.router_ssl = router_ssl

    _M.api = require("apisix.api_router")

    local global_rules, err = core.config.new("/global_rules", {
            automatic = true,
            item_schema = core.schema.global_rule,
            checker = plugin_checker,
        })
    if not global_rules then
        error("failed to create etcd instance for fetching /global_rules : "
              .. err)
    end
    _M.global_rules = global_rules
end
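
The defaulting logic at the top of http_init_worker can be tried in isolation. The following is a minimal sketch in plain Lua; resolve_router_names is a hypothetical helper introduced only for illustration, and the configuration table is hand-written rather than loaded by core.config.local_conf().

```lua
-- Hypothetical helper mirroring the defaulting logic in http_init_worker.
local function resolve_router_names(conf)
    -- defaults used when the configuration does not name a router
    local http_name = "radixtree_uri"
    local ssl_name = "radixtree_sni"

    local router = conf and conf.apisix and conf.apisix.router
    if router then
        http_name = router.http or http_name
        ssl_name = router.ssl or ssl_name
    end

    return http_name, ssl_name
end

-- Only the HTTP router is overridden; the SSL router keeps its default.
local http_name, ssl_name = resolve_router_names({
    apisix = { router = { http = "radixtree_host_uri" } },
})
print(http_name, ssl_name)
```

The returned name is what gets appended to "apisix.http.router." in the require call, so an unknown name would fail at require time.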

Next we follow router_http.init_worker(filter). It calls the init_worker function in apisix/http/route.lua and stores the routes fetched from etcd in http_router.user_routes, so that the access phase can match requests against them.

function _M.init_worker(filter)
    local user_routes, err = core.config.new("/routes", {
            automatic = true,
            item_schema = core.schema.route,
            checker = check_route,
            filter = filter,
        })
    if not user_routes then
        error("failed to create etcd instance for fetching /routes : " .. err)
    end

    return user_routes
end

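Here core.config.new resolves to the new function in apisix/core/config_etcd.lua, because core.config is set up with etcd as the configuration center. Since we are fetching routes, the key is /routes. The fetched data is stored in the user_routes variable, which is updated whenever the data changes. Let's look at the new function in config_etcd.lua next.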

function _M.new(key, opts)
    ...
    local obj = setmetatable({
        etcd_cli = nil,
        key = key and prefix .. key,
        automatic = automatic,
        item_schema = item_schema,
        checker = checker,
        sync_times = 0,
        running = true,
        conf_version = 0,    -- current configuration version, starts at 0
        values = nil,
        need_reload = true,
        routes_hash = nil,
        prev_index = 0,
        last_err = nil,
        last_err_time = nil,
        resync_delay = resync_delay,
        health_check_timeout = health_check_timeout,
        timeout = timeout,
        single_item = single_item,
        filter = filter_fun,
    }, mt)

    if automatic then
        if not key then
            return nil, "missing `key` argument"
        end

        if loaded_configuration[key] then
            local res = loaded_configuration[key]
            loaded_configuration[key] = nil -- tried to load

            log.notice("use loaded configuration ", key)

            local dir_res, headers = res.body, res.headers
            load_full_data(obj, dir_res, headers)
        end
        -- timer callback: keeps checking etcd for configuration changes and applies them
        ngx_timer_at(0, _automatic_fetch, obj)   

    else
        local etcd_cli, err = get_etcd()
        if not etcd_cli then
            return nil, "failed to start a etcd instance: " .. err
        end
        obj.etcd_cli = etcd_cli
    end

    if key then
        created_obj[key] = obj
    end

    return obj
end

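The new function defines the object obj. Each time the timer callback _automatic_fetch gets data from etcd, it stores the data in this object's values field and updates the conf_version version number. Let's follow the timer callback.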

local function _automatic_fetch(premature, self)
    if premature then
        return
    end

    if not (health_check.conf and health_check.conf.shm_name) then
        local _, err = health_check.init({
            shm_name = health_check_shm_name,
            fail_timeout = self.health_check_timeout,
            max_fails = 3,
            retry = true,
        })
        if err then
            log.warn("fail to create health_check: " .. err)
        end
    end

    local i = 0
    while not exiting() and self.running and i <= 32 do
        i = i + 1

        local ok, err = xpcall(function()
            if not self.etcd_cli then
                local etcd_cli, err = get_etcd()  -- get an etcd client
                if not etcd_cli then
                    error("failed to create etcd instance for key ["
                          .. self.key .. "]: " .. (err or "unknown"))
                end
                self.etcd_cli = etcd_cli
            end

            local ok, err = sync_data(self)   -- fetch data from etcd
            if err then
                if string.find(err, err_etcd_unhealthy_all) then
                    local reconnected = false
                    while err and not reconnected and i <= 32 do
                        local backoff_duration, backoff_factor, backoff_step = 1, 2, 6
                        for _ = 1, backoff_step do
                            i = i + 1
                            ngx_sleep(backoff_duration)
                            _, err = sync_data(self)
                            if not err or not string.find(err, err_etcd_unhealthy_all) then
                                log.warn("reconnected to etcd")
                                reconnected = true
                                break
                            end
                            backoff_duration = backoff_duration * backoff_factor
                            log.error("no healthy etcd endpoint available, next retry after "
                                       .. backoff_duration .. "s")
                        end
                    end
                elseif err ~= "timeout" and err ~= "Key not found"
                    and self.last_err ~= err then
                    log.error("failed to fetch data from etcd: ", err, ", ",
                              tostring(self))
                end

                if err ~= self.last_err then
                    self.last_err = err
                    self.last_err_time = ngx_time()
                else
                    if ngx_time() - self.last_err_time >= 30 then
                        self.last_err = nil
                    end
                end

                -- etcd watch timeout is an expected error, so there is no need for resync_delay
                if err ~= "timeout" then
                    ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
                end
            elseif not ok then
                -- no error. reentry the sync with different state
                ngx_sleep(0.05)
            end

        end, debug.traceback)

        if not ok then
            log.error("failed to fetch data from etcd: ", err, ", ",
                      tostring(self))
            ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
            break
        end
    end

    if not exiting() and self.running then
        ngx_timer_at(0, _automatic_fetch, self)  -- schedule the timer again
    end
end
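
The retry timing in the listing above is easier to see when computed on its own. The sketch below is plain Lua with two hypothetical helpers, backoff_schedule and jittered_delay, which are not APISIX functions. It reproduces one pass of the inner backoff loop (start at 1 second, double each step, 6 steps) and the jittered resync delay of resync_delay plus up to 50% extra.

```lua
-- Sleep durations for one pass of the backoff loop.
local function backoff_schedule(initial, factor, steps)
    local schedule = {}
    local duration = initial
    for step = 1, steps do
        schedule[step] = duration
        duration = duration * factor
    end
    return schedule
end

-- Resync delay with up to 50% random jitter; rand returns a number in [0, 1).
local function jittered_delay(resync_delay, rand)
    return resync_delay + rand() * 0.5 * resync_delay
end

-- Same constants as in _automatic_fetch: 1s initial, factor 2, 6 steps.
local schedule = backoff_schedule(1, 2, 6)
print(table.concat(schedule, ", "))

-- The resync_delay of 5 is an arbitrary value chosen for illustration.
print(jittered_delay(5, math.random))
```

The jitter spreads out retries from many workers so that they do not all hit etcd at the same moment after an outage.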

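The most important thing the etcd code does in the init_worker phase is set up this timer callback, which keeps syncing the data in etcd into user_routes. That concludes the init_worker phase.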
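
The pairing of values and conf_version can be pictured with a small stand-alone object. This is a minimal sketch, assuming that every observed change replaces values and bumps the version by one; ConfigStore and its apply method are hypothetical names, not part of config_etcd.lua.

```lua
-- Hypothetical stand-in for the object returned by core.config.new.
local ConfigStore = {}
ConfigStore.__index = ConfigStore

function ConfigStore.new(key)
    return setmetatable({ key = key, values = nil, conf_version = 0 }, ConfigStore)
end

-- Called whenever a change is observed: replace the data and bump the version.
function ConfigStore:apply(values)
    self.values = values
    self.conf_version = self.conf_version + 1
end

local routes = ConfigStore.new("/apisix/routes")
routes:apply({ { id = "1", uri = "/hello" } })
routes:apply({ { id = "1", uri = "/hello" }, { id = "2", uri = "/world" } })
print(routes.conf_version, #routes.values)
```

Consumers never need to diff the data. Comparing the version number they last saw with conf_version is enough to know whether anything changed.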

access phase

This phase mainly matches the route and picks an upstream server.

function _M.http_access_phase()
    local ngx_ctx = ngx.ctx
    ...
    -- match the route
    router.router_http.match(api_ctx)
    ...
    -- pick a server
    local server, err = load_balancer.pick_server(route, api_ctx)
    if not server then
        core.log.error("failed to pick server: ", err)
        return core.response.exit(502)
    end
    ...
end

Matching the route

To speed up lookups, APISIX uses a route lookup library built on a radix tree: api7/lua-resty-radixtree (Adaptive Radix Trees implemented in Lua / LuaJIT). The match call mainly goes to the match function in radixtree_uri.lua in the apache/apisix repository. Which router module is used can be changed in config-default.yaml. There are three HTTP router modules: radixtree_uri.lua, radixtree_host_uri.lua, and radixtree_uri_with_parameter.lua. Let's look at this match function first.

function _M.match(api_ctx)
    local user_routes = _M.user_routes   -- route data fetched from etcd
    local _, service_version = get_services()  -- service version
    -- rebuild the radixtree lookup tree when the data in user_routes has changed
    if not cached_router_version or cached_router_version ~= user_routes.conf_version
        or not cached_service_version or cached_service_version ~= service_version
    then
        uri_router = base_router.create_radixtree_uri_router(user_routes.values,
                                                             uri_routes, false)
        cached_router_version = user_routes.conf_version
        cached_service_version = service_version
    end

    if not uri_router then
        core.log.error("failed to fetch valid `uri` router: ")
        return true
    end

    -- the actual route matching
    return base_router.match_uri(uri_router, match_opts, api_ctx)
end

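The match function relies on two important functions: create_radixtree_uri_router and base_router.match_uri. The former rebuilds the radixtree lookup tree from user_routes.values whenever the version of the route data fetched from etcd changes. The latter matches the request against that lookup tree to find the right route.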
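
The "rebuild only when the version changes" pattern is independent of the radix tree itself. The sketch below is plain Lua and replaces create_radixtree_uri_router with a hypothetical build_router that produces an exact-match table, so it does not show prefix or parameter matching. It only shows how a cached version number avoids rebuilding on every request.

```lua
local build_count = 0
local cached_router, cached_version

-- Stand-in for create_radixtree_uri_router: an exact-match lookup table.
local function build_router(routes)
    build_count = build_count + 1
    local index = {}
    for _, route in ipairs(routes) do
        index[route.uri] = route
    end
    return index
end

-- Rebuild the lookup structure only if the stored version differs.
local function match(store, uri)
    if cached_version ~= store.conf_version then
        cached_router = build_router(store.values)
        cached_version = store.conf_version
    end
    return cached_router[uri]
end

local store = { conf_version = 1, values = { { id = "1", uri = "/hello" } } }
local first = match(store, "/hello")   -- first call builds the router
local again = match(store, "/hello")   -- same version, no rebuild

-- Simulate an update arriving from etcd.
store.values = { { id = "1", uri = "/hello" }, { id = "2", uri = "/world" } }
store.conf_version = 2
local added = match(store, "/world")   -- version changed, router is rebuilt
print(build_count, first.id, added.id)
```

The real match function also compares the service version, because a change to a service can affect the routes that reference it.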

Picking a server

Picking a server is done by load_balancer.pick_server, which mainly calls the pick_server function in apisix/balancer.lua.

local function pick_server(route, ctx)
    ...
    local server_picker = ctx.server_picker
    if not server_picker then
        -- fetch the server picker for this load-balancing type from the LRU cache
        server_picker = lrucache_server_picker(key, version,
                                               create_server_picker, up_conf, checker)
    end
    if not server_picker then
        return nil, "failed to fetch server picker"
    end

    local server, err = server_picker.get(ctx)  -- get method of the selected balancer
    if not server then
        err = err or "no valid upstream node"
        return nil, "failed to find valid upstream server, " .. err
    end
    ctx.balancer_server = server
    ...
end

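Two functions matter here: lrucache_server_picker and server_picker.get. The former creates or reuses the balancer's entry in an LRU cache. The cache constructor is the new_lru_fun function in lrucache.lua, and the cached object is created through the create_obj_fun callback. On a cache miss with serial_creating enabled, resty.lock is used so that only one caller at a time rebuilds the object for a given key, instead of every concurrent request rebuilding it.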

local function new_lru_fun(opts)
    local item_count, item_ttl
    if opts and opts.type == 'plugin' then
        item_count = opts.count or PLUGIN_ITEMS_COUNT
        item_ttl = opts.ttl or PLUGIN_TTL
    else
        item_count = opts and opts.count or GLOBAL_ITEMS_COUNT
        item_ttl = opts and opts.ttl or GLOBAL_TTL
    end

    local item_release = opts and opts.release
    local invalid_stale = opts and opts.invalid_stale
    local serial_creating = opts and opts.serial_creating
    local lru_obj = lru_new(item_count)

    return function (key, version, create_obj_fun, ...)
        if not serial_creating or not can_yield_phases[get_phase()] then
            local cache_obj = fetch_valid_cache(lru_obj, invalid_stale,
                                item_ttl, item_release, key, version)
            if cache_obj then
                return cache_obj.val
            end

            local obj, err = create_obj_fun(...)  -- the callback here is create_server_picker
            if obj ~= nil then
                lru_obj:set(key, {val = obj, ver = version}, item_ttl)
            end

            return obj, err
        end

        local cache_obj = fetch_valid_cache(lru_obj, invalid_stale, item_ttl,
                            item_release, key, version)
        if cache_obj then
            return cache_obj.val
        end

        local lock, err = resty_lock:new(lock_shdict_name)
        if not lock then
            return nil, "failed to create lock: " .. err
        end

        local key_s = tostring(key)
        log.info("try to lock with key ", key_s)

        local elapsed, err = lock:lock(key_s)
        if not elapsed then
            return nil, "failed to acquire the lock: " .. err
        end

        cache_obj = fetch_valid_cache(lru_obj, invalid_stale, item_ttl,
                        nil, key, version)
        if cache_obj then
            lock:unlock()
            log.info("unlock with key ", key_s)
            return cache_obj.val
        end

        local obj, err = create_obj_fun(...)
        if obj ~= nil then
            lru_obj:set(key, {val = obj, ver = version}, item_ttl)
        end
        lock:unlock()
        log.info("unlock with key ", key_s)

        return obj, err
    end
end
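
Stripped of TTL handling, LRU eviction, and locking, the function returned by new_lru_fun is a cache keyed by key and validated by version. The following is a minimal sketch of just that part in plain Lua; new_versioned_cache is a hypothetical name, and the plain table used as storage never evicts anything.

```lua
-- Simplified version of the closure returned by new_lru_fun:
-- no TTL, no eviction, no resty.lock.
local function new_versioned_cache()
    local items = {}
    return function(key, version, create_obj_fun, ...)
        local item = items[key]
        if item and item.ver == version then
            return item.val          -- hit: same key and same version
        end

        -- miss or stale version: build a new object through the callback
        local obj, err = create_obj_fun(...)
        if obj ~= nil then
            items[key] = { val = obj, ver = version }
        end
        return obj, err
    end
end

local created = 0
local function create_picker(upstream_name)
    created = created + 1
    return { upstream = upstream_name, serial = created }
end

local cache = new_versioned_cache()
local p1 = cache("upstream-1", 1, create_picker, "upstream-1")
local p2 = cache("upstream-1", 1, create_picker, "upstream-1")  -- cached
local p3 = cache("upstream-1", 2, create_picker, "upstream-1")  -- new version
print(created, p1 == p2, p3.serial)
```

Passing the version alongside the key means that an updated upstream invalidates its cached picker without any explicit delete.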

The callback passed as create_obj_fun is create_server_picker.

local function create_server_picker(upstream, checker)  -- upstream is the data fetched from etcd, ctx.upstream_conf
    local picker = pickers[upstream.type]
    if not picker then
        -- load the balancer module for this algorithm; upstream.type is the algorithm type
        pickers[upstream.type] = require("apisix.balancer." .. upstream.type)
        picker = pickers[upstream.type]
    end

    if picker then
        local nodes = upstream.nodes
        local addr_to_domain = {}
        for _, node in ipairs(nodes) do
            if node.domain then
                local addr = node.host .. ":" .. node.port
                addr_to_domain[addr] = node.domain
            end
        end

        local up_nodes = fetch_health_nodes(upstream, checker)

        if #up_nodes._priority_index > 1 then
            core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes))
            local server_picker = priority_balancer.new(up_nodes, upstream, picker)
            server_picker.addr_to_domain = addr_to_domain
            return server_picker
        end

        core.log.info("upstream nodes: ",
                      core.json.delay_encode(up_nodes[up_nodes._priority_index[1]]))
        -- call the balancer's new method
        local server_picker = picker.new(up_nodes[up_nodes._priority_index[1]], upstream)
        server_picker.addr_to_domain = addr_to_domain
        return server_picker
    end

    return nil, "invalid balancer type: " .. upstream.type, 0
end
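
The pickers table in create_server_picker is a lazy module registry: a balancer module is required the first time its type is seen and reused afterwards. The sketch below shows the same idea in plain Lua. The module name demo.balancer.roundrobin is made up and registered through package.preload so the example runs without any files. Unlike the original, which calls require directly, this sketch wraps it in pcall so an unknown type returns an error string instead of raising.

```lua
-- Register a hypothetical balancer module so require() works without files.
package.preload["demo.balancer.roundrobin"] = function()
    return {
        new = function(nodes)
            return { nodes = nodes, kind = "roundrobin" }
        end,
    }
end

local pickers = {}

-- Load the balancer module for a type once and cache it in pickers.
local function load_picker(balancer_type)
    local picker = pickers[balancer_type]
    if not picker then
        local ok, mod = pcall(require, "demo.balancer." .. balancer_type)
        if not ok then
            return nil, "invalid balancer type: " .. balancer_type
        end
        pickers[balancer_type] = mod
        picker = mod
    end
    return picker
end

local picker = load_picker("roundrobin")
local server_picker = picker.new({ ["10.0.0.1:80"] = 1 })
print(server_picker.kind)

local missing, err = load_picker("does_not_exist")
print(missing, err)
```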

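The pickers variable holds the load-balancing algorithm modules. APISIX supports the roundrobin, chash, and least_conn algorithms, and their implementations all live in the apisix/balancer directory. picker.new is the new method of the balancer object. Let's look at the roundrobin balancer.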

function _M.new(up_nodes, upstream)  -- the new method
    local safe_limit = 0
    for _, weight in pairs(up_nodes) do
        -- the weight can be zero
        safe_limit = safe_limit + weight + 1
    end

    local picker = roundrobin:new(up_nodes)
    local nodes_count = nkeys(up_nodes)
    return {
        upstream = upstream,
        get = function (ctx)   -- the get method
            if ctx.balancer_tried_servers and ctx.balancer_tried_servers_count == nodes_count then
                return nil, "all upstream servers tried"
            end

            local server, err
            for i = 1, safe_limit do
                server, err = picker:find()
                if not server then
                    return nil, err
                end
                if ctx.balancer_tried_servers then
                    if not ctx.balancer_tried_servers[server] then
                        break
                    end
                else
                    break
                end
            end

            return server
        end,
        after_balance = function (ctx, before_retry)  -- the after_balance method
            if not before_retry then
                if ctx.balancer_tried_servers then
                    core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
                    ctx.balancer_tried_servers = nil
                end

                return nil
            end

            if not ctx.balancer_tried_servers then
                ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)
            end

            ctx.balancer_tried_servers[ctx.balancer_server] = true
            ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
        end,
        before_retry_next_priority = function (ctx)  -- the before_retry_next_priority method
            if ctx.balancer_tried_servers then
                core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
                ctx.balancer_tried_servers = nil
            end

            ctx.balancer_tried_servers_count = 0
        end,
    }
end

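The earlier server_picker.get call invokes the get method shown here. APISIX keeps the server picker for each upstream in the LRU cache. That cache is built on lua-resty-lrucache, so it lives inside each worker's Lua VM rather than being shared by all workers. On a cache miss, resty.lock can make sure the picker is rebuilt once instead of by every concurrent request.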
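
The interplay between get and the record of servers already tried on retry can be reproduced without OpenResty. This is a minimal sketch in plain Lua: new_picker is a hypothetical function, the rotation ignores weights (the real balancer delegates to resty.roundrobin), and mark_tried plays the role that after_balance plays before a retry.

```lua
-- Plain rotation over a list of servers, skipping ones already tried.
local function new_picker(servers)
    local idx = 0
    local count = #servers
    return {
        get = function(ctx)
            if ctx.tried and ctx.tried_count == count then
                return nil, "all upstream servers tried"
            end
            for _ = 1, count do
                idx = idx % count + 1
                local server = servers[idx]
                if not (ctx.tried and ctx.tried[server]) then
                    return server
                end
            end
            return nil, "all upstream servers tried"
        end,
        -- Record a failed server so the next get skips it.
        mark_tried = function(ctx, server)
            ctx.tried = ctx.tried or {}
            ctx.tried[server] = true
            ctx.tried_count = (ctx.tried_count or 0) + 1
        end,
    }
end

local picker = new_picker({ "10.0.0.1:80", "10.0.0.2:80" })
local ctx = {}                          -- per-request context

local first = picker.get(ctx)
picker.mark_tried(ctx, first)           -- pretend the first attempt failed
local second = picker.get(ctx)
picker.mark_tried(ctx, second)          -- and the retry failed too
local third, err = picker.get(ctx)
print(first, second, third, err)
```

Keeping the tried set on the per-request context means that retries for one request never affect the rotation seen by other requests.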

That covers how APISIX fetches data from etcd and updates its cached data when something changes.

#apisix# #etcd#

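Title: APISIX's Route Matching Mechanism

Link: /post/apisix-etcd/

Author: lizj3624

License: Unless otherwise stated, articles on this blog are licensed under CC BY-NC-SA 3.0. Please credit the source when reposting.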
