JDBC Catalog 开发指南
1. 概述
Apache Doris 的 JdbcCatalog 通过 JDBC 协议为 Doris 提供对外部数据库的访问能力。
本文档详细介绍如何为 JdbcCatalog 新增一个数据源类型的支持,以新增 NewDB 数据源为例。
该文档适用于 3.0 以后的版本。
2. 架构概览
2.1 整体架构
JdbcCatalog 采用 Frontend (FE) 和 Backend (BE) 分离的架构:
- Frontend (FE): 负责元数据管理、SQL 解析、查询规划和优化。
- Backend (BE): 负责数据扫描、类型转换和执行。
2.2 核心组件
Frontend 核心组件
JdbcResource
: 定义 JDBC 连接资源和参数。JdbcExternalCatalog
: 管理整个 Catalog,创建和管理 JdbcClient。JdbcExternalDatabase
: 管理数据库级别的元数据。JdbcExternalTable
: 管理表级别的元数据和 Schema。JdbcClient
: 抽象基类,处理元数据操作(获取数据库列表、表列表、列信息等)。JdbcScanNode
: 查询计划中的扫描节点,负责生成查询 SQL。JdbcTableSink
: 写入计划中的 Sink 节点。
Backend 核心组件
vjdbc_connector.cpp
: C++ 连接器,通过 JNI 调用 Java 执行器。(开发者无需修改此文件)BaseJdbcExecutor
: Java 执行器抽象基类,处理数据读写操作。JdbcExecutorFactory
: 工厂类,根据数据源类型创建对应的执行器。
3. 新增数据源开发步骤
Step 1: 更新 Thrift 定义
-
修改 Thrift 文件
在
gensrc/thrift//Types.thrift
文件中,为TOdbcTableType
枚举添加新类型: -
生成代码
在 Doris 根目录下执行脚本,使新的枚举值在 Java 和 C++ 代码中生效:
Step 2: 在 Frontend 定义核心元数据
-
修改
JdbcResource.java
添加 NewDB 的 URL 前缀和类型名称的常量,并在
parseDbType
方法中加入识别逻辑。 -
修改
JdbcTable.java
将 NewDB 类型字符串映射到 Thrift 枚举,并定义其 SQL 标识符的引用方式。
Step 3:实现 Frontend 的元数据客户端 (JdbcClient)
元数据交互核心逻辑
这一步是与外部数据源进行元数据交互的核心。您需要在 doris/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/
目录下创建 JdbcNewDBClient.java
文件,并重写以下关键方法:
// doris/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcNewDBClient.java
public class JdbcNewDBClient extends JdbcClient {
public JdbcNewDBClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
/**
* [必须重写] 获取数据库(或 Schema)列表。
*
* @return 数据库名称列表。
* @purpose 这是 `SHOW DATABASES` 命令的底层实现。
* @implementation
* 1. 从连接池获取一个 JDBC Connection。
* 2. 使用 `connection.getMetaData().getCatalogs()` 或 `getSchemas()` 获取列表。
* 3. 遍历 ResultSet,将数据库名添加到 List<String> 中。
* 4. 调用 `filterDatabaseNames()` 过滤掉系统库和不符合规则的库。
*/
@Override
public List<String> getDatabaseNameList() {
// 实现获取 NewDB 所有数据库名称的逻辑
}
/**
* 这是 `SHOW TABLES` 命令的底层实现。
*
* @param remoteDbName 数据库名。
* @return 表名列表。
*/
@Override
public List<String> getTablesNameList(String remoteDbName) {
// 通常不需要直接重写此方法,而是重写其内部调用的 processTable()
return super.getTablesNameList(remoteDbName);
}
/**
* [必须重写] 获取表的元数据。
*
* @purpose 供 getTablesNameList() 和 isTableExist() 等方法调用,
* 通过 `DatabaseMetaData.getTables()` 执行元数据查找。
* @implementation
* 1. 获取 JDBC Connection 和 DatabaseMetaData。
* 2. 调用 `databaseMetaData.getTables(catalog, schemaPattern, tableNamePattern, types)`。
* 3. 关键在于为 `catalog` 和 `schemaPattern` 参数传递正确的值,这取决于 NewDB 的 JDBC 驱动实现。
* - 如果 NewDB 使用 Catalog,catalog 传 `remoteDbName`,schemaPattern 传 `null`。
* - 如果 NewDB 使用 Schema,catalog 传 `null`,schemaPattern 传 `remoteDbName`。
*/
@Override
protected void processTable(String remoteDbName, String remoteTableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
// 实现调用 DatabaseMetaData.getTables() 的逻辑
}
/**
* [必须重写] **核心方法** - 定义从 NewDB 类型到 Doris 类型的映射。
*
* @param fieldSchema 包含了从 JDBC Driver 获取到的列信息,如类型名 (getDataTypeName)、
* 精度 (getColumnSize)、标度 (getDecimalDigits) 等。
* @return 对应的 Doris `Type`。
*/
@Override
public Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
// 根据 fieldSchema 中的信息,返回 Doris 的 Type
// 例如:
// String newdbType = fieldSchema.getDataTypeName().toUpperCase();
// switch (newdbType) {
// case "VARCHAR": return ScalarType.createStringType();
// case "INTEGER": return Type.INT;
// case "DECIMAL":
// return ScalarType.createDecimalV3Type(fieldSchema.getColumnSize(), fieldSchema.getDecimalDigits());
// // ... 其他类型映射
// default: return Type.UNSUPPORTED;
// }
}
/**
* [建议重写] 定义需要从 `getDatabaseNameList` 结果中过滤掉的系统库。
*/
@Override
protected Set<String> getFilterInternalDatabases() {
// return ImmutableSet.of("information_schema", "sys", "performance_schema", "newdb_system_db");
}
/**
* [建议重写] 定义测试连接时使用的查询语句。
* 默认是 "SELECT 1",但某些数据库(如 Oracle)需要 "SELECT 1 FROM DUAL"。
*/
@Override
public String getTestQuery() {
// return "SELECT 1"; // 或者 NewDB 支持的简单查询
}
/**
* [可选重写] 定义在调用 `DatabaseMetaData` 相关方法时,catalog 参数应为何值。
* 默认返回 `connection.getCatalog()`。如果 NewDB 不支持 Catalog,应返回 `null`。
*/
@Override
protected String getCatalogName(Connection conn) throws SQLException {
// return conn.getCatalog(); // 或 return null;
}
/**
* [可选重写] 获取列定义。
* 默认实现通常够用,但对于某些数据库(如 MySQL),可能需要执行一个 SHOW 或 SELECT 查询
* 来获取更精确的类型信息(例如区分 `TINYINT(1)` 和 `TINYINT`)。
*/
@Override
public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, String remoteTableName) {
return super.getJdbcColumnsInfo(remoteDbName, remoteTableName);
}
}
在工厂类中注册 JdbcNewDBClient
修改 doris/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
:
Step 4:适配 Frontend 的查询计划 (JdbcScanNode)
在 getJdbcQueryStr()
方法中,为 NewDB 添加特定的 LIMIT
子句生成逻辑。
// doris/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
private String getJdbcQueryStr() {
// ...
// MSSQL use select top to do top n
if (shouldPushDownLimit() && jdbcType == TOdbcTableType.SQLSERVER) {
sql.append("TOP " + limit + " ");
}
// ...
sql.append(Joiner.on(", ").join(columns));
sql.append(" FROM ").append(tableName);
// ...
// Other DataBase use limit do top n
if (shouldPushDownLimit()
&& (jdbcType == TOdbcTableType.MYSQL
|| jdbcType == TOdbcTableType.POSTGRESQL
/* 其他使用标准 LIMIT 的数据库 */
// 如果 NewDB 也是标准 LIMIT,在此添加
)) {
sql.append(" LIMIT ").append(limit);
}
if (shouldPushDownLimit() && jdbcType == TOdbcTableType.NEWDB) {
// 示例:如果 NewDB 有特殊的 LIMIT 语法,例如 'FETCH FIRST n ROWS ONLY'
// 在这里独立处理
sql.append(" FETCH FIRST ").append(limit).append(" ROWS ONLY");
}
return sql.toString();
}
Step 5:实现 Backend 的数据执行器 (JdbcExecutor)
数据读取核心逻辑
这一步是在 BE 端通过 JNI 实际执行 JDBC 数据读写的核心。您需要在 doris/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/
目录下创建 NewDBJdbcExecutor.java
文件,并重写以下关键方法:
// doris/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/NewDBJdbcExecutor.java
public class NewDBJdbcExecutor extends BaseJdbcExecutor {
public NewDBJdbcExecutor(byte[] thriftParams) throws Exception {
super(thriftParams);
}
/**
* [按需重写] 初始化用于存放一批数据的 Java 端缓存。
*
* @purpose 这是一个关键的性能优化。它在开始迭代 ResultSet 之前,为整个批次的数据预先分配好内存。
* 这避免了在循环中重复创建对象,从而显著提升了数据读取性能。
* @implementation
* 1. 遍历每一列。
* 2. 对于大多数标准类型,最佳实践是调用 `outputTable.getColumn(i).newObjectContainerArray(batchSizeNum)`。
* 这将根据 Doris 的目标类型创建一个类型安全的数组容器(如 Integer[], BigDecimal[], Long[] 等)。
* 3. 对于需要特殊处理的类型(例如,一个由 JDBC Driver 返回二进制对象,但我们只想按 String 处理的类型),
* 可以分配一个更通用的容器,如`Object[]`。
*/
@Override
protected void initializeBlock(int columnCount, String[] replaceStringList, int batchSizeNum,
VectorTable outputTable) {
for (int i = 0; i < columnCount; ++i) {
if (outputTable.getColumnType(i).getType() == Type.STRING) {
block.add(new Object[batchSizeNum]);
} else {
block.add(outputTable.getColumn(i).newObjectContainerArray(batchSizeNum));
}
}
}
/**
* [必须重写] **核心方法** - 从 JDBC ResultSet 中获取单列数据。
*
* @param columnIndex 列的索引(从 0 开始)。
* @param type Doris 端期望的列类型。
* @return 从 ResultSet 中获取并转换好的 Java 对象。
* @implementation
* 1. 使用 `resultSet.getObject(columnIndex + 1, TargetClass.class)` 获取值,这是最安全的方式。
* 2. `TargetClass.class` 应与 Doris 的 `type` 相匹配,例如 `Type.INT` 对应 `Integer.class`。
* 3. 必须通过 `resultSet.wasNull()` 正确处理 `NULL` 值。
*/
@Override
protected Object getColumnValue(int columnIndex, ColumnType type, String[] replaceStringList) throws SQLException {
switch (type.getType()) {
case INT:
return resultSet.getObject(columnIndex + 1, Integer.class);
case DATETIME:
return resultSet.getObject(columnIndex + 1, LocalDateTime.class);
// ... 其他类型
default:
return resultSet.getObject(columnIndex + 1);
}
}
/**
* [必须重写] 为特定类型提供输出转换器。
*
* @purpose 对 `getColumnValue` 获取到的值进行最终处理,然后再传递给 C++ 层。
* 常用于需要特定格式化的场景。
* @implementation
* 一个常见的例子是 `java.sql.Time` 类型。为了保留其微秒精度,`getColumnValue`
* 获取 `Time` 对象,而转换器则负责将其格式化为 Doris 需要的 `HH:mm:ss.SSSSSS` 字符串。
* 另一个例子是将 `byte[]` 类型转换为十六进制显示的字符串。
*/
@Override
protected ColumnValueConverter getOutputConverter(ColumnType columnType, String replaceString) {
if (columnType.getType() == ColumnType.Type.STRING) {
// 示例:MySQL Executor 中对 Time 类型的处理
return createConverter(input -> {
if (input instanceof java.sql.Time) {
return timeToString((java.sql.Time) input); // timeToString 是 BaseJdbcExecutor 中的一个方法
}
return input.toString();
}, String.class);
}
return null; // 其他类型通常不需要转换
}
/**
* [可选重写] 初始化 PreparedStatement。
*
* @purpose 如果需要对某些写入读取的 size 和方式优化
* @implementation
* 1. 必须为读操作(READ)和写操作(WRITE)分别处理。
* 2. 对于读操作,通过 `stmt.setFetchSize()` 控制 JDBC Driver 从数据库一次拉取多少行数据。
* 这可以避免 OOM。不同的 Driver 对此参数的行为不同(例如,MySQL 设置为 `Integer.MIN_VALUE` 会开启流式读取)。
*/
@Override
protected void initializeStatement(Connection conn, JdbcDataSourceConfig config, String sql) throws SQLException {
// if (config.getOp() == TJdbcOperation.READ) {
// stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// stmt.setFetchSize(1024); // 为 NewDB 设置合适的 fetch size
// } else {
// preparedStatement = conn.prepareStatement(sql);
// }
}
/**
* [可选重写] 在查询取消时中断 JDBC 连接。
*
* @purpose 如果某数据源的 Driver 有特殊的中断方式,需要在此重写
* @implementation 调用 `connection.abort()` 或 Driver 提供的其他中断方法。
*/
@Override
protected void abortReadConnection(Connection connection, ResultSet resultSet) throws SQLException {
// connection.abort(MoreExecutors.directExecutor());
}
/**
* [按需重写] 为 Hikari 连接池设置验证查询,默认为 SELECT 1,如果数据源有特殊的语法,需重写
*
* @purpose 确保从池中获取的连接是有效的。
*/
@Override
protected void setValidationQuery(HikariDataSource ds) {
// ds.setConnectionTestQuery("SELECT 1"); // NewDB 的验证查询
}
/**
* [可选重写] 设置 JDBC Driver 特定的系统属性。
*
* @purpose 有些 Driver 需要通过 `System.setProperty()` 来开启或关闭某些功能。
*/
@Override
protected void setJdbcDriverSystemProperties() {
// System.setProperty("newdb.driver.property", "true");
}
}
在工厂类中注册 NewDBJdbcExecutor
修改 doris/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java
:
Step 6:添加回归测试
参考 doris/regression-test/suites/external_table_p0/jdbc/type_test/select/test_mysql_all_types_select.groovy
创建一个简单的测试用例。
-
创建
test_newdb_select.groovy
suite("test_newdb_select", "p0,external,newdb") {
String enabled = context.config.otherConfigs.get("enableJdbcTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
// 1. 定义 NewDB 的连接信息
def newdb_port = context.config.otherConfigs.get("newdb_port")
def driver_url = "http://your_repo/newdb-driver.jar"
// 2. 创建 Catalog
sql """create catalog newdb_catalog properties(
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:newdb://\${context.config.otherConfigs.get("externalEnvIp")}:\${newdb_port}/your_db",
"driver_url" = "\${driver_url}",
"driver_class" = "com.newdb.jdbc.Driver"
);"""
// 3. 执行测试
sql """use newdb_catalog.your_db"""
qt_select """select * from your_table order by 1 limit 10;"""
// 4. 清理环境
sql """drop catalog newdb_catalog"""
}
}
4. 开发注意事项
4.1 数据类型映射
在 jdbcTypeToDoris
方法中,需要仔细处理:
- 精度映射:
DECIMAL
类型需要正确映射精度和标度。 - 时间类型:注意时区和精度处理。
- 特殊类型:如一些二进制类的特殊类型
4.2 特定数据源数据读取
在 BaseJdbcExecutor
的子类中,重点关注:
getColumnValue
: 获取数据的方式getOutputConverter
: 数据转换函数
4.3 错误处理
- SQL 异常转换:将数据源特定异常转换为
JdbcClientException
。 - 驱动兼容性:处理不同驱动版本的兼容问题。
4.4 谓词与函数下推 (可选优化)
为提升查询性能,Doris 会尽可能将 WHERE
条件和部分函数下推到外部数据源执行。
- 谓词下推:大部分场景下,Doris 会自动处理。但对于特殊语法(如特殊日期函数),可能需要在
JdbcScanNode.java
的conjunctExprToString()
方法中进行适配。 - 函数下推:可以在
JdbcFunctionPushDownRule.java
中定义 NewDB 支持下推的函数列表和函数名替换规则,以获得更好的性能。
5. 部署配置
5.1 驱动部署
# 将 NewDB JDBC 驱动放到指定目录
cp newdb-jdbc-driver.jar $DORIS_HOME/plugins/jdbc_drivers/
5.2 创建 Catalog
CREATE CATALOG newdb_catalog PROPERTIES (
"type" = "jdbc",
"user" = "newdb_user",
"password" = "newdb_password",
"jdbc_url" = "jdbc:newdb://host:port/database",
"driver_url" = "newdb-jdbc-driver.jar",
"driver_class" = "com.newdb.jdbc.Driver"
);
-- 使用 Catalog
USE newdb_catalog.database_name;
SELECT * FROM table_name LIMIT 10;