ステートフルストリーミングとは?

ステートフルな構造化ストリーミング クエリでは、中間状態情報の増分更新が必要ですが、ステートレスな構造化ストリーミング クエリでは、ソースからシンクに処理された行に関する情報のみを追跡します。

ステートフル操作には、ストリーミング アグリゲーション、ストリーミング dropDuplicates、ストリーム ストリーム結合、カスタム ステートフル アプリケーションが含まれます。

ステートフルな構造化ストリーミング クエリに必要な中間状態情報の構成を誤ると、予期しない待機時間や本番運用の問題が発生する可能性があります。

Databricks Runtime 13.3 LTS 以降では、RocksDB を使用して変更ログのチェックポイント処理を有効にし、構造化ストリーミング ワークロードのチェックポイントの期間とエンドツーエンドの待機時間を短縮できます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して changelog のチェックポイント設定を有効にすることをお勧めします。 「変更ログのチェックポイント設定を有効にする」を参照してください。

ステートフルな構造化ストリーミングの最適化クエリー

ステートフルな構造化ストリーミング クエリーの中間状態情報を管理すると、予期しない待機時間や運用上の問題を防ぐのに役立ちます。

Databricks お勧めします:

  • コンピュート最適化インスタンスをワーカーとして使用します。

  • シャッフル パーティションの数を、クラスタリングのコア数の 1 倍から 2 倍に設定します。

  • SparkSession で spark.sql.streaming.noDataMicroBatches.enabled 構成を false に設定します。 これにより、ストリーミング マイクロバッチ エンジンは、データを含まないマイクロバッチを処理できなくなります。 また、この構成を false に設定すると、ウォーターマークを使用したステートフル操作や、新しいデータが到着するまでデータ出力を取得しない処理時間タイムアウトが発生する可能性があることにも注意してください。

Databricks では、変更ログ チェックポイントで RocksDB を使用して、ステートフル ストリームの状態を管理することをお勧めします。 Databricksでの RocksDB 状態ストアの構成を参照してください。

状態管理スキームは、クエリの再起動間で変更できません。 クエリがデフォルト管理で開始された場合、状態ストアを変更するには、新しいチェックポイントの場所を使用してクエリを最初から再開する必要があります。

構造化ストリーミングでの複数のステートフル演算子の操作

Databricks Runtime 13.3 LTS 以降では、Databricks は構造化ストリーミング ワークロードのステートフル演算子の高度なサポートを提供します。 複数のステートフルな演算子を連結できるようになったため、ウィンドウ集計などの操作の出力を、結合などの別のステートフルな操作にフィードできるようになりました。

Databricks Runtime 16.2 以降では、複数のステートフル演算子を持つワークロードで transformWithState を使用できます。 「カスタム ステートフル アプリケーションの構築」を参照してください。

次の例は、使用できるいくつかのパターンを示しています。

重要

複数のステートフル演算子を使用する場合は、次の制限があります。

  • 従来のカスタム ステートフル演算子 (FlatMapGroupWithStateapplyInPandasWithState はサポートされていません。

  • 追加出力モードのみがサポートされています。

チェーンされたタイム ウィンドウ集約

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

2つの異なるストリームでの時間枠集約とそれに続くストリーム-ストリームウィンドウ結合

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")
val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

ストリームとストリームの時間間隔の結合とそれに続くタイム ウィンドウの集計

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()
val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

構造化ストリーミングの状態の再調整

Delta Live Tables のすべてのストリーミング ワークロードでは、状態の再バランス調整がデフォルトで有効になっています。 Databricks Runtime 11.3 LTS 以降では、Spark クラスター構成で次の構成オプションを設定して、状態の再調整を有効にすることができます。

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

状態の再調整は、クラスターのサイズ変更イベントが発生するステートフルな構造化ストリーミングパイプラインの利点です。 ステートレス ストリーミング操作は、クラスター サイズの変更に関係なく、メリットはありません。

コンピュートのオートスケーリングは、構造化ストリーミングワークロードのクラスターサイズのスケールダウンに対して制限があります。Databricksでは、ストリーミングワークロードの強化オートスケーリングでDelta Live Tablesを使用することを推奨しています。「強化オートスケールを使用してDelta Live Tablesパイプラインのクラスター使用率を最適化する」を参照してください。

クラスタリング サイズ変更イベントは、状態の再調整をトリガーします。 マイクロバッチは、状態がクラウドストレージから新しいエグゼキューターにロードされるため、リバランスイベント中のレイテンシーが高くなる可能性があります。