攻防系统之攻防环境介绍&搭建

栏目: 编程工具 · 发布时间: 5年前

内容简介:为什么要搞这个东西呢,因为自己希望能够通过这个“课题”提高自己的渗透技术、提高自己的应急响应能力,同时提升监测平台的有效检测率。听起来感觉不错的样子,但是目前没有在互联网上发现类似的环境,所以就只能自己动手来搭建这套系统,希望这篇文章能够给让大家多一点想法,多一点思路。当然本文肯定会有不足之处,也希望大家多多提出意见,有则改进、无则勉之。既然是一个课题,是一个连载系列,所以后面还会有其他文章输出,那作为第一个开篇的文章,我要在这里给大家简单介绍下这个课程大纲 。目前规划主要有四个大章节。

*本文原创作者:gncao,本文属于FreeBuf原创奖励计划,未经许可禁止转载

一、前言

为什么要搞这个东西呢,因为自己希望能够通过这个“课题”提高自己的渗透技术、提高自己的应急响应能力,同时提升监测平台的有效检测率。听起来感觉不错的样子,但是目前没有在互联网上发现类似的环境,所以就只能自己动手来搭建这套系统,希望这篇文章能够给让大家多一点想法,多一点思路。当然本文肯定会有不足之处,也希望大家多多提出意见,有则改进、无则勉之。

二、连载介绍

既然是一个课题,是一个连载系列,所以后面还会有其他文章输出,那作为第一个开篇的文章,我要在这里给大家简单介绍下这个课程大纲 。目前规划主要有四个大章节。

第一章:攻防环境介绍&搭建(也就是本篇文章)

第二章:主机入侵&响应

2.1 ssh端口爆破
2.2 ftp端口爆破
2.3  mysql 服务爆破
2.4 telnet服务爆破
…

第三章:Web应用入侵&响应

3.1  sql 注入
3.2 xss攻击
3.3 文件上传
3.4 命令执行
…

第四章:后门/木马入侵&响应

3.1 后门入侵&检测
3.2 挖矿病毒入侵&检测
…

三、环境介绍

因为资源的限制,所以目前整套系统都在虚拟机环境中搭建。整个环境包含三台虚拟机(未来应该会扩展到四台虚拟机),一台攻击主机、两台受害主机、一台监测主机。拓扑图如下(没有visio工具,画的很丑,大家将就着看吧):

攻防系统之攻防环境介绍&搭建 看图可能有些同学看不出来是个啥玩意,我来解释下。

attacker作为一个攻击者会对受害者1和受害者2分别发起攻击(两种操作系统),然后事先我们在两个受害者主机上分别安装了flume客户端,flume会采集操作系统、应用软件的日志信息实时传送给observer监控主机,监控主机上运行着监控平台,通过加载检测规则分析日志,发现攻击行为并发出告警信息,当然最后在observer主机上会有展示页面,展示攻击行为。

通过这样一套环境,我们需要掌握攻击技能、响应能力和检测能力,所以对于个人的技术提升有很大的帮助。

各个虚拟机的详细信息如下表所示:

主机名称 角色 IP地址 操作系统
Attacker 攻击机 192.168.171.1 Win10
Victim1 受害主机1 192.168.171.121 Centos7_64
Victim2 受害主机2 192.168.171.122 WinServer2012_64
Observer 监测主机 192.168.171.120 Centos7_64

四、环境搭建

目前第一阶段先不启用victim2虚拟机,第一阶段力求搞懂 Linux 环境下的攻防,所以整个环境就需要从三个虚拟机开始搭建。

4.1 Attacker

为了保证资源的合理运用,攻击机就使用物理主机,IP地址和操作系统均符合上述要求。

4.2 Victim1

受害主机在未来需要安装许多程序软件,用来作为一个攻击靶机,包含ssh、mysql、apache、ftp等等;同时也需要安装flume客户端采集服务器日志信息;

4.2.1 安装系统

安装系统比较简单,只是在选择镜像文件的时候没有直接选择centos7镜像文件,而是选择稍后安装,同时在启动虚拟机后选择安装的类型是minimal模式,即没有图形化的最简约模式。

4.2.2 给用户授予sudo权限

