ユーザー定義のスカラー関数 - Python
この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 UDF を登録する方法、UDF を呼び出す方法を示し、Spark SQL での部分式の評価順序に関する注意事項を示します。
Databricks Runtime 14.0 以降では、 Pythonユーザー定義テーブル関数 (UDTF) を使用して、スカラー値ではなくリレーション全体を返す関数を登録できます。 Python ユーザー定義テーブル関数 (UDTF)を参照してください。
注
Databricks Runtime12.2LTS 以前では、標準アクセスPython PandasUnity Catalogモードを使用するコンピュート UDF と UDF はサポートされていません。Scalar Python UDFs と Pandas UDF は、Databricks Runtime 13.3 LTS 以降ですべてのアクセス モードでサポートされています。
Databricks Runtime13.3LTS 以降では、 構文を使用してスカラーPython UDF を に登録できます。Unity CatalogSQLUnity Catalogのユーザー定義関数 (UDF)を参照してください。
Unity Catalog 対応クラスター上の Python UDF に対する Graviton インスタンスのサポートは、Databricks Runtime 15.2 以降で利用できます。
重要
UDF とUDAFs Gravitonは、Unity Catalog Databricks Runtime15.2 以降の標準アクセス モードと で構成された クラスタリングでサポートされています。
関数を UDF として登録する
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
必要に応じて、UDF の戻り値の型を設定できます。 デフォルトの戻り値の型は StringType
です。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL で UDF を呼び出す
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
DataFrames で UDF を使用する
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
または、注釈構文を使用して同じ UDF を宣言することもできます。
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
評価順序と null チェック
Spark SQL (SQL および DataFrame および DataFrame および DataDataset API を含む) は、部分式の評価順序を保証するものではありません。 特に、演算子または関数の入力は、必ずしも左から右、またはその他の固定された順序で評価されるとは限りません。 たとえば、論理 AND
式と OR
式には、左から右への "短絡" セマンティクスはありません。
したがって、 Boolean 式の副作用や評価順序、 WHERE
句や HAVING
句の順序は、クエリーの最適化や計画時に並べ替えることができるため、危険です。 具体的には、UDF が null チェックのために SQL の短絡セマンティクスに依存している場合、UDF を呼び出す前に null チェックが行われる保証はありません。 例えば
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
この WHERE
句は、null を除外した後に呼び出される strlen
UDF を保証するものではありません。
適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。
UDF 自体をヌル対応にし、UDF 自体の内部でヌル チェックを行う
IF
式またはCASE WHEN
式を使用して null チェックを実行し、条件分岐で UDF を呼び出す
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
制限事項
PySpark UDF には、次の制限が適用されます。
モジュールのインポート制限:PySpark 標準アクセスGit モード クラスタリングとサーバレス コンピュートの UDFUnity Catalog Databricks Runtimeは、 14.2 以前でモジュールをインポートするために、 フォルダ、ワークスペース ファイル、または ボリュームにアクセスできません。
ブロードキャスト変数: 標準アクセス モード クラスタリング と サーバレス コンピュートの PySpark UDF は、ブロードキャスト変数をサポートしていません。
インスタンスプロファイル: 標準アクセスモードの PySpark UDFs クラスタリングとサーバレス コンピュートはインスタンスプロファイルをサポートしていません。
メモリ制限: サーバレス コンピュート上の PySpark UDF には、 PySpark UDFあたり 1GB のメモリ制限があります。 この制限を超えると、次のエラーが発生します。
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.