O que é transmissão stateful?
Uma consulta de transmissão estruturada com estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de transmissão estruturada sem estado rastreia apenas informações sobre quais linhas foram processadas da origem para o destino.
As operações stateful incluem agregação de transmissão, transmissão dropDuplicates
, união de transmissão-transmissão e aplicativos stateful personalizados.
As informações de estado intermediárias necessárias para consultas de transmissão estruturada com estado podem levar a latência inesperada e problemas de produção se forem mal configuradas.
Em Databricks Runtime 13.3 LTS e acima, o senhor pode ativar o checkpointing de changelog com RocksDB para reduzir a duração do checkpoint e a latência de ponta a ponta para cargas de trabalho de transmissão estruturada. Databricks recomenda habilitar o checkpointing do changelog para todas as consultas stateful de transmissão estruturada. Consulte Habilitar o checkpoint do registro de alterações.
Otimize queryestruturada transmitida com estado
O gerenciamento da informação de estado intermediário da query estruturada de transmissão de estado pode ajudar a evitar latência inesperada e problemas de produção.
Databricks recomenda:
Use instâncias otimizadas computecomo worker.
Defina o número de partições embaralhadas para 1-2 vezes o número de núcleos no clustering.
Defina a configuração
spark.sql.streaming.noDataMicroBatches.enabled
comofalse
na SparkSession. Isso impede que o mecanismo de transmissão de microlotes processe microlotes que não contenham dados. Observe também que definir essa configuração comofalse
pode resultar em operações com estado que usam marcas d'água ou tempos limite de processamento para não obter saída de dados até que novos dados cheguem, em vez de imediatamente.
Databricks recomenda usar RocksDB com checkpoint de changelog para gerenciar o estado para transmissão com estado. Consulte Configure RocksDB armazenamento do estado em Databricks.
Observação
O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações da consulta. Se uma consulta tiver sido iniciada com o gerenciamento default, o senhor deverá reiniciá-la do zero com um novo local de ponto de verificação para alterar o armazenamento do estado.
Trabalhe com múltiplos operadores stateful em transmissão estruturada
Em Databricks Runtime 13.3 LTS e acima, Databricks oferece suporte avançado para operadores stateful em cargas de trabalho de transmissão estruturada. Agora é possível encadear vários operadores com estado, o que significa que o senhor pode alimentar a saída de uma operação, como uma agregação com janela, para outra operação com estado, como um join.
Em Databricks Runtime 16.2 e acima, o senhor pode usar transformWithState
em cargas de trabalho com vários operadores stateful. Consulte Criar um aplicativo personalizado com estado.
Os exemplos a seguir demonstram vários padrões que você pode usar.
Importante
As seguintes limitações existem ao trabalhar com vários operadores com estado:
Operadores de estado personalizados legados (
FlatMapGroupWithState
eapplyInPandasWithState
) não são suportados.Apenas o modo de saída de acréscimo é suportado.
Agregação de janela de tempo encadeada
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Agregação de janela de tempo em duas transmissões diferentes seguida de junção de janela de transmissão-transmissão
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
junção de intervalo de tempo de transmissão-transmissão seguida de agregação de janela de tempo
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Rebalanceamento de estado para transmissão estruturada
O rebalanceamento de estado é ativado por default para todas as cargas de trabalho de transmissão em Delta Live Tables. Em Databricks Runtime 11.3 LTS e acima, o senhor pode definir a seguinte opção de configuração na configuração Spark cluster para ativar o rebalanceamento de estado:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Rebalanceamento de estado beneficia pipelines de transmissão stateful estruturados que passam por eventos de redimensionamento clusters . As operações de transmissão sem estado não se beneficiam, independentemente da mudança de tamanho clusters .
Observação
O dimensionamento automático de computação tem limitações ao reduzir o tamanho do cluster para cargas de trabalho de streaming estruturado. A Databricks recomenda usar Delta Live Tables com Autoscale aprimorado para cargas de trabalho de streaming. Consulte Otimize a utilização do cluster dos pipelines do Delta Live Tables com escalonamento automático aprimorado.
Os eventos de redimensionamento de clustering acionam o reequilíbrio de estado. Os microlotes podem ter maior latência durante os eventos de rebalanceamento, pois o estado é carregado do armazenamento em nuvem para o novo executor.