Python UDF, UDAF, UDWF, UDTF
Python UDF
Python UDF (User Defined Function) is a custom scalar function extension mechanism provided by Apache Doris, allowing users to write custom functions in Python for data querying and processing. Through Python UDF, users can flexibly implement complex business logic, handle various data types, and fully leverage Python's rich ecosystem of libraries.
Python UDF supports two execution modes:
- Scalar Mode: Processes data row by row, suitable for simple transformations and calculations
- Vectorized Mode: Processes data in batches, utilizing Pandas for high-performance computing
Environment Dependencies: Before using Python UDF, you must pre-install pandas and pyarrow libraries in the Python environment on all BE nodes. These are mandatory dependencies for Doris Python UDF functionality. See Python UDF Environment Configuration.
Log Path: The Python UDF Server runtime log is located at output/be/log/python_udf_output.log. Users can check the Python Server's operation status, function execution information, and debug errors in this log.
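As a quick sanity check of the dependencies, you can run a minimal sketch like the following with the same Python interpreter configured for Doris on each BE node (the exact interpreter path depends on your deployment):

import pandas
import pyarrow

# If either import fails, install the missing dependency before creating UDFs.
print("pandas:", pandas.__version__)
print("pyarrow:", pyarrow.__version__)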
Creating Python UDF
Python UDF supports two creation modes: Inline Mode and Module Mode.
If both the file parameter and AS $$ inline Python code are specified, Doris will prioritize loading the inline Python code and run the Python UDF in inline mode.
Inline Mode
Inline mode allows writing Python code directly in SQL, suitable for simple function logic.
Syntax:
CREATE FUNCTION function_name(parameter_type1, parameter_type2, ...)
RETURNS return_type
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "entry_function_name",
"runtime_version" = "python_version",
"always_nullable" = "true|false"
)
AS $$
def entry_function_name(param1, param2, ...):
# Python code here
return result
$$;
Example 1: Integer Addition
DROP FUNCTION IF EXISTS py_add(INT, INT);
CREATE FUNCTION py_add(INT, INT)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12"
)
AS $$
def evaluate(a, b):
return a + b
$$;
SELECT py_add(10, 20) AS result; -- Result: 30
Example 2: String Concatenation
DROP FUNCTION IF EXISTS py_concat(STRING, STRING);
CREATE FUNCTION py_concat(STRING, STRING)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12"
)
AS $$
def evaluate(s1, s2):
if s1 is None or s2 is None:
return None
return s1 + s2
$$;
SELECT py_concat('Hello', ' World') AS result; -- Result: Hello World
SELECT py_concat(NULL, ' World') AS result; -- Result: NULL
SELECT py_concat('Hello', NULL) AS result; -- Result: NULL
Module Mode
Module mode is suitable for complex function logic, requiring Python code to be packaged into a .zip archive and referenced during function creation.
Step 1: Write Python Module
Create python_udf_scalar_ops.py file:
def add_three_numbers(a, b, c):
"""Add three numbers"""
if a is None or b is None or c is None:
return None
return a + b + c
def reverse_string(s):
"""Reverse a string"""
if s is None:
return None
return s[::-1]
def is_prime(n):
"""Check if a number is prime"""
if n is None or n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
import math
for i in range(3, int(math.sqrt(n)) + 1, 2):
if n % i == 0:
return False
return True
Step 2: Package Python Module
Must package Python files into .zip format (even for a single file):
zip python_udf_scalar_ops.zip python_udf_scalar_ops.py
For multiple Python files:
zip python_udf_scalar_ops.zip python_udf_scalar_ops.py utils.py helper.py ...
Step 3: Set Python Module Archive Path
The Python module archive supports multiple deployment methods; the .zip package path is specified through the file parameter:
Method 1: Local Filesystem (using file:// protocol)
"file" = "file:///path/to/python_udf_scalar_ops.zip"
Suitable for scenarios where the .zip package is stored on the BE node's local filesystem.
Method 2: HTTP/HTTPS Remote Download (using http:// or https:// protocol)
"file" = "http://example.com/udf/python_udf_scalar_ops.zip"
"file" = "https://s3.amazonaws.com/bucket/python_udf_scalar_ops.zip"
Suitable for scenarios where the .zip package is downloaded from object storage (such as S3, OSS, COS, etc.) or HTTP servers. Doris will automatically download and cache it locally.
- When using remote download method, ensure all BE nodes can access the URL
- First call will download the file, which may have some delay
- Files will be cached, subsequent calls do not need to download again
Step 4: Set symbol Parameter
In module mode, the symbol parameter is used to specify the function's location in the ZIP package, with the format:
[package_name.]module_name.func_name
Parameter Description:
- `package_name` (optional): Top-level Python package name in the ZIP archive. Can be omitted if the function is in the package's root module or if there is no package in the ZIP archive
- `module_name` (required): Python module filename containing the target function (without the `.py` suffix)
- `func_name` (required): User-defined function name
Parsing Rules:
- Doris splits the `symbol` string by `.`:
  - If two substrings are obtained, they are `module_name` and `func_name`
  - If three or more substrings are obtained, the first is `package_name`, the middle is `module_name`, and the last is `func_name`
- The `module_name` part is used as the module path for dynamic import via `importlib`
- If `package_name` is specified, the entire path must form a valid Python import path, and the ZIP package structure must match this path
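The splitting logic above can be summarized with a short local sketch (an illustration of the same rules, not Doris's actual implementation):

def parse_symbol(symbol):
    """Split a symbol into (package_name, module_name, func_name) per the rules above."""
    parts = symbol.split('.')
    if len(parts) < 2 or any(not p for p in parts):
        raise ValueError(f"invalid symbol: {symbol!r}")
    if len(parts) == 2:
        return None, parts[0], parts[1]
    return parts[0], '.'.join(parts[1:-1]), parts[-1]

print(parse_symbol('math_ops.add'))                          # (None, 'math_ops', 'add')
print(parse_symbol('mylib.string_helper.split_text'))        # ('mylib', 'string_helper', 'split_text')
print(parse_symbol('mylib.utils.string_helper.split_text'))  # ('mylib', 'utils.string_helper', 'split_text')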
Example Illustrations:
Example A: No Package Structure (Two-Part)
ZIP Structure:
math_ops.py
symbol = "math_ops.add"
Indicates that the function add is defined in the math_ops.py file at the root of the ZIP package.
Example B: Package Structure (Three-Part)
ZIP Structure:
mylib/
├── __init__.py
└── string_helper.py
symbol = "mylib.string_helper.split_text"
Indicates that the function split_text is defined in the mylib/string_helper.py file, where:
- `package_name` = `mylib`
- `module_name` = `string_helper`
- `func_name` = `split_text`
Example C: Nested Package Structure (Four-Part)
ZIP Structure:
mylib/
├── __init__.py
└── utils/
├── __init__.py
└── string_helper.py
symbol = "mylib.utils.string_helper.split_text"
Indicates that the function split_text is defined in the mylib/utils/string_helper.py file, where:
- `package_name` = `mylib`
- `module_name` = `utils.string_helper`
- `func_name` = `split_text`
Note:
- If the `symbol` format is invalid (such as a missing function name, an empty module name, or empty components in the path), Doris will report an error during function invocation
- The directory structure in the ZIP package must match the path specified by `symbol`
- Each package directory must contain an `__init__.py` file (it can be empty)
Step 5: Create UDF Function
Example 1: Using Local Files (No Package Structure)
DROP FUNCTION IF EXISTS py_add_three(INT, INT, INT);
DROP FUNCTION IF EXISTS py_reverse(STRING);
DROP FUNCTION IF EXISTS py_is_prime(INT);
CREATE FUNCTION py_add_three(INT, INT, INT)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/python_udf_scalar_ops.zip",
"symbol" = "python_udf_scalar_ops.add_three_numbers",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE FUNCTION py_reverse(STRING)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/python_udf_scalar_ops.zip",
"symbol" = "python_udf_scalar_ops.reverse_string",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE FUNCTION py_is_prime(INT)
RETURNS BOOLEAN
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/python_udf_scalar_ops.zip",
"symbol" = "python_udf_scalar_ops.is_prime",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
Example 2: Using HTTP/HTTPS Remote Files
DROP FUNCTION IF EXISTS py_add_three(INT, INT, INT);
DROP FUNCTION IF EXISTS py_reverse(STRING);
DROP FUNCTION IF EXISTS py_is_prime(INT);
CREATE FUNCTION py_add_three(INT, INT, INT)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "https://your-storage.com/udf/python_udf_scalar_ops.zip",
"symbol" = "python_udf_scalar_ops.add_three_numbers",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE FUNCTION py_reverse(STRING)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "https://your-storage.com/udf/python_udf_scalar_ops.zip",
"symbol" = "python_udf_scalar_ops.reverse_string",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE FUNCTION py_is_prime(INT)
RETURNS BOOLEAN
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "https://your-storage.com/udf/python_udf_scalar_ops.zip",
"symbol" = "python_udf_scalar_ops.is_prime",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
Example 3: Using Package Structure
DROP FUNCTION IF EXISTS py_multiply(INT);
-- ZIP Structure: my_udf/__init__.py, my_udf/math_ops.py
CREATE FUNCTION py_multiply(INT)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/my_udf.zip",
"symbol" = "my_udf.math_ops.multiply_by_two",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
Step 6: Use Functions
SELECT py_add_three(10, 20, 30) AS sum_result; -- Result: 60
SELECT py_reverse('hello') AS reversed; -- Result: olleh
SELECT py_is_prime(17) AS is_prime; -- Result: true
Dropping Python UDF
-- Syntax
DROP FUNCTION IF EXISTS function_name(parameter_type1, parameter_type2, ...);
-- Examples
DROP FUNCTION IF EXISTS py_add_three(INT, INT, INT);
DROP FUNCTION IF EXISTS py_reverse(STRING);
DROP FUNCTION IF EXISTS py_is_prime(INT);
Parameter Description
CREATE FUNCTION Parameters
| Parameter | Required | Description |
|---|---|---|
| function_name | Yes | Function name, must comply with identifier naming rules |
| parameter_type | Yes | Parameter type list, supports various Doris data types |
| return_type | Yes | Return value type |
PROPERTIES Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| type | Yes | - | Fixed as "PYTHON_UDF" |
| symbol | Yes | - | Python function entry name. • Inline Mode: the function name itself, such as "evaluate" • Module Mode: [package_name.]module_name.func_name, see the module mode description |
| file | No | - | Python .zip package path, only required for module mode. Supports three protocols: • file:// - local filesystem path • http:// - HTTP remote download • https:// - HTTPS remote download |
| runtime_version | Yes | - | Python runtime version, such as "3.10.12"; requires the complete version number |
| always_nullable | No | true | Whether to always return nullable results |
Runtime Version Description
- Supports Python 3.x versions
- Requires the complete version number (such as `"3.10.12"`); a major.minor version alone (such as `"3.10"`) is not accepted
- If `runtime_version` is not specified, function invocation will report an error
Data Type Mapping
The following table lists the mapping relationship between Doris data types and Python types:
| Type Category | Doris Type | Python Type | Description |
|---|---|---|---|
| Null Type | NULL | None | Null value |
| Boolean Type | BOOLEAN | bool | Boolean value |
| Integer Types | TINYINT | int | 8-bit integer |
| | SMALLINT | int | 16-bit integer |
| | INT | int | 32-bit integer |
| | BIGINT | int | 64-bit integer |
| | LARGEINT | int | 128-bit integer |
| | IPV4 | int | IPv4 address (as integer) |
| Floating Point Types | FLOAT | float | 32-bit floating point |
| | DOUBLE | float | 64-bit floating point |
| | TIME / TIMEV2 | float | Time type (as floating point) |
| String Types | CHAR | str | Fixed-length string |
| | VARCHAR | str | Variable-length string |
| | STRING | str | String |
| | IPV6 | str | IPv6 address (string format) |
| | JSONB | str | JSON binary format (converted to string) |
| | VARIANT | str | Variant type (converted to string) |
| Date/Time Types | DATE | str | Date string, format 'YYYY-MM-DD' |
| | DATEV2 | datetime.date | Date object |
| | DATETIME | str | DateTime string, format 'YYYY-MM-DD HH:MM:SS' |
| | DATETIMEV2 | datetime.datetime | DateTime object |
| Decimal Types | DECIMAL / DECIMALV2 | decimal.Decimal | High-precision decimal |
| | DECIMAL32 | decimal.Decimal | 32-bit fixed-point number |
| | DECIMAL64 | decimal.Decimal | 64-bit fixed-point number |
| | DECIMAL128 | decimal.Decimal | 128-bit fixed-point number |
| | DECIMAL256 | decimal.Decimal | 256-bit fixed-point number |
| Binary Types | BITMAP | bytes | Bitmap data (currently not supported) |
| | HLL | bytes | HyperLogLog data (currently not supported) |
| | QUANTILE_STATE | bytes | Quantile state data (currently not supported) |
| Complex Data Types | ARRAY<T> | list | Array, element type T |
| | MAP<K,V> | dict | Dictionary, key type K, value type V |
| | STRUCT<f1:T1, f2:T2, ...> | dict | Struct, field names as keys, field values as values |
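As an illustration of these mappings, the following local sketch shows the Python-side objects a scalar UDF body would receive for a hypothetical (DATEV2, DECIMALV2) signature (the signature and function body are assumptions for illustration only):

import datetime
import decimal

def evaluate(d, amount):
    # Per the table above: DATEV2 arrives as datetime.date,
    # DECIMAL/DECIMALV2 arrives as decimal.Decimal, and NULL arrives as None.
    if d is None or amount is None:
        return None
    return f"{d.isoformat()}:{amount.quantize(decimal.Decimal('0.01'))}"

# Local check outside Doris:
print(evaluate(datetime.date(2024, 1, 1), decimal.Decimal('3.14159')))  # 2024-01-01:3.14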
NULL Value Handling
- Doris `NULL` values are mapped to `None` in Python
- If a function parameter is `NULL`, the Python function receives `None`
- If the Python function returns `None`, Doris treats it as `NULL`
- It is recommended to explicitly handle `None` values in functions to avoid runtime errors
Example:
DROP FUNCTION IF EXISTS py_safe_divide(DOUBLE, DOUBLE);
CREATE FUNCTION py_safe_divide(DOUBLE, DOUBLE)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def evaluate(a, b):
if a is None or b is None:
return None
if b == 0:
return None
return a / b
$$;
SELECT py_safe_divide(10.0, 2.0); -- Result: 5.0
SELECT py_safe_divide(10.0, 0.0); -- Result: NULL
SELECT py_safe_divide(10.0, NULL); -- Result: NULL
Vectorized Mode
Vectorized mode uses Pandas for batch data processing, offering better performance than scalar mode. In vectorized mode, function parameters are pandas.Series objects, and return values should also be pandas.Series.
To ensure the system correctly recognizes vectorized mode, please use type annotations in function signatures (such as a: pd.Series) and directly operate on batch data structures in function logic. If vectorized types are not explicitly used, the system will fall back to Scalar Mode.
# Vectorized Mode
def add(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + b + 1

# Scalar Mode
def add(a, b):
    return a + b + 1
Basic Examples
Example 1: Vectorized Integer Addition
DROP FUNCTION IF EXISTS py_vec_add(INT, INT);
CREATE FUNCTION py_vec_add(INT, INT)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "add",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
import pandas as pd
def add(a: pd.Series, b: pd.Series) -> pd.Series:
return a + b + 1
$$;
SELECT py_vec_add(1, 2); -- Result: 4
Example 2: Vectorized String Processing
DROP FUNCTION IF EXISTS py_vec_upper(STRING);
CREATE FUNCTION py_vec_upper(STRING)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "to_upper",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
import pandas as pd
def to_upper(s: pd.Series) -> pd.Series:
return s.str.upper()
$$;
SELECT py_vec_upper('hello'); -- Result: 'HELLO'
Example 3: Vectorized Mathematical Operations
DROP FUNCTION IF EXISTS py_vec_sqrt(DOUBLE);
CREATE FUNCTION py_vec_sqrt(DOUBLE)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "sqrt",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
import pandas as pd
import numpy as np
def sqrt(x: pd.Series) -> pd.Series:
return np.sqrt(x)
$$;
SELECT py_vec_sqrt(16); -- Result: 4.0
Advantages of Vectorized Mode
- Performance Optimization: Batch processing reduces interaction frequency between Python and Doris
- Leverage Pandas/NumPy: Fully utilize vectorized computing performance advantages
- Concise Code: Pandas API allows more concise expression of complex logic
Using Vectorized Functions
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table (
id INT,
value INT,
text STRING,
score DOUBLE
) ENGINE=OLAP
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num" = "1");
INSERT INTO test_table VALUES
(1, 10, 'hello', 85.5),
(2, 20, 'world', 92.0),
(3, 30, 'python', 78.3);
SELECT
id,
py_vec_add(value, value) AS sum_result,
py_vec_upper(text) AS upper_text,
py_vec_sqrt(score) AS sqrt_score
FROM test_table;
+------+------------+------------+-------------------+
| id | sum_result | upper_text | sqrt_score |
+------+------------+------------+-------------------+
| 1 | 21 | HELLO | 9.246621004453464 |
| 2 | 41 | WORLD | 9.591663046625438 |
| 3 | 61 | PYTHON | 8.848728722251575 |
+------+------------+------------+-------------------+
Complex Data Type Handling
ARRAY Type
Example: Array Element Sum
DROP FUNCTION IF EXISTS py_array_sum(ARRAY<INT>);
CREATE FUNCTION py_array_sum(ARRAY<INT>)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def evaluate(arr):
""" ARRAY type in Doris corresponds to list in Python """
if arr is None:
return None
return sum(arr)
$$;
SELECT py_array_sum([1, 2, 3, 4, 5]) AS result; -- Result: 15
Example: Array Filtering
DROP FUNCTION IF EXISTS py_array_filter_positive(ARRAY<INT>);
CREATE FUNCTION py_array_filter_positive(ARRAY<INT>)
RETURNS ARRAY<INT>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def evaluate(arr):
if arr is None:
return None
return [x for x in arr if x > 0]
$$;
SELECT py_array_filter_positive([1, -2, 3, -4, 5]) AS result; -- Result: [1, 3, 5]
MAP Type
Example: Get MAP Key Count
DROP FUNCTION IF EXISTS py_map_size(MAP<STRING, INT>);
CREATE FUNCTION py_map_size(MAP<STRING, INT>)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def evaluate(m):
""" MAP type in Doris corresponds to dict in Python """
if m is None:
return None
return len(m)
$$;
SELECT py_map_size({'a': 1, 'b': 2, 'c': 3}) AS result; -- Result: 3
Example: Get MAP Value
DROP FUNCTION IF EXISTS py_map_get(MAP<STRING, STRING>, STRING);
CREATE FUNCTION py_map_get(MAP<STRING, STRING>, STRING)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def evaluate(m, key):
if m is None or key is None:
return None
return m.get(key)
$$;
SELECT py_map_get({'name': 'Alice', 'age': '30'}, 'name') AS result; -- Result: Alice
STRUCT Type
Example: Access STRUCT Field
DROP FUNCTION IF EXISTS py_struct_get_name(STRUCT<name: STRING, age: INT>);
CREATE FUNCTION py_struct_get_name(STRUCT<name: STRING, age: INT>)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def evaluate(s):
""" STRUCT type in Doris corresponds to dict in Python """
if s is None:
return None
return s.get('name')
$$;
SELECT py_struct_get_name({'Alice', 30}) AS result; -- Result: Alice
Practical Application Scenarios
Scenario 1: Data Masking
DROP FUNCTION IF EXISTS py_mask_email(STRING);
CREATE FUNCTION py_mask_email(STRING)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12"
)
AS $$
def evaluate(email):
if email is None or '@' not in email:
return None
parts = email.split('@')
if len(parts[0]) <= 1:
return email
masked_user = parts[0][0] + '***'
return f"{masked_user}@{parts[1]}"
$$;
SELECT py_mask_email('user@example.com') AS masked; -- Result: u***@example.com
Scenario 2: String Similarity Calculation
DROP FUNCTION IF EXISTS py_levenshtein_distance(STRING, STRING);
CREATE FUNCTION py_levenshtein_distance(STRING, STRING)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12"
)
AS $$
def evaluate(s1, s2):
if s1 is None or s2 is None:
return None
if len(s1) < len(s2):
return evaluate(s2, s1)
if len(s2) == 0:
return len(s1)
previous_row = range(len(s2) + 1)
for i, c1 in enumerate(s1):
current_row = [i + 1]
for j, c2 in enumerate(s2):
insertions = previous_row[j + 1] + 1
deletions = current_row[j] + 1
substitutions = previous_row[j] + (c1 != c2)
current_row.append(min(insertions, deletions, substitutions))
previous_row = current_row
return previous_row[-1]
$$;
SELECT py_levenshtein_distance('kitten', 'sitting') AS distance; -- Result: 3
Scenario 3: Date Calculation
DROP FUNCTION IF EXISTS py_days_between(DATE, DATE);
CREATE FUNCTION py_days_between(DATE, DATE)
RETURNS INT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12"
)
AS $$
from datetime import datetime
def evaluate(date1_str, date2_str):
if date1_str is None or date2_str is None:
return None
try:
d1 = datetime.strptime(str(date1_str), '%Y-%m-%d')
d2 = datetime.strptime(str(date2_str), '%Y-%m-%d')
return abs((d2 - d1).days)
except:
return None
$$;
SELECT py_days_between('2024-01-01', '2024-12-31') AS days; -- Result: 365
Scenario 4: ID Card Validation
DROP FUNCTION IF EXISTS py_validate_id_card(STRING);
CREATE FUNCTION py_validate_id_card(STRING)
RETURNS BOOLEAN
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.10.12"
)
AS $$
def evaluate(id_card):
if id_card is None or len(id_card) != 18:
return False
# Validate first 17 digits are numeric
if not id_card[:17].isdigit():
return False
# Check code weights
weights = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
check_codes = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']
# Calculate check code
total = sum(int(id_card[i]) * weights[i] for i in range(17))
check_code = check_codes[total % 11]
return id_card[17].upper() == check_code
$$;
SELECT py_validate_id_card('11010519491231002X') AS is_valid; -- Result: True
SELECT py_validate_id_card('110105194912310021x') AS is_valid; -- Result: False
Performance Optimization Recommendations
1. Prefer Vectorized Mode
Vectorized mode significantly outperforms scalar mode:
# Scalar Mode - Process row by row
def scalar_process(x):
return x * 2
# Vectorized Mode - Batch processing
import pandas as pd
def vector_process(x: pd.Series) -> pd.Series:
return x * 2
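A rough local comparison (a sketch run outside Doris; real UDF cost also includes data transfer between Doris and the Python server) illustrates the gap:

import time
import pandas as pd

def scalar_process(x):
    return x * 2

def vector_process(x: pd.Series) -> pd.Series:
    return x * 2

data = pd.Series(range(1_000_000))

t0 = time.perf_counter()
_ = [scalar_process(v) for v in data]  # one Python call per row
t1 = time.perf_counter()
_ = vector_process(data)               # one call for the whole batch
t2 = time.perf_counter()
print(f"scalar: {t1 - t0:.3f}s, vectorized: {t2 - t1:.3f}s")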
2. Use Module Mode for Complex Logic Management
Place complex function logic in separate Python files for easier maintenance and reuse.
3. Avoid I/O Operations in Functions
Performing file reads/writes, network requests, or other I/O operations in a UDF is not recommended, as it will seriously impact performance.
Limitations and Considerations
1. Python Version Support
- Only supports Python 3.x versions
- Recommend using Python 3.10 or higher
- Ensure Doris cluster has the corresponding Python runtime installed
2. Dependency Libraries
- Built-in support for Python standard library
- Third-party libraries need to be pre-installed in the cluster environment
3. Performance Considerations
- Python UDF performance is lower than Doris built-in functions (C++ implementation)
- For performance-sensitive scenarios, prioritize Doris built-in functions
- Large data volume scenarios recommend using vectorized mode
4. Security
- UDF code executes in Doris processes, must ensure code is safe and trusted
- Avoid dangerous operations in UDF (such as system commands, file deletion, etc.)
- Production environments recommend auditing UDF code
5. Resource Limitations
- UDF execution occupies BE node CPU and memory resources
- Heavy UDF usage may impact overall cluster performance
- Recommend monitoring UDF resource consumption
Frequently Asked Questions (FAQ)
Q1: How to use third-party libraries in Python UDF?
A: Install the corresponding Python libraries on all BE nodes, using pip3 or conda. For example:
pip3 install numpy pandas
conda install numpy pandas
Q2: Does Python UDF support recursive functions?
A: Yes, but pay attention to recursion depth to avoid stack overflow.
Q3: How to debug Python UDF?
A: Debug the function logic in a local Python environment first, and make sure it is correct before creating the UDF. You can also check the BE logs for error information.
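For example, a function intended for inline mode can be exercised locally before deployment (a plain Python session, no Doris involved), including the None inputs that Doris passes for NULL values:

def evaluate(s1, s2):
    if s1 is None or s2 is None:
        return None
    return s1 + s2

assert evaluate('Hello', ' World') == 'Hello World'
assert evaluate(None, ' World') is None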
Q4: Does Python UDF support global variables?
A: Yes, but this is not recommended, because global variable behavior in distributed environments may not meet expectations.
Q5: How to update existing Python UDF?
A: Drop the old UDF first, then create the new one:
DROP FUNCTION IF EXISTS function_name(parameter_types);
CREATE FUNCTION function_name(...) ...;
Q6: Can Python UDF access external resources?
A: Technically possible, but strongly discouraged. Python UDFs can use network request libraries (such as requests) to access external APIs, databases, and so on, but this will seriously impact performance and stability. Reasons include:
- Network latency will slow down queries
- External service unavailability will cause UDF failure
- Large concurrent requests may cause external service pressure
- Difficult to control timeout and error handling
Python UDAF
Python UDAF (User Defined Aggregate Function) is a custom aggregate function extension mechanism provided by Apache Doris, allowing users to write custom aggregate functions in Python for data grouping aggregation and window calculations. Through Python UDAF, users can flexibly implement complex aggregation logic such as statistical analysis, data collection, custom metric calculations, etc.
Core features of Python UDAF:
- Distributed Aggregation: Supports data aggregation in distributed environments, automatically handling data partitioning, merging, and final computation
- State Management: Maintains aggregation state through class instances, supporting complex state objects
- Window Function Support: Can be used with window functions (OVER clause) to implement advanced features like moving aggregations, ranking, etc.
- High Flexibility: Can implement arbitrarily complex aggregation logic without being limited by built-in aggregate functions
Environment Dependencies: Before using Python UDAF, you must pre-install pandas and pyarrow libraries in the Python environment on all BE nodes. These are mandatory dependencies for Doris Python UDAF functionality. See Python UDAF Environment Configuration.
Log Path: The Python UDAF Server runtime log is located at output/be/log/python_udf_output.log. Users can check the Python Server's operation status, aggregate function execution information, and debug errors in this log.
UDAF Basic Concepts
Lifecycle of Aggregate Functions
Python UDAF is implemented through classes, and the execution of an aggregate function includes the following stages:
- Initialization (init): Creates aggregation state object, initializes state variables
- Accumulation (accumulate): Processes single row data, updates aggregation state
- Merging (merge): Merges aggregation states from multiple partitions (distributed scenario)
- Completion (finish): Computes and returns final aggregation result
Required Class Methods and Properties
A complete Python UDAF class must implement the following methods:
| Method/Property | Description | Required |
|---|---|---|
| __init__(self) | Initialize aggregation state | Yes |
| accumulate(self, *args) | Accumulate single row data | Yes |
| merge(self, other_state) | Merge states from other partitions | Yes |
| finish(self) | Return final aggregation result | Yes |
| aggregate_state (property) | Return serializable aggregation state; must support pickle serialization | Yes |
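Conceptually, Doris drives these methods as in the following local simulation of a two-partition aggregation (an illustrative sketch, not the actual engine code):

class SumUDAF:
    def __init__(self):            # 1. initialization
        self.total = 0
    @property
    def aggregate_state(self):     # serializable partial state
        return self.total
    def accumulate(self, value):   # 2. accumulation
        if value is not None:
            self.total += value
    def merge(self, other_state):  # 3. merging (distributed scenario)
        self.total += other_state
    def finish(self):              # 4. completion
        return self.total

left, right = SumUDAF(), SumUDAF()
for v in [1, 2, None]:
    left.accumulate(v)
for v in [10, 20]:
    right.accumulate(v)
left.merge(right.aggregate_state)  # partial states are pickled and exchanged
print(left.finish())               # 33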
Basic Syntax
Creating Python UDAF
Python UDAF supports two creation modes: Inline Mode and Module Mode.
If both the file parameter and AS $$ inline Python code are specified, Doris will prioritize loading inline Python code and run the Python UDAF in inline mode.
Inline Mode
Inline mode allows writing Python classes directly in SQL, suitable for simple aggregation logic.
Syntax:
CREATE AGGREGATE FUNCTION function_name(parameter_type1, parameter_type2, ...)
RETURNS return_type
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "ClassName",
"runtime_version" = "python_version",
"always_nullable" = "true|false"
)
AS $$
class ClassName:
def __init__(self):
# Initialize state variables
@property
def aggregate_state(self):
# Return serializable state
def accumulate(self, *args):
# Accumulate data
def merge(self, other_state):
# Merge state
def finish(self):
# Return final result
$$;
Example 1: Sum Aggregation
DROP TABLE IF EXISTS sales;
CREATE TABLE IF NOT EXISTS sales (
id INT,
category VARCHAR(50),
amount INT
) DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num" = "1");
INSERT INTO sales VALUES
(1, 'Electronics', 1000),
(2, 'Electronics', 1500),
(3, 'Books', 200),
(4, 'Books', 300),
(5, 'Clothing', 500),
(6, 'Clothing', 800),
(7, 'Electronics', 2000),
(8, 'Books', 150);
DROP FUNCTION IF EXISTS py_sum(INT);
CREATE AGGREGATE FUNCTION py_sum(INT)
RETURNS BIGINT
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "SumUDAF",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
class SumUDAF:
def __init__(self):
self.total = 0
@property
def aggregate_state(self):
return self.total
def accumulate(self, value):
if value is not None:
self.total += value
def merge(self, other_state):
self.total += other_state
def finish(self):
return self.total
$$;
SELECT category, py_sum(amount) as total_amount
FROM sales
GROUP BY category
ORDER BY category;
+-------------+--------------+
| category | total_amount |
+-------------+--------------+
| Books | 650 |
| Clothing | 1300 |
| Electronics | 4500 |
+-------------+--------------+
Example 2: Average Aggregation
DROP TABLE IF EXISTS employees;
CREATE TABLE IF NOT EXISTS employees (
id INT,
name VARCHAR(100),
department VARCHAR(50),
salary DOUBLE
) DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num" = "1");
INSERT INTO employees VALUES
(1, 'Alice', 'Engineering', 80000.0),
(2, 'Bob', 'Engineering', 90000.0),
(3, 'Charlie', 'Sales', 60000.0),
(4, 'David', 'Sales', 80000.0),
(5, 'Eve', 'HR', 50000.0),
(6, 'Frank', 'Engineering', 70000.0),
(7, 'Grace', 'HR', 70000.0);
DROP FUNCTION IF EXISTS py_avg(DOUBLE);
CREATE AGGREGATE FUNCTION py_avg(DOUBLE)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "AvgUDAF",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
class AvgUDAF:
def __init__(self):
self.sum = 0.0
self.count = 0
@property
def aggregate_state(self):
return (self.sum, self.count)
def accumulate(self, value):
if value is not None:
self.sum += value
self.count += 1
def merge(self, other_state):
other_sum, other_count = other_state
self.sum += other_sum
self.count += other_count
def finish(self):
if self.count == 0:
return None
return self.sum / self.count
$$;
SELECT department, py_avg(salary) as avg_salary
FROM employees
GROUP BY department
ORDER BY department;
+-------------+------------+
| department | avg_salary |
+-------------+------------+
| Engineering | 80000 |
| HR | 60000 |
| Sales | 70000 |
+-------------+------------+
Module Mode
Module mode is suitable for complex aggregation logic, requiring Python code to be packaged into a .zip archive and referenced during function creation.
Step 1: Write Python Module
Create stats_udaf.py file:
import math
class VarianceUDAF:
"""Calculate population variance"""
def __init__(self):
self.count = 0
self.sum_val = 0.0
self.sum_sq = 0.0
@property
def aggregate_state(self):
return (self.count, self.sum_val, self.sum_sq)
def accumulate(self, value):
if value is not None:
self.count += 1
self.sum_val += value
self.sum_sq += value * value
def merge(self, other_state):
other_count, other_sum, other_sum_sq = other_state
self.count += other_count
self.sum_val += other_sum
self.sum_sq += other_sum_sq
def finish(self):
if self.count == 0:
return None
mean = self.sum_val / self.count
variance = (self.sum_sq / self.count) - (mean * mean)
return variance
class StdDevUDAF:
"""Calculate population standard deviation"""
def __init__(self):
self.count = 0
self.sum_val = 0.0
self.sum_sq = 0.0
@property
def aggregate_state(self):
return (self.count, self.sum_val, self.sum_sq)
def accumulate(self, value):
if value is not None:
self.count += 1
self.sum_val += value
self.sum_sq += value * value
def merge(self, other_state):
other_count, other_sum, other_sum_sq = other_state
self.count += other_count
self.sum_val += other_sum
self.sum_sq += other_sum_sq
def finish(self):
if self.count == 0:
return None
mean = self.sum_val / self.count
variance = (self.sum_sq / self.count) - (mean * mean)
return math.sqrt(max(0, variance))
class MedianUDAF:
"""Calculate median"""
def __init__(self):
self.values = []
@property
def aggregate_state(self):
return self.values
def accumulate(self, value):
if value is not None:
self.values.append(value)
def merge(self, other_state):
if other_state:
self.values.extend(other_state)
def finish(self):
if not self.values:
return None
sorted_vals = sorted(self.values)
n = len(sorted_vals)
if n % 2 == 0:
return (sorted_vals[n//2 - 1] + sorted_vals[n//2]) / 2.0
else:
return sorted_vals[n//2]
Step 2: Package Python Module
Must package Python files into .zip format (even for a single file):
zip stats_udaf.zip stats_udaf.py
Step 3: Set Python Module Archive Path
The .zip archive supports multiple deployment methods; its path is specified through the file parameter:
Method 1: Local Filesystem (using file:// protocol)
"file" = "file:///path/to/stats_udaf.zip"
Method 2: HTTP/HTTPS Remote Download (using http:// or https:// protocol)
"file" = "http://example.com/udaf/stats_udaf.zip"
"file" = "https://s3.amazonaws.com/bucket/stats_udaf.zip"
Note:
- When using remote download method, ensure all BE nodes can access the URL
- First call will download the file, which may have some delay
- Files will be cached, subsequent calls do not need to download again
Step 4: Set symbol Parameter
In module mode, the symbol parameter is used to specify the class's location in the ZIP package, with the format:
[package_name.]module_name.ClassName
Parameter Description:
- `package_name` (optional): Top-level Python package name in the ZIP archive
- `module_name` (required): Python module filename containing the target class (without the `.py` suffix)
- `ClassName` (required): UDAF class name
Parsing Rules:
- Doris splits the `symbol` string by `.`:
  - If two substrings are obtained, they are `module_name` and `ClassName`
  - If three or more substrings are obtained, the first is `package_name`, the middle is `module_name`, and the last is `ClassName`
Step 5: Create UDAF Functions
DROP FUNCTION IF EXISTS py_variance(DOUBLE);
DROP FUNCTION IF EXISTS py_stddev(DOUBLE);
DROP FUNCTION IF EXISTS py_median(DOUBLE);
CREATE AGGREGATE FUNCTION py_variance(DOUBLE)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/stats_udaf.zip",
"symbol" = "stats_udaf.VarianceUDAF",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE AGGREGATE FUNCTION py_stddev(DOUBLE)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/stats_udaf.zip",
"symbol" = "stats_udaf.StdDevUDAF",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE AGGREGATE FUNCTION py_median(DOUBLE)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/stats_udaf.zip",
"symbol" = "stats_udaf.MedianUDAF",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
Step 6: Use Functions
DROP TABLE IF EXISTS exam_results;
CREATE TABLE IF NOT EXISTS exam_results (
id INT,
student_name VARCHAR(100),
category VARCHAR(50),
score DOUBLE
) DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num" = "1");
INSERT INTO exam_results VALUES
(1, 'Alice', 'Math', 85.0),
(2, 'Bob', 'Math', 92.0),
(3, 'Charlie', 'Math', 78.0),
(4, 'David', 'Math', 88.0),
(5, 'Eve', 'Math', 95.0),
(6, 'Frank', 'English', 75.0),
(7, 'Grace', 'English', 82.0),
(8, 'Henry', 'English', 88.0),
(9, 'Iris', 'English', 79.0),
(10, 'Jack', 'Physics', 90.0),
(11, 'Kate', 'Physics', 85.0),
(12, 'Lily', 'Physics', 92.0),
(13, 'Mike', 'Physics', 88.0);
SELECT
category,
py_variance(score) as variance,
py_stddev(score) as std_dev,
py_median(score) as median
FROM exam_results
GROUP BY category
ORDER BY category;
+----------+-------------------+-------------------+--------+
| category | variance | std_dev | median |
+----------+-------------------+-------------------+--------+
| English | 22.5 | 4.743416490252569 | 80.5 |
| Math | 34.64000000000033 | 5.885575587824892 | 88 |
| Physics | 6.6875 | 2.58602010819715 | 89 |
+----------+-------------------+-------------------+--------+
Dropping Python UDAF
-- Syntax
DROP FUNCTION IF EXISTS function_name(parameter_types);
-- Examples
DROP FUNCTION IF EXISTS py_sum(INT);
DROP FUNCTION IF EXISTS py_avg(DOUBLE);
DROP FUNCTION IF EXISTS py_variance(DOUBLE);
Parameter Description
CREATE AGGREGATE FUNCTION Parameters
| Parameter | Description |
|---|---|
| function_name | Function name, follows SQL identifier naming rules |
| parameter_types | Parameter type list, such as INT, DOUBLE, STRING, etc. |
| RETURNS return_type | Return value type |
PROPERTIES Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| type | Yes | - | Fixed as "PYTHON_UDF" |
| symbol | Yes | - | Python class name. • Inline Mode: the class name itself, such as "SumUDAF" • Module Mode: [package_name.]module_name.ClassName |
| file | No | - | Python .zip package path, only required for module mode. Supports three protocols: • file:// - local filesystem path • http:// - HTTP remote download • https:// - HTTPS remote download |
| runtime_version | Yes | - | Python runtime version, such as "3.10.12" |
| always_nullable | No | true | Whether to always return nullable results |
runtime_version Description
- Must specify the complete Python version number, in the format `x.x.x` or `x.x.xx`
- Doris will search for a matching interpreter version in the configured Python environment
Window Functions
Python UDAF can be used with window functions (OVER clause):
If a Python UDAF is used in a window function (OVER clause), Doris calls the UDAF's `reset` method after computing each window frame; the class must implement `reset` to restore the aggregation state to its initial value.
DROP TABLE IF EXISTS daily_sales_data;
CREATE TABLE IF NOT EXISTS daily_sales_data (
sales_date DATE,
daily_sales DOUBLE
) DUPLICATE KEY(sales_date)
DISTRIBUTED BY HASH(sales_date) BUCKETS 1
PROPERTIES("replication_num" = "1");
INSERT INTO daily_sales_data VALUES
('2024-01-01', 1000),
('2024-01-01', 800),
('2024-01-02', 1200),
('2024-01-02', 950),
('2024-01-03', 900),
('2024-01-03', 1100),
('2024-01-04', 1500),
('2024-01-04', 850),
('2024-01-05', 1100),
('2024-01-05', 1300);
DROP FUNCTION IF EXISTS py_running_sum(DOUBLE);
CREATE AGGREGATE FUNCTION py_running_sum(DOUBLE)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "RunningSumUDAF",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
class RunningSumUDAF:
def __init__(self):
self.total = 0.0
def reset(self):
self.total = 0.0
@property
def aggregate_state(self):
return self.total
def accumulate(self, value):
if value is not None:
self.total += value
def merge(self, other_state):
self.total += other_state
def finish(self):
return self.total
$$;
SELECT
sales_date,
daily_sales,
py_running_sum(daily_sales) OVER (
ORDER BY sales_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as last_3_days_sum
FROM daily_sales_data
ORDER BY sales_date;
+------------+-------------+-----------------+
| sales_date | daily_sales | last_3_days_sum |
+------------+-------------+-----------------+
| 2024-01-01 | 800 | 800 |
| 2024-01-01 | 1000 | 1800 |
| 2024-01-02 | 950 | 2750 |
| 2024-01-02 | 1200 | 3150 |
| 2024-01-03 | 1100 | 3250 |
| 2024-01-03 | 900 | 3200 |
| 2024-01-04 | 850 | 2850 |
| 2024-01-04 | 1500 | 3250 |
| 2024-01-05 | 1300 | 3650 |
| 2024-01-05 | 1100 | 3900 |
+------------+-------------+-----------------+
Data Type Mapping
Python UDAF uses exactly the same data type mapping rules as Python UDF, including all types such as integers, floats, strings, date/time, Decimal, boolean, etc.
For detailed data type mapping relationships, please refer to: Data Type Mapping
NULL Value Handling
- Doris maps SQL `NULL` values to Python's `None`
- In the `accumulate` method, check whether parameters are `None`
- Aggregate functions can return `None` to indicate a `NULL` result
Practical Application Scenarios
Scenario 1: Calculate Percentiles
DROP FUNCTION IF EXISTS py_percentile(DOUBLE, INT);
CREATE AGGREGATE FUNCTION py_percentile(DOUBLE, INT)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "PercentileUDAF",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
class PercentileUDAF:
"""Calculate percentile, second parameter is percentile (0-100)"""
def __init__(self):
self.values = []
self.percentile = 50 # Default median
@property
def aggregate_state(self):
        return (self.values, self.percentile)
def accumulate(self, value, percentile):
if value is not None:
self.values.append(value)
if percentile is not None:
self.percentile = percentile
    def merge(self, other_state):
        # State carries (values, percentile) so the target percentile survives merges
        other_values, other_percentile = other_state
        if other_values:
            self.values.extend(other_values)
            self.percentile = other_percentile
def finish(self):
if not self.values:
return None
sorted_vals = sorted(self.values)
n = len(sorted_vals)
k = (n - 1) * (self.percentile / 100.0)
f = int(k)
c = k - f
if f + 1 < n:
return sorted_vals[f] + (sorted_vals[f + 1] - sorted_vals[f]) * c
else:
return sorted_vals[f]
$$;
DROP TABLE IF EXISTS api_logs;
CREATE TABLE IF NOT EXISTS api_logs (
log_id INT,
api_name VARCHAR(100),
category VARCHAR(50),
response_time DOUBLE
) DUPLICATE KEY(log_id)
DISTRIBUTED BY HASH(log_id) BUCKETS 1
PROPERTIES("replication_num" = "1");
INSERT INTO api_logs VALUES
(1, '/api/users', 'User', 120.5),
(2, '/api/users', 'User', 95.3),
(3, '/api/users', 'User', 150.0),
(4, '/api/users', 'User', 80.2),
(5, '/api/users', 'User', 200.8),
(6, '/api/orders', 'Order', 250.0),
(7, '/api/orders', 'Order', 180.5),
(8, '/api/orders', 'Order', 300.2),
(9, '/api/orders', 'Order', 220.0),
(10, '/api/products', 'Product', 50.0),
(11, '/api/products', 'Product', 60.5),
(12, '/api/products', 'Product', 45.0),
(13, '/api/products', 'Product', 70.2),
(14, '/api/products', 'Product', 55.8);
SELECT
category,
py_percentile(response_time, 25) as p25,
py_percentile(response_time, 50) as p50,
py_percentile(response_time, 75) as p75,
py_percentile(response_time, 95) as p95
FROM api_logs
GROUP BY category
ORDER BY category;
+----------+---------+-------+--------+--------+
| category | p25     | p50   | p75    | p95    |
+----------+---------+-------+--------+--------+
| Order    | 210.125 | 235   | 262.55 | 292.67 |
| Product  | 50      | 55.8  | 60.5   | 68.26  |
| User     | 95.3    | 120.5 | 150    | 190.64 |
+----------+---------+-------+--------+--------+
Scenario 2: String Deduplication and Aggregation
DROP FUNCTION IF EXISTS py_collect_set(STRING);
CREATE AGGREGATE FUNCTION py_collect_set(STRING)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "CollectSetUDAF",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
class CollectSetUDAF:
"""Deduplicate and collect strings, return comma-separated string"""
def __init__(self):
self.items = set()
@property
def aggregate_state(self):
return list(self.items)
def accumulate(self, value):
if value is not None:
self.items.add(value)
def merge(self, other_state):
if other_state:
self.items.update(other_state)
def finish(self):
if not self.items:
return None
return ','.join(sorted(self.items))
$$;
DROP TABLE IF EXISTS page_views;
CREATE TABLE IF NOT EXISTS page_views (
view_id INT,
user_id INT,
page_url VARCHAR(200),
view_time DATETIME
) DUPLICATE KEY(view_id)
DISTRIBUTED BY HASH(view_id) BUCKETS 1
PROPERTIES("replication_num" = "1");
INSERT INTO page_views VALUES
(1, 1001, '/home', '2024-01-01 10:00:00'),
(2, 1001, '/products', '2024-01-01 10:05:00'),
(3, 1001, '/home', '2024-01-01 10:10:00'),
(4, 1001, '/cart', '2024-01-01 10:15:00'),
(5, 1002, '/home', '2024-01-01 11:00:00'),
(6, 1002, '/about', '2024-01-01 11:05:00'),
(7, 1002, '/products', '2024-01-01 11:10:00'),
(8, 1003, '/products', '2024-01-01 12:00:00'),
(9, 1003, '/products', '2024-01-01 12:05:00'),
(10, 1003, '/cart', '2024-01-01 12:10:00'),
(11, 1003, '/checkout', '2024-01-01 12:15:00');
SELECT
user_id,
py_collect_set(page_url) as visited_pages
FROM page_views
GROUP BY user_id
ORDER BY user_id;
+---------+---------------------------+
| user_id | visited_pages |
+---------+---------------------------+
| 1001 | /cart,/home,/products |
| 1002 | /about,/home,/products |
| 1003 | /cart,/checkout,/products |
+---------+---------------------------+
Scenario 3: Moving Average
DROP TABLE IF EXISTS daily_sales;
CREATE TABLE IF NOT EXISTS daily_sales (
id INT,
date DATE,
sales DOUBLE
) DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES("replication_num" = "1");
INSERT INTO daily_sales VALUES
(1, '2024-01-01', 1000.0),
(2, '2024-01-02', 1200.0),
(3, '2024-01-03', 900.0),
(4, '2024-01-04', 1500.0),
(5, '2024-01-05', 1100.0),
(6, '2024-01-06', 1300.0),
(7, '2024-01-07', 1400.0),
(8, '2024-01-08', 1000.0),
(9, '2024-01-09', 1600.0),
(10, '2024-01-10', 1250.0);
SELECT
date,
sales,
py_avg(sales) OVER (
ORDER BY date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_7days
FROM daily_sales
ORDER BY date;
+------------+-------+-------------------+
| date | sales | moving_avg_7days |
+------------+-------+-------------------+
| 2024-01-01 | 1000 | 1000 |
| 2024-01-02 | 1200 | 1100 |
| 2024-01-03 | 900 | 1033.333333333333 |
| 2024-01-04 | 1500 | 1150 |
| 2024-01-05 | 1100 | 1140 |
| 2024-01-06 | 1300 | 1166.666666666667 |
| 2024-01-07 | 1400 | 1200 |
| 2024-01-08 | 1000 | 1200 |
| 2024-01-09 | 1600 | 1257.142857142857 |
| 2024-01-10 | 1250 | 1307.142857142857 |
+------------+-------+-------------------+
Performance Optimization Recommendations
1. Optimize State Object Size
- Avoid storing large amounts of raw data in state objects
- Use aggregated statistics instead of complete data lists whenever possible
- For scenarios that must store data (such as median), consider sampling or limiting data volume
Not recommended usage:
class BadMedianUDAF:
def __init__(self):
self.all_values = [] # May be very large
def accumulate(self, value):
if value is not None:
self.all_values.append(value)
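A recommended counterpart (a sketch) keeps only incremental statistics, which also illustrates the incremental-calculation advice in point 4 below:

class GoodAvgUDAF:
    """Keeps O(1) state (sum, count) instead of all raw values."""
    def __init__(self):
        self.sum = 0.0
        self.count = 0
    @property
    def aggregate_state(self):
        return (self.sum, self.count)
    def accumulate(self, value):
        if value is not None:
            self.sum += value
            self.count += 1
    def merge(self, other_state):
        other_sum, other_count = other_state
        self.sum += other_sum
        self.count += other_count
    def finish(self):
        return self.sum / self.count if self.count else None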
2. Reduce Object Creation
- Reuse state objects, avoid frequent creation of new objects
- Use primitive data types instead of complex objects
3. Simplify merge Logic
- The `merge` method is called frequently in distributed environments
- Ensure merge operations are efficient and correct
4. Use Incremental Calculation
- For metrics that can be calculated incrementally (such as average), use incremental approach instead of storing all data
5. Avoid Using External Resources
- Do not access databases or external APIs in UDAF
- All calculations should be based on incoming data and internal state
Limitations and Considerations
1. Performance Considerations
- Python UDAF performance is lower than built-in aggregate functions
- Recommended for scenarios with complex logic but moderate data volume
- For large data volume scenarios, prioritize built-in functions or optimize UDAF implementation
2. State Serialization
- Objects returned by `aggregate_state` must support pickle serialization
- Supported types: basic types (int, float, str, bool), list, dict, tuple, set, and custom class instances that can be pickled
- Not supported: file handles, database connections, socket connections, thread locks, and other objects that cannot be pickled
- If the state object cannot be pickled, function execution will report an error
- Prefer built-in types (dict, list, tuple) as state objects to ensure compatibility and maintainability
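You can verify serializability locally before deploying (a minimal check using the standard pickle module):

import pickle

state = ([1.5, 2.5], 50)                 # e.g. a (values, percentile) tuple
restored = pickle.loads(pickle.dumps(state))
assert restored == state                 # built-in types round-trip fine

# Objects like open file handles are not picklable and would raise TypeError:
# pickle.dumps(open('/tmp/x', 'w'))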
3. Memory Limitations
- State objects occupy memory, avoid storing too much data
- Large state objects will affect performance and stability
4. Function Naming
- The same function name can be defined in different databases
- When calling, specify the database name (such as `db.func()`) to avoid ambiguity
5. Environment Consistency
- Python environment on all BE nodes must be consistent
- Including Python version, dependency package versions, environment configuration
Frequently Asked Questions (FAQ)
Q1: What is the difference between UDAF and UDF?
A: A UDF processes a single row and returns a single result; it is called once per row. A UDAF processes multiple rows and returns one aggregated result; it is typically used with GROUP BY.
-- UDF: Called for each row
SELECT id, py_upper(name) FROM users;
-- UDAF: Called once per group
SELECT category, py_sum(amount) FROM sales GROUP BY category;
Q2: What is the purpose of the aggregate_state property?
A: aggregate_state is used to serialize and transmit aggregation state in distributed environments:
- Serialization: Convert state object to transmittable format, using pickle protocol for serialization
- Merging: Merge partial aggregation results between different nodes
- Must support pickle serialization: Can return basic types, lists, dictionaries, tuples, sets, and custom class instances that support pickle serialization
- Cannot return: File handles, database connections, socket connections, thread locks, and other objects that cannot be pickle serialized, otherwise function execution will report error
Q3: Can UDAF be used in window functions?
A: Yes. Python UDAF fully supports window functions (OVER clause).
Q4: When is the merge method called?
A: The merge method is called in the following situations:
- Distributed aggregation: Merge partial aggregation results from different BE nodes
- Parallel processing: Merge partial results from different threads within the same node
- Window functions: Merge partial results within window frame
Therefore, merge implementation must be correct, otherwise it will lead to incorrect results.
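A quick way to validate a merge implementation is to check that splitting the input across two states gives the same result as a single state (a local sketch; it assumes the AvgUDAF class from Example 2 above is defined in the same session):

def run_single(cls, values):
    state = cls()
    for v in values:
        state.accumulate(v)
    return state.finish()

def run_split(cls, part1, part2):
    a, b = cls(), cls()
    for v in part1:
        a.accumulate(v)
    for v in part2:
        b.accumulate(v)
    a.merge(b.aggregate_state)   # simulates the distributed merge step
    return a.finish()

values = [80000.0, 90000.0, None, 70000.0]
assert run_single(AvgUDAF, values) == run_split(AvgUDAF, values[:2], values[2:])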
Python UDTF
Python UDTF (User Defined Table Function) is a custom table function extension mechanism provided by Apache Doris, allowing users to write custom table functions in Python to convert single-row data into multi-row output. Through Python UDTF, users can flexibly implement complex logic such as data splitting, expansion, and generation.
Core features of Python UDTF:
- One Row to Multiple Rows: Receives single row input, outputs zero, one, or multiple rows of results
- Flexible Output Structure: Can define any number and type of output columns, supports both simple types and complex STRUCT types
- Lateral View Support: Used with `LATERAL VIEW` to implement data expansion and association
- Functional Programming: Uses Python functions and `yield` statements, concise and intuitive
Environment Dependencies: Before using Python UDTF, you must pre-install pandas and pyarrow libraries in the Python environment on all BE nodes. These are mandatory dependencies for Doris Python UDTF functionality. See Python UDTF Environment Configuration.
Log Path: The Python UDTF Server runtime log is located at output/be/log/python_udf_output.log. Users can check the Python Server's operation status, table function execution information, and debug errors in this log.
UDTF Basic Concepts
Execution Method of Table Functions
Python UDTF is implemented through functions (not classes), and the execution flow of a function is as follows:
- Receive Input: The function receives the column values of a single input row as parameters
- Process and Produce: Produces zero or more rows of results through the `yield` statement
- Stateless: Each call processes one row independently and retains no state from previous rows
Function Requirements
Python UDTF functions must meet the following requirements:
- Use yield to produce results: Produce output rows through the `yield` statement
- Parameter type correspondence: Function parameters correspond to the parameter types defined in SQL
- Output format matching: The data format of each `yield` must match the `RETURNS ARRAY<...>` definition
Output Methods
- Single column output: `yield value` produces a single value
- Multi-column output: `yield (value1, value2, ...)` produces a tuple of multiple values
- Conditional skip: do not call `yield`; the row produces no output
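Because a UDTF is just a Python generator, its row-production behavior can be observed directly in a local session (an illustrative sketch; the same function appears in Example 1 below):

def split_string_udtf(text, delimiter):
    # One output row per yield; no yield at all means zero rows
    if text is not None and delimiter is not None:
        for part in text.split(delimiter):
            yield part.strip()

print(list(split_string_udtf('a, b, c', ',')))  # ['a', 'b', 'c'] -> three rows
print(list(split_string_udtf(None, ',')))       # [] -> zero rows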
Basic Syntax
Creating Python UDTF
Python UDTF supports two creation modes: Inline Mode and Module Mode.
If both the file parameter and AS $$ inline Python code are specified, Doris will prioritize loading inline Python code and run the Python UDTF in inline mode.
Inline Mode
Inline mode allows writing Python functions directly in SQL, suitable for simple table function logic.
Syntax:
CREATE TABLES FUNCTION function_name(parameter_type1, parameter_type2, ...)
RETURNS ARRAY<return_type>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "function_name",
"runtime_version" = "python_version",
"always_nullable" = "true|false"
)
AS $$
def function_name(param1, param2, ...):
'''Function description'''
# Processing logic
yield result # Single column output
# or
yield (result1, result2, ...) # Multi-column output
$$;
Important Syntax Notes:
- Use `CREATE TABLES FUNCTION` (note TABLES, plural form)
- Single column output: `ARRAY<type>`, such as `ARRAY<INT>`
- Multi-column output: `ARRAY<STRUCT<col1:type1, col2:type2, ...>>`
Example 1: String Split (Single Column Output)
DROP FUNCTION IF EXISTS py_split(STRING, STRING);
CREATE TABLES FUNCTION py_split(STRING, STRING)
RETURNS ARRAY<STRING>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "split_string_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def split_string_udtf(text, delimiter):
'''Split string by delimiter into multiple rows'''
if text is not None and delimiter is not None:
parts = text.split(delimiter)
for part in parts:
# Also supports yield (part.strip(),)
yield part.strip()
$$;
SELECT part
FROM (SELECT 'apple,banana,orange' as fruits) t
LATERAL VIEW py_split(fruits, ',') tmp AS part;
+--------+
| part |
+--------+
| apple |
| banana |
| orange |
+--------+
Example 2: Generate Number Sequence (Single Column Output)
DROP FUNCTION IF EXISTS py_range(INT, INT);
CREATE TABLES FUNCTION py_range(INT, INT)
RETURNS ARRAY<INT>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "generate_series_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def generate_series_udtf(start, end):
'''Generate integer sequence from start to end'''
if start is not None and end is not None:
for i in range(start, end + 1):
yield i
$$;
SELECT num
FROM (SELECT 1 as start_val, 5 as end_val) t
LATERAL VIEW py_range(start_val, end_val) tmp AS num;
+------+
| num |
+------+
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
+------+
SELECT date_add('2024-01-01', n) as date
FROM (SELECT 0 as start_val, 6 as end_val) t
LATERAL VIEW py_range(start_val, end_val) tmp AS n;
+------------+
| date |
+------------+
| 2024-01-01 |
| 2024-01-02 |
| 2024-01-03 |
| 2024-01-04 |
| 2024-01-05 |
| 2024-01-06 |
| 2024-01-07 |
+------------+
Example 3: Multi-Column Output (STRUCT)
DROP FUNCTION IF EXISTS py_duplicate(STRING, INT);
CREATE TABLES FUNCTION py_duplicate(STRING, INT)
RETURNS ARRAY<STRUCT<output:STRING, idx:INT>>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "duplicate_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def duplicate_udtf(text, n):
'''Duplicate text n times, each with sequence number'''
if text is not None and n is not None:
for i in range(n):
yield (text, i + 1)
$$;
SELECT output, idx
FROM (SELECT 'Hello' as text, 3 as times) t
LATERAL VIEW py_duplicate(text, times) tmp AS output, idx;
+--------+------+
| output | idx |
+--------+------+
| Hello | 1 |
| Hello | 2 |
| Hello | 3 |
+--------+------+
Example 4: Cartesian Product (Multi-Column STRUCT)
DROP FUNCTION IF EXISTS py_cartesian(STRING, STRING);
CREATE TABLES FUNCTION py_cartesian(STRING, STRING)
RETURNS ARRAY<STRUCT<item1:STRING, item2:STRING>>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "cartesian_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def cartesian_udtf(list1, list2):
'''Generate Cartesian product of two lists'''
if list1 is not None and list2 is not None:
items1 = [x.strip() for x in list1.split(',')]
items2 = [y.strip() for y in list2.split(',')]
for x in items1:
for y in items2:
yield (x, y)
$$;
SELECT item1, item2
FROM (SELECT 'A,B' as list1, 'X,Y,Z' as list2) t
LATERAL VIEW py_cartesian(list1, list2) tmp AS item1, item2;
+-------+-------+
| item1 | item2 |
+-------+-------+
| A | X |
| A | Y |
| A | Z |
| B | X |
| B | Y |
| B | Z |
+-------+-------+
Example 5: JSON Array Parsing
DROP FUNCTION IF EXISTS py_explode_json(STRING);
CREATE TABLES FUNCTION py_explode_json(STRING)
RETURNS ARRAY<STRING>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "explode_json_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
import json
def explode_json_udtf(json_str):
'''Parse JSON array, output each element as one row'''
if json_str is not None:
try:
data = json.loads(json_str)
if isinstance(data, list):
for item in data:
yield (str(item),)
except:
pass # Skip on parsing failure
$$;
SELECT element
FROM (SELECT '["apple", "banana", "cherry"]' as json_data) t
LATERAL VIEW py_explode_json(json_data) tmp AS element;
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+
Module Mode
Module mode is suitable for complex table function logic, requiring Python code to be packaged into a .zip archive and referenced during function creation.
Step 1: Write Python Module
Create text_udtf.py file:
import json
import re
def split_lines_udtf(text):
"""Split text by lines"""
if text:
lines = text.split('\n')
for line in lines:
line = line.strip()
if line: # Filter empty lines
yield (line,)
def extract_emails_udtf(text):
"""Extract all email addresses from text"""
if text:
email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
emails = re.findall(email_pattern, text)
for email in emails:
yield (email,)
def parse_json_object_udtf(json_str):
"""Parse JSON object, output key-value pairs"""
if json_str:
try:
data = json.loads(json_str)
if isinstance(data, dict):
for key, value in data.items():
yield (key, str(value))
except:
pass
def expand_json_array_udtf(json_str):
"""Expand objects in JSON array, output structured data"""
if json_str:
try:
data = json.loads(json_str)
if isinstance(data, list):
for item in data:
if isinstance(item, dict):
# Assume each object has id, name, score fields
item_id = item.get('id')
name = item.get('name')
score = item.get('score')
yield (item_id, name, score)
except:
pass
def ngram_udtf(text, n):
"""Generate N-gram phrases"""
if text and n and n > 0:
words = text.split()
for i in range(len(words) - n + 1):
ngram = ' '.join(words[i:i+n])
yield (ngram,)
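Before packaging, the generator functions can be smoke-tested with any local Python 3 interpreter. A minimal, illustrative harness; it may be appended to text_udtf.py, and the guard ensures Doris ignores it when importing the module:
# Optional local smoke test; run "python text_udtf.py" before zipping
if __name__ == "__main__":
    print(list(split_lines_udtf("Line 1\nLine 2\n\nLine 3")))
    # [('Line 1',), ('Line 2',), ('Line 3',)]
    print(list(ngram_udtf("Apache Doris is fast", 2)))
    # [('Apache Doris',), ('Doris is',), ('is fast',)]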
Step 2: Package Python Module
Must package Python files into .zip format (even for a single file):
zip text_udtf.zip text_udtf.py
Step 3: Set Python Module Archive Path
Multiple deployment methods are supported; the .zip package path is specified through the file parameter:
Method 1: Local Filesystem (using file:// protocol)
"file" = "file:///path/to/text_udtf.zip"
Method 2: HTTP/HTTPS Remote Download (using http:// or https:// protocol)
"file" = "http://example.com/udtf/text_udtf.zip"
"file" = "https://s3.amazonaws.com/bucket/text_udtf.zip"
- When using remote download, ensure all BE nodes can access the URL (a quick check is sketched below)
- The first call downloads the file, which may add some delay
- Downloaded files are cached, so subsequent calls do not download again
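Before creating the function, a quick reachability check could be run on each BE node. A minimal sketch using only the Python standard library (the URL is the placeholder from the example above):
import urllib.request

# Opens the URL and prints the HTTP status and declared size without reading the body
with urllib.request.urlopen("http://example.com/udtf/text_udtf.zip", timeout=10) as resp:
    print(resp.status, resp.headers.get("Content-Length"))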
Step 4: Set symbol Parameter
In module mode, the symbol parameter is used to specify the function's location in the ZIP package, with the format:
[package_name.]module_name.function_name
Parameter Description:
- package_name (optional): Top-level Python package name in the ZIP archive
- module_name (required): Python module filename containing the target function (without the .py suffix)
- function_name (required): UDTF function name
Parsing Rules:
Doris splits the symbol string by ".":
- If two substrings are obtained, they are module_name and function_name
- If three or more substrings are obtained, the first is package_name, the middle is module_name, and the last is function_name
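For example, "symbol" = "text_udtf.split_lines_udtf" resolves to module text_udtf and function split_lines_udtf, while "symbol" = "mypkg.text_udtf.split_lines_udtf" (mypkg being a hypothetical package name used only for illustration) resolves to package mypkg, module text_udtf, and function split_lines_udtf.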
Step 5: Create UDTF Functions
DROP FUNCTION IF EXISTS py_split_lines(STRING);
DROP FUNCTION IF EXISTS py_extract_emails(STRING);
DROP FUNCTION IF EXISTS py_parse_json(STRING);
DROP FUNCTION IF EXISTS py_expand_json(STRING);
DROP FUNCTION IF EXISTS py_ngram(STRING, INT);
CREATE TABLES FUNCTION py_split_lines(STRING)
RETURNS ARRAY<STRING>
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/text_udtf.zip",
"symbol" = "text_udtf.split_lines_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE TABLES FUNCTION py_extract_emails(STRING)
RETURNS ARRAY<STRING>
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/text_udtf.zip",
"symbol" = "text_udtf.extract_emails_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE TABLES FUNCTION py_parse_json(STRING)
RETURNS ARRAY<STRUCT<k:STRING, v:STRING>>
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/text_udtf.zip",
"symbol" = "text_udtf.parse_json_object_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE TABLES FUNCTION py_expand_json(STRING)
RETURNS ARRAY<STRUCT<id:INT, name:STRING, score:DOUBLE>>
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/text_udtf.zip",
"symbol" = "text_udtf.expand_json_array_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
CREATE TABLES FUNCTION py_ngram(STRING, INT)
RETURNS ARRAY<STRING>
PROPERTIES (
"type" = "PYTHON_UDF",
"file" = "file:///path/to/text_udtf.zip",
"symbol" = "text_udtf.ngram_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
);
Step 6: Use Functions
SELECT line
FROM (SELECT 'Line 1\nLine 2\nLine 3' as text) t
LATERAL VIEW py_split_lines(text) tmp AS line;
+--------+
| line |
+--------+
| Line 1 |
| Line 2 |
| Line 3 |
+--------+
SELECT email
FROM (SELECT 'Contact us at support@example.com or sales@company.org' as content) t
LATERAL VIEW py_extract_emails(content) tmp AS email;
+---------------------+
| email |
+---------------------+
| support@example.com |
| sales@company.org |
+---------------------+
SELECT k, v
FROM (SELECT '{"name": "Alice", "age": "25"}' as json_data) t
LATERAL VIEW py_parse_json(json_data) tmp AS k, v;
+------+-------+
| k | v |
+------+-------+
| name | Alice |
| age | 25 |
+------+-------+
SELECT id, name, score
FROM (
SELECT '[{"id": 1, "name": "Alice", "score": 95.5}, {"id": 2, "name": "Bob", "score": 88.0}]' as data
) t
LATERAL VIEW py_expand_json(data) tmp AS id, name, score;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Alice | 95.5 |
| 2 | Bob | 88 |
+------+-------+-------+
SELECT ngram
FROM (SELECT 'Apache Doris is a fast database' as text) t
LATERAL VIEW py_ngram(text, 2) tmp AS ngram;
+---------------+
| ngram |
+---------------+
| Apache Doris |
| Doris is |
| is a |
| a fast |
| fast database |
+---------------+
Dropping Python UDTF
-- Syntax
DROP FUNCTION IF EXISTS function_name(parameter_types);
-- Examples
DROP FUNCTION IF EXISTS py_split(STRING, STRING);
DROP FUNCTION IF EXISTS py_range(INT, INT);
DROP FUNCTION IF EXISTS py_explode_json(STRING);
Modifying Python UDTF
Doris does not support modifying existing functions directly; you must drop the function first and then recreate it:
DROP FUNCTION IF EXISTS py_split(STRING, STRING);
CREATE TABLES FUNCTION py_split(STRING, STRING) ...;
Parameter Description
CREATE TABLES FUNCTION Parameters
| Parameter | Description |
|---|---|
| function_name | Function name; follows SQL identifier naming rules |
| parameter_types | Parameter type list, such as INT, STRING, DOUBLE, etc. |
| RETURNS ARRAY<...> | Returned array type; defines the output structure. Single column: ARRAY<type>; multi-column: ARRAY<STRUCT<col1:type1, col2:type2, ...>> |
PROPERTIES Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| type | Yes | - | Fixed as "PYTHON_UDF" |
| symbol | Yes | - | Python function name. Inline mode: write the function name directly, such as "split_string_udtf". Module mode: the format is [package_name.]module_name.function_name |
| file | No | - | Python .zip package path, required only in module mode. Supports three protocols: file:// (local filesystem), http:// (HTTP remote download), https:// (HTTPS remote download) |
| runtime_version | Yes | - | Python runtime version, such as "3.10.12" |
| always_nullable | No | true | Whether the result is always nullable |
runtime_version Description
- Must be the complete Python version number, in the format x.x.x or x.x.xx
- Doris searches the configured Python environments for an interpreter matching this version
Data Type Mapping
Python UDTF uses exactly the same data type mapping rules as Python UDF, including all types such as integers, floats, strings, date/time, Decimal, boolean, arrays, STRUCT, etc.
For detailed data type mapping relationships, please refer to: Data Type Mapping
NULL Value Handling
- Doris maps SQL NULL values to Python's None
- Functions must check whether parameters are None
- Values produced by yield may contain None, indicating that the column is NULL (see the sketch below)
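A minimal sketch of these rules (illustrative only; the function and column layout are not from a real example):
def null_aware_udtf(text):
    # Doris passes SQL NULL as None; emit no rows in that case
    if text is None:
        return
    for token in text.split(','):
        token = token.strip()
        # Yielding None for a column makes that column NULL in the output row
        yield (token if token else None, len(token))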
Practical Application Scenarios
Scenario 1: CSV Data Parsing
DROP FUNCTION IF EXISTS py_parse_csv(STRING);
CREATE TABLES FUNCTION py_parse_csv(STRING)
RETURNS ARRAY<STRUCT<name:STRING, age:INT, city:STRING>>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "parse_csv_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def parse_csv_udtf(csv_data):
'''Parse multi-line data in CSV format'''
if csv_data is None:
return
lines = csv_data.strip().split('\n')
for line in lines:
parts = line.split(',')
if len(parts) >= 3:
name = parts[0].strip()
age = int(parts[1].strip()) if parts[1].strip().isdigit() else None
city = parts[2].strip()
yield (name, age, city)
$$;
SELECT name, age, city
FROM (
SELECT 'Alice,25,Beijing\nBob,30,Shanghai\nCharlie,28,Guangzhou' as data
) t
LATERAL VIEW py_parse_csv(data) tmp AS name, age, city;
+---------+------+-----------+
| name | age | city |
+---------+------+-----------+
| Alice | 25 | Beijing |
| Bob | 30 | Shanghai |
| Charlie | 28 | Guangzhou |
+---------+------+-----------+
Scenario 2: Date Range Generation
DROP FUNCTION IF EXISTS py_date_range(STRING, STRING);
CREATE TABLES FUNCTION py_date_range(STRING, STRING)
RETURNS ARRAY<STRING>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "date_range_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
from datetime import datetime, timedelta
def date_range_udtf(start_date, end_date):
'''Generate date range'''
if start_date is None or end_date is None:
return
try:
start = datetime.strptime(start_date, '%Y-%m-%d')
end = datetime.strptime(end_date, '%Y-%m-%d')
current = start
while current <= end:
yield (current.strftime('%Y-%m-%d'),)
current += timedelta(days=1)
except:
pass
$$;
SELECT date
FROM (SELECT '2024-01-01' as start_date, '2024-01-07' as end_date) t
LATERAL VIEW py_date_range(start_date, end_date) tmp AS date;
+------------+
| date |
+------------+
| 2024-01-01 |
| 2024-01-02 |
| 2024-01-03 |
| 2024-01-04 |
| 2024-01-05 |
| 2024-01-06 |
| 2024-01-07 |
+------------+
Scenario 3: Text Tokenization
DROP FUNCTION IF EXISTS py_tokenize(STRING);
CREATE TABLES FUNCTION py_tokenize(STRING)
RETURNS ARRAY<STRUCT<word:STRING, position:INT>>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "tokenize_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
import re
def tokenize_udtf(text):
'''Tokenize text, output words and positions'''
if text is None:
return
# Use regex to extract words
words = re.findall(r'\b\w+\b', text.lower())
for i, word in enumerate(words, 1):
if len(word) >= 2: # Filter single characters
yield (word, i)
$$;
SELECT word, position
FROM (SELECT 'Apache Doris is a fast OLAP database' as text) t
LATERAL VIEW py_tokenize(text) tmp AS word, position;
+----------+----------+
| word | position |
+----------+----------+
| apache | 1 |
| doris | 2 |
| is | 3 |
| fast | 5 |
| olap | 6 |
| database | 7 |
+----------+----------+
Scenario 4: URL Parameter Parsing
DROP FUNCTION IF EXISTS py_parse_url_params(STRING);
CREATE TABLES FUNCTION py_parse_url_params(STRING)
RETURNS ARRAY<STRUCT<param_name:STRING, param_value:STRING>>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "parse_url_params_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
from urllib.parse import urlparse, parse_qs
def parse_url_params_udtf(url):
'''Parse URL parameters'''
if url is None:
return
try:
parsed = urlparse(url)
params = parse_qs(parsed.query)
for key, values in params.items():
for value in values:
yield (key, value)
except:
pass
$$;
SELECT param_name, param_value
FROM (
SELECT 'https://example.com/page?id=123&category=tech&tag=python&tag=database' as url
) t
LATERAL VIEW py_parse_url_params(url) tmp AS param_name, param_value;
+------------+-------------+
| param_name | param_value |
+------------+-------------+
| id | 123 |
| category | tech |
| tag | python |
| tag | database |
+------------+-------------+
Scenario 5: IP Range Expansion
DROP FUNCTION IF EXISTS py_expand_ip_range(STRING, STRING);
CREATE TABLES FUNCTION py_expand_ip_range(STRING, STRING)
RETURNS ARRAY<STRING>
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "expand_ip_range_udtf",
"runtime_version" = "3.10.12",
"always_nullable" = "true"
)
AS $$
def expand_ip_range_udtf(start_ip, end_ip):
'''Expand IP address range (only supports last octet)'''
if start_ip is None or end_ip is None:
return
try:
# Assume format: 192.168.1.10 to 192.168.1.20
start_parts = start_ip.split('.')
end_parts = end_ip.split('.')
if len(start_parts) == 4 and len(end_parts) == 4:
# Only expand last octet
if start_parts[:3] == end_parts[:3]:
prefix = '.'.join(start_parts[:3])
start_num = int(start_parts[3])
end_num = int(end_parts[3])
for i in range(start_num, end_num + 1):
yield (f"{prefix}.{i}",)
except:
pass
$$;
SELECT ip
FROM (SELECT '192.168.1.10' as start_ip, '192.168.1.15' as end_ip) t
LATERAL VIEW py_expand_ip_range(start_ip, end_ip) tmp AS ip;
+--------------+
| ip |
+--------------+
| 192.168.1.10 |
| 192.168.1.11 |
| 192.168.1.12 |
| 192.168.1.13 |
| 192.168.1.14 |
| 192.168.1.15 |
+--------------+
Performance Optimization Recommendations
1. Control Output Row Count
- For scenarios that may produce large amounts of output, set a reasonable upper limit
- Avoid Cartesian product explosion (a capped sketch follows)
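As a concrete guard, the cap can be built into the generator itself. A sketch based on the earlier py_cartesian example (the 10,000-row limit is an arbitrary illustration):
from itertools import islice, product

def capped_cartesian_udtf(list1, list2):
    '''Cartesian product with a hard output cap'''
    if list1 is None or list2 is None:
        return
    items1 = [x.strip() for x in list1.split(',')]
    items2 = [y.strip() for y in list2.split(',')]
    # islice stops the generator after at most 10000 rows
    yield from islice(product(items1, items2), 10000)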
2. Avoid Duplicate Calculations
If you need to use the same calculation result multiple times, pre-calculate:
# Not recommended
def bad_split_udtf(text):
for i in range(len(text.split(','))): # Split every time
parts = text.split(',')
yield (parts[i],)
# Recommended
def good_split_udtf(text):
parts = text.split(',') # Split only once
for part in parts:
yield (part,)
3. Use Generator Expressions
Leverage Python's generator features, avoid creating intermediate lists:
# Not recommended
def bad_filter_udtf(text, delimiter):
parts = text.split(delimiter)
filtered = [p.strip() for p in parts if p.strip()] # Create list
for part in filtered:
yield (part,)
# Recommended
def good_filter_udtf(text, delimiter):
parts = text.split(delimiter)
for part in parts:
part = part.strip()
if part: # Filter directly
yield (part,)
4. Avoid Accessing External Resources
- Do not access databases, files, networks in UDTF
- All processing should be based on input parameters
Limitations and Considerations
1. Stateless Limitation
- Python UDTF is stateless; each function call independently processes one row
- State cannot be retained between calls (see the anti-pattern sketch below)
- If cross-row aggregation is needed, use a UDAF instead
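For instance, module-level variables are not a workaround: calls may be dispatched to different processes in the Python Server pool, so any "shared" state is unreliable. An anti-pattern sketch (illustrative):
# Anti-pattern: module-level state is NOT preserved reliably
call_count = 0

def bad_counter_udtf(x):
    global call_count
    call_count += 1   # each pool process has its own copy; values are unpredictable
    yield (x, call_count)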
2. Performance Considerations
- Python UDTF performance is lower than built-in table functions
- Suitable for scenarios with complex logic but moderate data volume
- For large data volume scenarios, prioritize optimization or use built-in functions
3. Fixed Output Type
- The type defined in RETURNS ARRAY<...> is fixed
- Values produced by yield must match the definition
- Single column: yield value or yield (value,); multi-column: yield (value1, value2, ...), as illustrated below
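A sketch contrasting the two shapes (the return types shown in the comments are illustrative):
def one_column_udtf(s):
    # Matches RETURNS ARRAY<STRING>: a bare value or a 1-tuple per row
    yield s
    yield (s,)

def two_column_udtf(s):
    # Matches RETURNS ARRAY<STRUCT<word:STRING, length:INT>>: one tuple per row
    if s is not None:
        yield (s, len(s))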
4. Function Naming
- Same function name can be repeatedly defined in different databases
- Recommend specifying database name when calling to avoid ambiguity
5. Environment Consistency
- Python environment on all BE nodes must be consistent
- Including Python version, dependency package versions, environment configuration
Frequently Asked Questions (FAQ)
Q1: What is the difference between UDTF and UDF?
A: A UDF takes one row of input and produces one row of output (a one-to-one relationship). A UDTF takes one row of input and produces zero or more rows of output (a one-to-many relationship).
Example:
SELECT py_upper(name) FROM users;                                   -- UDF: one row in, one row out
SELECT tag FROM users LATERAL VIEW py_split(tags, ',') tmp AS tag;  -- UDTF: one row in, many rows out
Q2: How to output multiple columns?
A: Multi-column output uses STRUCT to define return type, and produces tuple in yield:
CREATE TABLES FUNCTION func(...)
RETURNS ARRAY<STRUCT<col1:INT, col2:STRING>>
...
def func(...):
yield (123, 'hello') # Corresponds to col1 and col2
Q3: Why doesn't my UDTF produce output?
A: Possible reasons:
- yield was never called: ensure the function actually calls yield
- Condition filtering: all data was filtered out
- Exception caught: check whether a try-except swallowed errors (see the debug sketch below)
- NULL input: the input is NULL and the function returns immediately
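One way to localize the problem is a temporary debug variant that yields diagnostics instead of silently swallowing them; a sketch based on the JSON example above (illustrative, not a production pattern):
import json

def debug_explode_json(json_str):
    '''Debug variant: surfaces failures as output rows'''
    if json_str is None:
        yield ("NULL input",)
        return
    try:
        data = json.loads(json_str)
    except Exception as e:
        # Surface the error instead of swallowing it with a bare except
        yield (f"parse error: {e}",)
        return
    if not isinstance(data, list):
        yield (f"not a list: {type(data).__name__}",)
        return
    for item in data:
        yield (str(item),)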
Q4: Can UDTF maintain state?
A: No. Python UDTF is stateless, each function call independently processes one row. If cross-row aggregation or state maintenance is needed, should use Python UDAF.
Q5: How to limit UDTF output row count?
A: Add counter or conditional judgment in function:
def limited_udtf(data):
max_rows = 1000
count = 0
for item in data.split(','):
if count >= max_rows:
break
yield (item,)
count += 1
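Alternatively, itertools.islice expresses the same cap more compactly; a sketch:
from itertools import islice

def limited_udtf(data):
    # islice stops the generator after at most 1000 yielded rows
    yield from islice(((item,) for item in data.split(',')), 1000)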
Q6: Are there limitations on UDTF output data types?
A: UDTF supports all Doris data types, including basic types (INT, STRING, DOUBLE, etc.) and complex types (ARRAY, STRUCT, MAP, etc.). Output type must be explicitly defined in RETURNS ARRAY<...>.
Q7: Can external resources be accessed in UDTF?
A: Technically possible, but strongly not recommended. UDTF should be purely functional, only process based on input parameters. Accessing external resources (databases, files, networks) will cause performance issues and unpredictable behavior.
Python UDF/UDAF/UDTF Environment Configuration and Multi-Version Management
Python Environment Management
Before using Python UDF/UDAF/UDTF, please ensure that the Backend (BE) nodes of Doris have properly configured the Python runtime environment. Doris supports managing Python environments through Conda or Virtual Environment (venv), allowing different UDFs to use different versions of Python interpreters and dependency libraries.
Doris provides two Python environment management methods:
- Conda Mode: Use Miniconda/Anaconda to manage multi-version environments
- Venv Mode: Use Python's built-in virtual environment (venv) to manage multi-version environments
Installation and Usage of Third-Party Libraries
Python UDF, UDAF, and UDTF can all use third-party libraries. However, due to Doris's distributed nature, third-party libraries must be uniformly installed on all BE nodes, otherwise some nodes will fail to execute.
Installation Steps
1. Install dependencies on each BE node:
# Install using pip
pip install numpy pandas requests
# Or install using conda
conda install numpy pandas requests -y
2. Import and use in functions:
import numpy as np
import pandas as pd
# Use in UDF/UDAF/UDTF functions
def my_function(x):
    return np.sqrt(x)
Notes:
- pandas and pyarrow are mandatory dependencies and must be pre-installed in every Python environment; otherwise Python UDF/UDAF/UDTF cannot run
- The same dependency versions must be installed on all BE nodes, otherwise some nodes will fail to execute
- The installation path must match the Python runtime environment used by the corresponding UDF/UDAF/UDTF
- Recommend using virtual environments or Conda to manage dependencies, to avoid conflicts with the system Python environment
BE Configuration Parameters
Set the following parameters in the be.conf configuration file on all BE nodes, and restart BE to make the configuration take effect.
Configuration Parameter Description
| Parameter Name | Type | Possible Values | Default Value | Description |
|---|---|---|---|---|
| enable_python_udf_support | bool | true / false | false | Whether to enable Python UDF functionality |
| python_env_mode | string | conda / venv | "" | Python multi-version environment management method |
| python_conda_root_path | string | Directory path | "" | Root directory of Miniconda; only effective when python_env_mode = conda |
| python_venv_root_path | string | Directory path | ${DORIS_HOME}/lib/udf/python | Root directory for venv multi-version management; only effective when python_env_mode = venv |
| python_venv_interpreter_paths | string | Path list (separated by :) | "" | List of available Python interpreter paths; only effective when python_env_mode = venv |
| max_python_process_num | int32 | Integer | 0 | Maximum number of processes in the Python Server process pool; 0 means the CPU core count is used as the default, and users can set another positive integer to override it |
Method 1: Using Conda to Manage Python Environment
1. Configure BE
Add the following configuration in be.conf:
## be.conf
enable_python_udf_support = true
python_env_mode = conda
python_conda_root_path = /path/to/miniconda3
2. Environment Search Rules
Doris will search for Conda environments matching the runtime_version in UDF under the ${python_conda_root_path}/envs/ directory.
Matching Rules:
- runtime_version must be the complete Python version number, in the format x.x.x or x.x.xx, such as "3.9.18" or "3.12.11"
- Doris traverses all Conda environments and checks whether the actual Python interpreter version in each environment exactly matches runtime_version
- If no matching environment is found, an error is reported: Python environment with version x.x.x not found
Examples:
- If a UDF specifies runtime_version = "3.9.18", Doris searches all environments for one whose Python version is 3.9.18
- The environment name can be arbitrary (such as py39, my-env, data-science, etc.), as long as the Python version in that environment is 3.9.18
- The complete version number must be given; a version prefix such as "3.9" or "3.12" cannot be used (a script that lists the available versions follows)
3. Directory Structure Diagram
## Doris BE Node Filesystem Structure (Conda Mode)
/path/to/miniconda3 ← python_conda_root_path (configured by be.conf)
│
├── bin/
│ ├── conda ← conda command-line tool (used by operations)
│ └── ... ← Other conda tools
│
├── envs/ ← All Conda environments directory
│ │
│ ├── py39/ ← Conda environment 1 (created by user)
│ │ ├── bin/
│ │ │ ├── python ← Python 3.9 interpreter (directly called by Doris)
│ │ │ ├── pip
│ │ │ └── ...
│ │ ├── lib/
│ │ │ └── python3.9/
│ │ │ └── site-packages/ ← Third-party dependencies for this environment (e.g., pandas, pyarrow)
│ │ └── ...
│ │
│ ├── py312/ ← Conda environment 2 (created by user)
│ │ ├── bin/
│ │ │ └── python ← Python 3.12 interpreter
│ │ └── lib/
│ │ └── python3.12/
│ │ └── site-packages/ ← Pre-installed dependencies (e.g., torch, sklearn)
│ │
│ └── ml-env/ ← Semantic environment name (recommended)
│ ├── bin/
│ │ └── python ← Possibly Python 3.12 + GPU dependencies
│ └── lib/
│ └── python3.12/
│ └── site-packages/
│
└── ...
4. Create Conda Environment
Doris Python UDF/UDAF/UDTF functionality has a mandatory dependency on the pandas and pyarrow libraries; they must be pre-installed in every Python environment, otherwise UDFs will not run correctly.
Execute the following commands on all BE nodes:
# Install Miniconda (if not already installed)
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh -b -p /opt/miniconda3
# Create Python 3.9.18 environment and install required dependencies (environment name can be customized)
/opt/miniconda3/bin/conda create -n py39 python=3.9.18 pandas pyarrow -y
# Create Python 3.12.11 environment and pre-install dependencies (Important: Python version must be precisely specified, and pandas and pyarrow must be installed)
/opt/miniconda3/bin/conda create -n py312 python=3.12.11 pandas pyarrow numpy -y
# Activate environment and install additional dependencies
source /opt/miniconda3/bin/activate py39
conda install requests beautifulsoup4 -y
conda deactivate
# Verify Python version in environment
/opt/miniconda3/envs/py39/bin/python --version # Should output: Python 3.9.18
/opt/miniconda3/envs/py312/bin/python --version # Should output: Python 3.12.11
5. Use in UDF
-- Use Python 3.12.11 environment
CREATE FUNCTION py_ml_predict(DOUBLE)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.12.11", -- Must specify complete version number, matching Python 3.12.11
"always_nullable" = "true"
)
AS $$
def evaluate(x):
# Can use libraries installed in Python 3.12.11 environment
return x * 2
$$;
-- Note: Whether the environment name is py312 or ml-env, as long as the Python version is 3.12.11, it can be used
-- runtime_version only cares about Python version, not environment name
Method 2: Using Venv to Manage Python Environment
1. Configure BE
Add the following configuration in be.conf:
## be.conf
enable_python_udf_support = true
python_env_mode = venv
python_venv_root_path = /doris/python_envs
python_venv_interpreter_paths = /opt/python3.9/bin/python3.9:/opt/python3.12/bin/python3.12
2. Configuration Parameter Description
- python_venv_root_path: Root directory of virtual environments; all venv environments are created under this directory
- python_venv_interpreter_paths: List of absolute paths to Python interpreters, separated by colons (:). Doris checks the version of each interpreter and matches it against the runtime_version (complete version number, such as "3.9.18") specified in the UDF
3. Directory Structure Diagram
## Doris BE Configuration (be.conf)
python_venv_interpreter_paths = "/opt/python3.9/bin/python3.9:/opt/python3.12/bin/python3.12"
python_venv_root_path = /doris/python_envs
/opt/python3.9/bin/python3.9 ← System pre-installed Python 3.9
/opt/python3.12/bin/python3.12 ← System pre-installed Python 3.12
/doris/python_envs/ ← Root directory of all virtual environments (python_venv_root_path)
│
├── python3.9.18/ ← Environment ID = Python complete version
│ ├── bin/
│ │ ├── python
│ │ └── pip
│ └── lib/python3.9/site-packages/
│ ├── pandas==2.1.0
│ └── pyarrow==15.0.0
│
├── python3.12.11/ ← Python 3.12.11 environment
│ ├── bin/
│ │ ├── python
│ │ └── pip
│ └── lib/python3.12/site-packages/
│ ├── pandas==2.1.0
│ └── pyarrow==15.0.0
│
└── python3.12.10/ ← Python 3.12.10 environment
└── ...
4. Create Venv Environment
Doris Python UDF/UDAF/UDTF functionality has a mandatory dependency on the pandas and pyarrow libraries; they must be pre-installed in every Python environment, otherwise UDFs will not run correctly.
Execute the following commands on all BE nodes:
# Create virtual environment root directory
mkdir -p /doris/python_envs
# Use Python 3.9 to create virtual environment
/opt/python3.9/bin/python3.9 -m venv /doris/python_envs/python3.9.18
# Activate environment and install required dependencies (pandas and pyarrow must be installed)
source /doris/python_envs/python3.9.18/bin/activate
pip install pandas pyarrow numpy
deactivate
# Use Python 3.12 to create virtual environment
/opt/python3.12/bin/python3.12 -m venv /doris/python_envs/python3.12.11
# Activate environment and install required dependencies (pandas and pyarrow must be installed)
source /doris/python_envs/python3.12.11/bin/activate
pip install pandas pyarrow numpy scikit-learn
deactivate
5. Use in UDF
-- Use Python 3.9.18 environment
CREATE FUNCTION py_clean_text(STRING)
RETURNS STRING
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.9.18", -- Must specify complete version number, matching Python 3.9.18
"always_nullable" = "true"
)
AS $$
def evaluate(text):
return text.strip().upper()
$$;
-- Use Python 3.12.11 environment
CREATE FUNCTION py_calculate(DOUBLE)
RETURNS DOUBLE
PROPERTIES (
"type" = "PYTHON_UDF",
"symbol" = "evaluate",
"runtime_version" = "3.12.11", -- Must specify complete version number, matching Python 3.12.11
"always_nullable" = "true"
)
AS $$
import numpy as np
def evaluate(x):
return np.sqrt(x)
$$;
Environment Management Best Practices
1. Choose Appropriate Management Method
| Scenario | Recommended Method | Reason |
|---|---|---|
| Need to frequently switch Python versions | Conda | Good environment isolation, simple dependency management |
| Already have Conda environment | Conda | Can directly reuse existing environment |
| Limited system resources | Venv | Small footprint, fast startup |
| Already have Python system environment | Venv | No need to install additional Conda |
2. Environment Consistency Requirements
All BE nodes must be configured with exactly the same Python environment, including:
- Python version must be consistent
- Installed dependency packages and their versions must be consistent
- Environment directory paths must be consistent
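A minimal consistency check: run the snippet below with the same interpreter path on every BE node and diff the output (illustrative; extend the import list with the packages your UDFs actually use):
import sys
import pandas
import pyarrow

# Output should be identical across all BE nodes
print("python ", sys.version.split()[0])
print("pandas ", pandas.__version__)
print("pyarrow", pyarrow.__version__)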
Notes
1. Configuration Modification Takes Effect
- After modifying be.conf, the BE process must be restarted for the change to take effect
- Ensure the configuration is correct before restarting, to avoid service interruption
2. Path Verification
Please ensure paths are correct before configuration:
# Conda mode: Verify conda path
ls -la /opt/miniconda3/bin/conda
/opt/miniconda3/bin/conda env list
# Venv mode: Verify interpreter path
/opt/python3.9/bin/python3.9 --version
/opt/python3.12/bin/python3.12 --version
3. Permission Settings
Ensure Doris BE process has permission to access Python environment directory:
# Conda mode
chmod -R 755 /opt/miniconda3
# Venv mode
chmod -R 755 /doris/python_envs
chown -R doris:doris /doris/python_envs # Assuming BE process user is doris
4. Resource Limitations
Adjust Python process pool parameters according to actual needs:
## Use the CPU core count (recommended; max_python_process_num = 0)
max_python_process_num = 0
## High concurrency scenario, manually specify process count
max_python_process_num = 128
## Resource-constrained scenario, limit process count
max_python_process_num = 32
Environment Verification
Verify on each BE node whether the environment is correct:
# Conda mode
/opt/miniconda3/envs/py39/bin/python --version
/opt/miniconda3/envs/py39/bin/python -c "import pandas; print(pandas.__version__)"
# Venv mode
/doris/python_envs/python3.9.18/bin/python --version
/doris/python_envs/python3.9.18/bin/python -c "import pandas; print(pandas.__version__)"
Common Problem Troubleshooting
Q1: UDF call prompts "Python environment not found"
Reason:
- The version specified by runtime_version does not exist on the system
- The environment path configuration is incorrect
Solution:
# Check Conda environment list
conda env list
# Check if Venv interpreter exists
ls -la /opt/python3.9/bin/python3.9
# Check BE configuration
grep python /path/to/be.conf
Q2: UDF call prompts "ModuleNotFoundError: No module named 'xxx'"
Reason: The required dependency package is not installed in the Python environment
Solution: Install the missing package, at the same version, in the matching Python environment on every BE node (see Installation and Usage of Third-Party Libraries above)
Q3: Execution results inconsistent across different BE nodes
Reason: Python environment or dependency versions inconsistent across BE nodes
Solution:
- Check Python version and dependency versions on all nodes.
- Verify environment consistency across all nodes.
- Use requirements.txt (pip) or environment.yml (Conda) to deploy environments; common usage examples:
Using requirements.txt (pip):
# Export dependencies from the development environment
pip freeze > requirements.txt
# On BE nodes, install with the target Python interpreter
/path/to/python -m pip install -r requirements.txt
Using environment.yml (Conda):
# Export dependencies
conda env export --from-history -n py312 -f environment.yml
# On BE nodes, create the environment
conda env create -f environment.yml -n py312
# Or update an existing environment
conda env update -f environment.yml -n py312
Notes:
- Ensure pandas and pyarrow are included in the dependency files and installed at the same versions on all BE nodes
- When installing, use the Python interpreter or Conda path configured for Doris (for example, /opt/miniconda3/bin/conda or the venv interpreter path used by BE)
- Keep dependency files under version control or on shared storage so operations can distribute them consistently to all BE nodes
- References: pip docs, Conda export/import
Q4: be.conf modification not effective
Possible Reason: The BE process was not restarted
Solution: Restart the BE process on every node after modifying be.conf
Usage Limitations
1. Performance Considerations:
- Python UDF performance is lower than built-in functions; recommended for scenarios with complex logic but small data volume
- For large data volumes, prefer vectorized mode
2. Type Limitations:
- Special types such as HLL and Bitmap are not supported
3. Environment Isolation:
- The same function name can be defined in different databases
- Specify the database name when calling (such as db.func()) to avoid ambiguity
4. Concurrency Limitations:
- Python UDF executes through a process pool; concurrency is limited by max_python_process_num
- High-concurrency scenarios may require increasing this parameter