KOA + egg.js 集成 kafka 消息队列

栏目: Node.js · 发布时间: 6年前

内容简介:Egg.js : 基于KOA2的企业级框架Kafka:高吞吐量的分布式发布订阅消息系统本文章将集成egg + kafka + mysql 的日志系统例子

Egg.js : 基于KOA2的企业级框架

Kafka:高吞吐量的分布式发布订阅消息系统

本文章将集成egg + kafka + mysql 的日志系统例子

系统要求:日志记录,通过kafka进行消息队列控制

思路图:

KOA + egg.js 集成 kafka 消息队列

这里消费者和生产者都由日志系统提供

λ.1 环境准备

①Kafka

官网下载kafka后,解压

启动zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties
复制代码

启动Kafka server

这里config/server.properties中将num.partitions=5,我们设置5个partitions

bin/kafka-server-start.sh config/server.properties
复制代码

② egg + mysql

根据脚手架搭建好egg,再多安装kafka-node,egg-mysql

mysql 用户名root 密码123456

λ.2 集成

  1. 根目录新建app.js,这个文件在每次项目加载时候都会运作
'use strict';
 
const kafka = require('kafka-node');
 
module.exports = app => {
  app.beforeStart(async () => {
    const ctx = app.createAnonymousContext();
 
    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient({ kafkaHost: app.config.kafkaHost });
    const producer = new Producer(client, app.config.producerConfig);
 
    producer.on('error', function(err) {
      console.error('ERROR: [Producer] ' + err);
    });
 
    app.producer = producer;
 
    const consumer = new kafka.Consumer(client, app.config.consumerTopics, {
      autoCommit: false,
    });
 
    consumer.on('message', async function(message) {
      try {
        await ctx.service.log.insert(JSON.parse(message.value));
        consumer.commit(true, (err, data) => {
          console.error('commit:', err, data);
        });
      } catch (error) {
        console.error('ERROR: [GetMessage] ', message, error);
      }
    });
 
    consumer.on('error', function(err) {
      console.error('ERROR: [Consumer] ' + err);
    });
  });
};
复制代码

上述代码新建了生产者、消费者。

生产者新建后加载进app全局对象。我们将在请求时候生产消息。这里只是先新建实例

消费者获取消息将访问service层的insert方法(数据库插入数据)。

具体参数可以参考kafka-node官方API,往下看会有生产者和消费者的配置参数。

  1. controller · log.js

这里获取到了producer,并传往service层

'use strict';
 
const Controller = require('egg').Controller;
 
class LogController extends Controller {
  /**
   * @description Kafka控制日志信息流
   * @host /log/notice
   * @method POST
   * @param {Log} log 日志信息
   */
  async notice() {
    const producer = this.ctx.app.producer;
    const Response = new this.ctx.app.Response();
 
    const requestBody = this.ctx.request.body;
    const backInfo = await this.ctx.service.log.send(producer, requestBody);
    this.ctx.body = Response.success(backInfo);
  }
}
 
module.exports = LogController;
复制代码
  1. service · log.js

这里有一个send方法,这里调用了producer.send ,进行生产者生产

insert方法则是数据库插入数据

'use strict';
 
const Service = require('egg').Service;
const uuidv1 = require('uuid/v1');
 
class LogService extends Service {
  async send(producer, params) {
    const payloads = [
      {
        topic: this.ctx.app.config.topic,
        messages: JSON.stringify(params),
      },
    ];
 
    producer.send(payloads, function(err, data) {
      console.log('send : ', data);
    });
 
    return 'success';
  }
  async insert(message) {
    try {
      const logDB = this.ctx.app.mysql.get('log');
      const ip = this.ctx.ip;
 
      const Logs = this.ctx.model.Log.build({
        id: uuidv1(),
        type: message.type || '',
        level: message.level || 0,
        operator: message.operator || '',
        content: message.content || '',
        ip,
        user_agent: message.user_agent || '',
        error_stack: message.error_stack || '',
        url: message.url || '',
        request: message.request || '',
        response: message.response || '',
        created_at: new Date(),
        updated_at: new Date(),
      });
 
      const result = await logDB.insert('logs', Logs.dataValues);
 
      if (result.affectedRows === 1) {
        console.log(`SUCEESS: [Insert ${message.type}]`);
      } else console.error('ERROR: [Insert DB] ', result);
    } catch (error) {
      console.error('ERROR: [Insert] ', message, error);
    }
  }
}
 
module.exports = LogService;
复制代码
  1. config · config.default.js

一些上述代码用到的配置参数具体在这里,注这里开了5个partition。

'use strict';
 
