Liberty版本Neutron LBaas学习

栏目: 服务器 · 发布时间: 7年前

内容简介:neutron_lbaas.services.loadbalancer.plugin.pyneutron_lbaas.drivers.haproxy.plugin_driver.py
Liberty版本Neutron LBaas学习

这里参考的是lbaasv2的driver,neutron_lbaas.conf中的service_provider即为lbaasv2的driver:

service_provider=LOADBALANCERV2:Haproxy:neutron_lbaas.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver:default

neutron.conf中的service_plugins表示了lbaasv2的plugin:

lbaasv2 = neutron_lbaas.services.loadbalancer.plugin:LoadBalancerPluginv2

plugin

neutron_lbaas.services.loadbalancer.plugin.py

class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
    def __init__(self):
        """Initialization for the loadbalancer service plugin."""
        self.db = ldbv2.LoadBalancerPluginDbv2()

driver

neutron_lbaas.drivers.haproxy.plugin_driver.py

class HaproxyOnHostPluginDriver(agent_driver_base.AgentDriverBase):
    device_driver = namespace_driver.DRIVER_NAME

agent rpc(plugin向agent发送)

neutron_lbaas.drivers.common.agent_driver_base.py
class LoadBalancerAgentApi
def __init__

plugin rpc(agent向plugin发送)

neutron_lbaas.agent.agent_api.py
class LbaasAgentApi
def __init__

agent 侧的回调

neutron_lbaas.agent.agent_manager.py
class LbaasAgentManager

plugin 侧的回调

neutron_lbaas.drivers.common.agent_driver_base.py
class AgentDriverBase
def _set_callbacks_on_plugin

neutron_lbaas.drivers.common.agent_callbacks.py
class LoadBalancerCallbacks

agent入口函数

neutron_lbaas.agent.agent.py
def main

db处理

neutron_lbaas.db.loadbalancer.loadbalancer_dbv2.py
class LoadBalancerPluginDbv2

流程示例,创建一个Pool

plugin部分

首先进入一个同步流程,即创建pool的数据库信息。

plugin入口

neutron_lbaas.services.loadbalancer.plugin.py

class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):

    def create_pool(self, context, pool):
        try:
            # 创建pool的数据库信息,并绑定listener
            db_pool = self.db.create_pool_and_add_to_listener(context, pool,
                                                              listener_id)
        except Exception as exc:
            self.db.update_loadbalancer_provisioning_status(
                context, db_listener.loadbalancer.id)
            raise exc
        # 根据创建lb时的provider来选择driver
        driver = self._get_driver_for_loadbalancer(
            context, db_pool.listener.loadbalancer_id)
        # 将调用driver的pool.create方法,进入driver的父类
        # AgentDriverBase
        self._call_driver_operation(context, driver.pool.create, db_pool)
        # 同步返回了pool的数据库信息
        return self.db.get_pool(context, db_pool.id).to_api_dict()

plugin的api及rpc处理

neutron_lbaas.drivers.common.agent_driver_base.py

class AgentDriverBase(driver_base.LoadBalancerBaseDriver):

    # name of device driver that should be used by the agent;
    # vendor specific plugin drivers must override it;
    device_driver = None

    def __init__(self, plugin):
        super(AgentDriverBase, self).__init__(plugin)
        # pool的管理类
        self.pool = PoolManager(self)
        # agent的rpc处理类
        self.agent_rpc = LoadBalancerAgentApi(lb_const.LOADBALANCER_AGENTV2)
        # rpc的回调
        self._set_callbacks_on_plugin()
        # Setting this on the db because the plugin no longer inherts from
        # database classes, the db does.
        self.plugin.db.agent_notifiers.update(
            {lb_const.AGENT_TYPE_LOADBALANCERV2: self.agent_rpc})

        # agent的调度driver
        lb_sched_driver = provconf.get_provider_driver_class(
            cfg.CONF.loadbalancer_scheduler_driver, LB_SCHEDULERS)
        self.loadbalancer_scheduler = importutils.import_object(
            lb_sched_driver)

