码迷,mamicode.com
首页 > Windows程序 > 详细

DataPipeline数据融合重磅功能丨一对多实时分发、批量读取模式

时间:2019-08-21 11:28:56      阅读:106      评论:0      收藏:0      [点我收藏+]

标签:binlog   from   指定   针对   int   用户   动态   ima   line   

 

 

技术图片

 

为能更好地服务用户,DataPipeline最新版本支持:

 

1. 一个数据源数据同时分发(实时或定时)到多个目的地;

 

2.  提升Hive的使用场景:

  • 写入Hive目的地时,支持选择任意目标表字段作为分区字段;

     

  • 可将Hive作为数据源定时分发到多个目的地。

     

3. 定时同步关系型数据库数据时,可自定义读取策略来满足各个表的同步增量需求。

 

本篇将首先介绍一下一对多数据分发及批量读取模式2.0的功能,后续功能会在官微陆续发布。

 

一、推出「一对多数据分发」的背景

技术图片

 

 

在历史版本中,DataPipeline每个任务只允许有一个数据源和目的地,从数据源读取的数据只允许写入到一张目标表。这会导致无法完美地支持客户的两个需求场景:


需求场景一:

客户从一个API数据源或者从KafkaTopic获取JSON数据后,通过高级清洗解析写入到目的地多个表或者多个数据库中,但历史版本无法同时写入到多个目的地,只能创建多个任务。这会导致数据源端会重复获取同一批数据(而且无法完全保证数据一致性),浪费资源,并且无法统一管理。


需求场景二:

客户希望创建一个数据任务,并从一个关系型数据库表实时(或定时)分发到多个数据目的地。在历史版本中,用户需要创建多个任务来解决,但创建多个任务执行该需求时会重复读取数据源同一张表的数据,比较浪费资源。客户更希望只读取一次便可直接解析为多个表,完成该需求场景。


新功能解决的问题:

1. 用户在一个数据任务中选择一个数据源后,允许选择多个目的地或者多个表作为写入对象,而不需要创建多个任务来实现该需求。
2. 用户在单个任务中针对每个目的地的类型和特性,可以单独设置各个目的地表结构和写入策略,大大减少了数据源读取次数和管理成本。

 

二、扩展Hive相关使用场景

历史版本中,DataPipeline支持各类型数据源数据同步到Hive目的地的需求场景。但由于每个客户的Hive使用方式、数据存储方式不同,两个需求场景在历史版本中并没有得到支持。


需求场景一:

动态分区字段。历史版本中,用户只允许选择时间类型字段作为分区字段。在真实的客户场景中除了按照时间做分区策略外,客户希望指定Hive表任意字段作为分区字段。


需求场景二:

客户希望除了以Hive作为目的地,定时写入数据到Hive外,客户还希望使用DataPipeline可以定时分发Hive表数据到各个应用系统,解决业务需求。

 

新功能解决的问题:


1. 允许用户指定目的地Hive表中任何字段作为分区字段,并支持选择多个分区字段。

2. 新增Hive数据源,可作为数据任务读取对象。

 

三、推出「批量读取模式2.0功能」的背景

技术图片

 

需求场景:

 

关系型数据库(以MySQL为例)的表没有权限读取BINLOG,但在业务上客户需要定期同步增量数据,在权限只有SELECT情况下,需要做到增量数据的同步任务。
新版本出现之前,DataPipeline在用户选择批量读取模式时提供了增量识别字段的功能,可以选择自增字段或者更新时间字段作为条件完成增量数据的同步,但部分表可能没有这种类型的字段,或者增量同步的逻辑不通(比如:只同步过去1小时的数据,或只同步到5分钟前的数据等)。

 

新功能解决的问题:

 

1. 在关系型数据库作为数据源的情况下,允许用户针对每一个表设置WHERE读取条件,并提供lastmax方法。


2. 使用该函数DataPipeline会取该任务下已同步数据中某一个字段的最大值,用户可以使用该值作为WHERE语句读取条件。


3. 用户使用last_max()函数,在首次执行该语句或对应字段暂无数值时,则会忽略该函数相关的读取条件。


4. 允许用户结合其他数据库提供的方法编辑读取条件:


例:以时间字段作为读取条件,每次只同步一小时前的数据,且只同步未曾读取的数据。

SELECT * FROMtable1 WHEREupdate_time > ‘last_max(update_time)‘ ANDupdate_time<= DATE_SUB(NOW(), INTERVAL 24 HOUR) 

 

相较于之前的一对一设置,新版本上线后用户可以通过批量设置增量识别字段、批量移除、批量修改表名称等,批量地操作一些表,批量地做一些动态修改,减少用户配置成本。例如,现在需要在200张表的名称后面都加一个data_warehouse,不同于以往的逐一添加,现在可以批量添加这些前缀后缀。

 DataPipeline每一次版本的迭代都凝聚了团队对企业数据使用需求的深入思索,其它新功能还在路上,很快就会跟大家见面了,希望能够切实帮助大家更敏捷高效地获取数据。

 

 

 

 

DataPipeline数据融合重磅功能丨一对多实时分发、批量读取模式

标签:binlog   from   指定   针对   int   用户   动态   ima   line   

原文地址:https://www.cnblogs.com/DataPipeline2018/p/11387413.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!