Delta Sharingオープン共有を使用して共有されたデータを読み取る(受信者向け)

February 11, 2025

この記事では、Delta Sharing オープン共有 プロトコルを使用して共有されたデータを読み取る方法について説明します。 これには、 Databricks、 Apache Spark、 Pandas、 Power BI、および Tableauを使用して共有データを読み取る手順が含まれています。

オープン共有では、データ プロバイダーによってチームのメンバーと共有された資格情報ファイルを使用して、共有データへの安全な読み取りアクセスを取得します。 資格情報が有効で、プロバイダーが引き続きデータを共有する限り、アクセスは保持されます。 プロバイダーは、資格情報の有効期限とローテーションを管理します。 データの更新はほぼリアルタイムで利用できます。 共有データの読み取りとコピーは可能ですが、ソースデータを変更することはできません。

注:

Databricks 間 Delta Sharing を使用してデータが共有されている場合、データにアクセスするために資格情報ファイルは必要なく、この記事は適用されません。 手順については、「 Databricks間Delta Sharing (受信者用) を使用して共有されたデータを読み取る」を参照してください。

次のセクションでは、 Databricks 、 Apache Spark 、 Pandas 、 Power BIを使用して資格情報ファイルを共有データにアクセスし、読み取る方法について説明します。 Delta Sharingコネクタの完全なリストとその使用方法については、 Delta Sharingオープンソース ドキュメントを参照してください。 共有データにアクセスする際に問題が発生した場合は、データ プロバイダーにお問い合わせください。

注:

パートナー統合は、特に記載がない限り、サードパーティによって提供されており、その製品およびサービスを使用するには、適切なプロバイダーのアカウントを持っている必要があります。 Databricks は、このコンテンツを最新の状態に保つために最善を尽くしていますが、パートナー統合ページの統合またはコンテンツの正確性については一切表明しません。 統合に関して、適切なプロバイダーに連絡してください。

始める前に

チームのメンバーは、データ プロバイダーによって共有される資格情報ファイルをダウンロードする必要があります。 「オープン共有モデルでアクセスを取得する」を参照してください。

そのファイルまたはファイルの場所をあなたと共有するには、安全なチャンネルを使用する必要があります。

Databricks: オープンな共有コネクタを使用して共有データを読み取る

このセクションでは、Catalog Explorer または Python ノートブックでプロバイダーの資格情報ファイルを使用してプロバイダーをインポートする方法について説明します。 ノートブックの手順では、ノートブックを使用して共有テーブルを一覧表示および読み取る方法についても説明します。 カタログエクスプローラーを使用してインポートされた共有データを読み取るには、「Databricks-to-Databricks Delta Sharingを使用して共有データを読み取る(受信者向け)」を参照してください。

注:

データ プロバイダーが Databricks 間の共有を使用していて、資格情報ファイルを共有していない場合は、 Unity Catalog を使用してデータにアクセスする必要があります。 手順については、「 Databricks間Delta Sharing (受信者用) を使用して共有されたデータを読み取る」を参照してください。

必要なアクセス許可: メタストア管理者、または Unity Catalog メタストアに対する CREATE PROVIDER 権限と USE PROVIDER 権限の両方を持つユーザー。

  1. Databricks ワークスペースで、カタログアイコン[カタログ]をクリックしてカタログエクスプローラーを開きます。

  2. [カタログ] ウィンドウの上部にある [歯車アイコン] 歯車アイコンをクリックし、[Delta Sharing] を選択します。

    または、[ クイック アクセス ] ページで [Delta Sharing > ] ボタンをクリックします。

  3. [ 自分と共有] タブで、ページの右上隅にある [ プロバイダーを直接インポート] をクリックします。

  4. ドロップダウンメニューからプロバイダーを選択するか、名前を入力します。

  5. プロバイダーから共有された資格情報ファイルをアップロードします。 さまざまなプロバイダーの具体的な手順を読むことができます。

  6. (オプション)コメントを入力します。

    プロバイダーの資格情報ファイルをプロバイダーから直接インポートします。
  7. インポート」をクリックします。

  8. 共有データからカタログを作成します。

    このプロセスは、 Databricks-to-Databricks 共有モデルと同じです。

    「共有からのカタログの作成」を参照してください。

  9. カタログへのアクセス権を付与します。

    共有データをチームで利用できるようにするにはどうすればよいですか?」を参照してください。 Delta Sharingカタログ内のスキーマ、テーブル、ボリュームのアクセス許可の管理

  10. 共有データ オブジェクトは、Unity Catalog に登録されているデータ オブジェクトと同様に読み取ります。

    詳細と例については、 共有テーブルまたはボリューム内のデータへのアクセスを参照してください。

