前言

企业正在经历数据资产的爆炸式增长,这些数据包括结构化、半结构化以及非结构化数据。企业在构建数据仓库的链路中对于数据导入需求也愈加复杂,既有批量导入的需求也有流式的导入需求。新一代云原生实时数仓 SelectDB Cloud 作为一款运行于多云之上的云原生实时数据仓库,致力于通过开箱即用的能力为客户带来简单快速的数仓体验。在生态方面,SelectDB Cloud 提供了丰富的数据连接器插件(Connector)来连接各种来自周边大数据工具的数据源,内置 Kafka、Flink、Spark、DataX 等常见的 Connector。基于此,企业开发者能够更加便捷的将数据移动到 SelectDB Cloud 上,并利用 SelectDB Cloud 从数据资产中获取更高的价值。

SelectDB Cloud 数据导入机制

在如今复杂的业务场景之下,企业用户的数据既可能在实时消息流中,也可能在离线文件;既可能是结构化的数据,也有可能是非结构的数据,为满足多样化的数据导入需求,SelectDB Cloud 在实时数据流接入方面实现了 Kafka SelectDB Connector和 Flink SelectDB Connector,在离线数据接入部分实现了 Spark SelectDB Connector和 SelectDB DataX Writer,通过多样化的数据工具支持保障了用户多源异构数据能够简单快速高效的导入。

在生态工具的设计过程中,SelectDB Cloud 始终秉持着简单易用和安全可靠的原则。例如,在 Kafka、Flink 的实时数据接入过程中,SelectDB Cloud 提供了两阶段事务保障了端到端的数据准确一致性,同时如果数据传输过程中发生了服务器宕机也能支持断点续跑;在离线的 Spark 部分,SelectDB Cloud 能够始终确保反馈给用户的是一个准确的状态,当用户发现数据导入失败时可以通过重试等手段保证最终数据接入的一致性。通过一系列的设计,SelectDB Cloud 能够在数据正确性上为客户提供可靠的保障。

目前,SelectDB Cloud 主要通过copy into来进行数据导入,用户可以将数据源存储到外部暂存区(external stage)或者内部暂存区(internal stage)后,通过执行copy into语句将数据导入到SelectDB Cloud中。暂存区是 SelectDB Cloud 对公有云对象存储的一层封装,外部暂存区和内部暂存区的区别在于是使用用户提供的对象存储 bucket,还是使用 SelectDB Cloud 的私有bucket。

用户可以在 copy into 语句中对stage中的数据进行列映射、函数转换和处理、指定导入的文件名以及错误处理机制等。例如下面的SQL就是把外部暂存区中的1.csv文件的第一列、第二列的前两个字符和第三列导入到test_table中


COPY INTO test_table FROM (SELECT $1, substring($2, 2), $3 FROM @ext_stage('1.csv'))

为了便于用户使用 API 进行调用, 还提供了 HTTP 的接口进行数据的上传和 copy into 的调用。本文介绍的生态工具,主要是通过HTTP API接口来进行数据导入。

SelectDB Cloud 生态工具介绍

►►►

Kafka SelectDB Connector

基本介绍

SelectDB Cloud 采用了云原生的产品架构,存储和计算由两个松耦合、独立可扩展的服务来处理,实现了存算分离和弹性扩缩容。
Kafka Connect 是一款可扩展、可靠的在 Apache Kafka 和其他系统之间进行数据传输的工具。可以定义 Connectors 来将大量数据迁入迁出 Kafka。Kafka SelectDB Connector 运行在 Kafka Connect 集群中,支持从 Kafka Topic 中读取数据,并将数据写入 SelectDB Cloud 表中。

SelectDB Cloud 提供了 Sink Connector 插件,可以将 Kafka Topic 中的 JSON 数据保存到 SelectDB Cloud 数据库中。

在业务场景中,通常会通过 Debezium Connector 将数据库的变更数据实时写入 Kafka;或者调用API往 Kafka 中推送 JSON 格式数据,然后使用SelectDB Kafka Connector即可将这些数据同步到 SelectDB Cloud 中。

效果及收益

在调研的使用场景中,使用 Kafka 同步上游 JSON 数据。这里数据维持以每秒 10w 条的超高频导入,在 8c16g 的机器上,仅部署单节点 Kafka 集群,同时在 topic 中配置 20 个 partition,以 distributed 模式启动 connect。在实际处理过程中,topic 中的总体消息平均积压在 120w 条左右,单个 partition 积压 6w 条消息,表现相当优秀。

整体来看,Kafka SelectDB Connector 打通了从 Kafka 直接导入数据至 SelectDB Cloud 的数据链路,降低了通过 Flink 作为中间数据同步组件的链路复杂度;通过 Exactly-Once 实现数据的一次性精确导入,确保了数据的准确性;通过以 Kafka 集群作为载体,在超高频的数据导入场景中,性能表现非常优秀

►►►

Spark SelectDB Connector

基本介绍

Spark SelectDB Connector 作为 SelectDB Cloud 上大数据量的导入方式之一,可以利用 Spark天然的分布式计算优势将数据导入到 SelectDB Cloud 中。具体来讲,Spark SelectDB Connector 支持将其他数据源(PostgreSQL, HDFS, S3等)的数据通过 Spark 计算引擎后同步到 SelectDB Cloud 的数据表中。

利用 Spark SelectDB Connector,开发者能够使用 Spark 将上游数据源读取到 DataFrame 中,然后使用 Spark SelectDB Connector 将大规模数据导入到SelectDB Cloud 数据仓库的表中;同时,开发者可以使用 Spark 的 JDBC 的方式来读取 SelectDB Cloud 表中的数据。

