Skip to main content

Hudi Catalog

Apache Doris & Hudi Quick Start

Usage​

  1. The query types supported by the Hudi table are as follows, and the Incremental Query will be supported in the future.
Table TypeSupported Query types
Copy On WriteSnapshot Query + Time Travel
Merge On ReadSnapshot Queries + Read Optimized Queries + Time Travel
  1. Doris supports Hive Metastore(Including catalogs compatible with Hive MetaStore, like AWS Glue/Alibaba DLF) Catalogs.

Create Catalog​

Same as creating Hive Catalogs. A simple example is provided here. See Hive for more information.

CREATE CATALOG hudi PROPERTIES (
'type'='hms',
'hive.metastore.uris' = 'thrift://172.21.0.1:7004',
'hadoop.username' = 'hive',
'dfs.nameservices'='your-nameservice',
'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);

Optional configuration parameters:

namedescriptiondefault
use_hive_sync_partitionUse hms synchronized partition datafalse

Column Type Mapping​

Same as that in Hive Catalogs. See the relevant section in Hive.

Skip Merge​

Spark will create the read optimize table with _ro suffix when generating hudi mor table. Doris will skip the log files when reading optimize table. Doris does not determine whether a table is read optimize by the _ro suffix instead of the hive inputformat. Users can observe whether the inputformat of the 'cow/mor/read optimize' table is the same through the SHOW CREATE TABLE command. In addition, Doris supports adding hoodie related configurations to catalog properties, which are compatible with Spark Datasource Configs, so users can add hoodie.datasource.merge.type=skip_merge in catalog properties to skip merge logs files.

Query Optimization​

Doris uses the parquet native reader to read the data files of the COW table, and uses the Java SDK (By calling hudi-bundle through JNI) to read the data files of the MOR table. In upsert scenario, there may still remains base files that have not been updated in the MOR table, which can be read through the parquet native reader. Users can view the execution plan of hudi scan through the explain command, where hudiNativeReadSplits indicates how many split files are read through the parquet native reader.

|0:VHUDI_SCAN_NODE                                                             |
| table: minbatch_mor_rt |
| predicates: `o_orderkey` = 100030752 |
| inputSplitNum=810, totalFileSize=5645053056, scanRanges=810 |
| partition=80/80 |
| numNodes=6 |
| hudiNativeReadSplits=717/810 |

Users can view the perfomace of Java SDK through profile, for example:

-  HudiJniScanner:  0ns
- FillBlockTime: 31.29ms
- GetRecordReaderTime: 1m5s
- JavaScanTime: 35s991ms
- OpenScannerTime: 1m6s
  1. OpenScannerTime: Time to create and initialize JNI reader
  2. JavaScanTime: Time to read data by Java SDK
  3. FillBlockTime: Time co convert Java column data into C++ column data
  4. GetRecordReaderTime: Time to create and initialize Hudi Record Reader

Time Travel​

Supports reading snapshots specified in Hudi table.

Every write operation to the Hudi table will generate a new snapshot.

By default, query requests will only read the latest version of the snapshot.

You can use the FOR TIME AS OF statement, based on the time of the snapshot to read historical version data. Examples are as follows:

SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";

SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";

Hudi table does not support the FOR VERSION AS OF statement. Using this syntax to query the Hudi table will throw an error.

Incremental Read​

Incremental Read can query the data changed between startTime and endTime, and the returned result set is the final state of the data at endTime.

Doris provides @incr syntax support Incremental Read:

SELECT * from hudi_table@incr('beginTime'='xxx', ['endTime'='xxx'], ['hoodie.read.timeline.holes.resolution.policy'='FAIL'], ...);

beginTime is required, and the time format is consistent with the hudi official website hudi_table_changes, and supports "earliest". endTime is optional, and the default is the latest commitTime. Compatible with Spark Read Options.

To support Incremental Read, you need to enable the new optimizer, which is enabled by default. By viewing the execution plan through desc, we can find that Doris converts @incr into predicates and pushes it down to VHUDI_SCAN_NODE:

| 0:VHUDI_SCAN_NODE(113) |
| table: lineitem_mor |
| predicates: (_hoodie_commit_time[#0] >= '20240311151019723'), (_hoodie_commit_time[#0] <= '20240311151606605') |
| inputSplitNum=1, totalFileSize=13099711, scanRanges=1 |