Batch Deletion
Why do we need to introduce import-based Batch Delete when we have the Delete operation?
- Limitations of Delete operation
When you delete by Delete statement, each execution of Delete generates an empty rowset to record the deletion conditions and a new version of the data. Each time you read, you have to filter the deletion conditions. If you delete too often or have too many deletion conditions, it will seriously affect the query performance.
- Insert data interspersed with Delete data
For scenarios like importing data from a transactional database via CDC, Insert and Delete are usually interspersed in the data. In this case, the current Delete operation cannot be implemented.
Based on imported data, there are three ways the data can be merged:
APPEND: Append all data to existing data.
DELETE: Delete all rows that have the same value as the key column of the imported data (when a
sequence
column exists in the table, it is necessary to satisfy the logic of having the same primary key as well as the size of the sequence column in order to delete it correctly, see Use Case 4 below for details).MERGE: APPEND or DELETE according to DELETE ON decision
Batch Delete only works on Unique models.
Fundamental
This is achieved by adding a hidden column DORIS_DELETE_SIGN
to the Unique table.
When FE parses the query, DORIS_DELETE_SIGN
is removed when it encounters * and so on, and DORIS_DELETE_SIGN !
= true
, BE will add a column for judgement when reading, and determine whether to delete by the condition.
Import
On import, the value of the hidden column is set to the value of the
DELETE ON
expression during the FE parsing stage.Read
The read adds
DORIS_DELETE_SIGN !
= true
condition, BE does not sense this process and executes normally.Cumulative Compaction
In Cumulative Compaction, hidden columns are treated as normal columns and the Compaction logic remains unchanged.
Base Compaction
When Base Compaction is performed, the rows marked for deletion are deleted to reduce the space occupied by the data.
Enable Batch Delete Support
There are two forms of enabling Batch Delete support:
Batch Delete is supported by adding
enable_batch_delete_by_default=true
in the FE configuration file for all new tables created after restarting FE;For tables that do not have the above FE configuration changed or for existing tables that do not support Batch Delete, the following statement can be used:
ALTER TABLE tablename ENABLE FEATURE "BATCH_DELETE"
to enable Batch Delete. This is essentially a schema change operation, which returns immediately and can be confirmed byshowing alter table column
.
Then how to determine whether a table supports Batch Delete, you can set a session variable to show hidden columns SET show_hidden_columns=true
, and after that use desc tablename
, if there is a DORIS_DELETE_SIGN
column in the output then it is supported, if there is not then it is not supported.
Syntax Description
The syntax design of the import is mainly to add a column mapping that specifies the field of the delete marker column, and it is necessary to add a column to the imported data. The syntax of various import methods is as follows:
Stream Load
The writing method of Stream Load
adds a field to set the delete label column in the columns field in the header. Example: -H "columns: k1, k2, label_c3" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: label_c3=1"
Broker Load
The writing method of Broker Load
sets the field of the delete marker column at PROPERTIES
. The syntax is as follows:
LOAD LABEL db1.label1
(
[MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1,tmp_c2, label_c3)
SET
(
id=tmp_c2,
name=tmp_c1,
)
[DELETE ON label_c3=true]
)
WITH BROKER 'broker'
(
"username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600"
);
Routine Load
The writing method of Routine Load
adds a mapping to the columns
field. The mapping method is the same as above. The syntax is as follows:
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
[WITH MERGE|APPEND|DELETE]
COLUMNS(k1, k2, k3, v1, v2, label),
WHERE k1 100 and k2 like "%doris%"
[DELETE ON label=true]
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
Note
Since import operations other than stream load may be executed out of order inside doris, if it is not stream load when importing using the
MERGE
method, it needs to be used with load sequence. For the specific syntax, please refer to thesequence
column related documentsDELETE ON
condition can only be used with MERGE.
if session variable SET show_hidden_columns = true
was executed before running import task to show whether table support batch delete feature, then execute select count(*) from xxx
statement in the same session after finishing DELETE/MERGE
import task, it will result in a unexpected result that the statement result set will include the deleted results. To avoid this problem, you should execute SET show_hidden_columns = false
before selecting statement or open a new session to run the select statement.
Usage Examples
Check if Batch Delete Support is Enabled
mysql SET show_hidden_columns=true;
Query OK, 0 rows affected (0.00 sec)
mysql DESC test;
+-----------------------+--------------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+-----------------------+--------------+------+-------+---------+---------+
| name | VARCHAR(100) | No | true | NULL | |
| gender | VARCHAR(10) | Yes | false | NULL | REPLACE |
| age | INT | Yes | false | NULL | REPLACE |
| DORIS_DELETE_SIGN | TINYINT | No | false | 0 | REPLACE |
+-----------------------+--------------+------+-------+---------+---------+
4 rows in set (0.00 sec)
Stream Load Usage Examples
Import data normally:
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: APPEND" -T ~/table1_data http://127.0.0.1: 8130/api/test/table1/_stream_load
The APPEND condition can be omitted, which has the same effect as the following statement:
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -T ~/table1_data http://127.0.0.1:8130/api/test/table1 /_stream_load
Delete all data with the same key as the imported data
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: DELETE" -T ~/table1_data http://127.0.0.1: 8130/api/test/table1/_stream_load
Before load:
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 3 | 2 | tom | 2 |
| 4 | 3 | bush | 3 |
| 5 | 3 | helen | 3 |
+--------+----------+----------+------+Load data:
3,2,tom,0
After load:
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 4 | 3 | bush | 3 |
| 5 | 3 | helen | 3 |
+--------+----------+----------+------+Import the same row as the key column of the row with
site_id=1
curl --location-trusted -u root: -H "column_separator:," -H "columns: siteid, citycode, username, pv" -H "merge_type: MERGE" -H "delete: siteid=1" -T ~/ table1_data http://127.0.0.1:8130/api/test/table1/_stream_load
Before load:
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 4 | 3 | bush | 3 |
| 5 | 3 | helen | 3 |
| 1 | 1 | jim | 2 |
+--------+----------+----------+------+Load data:
2,1,grace,2
3,2,tom,2
1,1,jim,2After load:
+--------+----------+----------+------+
| siteid | citycode | username | pv |
+--------+----------+----------+------+
| 4 | 3 | bush | 3 |
| 2 | 1 | grace | 2 |
| 3 | 2 | tom | 2 |
| 5 | 3 | helen | 3 |
+--------+----------+----------+------+When the table has the sequence column, delete all data with the same key as the imported data
curl --location-trusted -u root: -H "column_separator:," -H "columns: name, gender, age" -H "function_column.sequence_col: age" -H "merge_type: DELETE" -T ~/table1_data http://127.0.0.1:8130/api/test/table1/_stream_load
When the unique table has the sequence column, sequence column is used as the basis for the replacement order of the REPLACE aggregate function under the same key column, and the larger value can replace the smaller value. If you want delete some data, the imported data must have the same key and the sequence column must be larger or equal than before.
For example, one table like this:
mysql SET show_hidden_columns=true;
Query OK, 0 rows affected (0.00 sec)
mysql DESC table1;
+------------------------+--------------+------+-------+---------+---------+
| Field | Type | Null | Key | Default | Extra |
+------------------------+--------------+------+-------+---------+---------+
| name | VARCHAR(100) | No | true | NULL | |
| gender | VARCHAR(10) | Yes | false | NULL | REPLACE |
| age | INT | Yes | false | NULL | REPLACE |
| DORIS_DELETE_SIGN | TINYINT | No | false | 0 | REPLACE |
| DORIS_SEQUENCE_COL | INT | Yes | false | NULL | REPLACE |
+------------------------+--------------+------+-------+---------+---------+
4 rows in set (0.00 sec)Before load:
+-------+--------+------+
| name | gender | age |
+-------+--------+------+
| li | male | 10 |
| wang | male | 14 |
| zhang | male | 12 |
+-------+--------+------+If you load data like this:
li,male,10
After load:
+-------+--------+------+
| name | gender | age |
+-------+--------+------+
| wang | male | 14 |
| zhang | male | 12 |
+-------+--------+------+You will find that the data is deleted.
li,male,10
But if you load data like this:
li,male,9
After load:
+-------+--------+------+
| name | gender | age |
+-------+--------+------+
| li | male | 10 |
| wang | male | 14 |
| zhang | male | 12 |
+-------+--------+------+You will find that the data is not deleted.
li,male,10
This is because in the underlying dependencies, it will first judge the case of the same key, display the row data with a large value in the sequence column, and then check whether the
DORIS_DELETE_SIGN
value of the row is 1. If it is 1, it will not be displayed. If it is 0, it will still be read out.
When data is written and deleted at the same time in the imported data (e.g., in the Flink CDC scenario), using the sequence column can effectively ensure consistency when the data arrives out of order, avoiding the deletion operation of an old version that arrives later, and accidentally deleting the new version of the data that arrives first.