class PoolManager(driver_base.BasePoolManager):

    def create(self, context, pool):
            super(PoolManager, self).delete(context, pool)
            # 从数据库中选择了一个agent
            agent = self.driver.get_loadbalancer_agent(
                context, pool.listener.loadbalancer.id)
            # 向这个agent发送create_pool这个任务
            # 进入LoadBalancerAgentApi的create_pool
            self.driver.agent_rpc.create_pool(context, pool, agent['host'])

class LoadBalancerAgentApi(object):
    """Plugin side of plugin to agent RPC API."""

    def __init__(self, topic):
        target = messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target,
                                       serializer=DataModelSerializer())

    def create_pool(self, context, pool, host):
            cctxt = self.client.prepare(server=host)
            # 向agent发送create_pool的消息
            cctxt.cast(context, 'create_pool', pool=pool)

agent(driver)部分

查看入口函数,可见LbaasAgentManager为api的manager

agent的入口

neutron_lbaas.agent.agent.py - main

def main():
    # mgr指向LbaasAgentManager
    # 创建pool即为create_pool函数
    mgr = manager.LbaasAgentManager(cfg.CONF)
    svc = LbaasAgentService(
        host=cfg.CONF.host,
        topic=constants.LOADBALANCER_AGENTV2,
        manager=mgr
    )
    service.launch(cfg.CONF, svc).wait()

agent处理plugin下发的消息

neutron_lbaas.agent.agent_manager.py

class LbaasAgentManager(periodic_task.PeriodicTasks):

    def create_pool(self, context, pool):

        pool = data_models.Pool.from_dict(pool)
        # 根据pool对象的listener.loadbalancer.id获取driver
        # 根据配置文件找到device_driver参数的值
        driver = self._get_driver(pool.listener.loadbalancer.id)
        try:
            # driver创建pool
            # 如选用默认的haproxy,函数路径如下:
            # PoolManager中的create函数
            driver.pool.create(pool)
        except Exception:
            self._handle_failed_driver_call('create', pool, driver.get_name())
        else:
            # 回报状态
            self._update_statuses(pool)

agent将任务下发给driver

neutron_lbaas.drivers.haproxy.namespace_driver.py

NS_PREFIX = 'qlbaas-'

def get_ns_name(namespace_id):
    # 返回namespace的名字
    return NS_PREFIX + namespace_id

class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):

    def __init__(self, conf, plugin_rpc):

        super(HaproxyNSDriver, self).__init__(conf, plugin_rpc)
        # 指向了PoolManager
        # self(也就是PoolManager中的self.driver)即为HaproxyNSDriver
        self._pool = PoolManager(self)
        # 指向了LoadBalancerManager
        # self(也就是LoadBalancerManager中的self.driver)即为HaproxyNSDriver
        self._loadbalancer = LoadBalancerManager(self)

    @property
    def loadbalancer(self):
        return self._loadbalancer

    @property
    def pool(self):
        return self._pool

neutron_lbaas.drivers.haproxy.namespace_driver.py

class PoolManager(agent_device_driver.BasePoolManager):
    def create(self, pool):
        # 调用了LoadBalancerManager的refresh函数
        self.driver.loadbalancer.refresh(pool.listener.loadbalancer)

class LoadBalancerManager(agent_device_driver.BaseLoadBalancerManager):

    def refresh(self, loadbalancer):
        # 调用了LbaasAgentApi的get_loadbalancer函数
        loadbalancer_dict = self.driver.plugin_rpc.get_loadbalancer(
            loadbalancer.id)
        # 根据返回的lb的dict生成一个lb的object
        loadbalancer = data_models.LoadBalancer.from_dict(loadbalancer_dict)
        # 部署lb
        # 调用了HaproxyNSDriver的deploy_instance函数
        if (not self.driver.deploy_instance(loadbalancer) and
                self.driver.exists(loadbalancer.id)):
            self.driver.undeploy_instance(loadbalancer.id)