在centos7安装的时候会创建一个账户(可选),但是创建完的账户是没有sudo权限的,安装一些程序的时候就比较麻烦,所以先给这个账户授予sudo权限。

1)首先找到sudoers文件,一般都在/etc/sudoers这个位置:

whereis sudoers

2)修改该文件权限为可写状态:

chmod -v u+w /etc/sudoers

3)编辑文件内容:

在root  ALL=(ALL)ALL这一行下面增加新的一行

victim ALL=(ALL)ALL

这里表示victim用户将会具备sudoers权限。

4)还原sudoers文件权限:

chmod -v u-w /etc/sudoers

4.2.3 更新yum源并安装ssh服务

这时候就切到普通用户victim下执行下面的命令:

sudo yum -y update
sudo yum -y install openssh-server

一般安装目录都是/etc/sshd/。

启动ssh:service sshd start。

4.2.4 安装 java 并配置环境变量

安装flume前必须要先安装java环境,且对Java的版本要求有限制,这里我使用的jdk是1.8的,flume是1.6.0-cdh5.7.0的。

1)分别在home目录下创建app和software文件夹,app是程序安装目录,software是程序安装包目录:

mkdir app
mkdir software

2)解压software目录下的jdk文件到app目录下:

tar zxvf jdk-8u131-linux-x64.tar.gz -C ../app/

3)配置环境变量

Vim ~/.bahsrc 增加如下信息:

#jdk
export JAVA_HOME=/home/victim/app/jdk1.8.0_131
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

保存退出后激活环境变量:

Source ~/.bashrc

在命令行下输入java命令能够查看到配置选项信息则表示java环境安装成功。

4.2.5 安装flume并配置环境变量

flume的作用就是从服务器这里采集日志数据传送给logger或者其他服务器。类似的客户端还有filebeat等。

1)解压flume文件到app目录下:

tar zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz -C../app/

2)配置环境变量

Vim ~/.bahsrc 增加如下信息:

#flume
exportFLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
export PATH=${JAVA_HOME}/bin:$PATH

保存退出后激活环境变量:

Source ~/.bashrc

在命令行下输入echo $FLUME_HOME命令能够查看到flume的安装目录则表示成功安装。

4.2.6 配置flume-env.sh

这个文件在启动flume的时候有用到,需要配置该文件中java环境变量:

1)切换到flume的conf目录下,拷贝flume-env.sh.template文件并重名为flume-env.sh;

2)编辑flume-env.sh文件。

取消export JAVA_HOME前面的注释,并修改JAVA_HOME变量指向JAVA目录:

export JAVA_HOME=/home/victim/app/jdk1.8.0_131

4.2.7 检查flume是否能够正常启动

1)在flume的conf目录下,新建example.conf文件,这个文件是flume的配置文件,配置监听的数据源和指定数据输出,内容如下:

#a1:agent名称
#r1:数据源名称
#k1:sink的名称
#c1:channel的名称
 
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = victim
a1.sources.r1.port = 44444
 
# Describe the sink
a1.sinks.k1.type = logger
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
 
# Bind the source and sink to the channel
#一个source可以输出到多个channel,所以是channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里的配置文件表示监听victim主机的44444端口并输出到logger窗口,保存文件并退出。

2)启动agent

切换到flume的bin目录下,执行如下命令:

./flume-ng agent --name a1 --conf $FLUME_HOME/conf--conf-file $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console

a1表示的是agent名称,若出现如下信息,表示agent启动成功。其中console表示接收到的数据显示在命令行窗口。

攻防系统之攻防环境介绍&搭建

3)模拟数据来源

新开命令行窗口,输入命令:

telnet victim  44444

并在命令行中随意输入信息,这些信息将成为数据源传送到logger窗口:

攻防系统之攻防环境介绍&搭建

4)检查是否成功

若在logger窗口下看到telnet窗口的输入数据则表示flume安装成功:

攻防系统之攻防环境介绍&搭建 至此,victim的安装工作基本完成,后续再陆续安装其他程序。在跟observer联动的时候flume配置文件需要更改,更改内容后续会有介绍。

4.3 Observer