Apache Spark: 共有データの読み取り

Spark 3.x 以降を使用して共有データにアクセスするには、次のステップに従ってください。

これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

Delta Sharing Python コネクタと Spark コネクタをインストールする

共有データに関連するメタデータ (共有されているテーブルのリストなど) にアクセスするには、次の操作を行います。 この例では Python を使用します。

  1. delta-sharing Python コネクタをインストールします。

    Bash
    pip install delta-sharing
    
  2. Apache Spark コネクタをインストールします。

Sparkを使用した共有テーブルの一覧表示

共有内のテーブルを一覧表示します。 次の例では、 <profile-path> を資格情報ファイルの場所に置き換えます。

Python
import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

結果は、テーブルの配列と各テーブルのメタデータです。 次の出力は、2 つのテーブルを示しています。

Console
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

Sparkを使用した共有データへのアクセス

以下の変数を置き換えて、次のコマンドを実行します。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

  • <version-as-of>:オプション。 データを読み込むテーブルのバージョン。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 delta-sharing-spark 0.5.0 以上が必要です。

  • <timestamp-as-of>:オプション。 指定されたタイムスタンプより前または指定されたタイムスタンプのバージョンでデータをロードします。 データ プロバイダーがテーブルの履歴を共有している場合にのみ機能します。 0.6.0 以上の delta-sharing-spark が必要です。

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

Sparkを使用した共有変更データ フィードへのアクセス

テーブル履歴が共有されており、ソース テーブルで変更データフィード (CDF) が有効になっている場合は、これらの変数を置き換えて次のコマンドを実行することで変更データフィードにアクセスできます。 delta-sharing-spark 0.5.0 以降が必要です。

開始点は 1 つだけ指定する必要があります。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

  • <starting-version>:オプション。 クエリーの開始バージョン。 長整数型として指定します。

  • <ending-version>:オプション。 クエリの終了バージョン。 終了バージョンが指定されていない場合、API は最新のテーブル バージョンを使用します。

  • <starting-timestamp>:オプション。 クエリーの開始タイムスタンプは、このタイムスタンプ以上で作成されたバージョンに変換されます。 yyyy-mm-dd hh:mm:ss[.fffffffff]の形式の文字列として指定します。

  • <ending-timestamp>:オプション。 クエリーの終了タイムスタンプは、このタイムスタンプ以前に作成されたバージョンに変換されます。 次の形式の文字列として指定します。 yyyy-mm-dd hh:mm:ss[.fffffffff]

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

出力が空の場合、または期待するデータが含まれていない場合は、データ プロバイダーにお問い合わせください。

Spark 構造化ストリーミングを使用した共有テーブルへのアクセス

テーブル履歴が共有されている場合は、共有データをストリーム読み取りできます。 delta-sharing-spark 0.6.0 以降が必要です。