driver通过rpc访问plugin侧的db

neutron_lbaas.agent.agent_api.py

class LbaasAgentApi(object):
    def get_loadbalancer(self, loadbalancer_id):
        cctxt = self.client.prepare()
        # 调用了LoadBalancerCallbacks的get_loadbalancer函数
        # 返回一个loadbalancer的dict
        # 回到LoadBalancerManager的refresh函数
        return cctxt.call(self.context, 'get_loadbalancer',
                          loadbalancer_id=loadbalancer_id)

neutron_lbaas.drivers.common.agent_callbacks.py

class LoadBalancerCallbacks(object):
    def get_loadbalancer(self, context, loadbalancer_id=None):
        # 这里指向LoadBalancerPluginDbv2的get_loadbalancer函数
        lb_model = self.plugin.db.get_loadbalancer(context, loadbalancer_id)
        if lb_model.vip_port and lb_model.vip_port.fixed_ips:
            for fixed_ip in lb_model.vip_port.fixed_ips:
                subnet_dict = self.plugin.db._core_plugin.get_subnet(
                    context, fixed_ip.subnet_id
                )
                setattr(fixed_ip, 'subnet', data_models.Subnet.from_dict(
                    subnet_dict))
        if lb_model.provider:
            device_driver = self.plugin.drivers[
                lb_model.provider.provider_name].device_driver
            setattr(lb_model.provider, 'device_driver', device_driver)
        # 将这个LoadBalancer的object转换为dict
        lb_dict = lb_model.to_dict(stats=False)

        # 返回这个字典
        # 回到LbaasAgentApi的get_loadbalancer函数
        return lb_dict

neutron_lbaas.db.loadbalancer.loadbalancer_dbv2.py

class LoadBalancerPluginDbv2(base_db.CommonDbMixin,
                             agent_scheduler.LbaasAgentSchedulerDbMixin):
    def get_loadbalancer(self, context, id):
        lb_db = self._get_resource(context, models.LoadBalancer, id)
        # 返回一个LoadBalancer的object
        # 回到LoadBalancerCallbacks的get_loadbalancer函数
        return data_models.LoadBalancer.from_sqlalchemy_model(lb_db)

driver开始执行实际操作

neutron_lbaas.drivers.haproxy.namespace_driver.py