observer作为一台监控主机上面运行着监控程序,用来监控日志中是否存在攻击行为并发出告警信息。

在这台主机上需要安装flume接收victim的日志(其实可以直接用kafka接收victim的日志),kafka做数据传输,spark streaming做数据实时分析,MySQL存储处理后的数据,django做数据展示。

其中核心的是spark streaming其中的规则处置,规则的好坏决定着系统的误报和漏报情况。

4.3.1 安装系统

4.3.2 给用户授予sudo权限

4.3.3 更新yum源并安装ssh

4.3.4 安装java环境

4.3.5 安装flume

这五个步骤的安装方法同victim服务器一致,下面是observer的安装重点。

4.3.6 安装zookeeper

在安装kafka之前要先安装zookeeper,因为kafka的集群管理都是通过zookeeper来实现的。

1)下载zookeeper并解压到app目录下:

tar zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ../app/

2)配置zookeeper的环境变量

vim ~/.bashrc,增加如下内容:

#zk
export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0
export PATH=${ZK_HOME}/bin:$PATH

保存推出后,激活当前环境变量:

source ~/.bashrc

然后命令行下输入 echo $ZK_HOME如果能够正常输出zookeeper目录则表示环境变量配置成功。

3)配置zookeeper

切换到zookeeper的conf目录下,拷贝zoo_sample.cfg文件为zoo.cfg文件,并编辑zoo.cfg文件。

修改datadir为自定义的文件目录,如果不修改该选项则有可能存在默认的文件目录被删除的情况。

dataDir=/home/hadoop/app/tmp/zk

因为资源有限所以只能单机模式运行zookeeper和kafka。

4)启动zookeeper

切换到zookeeper的bin目录下,输入如下命令启动zookeeper:

./zkServer.sh start

如果出现如下所示信息则表示zookeeper启动成功:

攻防系统之攻防环境介绍&搭建

4.3.7 安装kafka

安装完zookeeper之后就可以安装kafka了,配置过程也不麻烦。

1)下载kafka并解压到app目录下:

tar zxvf kafka_2.11-0.9.0.0.tgz -C ../app/

2)配置kafka环境变量

vim ~/.bashrc,增加如下内容:

#kafka
export KAFKA_HOME=/home/hadoop/app/kafka_2.11-0.9.0.0
export PATH=${KAFKA_HOME}/bin:$PATH

保存退出后,激活当前环境变量:

source ~/.bashrc

然后命令行下输入 echo $KAFKA_HOME如果能够正常输出kafka目录则表示环境变量配置成功。

3)配置server.properties文件

切换到kafka的config目录下,编辑server.properties文件,编辑配置如下选项:

broker.id=0  #唯一标识broker身份的id信息 
listeners=PLAINTEXT://:9092  #表示监听端口是9092 
host.name=observer  #hostname表示当前主机名称 
log.dirs=/home/hadoop/app/tmp/kafka-logs   #指定kafka的log日志目录 
num.partitions=1   #指定分区数量 

zookeeper.connect=observer:2181  #配置zookeeper的地址和端口

配置完成后保存退出

4)启动kafka

运行bin目录下的kafka-server-start.sh文件:

bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

输入完命令后会显示如下信息:

攻防系统之攻防环境介绍&搭建 新开一个窗口输入jps -m命令,查看如果存在如下进程则表示程序已经正常启动。

攻防系统之攻防环境介绍&搭建 5)创建topic

一个业务一个topic,在正式使用kafka之前我们必须要配置topic,我们先创建一个topic来测试程序能否正常工作:

bin/kafka-topics.sh --create --zookeeper observer:2181 --replication-factor 1 --partitions 1 --topic testtopic

这里表示创建一个名为testtopic的话题。

查看是否创建成功,新开窗口输入如下命令:

bin/kafka-topics.sh --list --zookeeper observer:2181

如果出现testtopic即表明话题创建成功:

攻防系统之攻防环境介绍&搭建 6)topic消息测试

kafka整体架构分为consumer和producer,consumer表示消费消息端,producer表示生产消息端。在测试之前我们要先生产消息才能消费消息。

首先我们输入如下命令生产消息:

