Delta Live Tables パイプラインの監視
この記事では、 Delta Live Tables パイプラインの組み込み モニタリング機能と可観測性機能の使用について説明します。 これらの機能は、次のようなタスクをサポートします。
パイプラインの更新の進行状況とステータスを監視します。 「UI で使用できるパイプラインの詳細」を参照してください。
パイプライン更新の成功や失敗などのパイプライン イベントに関するアラート。 「パイプラインイベントのEメール通知の追加」を参照してください。
Viewing メトリクス for ストリーミング ソース like Apache Kafka and Auto Loader (Public Preview). 「ストリーミングメトリクスの表示」を参照してください。
データリネージ、データ品質メトリクス、リソース使用量などのパイプライン更新に関する詳細な情報を抽出します。 「Delta Live Tables イベント ログとは」を参照してください。
特定のイベントが発生したときに実行するカスタムアクションの定義イベントフックを使用したDelta Live Tablesパイプラインのカスタムモニタリングの定義を参照してください。
クエリのパフォーマンスを検査および診断するには、「 Delta Live Tables パイプラインのクエリ履歴へのアクセス」を参照してください。 この機能はパブリック プレビュー段階です。
パイプラインイベントのEメール 通知を追加する
1 つ以上の Eメール アドレスを設定して、次の場合に通知を受け取ることができます。
パイプラインの更新が正常に完了しました。
パイプラインの更新が失敗し、再試行可能または再試行不可能なエラーが発生します。 このオプションを選択すると、すべてのパイプラインの失敗に関する通知を受け取ります。
パイプラインの更新が再試行不可 (致命的) エラーで失敗します。 このオプションを選択すると、再試行できないエラーが発生した場合にのみ通知を受け取ります。
1 つのデータ フローが失敗します。
パイプライン を作成 または編集するときに Eメール 通知を設定するには:
[ 通知の追加] をクリックします。
通知を受け取る 1 つ以上の Eメール アドレスを入力します。
各通知タイプのチェックボックスをクリックして、設定したEメールアドレスに送信します。
[ 通知の追加] をクリックします。
UI ではどのようなパイプラインの詳細を使用できますか?
パイプライン グラフは、パイプラインの更新が正常に開始されるとすぐに表示されます。 矢印は、パイプライン内のデータセット間の依存関係を表します。 デフォルトでは、パイプラインの詳細ページにはテーブルの最新の更新が表示されますが、ドロップダウンメニューから古い更新を選択することもできます。
詳細には、パイプライン ID、ソース コード、コンピュート コスト、製品エディション、およびパイプライン用に構成されたチャンネルが含まれます。
データセットを表形式で表示するには、[ リスト ] タブをクリックします。 リストビューでは、パイプライン内のすべてのデータセットをテーブルの行として表示でき、パイプラインの DAG が大きすぎてグラフビューで視覚化できない場合に便利です。テーブルに表示されるデータセットは、データセット名、タイプ、ステータスなどの複数のフィルターを使用して制御できます。 DAG ビジュアリゼーションに戻るには、[ グラフ] をクリックします。
[別のユーザーとして実行] ユーザーはパイプラインの所有者であり、パイプラインの更新はこのユーザーのアクセス許可で実行されます。run as
ユーザーを変更するには、[アクセス許可] をクリックし、パイプラインの所有者を変更します。
ストリーミングメトリクスの表示
プレビュー
Delta Live Tables のストリーミング監視は 、パブリック プレビュー段階です。
パイプライン内の各ストリーミングフローについて、Spark 構造化ストリーミングでサポートされているデータソースApacheKafka ( 、AmazonKinesis 、Auto Loader テーブル、Delta テーブルなど) からストリーミング メトリクスを表示できます。Delta Live Tablesメトリクスは、 Delta Live Tables UI の右ペインにグラフとして表示され、バックログ秒数、バックログバイト数、バックログレコード、バックログファイルが含まれます。 グラフには分単位で集計された最大値が表示され、グラフにカーソルを合わせるとツールチップに最大値が表示されます。 データは、現在の時刻から過去 48 時間に制限されます。
ストリーミングメトリクスが利用可能なパイプライン内のテーブルでは、UI Graph ビューでパイプライン DAG を表示すると、 アイコンが表示されます。ストリーミング メトリクスを表示するには、
をクリックして、右ペインの [フロー ] タブにストリーミング メトリクス チャートを表示します。 また、ストリーミング メトリクスのあるテーブルのみを表示するフィルターを適用するには、[ List ] をクリックし、[ Has ストリーミング メトリクス] をクリックします。
各ストリーミング ソースは、特定のメトリクスのみをサポートします。 ストリーミングソースでサポートされていないメトリクスは、UIで表示できません。 次の表は、サポートされているストリーミング ソースで使用できるメトリクスを示しています。
ソース |
バックログバイト |
バックログ レコード |
バックログ秒数 |
バックログ ファイル |
---|---|---|---|---|
Kafka |
✓ |
✓ |
||
Kinesis |
✓ |
✓ |
||
Delta |
✓ |
✓ |
||
Auto Loader |
✓ |
✓ |
||
Google Pub/Sub |
✓ |
✓ |
Delta Live Tables イベントログとは何ですか?
Delta Live Tables イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、データリネージなど、パイプラインに関連するすべての情報が含まれています。 イベント ログを使用して、データパイプラインの状態を追跡、理解、および監視できます。
イベント ログのエントリは、Delta Live Tables ユーザー インターフェイス、Delta Live Tables API で表示するか、イベント ログを直接クエリすることで表示できます。 このセクションでは、イベント ログを直接クエリする方法に焦点を当てます。
また、イベントがログに記録されたときに実行するカスタムアクション (アラートの送信など) を イベントフックとともに定義することもできます。
重要
イベント ログ、またはイベント ログが公開されている親カタログまたはスキーマは削除しないでください。 イベント ログを削除すると、今後の実行中にパイプラインの更新が失敗する可能性があります。
イベントログスキーマ
次の表では、イベント ログのスキーマについて説明します。 これらのフィールドの一部には、 details
フィールドなど、クエリーを実行するために解析が必要な JSON データが含まれています。 Databricks では、JSON フィールドを解析するための :
演算子がサポートされています。 : (コロン記号) 演算子を参照してください。
畑 |
説明 |
---|---|
|
イベント ログ レコードの一意の識別子。 |
|
イベントを識別して順序付けるためのメタデータを含む JSON ドキュメント。 |
|
イベントのオリジンのメタデータ (クラウドプロバイダー、クラウドプロバイダーリージョン、 |
|
イベントが記録された時刻。 |
|
イベントを説明する人間が判読できるメッセージ。 |
|
イベントの種類 (たとえば、 |
|
イベント スキーマの安定性。 可能な値は次のとおりです。
|
|
エラーが発生した場合は、エラーを説明する詳細。 |
|
イベントの構造化された詳細を含む JSON ドキュメント。 これは、イベントの分析に使用されるプライマリ フィールドです。 |
|
イベントの種類。 |
イベント ログのクエリ
注
このセクションでは、 Unity Catalog とデフォルトの公開モードで構成されたパイプラインのイベント ログを操作するためのデフォルトの動作と構文について説明します。
レガシ発行モードを使用する Unity Catalog パイプラインの動作については、「 Unity Catalog レガシ パイプラインのイベント ログの操作」を参照してください。
Hive metastore パイプラインの動作と構文については、「Hive metastore パイプラインのイベント ログの操作」を参照してください。
デフォルトによって、 Delta Live Tables は、パイプライン用に構成されたデフォルト カタログとスキーマの非表示の Delta テーブルにイベント ログを書き込みます。 非表示になっている間も、十分に権限のあるすべてのユーザーがテーブルをクエリできます。 デフォルトでは、パイプラインの所有者のみがイベント ログ テーブルをクエリできます。
デフォルトでは、非表示のイベントログの名前は event_log_{pipeline_id}
という形式で、パイプライン ID はシステムによって割り当てられた UUID で、ダッシュはアンダースコアに置き換えられています。
JSON 構成を操作して、イベント ログを公開できます。 イベント ログを発行するときは、イベント ログの名前を指定し、次の例のようにカタログとスキーマをオプションで指定できます。
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
イベント ログの場所は、パイプライン内の Auto Loader クエリのスキーマの場所としても機能します。 Databricks 、権限を変更する前にイベント ログ テーブルに対してビューを作成することをお勧めします。これは、イベント ログ テーブルが直接共有されている場合、一部のコンピュート設定でユーザーがスキーマ メタデータにアクセスできる可能性があるためです。 次の構文例では、イベント・ログ・テーブルにビューを作成します。
CREATE VIEW event_log_view
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;
Unity Catalog では、ビューはストリーミング クエリをサポートします。 次の例では、構造化ストリーミングを使用して、イベント ログ テーブルの上に定義されたビューをクエリします。
df = spark.readStream.table("event_log_view")
イベントログのリネージ情報のクエリ
リネージに関する情報を含むイベントのイベント・タイプは flow_definition
です。 details:flow_definition
オブジェクトには、グラフ内の各リレーションシップを定義する output_dataset
と input_datasets
が含まれています。
次のクエリーを使用して入力データセットと出力データセットを抽出し、系列情報を表示できます。
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
|
|
---|---|
|
|
|
|
|
|
|
|
イベントログからのデータ品質のクエリー
パイプライン内のデータセットに対するエクスペクテーションを定義すると、データ品質メトリクスは details:flow_progress.data_quality.expectations
オブジェクトに保存されます。 データ品質に関する情報を含むイベントのイベント・タイプは flow_progress
です。 次の例では、最後のパイプライン更新のデータ品質メトリクスをクエリーします。
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
|
|
|
|
---|---|---|---|
|
|
4083 |
0 |
イベントログを照会してデータバックログを監視する
Delta Live Tables は、 details:flow_progress.metrics.backlog_bytes
オブジェクトのバックログに存在するデータの量を追跡します。 バックログメトリクスを含むイベントのイベントタイプは flow_progress
です。 次の例は、最後のパイプライン更新のクエリー バックログ メトリクスです。
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
注
バックログ メトリクスは、パイプラインの Databricks Runtime の種類とバージョンによっては使用できない場合があります。
サーバレスが有効になっていないパイプラインのイベントログから強化オートスケールイベントを監視する
サーバレス コンピュートを使用しない DLT パイプラインの場合、パイプラインで強化オートスケールが有効になっていると、イベント ログにクラスターのサイズ変更が記録されます。 enhanced オートスケールに関する情報を含むイベントのイベントタイプは autoscale
です。 クラスターのサイズ変更要求情報は、 details:autoscale
オブジェクトに格納されます。 次の例では、強化オートスケール クラスターのサイズ変更要求に対して、最後のパイプライン更新を照会します。
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
コンピュートのリソース使用率の監視
cluster_resources
イベントは、クラスター内のタスクスロットの数、それらのタスクスロットの使用率、およびスケジュールを待機しているタスクの数に関するメトリクスを提供します。
強化オートスケールが有効になっている場合、 cluster_resources
イベントには、 latest_requested_num_executors
や optimal_num_executors
などのオートスケール アルゴリズムのメトリクスも含まれます。 また、イベントでは、アルゴリズムのステータスが CLUSTER_AT_DESIRED_SIZE
、 SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
、 BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
などのさまざまな状態として示されます。 この情報は、オートスケールイベントと併せて表示することで、強化されたオートスケールの全体像を把握することができます。
次の例では、最後のパイプライン更新のタスクキューサイズ履歴をクエリーします。
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
次の例では、最後のパイプライン更新の使用履歴をクエリーします。
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
次の例では、エグゼキューターのカウント履歴をクエリし、Enhanced オートスケール パイプラインでのみ使用可能なメトリクス (最新のリクエストでアルゴリズムによって要求されたエグゼキューターの数、最新のメトリクスに基づいてアルゴリズムによって推奨されるエグゼキューターの最適数、オートスケール アルゴリズムの状態など) を照会します。
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
Delta Live Tables パイプラインの監査
Delta Live Tables イベント ログ レコードおよびその他の Databricks 監査ログを使用して、Delta Live テーブルでデータがどのように更新されているかを完全に把握できます。
Delta Live Tables は、パイプライン所有者の資格情報を使用して更新を実行します。 パイプライン所有者を更新することで、使用する認証情報を変更できます。 Delta Live Tables は、パイプラインの作成、構成の編集、更新のトリガーなど、パイプラインでのアクションについてユーザーを記録します。
Unity Catalog 監査イベントのリファレンスについては、「Unity Catalog イベント」を参照してください。
イベントログ内のユーザーアクションのクエリ
イベント ログを使用して、ユーザー アクションなどのイベントを監査できます。 ユーザーアクションに関する情報を含むイベントのイベントタイプは user_action
です。
アクションに関する情報は、details
フィールドの user_action
オブジェクトに格納されます。次のクエリを使用して、ユーザー イベントの監査ログを作成します。 このクエリで使用する event_log_raw
ビューを作成するには、「 イベント ログのクエリ」を参照してください。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
|
|
|
---|---|---|
2021-05-20T19:36:03.517+0000 |
|
|
2021-05-20T19:35:59.913+0000 |
|
|
2021-05-27T00:35:51.971+0000 |
|
|