class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):

    @n_utils.synchronized('haproxy-driver')
    def deploy_instance(self, loadbalancer):
        # 部署一个lb,如果已经有,则返回True,否则返回False
        # deployable函数用于检查是否可以部署
        # 如listener是否已经准备就绪或可用,如可以返回True,不可以则返回False
        if not self.deployable(loadbalancer):
            LOG.info(_LI("Loadbalancer %s is not deployable.") %
                     loadbalancer.id)
            return False

        if self.exists(loadbalancer.id):
            self.update(loadbalancer)
        else:
            # 创建,进入create函数
            self.create(loadbalancer)
        return True

    def deployable(self, loadbalancer):
        # 如果lb已经active了,返回True,否则返回False
        if not loadbalancer:
            return False
        acceptable_listeners = [
            listener for listener in loadbalancer.listeners
            if (listener.provisioning_status != constants.PENDING_DELETE and
                listener.admin_state_up)]
        return (bool(acceptable_listeners) and loadbalancer.admin_state_up and
                loadbalancer.provisioning_status != constants.PENDING_DELETE)
        # 回到HaproxyNSDriver的deploy_instance函数

    def create(self, loadbalancer):
        namespace = get_ns_name(loadbalancer.id)
        # 挂载网卡,将进入_plug函数
        self._plug(namespace, loadbalancer.vip_port, loadbalancer.vip_address)
        # 孵化lb,进入_spawn函数
        self._spawn(loadbalancer)

    def _plug(self, namespace, port, vip_address, reuse_existing=True):
        # 调用LbaasAgentApi的plug_vip_port函数,rpc的publisher
        self.plugin_rpc.plug_vip_port(port.id)

        # tap××××××××-××
        interface_name = self.vif_driver.get_device_name(port)

        # 如果这个namespace中有这个interface,返回True,否则返回False
        if ip_lib.device_exists(interface_name,
                                namespace=namespace):
            if not reuse_existing:
                raise exceptions.PreexistingDeviceFailure(
                    dev_name=interface_name
                )
        else:
            self.vif_driver.plug(
                port.network_id,
                port.id,
                interface_name,
                port.mac_address,
                namespace=namespace
            )

        # 根据port的信息建立l3
        self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace)

        # Haproxy socket binding to IPv6 VIP address will fail if this address
        # is not yet ready(i.e tentative address).
        if netaddr.IPAddress(vip_address).version == 6:
            device = ip_lib.IPDevice(interface_name, namespace=namespace)
            device.addr.wait_until_address_ready(vip_address)

        gw_ip = port.fixed_ips[0].subnet.gateway_ip

        if not gw_ip:
            host_routes = port.fixed_ips[0].subnet.host_routes
            for host_route in host_routes:
                if host_route.destination == "0.0.0.0/0":
                    gw_ip = host_route.nexthop
                    break
        else:
        cmd = ['route', 'add', 'default', 'gw', gw_ip]
        # 添加默认路由
        ip_wrapper = ip_lib.IPWrapper(namespace=namespace)
        # ip netns exec ns env *** route add default gw gw_ip
        ip_wrapper.netns.execute(cmd, check_exit_code=False)
        # When delete and re-add the same vip, we need to
        # send gratuitous ARP to flush the ARP cache in the Router.
        gratuitous_arp = self.conf.haproxy.send_gratuitous_arp
        if gratuitous_arp > 0:
            for ip in port.fixed_ips:
                cmd_arping = ['arping', '-U',
                              '-I', interface_name,
                              '-c', gratuitous_arp,
                              ip.ip_address]
                # ip netns exec ns env *** arping \
                # -U -I interface_name -c gratuitous_arp ip.ip_address
                ip_wrapper.netns.execute(cmd_arping, check_exit_code=False)
        # 回到create函数

    def _spawn(self, loadbalancer, extra_cmd_args=()):
        namespace = get_ns_name(loadbalancer.id)
        conf_path = self._get_state_file_path(loadbalancer.id, 'haproxy.conf')
        pid_path = self._get_state_file_path(loadbalancer.id,
                                             'haproxy.pid')
        sock_path = self._get_state_file_path(loadbalancer.id,
                                              'haproxy_stats.sock')
        user_group = self.conf.haproxy.user_group
        haproxy_base_dir = self._get_state_file_path(loadbalancer.id, '')
        jinja_cfg.save_config(conf_path,
                              loadbalancer,
                              sock_path,
                              user_group,
                              haproxy_base_dir)
        cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
        cmd.extend(extra_cmd_args)

        ns = ip_lib.IPWrapper(namespace=namespace)
        # ip netns exec ns env *** haproxy -f conf_path -p pid_path
        ns.netns.execute(cmd)

        # remember deployed loadbalancer id
        self.deployed_loadbalancers[loadbalancer.id] = loadbalancer

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Head First HTML5 Programming

Head First HTML5 Programming

Eric Freeman、Elisabeth Robson / O'Reilly Media / 2011-10-18 / USD 49.99

What can HTML5 do for you? If you're a web developer looking to use this new version of HTML, you might be wondering how much has really changed. Head First HTML5 Programming introduces the key featur......一起来看看 《Head First HTML5 Programming》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

html转js在线工具
html转js在线工具

html转js在线工具