内容简介:neutron_lbaas.services.loadbalancer.plugin.pyneutron_lbaas.drivers.haproxy.plugin_driver.py
这里参考的是lbaasv2的driver,neutron_lbaas.conf中的service_provider即为lbaasv2的driver:
service_provider=LOADBALANCERV2:Haproxy:neutron_lbaas.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver:default
neutron.conf中的service_plugins表示了lbaasv2的plugin:
lbaasv2 = neutron_lbaas.services.loadbalancer.plugin:LoadBalancerPluginv2
plugin
neutron_lbaas.services.loadbalancer.plugin.py
class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
def __init__(self):
"""Initialization for the loadbalancer service plugin."""
self.db = ldbv2.LoadBalancerPluginDbv2()
driver
neutron_lbaas.drivers.haproxy.plugin_driver.py
class HaproxyOnHostPluginDriver(agent_driver_base.AgentDriverBase):
device_driver = namespace_driver.DRIVER_NAME
agent rpc(plugin向agent发送)
neutron_lbaas.drivers.common.agent_driver_base.py
class LoadBalancerAgentApi
def __init__
plugin rpc(agent向plugin发送)
neutron_lbaas.agent.agent_api.py
class LbaasAgentApi
def __init__
agent 侧的回调
neutron_lbaas.agent.agent_manager.py
class LbaasAgentManager
plugin 侧的回调
neutron_lbaas.drivers.common.agent_driver_base.py
class AgentDriverBase
def _set_callbacks_on_plugin
neutron_lbaas.drivers.common.agent_callbacks.py
class LoadBalancerCallbacks
agent入口函数
neutron_lbaas.agent.agent.py
def main
db处理
neutron_lbaas.db.loadbalancer.loadbalancer_dbv2.py
class LoadBalancerPluginDbv2
流程示例,创建一个Pool
plugin部分
首先进入一个同步流程,即创建pool的数据库信息。
plugin入口
neutron_lbaas.services.loadbalancer.plugin.py
class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
def create_pool(self, context, pool):
try:
# 创建pool的数据库信息,并绑定listener
db_pool = self.db.create_pool_and_add_to_listener(context, pool,
listener_id)
except Exception as exc:
self.db.update_loadbalancer_provisioning_status(
context, db_listener.loadbalancer.id)
raise exc
# 根据创建lb时的provider来选择driver
driver = self._get_driver_for_loadbalancer(
context, db_pool.listener.loadbalancer_id)
# 将调用driver的pool.create方法,进入driver的父类
# AgentDriverBase
self._call_driver_operation(context, driver.pool.create, db_pool)
# 同步返回了pool的数据库信息
return self.db.get_pool(context, db_pool.id).to_api_dict()
plugin的api及rpc处理
neutron_lbaas.drivers.common.agent_driver_base.py
class AgentDriverBase(driver_base.LoadBalancerBaseDriver):
# name of device driver that should be used by the agent;
# vendor specific plugin drivers must override it;
device_driver = None
def __init__(self, plugin):
super(AgentDriverBase, self).__init__(plugin)
# pool的管理类
self.pool = PoolManager(self)
# agent的rpc处理类
self.agent_rpc = LoadBalancerAgentApi(lb_const.LOADBALANCER_AGENTV2)
# rpc的回调
self._set_callbacks_on_plugin()
# Setting this on the db because the plugin no longer inherts from
# database classes, the db does.
self.plugin.db.agent_notifiers.update(
{lb_const.AGENT_TYPE_LOADBALANCERV2: self.agent_rpc})
# agent的调度driver
lb_sched_driver = provconf.get_provider_driver_class(
cfg.CONF.loadbalancer_scheduler_driver, LB_SCHEDULERS)
self.loadbalancer_scheduler = importutils.import_object(
lb_sched_driver)
class PoolManager(driver_base.BasePoolManager):
def create(self, context, pool):
super(PoolManager, self).delete(context, pool)
# 从数据库中选择了一个agent
agent = self.driver.get_loadbalancer_agent(
context, pool.listener.loadbalancer.id)
# 向这个agent发送create_pool这个任务
# 进入LoadBalancerAgentApi的create_pool
self.driver.agent_rpc.create_pool(context, pool, agent['host'])
class LoadBalancerAgentApi(object):
"""Plugin side of plugin to agent RPC API."""
def __init__(self, topic):
target = messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target,
serializer=DataModelSerializer())
def create_pool(self, context, pool, host):
cctxt = self.client.prepare(server=host)
# 向agent发送create_pool的消息
cctxt.cast(context, 'create_pool', pool=pool)
agent(driver)部分
查看入口函数,可见LbaasAgentManager为api的manager
agent的入口
neutron_lbaas.agent.agent.py - main
def main():
# mgr指向LbaasAgentManager
# 创建pool即为create_pool函数
mgr = manager.LbaasAgentManager(cfg.CONF)
svc = LbaasAgentService(
host=cfg.CONF.host,
topic=constants.LOADBALANCER_AGENTV2,
manager=mgr
)
service.launch(cfg.CONF, svc).wait()
agent处理plugin下发的消息
neutron_lbaas.agent.agent_manager.py
class LbaasAgentManager(periodic_task.PeriodicTasks):
def create_pool(self, context, pool):
pool = data_models.Pool.from_dict(pool)
# 根据pool对象的listener.loadbalancer.id获取driver
# 根据配置文件找到device_driver参数的值
driver = self._get_driver(pool.listener.loadbalancer.id)
try:
# driver创建pool
# 如选用默认的haproxy,函数路径如下:
# PoolManager中的create函数
driver.pool.create(pool)
except Exception:
self._handle_failed_driver_call('create', pool, driver.get_name())
else:
# 回报状态
self._update_statuses(pool)
agent将任务下发给driver
neutron_lbaas.drivers.haproxy.namespace_driver.py
NS_PREFIX = 'qlbaas-'
def get_ns_name(namespace_id):
# 返回namespace的名字
return NS_PREFIX + namespace_id
class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):
def __init__(self, conf, plugin_rpc):
super(HaproxyNSDriver, self).__init__(conf, plugin_rpc)
# 指向了PoolManager
# self(也就是PoolManager中的self.driver)即为HaproxyNSDriver
self._pool = PoolManager(self)
# 指向了LoadBalancerManager
# self(也就是LoadBalancerManager中的self.driver)即为HaproxyNSDriver
self._loadbalancer = LoadBalancerManager(self)
@property
def loadbalancer(self):
return self._loadbalancer
@property
def pool(self):
return self._pool
neutron_lbaas.drivers.haproxy.namespace_driver.py
class PoolManager(agent_device_driver.BasePoolManager):
def create(self, pool):
# 调用了LoadBalancerManager的refresh函数
self.driver.loadbalancer.refresh(pool.listener.loadbalancer)
class LoadBalancerManager(agent_device_driver.BaseLoadBalancerManager):
def refresh(self, loadbalancer):
# 调用了LbaasAgentApi的get_loadbalancer函数
loadbalancer_dict = self.driver.plugin_rpc.get_loadbalancer(
loadbalancer.id)
# 根据返回的lb的dict生成一个lb的object
loadbalancer = data_models.LoadBalancer.from_dict(loadbalancer_dict)
# 部署lb
# 调用了HaproxyNSDriver的deploy_instance函数
if (not self.driver.deploy_instance(loadbalancer) and
self.driver.exists(loadbalancer.id)):
self.driver.undeploy_instance(loadbalancer.id)
driver通过rpc访问plugin侧的db
neutron_lbaas.agent.agent_api.py
class LbaasAgentApi(object):
def get_loadbalancer(self, loadbalancer_id):
cctxt = self.client.prepare()
# 调用了LoadBalancerCallbacks的get_loadbalancer函数
# 返回一个loadbalancer的dict
# 回到LoadBalancerManager的refresh函数
return cctxt.call(self.context, 'get_loadbalancer',
loadbalancer_id=loadbalancer_id)
neutron_lbaas.drivers.common.agent_callbacks.py
class LoadBalancerCallbacks(object):
def get_loadbalancer(self, context, loadbalancer_id=None):
# 这里指向LoadBalancerPluginDbv2的get_loadbalancer函数
lb_model = self.plugin.db.get_loadbalancer(context, loadbalancer_id)
if lb_model.vip_port and lb_model.vip_port.fixed_ips:
for fixed_ip in lb_model.vip_port.fixed_ips:
subnet_dict = self.plugin.db._core_plugin.get_subnet(
context, fixed_ip.subnet_id
)
setattr(fixed_ip, 'subnet', data_models.Subnet.from_dict(
subnet_dict))
if lb_model.provider:
device_driver = self.plugin.drivers[
lb_model.provider.provider_name].device_driver
setattr(lb_model.provider, 'device_driver', device_driver)
# 将这个LoadBalancer的object转换为dict
lb_dict = lb_model.to_dict(stats=False)
# 返回这个字典
# 回到LbaasAgentApi的get_loadbalancer函数
return lb_dict
neutron_lbaas.db.loadbalancer.loadbalancer_dbv2.py
class LoadBalancerPluginDbv2(base_db.CommonDbMixin,
agent_scheduler.LbaasAgentSchedulerDbMixin):
def get_loadbalancer(self, context, id):
lb_db = self._get_resource(context, models.LoadBalancer, id)
# 返回一个LoadBalancer的object
# 回到LoadBalancerCallbacks的get_loadbalancer函数
return data_models.LoadBalancer.from_sqlalchemy_model(lb_db)
driver开始执行实际操作
neutron_lbaas.drivers.haproxy.namespace_driver.py
class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):
@n_utils.synchronized('haproxy-driver')
def deploy_instance(self, loadbalancer):
# 部署一个lb,如果已经有,则返回True,否则返回False
# deployable函数用于检查是否可以部署
# 如listener是否已经准备就绪或可用,如可以返回True,不可以则返回False
if not self.deployable(loadbalancer):
LOG.info(_LI("Loadbalancer %s is not deployable.") %
loadbalancer.id)
return False
if self.exists(loadbalancer.id):
self.update(loadbalancer)
else:
# 创建,进入create函数
self.create(loadbalancer)
return True
def deployable(self, loadbalancer):
# 如果lb已经active了,返回True,否则返回False
if not loadbalancer:
return False
acceptable_listeners = [
listener for listener in loadbalancer.listeners
if (listener.provisioning_status != constants.PENDING_DELETE and
listener.admin_state_up)]
return (bool(acceptable_listeners) and loadbalancer.admin_state_up and
loadbalancer.provisioning_status != constants.PENDING_DELETE)
# 回到HaproxyNSDriver的deploy_instance函数
def create(self, loadbalancer):
namespace = get_ns_name(loadbalancer.id)
# 挂载网卡,将进入_plug函数
self._plug(namespace, loadbalancer.vip_port, loadbalancer.vip_address)
# 孵化lb,进入_spawn函数
self._spawn(loadbalancer)
def _plug(self, namespace, port, vip_address, reuse_existing=True):
# 调用LbaasAgentApi的plug_vip_port函数,rpc的publisher
self.plugin_rpc.plug_vip_port(port.id)
# tap××××××××-××
interface_name = self.vif_driver.get_device_name(port)
# 如果这个namespace中有这个interface,返回True,否则返回False
if ip_lib.device_exists(interface_name,
namespace=namespace):
if not reuse_existing:
raise exceptions.PreexistingDeviceFailure(
dev_name=interface_name
)
else:
self.vif_driver.plug(
port.network_id,
port.id,
interface_name,
port.mac_address,
namespace=namespace
)
# 根据port的信息建立l3
self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace)
# Haproxy socket binding to IPv6 VIP address will fail if this address
# is not yet ready(i.e tentative address).
if netaddr.IPAddress(vip_address).version == 6:
device = ip_lib.IPDevice(interface_name, namespace=namespace)
device.addr.wait_until_address_ready(vip_address)
gw_ip = port.fixed_ips[0].subnet.gateway_ip
if not gw_ip:
host_routes = port.fixed_ips[0].subnet.host_routes
for host_route in host_routes:
if host_route.destination == "0.0.0.0/0":
gw_ip = host_route.nexthop
break
else:
cmd = ['route', 'add', 'default', 'gw', gw_ip]
# 添加默认路由
ip_wrapper = ip_lib.IPWrapper(namespace=namespace)
# ip netns exec ns env *** route add default gw gw_ip
ip_wrapper.netns.execute(cmd, check_exit_code=False)
# When delete and re-add the same vip, we need to
# send gratuitous ARP to flush the ARP cache in the Router.
gratuitous_arp = self.conf.haproxy.send_gratuitous_arp
if gratuitous_arp > 0:
for ip in port.fixed_ips:
cmd_arping = ['arping', '-U',
'-I', interface_name,
'-c', gratuitous_arp,
ip.ip_address]
# ip netns exec ns env *** arping \
# -U -I interface_name -c gratuitous_arp ip.ip_address
ip_wrapper.netns.execute(cmd_arping, check_exit_code=False)
# 回到create函数
def _spawn(self, loadbalancer, extra_cmd_args=()):
namespace = get_ns_name(loadbalancer.id)
conf_path = self._get_state_file_path(loadbalancer.id, 'haproxy.conf')
pid_path = self._get_state_file_path(loadbalancer.id,
'haproxy.pid')
sock_path = self._get_state_file_path(loadbalancer.id,
'haproxy_stats.sock')
user_group = self.conf.haproxy.user_group
haproxy_base_dir = self._get_state_file_path(loadbalancer.id, '')
jinja_cfg.save_config(conf_path,
loadbalancer,
sock_path,
user_group,
haproxy_base_dir)
cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
cmd.extend(extra_cmd_args)
ns = ip_lib.IPWrapper(namespace=namespace)
# ip netns exec ns env *** haproxy -f conf_path -p pid_path
ns.netns.execute(cmd)
# remember deployed loadbalancer id
self.deployed_loadbalancers[loadbalancer.id] = loadbalancer
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 机器学习库 TensorFlow 1.9.0 发布,常规更新版本
- 机器学习库 TensorFlow 1.9.0 发布,常规更新版本
- ARKit+Swift 版本的机器学习算法 k-NN
- 支持边云协同终身学习特性,KubeEdge 子项目 Sedna 0.3.0 版本发布!
- 腾讯首个 AI 开源项目 Angel 发布 3.0 版本:迈向全栈机器学习平台
- 查看linux内核版本和CentOS版本
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Head First HTML5 Programming
Eric Freeman、Elisabeth Robson / O'Reilly Media / 2011-10-18 / USD 49.99
What can HTML5 do for you? If you're a web developer looking to use this new version of HTML, you might be wondering how much has really changed. Head First HTML5 Programming introduces the key featur......一起来看看 《Head First HTML5 Programming》 这本书的介绍吧!