Coluna de metadados do arquivo

O senhor pode obter informações de metadados para arquivos de entrada com a coluna _metadata. A coluna _metadata é uma coluna oculta e está disponível para todos os formatos de arquivo de entrada. Para incluir a coluna _metadata no DataFrame retornado, o senhor deve selecioná-la explicitamente na consulta de leitura em que especifica a fonte.

Se a fonte de dados contiver uma coluna denominada _metadata, query retornará a coluna da fonte de dados e não os metadados do arquivo.

Aviso

Novos campos podem ser adicionados à coluna _metadata em versões futuras. Para evitar erros de evolução do esquema se a coluna _metadata for atualizada, Databricks recomenda selecionar campos específicos da coluna em sua query. Veja exemplos.

Metadados compatíveis

A coluna _metadata é um STRUCT contendo os seguintes campos:

Nome

Tipo

Descrição

Exemplo

Versão mínima do Databricks Runtime

caminho de arquivo

STRING

Caminho de arquivo do arquivo de entrada.

file:/tmp/f0.csv

10.5

nome do arquivo

STRING

Nome do arquivo de entrada junto com sua extensão.

f0.csv

10.5

tamanho do arquivo

LONG

Comprimento do arquivo de entrada, em bytes.

628

10.5

file_modification_time

TIMESTAMP

Carimbo de data/hora da última modificação do arquivo de entrada.

2021-12-20 20:05:21

10.5

file_block_start

LONG

começar offset do bloco que está sendo lido, em bytes.

0

13,0

file_block_length

LONG

Comprimento do bloco que está sendo lido, em bytes.

628

13,0

Exemplos

Use em um leitor de fonte de dados baseado em arquivo básico

df = spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("*", "_metadata")

display(df)

'''
Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f0.csv",                |
| Debbie  | 18  |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f1.csv",                |
| Frank   | 24  |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
'''
val df = spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("*", "_metadata")

display(df_population)

/* Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f0.csv",                |
| Debbie  | 18  |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f1.csv",                |
| Frank   | 24  |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 10,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
*/

Selecione campos específicos

spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("_metadata.file_name", "_metadata.file_size")
spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("_metadata.file_name", "_metadata.file_size")

Usar em filtros

spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("*") \
  .filter(col("_metadata.file_name") == lit("test.csv"))
spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("*")
  .filter(col("_metadata.file_name") === lit("test.csv"))

Uso em COPY INTO (legado)

COPY INTO my_delta_table
FROM (
  SELECT *, _metadata FROM 's3://my-bucket/csvData'
)
FILEFORMAT = CSV

Usar no Auto Loader

Se seus dados de origem contiverem uma coluna chamada _metadata, renomeie-a para source_metadata. Se você não renomeá-la, não poderá acessar a coluna de metadados do arquivo na tabela de destino; em vez disso, as consultas retornarão a coluna de origem.

spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .schema(schema) \
  .load("s3://my-bucket/csvData") \
  .selectExpr("*", "_metadata as source_metadata") \
  .writeStream \
  .option("checkpointLocation", checkpointLocation) \
  .start(targetTable)
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .schema(schema)
  .load("s3://my-bucket/csvData")
  .selectExpr("*", "_metadata as source_metadata")
  .writeStream
  .option("checkpointLocation", checkpointLocation)
  .start(targetTable)

Se o senhor usar foreachBatch e quiser incluir a coluna de metadados do arquivo na transmissão DataFrame, deverá fazer referência a ela na transmissão read DataFrame antes da função foreachBatch. Se você fizer referência apenas à coluna de metadados do arquivo dentro da função foreachBatch, a coluna não será incluída.

spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .load("s3://my-bucket/csvData") \
  .select("*", "metadata") \
  .writeStream \
  .foreachBatch(...)
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .load("s3://my-bucket/csvData")
  .select("*", "metadata")
  .writeStream
  .foreachBatch(...)