Neste guia, focaremos principalmente na leitura/carregamento do arquivo parquet no PySpark DataFrame/SQL usando a função read.parquet() que está disponível na classe pyspark.sql.DataFrameReader.
Tema do conteúdo:
Leia o arquivo Parquet para o PySpark DataFrame
Leia o arquivo Parquet para o PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
Esta função é usada para ler o arquivo parquet e carregá-lo no PySpark DataFrame. Leva o nome do caminho/arquivo do arquivo parquet. Podemos simplesmente usar a função read.parquet() já que esta é a função genérica.
Sintaxe:
Vejamos a sintaxe de read.parquet():
spark_app.read.parquet(file_name.parquet/path)Primeiro, instale o módulo PySpark usando o comando pip:
pip instalar pyspark
Obtenha o Arquivo Parquet
Para ler um arquivo parquet, você precisa dos dados nos quais o arquivo parquet é gerado a partir desses dados. Nesta parte, veremos como gerar um arquivo parquet a partir do PySpark DataFrame.
Vamos criar um PySpark DataFrame com 5 registros e gravá-lo no arquivo parquet “industry_parquet”.
importar pysparkde pyspark.sql import SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()
# crie o dataframe que armazena os detalhes da indústria
industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Agricultura' ,Área= 'EUA' ,
Avaliação= 'Quente' ,Total_funcionários= 100 ),
Linha(Tipo= 'Agricultura' ,Área= 'Índia' ,Classificação= 'Quente' ,Total_funcionários= 200 ),
Linha(Tipo= 'Desenvolvimento' ,Área= 'EUA' ,Classificação= 'Esquentar' ,Total_funcionários= 100 ),
Linha(Tipo= 'Educação' ,Área= 'EUA' ,Classificação= 'Legal' ,Total_funcionários= 400 ),
Linha(Tipo= 'Educação' ,Área= 'EUA' ,Classificação= 'Esquentar' ,Total_funcionários= vinte )
])
# DataFrame real
indústria_df.show()
# Escreva o industry_df no arquivo parquet
indústria_df.coalesce( 1 ).write.parquet( 'indústria_parquet' )
Saída:
Este é o DataFrame que contém 5 registros.
Um arquivo parquet é criado para o DataFrame anterior. Aqui, nosso nome de arquivo com uma extensão é “part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet”. Usamos esse arquivo em todo o tutorial.
Leia o arquivo Parquet para o PySpark DataFrame
Temos o arquivo parquet. Vamos ler este arquivo usando a função read.parquet() e carregá-lo no PySpark DataFrame.
importar pysparkde pyspark.sql import SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()
# Leia o arquivo parquet no objeto dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Mostra o dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Saída:
Exibimos o DataFrame usando o método show() que foi criado a partir do arquivo parquet.
Consultas SQL com arquivo Parquet
Após o carregamento no DataFrame, pode ser possível criar as tabelas SQL e exibir os dados presentes no DataFrame. Precisamos criar uma TEMPORARY VIEW e usar os comandos SQL para retornar os registros do DataFrame que é criado a partir do arquivo parquet.
Exemplo 1:
Crie uma exibição temporária chamada “Sectors” e use o comando SELECT para exibir os registros no DataFrame. Você pode se referir a isso tutorial que explica como criar uma VIEW no Spark – SQL.
importar pysparkde pyspark.sql import SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()
# Leia o arquivo parquet no objeto dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Crie uma visualização a partir do arquivo parquet acima chamado - 'Setores'
dataframe_from_parquet.createOrReplaceTempView( 'Setores' )
# Consulta para exibir todos os registros dos Setores
linuxhint_spark_app.sql( 'selecionar * de Setores' ).mostrar()
Saída:
Exemplo 2:
Usando a VIEW anterior, escreva a consulta SQL:
- Para exibir todos os registros dos Setores que pertencem a “Índia”.
- Exibir todos os registros dos Setores com funcionário maior que 100.
linuxhint_spark_app.sql( 'selecione * de setores onde Área='Índia'' ).mostrar()
# Consulta para exibir todos os registros dos Setores com funcionário maior que 100
linuxhint_spark_app.sql( 'selecione * de setores onde Total_employees>100' ).mostrar()
Saída:
Existe apenas um registro com área que é “Índia” e dois registros com funcionários maiores que 100.
Leia o arquivo Parquet para o PySpark SQL
Primeiro, precisamos criar uma VIEW usando o comando CREATE. Usando a palavra-chave “path” dentro da consulta SQL, podemos ler o arquivo parquet para o Spark SQL. Após o caminho, precisamos especificar o nome do arquivo/localização do arquivo.
Sintaxe:
spark_app.sql( 'CRIAR VISUALIZAÇÃO TEMPORÁRIA view_name USANDO OPÇÕES de parquet (caminho ' file_name.parquet ')' )Exemplo 1:
Crie uma visualização temporária chamada “Sector2” e leia o arquivo parquet nela. Usando a função sql(), escreva a consulta de seleção para exibir todos os registros presentes na exibição.
importar pysparkde pyspark.sql import SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()
# Leia o arquivo parquet no Spark-SQL
linuxhint_spark_app.sql( 'CRIAR VISUALIZAÇÃO TEMPORÁRIA Setor2 USANDO OPÇÕES de parquet (caminho ' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# Consulta para exibir todos os registros do Setor2
linuxhint_spark_app.sql( 'selecionar * do Setor2' ).mostrar()
Saída:
Exemplo 2:
Use a VIEW anterior e escreva a consulta para exibir todos os registros com a classificação de “Hot” ou “Cool”.
# Consulta para exibir todos os registros do Setor2 com Rating- Quente ou Frio.linuxhint_spark_app.sql( 'selecione * de Sector2 onde Rating='Hot' OR Rating='Cool'' ).mostrar()
Saída:
Existem três registros com a classificação de “Hot” ou “Cool”.
Conclusão
No PySpark, a função write.parquet() grava o DataFrame no arquivo parquet. A função read.parquet() lê o arquivo parquet para o PySpark DataFrame ou qualquer outro DataSource. Aprendemos como ler o arquivo parquet no PySpark DataFrame e na tabela PySpark. Como parte deste tutorial, também discutimos como criar as tabelas do PySpark DataFrame e filtrar os dados usando a cláusula WHERE.