EFK geo-ip configuration in practice (Part 2): writing a fluentd plugin

Category: Ruby · Published: 6 years ago


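In the previous article, "EFK geo-ip configuration in practice" (EFK 配置geo-ip落地实践), I showed how to add the geoip plugin to fluentd so that IP addresses are converted into latitude/longitude.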

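That, however, introduced a new problem: most egress IP addresses resolve to big cities and are heavily concentrated, so the map points plotted from the converted coordinates cluster too tightly to reflect where users actually are.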

Hence a new requirement landed on us: plot the map using the latitude/longitude reported by the client.

After going through every fluentd plugin, I was surprised to find none that parses a request body. The fluentd plugin list is at www.fluentd.org/plugins/all

With no other option, I wrote one myself.

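## Writing the plugin

First, note that fluentd filters process data in order: the record returned by one filter flows into the next. Since this is also my first Ruby program, let's start with the project structure: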

[Figure: project structure]

The most important files are:

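  1. filter_parse_request_body.rb (the fluentd filter plugin, which intercepts records and modifies them in flight)

  2. out_parse_request_body.rb (the fluentd output plugin, which formats the data at the end of the pipeline and re-emits it under a new tag)

  3. parse_request_body_extractor.rb (the class that actually parses the data)

  4. fluent-plugin-parse_request_body.gemspec (the project's gem specification: name, version, and so on)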
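Because the whole design relies on filters running in sequence, a toy plain-Ruby model of that flow may help. It is not fluentd's implementation; it only mirrors the contract of a v0.14+ filter's `filter` method, where the returned hash is passed on and a nil return drops the event. The stage lambdas and sample records are made up for illustration.

```ruby
# Toy model of an ordered fluentd filter chain (not fluentd's real code).
# Each stage receives the record returned by the previous one; a stage
# that returns nil drops the event, and later stages never see it.
def run_chain(stages, tag, time, record)
  stages.reduce(record) { |rec, stage| rec && stage.call(tag, time, rec) }
end

# Hypothetical stand-in for geoip: adds an IP-derived [lon, lat].
add_ip_location = ->(_tag, _time, rec) { rec.merge('location' => [121.47, 31.23]) }

# Hypothetical stand-in for a filter that drops events with no request body.
require_body = ->(_tag, _time, rec) { rec['request_body'] ? rec : nil }

stages  = [add_ip_location, require_body]
kept    = run_chain(stages, 'nginx.access', 0, { 'request_body' => 'lat=1&lng=2' })
dropped = run_chain(stages, 'nginx.access', 0, { 'remote' => '1.2.3.4' })

puts kept.inspect     # record with request_body plus the added location
puts dropped.inspect  # nil: the second stage dropped it
```

Whatever an earlier stage writes is visible to every later stage, which is what the configuration further down relies on.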

Here is the code:

  1. filter_parse_request_body.rb
require "fluent/plugin/filter"
require "fluent/plugin/parse_request_body_extractor"

module Fluent::Plugin
  class ParseRequestBodyFilter < Fluent::Plugin::Filter

    # Register this filter under the name "parse_request_body"
    Fluent::Plugin.register_filter('parse_request_body', self)

    # Record key that holds the request body
    desc "Key whose value contains the request body string."
    config_param :key,    :string
    # Key whose value will be replaced by the combined array
    desc "Key whose value will be replaced by the combined array."
    config_param :replace_key,    :string, default: nil
    # Comma-separated names of the fields to combine into an array
    desc "If set, the values of these keys are combined into an array that is added to the record."
    config_param :array_value,   :string, default: nil
    # Record key under which the combined array is stored
    desc "Record key for the array built from array_value."
    config_param :array_value_key,   :string, default: nil
    # Whitelist of body fields to keep
    desc "If set, only the keys listed here will be added to the record."
    config_param :only,   :string, default: nil
    # Blacklist of body fields to drop
    desc "If set, the keys listed here will NOT be added to the record."
    config_param :except, :string, default: nil
    # Whether to drop the original request body after parsing it
    desc "If set to true, the original request body field will be discarded from the record."
    config_param :discard_key, :bool, default: false
    # Prefix added to every extracted field name
    desc "Prefix of fields."
    config_param :add_field_prefix, :string, default: nil
    # Whether blank keys are allowed
    desc "If set to true, permit blank key."
    config_param :permit_blank_key, :bool, default: false

    # Build the extractor from the parsed configuration
    def configure(conf)
      super
      @extractor = Fluent::Plugin::ParseRequestBodyExtractor.new(self, conf)
    end

    # Parse the request body into fields of the record and return the record
    def filter(tag, time, record)
      @extractor.add_query_params_field(record)
    end
  end
end
  2. out_parse_request_body.rb (the fluentd output plugin, which formats the data at the end of the pipeline)
require "fluent/plugin/output"
require "fluent/plugin/parse_request_body_extractor"

module Fluent::Plugin
  class ParseRequestBodyOutput < Fluent::Plugin::Output
    include Fluent::HandleTagNameMixin

    # Register this output under the name "parse_request_body"
    Fluent::Plugin.register_output('parse_request_body', self)

    helpers :event_emitter

    # Record key that holds the request body
    desc "Key whose value contains the request body string."
    config_param :key,    :string
    # Key whose value will be replaced by the combined array
    desc "Key whose value will be replaced by the combined array."
    config_param :replace_key,    :string, default: nil
    # Comma-separated names of the fields to combine into an array
    desc "If set, the values of these keys are combined into an array that is added to the record."
    config_param :array_value,   :string, default: nil
    # Record key under which the combined array is stored
    desc "Record key for the array built from array_value."
    config_param :array_value_key,   :string, default: nil
    # Whitelist of body fields to keep
    desc "If set, only the keys listed here will be added to the record."
    config_param :only,   :string, default: nil
    # Blacklist of body fields to drop
    desc "If set, the keys listed here will NOT be added to the record."
    config_param :except, :string, default: nil
    # Whether to drop the original request body after parsing it
    desc "If set to true, the original request body field will be discarded from the record."
    config_param :discard_key, :bool, default: false
    # Prefix added to every extracted field name
    desc "Prefix of fields."
    config_param :add_field_prefix, :string, default: nil
    # Whether blank keys are allowed
    desc "If set to true, permit blank key."
    config_param :permit_blank_key, :bool, default: false

    # Build the extractor from the parsed configuration
    def configure(conf)
      super
      @extractor = Fluent::Plugin::ParseRequestBodyExtractor.new(self, conf)
    end

    # Declare support for fluentd's multi-process workers
    def multi_workers_ready?
      true
    end

    # Parse the body, then let HandleTagNameMixin rewrite the tag in place
    def filter_record(tag, time, record)
      record = @extractor.add_query_params_field(record)
      super(tag, time, record)
    end

    # For each event: copy the tag, rewrite it, then re-emit the event under the new tag
    def process(tag, es)
      es.each do |time, record|
        t = tag.dup
        filter_record(t, time, record)
        router.emit(t, time, record)
      end
    end
  end
end
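The output variant re-emits every event with `router.emit`. If the re-emitted tag still matched the `<match>` block that hosts this plugin, the event would be routed straight back into it. That is why the extractor raises a `Fluent::ConfigError` unless one of remove_tag_prefix/remove_tag_suffix/add_tag_prefix/add_tag_suffix is set. The sketch below models only that idea in plain Ruby: `match_double_star?` handles just the `prefix.**` pattern form, and `rewrite_tag` is a hypothetical helper, not fluentd's `HandleTagNameMixin`.

```ruby
# Simplified matcher for "prefix.**" patterns only (hypothetical; fluentd's
# real matcher also handles *, {a,b} and more). "nginx.**" matches "nginx"
# itself and any tag starting with "nginx.".
def match_double_star?(pattern, tag)
  base = pattern.delete_suffix('.**')
  tag == base || tag.start_with?("#{base}.")
end

# Hypothetical tag rewrite: optionally strip a prefix, then prepend another.
def rewrite_tag(tag, remove_prefix: nil, add_prefix: nil)
  t = remove_prefix ? tag.delete_prefix(remove_prefix) : tag
  add_prefix ? "#{add_prefix}#{t}" : t
end

pattern  = 'nginx.**'   # the <match> block hosting out_parse_request_body
incoming = 'nginx.access'

# Re-emitting under the same tag would hit the same <match> block again.
loops_back = match_double_star?(pattern, incoming)

# After adding a prefix, the event can be caught by another block,
# e.g. <match parsed.nginx.**>, and no longer loops.
rewritten = rewrite_tag(incoming, add_prefix: 'parsed.')
leaves    = !match_double_star?(pattern, rewritten)

puts "same tag re-matches: #{loops_back}"  # true
puts "rewritten tag: #{rewritten}"         # parsed.nginx.access
puts "rewritten tag leaves: #{leaves}"     # true
```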
  3. parse_request_body_extractor.rb (the class that does the actual parsing)
require 'uri'
require 'cgi/util'

module Fluent::Plugin
  class ParseRequestBodyExtractor

    attr_reader :log
        
    # Read options from the plugin and precompute the lookup tables
    def initialize(plugin, conf)
      @log = plugin.log

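      # The output plugin re-emits events, so it must rewrite the tag;
      # otherwise they would be routed back into the same <match> block
      if plugin.is_a?(Fluent::Plugin::Output)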
        unless have_tag_option?(plugin)
          raise Fluent::ConfigError, "out_parse_request_body: At least one of remove_tag_prefix/remove_tag_suffix/add_tag_prefix/add_tag_suffix is required to be set."
        end
      end

      # Copy the configuration options
      @key = plugin.key
      @only = plugin.only
      @except = plugin.except
      @discard_key = plugin.discard_key
      @add_field_prefix = plugin.add_field_prefix
      @permit_blank_key = plugin.permit_blank_key
      @array_value = plugin.array_value
      @array_value_key = plugin.array_value_key
      @replace_key = plugin.replace_key

      # Build the whitelist lookup table
      if @only
        @include_keys = @only.split(/\s*,\s*/).inject({}) do |hash, i|
          hash[i] = true
          hash
        end
      end

      # Build the blacklist lookup table
      if @except
        @exclude_keys = @except.split(/\s*,\s*/).inject({}) do |hash, i|
          hash[i] = true
          hash
        end
      end

      # Build the set of keys whose values are combined into an array
      if @array_value_key
        if @array_value
          @include_array_value = @array_value.split(/\s*,\s*/).inject({}) do |hash, i|
            hash[i] = true
            hash
          end
        end
      end

    end

    # Entry point: parse record[@key], optionally replace/discard fields, return the record
    def add_query_params_field(record)
      return record unless record[@key]
      add_query_params(record[@key], record)
      replace_record_by_key(record) if @replace_key
      record.delete(@key) if @discard_key
      record
    end

    private

    # Replace record[@replace_key] with the combined array, unless the array
    # is missing (no coordinates in the body) or every value in it is 0
    def replace_record_by_key(record)
      return record unless record[@replace_key]
      replace_value = record[@array_value_key]
      return record unless replace_value
      unless replace_value.all? { |item| item == 0 }
        record[@replace_key] = replace_value
      end
      record
    end

    def have_tag_option?(plugin)
      plugin.remove_tag_prefix ||
        plugin.remove_tag_suffix ||
        plugin.add_tag_prefix    ||
        plugin.add_tag_suffix
    end

    def create_field_key(field_key)
      if add_field_prefix?
        "#{@add_field_prefix}#{field_key}"
      else
        field_key
      end
    end

    def add_field_prefix?
      !!@add_field_prefix
    end

    def permit_blank_key?
      @permit_blank_key
    end

    # Split a URL-encoded body ("k1=v1&k2=v2") and copy the selected fields into the record
    def add_query_params(body, record)
      return if body.nil?
      placeholder = []
      body.split('&').each do |pair|
        key, value = pair.split('=', 2).map { |i| CGI.unescape(i) }
        # Skip blank keys unless permit_blank_key is set and the value is non-empty
        next if (key.nil? || key.empty?) && (!permit_blank_key? || value.nil? || value.empty?)
        key ||= ''
        value ||= ''

        new_key = create_field_key(key)
        if @only
          record[new_key] = value if @include_keys.has_key?(key)
        elsif @except
          record[new_key] = value if !@exclude_keys.has_key?(key)
        else
          record[new_key] = value
        end

        # Collect values to be combined, in the order they appear in the body
        if @include_array_value
          placeholder << value.to_f if @include_array_value.has_key?(key)
        end
      end

      unless placeholder.empty?
        record[@array_value_key] = placeholder
      end
    end
  end
end
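To see what `add_query_params` does to a record without installing fluentd, here is a standalone re-implementation of its core loop under a few assumptions: it covers only the `only`, `add_field_prefix` and `array_value`/`array_value_key` options, and it decodes with the stdlib `URI.decode_www_form_component` instead of `CGI.unescape` (both turn `+` into a space and decode `%XX` escapes, but the URI variant raises on malformed escapes where `CGI.unescape` leaves them alone). `parse_body` and the sample body are hypothetical.

```ruby
require 'uri'

# Standalone sketch of the extractor's parsing loop (not the plugin itself).
def parse_body(body, record, only: nil, prefix: nil, array_value: nil, array_value_key: nil)
  include_keys = only && only.split(/\s*,\s*/)
  array_keys   = array_value && array_value.split(/\s*,\s*/)
  placeholder  = []

  body.split('&').each do |pair|
    key, value = pair.split('=', 2).map { |s| URI.decode_www_form_component(s) }
    next if key.nil? || key.empty?  # blank keys skipped (permit_blank_key off)
    value ||= ''

    record["#{prefix}#{key}"] = value if include_keys.nil? || include_keys.include?(key)
    # As in the plugin, values are collected in body order; to_f maps junk to 0.0.
    placeholder << value.to_f if array_keys && array_keys.include?(key)
  end

  record[array_value_key] = placeholder if array_value_key && !placeholder.empty?
  record
end

record = { 'request_body' => 'uid=42&lat=31.23&lng=121.47' }
parse_body(record['request_body'], record,
           only: 'lat,lng', prefix: 'body.',
           array_value: 'lat,lng', array_value_key: 'locatonGPS')

p record['body.lat']       # "31.23"
p record['locatonGPS']     # [31.23, 121.47]
p record.key?('body.uid')  # false: filtered out by `only`
```

Note that the extracted fields stay strings; only the combined array is converted to floats.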
  4. fluent-plugin-parse_request_body.gemspec (the project's gem specification: name, version, and so on)
Gem::Specification.new do |gem|
  gem.name          = 'fluent-plugin-parse_request_body'
  gem.version       = '0.0.18'
  gem.authors       = ['EkiSong']
  gem.email         = ['yifriday0614@gmail.com']
  gem.homepage      = 'https://github.com/yifriday/fluent-plugin-parse_request_body.git'
  gem.description   = %q{Fluentd plugin to parse request body.}
  gem.summary       = %q{Fluentd plugin to parse request body}
  gem.license       = 'MIT'

  gem.files         = `git ls-files`.split($\)
  gem.executables   = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
  gem.test_files    = gem.files.grep(%r{^(test|spec|features)/})
  gem.require_paths = ['lib']

  if defined?(RUBY_VERSION) && RUBY_VERSION > '2.2'
    gem.add_development_dependency "test-unit", '~> 3'
  end

  gem.add_development_dependency 'rake'
  gem.add_development_dependency 'appraisal'
  gem.add_runtime_dependency     'fluentd', ['>= 0.14.8', '< 2']
end
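One small caveat in the gemspec: `RUBY_VERSION > '2.2'` compares strings character by character, not version numbers. It happens to work for the version strings involved here, but the pattern misorders multi-digit segments such as '2.10' versus '2.2'. `Gem::Version` compares segment by segment as integers; a sketch of the version-aware check, assuming the intent is "Ruby 2.2 or newer":

```ruby
require 'rubygems'

# String comparison is lexicographic: '1' < '2' at the third character.
puts('2.10' > '2.2')  # false, although 2.10 would be the newer version

# Gem::Version compares numeric segments.
puts(Gem::Version.new('2.10') > Gem::Version.new('2.2'))  # true

# Version-aware form of the gemspec condition (assumed intent: >= 2.2).
modern_ruby = Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('2.2')
puts "add test-unit dev dependency: #{modern_ruby}"
```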

## Packaging the plugin

Once the code is finished, use gem to build the plugin and publish it.

Any plugin written in Ruby can be uploaded to rubygems.org/

Here are the commands I ran:

➜  fluent-plugin-parse_request_body git:(master) gem build fluent-plugin-parse_request_body.gemspec 
  Successfully built RubyGem
  Name: fluent-plugin-parse_request_body
  Version: 0.0.18
  File: fluent-plugin-parse_request_body-0.0.18.gem
➜  fluent-plugin-parse_request_body git:(master) gem push fluent-plugin-parse_request_body-0.0.18.gem
Pushing gem to https://rubygems.org...
Successfully registered gem: fluent-plugin-parse_request_body (0.0.18)

Once the gem is published, we can install it on our servers with the gem tool.

## Installing and configuring the plugin

To install it, simply run:

/usr/sbin/td-agent-gem install fluent-plugin-parse_request_body

The configuration in td-agent.conf is as follows:

<filter nginx.**>
  @type geoip
    geoip_lookup_key remote
    geoip_database /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-geoip-1.2.0/data/GeoLiteCity.dat
    geoip2_database /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-geoip-1.2.0/data/GeoLite2-City.mmdb
    <record>
     city         ${city.names.zh-CN["remote"]}     # skip adding fields if this field is null
     country      ${country.iso_code["remote"]}
     country_name ${country.names.zh-CN["remote"]}
     location '[${location.longitude["remote"]},${location.latitude["remote"]}]'
    </record>
</filter>

<filter nginx.**>
  @type parse_request_body
  key                request_body
  #discard_key        true
  add_field_prefix   body.
  only lat,lng
  array_value lat,lng
  array_value_key locatonGPS
  replace_key location
</filter>
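With this configuration, locatonGPS is built from lat and lng in whatever order the client sends them, while the geoip `<record>` above writes location as [longitude, latitude], which is also the order Elasticsearch expects for a geo_point given as an array. A client that sends lat before lng therefore produces a reversed location. One possible fix is to collect the values in the order listed in array_value and list lng first. The sketch below shows that variant; `combine_in_config_order` is a hypothetical helper and a suggested change, not what the published plugin does.

```ruby
# Hypothetical variant: build the array in the order given by the
# array_value option rather than the order of the request parameters.
def combine_in_config_order(params, array_value)
  keys = array_value.split(/\s*,\s*/)
  return nil unless keys.all? { |k| params.key?(k) }  # require every coordinate
  keys.map { |k| params[k].to_f }
end

# Two clients sending the same point with parameters in different orders.
a = { 'lat' => '31.23', 'lng' => '121.47' }
b = { 'lng' => '121.47', 'lat' => '31.23' }

# Listing lng first yields [lon, lat], matching the geoip output.
p combine_in_config_order(a, 'lng,lat')                     # [121.47, 31.23]
p combine_in_config_order(b, 'lng,lat')                     # [121.47, 31.23]
p combine_in_config_order({ 'lat' => '31.23' }, 'lng,lat')  # nil
```

Returning nil when a coordinate is missing also avoids replacing location with a one-element array, which the current code would do for a body that contains only lat.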

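Note: we still want to fall back to the IP-based location when the client reports no coordinates, and filters form a one-way data flow, so parse_request_body must be configured below (that is, after) the geoip filter. That way geoip always writes location first, and parse_request_body overwrites it only when the client actually sent coordinates.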
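A plain-Ruby model of the two steps shows why the order matters. It assumes locatonGPS has already been built from the body; `geoip_step`, `gps_step` and the coordinates are hypothetical stand-ins, with `gps_step` mirroring the replace_key logic (replace an existing location only when the GPS array is present and not all zeros).

```ruby
# Hypothetical stand-in for the geoip filter: always writes an IP-based location.
def geoip_step(record)
  record.merge('location' => [116.40, 39.90])  # made-up [lon, lat]
end

# Hypothetical stand-in for parse_request_body's replace_key step: like the
# plugin, it only replaces a location that already exists, and only when
# the GPS array is present and not all zeros.
def gps_step(record)
  return record unless record['location']
  gps = record['locatonGPS']
  return record if gps.nil? || gps.all?(&:zero?)
  record.merge('location' => gps)
end

with_gps    = { 'locatonGPS' => [121.47, 31.23] }
without_gps = {}
zero_gps    = { 'locatonGPS' => [0.0, 0.0] }

# Configured order: geoip first, then the GPS override.
p gps_step(geoip_step(with_gps))['location']     # [121.47, 31.23]
p gps_step(geoip_step(without_gps))['location']  # [116.4, 39.9]
p gps_step(geoip_step(zero_gps))['location']     # [116.4, 39.9]

# Reversed order: nothing to replace yet, then geoip writes the IP location.
p geoip_step(gps_step(with_gps))['location']     # [116.4, 39.9]
```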

This wraps up the second stage of data processing for the data map.

Going forward, the plugin still needs one improvement for the current requirement:

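The combined field should be configured dynamically with placeholders, the way location is configured in the geoip plugin, instead of today's rather crude hard-coded approach.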
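As a rough idea of what that could look like, here is a hypothetical template syntax modeled loosely on the geoip plugin's `${...}` placeholders: the combined value is described by a string such as `[${lng},${lat}]`, and each placeholder is filled from the parsed body. Neither such an option nor this `${name}` syntax exists in the current plugin; `expand_template` is an illustrative assumption.

```ruby
require 'json'

# Hypothetical: expand "${name}" placeholders from parsed body params, then
# parse the result as JSON so "[${lng},${lat}]" becomes an Array of floats.
PLACEHOLDER = /\$\{(\w+)\}/

def expand_template(template, params)
  names = template.scan(PLACEHOLDER).flatten
  return nil unless names.all? { |n| params.key?(n) }  # skip if a value is missing
  # Float() raises ArgumentError on non-numeric input, so junk fails loudly.
  filled = template.gsub(PLACEHOLDER) { Float(params[Regexp.last_match(1)]).to_s }
  JSON.parse(filled)
end

params = { 'lat' => '31.23', 'lng' => '121.47' }
p expand_template('[${lng},${lat}]', params)                # [121.47, 31.23]
p expand_template('[${lng},${lat}]', { 'lat' => '31.23' })  # nil
```

Putting the order in the template, rather than in the request, would also settle the [lon, lat] ordering question in one place.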

If you're interested, take a look at my GitHub:

github.com/yifriday/fl…

