PySpark Read.Parquet()

Pyspark Read Parquet



No PySpark, a função write.parquet() grava o DataFrame no arquivo parquet e read.parquet() lê o arquivo parquet no PySpark DataFrame ou qualquer outro DataSource. Para processar as colunas no Apache Spark de forma rápida e eficiente, precisamos compactar os dados. A compactação de dados economiza nossa memória e todas as colunas são convertidas em nível plano. Isso significa que o armazenamento em nível de coluna simples existe. O arquivo que os armazena é conhecido como arquivo PARQUET.

Neste guia, focaremos principalmente na leitura/carregamento do arquivo parquet no PySpark DataFrame/SQL usando a função read.parquet() que está disponível na classe pyspark.sql.DataFrameReader.

Tema do conteúdo:







Obtenha o Arquivo Parquet



Leia o arquivo Parquet para o PySpark DataFrame



Leia o arquivo Parquet para o PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Esta função é usada para ler o arquivo parquet e carregá-lo no PySpark DataFrame. Leva o nome do caminho/arquivo do arquivo parquet. Podemos simplesmente usar a função read.parquet() já que esta é a função genérica.

Sintaxe:



Vejamos a sintaxe de read.parquet():

spark_app.read.parquet(file_name.parquet/path)

Primeiro, instale o módulo PySpark usando o comando pip:

pip instalar pyspark

Obtenha o Arquivo Parquet

Para ler um arquivo parquet, você precisa dos dados nos quais o arquivo parquet é gerado a partir desses dados. Nesta parte, veremos como gerar um arquivo parquet a partir do PySpark DataFrame.

Vamos criar um PySpark DataFrame com 5 registros e gravá-lo no arquivo parquet “industry_parquet”.

importar pyspark

de pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

# crie o dataframe que armazena os detalhes da indústria

industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Agricultura' ,Área= 'EUA' ,
Avaliação= 'Quente' ,Total_funcionários= 100 ),

Linha(Tipo= 'Agricultura' ,Área= 'Índia' ,Classificação= 'Quente' ,Total_funcionários= 200 ),

Linha(Tipo= 'Desenvolvimento' ,Área= 'EUA' ,Classificação= 'Esquentar' ,Total_funcionários= 100 ),

Linha(Tipo= 'Educação' ,Área= 'EUA' ,Classificação= 'Legal' ,Total_funcionários= 400 ),

Linha(Tipo= 'Educação' ,Área= 'EUA' ,Classificação= 'Esquentar' ,Total_funcionários= vinte )

])

# DataFrame real

indústria_df.show()

# Escreva o industry_df no arquivo parquet

indústria_df.coalesce( 1 ).write.parquet( 'indústria_parquet' )

Saída:

Este é o DataFrame que contém 5 registros.

Um arquivo parquet é criado para o DataFrame anterior. Aqui, nosso nome de arquivo com uma extensão é “part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet”. Usamos esse arquivo em todo o tutorial.

Leia o arquivo Parquet para o PySpark DataFrame

Temos o arquivo parquet. Vamos ler este arquivo usando a função read.parquet() e carregá-lo no PySpark DataFrame.

importar pyspark

de pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

# Leia o arquivo parquet no objeto dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Mostra o dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Saída:

Exibimos o DataFrame usando o método show() que foi criado a partir do arquivo parquet.

Consultas SQL com arquivo Parquet

Após o carregamento no DataFrame, pode ser possível criar as tabelas SQL e exibir os dados presentes no DataFrame. Precisamos criar uma TEMPORARY VIEW e usar os comandos SQL para retornar os registros do DataFrame que é criado a partir do arquivo parquet.

Exemplo 1:

Crie uma exibição temporária chamada “Sectors” e use o comando SELECT para exibir os registros no DataFrame. Você pode se referir a isso tutorial que explica como criar uma VIEW no Spark – SQL.

importar pyspark

de pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

# Leia o arquivo parquet no objeto dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Crie uma visualização a partir do arquivo parquet acima chamado - 'Setores'

dataframe_from_parquet.createOrReplaceTempView( 'Setores' )

# Consulta para exibir todos os registros dos Setores

linuxhint_spark_app.sql( 'selecionar * de Setores' ).mostrar()

Saída:

Exemplo 2:

Usando a VIEW anterior, escreva a consulta SQL:

  1. Para exibir todos os registros dos Setores que pertencem a “Índia”.
  2. Exibir todos os registros dos Setores com funcionário maior que 100.
# Consulta para exibir todos os registros dos Setores pertencentes a 'Índia'.

linuxhint_spark_app.sql( 'selecione * de setores onde Área='Índia'' ).mostrar()

# Consulta para exibir todos os registros dos Setores com funcionário maior que 100

linuxhint_spark_app.sql( 'selecione * de setores onde Total_employees>100' ).mostrar()

Saída:

Existe apenas um registro com área que é “Índia” e dois registros com funcionários maiores que 100.

Leia o arquivo Parquet para o PySpark SQL

Primeiro, precisamos criar uma VIEW usando o comando CREATE. Usando a palavra-chave “path” dentro da consulta SQL, podemos ler o arquivo parquet para o Spark SQL. Após o caminho, precisamos especificar o nome do arquivo/localização do arquivo.

Sintaxe:

spark_app.sql( 'CRIAR VISUALIZAÇÃO TEMPORÁRIA view_name USANDO OPÇÕES de parquet (caminho ' file_name.parquet ')' )

Exemplo 1:

Crie uma visualização temporária chamada “Sector2” e leia o arquivo parquet nela. Usando a função sql(), escreva a consulta de seleção para exibir todos os registros presentes na exibição.

importar pyspark

de pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

# Leia o arquivo parquet no Spark-SQL

linuxhint_spark_app.sql( 'CRIAR VISUALIZAÇÃO TEMPORÁRIA Setor2 USANDO OPÇÕES de parquet (caminho ' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Consulta para exibir todos os registros do Setor2

linuxhint_spark_app.sql( 'selecionar * do Setor2' ).mostrar()

Saída:

Exemplo 2:

Use a VIEW anterior e escreva a consulta para exibir todos os registros com a classificação de “Hot” ou “Cool”.

# Consulta para exibir todos os registros do Setor2 com Rating- Quente ou Frio.

linuxhint_spark_app.sql( 'selecione * de Sector2 onde Rating='Hot' OR Rating='Cool'' ).mostrar()

Saída:

Existem três registros com a classificação de “Hot” ou “Cool”.

Conclusão

No PySpark, a função write.parquet() grava o DataFrame no arquivo parquet. A função read.parquet() lê o arquivo parquet para o PySpark DataFrame ou qualquer outro DataSource. Aprendemos como ler o arquivo parquet no PySpark DataFrame e na tabela PySpark. Como parte deste tutorial, também discutimos como criar as tabelas do PySpark DataFrame e filtrar os dados usando a cláusula WHERE.