From 32a67b746d46afaecd8cfdf71f13a9192d4e94f3 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Mon, 18 May 2026 21:33:28 +0800 Subject: [PATCH 01/12] implement levenshtein with threshold parameter for Spark 4.x --- .../auron/utils/AuronSparkTestSettings.scala | 5 +- .../auron/utils/AuronSparkTestSettings.scala | 5 +- .../datafusion-ext-functions/src/lib.rs | 1 + .../src/spark_strings.rs | 266 +++++++++++++++++- .../spark/sql/auron/NativeConverters.scala | 2 +- 5 files changed, 267 insertions(+), 12 deletions(-) diff --git a/auron-spark-tests/spark40/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala b/auron-spark-tests/spark40/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala index a9155abe6..6dd9a5e4c 100644 --- a/auron-spark-tests/spark40/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala +++ b/auron-spark-tests/spark40/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala @@ -41,11 +41,10 @@ class AuronSparkTestSettings extends SparkTestSettings { enableSuite[AuronStringFunctionsSuite] .exclude("string concat") .exclude("string concat_ws") - // Spark 4 adds the threshold argument, but native levenshtein currently supports only - // two arguments. - .exclude("string Levenshtein distance") .exclude("UTF-8 string validate") .exclude("RegExpReplace throws the right exception when replace fails on a particular row") + // Native substr does not support BinaryType inputs. + .exclude("string / binary substring function") enableSuite[AuronDataFrameAggregateSuite] .disable("Native execution can crash in Spark 4") diff --git a/auron-spark-tests/spark41/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala b/auron-spark-tests/spark41/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala index 060872689..47dcfa2eb 100644 --- a/auron-spark-tests/spark41/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala +++ b/auron-spark-tests/spark41/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala @@ -41,11 +41,10 @@ class AuronSparkTestSettings extends SparkTestSettings { enableSuite[AuronStringFunctionsSuite] .exclude("string concat") .exclude("string concat_ws") - // Spark 4 adds the threshold argument, but native levenshtein currently supports only - // two arguments. - .exclude("string Levenshtein distance") .exclude("UTF-8 string validate") .exclude("RegExpReplace throws the right exception when replace fails on a particular row") + // Native substr does not support BinaryType inputs. + .exclude("string / binary substring function") enableSuite[AuronDataFrameAggregateSuite] .disable("Native execution can crash in Spark 4") diff --git a/native-engine/datafusion-ext-functions/src/lib.rs b/native-engine/datafusion-ext-functions/src/lib.rs index 3916d63b3..264684dcd 100644 --- a/native-engine/datafusion-ext-functions/src/lib.rs +++ b/native-engine/datafusion-ext-functions/src/lib.rs @@ -76,6 +76,7 @@ pub fn create_auron_ext_function( "Spark_StringLower" => Arc::new(spark_strings::string_lower), "Spark_StringUpper" => Arc::new(spark_strings::string_upper), "Spark_Substring" => Arc::new(spark_strings::spark_substring), + "Spark_Levenshtein" => Arc::new(spark_strings::spark_levenshtein), "Spark_InitCap" => Arc::new(spark_initcap::string_initcap), "Spark_Year" => Arc::new(spark_dates::spark_year), "Spark_Month" => Arc::new(spark_dates::spark_month), diff --git a/native-engine/datafusion-ext-functions/src/spark_strings.rs b/native-engine/datafusion-ext-functions/src/spark_strings.rs index b805346d0..361027bb6 100644 --- a/native-engine/datafusion-ext-functions/src/spark_strings.rs +++ b/native-engine/datafusion-ext-functions/src/spark_strings.rs @@ -17,7 +17,8 @@ use std::sync::Arc; use arrow::{ array::{ - Array, ArrayRef, AsArray, BinaryArray, ListArray, ListBuilder, StringArray, StringBuilder, + Array, ArrayRef, AsArray, BinaryArray, Int32Array, ListArray, ListBuilder, StringArray, + StringBuilder, }, datatypes::DataType, }; @@ -190,6 +191,141 @@ fn substring_range(total_len: usize, pos: i64, len: i64) -> (usize, usize) { (start, end) } +pub fn spark_levenshtein(args: &[ColumnarValue]) -> Result { + if args.len() != 2 && args.len() != 3 { + df_execution_err!( + "levenshtein was called with {} arguments. It requires 2 or 3.", + args.len(), + )?; + } + + let return_array_len = args.iter().find_map(|arg| match arg { + ColumnarValue::Array(array) => Some(array.len()), + ColumnarValue::Scalar(_) => None, + }); + + if let Some(array_len) = return_array_len { + let left_array = args[0].clone().into_array(array_len)?; + let right_array = args[1].clone().into_array(array_len)?; + let threshold_array = args + .get(2) + .map(|threshold| threshold.clone().into_array(array_len)) + .transpose()?; + + for array in [&left_array, &right_array] { + if array.len() != array_len { + df_execution_err!( + "levenshtein array arguments must have the same length, got {} and {}", + array_len, + array.len(), + )?; + } + } + if let Some(threshold_array) = &threshold_array + && threshold_array.len() != array_len + { + df_execution_err!( + "levenshtein array arguments must have the same length, got {} and {}", + array_len, + threshold_array.len(), + )?; + } + + let left_strings = as_string_array(&left_array)?; + let right_strings = as_string_array(&right_array)?; + enum Thresholds<'a> { + Absent, + Int32(&'a Int32Array), + Null, + } + let thresholds = match &threshold_array { + Some(array) if array.data_type() == &DataType::Null => Thresholds::Null, + Some(array) => Thresholds::Int32(as_int32_array(array)?), + None => Thresholds::Absent, + }; + + let result = Int32Array::from_iter((0..array_len).map(|i| { + let threshold = match thresholds { + Thresholds::Absent => None, + Thresholds::Int32(array) => { + Some(if array.is_valid(i) { array.value(i) } else { 0 }) + } + Thresholds::Null => Some(0), + }; + levenshtein_result( + left_strings.is_valid(i).then(|| left_strings.value(i)), + right_strings.is_valid(i).then(|| right_strings.value(i)), + threshold, + ) + })); + return Ok(ColumnarValue::Array(Arc::new(result))); + } + + let left = scalar_string_value(&args[0])?; + let right = scalar_string_value(&args[1])?; + let threshold = args.get(2).map(scalar_threshold_value).transpose()?; + Ok(ColumnarValue::Scalar(ScalarValue::Int32( + levenshtein_result(left, right, threshold), + ))) +} + +fn scalar_string_value(arg: &ColumnarValue) -> Result> { + match arg { + ColumnarValue::Scalar(ScalarValue::Utf8(value)) => Ok(value.as_deref()), + _ => df_execution_err!("levenshtein only supports utf8 string arguments"), + } +} + +fn scalar_threshold_value(arg: &ColumnarValue) -> Result { + match arg { + ColumnarValue::Scalar(ScalarValue::Int32(Some(value))) => Ok(*value), + ColumnarValue::Scalar(scalar) if scalar.is_null() => Ok(0), + _ => df_execution_err!("levenshtein threshold only supports int32"), + } +} + +fn levenshtein_result( + left: Option<&str>, + right: Option<&str>, + threshold: Option, +) -> Option { + let distance = levenshtein_distance(left?, right?); + Some(match threshold { + Some(threshold) if distance > threshold => -1, + _ => distance, + }) +} + +fn levenshtein_distance(left: &str, right: &str) -> i32 { + if left == right { + return 0; + } + + let left_chars = left.chars().collect::>(); + let right_chars = right.chars().collect::>(); + if left_chars.is_empty() { + return right_chars.len() as i32; + } + if right_chars.is_empty() { + return left_chars.len() as i32; + } + + let mut previous = (0..=right_chars.len()).collect::>(); + let mut current = vec![0; right_chars.len() + 1]; + + for (i, left_char) in left_chars.iter().enumerate() { + current[0] = i + 1; + for (j, right_char) in right_chars.iter().enumerate() { + let substitution_cost = usize::from(left_char != right_char); + current[j + 1] = (current[j] + 1) + .min(previous[j + 1] + 1) + .min(previous[j] + substitution_cost); + } + std::mem::swap(&mut previous, &mut current); + } + previous[right_chars.len()] as i32 +} + /// concat() function compatible with spark (returns null if any param is null) /// concat('abcde', 2, 22) = 'abcde222 /// concat('abcde', 2, NULL, 22) = NULL @@ -398,19 +534,21 @@ pub fn string_concat_ws(args: &[ColumnarValue]) -> Result { mod test { use std::sync::Arc; - use arrow::array::{BinaryArray, Int32Array, ListBuilder, StringArray, StringBuilder}; + use arrow::array::{ + BinaryArray, Int32Array, ListBuilder, NullArray, StringArray, StringBuilder, + }; use datafusion::{ common::{ Result, ScalarValue, - cast::{as_binary_array, as_list_array, as_string_array}, + cast::{as_binary_array, as_int32_array, as_list_array, as_string_array}, }, physical_plan::ColumnarValue, }; use datafusion_ext_commons::df_execution_err; use crate::spark_strings::{ - spark_substring, string_concat, string_concat_ws, string_lower, string_repeat, - string_space, string_split, string_upper, + spark_levenshtein, spark_substring, string_concat, string_concat_ws, string_lower, + string_repeat, string_space, string_split, string_upper, }; #[test] @@ -471,6 +609,124 @@ mod test { } } + #[test] + fn test_spark_levenshtein_array() -> Result<()> { + let r = spark_levenshtein(&vec![ + ColumnarValue::Array(Arc::new(StringArray::from_iter(vec![ + Some("kitten".to_string()), + Some("frog".to_string()), + Some("千世".to_string()), + None, + ]))), + ColumnarValue::Array(Arc::new(StringArray::from_iter(vec![ + Some("sitting".to_string()), + Some("fog".to_string()), + Some("世界千世".to_string()), + Some("abc".to_string()), + ]))), + ])?; + let s = r.into_array(4)?; + assert_eq!( + as_int32_array(&s)?.into_iter().collect::>(), + vec![Some(3), Some(1), Some(2), None] + ); + Ok(()) + } + + #[test] + fn test_spark_levenshtein_threshold() -> Result<()> { + let r = spark_levenshtein(&vec![ + ColumnarValue::Array(Arc::new(StringArray::from_iter(vec![ + Some("kitten".to_string()), + Some("kitten".to_string()), + Some("abc".to_string()), + Some("abc".to_string()), + Some("".to_string()), + Some("abc".to_string()), + ]))), + ColumnarValue::Array(Arc::new(StringArray::from_iter(vec![ + Some("sitting".to_string()), + Some("sitting".to_string()), + Some("abc".to_string()), + Some("xyz".to_string()), + Some("abc".to_string()), + Some("abc".to_string()), + ]))), + ColumnarValue::Array(Arc::new(Int32Array::from_iter(vec![ + Some(2), + Some(3), + Some(0), + None, + Some(3), + Some(-1), + ]))), + ])?; + let s = r.into_array(6)?; + assert_eq!( + as_int32_array(&s)?.into_iter().collect::>(), + vec![Some(-1), Some(3), Some(0), Some(-1), Some(3), Some(-1)] + ); + + let r = spark_levenshtein(&vec![ + ColumnarValue::Array(Arc::new(StringArray::from_iter(vec![Some( + "abc".to_string(), + )]))), + ColumnarValue::Array(Arc::new(StringArray::from_iter(vec![Some( + "xyz".to_string(), + )]))), + ColumnarValue::Array(Arc::new(NullArray::new(1))), + ])?; + let s = r.into_array(1)?; + assert_eq!( + as_int32_array(&s)?.into_iter().collect::>(), + vec![Some(-1)] + ); + Ok(()) + } + + #[test] + fn test_spark_levenshtein_scalar() -> Result<()> { + let r = spark_levenshtein(&vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some("kitten".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("sitting".to_string()))), + ColumnarValue::Scalar(ScalarValue::Int32(Some(2))), + ])?; + match r { + ColumnarValue::Scalar(ScalarValue::Int32(Some(-1))) => {} + other => df_execution_err!("Expected Int32(-1) scalar, got: {:?}", other)?, + } + + let r = spark_levenshtein(&vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some("kitten".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("sitting".to_string()))), + ColumnarValue::Scalar(ScalarValue::Int32(Some(3))), + ])?; + match r { + ColumnarValue::Scalar(ScalarValue::Int32(Some(3))) => {} + other => df_execution_err!("Expected Int32(3) scalar, got: {:?}", other)?, + } + + let r = spark_levenshtein(&vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some("kitten".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("sitting".to_string()))), + ColumnarValue::Scalar(ScalarValue::Null), + ])?; + match r { + ColumnarValue::Scalar(ScalarValue::Int32(Some(-1))) => {} + other => df_execution_err!("Expected Int32(-1) scalar, got: {:?}", other)?, + } + + let r = spark_levenshtein(&vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(None)), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("sitting".to_string()))), + ])?; + match r { + ColumnarValue::Scalar(ScalarValue::Int32(None)) => {} + other => df_execution_err!("Expected null Int32 scalar, got: {:?}", other)?, + } + Ok(()) + } + #[test] fn test_spark_substring_string_array() -> Result<()> { let r = spark_substring(&vec![ diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala index 52e813eba..655423497 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala @@ -975,7 +975,7 @@ object NativeConverters extends Logging { buildTimePartExt("Spark_Quarter", child, isPruningExpr, fallback) case e: Levenshtein => - buildScalarFunction(pb.ScalarFunction.Levenshtein, e.children, e.dataType) + buildExtScalarFunction("Spark_Levenshtein", e.children, e.dataType) case e: Hour if datetimeExtractEnabled => buildTimePartExt("Spark_Hour", e.children.head, isPruningExpr, fallback) From 564e5572294ac01b8f74eb58dda68ce1943ce308 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 19 May 2026 11:37:09 +0800 Subject: [PATCH 02/12] Refactor spark levenshtein implementation --- .../src/spark_strings.rs | 206 +++++++++--------- 1 file changed, 101 insertions(+), 105 deletions(-) diff --git a/native-engine/datafusion-ext-functions/src/spark_strings.rs b/native-engine/datafusion-ext-functions/src/spark_strings.rs index 361027bb6..37215cd34 100644 --- a/native-engine/datafusion-ext-functions/src/spark_strings.rs +++ b/native-engine/datafusion-ext-functions/src/spark_strings.rs @@ -199,133 +199,129 @@ pub fn spark_levenshtein(args: &[ColumnarValue]) -> Result { )?; } - let return_array_len = args.iter().find_map(|arg| match arg { - ColumnarValue::Array(array) => Some(array.len()), - ColumnarValue::Scalar(_) => None, - }); - - if let Some(array_len) = return_array_len { - let left_array = args[0].clone().into_array(array_len)?; - let right_array = args[1].clone().into_array(array_len)?; - let threshold_array = args - .get(2) - .map(|threshold| threshold.clone().into_array(array_len)) - .transpose()?; + if args + .iter() + .all(|arg| matches!(arg, ColumnarValue::Scalar(_))) + { + let left = match &args[0] { + ColumnarValue::Scalar(ScalarValue::Utf8(value)) => value.as_deref(), + _ => df_execution_err!("levenshtein only supports utf8 string arguments")?, + }; + let right = match &args[1] { + ColumnarValue::Scalar(ScalarValue::Utf8(value)) => value.as_deref(), + _ => df_execution_err!("levenshtein only supports utf8 string arguments")?, + }; + let threshold = match args.get(2) { + Some(ColumnarValue::Scalar(ScalarValue::Int32(Some(value)))) => Some(*value), + Some(ColumnarValue::Scalar(scalar)) if scalar.is_null() => Some(0), + Some(_) => df_execution_err!("levenshtein threshold only supports int32")?, + None => None, + }; + return Ok(ColumnarValue::Scalar(ScalarValue::Int32( + compute_levenshtein(left, right, threshold), + ))); + } - for array in [&left_array, &right_array] { - if array.len() != array_len { - df_execution_err!( - "levenshtein array arguments must have the same length, got {} and {}", - array_len, - array.len(), - )?; - } - } - if let Some(threshold_array) = &threshold_array - && threshold_array.len() != array_len - { + let array_len = args + .iter() + .find_map(|arg| match arg { + ColumnarValue::Array(array) => Some(array.len()), + ColumnarValue::Scalar(_) => None, + }) + .expect("levenshtein arguments include an array"); + let left_array = args[0].clone().into_array(array_len)?; + let right_array = args[1].clone().into_array(array_len)?; + let threshold_array = args + .get(2) + .map(|threshold| threshold.clone().into_array(array_len)) + .transpose()?; + + for array in [&left_array, &right_array] { + if array.len() != array_len { df_execution_err!( "levenshtein array arguments must have the same length, got {} and {}", array_len, - threshold_array.len(), + array.len(), )?; } - - let left_strings = as_string_array(&left_array)?; - let right_strings = as_string_array(&right_array)?; - enum Thresholds<'a> { - Absent, - Int32(&'a Int32Array), - Null, - } - let thresholds = match &threshold_array { - Some(array) if array.data_type() == &DataType::Null => Thresholds::Null, - Some(array) => Thresholds::Int32(as_int32_array(array)?), - None => Thresholds::Absent, - }; - - let result = Int32Array::from_iter((0..array_len).map(|i| { - let threshold = match thresholds { - Thresholds::Absent => None, - Thresholds::Int32(array) => { - Some(if array.is_valid(i) { array.value(i) } else { 0 }) - } - Thresholds::Null => Some(0), - }; - levenshtein_result( - left_strings.is_valid(i).then(|| left_strings.value(i)), - right_strings.is_valid(i).then(|| right_strings.value(i)), - threshold, - ) - })); - return Ok(ColumnarValue::Array(Arc::new(result))); } - - let left = scalar_string_value(&args[0])?; - let right = scalar_string_value(&args[1])?; - let threshold = args.get(2).map(scalar_threshold_value).transpose()?; - Ok(ColumnarValue::Scalar(ScalarValue::Int32( - levenshtein_result(left, right, threshold), - ))) -} - -fn scalar_string_value(arg: &ColumnarValue) -> Result> { - match arg { - ColumnarValue::Scalar(ScalarValue::Utf8(value)) => Ok(value.as_deref()), - _ => df_execution_err!("levenshtein only supports utf8 string arguments"), + if let Some(threshold_array) = &threshold_array + && threshold_array.len() != array_len + { + df_execution_err!( + "levenshtein array arguments must have the same length, got {} and {}", + array_len, + threshold_array.len(), + )?; } -} -fn scalar_threshold_value(arg: &ColumnarValue) -> Result { - match arg { - ColumnarValue::Scalar(ScalarValue::Int32(Some(value))) => Ok(*value), - ColumnarValue::Scalar(scalar) if scalar.is_null() => Ok(0), - _ => df_execution_err!("levenshtein threshold only supports int32"), + let left_strings = as_string_array(&left_array)?; + let right_strings = as_string_array(&right_array)?; + enum Thresholds<'a> { + Absent, + Int32(&'a Int32Array), + Null, } + let thresholds = match &threshold_array { + Some(array) if array.data_type() == &DataType::Null => Thresholds::Null, + Some(array) => Thresholds::Int32(as_int32_array(array)?), + None => Thresholds::Absent, + }; + + let result = Int32Array::from_iter((0..array_len).map(|i| { + let threshold = match thresholds { + Thresholds::Absent => None, + Thresholds::Int32(array) => Some(if array.is_valid(i) { array.value(i) } else { 0 }), + Thresholds::Null => Some(0), + }; + compute_levenshtein( + left_strings.is_valid(i).then(|| left_strings.value(i)), + right_strings.is_valid(i).then(|| right_strings.value(i)), + threshold, + ) + })); + Ok(ColumnarValue::Array(Arc::new(result))) } -fn levenshtein_result( +fn compute_levenshtein( left: Option<&str>, right: Option<&str>, threshold: Option, ) -> Option { - let distance = levenshtein_distance(left?, right?); + let left = left?; + let right = right?; + let distance = if left == right { + 0 + } else { + let left_chars = left.chars().collect::>(); + let right_chars = right.chars().collect::>(); + if left_chars.is_empty() { + right_chars.len() as i32 + } else if right_chars.is_empty() { + left_chars.len() as i32 + } else { + let mut previous = (0..=right_chars.len()).collect::>(); + let mut current = vec![0; right_chars.len() + 1]; + + for (i, left_char) in left_chars.iter().enumerate() { + current[0] = i + 1; + for (j, right_char) in right_chars.iter().enumerate() { + let substitution_cost = usize::from(left_char != right_char); + current[j + 1] = (current[j] + 1) + .min(previous[j + 1] + 1) + .min(previous[j] + substitution_cost); + } + std::mem::swap(&mut previous, &mut current); + } + previous[right_chars.len()] as i32 + } + }; Some(match threshold { Some(threshold) if distance > threshold => -1, _ => distance, }) } -fn levenshtein_distance(left: &str, right: &str) -> i32 { - if left == right { - return 0; - } - - let left_chars = left.chars().collect::>(); - let right_chars = right.chars().collect::>(); - if left_chars.is_empty() { - return right_chars.len() as i32; - } - if right_chars.is_empty() { - return left_chars.len() as i32; - } - - let mut previous = (0..=right_chars.len()).collect::>(); - let mut current = vec![0; right_chars.len() + 1]; - - for (i, left_char) in left_chars.iter().enumerate() { - current[0] = i + 1; - for (j, right_char) in right_chars.iter().enumerate() { - let substitution_cost = usize::from(left_char != right_char); - current[j + 1] = (current[j] + 1) - .min(previous[j + 1] + 1) - .min(previous[j] + substitution_cost); - } - std::mem::swap(&mut previous, &mut current); - } - previous[right_chars.len()] as i32 -} - /// concat() function compatible with spark (returns null if any param is null) /// concat('abcde', 2, 22) = 'abcde222 /// concat('abcde', 2, NULL, 22) = NULL From 65391aedcb568f815f5a7471f0df33e79ec3a598 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 19 May 2026 13:45:36 +0800 Subject: [PATCH 03/12] Simplify levenshtein array threshold handling --- .../src/spark_strings.rs | 42 ++++--------------- 1 file changed, 9 insertions(+), 33 deletions(-) diff --git a/native-engine/datafusion-ext-functions/src/spark_strings.rs b/native-engine/datafusion-ext-functions/src/spark_strings.rs index 37215cd34..5f18d9da9 100644 --- a/native-engine/datafusion-ext-functions/src/spark_strings.rs +++ b/native-engine/datafusion-ext-functions/src/spark_strings.rs @@ -236,43 +236,19 @@ pub fn spark_levenshtein(args: &[ColumnarValue]) -> Result { .map(|threshold| threshold.clone().into_array(array_len)) .transpose()?; - for array in [&left_array, &right_array] { - if array.len() != array_len { - df_execution_err!( - "levenshtein array arguments must have the same length, got {} and {}", - array_len, - array.len(), - )?; - } - } - if let Some(threshold_array) = &threshold_array - && threshold_array.len() != array_len - { - df_execution_err!( - "levenshtein array arguments must have the same length, got {} and {}", - array_len, - threshold_array.len(), - )?; - } - let left_strings = as_string_array(&left_array)?; let right_strings = as_string_array(&right_array)?; - enum Thresholds<'a> { - Absent, - Int32(&'a Int32Array), - Null, - } - let thresholds = match &threshold_array { - Some(array) if array.data_type() == &DataType::Null => Thresholds::Null, - Some(array) => Thresholds::Int32(as_int32_array(array)?), - None => Thresholds::Absent, - }; + let thresholds = threshold_array + .as_ref() + .filter(|array| array.data_type() != &DataType::Null) + .map(|array| as_int32_array(array)) + .transpose()?; let result = Int32Array::from_iter((0..array_len).map(|i| { - let threshold = match thresholds { - Thresholds::Absent => None, - Thresholds::Int32(array) => Some(if array.is_valid(i) { array.value(i) } else { 0 }), - Thresholds::Null => Some(0), + let threshold = match &threshold_array { + Some(array) if array.data_type() == &DataType::Null => Some(0), + Some(_) => thresholds.map(|array| if array.is_valid(i) { array.value(i) } else { 0 }), + None => None, }; compute_levenshtein( left_strings.is_valid(i).then(|| left_strings.value(i)), From 25626770b68dfc21a4de6aac033183dfd74e7515 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 19 May 2026 21:09:26 +0800 Subject: [PATCH 04/12] Comment out levenshtein proto enum --- native-engine/auron-planner/proto/auron.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native-engine/auron-planner/proto/auron.proto b/native-engine/auron-planner/proto/auron.proto index 13b9f48bc..441191d7c 100644 --- a/native-engine/auron-planner/proto/auron.proto +++ b/native-engine/auron-planner/proto/auron.proto @@ -283,7 +283,7 @@ enum ScalarFunction { Hex=66; Power=67; IsNaN=69; - Levenshtein=80; + // Levenshtein=80; FindInSet=81; Nvl=82; Nvl2=83; From 764a10a45bb8baa27a5861b77d84785b1c9cb14e Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 19 May 2026 21:17:33 +0800 Subject: [PATCH 05/12] Clean up levenshtein planner fallback --- .../scala/org/apache/auron/utils/AuronSparkTestSettings.scala | 3 +++ native-engine/auron-planner/src/planner.rs | 3 +-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/auron-spark-tests/spark35/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala b/auron-spark-tests/spark35/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala index 8b57c0c38..7d0b47e8f 100644 --- a/auron-spark-tests/spark35/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala +++ b/auron-spark-tests/spark35/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala @@ -62,6 +62,9 @@ class AuronSparkTestSettings extends SparkTestSettings { enableSuite[AuronStringFunctionsSuite] // Native levenshtein has a Spark 3.5+ result or schema comparison mismatch. .exclude("string Levenshtein distance") + // Native substr does not support BinaryType inputs. + // See https://github.com/apache/auron/issues/1724 + .exclude("string / binary substring function") enableSuite[AuronDataFrameAggregateSuite] // See https://github.com/apache/auron/issues/1840 diff --git a/native-engine/auron-planner/src/planner.rs b/native-engine/auron-planner/src/planner.rs index 418cc951d..480e828d3 100644 --- a/native-engine/auron-planner/src/planner.rs +++ b/native-engine/auron-planner/src/planner.rs @@ -1313,8 +1313,7 @@ impl From for Arc { ScalarFunction::Rpad => f::unicode::rpad(), ScalarFunction::SplitPart => f::string::split_part(), ScalarFunction::StartsWith => f::string::starts_with(), - ScalarFunction::Levenshtein => f::string::levenshtein(), - + // ScalarFunction::Levenshtein => f::string::levenshtein(), ScalarFunction::FindInSet => f::unicode::find_in_set(), ScalarFunction::Strpos => f::unicode::strpos(), ScalarFunction::Substr => f::unicode::substr(), From 72245302570597232a164ab97cbda66c046288dc Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 19 May 2026 21:17:33 +0800 Subject: [PATCH 06/12] Clean up levenshtein planner fallback --- .../scala/org/apache/auron/utils/AuronSparkTestSettings.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/auron-spark-tests/spark35/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala b/auron-spark-tests/spark35/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala index 7d0b47e8f..7ac227587 100644 --- a/auron-spark-tests/spark35/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala +++ b/auron-spark-tests/spark35/src/test/scala/org/apache/auron/utils/AuronSparkTestSettings.scala @@ -60,8 +60,6 @@ class AuronSparkTestSettings extends SparkTestSettings { enableSuite[AuronMiscFunctionsSuite] enableSuite[AuronStringFunctionsSuite] - // Native levenshtein has a Spark 3.5+ result or schema comparison mismatch. - .exclude("string Levenshtein distance") // Native substr does not support BinaryType inputs. // See https://github.com/apache/auron/issues/1724 .exclude("string / binary substring function") From 6dddfa98f2436d88db75a5633ca24489a6474a33 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Thu, 21 May 2026 20:11:33 +0800 Subject: [PATCH 07/12] Fix null handling in levenshtein function --- .../src/spark_strings.rs | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/native-engine/datafusion-ext-functions/src/spark_strings.rs b/native-engine/datafusion-ext-functions/src/spark_strings.rs index 5f18d9da9..ec675370d 100644 --- a/native-engine/datafusion-ext-functions/src/spark_strings.rs +++ b/native-engine/datafusion-ext-functions/src/spark_strings.rs @@ -213,7 +213,9 @@ pub fn spark_levenshtein(args: &[ColumnarValue]) -> Result { }; let threshold = match args.get(2) { Some(ColumnarValue::Scalar(ScalarValue::Int32(Some(value)))) => Some(*value), - Some(ColumnarValue::Scalar(scalar)) if scalar.is_null() => Some(0), + Some(ColumnarValue::Scalar(scalar)) if scalar.is_null() => { + return Ok(ColumnarValue::Scalar(ScalarValue::Int32(None))); + } Some(_) => df_execution_err!("levenshtein threshold only supports int32")?, None => None, }; @@ -246,8 +248,14 @@ pub fn spark_levenshtein(args: &[ColumnarValue]) -> Result { let result = Int32Array::from_iter((0..array_len).map(|i| { let threshold = match &threshold_array { - Some(array) if array.data_type() == &DataType::Null => Some(0), - Some(_) => thresholds.map(|array| if array.is_valid(i) { array.value(i) } else { 0 }), + Some(array) if array.data_type() == &DataType::Null => return None, + Some(_) => { + let arr = thresholds.unwrap(); + if !arr.is_valid(i) { + return None; + } + Some(arr.value(i)) + } None => None, }; compute_levenshtein( @@ -636,7 +644,7 @@ mod test { let s = r.into_array(6)?; assert_eq!( as_int32_array(&s)?.into_iter().collect::>(), - vec![Some(-1), Some(3), Some(0), Some(-1), Some(3), Some(-1)] + vec![Some(-1), Some(3), Some(0), None, Some(3), Some(-1)] ); let r = spark_levenshtein(&vec![ @@ -651,7 +659,7 @@ mod test { let s = r.into_array(1)?; assert_eq!( as_int32_array(&s)?.into_iter().collect::>(), - vec![Some(-1)] + vec![None] ); Ok(()) } @@ -684,8 +692,8 @@ mod test { ColumnarValue::Scalar(ScalarValue::Null), ])?; match r { - ColumnarValue::Scalar(ScalarValue::Int32(Some(-1))) => {} - other => df_execution_err!("Expected Int32(-1) scalar, got: {:?}", other)?, + ColumnarValue::Scalar(ScalarValue::Int32(None)) => {} + other => df_execution_err!("Expected null Int32 scalar, got: {:?}", other)?, } let r = spark_levenshtein(&vec![ From 40e432325623596b940a00451c8e7bf956029743 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Thu, 21 May 2026 20:22:54 +0800 Subject: [PATCH 08/12] lint --- .../datafusion-ext-functions/src/spark_strings.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/native-engine/datafusion-ext-functions/src/spark_strings.rs b/native-engine/datafusion-ext-functions/src/spark_strings.rs index ec675370d..661c0ebba 100644 --- a/native-engine/datafusion-ext-functions/src/spark_strings.rs +++ b/native-engine/datafusion-ext-functions/src/spark_strings.rs @@ -249,13 +249,10 @@ pub fn spark_levenshtein(args: &[ColumnarValue]) -> Result { let result = Int32Array::from_iter((0..array_len).map(|i| { let threshold = match &threshold_array { Some(array) if array.data_type() == &DataType::Null => return None, - Some(_) => { - let arr = thresholds.unwrap(); - if !arr.is_valid(i) { - return None; - } - Some(arr.value(i)) - } + Some(_) => match thresholds { + Some(arr) if arr.is_valid(i) => Some(arr.value(i)), + _ => return None, + }, None => None, }; compute_levenshtein( From a9253aa3d375f790bd602f184f8dcd9ab097c105 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 26 May 2026 21:44:24 +0800 Subject: [PATCH 09/12] optimize with bounded threshold --- .../src/spark_strings.rs | 123 +++++++++++++++--- 1 file changed, 104 insertions(+), 19 deletions(-) diff --git a/native-engine/datafusion-ext-functions/src/spark_strings.rs b/native-engine/datafusion-ext-functions/src/spark_strings.rs index 661c0ebba..276871cc3 100644 --- a/native-engine/datafusion-ext-functions/src/spark_strings.rs +++ b/native-engine/datafusion-ext-functions/src/spark_strings.rs @@ -271,30 +271,19 @@ fn compute_levenshtein( ) -> Option { let left = left?; let right = right?; + + if matches!(threshold, Some(t) if t < 0) { + return Some(-1); + } + let distance = if left == right { 0 } else { let left_chars = left.chars().collect::>(); let right_chars = right.chars().collect::>(); - if left_chars.is_empty() { - right_chars.len() as i32 - } else if right_chars.is_empty() { - left_chars.len() as i32 - } else { - let mut previous = (0..=right_chars.len()).collect::>(); - let mut current = vec![0; right_chars.len() + 1]; - - for (i, left_char) in left_chars.iter().enumerate() { - current[0] = i + 1; - for (j, right_char) in right_chars.iter().enumerate() { - let substitution_cost = usize::from(left_char != right_char); - current[j + 1] = (current[j] + 1) - .min(previous[j + 1] + 1) - .min(previous[j] + substitution_cost); - } - std::mem::swap(&mut previous, &mut current); - } - previous[right_chars.len()] as i32 + match threshold.and_then(|threshold| usize::try_from(threshold).ok()) { + Some(threshold) => limited_levenshtein_distance(&left_chars, &right_chars, threshold), + None => unlimited_levenshtein_distance(&left_chars, &right_chars), } }; Some(match threshold { @@ -303,6 +292,102 @@ fn compute_levenshtein( }) } +fn unlimited_levenshtein_distance(left_chars: &[char], right_chars: &[char]) -> i32 { + if left_chars.is_empty() { + return right_chars.len() as i32; + } + if right_chars.is_empty() { + return left_chars.len() as i32; + } + + let right_len = right_chars.len(); + let mut previous = (0..=right_len).collect::>(); + let mut current = vec![0; right_len + 1]; + + for (i, left_char) in left_chars.iter().enumerate() { + current[0] = i + 1; + for (j, right_char) in right_chars.iter().enumerate() { + let substitution_cost = usize::from(left_char != right_char); + current[j + 1] = (current[j] + 1) + .min(previous[j + 1] + 1) + .min(previous[j] + substitution_cost); + } + std::mem::swap(&mut previous, &mut current); + } + + previous[right_len] as i32 +} + +fn limited_levenshtein_distance( + left_chars: &[char], + right_chars: &[char], + threshold: usize, +) -> i32 { + if left_chars.len().abs_diff(right_chars.len()) > threshold { + return -1; + } + if left_chars.is_empty() { + return if right_chars.len() > threshold { + -1 + } else { + right_chars.len() as i32 + }; + } + if right_chars.is_empty() { + return if left_chars.len() > threshold { + -1 + } else { + left_chars.len() as i32 + }; + } + + let right_len = right_chars.len(); + let out_of_band = threshold + 1; + let mut previous = vec![out_of_band; right_len + 1]; + let mut current = vec![out_of_band; right_len + 1]; + + for j in 0..=right_len.min(threshold) { + previous[j] = j; + } + + for (i, left_char) in left_chars.iter().enumerate() { + let row = i + 1; + let min = row.saturating_sub(threshold).max(1); + let max = right_len.min(row.saturating_add(threshold)); + + if min == 1 { + current[0] = row.min(out_of_band); + } else { + current[min - 1] = out_of_band; + } + + let mut row_min = out_of_band; + for column in min..=max { + let substitution_cost = usize::from(left_char != &right_chars[column - 1]); + current[column] = (current[column - 1] + 1) + .min(previous[column] + 1) + .min(previous[column - 1] + substitution_cost) + .min(out_of_band); + row_min = row_min.min(current[column]); + } + + if max < right_len { + current[max + 1] = out_of_band; + } + if row_min > threshold { + return -1; + } + + std::mem::swap(&mut previous, &mut current); + } + + if previous[right_len] > threshold { + -1 + } else { + previous[right_len] as i32 + } +} + /// concat() function compatible with spark (returns null if any param is null) /// concat('abcde', 2, 22) = 'abcde222 /// concat('abcde', 2, NULL, 22) = NULL From 2e168ae50927c86f224bdeae1c4ce474087f1849 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Mon, 15 Jun 2026 11:42:37 +0800 Subject: [PATCH 10/12] add test cases --- .../src/spark_strings.rs | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/native-engine/datafusion-ext-functions/src/spark_strings.rs b/native-engine/datafusion-ext-functions/src/spark_strings.rs index 276871cc3..70ba813ff 100644 --- a/native-engine/datafusion-ext-functions/src/spark_strings.rs +++ b/native-engine/datafusion-ext-functions/src/spark_strings.rs @@ -743,6 +743,27 @@ mod test { as_int32_array(&s)?.into_iter().collect::>(), vec![None] ); + + // Long-string cases to stress banded DP sentinel handling + let r = spark_levenshtein(&vec![ + ColumnarValue::Array(Arc::new(StringArray::from_iter(vec![ + Some("abcdefghij".to_string()), + Some("abcdefghij".to_string()), + ]))), + ColumnarValue::Array(Arc::new(StringArray::from_iter(vec![ + Some("abXdefghij".to_string()), + Some("abXdefghij".to_string()), + ]))), + ColumnarValue::Array(Arc::new(Int32Array::from_iter(vec![ + Some(1), + Some(0), + ]))), + ])?; + let s = r.into_array(2)?; + assert_eq!( + as_int32_array(&s)?.into_iter().collect::>(), + vec![Some(1), Some(-1)] + ); Ok(()) } From f19b897ab9ddf76cf24809bdfe477943e0d8d79f Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Mon, 15 Jun 2026 11:45:37 +0800 Subject: [PATCH 11/12] reserved levenshtein identifier --- native-engine/auron-planner/proto/auron.proto | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/native-engine/auron-planner/proto/auron.proto b/native-engine/auron-planner/proto/auron.proto index 441191d7c..5ac330a57 100644 --- a/native-engine/auron-planner/proto/auron.proto +++ b/native-engine/auron-planner/proto/auron.proto @@ -283,7 +283,8 @@ enum ScalarFunction { Hex=66; Power=67; IsNaN=69; - // Levenshtein=80; + reserved 80; + reserved "Levenshtein"; FindInSet=81; Nvl=82; Nvl2=83; From b906161bafb3a398aa2082737354c5bd9a805d5b Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Mon, 15 Jun 2026 13:59:24 +0800 Subject: [PATCH 12/12] lint --- native-engine/datafusion-ext-functions/src/spark_strings.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/native-engine/datafusion-ext-functions/src/spark_strings.rs b/native-engine/datafusion-ext-functions/src/spark_strings.rs index 70ba813ff..1d04d6506 100644 --- a/native-engine/datafusion-ext-functions/src/spark_strings.rs +++ b/native-engine/datafusion-ext-functions/src/spark_strings.rs @@ -754,10 +754,7 @@ mod test { Some("abXdefghij".to_string()), Some("abXdefghij".to_string()), ]))), - ColumnarValue::Array(Arc::new(Int32Array::from_iter(vec![ - Some(1), - Some(0), - ]))), + ColumnarValue::Array(Arc::new(Int32Array::from_iter(vec![Some(1), Some(0)]))), ])?; let s = r.into_array(2)?; assert_eq!(