サポートされているオプション:

  • ignoreDeletes: データを削除するトランザクションを無視します。

  • ignoreChanges: UPDATEMERGE INTODELETE (パーティション内)、 OVERWRITEなどのデータ変更操作によりソース テーブルでファイルが書き換えられた場合は、更新を再処理します。 変更されていない行は引き続き出力できます。 したがって、ダウンストリームの消費者は重複を処理できる必要があります。 削除はダウンストリームに反映されません。 ignoreChangesignoreDeletesを包含します。 したがって、 ignoreChangesを使用すると、ソース テーブルの削除や更新によってストリームが中断されることはありません。

  • startingVersion: 開始する共有テーブルのバージョン。 このバージョン (含む) 以降のすべてのテーブルの変更は、ストリーミング ソースによって読み取られます。

  • startingTimestamp: 開始するタイムスタンプ。 タイムスタンプ(含む)以降にコミットされたすべてのテーブル変更は、ストリーミング ソースによって読み取られます。 例: "2023-01-01 00:00:00.0"

  • maxFilesPerTrigger: 各マイクロバッチで考慮される新しいファイルの数。

  • maxBytesPerTrigger: 各マイクロバッチで処理されるデータの量。 このオプションは「ソフト最大値」を設定します。つまり、バッチはほぼこの量のデータを処理し、最小の入力単位がこの制限より大きい場合にはストリーミング クエリを進めるために制限を超えて処理する場合があります。

  • readChangeFeed: 共有テーブルの変更データフィードをストリーム読み取りします。

サポートされていないオプション:

  • Trigger.availableNow

構造化ストリーミングクエリのサンプル

spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Databricksでのストリーミング」も参照してください。

削除ベクトルまたは列マッピングが有効になっているテーブルを読み取る

プレビュー

この機能はパブリックプレビュー段階です。

削除ベクトルは、プロバイダーが共有 Delta テーブルで有効にできるストレージ最適化機能です。 「削除とは何ですか?」を参照してください。 。

Databricks は Delta テーブルの列マッピングもサポートしています。 「Delta Lake 列マッピングを使用した列の名前変更と削除」を参照してください。

プロバイダが削除または列マッピングを有効にしてテーブルを共有している場合は、 delta-sharing-spark 3.1 以降を実行しているコンピュートを使用してテーブルを読み取ることができます。 Databricksクラスターを使用している場合は、 Databricks Runtime 14.1 以降を実行しているクラスターを使用してバッチ読み取りを実行できます。 CDF およびストリーミング クエリには、Databricks Runtime 14.2 以上が必要です。

共有テーブルのテーブル機能に基づいてresponseFormatを自動的に解決できるため、バッチ クエリをそのまま実行できます。

変更データフィード (CDF) を読み取るか、削除または列マッピングが有効になっている共有テーブルでストリーミング クエリを実行するには、追加オプションresponseFormat=deltaを設定する必要があります。

次の例は、バッチ、CDF、およびストリーミング クエリを示しています。

Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName("...")
        .master("...")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("responseFormat", "delta")
  .option("startingVersion", 1)
  .load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: 共有データの読み取り

pandas 0.25.3以降で共有データにアクセスするには次の手順に従ってください。

これらの手順は、データ・プロバイダーによって共有された資格情報ファイルにアクセスできることを前提としています。 「オープン共有モデルでアクセスを取得する」を参照してください。

Delta Sharing Python コネクタをインストールする

共有されているテーブルの一覧など、共有データに関連するメタデータにアクセスするには、 delta-sharing Python コネクタをインストールする必要があります。

Bash
pip install delta-sharing

pandas を使用した共有テーブルの一覧表示

共有内のテーブルを一覧表示するには、 <profile-path>/config.share資格情報ファイルの場所に置き換えて、次のコマンドを実行します。

Python
import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

出力が空の場合、または期待するテーブルが含まれていない場合は、データ プロバイダーにお問い合わせください。

pandas を使用して共有データにアクセスする

を使用してPandas Pythonの共有データにアクセスするには、変数を次のように置き換えて以下を実行します。

  • <profile-path>: 資格情報ファイルの場所。

  • <share-name>: テーブルの share= の値。

  • <schema-name>: テーブルの schema= の値。

  • <table-name>: テーブルの name= の値。

