内容简介:为什么要搞这个东西呢,因为自己希望能够通过这个“课题”提高自己的渗透技术、提高自己的应急响应能力,同时提升监测平台的有效检测率。听起来感觉不错的样子,但是目前没有在互联网上发现类似的环境,所以就只能自己动手来搭建这套系统,希望这篇文章能够给让大家多一点想法,多一点思路。当然本文肯定会有不足之处,也希望大家多多提出意见,有则改进、无则勉之。既然是一个课题,是一个连载系列,所以后面还会有其他文章输出,那作为第一个开篇的文章,我要在这里给大家简单介绍下这个课程大纲 。目前规划主要有四个大章节。
*本文原创作者:gncao,本文属于FreeBuf原创奖励计划,未经许可禁止转载
一、前言
为什么要搞这个东西呢,因为自己希望能够通过这个“课题”提高自己的渗透技术、提高自己的应急响应能力,同时提升监测平台的有效检测率。听起来感觉不错的样子,但是目前没有在互联网上发现类似的环境,所以就只能自己动手来搭建这套系统,希望这篇文章能够给让大家多一点想法,多一点思路。当然本文肯定会有不足之处,也希望大家多多提出意见,有则改进、无则勉之。
二、连载介绍
既然是一个课题,是一个连载系列,所以后面还会有其他文章输出,那作为第一个开篇的文章,我要在这里给大家简单介绍下这个课程大纲 。目前规划主要有四个大章节。
第一章:攻防环境介绍&搭建(也就是本篇文章)
第二章:主机入侵&响应
2.1 ssh端口爆破 2.2 ftp端口爆破 2.3 mysql 服务爆破 2.4 telnet服务爆破 …
第三章:Web应用入侵&响应
3.1 sql 注入 3.2 xss攻击 3.3 文件上传 3.4 命令执行 …
第四章:后门/木马入侵&响应
3.1 后门入侵&检测 3.2 挖矿病毒入侵&检测 …
三、环境介绍
因为资源的限制,所以目前整套系统都在虚拟机环境中搭建。整个环境包含三台虚拟机(未来应该会扩展到四台虚拟机),一台攻击主机、两台受害主机、一台监测主机。拓扑图如下(没有visio工具,画的很丑,大家将就着看吧):
看图可能有些同学看不出来是个啥玩意,我来解释下。
attacker作为一个攻击者会对受害者1和受害者2分别发起攻击(两种操作系统),然后事先我们在两个受害者主机上分别安装了flume客户端,flume会采集操作系统、应用软件的日志信息实时传送给observer监控主机,监控主机上运行着监控平台,通过加载检测规则分析日志,发现攻击行为并发出告警信息,当然最后在observer主机上会有展示页面,展示攻击行为。
通过这样一套环境,我们需要掌握攻击技能、响应能力和检测能力,所以对于个人的技术提升有很大的帮助。
各个虚拟机的详细信息如下表所示:
主机名称 | 角色 | IP地址 | 操作系统 |
---|---|---|---|
Attacker | 攻击机 | 192.168.171.1 | Win10 |
Victim1 | 受害主机1 | 192.168.171.121 | Centos7_64 |
Victim2 | 受害主机2 | 192.168.171.122 | WinServer2012_64 |
Observer | 监测主机 | 192.168.171.120 | Centos7_64 |
四、环境搭建
目前第一阶段先不启用victim2虚拟机,第一阶段力求搞懂 Linux 环境下的攻防,所以整个环境就需要从三个虚拟机开始搭建。
4.1 Attacker
为了保证资源的合理运用,攻击机就使用物理主机,IP地址和操作系统均符合上述要求。
4.2 Victim1
受害主机在未来需要安装许多程序软件,用来作为一个攻击靶机,包含ssh、mysql、apache、ftp等等;同时也需要安装flume客户端采集服务器日志信息;
4.2.1 安装系统
安装系统比较简单,只是在选择镜像文件的时候没有直接选择centos7镜像文件,而是选择稍后安装,同时在启动虚拟机后选择安装的类型是minimal模式,即没有图形化的最简约模式。
4.2.2 给用户授予sudo权限
在centos7安装的时候会创建一个账户(可选),但是创建完的账户是没有sudo权限的,安装一些程序的时候就比较麻烦,所以先给这个账户授予sudo权限。
1)首先找到sudoers文件,一般都在/etc/sudoers这个位置:
whereis sudoers
2)修改该文件权限为可写状态:
chmod -v u+w /etc/sudoers
3)编辑文件内容:
在root ALL=(ALL)ALL这一行下面增加新的一行
victim ALL=(ALL)ALL
这里表示victim用户将会具备sudoers权限。
4)还原sudoers文件权限:
chmod -v u-w /etc/sudoers
4.2.3 更新yum源并安装ssh服务
这时候就切到普通用户victim下执行下面的命令:
sudo yum -y update sudo yum -y install openssh-server
一般安装目录都是/etc/sshd/。
启动ssh:service sshd start。
4.2.4 安装 java 并配置环境变量
安装flume前必须要先安装java环境,且对Java的版本要求有限制,这里我使用的jdk是1.8的,flume是1.6.0-cdh5.7.0的。
1)分别在home目录下创建app和software文件夹,app是程序安装目录,software是程序安装包目录:
mkdir app mkdir software
2)解压software目录下的jdk文件到app目录下:
tar zxvf jdk-8u131-linux-x64.tar.gz -C ../app/
3)配置环境变量
Vim ~/.bahsrc 增加如下信息:
#jdk export JAVA_HOME=/home/victim/app/jdk1.8.0_131 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=${JAVA_HOME}/bin:$PATH
保存退出后激活环境变量:
Source ~/.bashrc
在命令行下输入java命令能够查看到配置选项信息则表示java环境安装成功。
4.2.5 安装flume并配置环境变量
flume的作用就是从服务器这里采集日志数据传送给logger或者其他服务器。类似的客户端还有filebeat等。
1)解压flume文件到app目录下:
tar zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz -C../app/
2)配置环境变量
Vim ~/.bahsrc 增加如下信息:
#flume exportFLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin export PATH=${JAVA_HOME}/bin:$PATH
保存退出后激活环境变量:
Source ~/.bashrc
在命令行下输入echo $FLUME_HOME命令能够查看到flume的安装目录则表示成功安装。
4.2.6 配置flume-env.sh
这个文件在启动flume的时候有用到,需要配置该文件中java环境变量:
1)切换到flume的conf目录下,拷贝flume-env.sh.template文件并重名为flume-env.sh;
2)编辑flume-env.sh文件。
取消export JAVA_HOME前面的注释,并修改JAVA_HOME变量指向JAVA目录:
export JAVA_HOME=/home/victim/app/jdk1.8.0_131
4.2.7 检查flume是否能够正常启动
1)在flume的conf目录下,新建example.conf文件,这个文件是flume的配置文件,配置监听的数据源和指定数据输出,内容如下:
#a1:agent名称 #r1:数据源名称 #k1:sink的名称 #c1:channel的名称 # example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = victim a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory # Bind the source and sink to the channel #一个source可以输出到多个channel,所以是channels a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
这里的配置文件表示监听victim主机的44444端口并输出到logger窗口,保存文件并退出。
2)启动agent
切换到flume的bin目录下,执行如下命令:
./flume-ng agent --name a1 --conf $FLUME_HOME/conf--conf-file $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console
a1表示的是agent名称,若出现如下信息,表示agent启动成功。其中console表示接收到的数据显示在命令行窗口。
3)模拟数据来源
新开命令行窗口,输入命令:
telnet victim 44444
并在命令行中随意输入信息,这些信息将成为数据源传送到logger窗口:
4)检查是否成功
若在logger窗口下看到telnet窗口的输入数据则表示flume安装成功:
至此,victim的安装工作基本完成,后续再陆续安装其他程序。在跟observer联动的时候flume配置文件需要更改,更改内容后续会有介绍。
4.3 Observer
observer作为一台监控主机上面运行着监控程序,用来监控日志中是否存在攻击行为并发出告警信息。
在这台主机上需要安装flume接收victim的日志(其实可以直接用kafka接收victim的日志),kafka做数据传输,spark streaming做数据实时分析,MySQL存储处理后的数据,django做数据展示。
其中核心的是spark streaming其中的规则处置,规则的好坏决定着系统的误报和漏报情况。
4.3.1 安装系统
4.3.2 给用户授予sudo权限
4.3.3 更新yum源并安装ssh
4.3.4 安装java环境
4.3.5 安装flume
这五个步骤的安装方法同victim服务器一致,下面是observer的安装重点。
4.3.6 安装zookeeper
在安装kafka之前要先安装zookeeper,因为kafka的集群管理都是通过zookeeper来实现的。
1)下载zookeeper并解压到app目录下:
tar zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ../app/
2)配置zookeeper的环境变量
vim ~/.bashrc,增加如下内容:
#zk export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0 export PATH=${ZK_HOME}/bin:$PATH
保存推出后,激活当前环境变量:
source ~/.bashrc
然后命令行下输入 echo $ZK_HOME如果能够正常输出zookeeper目录则表示环境变量配置成功。
3)配置zookeeper
切换到zookeeper的conf目录下,拷贝zoo_sample.cfg文件为zoo.cfg文件,并编辑zoo.cfg文件。
修改datadir为自定义的文件目录,如果不修改该选项则有可能存在默认的文件目录被删除的情况。
dataDir=/home/hadoop/app/tmp/zk
因为资源有限所以只能单机模式运行zookeeper和kafka。
4)启动zookeeper
切换到zookeeper的bin目录下,输入如下命令启动zookeeper:
./zkServer.sh start
如果出现如下所示信息则表示zookeeper启动成功:
4.3.7 安装kafka
安装完zookeeper之后就可以安装kafka了,配置过程也不麻烦。
1)下载kafka并解压到app目录下:
tar zxvf kafka_2.11-0.9.0.0.tgz -C ../app/
2)配置kafka环境变量
vim ~/.bashrc,增加如下内容:
#kafka export KAFKA_HOME=/home/hadoop/app/kafka_2.11-0.9.0.0 export PATH=${KAFKA_HOME}/bin:$PATH
保存退出后,激活当前环境变量:
source ~/.bashrc
然后命令行下输入 echo $KAFKA_HOME如果能够正常输出kafka目录则表示环境变量配置成功。
3)配置server.properties文件
切换到kafka的config目录下,编辑server.properties文件,编辑配置如下选项:
broker.id=0 #唯一标识broker身份的id信息 listeners=PLAINTEXT://:9092 #表示监听端口是9092 host.name=observer #hostname表示当前主机名称 log.dirs=/home/hadoop/app/tmp/kafka-logs #指定kafka的log日志目录 num.partitions=1 #指定分区数量 zookeeper.connect=observer:2181 #配置zookeeper的地址和端口
配置完成后保存退出
4)启动kafka
运行bin目录下的kafka-server-start.sh文件:
bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
输入完命令后会显示如下信息:
新开一个窗口输入jps -m命令,查看如果存在如下进程则表示程序已经正常启动。
5)创建topic
一个业务一个topic,在正式使用kafka之前我们必须要配置topic,我们先创建一个topic来测试程序能否正常工作:
bin/kafka-topics.sh --create --zookeeper observer:2181 --replication-factor 1 --partitions 1 --topic testtopic
这里表示创建一个名为testtopic的话题。
查看是否创建成功,新开窗口输入如下命令:
bin/kafka-topics.sh --list --zookeeper observer:2181
如果出现testtopic即表明话题创建成功:
6)topic消息测试
kafka整体架构分为consumer和producer,consumer表示消费消息端,producer表示生产消息端。在测试之前我们要先生产消息才能消费消息。
首先我们输入如下命令生产消息:
bin/kafka-console-producer.sh --broker-list observer:9092 --topic testtopic
然后我们输入如下命令消费消息:
bin/kafka-console-consumer.sh --zookeeper observer:2181 --topic testtopic --from-beginning
#from-beginning表示从头开始接收消息。
两个窗口的程序在运行以后就会停在窗口不动,当我们在生产端输入任意字符都会在消费端显示的时候,表示我们的kafka可以正常运行了。
生产端:
消费端:
从上面可以看出我们的kafka已经正常运行了。
因为资源有限只能搞单机部署,如果各位老板有钱可以多搞几台主机做冗余,但是一定要注意kafka部署的主机数量一定是2n+1台主机。
7)kafka和flume联动
flume采集到的数据是需要通过kafka传输的,所以它们之间的联动必须要有效才可以。这里我们还是以上面那个testtopic话题为测试对象。
7.1)首先启动kafka,开启consumer消费指定的topic消息
bin/kafka-console-consumer.sh --zookeeper observer:2181 --topic testtopic --from-beginning
7.2)然后修改victim主机上的flume配置文件,修改source监听源为data.log文件,sink输出类型为avro sink(这个类型能跟其他flume客户端联动)。具体配置如下:
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source #监听某文件 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/hadoop/data/data.log a1.sources.r1.shell=/bin/sh -c # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = observer a1.sinks.k1.port = 44445 # Use a channel which buffers events in memory a1.channels.c1.type = memory # Bind the source and sink to the channel #一个source可以输出到多个channel,所以是channels a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
上面表示数据来源于data.log文件,输出到observer的44445端口。
7.3)修改observer主机的flume配置文件,配置文件内容如下:
# example.conf: A single-node Flume configuration # Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source #监听某文件 a2.sources.r2.type = avro a2.sources.r2.bind = observer a2.sources.r2.port=44445 # export to kafka a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink a2.sinks.k2.brokerList = observer:9092 a2.sinks.k2.topic = testtopic a2.sinks.k2.requiredAcks = 1 a2.sinks.k2.batchSize = 5 # Use a channel which buffers events in memory a2.channels.c2.type = memory # Bind the source and sink to the channel #一个source可以输出到多个channel,所以是channels a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
sink type类型为KafkaSink。
topic为我们之前创建的topic名称,batchSize表示当记录数到达5个后再发送。
7.4)先启动observer的flume程序,然后再启动victim的flume程序,即先监听再发送数据。
7.4.1)先启动observer的flume程序
../bin/flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example-kafka.conf -Dflume.root.logger=INFO,console
7.4.2)再启动victim的flume程序
../bin/flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example-send.conf -Dflume.root.logger=INFO,console
7.4.3)然后我们再向文件/home/hadoop/data/data.log输入测试文件时,如果在kafka的监听窗口看到我们输入的信息时则表示双击联动成功。
切换到kafka窗口看到我们在data.log文件中新增的内容:
至此我们的kafka已经安装成功且已经能够跟flume进行联动。下面就是重要数据处理和规则匹配环节了。
4.3.8 安装spark
spark环境安装也是比较麻烦的,需要有耐心才可以,但是跟着步骤走一定会搞定的。
1)scala安装
安装spark之前必须要安装scala和hadoop等环境。首先我们先安装scala语言。
1.1)下载scala安装包并解压到app目录下:
tar zxvf scala-2.11.8.tgz -C ../app/
1.2)配置环境变量
vim ~/.bashrc并加入如下内容:
#scala export SCALA_HOME=/home/hadoop/app/scala-2.11.8 export PATH=${SCALA_HOME}/bin:$PATH
保存文件后退出,source ~/.bashrc 激活环境变量后在命令行输入scala出现scala交互窗口则表示安装配置成功。
2)hadoop环境搭建
2.1)下载解压hadoop安装包到app目录下。
tar zxvf hadoop-2.6.0-cdh5.7.0.tar.gz -C ../app/
2.2)配置环境变量
vim ~/.bashrc并加入如下内容:
#hadoop export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0 export PATH=${HADOOP_HOME}/bin:$PATH
保存文件后退出,source ~/.bashrc 激活环境变量。
2.3)修改hadoop配置文件
切换到Hadoop的etc/hadoop/目录下,编辑hadoop-env.sh文件,配置java环境变量:
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_131
保存退出后编辑同级目录下的core-site.xml文件,增加如下配置信息:
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://observer:8020</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/home/hadoop/app/tmp</value> </property> </configuration>
配置文件指定fs服务器地址和临时目录。
保存退出后编辑统计目录下的hdf-site.xml文件,增加如下配置信息:
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration>
保存退出编辑slaves文件,增加主机名后保存退出。
2.4)格式化
切换到bin目录下,执行如下命令:
./hdfs namenode -format
执行完成后会在上面指定的tmp目录下生成dfs文件夹。
2.5)启动hadoop服务
切换到sbin目录下,执行如下命令启动hadoop服务:
./start-dfs.sh
然后输入三次当前账户密码后即可启动服务,在浏览器输入ip:50070后如果在浏览器中出现如下信息则表示hadoop安装成功:
2.5)配置yarn
yarn作为Hadoop的资源调度守护进程,同样也需要安装。
切换到etc/hadoop/目录下,编辑mapred-site.xml文件,增加如下内容,指定框架为yarn:
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
2.6)启动yarn
切换到sbin目录下,执行./start-yarn.sh命令。
同样的输入完密码后,启动yarn。
启动完成后在浏览器输入ip:8088后如果出现如下所示信息则表示yarn配置成功:
jps -m查看进程信息也表示Hadoop和yarn配置成功。
3)spark安装
3.1)首先下载源代码编译适合hadoop版本的,然后再安装编译完之后的包。
3.2)这里我编译的是spark2.2.0的,解压该文件到app目录下并配置环境变量:
tar zxvf spark-2.2.0-bin-2.6.0-cdh5.7.0.tgz -C ../app/
vim ~/.bashrc 增加如下内容:
#spark export SPARK_HOME=/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0 export PATH=${SPARK_HOME}/bin:$PATH
保存退出后source ~/.bashrc 激活环境变量。
3.3)检查是否安装成功
切换到bin目录下执行./spark-shell –master local[2] 命令,如果进入scala环境则表示安装成功。
4.3.9 安装MySQL
安装MySQL的目的是将spark streaming的处理结果存储下来,然后通过django程序对结果做展示,MySQL安装直接通过sudo yum -y install mysql-server命令来执行安装即可。
4.3.10安装django
django程序的安装直接使用pip或者easy_install命令来安装即可:
pip install django
django安装完成后可以使用django-admin.py文件来创建一个django项目:
django-admin.py startproject attackview
然后创建一个app:
cd attackview python manage.py startapp sparkmysql
app创建完毕后需要切换到attackview目录下,并编辑settings.py文件,指定数据库为mysql:
DATABASES = { 'default': { 'ENGINE': 'django.db.backends.mysql', 'NAME': 'echart', 'USER':'root', 'PASSWORD':'root@qwe', 'HOST':'127.0.0.1', 'PORT':'3306', } }
保存后退出,然后编辑urls.py文件指定视图和函数之间的关系,保证当有新的url请求后django能够正常处理。
这里的细节不再赘述,包括上面各个程序,如果大家有疑问可以联系我或者百度查询解决方案。
这样我们的攻防环境基本上就完成了,那最后我们要做个flume+kafka+spark+mysql的联动测试,如果mysql数据库中能够存储flume客户端采集后被处理的数据,那表明我们的环境联动没有问题了。
4.3.11 联动测试
1)首先配置flume,监听某日志文件,鉴于文件篇幅,这里只贴上部分代码,大体思路跟flume配置环节一致。
# Describe/configure the source #监听某端口 a2.sources.r2.type = exec a2.sources.r2.command = tail -F /home/hadoop/source/access.log a2.sources.r2.shell=/bin/sh -c # Describe the sink a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink a2.sinks.k2.topic = streamingtopic a2.sinks.k2.brokerList = hadoop0:9092 a2.sinks.k2.batchSize = 20 a2.sinks.k2.requiredAcks = 1
2)zookeeper和kafka配置保持上面配置不变即可;
3)配置spark streaming的配置文件;
切换到/examples/src/main/python/streaming/目录下,新增一个kafkadirect.py文件,文件内容如下:
#!/usr/bin/python #coding:utf-8 from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils import datetime,yaml, os,re,pymysql #时间格式转换 def timeutil(time): return datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d%H%M%S") #第一步:创建一个本地的StreamingContext,并设置批处理周期为5s sc=SparkContext("local[2]","flumeWordCount") ssc=StreamingContext(sc,10) #第二步:创建一个kafka连接 topic="streamingtopic" brokers="hadoop0:9092" #参数分别表示ssc连接名,列表形式显示的topic名称,brokers列表 directkafkaStream = KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list":brokers}) #第三步:数据处理&转换&存储 #提取kafka数据的第一个字段为消息正文字段并打印数据长度 lines = directkafkaStream.map(lambda x: x[1]) #加载规则文件 f=open('attackregexp_file.yaml','r') cont=f.read() x=yaml.load(cont) #识别攻击行为 def identify_attack(url): rule_len=len(x['attackregex']) for i in xrange(rule_len): rex = x['attackregex'][i]['regex'] if re.match(rex,url): #连接数据库并追加记录到数据库中 mysqldb = pymysql.connect(host='localhost',port=3306,db='ana_log',user='root',passwd='P@ssw0rd',charset='utf8') cursor = mysqldb.cursor() query_sql="insert into test values('%d','%s');" % (5,'ceshizhuru') cursor.execute(query_sql) mysqldb.commit() data=cursor.fetchone() cursor.close() mysqldb.close() #处理日志记录 def test(i): infos=i.split(" ") url=infos[6] identify_attack(url) host=infos[0] ip=infos[0] time=infos[3] status_code=int(infos[8]) return (ip,url,time,status_code) words=lines.map(lambda x:test(x)).pprint() #第四步:开始sparkstreaming ssc.start() ssc.awaitTermination()
保存文件后退出。这里加载了一个规则文件也是这里的核心文件。
4)分别启动zk、kafka等程序
4.1)启动zk
./bin/zkServer.sh start
4.2)启动kafka
./bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
4.3)创建topic
./bin/kafka-topics.sh --create --zookeeper hadoop0:2181 --replication-factor 1 --partitions 1 --topic streamingtopic
4.4)启动flume
./bin/flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/access-log-flume-kafka.conf -Dflume.root.logger=INFO,console
4.5)启动spark streaming
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 examples/src/main/python/streaming/kafkadirect1.py
4.6)然后我们向监听的文件中增加新的记录,如果在mysql中出现存储后的结果就表示联动成功。这里我们的规则是判断请求行为是否存在攻击行为。这里我贴上在spark streaming窗口看到的处理记录:
五、总结
终于我们的环境安装完毕了,后面我们要做的就是开展每一个课程内容了。
文章内容比较多,肯定会存在一些纰漏,应该也会存在更优的解决方案,毕竟我在这方面的研究时间不长,希望大家在观阅的过程中提出自己的见解,共同成长,有问题或者好的见解请联系我,谢谢。
*本文原创作者:gncao,本文属于FreeBuf原创奖励计划,未经许可禁止转载
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。