Deltaテーブルにリキッドクラスタリングを使用する

Delta Lake リキッドクラスタリングは、テーブルのパーティション分割とZORDERを置き換えて、データレイアウトの決定を簡素化し、クエリのパフォーマンスを最適化します。 リキッドクラスタリングは、既存のデータを書き換えることなくクラスタリングキーを再定義する柔軟性を提供し、時間の経過とともに分析ニーズに沿ってデータレイアウトを進化させることができます。 リキッドクラスタリングは、ストリーミング テーブルとマテリアライズドビューの両方に適用されます。

重要

Databricksでは、リキッドクラスタリングが有効なすべてのテーブルに対して、Databricks Runtime 15.2以降を使用することをお勧めします。制限付きのパブリックプレビューサポートは、Databricks Runtime 13.3 LTS以降で使用できます。

注:

Databricks Runtime 13.3LTS以降では、リキッドクラスタリングが有効になっているテーブルで行レベルの同時実行がサポートされます。行レベルの同時実行は、削除ベクトルが有効なすべてのテーブルに対し、Databricks Runtime 14.2以降で一般公開されています。「Databricksの分離レベルと書き込み競合」を参照してください。

リキッドクラスタリングは何に使用されますか?

Databricks では、ストリーミング テーブル (ST) とマテリアライズド ビュー (MV) の両方を含む、すべての新しい Delta テーブルに対してリキッドクラスタリングを推奨しています。 クラスタリングの恩恵を受けるシナリオの例を次に示します。

  • カーディナリティの高い列によってフィルタリングされることが多いテーブル。

  • データの分散に大きな偏りがあるテーブル。

  • 急速に増大し、メンテナンスとチューニングが必要となるテーブル。

  • 並列書き込み要件のあるテーブル。

  • 時間の経過と共に変化するアクセスパターンを持つテーブル。

  • 一般的なパーティションキーでは、パーティションが多すぎたり少なすぎたりするテーブル。

リキッドクラスタリングを有効にする

リキッドクラスタリングは、既存のテーブルに対して、またはテーブルの作成時に有効にすることができます。クラスタリングはパーティション分割やZORDERと互換性がなく、Databricksを使用してテーブル内のデータのすべてのレイアウト操作と最適化操作を管理する必要があります。リキッドクラスタリングを有効にすると、OPTIMIZEジョブを通常どおり実行することでデータを増分的にクラスター化できます。「クラスタリングをトリガーする方法」を参照してください。

リキッドクラスタリングを有効にするには、次の例のように、テーブル作成ステートメントにCLUSTER BYフレーズを追加します。

注:

Databricks Runtime 14.2以降では、PythonまたはScalaでDataFrame APIとDeltaTable APIを使用してリキッドクラスタリングを有効にできます。

-- Create an empty table
CREATE TABLE table1(col0 int, col1 string) CLUSTER BY (col0);

-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0)  -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;

-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
# Create an empty table
(DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute())

# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
// Create an empty table
DeltaTable.create()
  .tableName("table1")
  .addColumn("col0", dataType = "INT")
  .addColumn("col1", dataType = "STRING")
  .clusterBy("col0")
  .execute()

// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")

// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()

Databricks Runtime 16.0 以降では、次の例のように、構造化ストリーミング write を使用してリキッドクラスタリングを有効にしたテーブルを作成できます。

(spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")
)
spark.readStream.table("source_table")
  .writeStream
  .clusterBy("column_name")
  .option("checkpointLocation", checkpointPath)
  .toTable("target_table")

警告

リキッドクラスタリングを有効にして作成されたテーブルでは、作成時に多数のDeltaテーブル機能が有効になっており、Deltaライターバージョン7およびリーダーバージョン3が使用されます。これらの機能の一部の有効化設定を上書きできます。「デフォルトの機能有効化設定を上書きする(オプション)」を参照してください。

テーブルプロトコルのバージョンはダウングレードできません。また、有効なDeltaリーダープロトコルテーブル機能のすべてをサポートしていないDelta Lakeクライアントからは、クラスタリングが有効になっているテーブルを読み取ることができません。「DatabricksでDelta Lake機能の互換性を管理する方法」を参照してください。

次の構文を使用すると、既存のパーティション化されていないDeltaテーブルでリキッドクラスタリングを有効にできます。

ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)