在整个架构中,通常 Spark SelectDB Connector 作为外部数据写入到 SelectDB Cloud 的桥梁,以其分布式、高效的特性提升了整个数据链路的吞吐。

Spark SelectDB Connector 底层实现依赖于 SelectDB Cloud 的两种导入方式,当前支持两种导入方式:

  • 通过创建对象存储上的 stage 来进行批量数据拉取导入,这个主要适合大批量数据导入,使用前提是用户有自己的对象存储及其相关密钥。

  • 基于HTTP的推送导入,这个主要适合小批量推送,使用较简单。

效果及收益

当有以下几种场景需求的情况可以使用这种连接器:

  • 以 Spark 为计算引擎构建的技术架构体系,减少其他组件引入的成本;

  • 大规模数据 ETL 离线写入 SelectDB Cloud,利用 Spark 分布式计算的特性,提升数据处理吞吐;

Spark SelectDB Connector 以 Spark 这个大数据计算的优秀组件作为核心,实现了利用 Spark 将外部数据源的大数据量同步到 SelectDB Cloud,便于我们实现大批量数据的快速同步,继而利用 SelectDB Cloud 为基石构建新一代的云原生数据仓库,结合 SelectDB Cloud 强大的分析计算性能,能够为企业带来业务便捷性以及增效降本的目标。

►►►

Flink SelectDB Connector

基本介绍

在实时计算中,通过 Flink 可以将业务数据库(MySQL、SQL Server、Oracle)或Kafka消息队列等上游数据作为Source读取出来,经过 Flink SQL 或者 Data Stream 加工计算,最后将清洗后的数据写入 SelectDB Cloud。

Flink SelectDB Connector作为Sink Connector,通过HTTP推送导入的方式将数据加载到 SelectDB Cloud 的表中。

效果及收益

SelectDB Cloud 结合 Flink 流式计算,可以让用户将 Kafka 中的非结构化数据以及 MySQL 等上游业务库中的变更数据,实时同步到 SelectDB Cloud中,同时 SelectDB Cloud 提供亚秒级分析查询的能力,可以有效地满足实时 OLAP、实时数据看板以及实时数据服务等场景的需求。

►►►

SelectDB DataX Writer

基本介绍

SelectDB Datax Writer 插件实现了写入数据到 SelectDB Cloud 目的表的功能。在底层实现上, SelectDB Datax Writer 使用SelectDB HTTP将数据导入库表。

效果及收益

SelectDB Datax Writer借助 Datax 框架的优势,支持各种数据源的数据同步,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、HBase 等各种异构数据源之间稳定高效的数据同步功能,主要用于离线同步,该工具具有以下优势:

  • 配置简单,操作容易,依赖少




























































































##job配置{    "job":{        "content":[            {                "reader":{                    "name":"mysqlreader",                    "parameter":{                        "column":[                            "id",                            "order_code",                            "line_code",                            "remark",                            "unit_no",                            "unit_name",                            "price"                        ],                        "connection":[                            {                                "jdbcUrl":[                                    "jdbc:mysql://localhost:3306/demo"                                ],                                "table":[                                    "employees_1"                                ]                            }                        ],                        "username":"root",                        "password":"xxxxx",                        "where":""                    }                },                "writer":{                    "name":"selectdbwriter",                    "parameter":{                        "loadUrl":[                            "xxx:47150"                        ],                        "loadProps":{                            "file.type":"json",                            "file.strip_outer_array":"true"                        },                        "column":[                            "id",                            "order_code",                            "line_code",                            "remark",                            "unit_no",                            "unit_name",                            "price"                        ],                        "username":"admin",                        "password":"SelectDB2022",                        "postSql":[
], "preSql":[
], "connection":[ { "jdbcUrl":"jdbc:mysql://xxx:34142/cl_test", "table":[ "ods_pos_pro_table_dynamic_delta_v4" ], "selectedDatabase":"cl_test" } ], "maxBatchRows":1000000, "maxBatchByteSize":536870912000 } } } ], "setting":{ "errorLimit":{ "percentage":0.02, "record":0 }, "speed":{ "channel":5 } } }}

#启动任务
python bin/datax.py job/xx.json
灵活便捷,支持用户随意调整作业速度











配置全局Byte限速以及单Channel Byte限速,Channel个数 = 全局Byte限速 / 单Channel Byte限速配置全局Record限速以及单Channel Record限速,Channel个数 = 全局Record限速 / 单Channel Record限速直接配置Channel个数 配置含义:job.setting.speed.channel : channel并发数job.setting.speed.record : 全局配置channel的record限速job.setting.speed.byte:全局配置channel的byte限速
core.transport.channel.speed.record:单channel的record限速core.transport.channel.speed.byte:单channel的byte限速
  • 安全可靠,用户可实时监控同步的进度、速度、错误情况、传输流量、CPU 状况等指标。

未来规划

在未来,SelectDB 将会在与丰富的 BI 及数据开发工具的集成上持续投入,例如 Dataworks,QuickBI 等,为用户打造零迁移的使用体验,从而降低整体大数据研发使用成本。

同时 SelectDB 也会根据用户使用的技术栈,持续完善 SelectDB Cloud 的生态,为用户提供更多方便易用的各种生态开发工具。


SelectDB2022120224SelectDB 使使 IDG VC 3亿

SelectDB Apache Doris SelectDB

举报/反馈

SelectDB

2获赞 49粉丝
基于 Apache Doris 的云原生实时数据仓库
关注
0
0
收藏
分享