Python
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

pandas を使用した共有チェンジデータフィードへのアクセス

実行を使用して の共有テーブルの変更データフィードにアクセスするには、次のように変数を置き換えます。PandasPythonデータプロバイダーがテーブルの変更データフィードを共有したかどうかによっては、変更データフィードを利用できない場合があります。

  • <starting-version>:オプション。 クエリーの開始バージョン。

  • <ending-version>:オプション。 クエリの終了バージョン。

  • <starting-timestamp>:オプション。 クエリーの開始タイムスタンプ。 これは、このタイムスタンプ以降に作成されたバージョンに変換されます。

  • <ending-timestamp>:オプション。 クエリーの終了タイムスタンプ。 これは、このタイムスタンプ以前に作成されたバージョンに変換されます。

Python
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

出力が空の場合、または期待するデータが含まれていない場合は、データ プロバイダーにお問い合わせください。

Power BI: 共有データの読み取り

Power BI Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されたデータセットを検出、分析、視覚化できます。

要件

Databricksに接続する

Delta Sharing コネクタを使用して Databricks に接続するには、次の手順を実行します。

  1. 共有資格情報ファイルをテキスト エディターで開き、エンドポイント URL とトークンを取得します。

  2. Power BI Desktop を開きます。

  3. [データの取得]メニューで、 Delta Sharingを検索します。

  4. コネクタを選択し、[ 接続] をクリックします。

  5. 資格情報ファイルからコピーしたエンドポイント URL をDelta Sharing Server URLフィールドに入力します。

  6. 必要に応じて、[ 詳細オプション ] タブで、ダウンロードできる行の最大数の [行制限 ] を設定します。 これは、デフォルトで 100 万行に設定されます。

  7. OK」をクリックします。

  8. Authenticationの場合は、資格情報ファイルから取得した Windows をBearer ウイルスにコピーします。

  9. 接続」をクリックします。

Power BI Delta Sharing コネクタの制限事項

Power BI Delta Sharingコネクタには次の制限があります。

  • コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタは、インポートされる行の数を、Power BI Desktop の [詳細オプション] タブで設定した行制限に制限します。

Tableau: 共有データの読み取り

Tableau Delta Sharing コネクタを使用すると、Delta Sharing オープン プロトコルを通じて共有されているデータセットを検出、分析、視覚化できます。

要件

Databricksに接続する

Delta Sharing コネクタを使用して Databricks に接続するには、次の手順を実行します。

  1. Tableau Exchangeにアクセスし、指示に従って Delta Sharing Connector をダウンロードし、適切なデスクトップ フォルダーに配置します。

  2. Tableau Desktop を開きます。

  3. コネクタページで、「Delta Sharing by Databricks」を検索します。

  4. [ 共有ファイルのアップロード] を選択し、プロバイダーによって共有された資格情報ファイルを選択します。

  5. [ データを取得] をクリックします。

  6. Data Explorerでテーブルを選択します。

  7. 必要に応じて、SQL フィルターまたは行制限を追加します。

  8. テーブル・データの取得」をクリックします。

Tableau Delta Sharing コネクタの制限

Tableau Delta Sharingコネクタには次の制限があります。

  • コネクタがロードするデータは、マシンのメモリに収まる必要があります。 この要件を管理するために、コネクタはインポートされる行の数を Tableau で設定した行制限に制限します。

  • すべての列は String型として返されます。

  • SQL フィルターは、Delta Sharing サーバーがpredicateHintをサポートしている場合にのみ機能します。

新しい資格情報を要求する

資格情報のアクティブ化 URL またはダウンロードした資格情報が紛失、破損、または侵害された場合、または資格情報の有効期限が切れてもプロバイダーから新しい資格情報が送られてこない場合は、プロバイダーに連絡して新しい資格情報をリクエストしてください。