bin/kafka-console-producer.sh --broker-list observer:9092 --topic testtopic

然后我们输入如下命令消费消息:

bin/kafka-console-consumer.sh --zookeeper observer:2181 --topic testtopic --from-beginning 

#from-beginning表示从头开始接收消息。

两个窗口的程序在运行以后就会停在窗口不动,当我们在生产端输入任意字符都会在消费端显示的时候,表示我们的kafka可以正常运行了。

生产端:

攻防系统之攻防环境介绍&搭建 消费端:

攻防系统之攻防环境介绍&搭建 从上面可以看出我们的kafka已经正常运行了。

因为资源有限只能搞单机部署,如果各位老板有钱可以多搞几台主机做冗余,但是一定要注意kafka部署的主机数量一定是2n+1台主机。

7)kafka和flume联动

flume采集到的数据是需要通过kafka传输的,所以它们之间的联动必须要有效才可以。这里我们还是以上面那个testtopic话题为测试对象。

7.1)首先启动kafka,开启consumer消费指定的topic消息

bin/kafka-console-consumer.sh --zookeeper observer:2181 --topic testtopic --from-beginning

7.2)然后修改victim主机上的flume配置文件,修改source监听源为data.log文件,sink输出类型为avro sink(这个类型能跟其他flume客户端联动)。具体配置如下:

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#监听某文件
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sources.r1.shell=/bin/sh -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = observer
a1.sinks.k1.port = 44445
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
#一个source可以输出到多个channel,所以是channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上面表示数据来源于data.log文件,输出到observer的44445端口。

7.3)修改observer主机的flume配置文件,配置文件内容如下:

# example.conf: A single-node Flume configuration
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
#监听某文件
a2.sources.r2.type = avro
a2.sources.r2.bind = observer
a2.sources.r2.port=44445
# export to kafka
a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k2.brokerList = observer:9092
a2.sinks.k2.topic = testtopic
a2.sinks.k2.requiredAcks = 1
a2.sinks.k2.batchSize = 5
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
# Bind the source and sink to the channel
#一个source可以输出到多个channel,所以是channels
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

sink type类型为KafkaSink。

topic为我们之前创建的topic名称,batchSize表示当记录数到达5个后再发送。

7.4)先启动observer的flume程序,然后再启动victim的flume程序,即先监听再发送数据。

7.4.1)先启动observer的flume程序

../bin/flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example-kafka.conf -Dflume.root.logger=INFO,console

7.4.2)再启动victim的flume程序

../bin/flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example-send.conf -Dflume.root.logger=INFO,console

7.4.3)然后我们再向文件/home/hadoop/data/data.log输入测试文件时,如果在kafka的监听窗口看到我们输入的信息时则表示双击联动成功。

攻防系统之攻防环境介绍&搭建 切换到kafka窗口看到我们在data.log文件中新增的内容:

攻防系统之攻防环境介绍&搭建 至此我们的kafka已经安装成功且已经能够跟flume进行联动。下面就是重要数据处理和规则匹配环节了。

4.3.8 安装spark

spark环境安装也是比较麻烦的,需要有耐心才可以,但是跟着步骤走一定会搞定的。

1)scala安装

安装spark之前必须要安装scala和hadoop等环境。首先我们先安装scala语言。

1.1)下载scala安装包并解压到app目录下:

tar zxvf scala-2.11.8.tgz -C ../app/

1.2)配置环境变量

vim ~/.bashrc并加入如下内容:

#scala
export SCALA_HOME=/home/hadoop/app/scala-2.11.8
export PATH=${SCALA_HOME}/bin:$PATH

保存文件后退出,source ~/.bashrc 激活环境变量后在命令行输入scala出现scala交互窗口则表示安装配置成功。

攻防系统之攻防环境介绍&搭建

2)hadoop环境搭建

2.1)下载解压hadoop安装包到app目录下。

tar zxvf hadoop-2.6.0-cdh5.7.0.tar.gz -C ../app/

2.2)配置环境变量

vim ~/.bashrc并加入如下内容:

#hadoop
export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0
export PATH=${HADOOP_HOME}/bin:$PATH

保存文件后退出,source ~/.bashrc 激活环境变量。

