需求:把线下210数据同步到本地中台数据库中
文章主是把线上mysql同步到线下mysql中
DataX下载地址
一、DataX 简介
DataX 是阿里云 DataWorks 数据集成 的开源版本,主要就是用于实现数据间的离线同步。 DataX 致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、Hbase、FTP 等 各种异构数据源(即不同的数据库) 间稳定高效的数据同步功能。
角色 作用
任务启动时刻 : 2022-04-24 17:20:54
任务结束时刻 : 2022-04-24 17:21:04
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0
2.DataX3.0 核心架构
DataX 完成单个数据同步的作业,我们称为 Job,DataX 接收到一个 Job 后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分、TaskGroup 管理等功能。
DataX 调度过程:
首先 DataX Job 模块会根据分库分表切分成若干个 Task,然后根据用户配置并发数,来计算需要分配多少个 TaskGroup;
计算过程:Task / Channel = TaskGroup,最后由 TaskGroup 根据分配好的并发数来运行 Task(任务)
二、使用 DataX 实现数据同步
准备工作:
JDK(1.8 以上,推荐 1.8)
Python(2,3 版本都可以)
Apache Maven 3.x(Compile DataX)(手动打包使用,使用 tar 包方式不需要安装)
安装 JDK:下载地址(需要创建 Oracle 账号)
下载jdk解压到/usr/local/java/
配置jdk环境变量
vi /etc/profile
因为 上自带 的软件包,所以不需要进行安装
1.Linux 上安装 DataX 软件
- 当未删除时,可能会输出:
验证:
输出:
能显示出来表示验证成功
生成 MySQL 到 MySQL 同步的模板:
验证:
1.IDEA编译打包
下载链接:https://pan.baidu.com/s/13a8nIpz6FL8y4fdE94trjQ
提取码:data
备注:官方提供的版本tar版本包
https://pan.baidu.com/s/13yoqhGpD00I82K4lOYtQhg 提取码:cpsk
2.解压安装包
在选定的安装目录,解压安装包
为接下来一键安装部署准备,这里我建的库是dataxweb(自己定义就好,前后保持一致)
4.执行一键安装脚本
进入解压后的目录,找到bin目录下面的install.sh文件,如果选择交互式的安装,则直接执行
然后按照提示操作即可。包含了数据库初始化,如果你的服务上安装有mysql命令,在执行安装脚本的过程中则会出现以下提醒:
按照提示输入数据库地址,端口号,用户名,密码以及数据库名称,大部分情况下即可快速完成初始化。
如果服务上并没有安装mysql命令,则可以取用目录下/bin/db/datax-web.sql脚本去手动执行,完成后修改相关配置文件
按照具体情况配置对应的值即可。
在交互模式下,对各个模块的package压缩包的解压以及configure配置脚本的调用,都会请求用户确认,可根据提示查看是否安装成功,如果没有安装成功,可以重复尝试; 如果不想使用交互模式,跳过确认过程,则执行以下命令安装
4.1邮件服务
在项目目录:modules/datax-admin/bin/env.properties 配置邮件服务(可跳过)
此文件中包括一些默认配置参数,例如:server.port,具体请查看文件。
4.2指定PYTHON_PATH的路径
此文件中包括一些默认配置参数,例如:executor.port,json.path,data.path等,具体请查看文件。
5.1一键启动所有服务
中途可能发生部分模块启动失败或者卡住,可以退出重复执行,如果需要改变某一模块服务端口号,则:
找到SERVER_PORT配置项,改变它的值即可。 当然也可以单一地启动某一模块服务:
module_name可以为datax-admin或datax-executor
5.2一键停止所有服务
当然也可以单一地停止某一模块服务:
5.3查看服务(注意!注意!)
在Linux环境下使用JPS命令,查看是否出现DataXAdminApplication和DataXExecutorApplication进程,如果存在这表示项目运行成功
1.前端界面
部署完成后,在浏览器中输入 http://ip:port/index.html 就可以访问对应的主界面(ip为datax-admin部署所在服务器ip,port为为datax-admin 指定的运行端口9527)
输入用户名 admin 密码 123456 就可以直接访问系统
2.datax-web API
datax-web部署成功后,可以了解datax-web API相关内容,网址: http://ip:port/doc.html
五、DataX-WEB 运行日志
部署完成之后,在modules/对应的项目/data/applogs下(用户也可以自己指定日志,修改application.yml 中的logpath地址即可),用户可以根据此日志跟踪项目实际启动情况
如果执行器启动比admin快,执行器会连接失败,日志报"拒绝连接"的错误,一般是先启动admin,再启动executor,30秒之后会重连,如果成功请忽略这个异常。
查看web界面是否有注册成功的执行器,另外执行器可以根据需要改名称。
3.任务模板构建
3.2路由策略:当执行器集群部署时,提供丰富的路由策略,包括:
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):依次分配任务;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久为使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
- 单机串行:调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
- 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
- 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
增量增新建议将阻塞策略设置为丢弃后续调度或者单机串行
设置单机串行时应该注意合理设置重试次数(失败重试的次数*每次执行时间<任务的调度周期),重试的次数如果设置的过多会导致数据重复,例如任务30秒执行一次,每次执行时间需要20秒,设置重试三次,如果任务失败了,第一个重试的时间段为1577755680-1577756680,重试任务没结束,新任务又开启,那新任务的时间段会是1577755680-1577758680
3.3任务类型
先选择DataX任务,后续配置完详细任务后可以更改3.4其它
可以根据需求填写
3.4其它
4.数据源配置
根据不同数据源,配置参数。
5.任务构建
构建reader
"where": " id>= ${startId} and id<= ${endId}",
入库前 清表
"preSql": [
"truncate table uc_op_dynnamic_ad_api_day"
],
问题1:
配置远程数据源过程中出现异常,提示远程210不能连接
Code:[MYSQLErrCode-02], Description:[数据库服务的IP地址或者Port错误,请检查填写的IP地址和Port或者联系DBA确认IP地址和Port是否正确。如果是同步中心用户请联系DBA确认idb上录入的IP和PORT信息和数据库的当前实际信息是一致的]. - 具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 174 milliseconds ago. The last packet sent successfully to the server was 174 milliseconds ago.
把https://blog.csdn.net/yuanzelin8/article/details/datax-admin/lib/mysql-connector-java-5.1.47.jar替换成mysql-connector-java-5.1.49.jar好了
问题2:
2022-04-28 16:00:08.872 [New I/O server worker #1-3] ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x31df79de, /211.103.136.38:44900 => /x.x.x.210:11111], exception=java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntonativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:322)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)
at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)