重要

デフォルトの動作では、以前に書き込まれたデータにクラスタリングは適用されません。 すべてのレコードに対して再クラスタリングを強制するには、 OPTIMIZE FULLを使用する必要があります。 「すべてのレコードの再クラスタリングの強制」を参照してください。

クラスタリング・キーを削除するには、次の構文を使用します。

ALTER TABLE table_name CLUSTER BY NONE;

Automatic リキッドクラスタリング

プレビュー

Automatic リキッドクラスタリングは パブリック プレビュー段階です。

Databricks Runtime 15.4 LTS以降では、Unity Catalogマネージドテーブルの自動リキッドクラスタリングを有効にできます。自動リキッドクラスタリングを有効にすると、Databricks はクラスタリングキーをインテリジェントに選択してクエリのパフォーマンスを最適化します。 自動リキッドクラスタリングを有効にするには、 CLUSTER BY AUTO 節を使用します。

有効にすると、自動キー選択操作と自動クラスタリング操作はメンテナンス操作として非同期に実行され、テーブルに対して予測的最適化が有効になっている必要があります。 予測的最適化 for Unity Catalog マネージドテーブルを参照してください。

クラスタリング キーを特定するために、Databricks はテーブルの履歴クエリ ワークロードを分析し、最適な候補列を特定します。 クラスタリング キーは、データ スキップの改善による予測されるコスト削減がデータのクラスタリング コストを上回る場合に変更されます。

データのクエリ方法が時間の経過と共に変化したり、クエリのパフォーマンスがデータ分布の変化を示している場合、自動リキッドクラスタリングはパフォーマンスを最適化するために新しいキーを選択します。

注:

リキッドクラスタリングをサポートするすべての Databricks Runtime バージョンから、自動クラスタリングが有効になっているテーブルの読み取りまたは書き込みが可能ですが、インテリジェントなキー選択は、 Databricks Runtime 15.4 LTSで導入されたメタデータに依存します。 Databricks Runtime 15.4 LTS 以降を使用して、自動的に選択されたキーがすべてのワークロードにメリットをもたらし、新しいキーを選択するときにこれらのワークロードが考慮されるようにします。

自動クラスタリングを有効または無効にする

自動リキッドクラスタリングが有効になっている新しいテーブルを作成するには、次の構文を使用します。

CREATE OR REPLACE TABLE table_name CLUSTER BY AUTO;

次の例に示すように、以前に手動でキーが指定されていたテーブルを含む、既存のテーブルで自動リキッドクラスタリングを有効にすることもできます。

ALTER TABLE table_name CLUSTER BY AUTO;

また、自動リキッドクラスタリングが有効になっているテーブルを変更して、手動で指定したキーを使用することもできます。

注:

clusterByAuto プロパティは、自動リキッドクラスタリングを有効にすると true に設定されます。clusteringColumns プロパティは、自動キー選択によって選択された現在のクラスタリング列を示します。DESCRIBE EXTENDED table_name を実行すると、テーブルのプロパティの完全な一覧が表示されます。

デフォルトの機能有効化設定を上書きする(オプション)

リキッドクラスタリングが有効な場合にDeltaテーブル機能を有効にするデフォルトの動作を上書きできます。これにより、それらのテーブル機能に関連するリーダーとライターのプロトコルがアップグレードされなくなります。次の手順を完了するには、既存のテーブルが必要です。

  1. ALTER TABLEを使用して、1つ以上の機能を無効にするテーブルプロパティを設定します。たとえば、削除ベクトルを無効にするには次のコマンドを使用します。

    ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
    
  2. 次のコマンドを実行して、テーブルのリキッドクラスタリングを有効にします。

    ALTER TABLE <table_name>
    CLUSTER BY (<clustering_columns>)
    

次の表は、上書きできるDelta機能と、有効化設定がDatabricks Runtimeバージョンとの互換性にどのように影響するかについての情報を示しています。

Delta機能

Runtimeの互換性

有効化設定を上書きするためのプロパティ

無効化によるリキッドクラスタリングへの影響

削除ベクトル

読み取りと書き込みには、Databricks Runtime 12.2 LTS以降が必要です。

'delta.enableDeletionVectors' = false

行レベルの同時実行が無効になるため、トランザクションとクラスタリング操作が競合しやすくなります。「行レベルの同時実行との書き込みの競合」を参照してください。