2.3)修改hadoop配置文件

切换到Hadoop的etc/hadoop/目录下,编辑hadoop-env.sh文件,配置java环境变量:

export JAVA_HOME=/home/hadoop/app/jdk1.8.0_131

保存退出后编辑同级目录下的core-site.xml文件,增加如下配置信息:

<configuration>
<property>
        <name>fs.defaultFS</name>
                <value>hdfs://observer:8020</value>
</property>
<property>
        <name>hadoop.tmp.dir</name>
                <value>/home/hadoop/app/tmp</value>
</property>
</configuration>

配置文件指定fs服务器地址和临时目录。

保存退出后编辑统计目录下的hdf-site.xml文件,增加如下配置信息:

<configuration>
<property>
        <name>dfs.replication</name>
                <value>1</value>
</property>
</configuration>

保存退出编辑slaves文件,增加主机名后保存退出。

2.4)格式化

切换到bin目录下,执行如下命令:

./hdfs namenode -format

执行完成后会在上面指定的tmp目录下生成dfs文件夹。

攻防系统之攻防环境介绍&搭建

2.5)启动hadoop服务

切换到sbin目录下,执行如下命令启动hadoop服务:

./start-dfs.sh 

然后输入三次当前账户密码后即可启动服务,在浏览器输入ip:50070后如果在浏览器中出现如下信息则表示hadoop安装成功:

攻防系统之攻防环境介绍&搭建

2.5)配置yarn

yarn作为Hadoop的资源调度守护进程,同样也需要安装。

切换到etc/hadoop/目录下,编辑mapred-site.xml文件,增加如下内容,指定框架为yarn:

<configuration>
<property>
        <name>mapreduce.framework.name</name>
                <value>yarn</value>
</property>
</configuration>

2.6)启动yarn

切换到sbin目录下,执行./start-yarn.sh命令。

同样的输入完密码后,启动yarn。

启动完成后在浏览器输入ip:8088后如果出现如下所示信息则表示yarn配置成功:

攻防系统之攻防环境介绍&搭建

jps -m查看进程信息也表示Hadoop和yarn配置成功。

攻防系统之攻防环境介绍&搭建

3)spark安装

3.1)首先下载源代码编译适合hadoop版本的,然后再安装编译完之后的包。

攻防系统之攻防环境介绍&搭建

3.2)这里我编译的是spark2.2.0的,解压该文件到app目录下并配置环境变量:

tar zxvf spark-2.2.0-bin-2.6.0-cdh5.7.0.tgz -C ../app/

vim ~/.bashrc 增加如下内容:

#spark
export SPARK_HOME=/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0
export PATH=${SPARK_HOME}/bin:$PATH

保存退出后source ~/.bashrc 激活环境变量。

3.3)检查是否安装成功

切换到bin目录下执行./spark-shell –master local[2] 命令,如果进入scala环境则表示安装成功。

攻防系统之攻防环境介绍&搭建 4.3.9 安装MySQL

安装MySQL的目的是将spark streaming的处理结果存储下来,然后通过django程序对结果做展示,MySQL安装直接通过sudo yum -y install mysql-server命令来执行安装即可。

4.3.10安装django

django程序的安装直接使用pip或者easy_install命令来安装即可:

pip install django

django安装完成后可以使用django-admin.py文件来创建一个django项目:

django-admin.py startproject attackview

然后创建一个app:

cd attackview
python manage.py startapp sparkmysql

app创建完毕后需要切换到attackview目录下,并编辑settings.py文件,指定数据库为mysql:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'echart',
        'USER':'root',
        'PASSWORD':'root@qwe',
        'HOST':'127.0.0.1',
        'PORT':'3306',
    }
}

保存后退出,然后编辑urls.py文件指定视图和函数之间的关系,保证当有新的url请求后django能够正常处理。

这里的细节不再赘述,包括上面各个程序,如果大家有疑问可以联系我或者百度查询解决方案。

这样我们的攻防环境基本上就完成了,那最后我们要做个flume+kafka+spark+mysql的联动测试,如果mysql数据库中能够存储flume客户端采集后被处理的数据,那表明我们的环境联动没有问题了。

4.3.11 联动测试

