[关闭]
@levinzhang 2021-01-24T13:57:11.000000Z 字数 6879 阅读 663

使用Kafka、MongoDB和Maxwell's Daemon构建SQL数据库的审计系统

by

摘要:

在本文中,作者讨论了在传统内置的数据副本之外,实现数据库审计日志系统的重要性,并给出了使用Kafka、MongoDB和Maxwell's Daemon构建审计日志系统的方案。

核心要点

假设你正在使用关系型数据来维护事务性数据并且你需要存储某些数据的审计跟踪信息,而这些数据本身是以表的形式存在的。如果你像大多数开发人员那样,那么最终所采用的方案可能如下所示:

1. 使用数据库的审计日志功能

大多数数据库都提供了插件来支持审计日志。这些插件可以很容易地安装和配置,以便于记录数据。但是,这种方式存在如下的问题:

2. 使用应用程序来负责审计日志

要实现这一点,你可以采用如下的方案之一:

a.在更新现有的数据之前,复制现有的数据到另外一个表中,然后再更新当前表中的数据。

b.为数据添加一个版本号,然后每次更新都会插入一条已递增版本号的数据。

c.写入到两个数据库表中,其中一张表包含最新的数据,另外一张表包含审计跟踪信息。

作为设计可扩展系统的一项原则,我们必须要避免多次写入相同的数据,因为这不仅会降低系统的性能,还会引发各种数据不同步的问题。

那么企业为什么需要审计数据呢?

在开始介绍审计日志系统的架构之前,我们首先看一下各种组织对审计日志系统的一些需求。

在本文中,我将会使用像Maxwell’s Daemon和Kafka这样的技术提供一个可扩展的方案,以管理审计跟踪数据。

问题陈述

构建一个独立于应用程序和数据模型的审计系统。该系统必须要具备可扩展性并且经济划算。

架构

重要提示:本系统只适用于使用MySQL数据库的情况,并且使用基于ROW的binlog日志格式

在我们讨论解决方案的细节之前,我们先快速看一下本文中所讨论的每项技术。

Maxwell’s Daemon

Maxwell’s Daemon(MD)是一个来自Zendesk的开源项目,它会读取MySQL bin日志并将ROW更新以JSON的格式写入到Kafka、Kinesis或其他流平台上。Maxwell的运维开销非常低,除了MySQL和一些写入数据的地方之外,就没有其他的需求了,如项目网站所述。简而言之,MD是一个数据变化捕获(Change-Data-Capture,CDC)的工具。

市场上有很多可用的CDC变种,比如Redhat的Debezium、Netflix的DBLog以及LinkedIn的Brooklyn。我们这里的环境可以采用这些工具中的任意一个来实现。但是,Netflix的DBLog以及LinkedIn的Brooklyn是为了满足不足的使用场景而开发的,正如上述的链接中所阐述的那样。不过,Debezium与MD非常类似,可以用来取代我们的架构中的MD。关于该选择MD还是Debezium,我简要列出了几件需要考虑的事情。

Kafka

Apache Kafka是一个开源的分布式事件流平台,能够用于高性能的数据管道、流分析、数据集成和任务关键型的应用。

MongoDB

MongoDB是一个通用的、基于文档的分布式数据库,它是为现代应用开发人员和云时代所构建的。我们使用MongoDB只是为了进行阐述,你可以选择其他的方案,比如S3,也可以选择其他的时序数据库如InfluxDBCassandra

下图展示了审计跟踪方案的数据流图。

图1 数据流图

在审计跟踪管理系统中,要涉及到如下几个步骤。

  1. 应用程序执行数据库写入、更新或删除操作。
  2. SQL数据库将会以ROW格式为这些操作生成bin日志。这是SQL数据库相关的配置。
  3. Maxwell’s Daemon轮询SQL bin日志,读取新的条目并将其写入到Kafka主题中。
  4. 消费者应用轮询Kafka主题以读取数据并进行处理。
  5. 消费者将处理后的数据写入到新的数据存储中。

环境搭建

为了实现简便的环境搭建,我们在所有可能的地方都尽可能使用Docker容器。如果你的机器还没有安装docker的话,那么可以考虑安装Docker Desktop

MySQL数据库

1.在本地运行mysql服务器。如下的命令将会在3307端口启动一个mysql容器。

  1. docker run -p 3307:3306 -p 33061:33060 --name=mysql83 -d mysql/mysql-server:latest

2.如果这是全新安装的话,我们并不知道root密码,运行如下的命令在控制台打印密码出来。

  1. docker logs mysql83 2>&1 | grep GENERATED

3.如果需要的话,登录容器并更改密码。

  1. docker exec -it mysql83 mysql -uroot -p
  2. alter user 'root'@'localhost' IDENTIFIED BY 'abcd1234'

4.处于安全的原因,mysql docker容器默认不允许从外部应用进行连接。我们需要运行如下的命令改变这一点。

  1. update mysql.user set host = '%' where user='root';

5.从mysql提示窗口退出并重启docker容器。

  1. docker container restart mysql83

