企业正在经历数据资产的爆炸式增长,这些数据包括结构化、半结构化以及非结构化数据。企业在构建数据仓库的链路中对于数据导入需求也愈加复杂,既有批量导入的需求也有流式的导入需求。新一代云原生实时数仓 SelectDB Cloud 作为一款运行于多云之上的云原生实时数据仓库,致力于通过开箱即用的能力为客户带来简单快速的数仓体验。在生态方面,SelectDB Cloud 提供了丰富的数据连接器插件(Connector)来连接各种来自周边大数据工具的数据源,内置 Kafka、Flink、Spark、DataX 等常见的 Connector。基于此,企业开发者能够更加便捷的将数据移动到 SelectDB Cloud 上,并利用 SelectDB Cloud 从数据资产中获取更高的价值。
在如今复杂的业务场景之下,企业用户的数据既可能在实时消息流中,也可能在离线文件;既可能是结构化的数据,也有可能是非结构的数据,为满足多样化的数据导入需求,SelectDB Cloud 在实时数据流接入方面实现了 Kafka SelectDB Connector和 Flink SelectDB Connector,在离线数据接入部分实现了 Spark SelectDB Connector和 SelectDB DataX Writer,通过多样化的数据工具支持保障了用户多源异构数据能够简单快速高效的导入。
在生态工具的设计过程中,SelectDB Cloud 始终秉持着简单易用和安全可靠的原则。例如,在 Kafka、Flink 的实时数据接入过程中,SelectDB Cloud 提供了两阶段事务保障了端到端的数据准确一致性,同时如果数据传输过程中发生了服务器宕机也能支持断点续跑;在离线的 Spark 部分,SelectDB Cloud 能够始终确保反馈给用户的是一个准确的状态,当用户发现数据导入失败时可以通过重试等手段保证最终数据接入的一致性。通过一系列的设计,SelectDB Cloud 能够在数据正确性上为客户提供可靠的保障。
目前,SelectDB Cloud 主要通过copy into来进行数据导入,用户可以将数据源存储到外部暂存区(external stage)或者内部暂存区(internal stage)后,通过执行copy into语句将数据导入到SelectDB Cloud中。暂存区是 SelectDB Cloud 对公有云对象存储的一层封装,外部暂存区和内部暂存区的区别在于是使用用户提供的对象存储 bucket,还是使用 SelectDB Cloud 的私有bucket。
用户可以在 copy into 语句中对stage中的数据进行列映射、函数转换和处理、指定导入的文件名以及错误处理机制等。例如下面的SQL就是把外部暂存区中的1.csv文件的第一列、第二列的前两个字符和第三列导入到test_table中
COPY INTO test_table FROM (SELECT $1, substring($2, 2), $3 FROM @ext_stage('1.csv'))
为了便于用户使用 API 进行调用, 还提供了 HTTP 的接口进行数据的上传和 copy into 的调用。本文介绍的生态工具,主要是通过HTTP API接口来进行数据导入。
►►►
Kafka SelectDB Connector
基本介绍
SelectDB Cloud 提供了 Sink Connector 插件,可以将 Kafka Topic 中的 JSON 数据保存到 SelectDB Cloud 数据库中。
在业务场景中,通常会通过 Debezium Connector 将数据库的变更数据实时写入 Kafka;或者调用API往 Kafka 中推送 JSON 格式数据,然后使用SelectDB Kafka Connector即可将这些数据同步到 SelectDB Cloud 中。
效果及收益
整体来看,Kafka SelectDB Connector 打通了从 Kafka 直接导入数据至 SelectDB Cloud 的数据链路,降低了通过 Flink 作为中间数据同步组件的链路复杂度;通过 Exactly-Once 实现数据的一次性精确导入,确保了数据的准确性;通过以 Kafka 集群作为载体,在超高频的数据导入场景中,性能表现非常优秀。
►►►
Spark SelectDB Connector
基本介绍
Spark SelectDB Connector 作为 SelectDB Cloud 上大数据量的导入方式之一,可以利用 Spark天然的分布式计算优势将数据导入到 SelectDB Cloud 中。具体来讲,Spark SelectDB Connector 支持将其他数据源(PostgreSQL, HDFS, S3等)的数据通过 Spark 计算引擎后同步到 SelectDB Cloud 的数据表中。
利用 Spark SelectDB Connector,开发者能够使用 Spark 将上游数据源读取到 DataFrame 中,然后使用 Spark SelectDB Connector 将大规模数据导入到SelectDB Cloud 数据仓库的表中;同时,开发者可以使用 Spark 的 JDBC 的方式来读取 SelectDB Cloud 表中的数据。
在整个架构中,通常 Spark SelectDB Connector 作为外部数据写入到 SelectDB Cloud 的桥梁,以其分布式、高效的特性提升了整个数据链路的吞吐。
Spark SelectDB Connector 底层实现依赖于 SelectDB Cloud 的两种导入方式,当前支持两种导入方式:
通过创建对象存储上的 stage 来进行批量数据拉取导入,这个主要适合大批量数据导入,使用前提是用户有自己的对象存储及其相关密钥。
基于HTTP的推送导入,这个主要适合小批量推送,使用较简单。
效果及收益
当有以下几种场景需求的情况可以使用这种连接器:
以 Spark 为计算引擎构建的技术架构体系,减少其他组件引入的成本;
大规模数据 ETL 离线写入 SelectDB Cloud,利用 Spark 分布式计算的特性,提升数据处理吞吐;
Spark SelectDB Connector 以 Spark 这个大数据计算的优秀组件作为核心,实现了利用 Spark 将外部数据源的大数据量同步到 SelectDB Cloud,便于我们实现大批量数据的快速同步,继而利用 SelectDB Cloud 为基石构建新一代的云原生数据仓库,结合 SelectDB Cloud 强大的分析计算性能,能够为企业带来业务便捷性以及增效降本的目标。
►►►
Flink SelectDB Connector
基本介绍
在实时计算中,通过 Flink 可以将业务数据库(MySQL、SQL Server、Oracle)或Kafka消息队列等上游数据作为Source读取出来,经过 Flink SQL 或者 Data Stream 加工计算,最后将清洗后的数据写入 SelectDB Cloud。
Flink SelectDB Connector作为Sink Connector,通过HTTP推送导入的方式将数据加载到 SelectDB Cloud 的表中。
效果及收益
SelectDB Cloud 结合 Flink 流式计算,可以让用户将 Kafka 中的非结构化数据以及 MySQL 等上游业务库中的变更数据,实时同步到 SelectDB Cloud中,同时 SelectDB Cloud 提供亚秒级分析查询的能力,可以有效地满足实时 OLAP、实时数据看板以及实时数据服务等场景的需求。
►►►
SelectDB DataX Writer
基本介绍
SelectDB Datax Writer 插件实现了写入数据到 SelectDB Cloud 目的表的功能。在底层实现上, SelectDB Datax Writer 使用SelectDB HTTP将数据导入库表。
效果及收益
SelectDB Datax Writer借助 Datax 框架的优势,支持各种数据源的数据同步,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、HBase 等各种异构数据源之间稳定高效的数据同步功能,主要用于离线同步,该工具具有以下优势:
配置简单,操作容易,依赖少
##job配置
{
"job":{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"column":[
"id",
"order_code",
"line_code",
"remark",
"unit_no",
"unit_name",
"price"
],
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://localhost:3306/demo"
],
"table":[
"employees_1"
]
}
],
"username":"root",
"password":"xxxxx",
"where":""
}
},
"writer":{
"name":"selectdbwriter",
"parameter":{
"loadUrl":[
"xxx:47150"
],
"loadProps":{
"file.type":"json",
"file.strip_outer_array":"true"
},
"column":[
"id",
"order_code",
"line_code",
"remark",
"unit_no",
"unit_name",
"price"
],
"username":"admin",
"password":"SelectDB2022",
"postSql":[
],
"preSql":[
],
"connection":[
{
"jdbcUrl":"jdbc:mysql://xxx:34142/cl_test",
"table":[
"ods_pos_pro_table_dynamic_delta_v4"
],
"selectedDatabase":"cl_test"
}
],
"maxBatchRows":1000000,
"maxBatchByteSize":536870912000
}
}
}
],
"setting":{
"errorLimit":{
"percentage":0.02,
"record":0
},
"speed":{
"channel":5
}
}
}
}
#启动任务
python bin/datax.py job/xx.json
灵活便捷,支持用户随意调整作业速度
配置全局Byte限速以及单Channel Byte限速,Channel个数 = 全局Byte限速 / 单Channel Byte限速配置全局Record限速以及单Channel Record限速,Channel个数 = 全局Record限速 / 单Channel Record限速
直接配置Channel个数
配置含义:
job.setting.speed.channel : channel并发数
job.setting.speed.record : 全局配置channel的record限速
job.setting.speed.byte:全局配置channel的byte限速
core.transport.channel.speed.record:单channel的record限速
core.transport.channel.speed.byte:单channel的byte限速
安全可靠,用户可实时监控同步的进度、速度、错误情况、传输流量、CPU 状况等指标。
在未来,SelectDB 将会在与丰富的 BI 及数据开发工具的集成上持续投入,例如 Dataworks,QuickBI 等,为用户打造零迁移的使用体验,从而降低整体大数据研发使用成本。
同时 SelectDB 也会根据用户使用的技术栈,持续完善 SelectDB Cloud 的生态,为用户提供更多方便易用的各种生态开发工具。
SelectDB,即北京飞轮数据科技有限公司,是一家专注于开源技术创新的云原生实时数据仓库厂商,成立于2022年1月,公司总部位于北京。2022年4月,SelectDB 完成天使轮和天使+轮融资,由 IDG 资本、红杉中国等顶级 VC 投资,融资金额超3亿元人民
币。SelectDB 创始团队由原百度智能云初创人员和 Apache Doris 项目核心成员组成,在云计算、大数据、人工智能方面具有丰富经验。作为依托于开源技术的科技创新公司,SelectDB 将开源作为长期核心战略,不断更新迭代开源版本与企业级商业版本。