1)首先配置flume,监听某日志文件,鉴于文件篇幅,这里只贴上部分代码,大体思路跟flume配置环节一致。

# Describe/configure the source
#监听某端口
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /home/hadoop/source/access.log
a2.sources.r2.shell=/bin/sh -c
# Describe the sink
a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k2.topic = streamingtopic
a2.sinks.k2.brokerList = hadoop0:9092
a2.sinks.k2.batchSize = 20
a2.sinks.k2.requiredAcks = 1

2)zookeeper和kafka配置保持上面配置不变即可;

3)配置spark streaming的配置文件;

切换到/examples/src/main/python/streaming/目录下,新增一个kafkadirect.py文件,文件内容如下:

#!/usr/bin/python
#coding:utf-8
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import datetime,yaml, os,re,pymysql
#时间格式转换
def timeutil(time):
        return datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d%H%M%S")
#第一步:创建一个本地的StreamingContext,并设置批处理周期为5s
sc=SparkContext("local[2]","flumeWordCount")
ssc=StreamingContext(sc,10)
#第二步:创建一个kafka连接
topic="streamingtopic"
brokers="hadoop0:9092"
#参数分别表示ssc连接名,列表形式显示的topic名称,brokers列表
directkafkaStream = KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list":brokers})
#第三步:数据处理&转换&存储
#提取kafka数据的第一个字段为消息正文字段并打印数据长度
lines = directkafkaStream.map(lambda x: x[1])
#加载规则文件
f=open('attackregexp_file.yaml','r')
cont=f.read()
x=yaml.load(cont)
#识别攻击行为
def identify_attack(url):
rule_len=len(x['attackregex'])
for i in xrange(rule_len):
rex = x['attackregex'][i]['regex']
if re.match(rex,url):
#连接数据库并追加记录到数据库中
mysqldb = pymysql.connect(host='localhost',port=3306,db='ana_log',user='root',passwd='P@ssw0rd',charset='utf8')
cursor = mysqldb.cursor()
query_sql="insert into test values('%d','%s');" % (5,'ceshizhuru')
cursor.execute(query_sql)
mysqldb.commit()
data=cursor.fetchone()
cursor.close()
mysqldb.close()
#处理日志记录
def test(i):
infos=i.split(" ")
url=infos[6]
identify_attack(url)
host=infos[0]
ip=infos[0]
time=infos[3]
status_code=int(infos[8])
return (ip,url,time,status_code)

words=lines.map(lambda x:test(x)).pprint()
#第四步:开始sparkstreaming
ssc.start()
ssc.awaitTermination()

保存文件后退出。这里加载了一个规则文件也是这里的核心文件。

4)分别启动zk、kafka等程序

4.1)启动zk

./bin/zkServer.sh start

4.2)启动kafka

./bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

4.3)创建topic

./bin/kafka-topics.sh --create --zookeeper hadoop0:2181 --replication-factor 1 --partitions 1 --topic streamingtopic

4.4)启动flume

./bin/flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/access-log-flume-kafka.conf -Dflume.root.logger=INFO,console

4.5)启动spark streaming

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 examples/src/main/python/streaming/kafkadirect1.py

4.6)然后我们向监听的文件中增加新的记录,如果在mysql中出现存储后的结果就表示联动成功。这里我们的规则是判断请求行为是否存在攻击行为。这里我贴上在spark streaming窗口看到的处理记录:

攻防系统之攻防环境介绍&搭建

五、总结

终于我们的环境安装完毕了,后面我们要做的就是开展每一个课程内容了。

文章内容比较多,肯定会存在一些纰漏,应该也会存在更优的解决方案,毕竟我在这方面的研究时间不长,希望大家在观阅的过程中提出自己的见解,共同成长,有问题或者好的见解请联系我,谢谢。

*本文原创作者:gncao,本文属于FreeBuf原创奖励计划,未经许可禁止转载


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

遗传算法原理及应用

遗传算法原理及应用

周明、孙树栋 / 国防工业出版社 / 1999-6 / 18.0

一起来看看 《遗传算法原理及应用》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

在线进制转换器
在线进制转换器

各进制数互转换器

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具