windows下kafka+ELK的日志系统

栏目: 后端 · 发布时间: 7年前

内容简介:用到的软件:zookeeper、kafka、logstash(6.3.2版本)、ES(6.3.2版本)、Kibana(6.3.2版本)。具体安装步骤不在此说明,基本都是下载解压,改一下配置文件,即可使用。(以下所述均在Windows下)1、zookeeper:kafka中自带zookeeper,可以不用装zookeeper,如果想自己另装,需配置环境变量,如下:

用到的软件:zookeeper、kafka、logstash(6.3.2版本)、ES(6.3.2版本)、Kibana(6.3.2版本)。具体安装步骤不在此说明,基本都是下载解压,改一下配置文件,即可使用。(以下所述均在Windows下)

1、zookeeper:

kafka中自带zookeeper,可以不用装zookeeper,如果想自己另装,需配置环境变量,如下:

ZOOKEEPER_HOME => D:\nomalAPP\zookeeper-3.4.13

path 里面加入 %ZOOKEEPER_HOME%\bin

如果配置好以后,在cmd里运行zkserver报找不到 java 错误的话,可能是java环境变量的位置放置有问题,可以将path里面配置的java环境位置移到最前面。</b>

2、kafka:

kafka如果启动报找不到java的错误,原因在于kafka-run-class.bat的179行,找到里面的 %CLASSPATH% ,将其用双引号括起来。即改为:set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %* </b>

kafka的配置文件server.properties里面要修改的:

日志所放置的目录(可以选择用默认的):log.dirs=D:/nomalAPP/kafka_2.12-2.0.0/kafka-logs,

连接zookeeper的ip和端口:zookeeper.connect=localhost:2181

其他的均可用默认配置。</b>

启动命令:cmd锁定安装目录,然后 .\bin\windows\kafka-server-start.bat .\config\server.properties

</b>

3、logstash:

在config目录下,logstash.yml里面,如果想启动多个,在里面可以配置 http.port=9600-9700,如果没有配置这一项,会使用默认的9600端口。

在config目录下,增加一个logstash.conf的配置文件,用来配置数据源,和数据过滤,数据输出的位置,如下:

input {
            #   ==>    用kafka的日志作为数据源
   kafka{  
     ····#(kafka的IP和端口)
        bootstrap_servers => "10.8.22.15:9092"  
        # (在有多个相同类型的数据源时,需要配置)
        client_id => "test1"  
        # (消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离)
        group_id => "test1"   
        auto_offset_reset => "latest"    
        # (消费者线程个数)
        consumer_threads => 5   
        # (在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息)
        decorate_events => true    
        # (主题)
        topics => ["bas-binding-topic"]    
        # (用于ES索引)
        type => "bas-binding-topic"  
        # (数据格式)
        codec => "json"     
        tags => ["bas-binding-topic"]
      }  
      kafka{  
        bootstrap_servers => "10.8.22.15:9092"  
        client_id => "test2"  
        group_id => "test2"  
        auto_offset_reset => "latest"  
        consumer_threads => 5
        #decorate_events => true  
        topics => ["bas-cus-request-topic"]  
        type => "bas-cus-request-topic"  
        codec => "json"
        tags => ["bas-cus-request-topic"]
      }  
}

filter {
    filter插件负责过滤解析input读取的数据,可以用grok插件正则解析数据,
    date插件解析日期,json插件解析json等等
}

output {
    # 输出到ES
    if "bas-binding-topic" in [tags] {
       elasticsearch{  
             # ES的IP和端口
            hosts => ["localhost:9201"]  
            # 索引名称:主题+时间
            index => "bas-binding-topic-%{+YYYY.MM.dd+HH:mm:ss}"  
            timeout => 300
        }
        stdout {
            codec => rubydebug
         }
  }
  if "bas-cus-request-topic" in [tags] {
       elasticsearch{  
            hosts => ["localhost:9201"]  
            index => "bas-cus-request-topic-%{+YYYY.MM.dd+HH:mm:ss}"  
            timeout => 300
        }
        stdout {
            codec => rubydebug
         }
  }
}