DELETEMERGEUPDATEコマンドの実行速度が遅くなる可能性があります。

行追跡

書き込みにはDatabricks Runtime 13.3 LTS以降が必要です。読み取りはどのDatabricks Runtimeバージョンからでも可能です。

'delta.enableRowTracking' = false

行レベルの同時実行が無効になるため、トランザクションとクラスタリング操作が競合しやすくなります。「行レベルの同時実行との書き込みの競合」を参照してください。

チェックポイントV2

読み取りと書き込みには、Databricks Runtime 13.3 LTS 以上が必要です。

'delta.checkpointPolicy' = 'classic'

リキッドクラスタリングの動作には影響ありません。

クラスタリングキーの選択

Databricks では、サポートされているテーブルに対して自動リキッドクラスタリングを推奨しています。 自動リキッドクラスタリングを参照してください。

Databricks では、クエリ フィルターで最も頻繁に使用される列に基づいてクラスタリング キーを選択することをお勧めします。 クラスタリング・キーは、任意の順序で定義できます。 2 つの列の相関性が高い場合は、そのうちの 1 つだけをクラスタリング キーとして含める必要があります。

最大 4 つのクラスタリングキーを指定できます。 小規模なテーブル (10 TB 未満) では、クラスタリングキーの数が多い (4 つなど) と、1 つのカラムでフィルタリングするときのパフォーマンスが、より少ないクラスタリングキー (2 つなど) と比較して低下する可能性があります。 ただし、テーブル・サイズが大きくなると、単一カラム・クエリにより多くのクラスタリング・キーを使用した場合のパフォーマンスの違いはごくわずかになります。

指定できるのは、統計がクラスタリング・キーとして収集される列のみです。 デフォルトでは、 Delta テーブルの最初の32列に統計が収集されます。 Delta 統計カラムの指定を参照してください。

クラスタリングでは、クラスタリングキーとして次のデータ型がサポートされています。

  • Date

  • Timestamp

  • TimestampNTZ(Databricks Runtime 14.3 LTS以降が必要)

  • String

  • Integer

  • Long

  • Short

  • Float

  • Double

  • Decimal

  • Byte

既存のテーブルを変換する場合は、以下の推奨事項を考慮してください。

現在のデータ最適化手法

クラスタリングキーに関する推奨事項

Hiveスタイルのパーティション分割

パーティション列をクラスタリングキーとして使用します。

Z-orderのインデックス作成

ZORDER BY列をクラスタリングキーとして使用します。

Hiveスタイルのパーティション分割とZ-order

パーティション列とZORDER BY列の両方をクラスタリングキーとして使用します。

カーディナリティを減らすための生成済み列(タイムスタンプの日付など)

元の列をクラスタリングキーとして使用し、生成済み列を作成しないでください。

クラスター化されたテーブルへのデータの書き込み

リキッドクラスタリングで使用されるすべてのDelta書き込みプロトコルテーブル機能をサポートするDeltaライタークライアントを使用する必要があります。Databricksでは、Databricks Runtime 13.3 LTS以降を使用する必要があります。

書き込み時にクラスタリングが行われる操作には、次のものがあります。

  • INSERT INTO オペレーション

  • CTAS およびRTASステートメント

  • COPY INTO (Parquet由来)

  • spark.write.mode("append")

構造化ストリーミングによる書き込みでは、書き込み時にクラスタリングがトリガーされることはありません。追加の制限が適用されます。「制限事項」を参照してください。

書き込み時のクラスタリングは、トランザクション内のデータがサイズしきい値を超えたときにのみトリガーされます。これらのしきい値はクラスタリング列の数によって異なり、Unity Catalogマネージドテーブルでは他のDeltaテーブルよりも低くなります。

クラスタリング列の数

Unity Catalogマネージドテーブルのしきい値サイズ

他のDeltaテーブルのしきい値サイズ

1

64 MB

256 MB

2

256 MB

1 GB

3

512 MB

2 GB

4

1 GB

4 GB

すべての操作にリキッドクラスタリングが適用されるわけではないため、すべてのデータが効率的にクラスタリングされるよう、 OPTIMIZEを頻繁に実行することをお勧めします。

クラスタリングをトリガーする方法