6.重新登录mysql客户端并运行如下的命令为maxwell’s daemon创建用户。关于该步骤的详细信息,请参考Maxwell’s Daemon的快速指南

  1. docker exec -it mysql83 mysql -uroot -p
  2. set global binlog_format=ROW;
  3. set global binlog_row_image=FULL;
  4. CREATE USER 'maxwell'@'%' IDENTIFIED BY 'pmaxwell';
  5. GRANT ALL ON maxwell.* TO 'maxwell'@'%';
  6. GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
  7. CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'pmaxwell';
  8. GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
  9. GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';

Kafka代理

搭建Kafka是一项非常简单直接的任务。从该链接下载Kafka。

运行如下的命令:

提取Kafka

  1. tar -xzf kafka_2.13-2.6.0.tgz
  2. cd kafka_2.13-2.6.0

运行Zookeeper,这是目前使用Kafka所需要的

  1. bin/zookeeper-server-start.sh config/zookeeper.properties

在一个单独的终端启动Kafka

  1. bin/kafka-server-start.sh config/server.properties

在一个单独的终端创建主题

  1. bin/kafka-topics.sh --create --topic maxwell-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

上述的命令会启动一个Kafka代理并在其中创建一个名为“maxwell-events”的主题。

要推送消息到该Kafka主题,我们可以在新的终端运行如下的命令

  1. bin/kafka-console-producer.sh --topic maxwell-events --broker-list localhost:9092

上述的命令会给我们显示一个提示,从中可以输入消息内容,然后点击回车键,以便于发送消息到Kafka中。

消费来自Kafka主题的消息

  1. bin/kafka-console-producer.sh --topic quickstart-events --broker-list localhost:9092

Maxwell’s Daemon

通过该地址下载maxwell’s daemon。

将其解压并运行如下的命令。

  1. bin/maxwell --user=maxwell --password=pmaxwell --host=localhost --port=3307 --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell-events

这样的话,我们就建立好了Maxwell来监控前面所搭建的数据库的bin日志。当然,我们也可以只监控几个数据库或几个表。关于这方面的更多信息,请参考Maxwell’s Daemon配置文档。

测试环境

要测试搭建的环境是否正确的话,我们可以连接MySQL,并在一张表中插入一些数据。

  1. docker exec -it mysql83 mysql -uroot -p
  2. CREATE DATABASE maxwelltest;
  3. USE maxwelltest;
  4. CREATE TABLE Persons (
  5. PersonId int NOT NULL AUTO_INCREMENT,
  6. LastName varchar(255),
  7. FirstName varchar(255),
  8. City varchar(255),
  9. primary key (PersonId)
  10. );
  11. INSERT INTO Persons (LastName, FirstName, City) VALUES ('Erichsen', 'Tom', 'Stavanger');

现在,在另外一个终端中,运行如下的命令:

  1. bin/kafka-console-consumer.sh --topic maxwell-events --from-beginning --bootstrap-server localhost:9092

在终端中,你应该能够看到如下所示的内容:

  1. {"database":"maxwelltest","table":"Persons","type":"insert","ts":1602904030,"xid":17358,"commit":true,"data":{"PersonId":1,"LastName":"Erichsen","FirstName":"Tom","City":"Stavanger"}}

正如我们所看到的,Maxwell’s Daemon捕获到了数据库插入事件并写入一个JSON字符串到Kafka主题中,其中包含了事件的详情。

搭建MongoDB

要在本地运行MongoDB,可以运行如下的命令:

  1. docker run --name mongolocal -p 27017:27017 mongo:latest

Kafka消费者

Kafka-consumer的代码可以通过GitHub项目获取。下载源码并参考README文档以了解如何运行。

最终测试

最后,我们的环境搭建终于完成了。登录MySQL数据库并运行任意的插入、删除或更新命令。如果环境搭建正确的话,将会在mongodb auditlog数据库中看到相应的条目。我们可以愉快地开始进行审计了!

结论

在本文中所描述的系统在实际部署中能够很好地运行,为我们提供了一个用户数据之外的额外数据源,但是在采用这种架构之前,有些权衡你必须要注意。

  1. 基础设施成本:要运行这种环境,需要额外的基础设施。数据要经历网络上的多次跳转,从数据库到Kafka,再到另外一个数据库,后面可能还会到一个备份中。这会增加基础设施的成本。
  2. 因为数据要经历多次跳转,审计日志无法以实时的形式进行维护。它可能会延迟几秒到几分钟。我们可能会反问“谁能需要实时的审计日志呢?”但是,如果你计划使用这种数据进行实时监控的话,必须要考虑到这一点。
  3. 在这个架构中,我们捕获了数据的变化,而不是谁改变了数据。如果你还关心哪个数据库用户改变了数据的话,那么这种设计就不能提供直接的支持了。

在强调完这种架构的一些权衡之后,我想重申一下这种环境的收益,它的主要好处在于:

关于作者

Vishal Sinha是一位充满激情的技术专家,对分布式计算和大型可扩展系统有着专业的知识和浓厚的兴趣。目前,他在一家领先的印度独角兽公司担任技术总监。在16年的软件行业生涯中,他曾在多家跨国公司和创业公司工作,开发过各种大规模的系统,并领导过一个由众多软件工程师组成的团队。他喜欢解决复杂的问题和尝试新技术。

查看英文原文:Building a SQL Database Audit System using Kafka, MongoDB and Maxwell's Daemon

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注