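In the previous post, EFK 配置geo-ip落地实践 (putting geo-ip into practice with EFK), I showed how to configure the geoip plugin in fluentd to convert IP addresses into latitude and longitude.
That approach raised a problem, though: most egress IP addresses resolve to big cities and are tightly clustered, so the map points drawn from the converted coordinates bunch together and do not reflect where users actually are.
So a new requirement dropped out of the sky: we need to draw the map from the latitude and longitude reported by the client.
After browsing all the fluentd plugins, I was surprised to find none that parses the request body. The fluentd plugin list is at: www.fluentd.org/plugins/all
There was nothing for it but to write one myself.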
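## Writing the plugin
First, one thing is clear: in fluentd, data flows through the filters in order, each filter passing its result on to the next. Since this is also my first Ruby program, let's start with the project structure:
The most important files are:
- filter_parse_request_body.rb (the fluentd filter plugin; intercepts records and modifies them in flight)
- out_parse_request_body.rb (the fluentd output plugin; formats the data for final output)
- parse_request_body_extractor.rb (the class that actually splits the data apart)
- fluent-plugin-parse_request_body.gemspec (the project configuration: name, version number, and so on)
Now, straight to the code: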
- filter_parse_request_body.rb
    require "fluent/plugin/filter"
    require "fluent/plugin/parse_request_body_extractor"

    module Fluent::Plugin
      class ParseRequestBodyFilter < Fluent::Plugin::Filter

        # Register the filter name
        Fluent::Plugin.register_filter('parse_request_body', self)

        # Key in the record that holds the request body
        desc "point a key whose value contains body string."
        config_param :key, :string
        # Key whose value will be replaced by the array_value data
        desc "point a key who will be replaced."
        config_param :replace_key, :string, default: nil
        # Names of the body fields to combine into one array
        desc "If set, the key/value will be reformd array whose would be added to the record."
        config_param :array_value, :string, default: nil
        # Key under which the combined array is stored in the record
        desc "array_value's key in record"
        config_param :array_value_key, :string, default: nil
        # Whitelist of body fields to extract
        desc "If set, only the key/value whose key is included only will be added to the record."
        config_param :only, :string, default: nil
        # Blacklist of body fields to skip
        desc "If set, the key/value whose key is included except will NOT be added to the record."
        config_param :except, :string, default: nil
        # Whether to keep only the parsed fields and delete the original request body
        desc "If set to true, the original key url will be discarded from the record."
        config_param :discard_key, :bool, default: false
        # Prefix added to the key of every parsed field
        desc "Prefix of fields."
        config_param :add_field_prefix, :string, default: nil
        # Whether fields with a blank key may be parsed
        desc "If set to true, permit blank key."
        config_param :permit_blank_key, :bool, default: false

        # Initialize the extractor
        def configure(conf)
          super
          @extractor = Fluent::Plugin::ParseRequestBodyExtractor.new(self, conf)
        end

        # Run the parsing
        def filter(tag, time, record)
          @extractor.add_query_params_field(record)
        end
      end
    end
- out_parse_request_body.rb (the fluentd output plugin; formats the data for final output)
    require "fluent/plugin/output"
    require "fluent/plugin/parse_request_body_extractor"

    module Fluent::Plugin
      class ParseRequestBodyOutput < Fluent::Plugin::Output
        include Fluent::HandleTagNameMixin

        # Register the output name
        Fluent::Plugin.register_output('parse_request_body', self)

        helpers :event_emitter

        # Key in the record that holds the request body
        desc "point a key whose value contains body string."
        config_param :key, :string
        # Key whose value will be replaced by the array_value data
        desc "point a key who will be replaced."
        config_param :replace_key, :string, default: nil
        # Names of the body fields to combine into one array
        desc "If set, the key/value will be reformd array whose would be added to the record."
        config_param :array_value, :string, default: nil
        # Key under which the combined array is stored in the record
        desc "array_value's key in record"
        config_param :array_value_key, :string, default: nil
        # Whitelist of body fields to extract
        desc "If set, only the key/value whose key is included only will be added to the record."
        config_param :only, :string, default: nil
        # Blacklist of body fields to skip
        desc "If set, the key/value whose key is included except will NOT be added to the record."
        config_param :except, :string, default: nil
        # Whether to keep only the parsed fields and delete the original request body
        desc "If set to true, the original key url will be discarded from the record."
        config_param :discard_key, :bool, default: false
        # Prefix added to the key of every parsed field
        desc "Prefix of fields."
        config_param :add_field_prefix, :string, default: nil
        # Whether fields with a blank key may be parsed
        desc "If set to true, permit blank key."
        config_param :permit_blank_key, :bool, default: false

        # Initialize the extractor
        def configure(conf)
          super
          @extractor = Fluent::Plugin::ParseRequestBodyExtractor.new(self, conf)
        end

        def multi_workers_ready?
          true
        end

        # Run the parsing
        def filter_record(tag, time, record)
          record = @extractor.add_query_params_field(record)
          super(tag, time, record)
        end

        def process(tag, es)
          es.each do |time, record|
            t = tag.dup
            filter_record(t, time, record)
            router.emit(t, time, record)
          end
        end
      end
    end
- parse_request_body_extractor.rb (the class that actually splits the data apart)
    require 'uri'
    require 'cgi/util'
    require 'webrick'

    module Fluent::Plugin
      class ParseRequestBodyExtractor

        attr_reader :log

        # Initialize the extractor
        def initialize(plugin, conf)
          @log = plugin.log

          if plugin.is_a?(Fluent::Plugin::Output)
            unless have_tag_option?(plugin)
              raise Fluent::ConfigError, "out_parse_request_body: At least one of remove_tag_prefix/remove_tag_suffix/add_tag_prefix/add_tag_suffix is required to be set."
            end
          end

          # Read the options from the plugin configuration
          @key = plugin.key
          @only = plugin.only
          @except = plugin.except
          @discard_key = plugin.discard_key
          @add_field_prefix = plugin.add_field_prefix
          @permit_blank_key = plugin.permit_blank_key
          @array_value = plugin.array_value
          @array_value_key = plugin.array_value_key
          @replace_key = plugin.replace_key

          # Build the whitelist
          if @only
            @include_keys = @only.split(/\s*,\s*/).inject({}) do |hash, i|
              hash[i] = true
              hash
            end
          end

          # Build the blacklist
          if @except
            @exclude_keys = @except.split(/\s*,\s*/).inject({}) do |hash, i|
              hash[i] = true
              hash
            end
          end

          # Build the set of keys whose values are combined into one array
          if @array_value_key
            if @array_value
              @include_array_value = @array_value.split(/\s*,\s*/).inject({}) do |hash, i|
                hash[i] = true
                hash
              end
            end
          end
        end

        # Entry point: parse the body and update the record
        def add_query_params_field(record)
          return record unless record[@key]
          add_query_params(record[@key], record)
          replace_record_by_key(record) if @replace_key
          record.delete(@key) if @discard_key
          record
        end

        private

        # Replace the value stored under @replace_key with the combined array
        def replace_record_by_key(record)
          return record unless record[@replace_key]
          replace_value = record[@array_value_key]
          # Guard moved ahead of the select call: when the body carries none of
          # the array_value keys this is nil, and nil.select would raise.
          return record unless replace_value
          empty_value = replace_value.select {|item| item == 0 }
          # Replace only when the values are not all zero
          if empty_value.size != replace_value.size
            record[@replace_key] = replace_value
          end
        end

        def have_tag_option?(plugin)
          plugin.remove_tag_prefix ||
            plugin.remove_tag_suffix ||
            plugin.add_tag_prefix ||
            plugin.add_tag_suffix
        end

        def create_field_key(field_key)
          if add_field_prefix?
            "#{@add_field_prefix}#{field_key}"
          else
            field_key
          end
        end

        def add_field_prefix?
          !!@add_field_prefix
        end

        def permit_blank_key?
          @permit_blank_key
        end

        # Split the body on '&' and '=', then copy the fields into the record
        def add_query_params(body, record)
          return if body.nil?
          placeholder = []
          body.split('&').each do |pair|
            key, value = pair.split('=', 2).map { |i| CGI.unescape(i) }
            next if (key.nil? || key.empty?) && (!permit_blank_key? || value.nil? || value.empty?)

            key ||= ''
            value ||= ''

            new_key = create_field_key(key)
            if @only
              record[new_key] = value if @include_keys.has_key?(key)
            elsif @except
              record[new_key] = value if !@exclude_keys.has_key?(key)
            else
              record[new_key] = value
            end

            # Collect the values to combine, converted to floats
            if @include_array_value
              placeholder[placeholder.size] = value.to_f if @include_array_value.has_key?(key)
            end
          end

          unless placeholder.empty?
            record[@array_value_key] = placeholder
          end
        end
      end
    end
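To make the core of the extractor easier to follow, here is a minimal standalone sketch of the same idea, runnable without fluentd. The `parse_body` helper and its keyword arguments are hypothetical names made up for this illustration, and it uses `URI.decode_www_form_component` from the standard library where the plugin uses `CGI.unescape`.

```ruby
require 'uri'

# Hypothetical helper, not part of the plugin: parse a form-encoded body,
# keep only whitelisted keys (with an optional prefix), and collect the
# values of selected keys as floats.
def parse_body(body, only: nil, prefix: nil, array_keys: [])
  fields = {}
  collected = []
  body.to_s.split('&').each do |pair|
    # Note: decode_www_form_component raises ArgumentError on invalid
    # percent-encoding.
    key, value = pair.split('=', 2).map { |part| URI.decode_www_form_component(part) }
    next if key.nil? || key.empty?
    value ||= ''
    fields["#{prefix}#{key}"] = value if only.nil? || only.include?(key)
    # Values are collected in the order they appear in the body.
    collected << value.to_f if array_keys.include?(key)
  end
  [fields, collected]
end

fields, gps = parse_body('lat=31.23&lng=121.47&token=abc%20def',
                         only: %w[lat lng], prefix: 'body.',
                         array_keys: %w[lat lng])
```

Here `fields` holds only the prefixed `lat` and `lng` entries, and `gps` holds the two coordinates as floats in body order, so the client's field order determines the order inside the array.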
- fluent-plugin-parse_request_body.gemspec (the project configuration: name, version number, and so on)
    Gem::Specification.new do |gem|
      gem.name = 'fluent-plugin-parse_request_body'
      gem.version = '0.0.18'
      gem.authors = ['EkiSong']
      gem.email = ['yifriday0614@gmail.com']
      gem.homepage = 'https://github.com/yifriday/fluent-plugin-parse_request_body.git'
      gem.description = %q{Fluentd plugin to parse request body.}
      gem.summary = %q{Fluentd plugin to parse request body}
      gem.license = 'MIT'
      gem.files = `git ls-files`.split($\)
      gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
      gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
      gem.require_paths = ['lib']

      if defined?(RUBY_VERSION) && RUBY_VERSION > '2.2'
        gem.add_development_dependency "test-unit", '~> 3'
      end

      gem.add_development_dependency 'rake'
      gem.add_development_dependency 'appraisal'
      gem.add_runtime_dependency 'fluentd', ['>= 0.14.8', '< 2']
    end
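## Building the plugin
Once the coding is done, use gem to build and upload the plugin.
Plugins written in Ruby and packaged as gems can be uploaded to rubygems.org/
The command-line steps are as follows: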
    ➜ fluent-plugin-parse_request_body git:(master) gem build fluent-plugin-parse_request_body.gemspec
      Successfully built RubyGem
      Name: fluent-plugin-parse_request_body
      Version: 0.0.18
      File: fluent-plugin-parse_request_body-0.0.18.gem
    ➜ fluent-plugin-parse_request_body git:(master) gem push fluent-plugin-parse_request_body-0.0.18.gem
    Pushing gem to https://rubygems.org...
    Successfully registered gem: fluent-plugin-parse_request_body (0.0.18)
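Once the plugin has been uploaded successfully, we can install it on our servers with the gem tool.
## Installing and configuring the plugin
To install it, just run the following command: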
    /usr/sbin/td-agent-gem install fluent-plugin-parse_request_body
The configuration in td-agent.conf is as follows:
    <filter nginx.**>
      @type geoip
      geoip_lookup_key remote
      geoip_database /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-geoip-1.2.0/data/GeoLiteCity.dat
      geoip2_database /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-geoip-1.2.0/data/GeoLite2-City.mmdb
      <record>
        city ${city.names.zh-CN["remote"]} # skip adding fields if this field is null
        country ${country.iso_code["remote"]}
        country_name ${country.names.zh-CN["remote"]}
        location '[${location.longitude["remote"]},${location.latitude["remote"]}]'
      </record>
    </filter>

    <filter nginx.**>
      @type parse_request_body
      key request_body
      #discard_key true
      add_field_prefix body.
      only lat,lng
      array_value lat,lng
      array_value_key locatonGPS
      replace_key location
    </filter>
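Note: we still need to fall back to the IP-based conversion when the client reports no coordinates, and filters form a one-way data flow, so parse_request_body must be configured below the geoip filter.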
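The ordering note above can be illustrated with a small sketch in plain Ruby. The two lambdas are stand-ins for the real filters, not fluentd code, and the coordinates are made-up sample values; the key `locatonGPS` is spelled as in the configuration above.

```ruby
# Stand-in for the geoip filter: always writes an IP-derived location.
# The coordinates are made-up sample values in [longitude, latitude] order.
geoip_filter = lambda do |record|
  record.merge('location' => [116.40, 39.90])
end

# Stand-in for parse_request_body: overwrite 'location' only when the client
# sent coordinates and they are not all zero (the same rule the plugin applies).
gps_filter = lambda do |record|
  gps = record['locatonGPS'] # key spelled as in the configuration
  return record if gps.nil? || gps.all?(&:zero?)
  record.merge('location' => gps)
end

# Filters run in order; each one receives the previous one's output.
def run_chain(record, filters)
  filters.reduce(record) { |rec, filter| filter.call(rec) }
end

client_gps  = { 'locatonGPS' => [121.47, 31.23] }
with_gps    = run_chain(client_gps, [geoip_filter, gps_filter])
without_gps = run_chain({}, [geoip_filter, gps_filter])
wrong_order = run_chain(client_gps, [gps_filter, geoip_filter])
```

With the correct order, `with_gps` ends up with the client coordinates and `without_gps` keeps the IP-derived fallback. In `wrong_order` the geoip stand-in runs last and overwrites the location, so the client coordinates never reach the map.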
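This concludes the second stage of the data-processing work for the data map.
Going forward, the plugin needs one improvement to serve the current requirement better:
The combined data should be configured dynamically with placeholders, the way location is configured in geoip, rather than hard-coded as it is now, which is a rather crude approach.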
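One way such placeholder-based configuration might look is sketched below. This is purely an assumption about a possible design: neither the `expand_template` helper nor the `${...}` syntax exists in the plugin today, and the syntax is only loosely modelled on the geoip `<record>` section.

```ruby
require 'json'

# Hypothetical helper: replace each ${name} in the template with the matching
# parsed body parameter, or with an empty string when it is missing.
def expand_template(template, params)
  template.gsub(/\$\{([^}]+)\}/) { params.fetch(Regexp.last_match(1), '') }
end

params   = { 'lat' => '31.23', 'lng' => '121.47' }
location = expand_template('[${lng},${lat}]', params)

# The expanded string happens to be valid JSON, so it can be turned into an array.
coords = JSON.parse(location)
```

With a template like this, the order of the values would come from the configuration rather than from the order of the fields in the request body.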
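If you are interested, take a look at my GitHub (the plugin repository is the homepage URL given in the gemspec above).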