启动logstash的命令: .\bin\logstash.bat -f .\config\logstash.conf </b>

4、ES:

在config目录下,elasticsearch.yml配置文件中,ES的默认端口为9200,可以通过http.port=9201来修改

启动命令: .\bin\elasticsearch.bat

启动后,在浏览器访问: http://localhost:9201 ,

如果出现了一些信息,表示启动成功。</b>

5、Kibana:

在config目录下,kibana.yml配置文件中,通过elasticsearch.url: " http://localhost:9201 " 来配置地址

启动命令: .\bin\kibana.bat

访问 " http://localhost:9201 " ,如果出现需要输入用户名密码的界面,表示ES启动失败,如果直接显示Kibana界面,则启动成功。</b>

logstash数据输出到ES时,会选择ES默认的映射来解析数据。如果觉得默认映射不满足使用条件,可以自定模板:

借助postman工具创建ES模板,如下:

windows下kafka+ELK的日志系统

收到此响应表示创建成功。然后可以通过GET请求查询你刚才创建的模板,通过DELETE请求删除刚才创建的模板。</b>

向kafka主题中发送日志信息,发送的信息会在cmd窗口显示,如下:

windows下kafka+ELK的日志系统

同时,在Kibana界面会看到相应的索引,索引名称就是在logstash.conf的输出中配置的,如下:

windows下kafka+ELK的日志系统

Visualize:可以选择自己想要的索引去进行图形分析,效果如下:

windows下kafka+ELK的日志系统

Dashboard:将做过的图整合到仪表盘,效果如下:

windows下kafka+ELK的日志系统

另外,可以设置自动刷新,即当有新的数据发送到kafka时,所做的图会自动根据主题来进行刷新。

项目中,可以通过logback来讲日志输出到kafka,具体配置如下:

<appender name="bingdings-log-to-kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">
      <encoder>
        <pattern>%msg%n</pattern>
      </encoder>
            <!-- 指定kafka主题,消费时要用到-->
      <topic>bas-binding-topic</topic>
      <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
      <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
      <!-- 配置输出到指定ip、端口的kafka -->
      <producerConfig>bootstrap.servers=localhost:9092,10.8.22.13:9092</producerConfig>
      <!-- this is the fallback appender if kafka is not available. -->
    </appender>
    <logger name = "bingdings-log-to-kafka" level="INFO" additivity = "false">
       <appender-ref ref="bingdings-log-to-kafka"/>
    </logger>

然后在记日志的时候,记到对应的主题,日志就会写到kafka相应的位置。可以通过消费者来消费主题,查看日志是否成功写入,如下:

@SuppressWarnings("resource")
  public static void main(String[] args) {

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    properties.put("group.id", "group-1");
    properties.put("enable.auto.commit", "false");
    properties.put("auto.commit.interval.ms", "1000");
    properties.put("auto.offset.reset", "earliest");
    properties.put("session.timeout.ms", "30000");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", 
        "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumer.subscribe(Arrays.asList("bas-cus-request-topic", "bas-binding-topic"));
    while (true) {
      ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE);
      System.err.println("+++++++++++++++++++++++++++++++++++++++++");
      for (ConsumerRecord<String, String> record : records) {
        System.err.println(record.offset() + ">>>>>>" + record.value());
      }
      System.err.println("+++++++++++++++++++++++++++++++++++++++++++++");
      break;
    }
  }

暂时想到的就这些,有建议的,欢迎评论提醒我进行补充,谢谢!


以上所述就是小编给大家介绍的《windows下kafka+ELK的日志系统》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Linux Programming Interface

The Linux Programming Interface

Michael Kerrisk / No Starch Press / 2010-11-6 / GBP 79.99

The Linux Programming Interface describes the Linux API (application programming interface)-the system calls, library functions, and other low-level interfaces that are used, directly or indirectly, b......一起来看看 《The Linux Programming Interface》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具