Workday レポートの取り込み

プレビュー

LakeFlow Connect はゲート付きパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。

この記事では、LakeFlow Connect を使用して Workday レポートを取り込み、Databricks にロードする方法について説明します。 結果として得られる取り込みパイプラインはUnity Catalogによって管理され、サーバレス コンピュートとDelta Live Tablesによって強化されます。

始める前に

インジェスト パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースは Unity Catalog に対して有効になっています。

  • サーバレス コンピュートは、ノートブック、ワークフロー、およびDelta Live Tablesで有効です。 「サーバレス コンピュートを有効にする」を参照してください。

  • 接続を作成するには: メタストアに CREATE CONNECTION があります。

    既存の接続を使用するには: 接続オブジェクトに USE CONNECTION または ALL PRIVILEGES があります。

  • USE CATALOG ターゲットカタログ上。

  • USE SCHEMA ターゲットカタログ上の既存のスキーマまたはCREATE SCHEMACREATE TABLE

インジェストのための Workday レポートの構成

インジェスト用の Workday レポートの構成」を参照してください。

Workday 接続を作成する

必要なアクセス許可: メタストア CREATE CONNECTION

Workday 接続を作成するには、次の手順を実行します。

  1. Databricksワークスペースで、 [カタログ] > [外部ロケーション] > [接続] > [接続の作成] をクリックします。

  2. [ Connection name] に、Workday 接続の一意の名前を入力します。

  3. [接続の種類] で [Workday レポート] を選択します。

  4. [Auth type ] で [OAuth 更新 トークン ] を選択し、 ソース セットアップ 中に生成した [Client ID ]、[ Client secret ]、および [更新トークン ] を入力します。

  5. [ 接続の作成 ] ページで、[ 作成] をクリックします。

インジェスト パイプラインを作成する

このステップでは、インジェスト パイプラインのセットアップ方法を説明します。 取り込まれた各テーブルは、明示的に名前を変更しない限り、宛先に同じ名前 (ただしすべて小文字) の対応するストリーミング テーブルを取得します。

このタブでは、Databricks アセット バンドル (DAB) を使用してインジェスト パイプラインをデプロイする方法について説明します。 バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、Databricksアセットバンドルを参照してください。

  1. Databricks CLI を使用して新しいバンドルを作成します。

    databricks bundle init
    
  2. バンドルに 2 つの新しいリソース ファイルを追加します。

    • パイプライン定義ファイル (resources/workday_pipeline.yml)。

    • データ取り込みの頻度を制御するワークフロー ファイル (resources/workday_job.yml)。

    次に、 resources/workday_pipeline.yml ファイルの例を示します。

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for workday_dab
    resources:
      pipelines:
        pipeline_workday:
          name: workday_pipeline
          channel: PREVIEW
          catalog: ${var.dest_catalog}
          schema: ${var.dest_schema}
          ingestion_definition:
            connection_name: <workday-connection>
            objects:
              # An array of objects to ingest from Workday. This example
              # ingests a sample report about all active employees. The Employee_ID key is used as
              # the primary key for the report.
              - report:
                  source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
                  destination_table: All_Active_Employees_Data
                  table_configuration:
                     primary_keys:
                        - Employee_ID
    

    次に、 resources/workday_job.yml ファイルの例を示します。

    resources:
      jobs:
        workday_dab_job:
          name: workday_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_workday.id}
    
  3. Databricks CLI を使用してパイプラインをデプロイします。

    databricks bundle deploy
    
  1. 個人アクセストークンを生成します。

  2. 次のコードを Python ノートブックのセルに貼り付けて、 <personal-access-token>値を変更します。

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  3. 次のコードをノートブックの 2 番目のセルに貼り付けます。

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    
    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
    
  4. 次のコードを 3 番目のノートブック セルに貼り付け、パイプラインの仕様を反映するように変更します。

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "report": {
             "source_url": "<YOUR_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"]
                }
             }
          }, {
             "report": {
             "source_url": "<YOUR_SECOND_REPORT_URL>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
             "table_configuration": {
                   "primary_keys": ["<PRIMARY_KEY>"],
                   "scd_type": "SCD_TYPE_2"
                }
             }
          }
       ]
    },
    "channel": "PREVIEW"
    }
    """
    
    
    create_pipeline(pipeline_spec)
    
  5. 最初のノートブックセルをあなたの個人的なアクセスアドレスで実行します。

  6. 2 番目のノートブック セルを実行します。

  7. パイプラインの詳細を使用して、3 番目のノートブック セルを実行します。 これはcreate_pipelineを実行します。

    • list_pipeline パイプライン ID とその詳細を返します。

    • edit_pipeline パイプライン定義を編集できます。

    • delete_pipeline パイプラインを削除します。

パイプラインを作成するには:

databricks pipelines create --json "<pipeline_definition OR json file path>"

パイプラインを編集するには:

databricks pipelines update --json "<<pipeline_definition OR json file path>"

パイプライン定義を取得するには:

databricks pipelines get "<your_pipeline_id>"

パイプラインを削除するには:

databricks pipelines delete "<your_pipeline_id>"

さらに詳しい情報が必要な場合は、いつでも以下を実行してください。

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

パイプラインを開始、スケジュール、アラートを設定する

  1. パイプラインが作成されたら、Databricks ワークスペースに戻り、 [Delta Live Tables]をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 [開始]をクリックしてパイプラインを実行します。 [スケジュール]をクリックすると、パイプラインをスケジュールできます。

  4. パイプラインにアラートを設定するには、 [スケジュール]をクリックし、 [その他のオプション]をクリックして、通知を追加します。

  5. インジェストが完了したら、テーブルに対してクエリを実行できます。