@Catyee
2021-09-02T19:25:44.000000Z
字数 6944
阅读 547
工作总结
面试官你好,我叫袁奎,18年研究生毕业,到现在工作三年多一点点,一直在星环做研发工作,工作期间主要负责一个叫TDT的数据流转服务,这个服务的作用主要是将数据加载到公司大数据平台下的产品,主要是两个分析库inceptor和argodb,以及一个交易库kundb。
我负责的部分主要是关系型数据库相关的导数和同步工作,包括关系库到分析库inceptor的全量导数,关系库到分析库inceptor的准实时同步,这里导数和同步的语义是不一样的,同步的要求要高于导数,另外还有一部分关系库之间的实时同步和HTAP场景下的实时同步,这里HTAP场景实际上是指关系库和分析库的数据实时同步功能。这四件事实际上应对的是不同的场景,有批式数据处理,也有流式数据处理,这一部分的工作从设计、实现到优化基本都是我完成的,比如说全量导数,基于最基本mapreduce的思想,我们实现了类似于sqoop的导数方案,这之外还新设计了一种更适合批量配置的导数方式。不管是导数还是同步,基本都是和关系数据库打交道,在开发迭代的过程中我统一了各种异构数据库的库表逻辑结构,统一了特殊字符的处理方式,另外实现了一种驱动包隔离加载的功能,中间涉及到类加载、卸载相关的原理,就我个人而言是一次很好的将理论用于实践,解决实际生产问题的过程,当然遇到的生产问题有很多,但这一次印象比较深刻,我中间提到的这些,如果有兴趣之后都可以详细了解,我的自我介绍就到这里了。
基于jdbc来做传统关系库数据的抽取是比较通用的方式,大家都是这么做的,包括sqoop、阿里开源的datax,底层原理也是一样的,都是想方设法的将原表进行切分,每个切片抽取一部分数据,这样并行或者并发抽取效率会更高。sqoop和datax切片的策略也是一样的,都是要选择一个切分字段,然后通过拼接where条件进行切分。
这种方式的好处是方案通用,任何一个关系型数据库都支持where条件语句,几乎可以认为适用与所有关系型数据库。缺点也很明显,这种方案大家熟知的缺点是切片的数据不均匀,会产生数据倾斜从而影响效率,但实际上还有另外一个比较显著的缺点,那就是对实施人员和原表的要求都很高,实施人员要为每张表都选择一个切分字段,这个切分字段最好是数值或者时间类型的主键或者不为空的唯一键,基于这个切分字段数据还要尽量均匀,这就要求实施人员要特别熟悉每张表,这个要求太高了,我们现场的实施人员也都是我们公司的员工,不可能对客户的库表如此了解,配置整库的导数任务要花费非常长的时间,所以需要一种新的导数方案来降低实施人员的使用难度,总的思路还是对原表进行切片,除了用where条件进行切片之外,我的思路就是用分页语句进行切片,比如对于mysql来说就是limit、offset语句,这种方式不需要选择切分字段,每个切片的数据也是绝对均衡的,但是缺点也很明显,最基本的就是方案不通用,有些数据库没有特别方便的分页方法,不同数据库需要不同的实现,另外一个缺点是性能问题,这是分页语句本身在关系库实际执行方式原因,比如mysql,越往后的分页花费的时间越长,所以这种方式不适合数据量特别大的表,世上没有两全法,所以对于整库导数我们的最佳实践是先对表进行大致的规划,把大表给摘出来,大部分情况下大表都只是少量的,大表用sqoop的方式导数,需要选择切分,单独照顾,导数的效率高。小表用分页的方式,配置起来方便,另外我们根据数据量做了自动计算切片数量的算法,如果表数量实在太少,实际上只有一个map任务去跑,也进一步减少配置并节省资源。
准实时同步要求对源表的增删改能及时的反应到目标端,要求是要比导数高很多的,这种情况一般就不会采用jdbc的方式,一般都是捕获变化数据,比如mysql就是解析binlog日志,目标端就是我们的分析型数据库,是列式存储,对单条或者小数据量的操作是不友好的,所以我们解析binlog日志之后不能直接还原成sql语句去分析库中执行,这是分析库的特征决定的。
那我们怎么办呢?还是要利用分析库的特点,也就是说还是要批量操作,我们先积累一段时间的数据,比如30分钟,我们持续解析binlog,并上传hdfs,然后我们要想办法把这段时间的增删改单条操作转换为批量的操作,这样就可以用mapreduce任务来完成同步的过程了,首先我们可以对操作进行压缩,比如源端插入了一条id为1的一行数据,之后一段时间可能做了n次update操作,我们从binlog中都可以解析出来,但是实际上中间的update操作是不关系的,我们只需要关系某一个时间窗口内的最后一次操作,如果最后一次操作是delete就更容易了,直接不用管。其次由于分析库对update这样的单条操作不友好,所以我们把update拆为先去目标端delete旧数据,然后插入新数据的两个过程,这样拆分源端增删改操作统一为目标端的先删后插,这样删可以批量操作,插入也可以批量操作,唯一要注意的是先删再插入必须在同一个事务里面,否则如果插入失败可能造成数据丢失,我们的分析库很早就实现了事务能力,所以不存在这样的顾虑。
这里面比较费时的步骤是delete的过程,分析库不像关系库可以用索引,所以delete是需要全表扫描的,那优化的思路就是尽量减少扫描的数据量,如果目标表是一个分区的表,我们在构造delete语句的时候要将分区信息给带上,这样可以减少扫描的数据量。
另外由于中间文件是上传到hdfs上的,hdfs对小文件不友好,所以要注意合并小文件,执行完之后要清理文件。我们采用定时任务去进行定时调度,可以做到30分钟的延时,所以是准实时的同步。
所谓HTAP是指同一套系统既要提供交易能力又要提供分析能力,我们公司的方案是使用交易库kundb和分析库argodb打造一个一体的系统,其中kundb提供交易能力,argodb提供分析能力,那就需要打通kundb和argodb的数据,而且不能和准实时同步不一样,要求很低的延时,所以我们不能再采用mapreduce的思路了,而是流式数据,我们依然是捕获关系库的变化数据,然后会解析成特定的格式写入到kafka,然后启动一个流任务从kafka中获取数据,转化成列式存储的格式放入到目标表的底层文件,这里为了提高效率写采用的是顺序写,那update和delete操作是怎么实现的呢?实际上update和delete操作也转换成了insert的操作,只是有一个标志位和一个版本号,当在分析库中执行查询语句的时候根据标志位和版本号进行操作的合并。
这个项目是一个跨组合作的项目,有交易库kundb组、也有分析库argodb组,我是中间件的部分,我负责的主要是捕获变化数据,然后把数据写入kafka。由于对于实时性要求比较高,所以我也针对canal的部分做出了一些优化,主要有两方面,一个是去除canal client减少一次网络操作,另外就是canal server上任务的负载均衡。
canal本身是一个消息中间件的设计,canal server实际上是生产者,canal client则是消费者,也就是说canal server从mysql获取binlog日志之后,还要通过网络(netty)传输给canal client,然后由canal client实现数据消费的逻辑,中间多了一层网络的传输,实际上没有必要,我猜测原先的canal肯定是有消息存储的,但是开源出来的版本拿掉了存储的这一部分,所以看起来会比较别扭,那么我做的第一个优化就是去掉canal client,canal server获取到binlog之后直接解析成需要的格式发送给kafka进行存储。减少一层网络的传输。
第二个优化就是canal server上任务的负载均衡,canal server的一个任务叫做一个instance,它定义了需要同步哪个mysql的哪些库表,以及从什么位置或者时间进行同步,instance之间是彼此隔离的,不会互相影响,canal server本身是一个多活的状态,但是一个instance只会在一个canal server上运行,这一部分是通过zookeeper实现了一个分布式锁来实现的,谁先抢占到锁,谁就会执行,其它的canal server的instance就会被阻塞,这样虽然可以保证一个instance在同一个时刻只有一个canal server运行,但是由于是抢占式的,所以可能造成同一个canal server运行了绝大多数的instance,其它canal server却都空闲着,所以要进行instance的负载均衡。
实现也是通过zookeeper来实现的,具体来说是新增了两个持久节点,一个用来记录instance也就是同步任务当前正在运行的canal server信息,以及曾经运行过但是运行失败的canal server信息,实际上记录的是任务应该由那台服务器运行,也就是调度信息,所以这个节点叫做调度节点;另外一个持久化节点这个节点的子节点就是每一个canal server机器节点,这些机器节点上记录了每个运行的instance,实际上就是每个canal server的负载情况,这个节点叫做负载均衡节点。
那么当运行一个新的同步任务的时候,就先去获取所有canal server的负载情况,然后选择负载最少的canal server来运行这个同步任务,将这个选中的canal server的信息写入调度节点,canal server都会去扫描需要运行的同步任务,扫描之后从调度节点获知是否需要自己来运行,只有相互匹配才会运行。停止同步任务的时候,会删除schedule的节点,并更新机器的负载均衡信息。
如果一个canal server挂掉了,那么它上面的任务会自动负载到其它机器去运行,但是目前如果新增一个canal server服务,不会自动触发负载均衡,但是新的同步任务会自动运行到新增的canal server上。
还有一个问题是负载均衡的粒度比较粗,是以instance为粒度的,但实际上每个instance负责的库表数量不一样,每个库表的操作频率也不一样。
instance抢占式执行的具体实现
每个canal server启动之后都会去扫描instance的配置,如果发现了一个instence,会先去zookeeper中创建这个instance的节点,这个是一个持久化节点,这个节点里面还会记录同步的位点信息,另外会创建一个临时节点,里面会记录创建这个临时节点的canal server的ip和端口号,如果有多个canal server扫描到了同一个instance,谁先创建出临时节点并记录自己的信息,谁就执行这个instance,所以这个临时节点记录的就是正在运行这个instance的canal server。而其它的canal server发现临时节点已经创建出来了,并且上面记录的信息不是自己的,就会注册一个watcher,监听这个临时节点的删除事件,然后在这个canal server上的instance会陷入阻塞状态,直到监听到临时节点被删除,会再次尝试去创建临时节点,如果创建上了,自己就读取zookeeper上记录的这个instance的位点信息,并从这个位点继续往后解析。
tdt中的全量/增量导数是通过jdbc去做的,通过jdbc的方式就是先加载驱动然后获取连接,我们的客户现场有很多都有不同类型的数据库,而且还有好几个不同的版本,驱动包可能发生冲突,驱动包就是jar包,也就是java类,java类一旦加载到jvm中想卸载就难了,所以这就需要我们考虑怎么去管理驱动,有两种方式,一个是我们提供,第二种是用户自己提供。我们提供的好处是可以保证驱动包的正确性,因为我们可以经过充分测试再发布出去,坏处是我们只能提供有限的固定版本的驱动,tdt现在已经测试过的大概有7、8种数据库,这些数据库的某些版本我们可以保证不会出错,但是同一个数据库有很多版本,驱动也有很多版本,并不能保证每个版本的驱动都兼容,另外市面上也有各种各样类型的数据库,理论上只要实现了jdbc标准,tdt就应该能够支持对应的导数功能,我们无法把所有数据库的驱动都提供出来。
基于这样的考虑,我们选择了第二种方式,也就是用户自己提供,我们允许用户上传自己的驱动包,并管理驱动,但是用户自己上传的驱动包就无法保证正确性了,我们不知道他是不是随便从网上下载了一个或者用了一个错误版本的驱动,驱动就是一个jar包,如果用通用的方式,也就是URLClassloader加载驱动,由于默认类加载机制,对于工具来说还好,但是对于服务来说一旦加载到内存再想卸载就比较难了,更重要是如果先加载了一个有问题的驱动,由于父类委托机制,之后就算用户上传了一个正确的驱动,也可能不会再加载了,这个时候没有办法,只能重启服务。但是生产环境重启服务影响就太大了。
所以对于这个问题,首要要解决的就是怎么让加载的驱动互不影响。解决方式其实就是类加载隔离,jvm中要判定一个类是不是唯一的其实是要看两个方面,一个是这个类的全限定名是不是一样的,另外一个要看这个类的类加载器是不是一样的,如果全限定名一样但是类加载器不一样,jvm依然会认为这是两个不一样的类,所以要实现类加载隔离只需要让不同的类加载器去加载驱动包就可以了。
我们每次加载驱动的时候都new一个新的Urlclassloader对象,按道理这就是不同的类加载器,一开始我也以为这样就实现类加载隔离了,但其实没这么简单,原因是Urlclassloader默认是父类委托机制,有些类他交给父加载器去加载了,父加载器大部分情况都是appclassloader,都是同一个,如果有些全限定名一样的类已经加载过,那么是不会再次加载的,所以基于UrlClassLoader不太容易做类隔离加载,但是可以看到做类加载隔离的一个重要的思路就是要打破父类委托机制。
所以我们要实现一个自己的类加载器,而且要破坏父类委托机制,优先自己加载,加载不到的时候再使用父类加载器,这个就是类加载隔离的核心机制了,实现上没那么困难,只要继承基础的ClassLoader类,然后重写他的loadClass方法,重写的时候优先自己加载就可以了。
但实际上把类以隔离的方式加载进jvm只完成了工作的一半,因为破坏了父类委托机制,加载的类可能会越来越多,尤其是周期调度类型任务,每调度一次就加载一次,很快元空间就会爆炸,所以实现类加载隔离的时候一定要考虑清楚加载的类什么时候卸载。那类什么时候卸载呢?一是这个类的所有实例都被回收掉了,二是加载这个类的类加载器被回收掉,三是这个类的Class对象已经被回收掉了。后两个条件都很苛刻,类加载器和它加载的每个类的Class对象都是一个相互引用的关系,根据可达性分析,要同时引用不可达时才会被回收掉,这个需要小心设计,我的做法是一个驱动包会绑定一个类加载器,这一个类加载器只负责加载它绑定的驱动,然后会缓存驱动,当驱动从缓存中移除的时候,会让类加载器将加载过的类全都释放,然后把自己置为null,总之是保证没有引用关系,根据上面的理论我又修改了类加载器,保证类加载器使用完之后,一定是可以被回收掉的。
但是到这一步还没结束,我开发完成自测的时候通过jvm参数打印类加载和卸载信息,结果发现根本没有打印类卸载的信息,这个一度让我非常困惑,所以我把运行时候的内存信息dump出来用Jprofile分析这个类加载器的引用链,发现竟然有一个Driver的实例没有被回收,然后一看是在DriverManager这个类中的,但是我们根本没有用DriverManager来获取连接,它是什么时候被保存到DriverManager中的呢?DriverManager的内部有一个copyonwritearraylist,用来保存所有向他注册的Driver对象,既然没有用到DriverManager,那这个对象是什么时候注册进去的呢?我看了mysql、oracle、db2、sql server驱动包的源码,它们都有一个静态代码块,在静态代码块中会new一个自己的实例注册到DriverManager中,就是这个实例导致类加载器卸载不掉。所以我的解决方式就是在清理类加载器的时候用反射获取到DriverManager中的copyonwritearrylist,然后清理掉掉对应的实例。修改完之后我再自测,之后终于打印出类卸载的信息了。
究其原因还是驱动包中的静态代码块,静态代码块中可能会有一些隐秘的引用关系导致类卸载失败。这种引用关系一般也很难发现,所以要经过充分测试才行。总体来说就是一个驱动包隔离加载的功能,涉及到了类加载和卸载的细节,实际上类隔离加载还挺常用的,比如tomcat的加载机制以及jdk9的模块功能都用到了隔离加载的机制。