module.exports = appInfo => {
  const config = (exports = {});
 
  const topic = 'logAction_p5';
 
  // add your config here
  config.middleware = [];
 
  config.security = {
    csrf: {
      enable: false,
    },
  };
 
  // mysql database configuration
  config.mysql = {
    clients: {
      basic: {
        host: 'localhost',
        port: '3306',
        user: 'root',
        password: '123456',
        database: 'merchants_basic',
      },
      log: {
        host: 'localhost',
        port: '3306',
        user: 'root',
        password: '123456',
        database: 'merchants_log',
      },
    },
    default: {},
    app: true,
    agent: false,
  };
 
  // sequelize config
  config.sequelize = {
    dialect: 'mysql',
    database: 'merchants_log',
    host: 'localhost',
    port: '3306',
    username: 'root',
    password: '123456',
    dialectOptions: {
      requestTimeout: 999999,
    },
    pool: {
      acquire: 999999,
    },
  };
 
  // kafka config
  config.kafkaHost = 'localhost:9092';
 
  config.topic = topic;
 
  config.producerConfig = {
    // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
    partitionerType: 1,
  };
 
  config.consumerTopics = [
    { topic, partition: 0 },
    { topic, partition: 1 },
    { topic, partition: 2 },
    { topic, partition: 3 },
    { topic, partition: 4 },
  ];
 
  return config;
};
复制代码
  1. 实体类:

mode · log.js

这里使用了 Sequelize

'use strict';
 
module.exports = app => {
  const { STRING, INTEGER, DATE, TEXT } = app.Sequelize;
 
  const Log = app.model.define('log', {
    /**
     * UUID
     */
    id: { type: STRING(36), primaryKey: true },
    /**
     * 日志类型
     */
    type: STRING(100),
    /**
     * 优先等级(数字越高,优先级越高)
     */
    level: INTEGER,
    /**
     * 操作者
     */
    operator: STRING(50),
    /**
     * 日志内容
     */
    content: TEXT,
    /**
     * IP
     */
    ip: STRING(36),
    /**
     * 当前用户代理信息
     */
    user_agent: STRING(150),
    /**
     * 错误堆栈
     */
    error_stack: TEXT,
    /**
     * URL
     */
    url: STRING(255),
    /**
     * 请求对象
     */
    request: TEXT,
    /**
     * 响应对象
     */
    response: TEXT,
    /**
     * 创建时间
     */
    created_at: DATE,
    /**
     * 更新时间
     */
    updated_at: DATE,
  });
 
  return Log;
};

复制代码
  1. 测试 Python 脚本:
import requests
 
from multiprocessing import Pool
from threading import Thread
 
from multiprocessing import Process
 
 
def loop():
    t = 1000
    while t:
        url = "http://localhost:7001/log/notice"
 
        payload = "{\n\t\"type\": \"ERROR\",\n\t\"level\": 1,\n\t\"content\": \"URL send ERROR\",\n\t\"operator\": \"Knove\"\n}"
        headers = {
        'Content-Type': "application/json",
        'Cache-Control': "no-cache"
        }
 
        response = requests.request("POST", url, data=payload, headers=headers)
 
        print(response.text)
 
if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=loop)
        t.start()
复制代码
  1. 建表语句:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
 
-- ----------------------------
-- Table structure for logs
-- ----------------------------
DROP TABLE IF EXISTS `logs`;
CREATE TABLE `logs`  (
  `id` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '日志类型',
  `level` int(11) NULL DEFAULT NULL COMMENT '优先等级(数字越高,优先级越高)',
  `operator` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '操作人',
  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '日志信息',
  `ip` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'IP\r\nIP',
  `user_agent` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前用户代理信息',
  `error_stack` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '错误堆栈',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前URL',
  `request` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '请求对象',
  `response` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '响应对象',
  `created_at` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  `updated_at` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
 
SET FOREIGN_KEY_CHECKS = 1;
复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

计算机组成:结构化方法

计算机组成:结构化方法

坦嫩鲍姆 / 刘卫东 / 人民邮电出版社 / 2006-1 / 65.00元

本书采用结构化方法来介绍计算机系统,书的内容完全建立在“计算机是由层次结构组成的,每层完成规定的功能”这一概念之上。作者对本版进行了彻底的更新,以反映当今最重要的计算机技术以及计算机组成和体系结构方面的最新进展。书中详细讨论了数字逻辑层、微体系结构层、指令系统层、操作系统层和汇编语言层,并涵盖了并行体系结构的内容,而且每一章结尾都配有丰富的习题。 本书适合作为计算机专业本科生计算机组......一起来看看 《计算机组成:结构化方法》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具