部署mysql
dockerrun--namemysql-eMYSQL_ROOT_PASSWORD=123456-d-p3306:3306mysql:5.7
mysql启动好之后,需要更改一下配置,因为是容器启动的,需要进入mysql容器执行
#进入mysql容器dockerexec-itmysqlbash
因为mysql容器默认没有安装vi vim等命令工具,这里直接用echo 追加文件内容方式
#开启binlog日志配置echo"[mysqld]">>/etc/mysql/my.cnfecho"server_id=1">>/etc/mysql/my.cnfecho"log_bin=mysql-bin">>/etc/mysql/my.cnfecho"binlog_format=ROW">>/etc/mysql/my.cnf
上述配置更改完成之后,退出mysql容器,然后重启
连接到mysql,执行查询binlog配置是否生效
#查询是否开启binlogshowvariableslike'log_bin';#查询binlog文件详情showmasterlogs;
配置maxwell相关需求
创建maxwell所需的账密及权限
CREATEUSER'maxwell'@'%'IDENTIFIEDBY'XXXXXX';GRANTALLONmaxwell.*TO'maxwell'@'%';GRANTSELECT,REPLICATIONCLIENT,REPLICATIONSLAVEON*.*TO'maxwell'@'%';
创建test库和一个测试用的表
SETFOREIGN_KEY_CHECKS=0;createdatabasetest;usetest;CREATETABLE`maxwell`(`id`int(11)DEFAULTNULL,`daemon`varchar(255)DEFAULTNULL)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4;
部署kafka
安装的kafka版本是2.2.1,依赖zookeeper,所以先安装zookeeper
#安装zkdockerrun-d--namezookeeper-server-eALLOW_ANONYMOUS_LOGIN=yesbitnami/zookeeper:latest#安装kafkadockerrun-d--namekafka-server-eALLOW_PLAINTEXT_LISTENER=yes-eKAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181bitnami/kafka:2.2.1
kafka启动完成之后,需要创建一个maxwell的topic给maxwell使用,因为maxwell不会主动创建topic
#创建一个1复制,5分片的topickafka-topics.sh--create--zookeeperip:2181--replication-factor1--partitions5--topicmaxwell
部署maxwell
dockerrun-it-d--namemaxwell--rmzendesk/maxwellbin/maxwell\--user=maxwell--password=XXXXXX--host=mysql_ip--port=3306--producer=kafka\--kafka.bootstrap.servers=kafka_ip:9092--kafka_topic=maxwell
启动完成之后,查看maxwell容器输出日志,如下图所述就说明启动成功了
#查看maxwell容器日志dockerlogs-fmaxwell
生成数据
编写一个脚本,每隔一秒写一条数据到test.maxwell表里
#进入mysql容器dockerexec-itmysqlbash0
验证采集结果
运行生成数据脚本之后,mysql的test.maxwell表里开始有数据持续写入了。。
现在验证一下kafka里面是否有数据
#进入mysql容器dockerexec-itmysqlbash1
可以看到maxwell成功的解析了binlog日志,并以json格式将消息发送到了kafka
消息解析后的动作 数据库: test 表: maxwell type: 插入 数据: id:100,daemon:"Stanislaw Lem"
至此,maxwell采集binlog日志并发送给kafka完成,下一篇将分享flink1.12消费kafka消息。