予測的最適化は、予測的最適化が有効になっているテーブルに対してOPTIMIZEコマンドを自動的に実行する機能です。「Unity Catalog管理テーブルの予測的最適化」を参照してください。

クラスタリングをトリガーするには、Databricks Runtime 13.3 LTS以降を使用する必要があります。次の例のように、テーブルに対してOPTIMIZEコマンドを使用します。

OPTIMIZE table_name;

リキッドクラスタリングはインクリメンタルであり、クラスタリングする必要があるデータに対応するために、必要な場合にのみデータが書き換えられます。クラスター化されるデータと一致しないクラスタリングキーを持つデータファイルは書き換えられません。

最適なパフォーマンスを得るために、Databricksでは、通常のOPTIMIZEジョブをクラスターデータにスケジュールすることをお勧めしています。また、多くの更新または挿入が発生しているテーブルの場合、Databricksでは、1時間または2時間ごとにOPTIMIZEジョブをスケジュールすることをお勧めしています。リキッドクラスタリングは増分であるため、クラスター化されたテーブルのほとんどのOPTIMIZEジョブは迅速に実行されます。

すべてのレコードの再クラスタリングを強制する

Databricks Runtime 16.0 以降では、次の構文を使用して、テーブル内のすべてのレコードを強制的に再クラスタリングできます。

OPTIMIZE table_name FULL;

重要

OPTIMIZE FULLを実行すると、必要に応じてすべての既存のデータが再クラスター化されます。指定したキーで以前にクラスター化されていない大きなテーブルの場合、この操作には数時間かかることがあります。

クラスタリングを初めて有効にするとき、またはクラスタリング キーを変更するときに OPTIMIZE FULL を実行します。 以前に OPTIMIZE FULL を実行したことがあり、クラスタリング キーに変更がない場合は、OPTIMIZEと同じようにOPTIMIZE FULL実行できます。常に OPTIMIZE FULL を使用して、データ レイアウトが現在のクラスタリング キーを反映していることを確認します。

クラスター化されたテーブルからデータを読み取る

クラスター化されたテーブルのデータは、削除ベクトルの読み取りをサポートする任意のDelta Lakeクライアントを使用して読み取ることができます。最適なクエリー結果を得るには、次の例のように、クエリーフィルターにクラスタリングキーを含めます。

SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";

クラスタリングキーの変更

次の例のように、ALTER TABLEコマンドを実行すると、いつでもテーブルのクラスタリングキーを変更できます。

ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);

クラスタリングキーを変更すると、後続のOPTIMIZE操作と書き込み操作では新しいクラスタリングアプローチが使用されますが、既存のデータは書き換えられません。

次の例のように、キーをNONEに設定してクラスタリングをオフにすることもできます。

ALTER TABLE table_name CLUSTER BY NONE;

クラスターキーをNONEに設定した場合、既にクラスター化されたデータは書き換わりませんが、その後のOPTIMIZE操作でクラスタリングキーが使用されなくなります。

テーブルのクラスター化方法を確認する

次の例のように、DESCRIBEコマンドを使用してテーブルのクラスタリングキーを確認できます。

DESCRIBE TABLE table_name;

DESCRIBE DETAIL table_name;

リキッドクラスタリングが有効なテーブルの互換性

Databricks Runtime 14.1以降でリキッドクラスタリングを有効にして作成されたテーブルでは、デフォルトでv2チェックポイントが使用されます。Databricks Runtime 13.3 LTS以降では、v2チェックポイントを使用してテーブルを読み書きできます。

Databricks Runtime 12.2 LTS以降では、v2チェックポイントを無効にし、テーブルプロトコルをダウングレードすることで、リキッドクラスタリングが有効なテーブルを読み取ることができます。「Deltaテーブル機能の削除」を参照してください。

制限事項

次の制限があります。

  • Databricks Runtime 15.1以前において、書き込み時のクラスタリングでは、フィルター、結合、または集計を含むソースクエリがサポートされません。

  • 構造化ストリーミングワークロードはクラスタリングオンライトをサポートしていません。

  • Databricks Runtime 15.4 LTS 以前では、構造化ストリーミング書き込みを使用してリキッドクラスタリングが有効になっているテーブルを作成することはできません。構造化ストリーミングを使用して、リキッドクラスタリングが有効になっている既存のテーブルにデータを書き込むことができます。