Pyspark.sql.DataFrameWriter.saveAsTable()
Primeiro, veremos como gravar o PySpark DataFrame existente na tabela usando a função write.saveAsTable(). Leva o nome da tabela e outros parâmetros opcionais como modos, partionBy, etc., para gravar o DataFrame na tabela. Ele é armazenado como um arquivo parquet.
Sintaxe:
dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
- O Table_name é o nome da tabela que é criada a partir do dataframe_obj.
- Podemos anexar/sobrescrever os dados da tabela usando o parâmetro mode.
- O partitionBy usa as colunas únicas/múltiplas para criar partições com base nos valores dessas colunas fornecidas.
Exemplo 1:
Crie um PySpark DataFrame com 5 linhas e 4 colunas. Grave este Dataframe em uma tabela chamada “Agri_Table1”.
importar pyspark
de pyspark.sql importar SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()
# dados agrícolas com 5 linhas e 5 colunas
agro =[{ 'Tipo de solo' : 'Preto' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 2500 , 'Soil_status' : 'Seco' ,
'País' : 'EUA' },
{ 'Tipo de solo' : 'Preto' , 'Irrigação_disponibilidade' : 'Sim' , 'Acres' : 3500 , 'Soil_status' : 'Molhado' ,
'País' : 'Índia' },
{ 'Tipo de solo' : 'Vermelho' , 'Irrigação_disponibilidade' : 'Sim' , 'Acres' : 210 , 'Soil_status' : 'Seco' ,
'País' : 'REINO UNIDO' },
{ 'Tipo de solo' : 'Outro' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 1000 , 'Soil_status' : 'Molhado' ,
'País' : 'EUA' },
{ 'Tipo de solo' : 'Areia' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 500 , 'Soil_status' : 'Seco' ,
'País' : 'Índia' }]
# cria o dataframe a partir dos dados acima
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Escreva o DataFrame acima na tabela.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Saída:
Podemos ver que um arquivo parquet é criado com os dados anteriores do PySpark.
Exemplo 2:
Considere o DataFrame anterior e escreva o “Agri_Table2” na tabela particionando os registros com base nos valores da coluna “Country”.
# Escreva o DataFrame acima na tabela com o parâmetro partitionByagri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'País' ])
Saída:
Existem três valores exclusivos na coluna 'País' - 'Índia', 'Reino Unido' e 'EUA'. Então, três partições são criadas. Cada partição contém os arquivos de parquet.
Pyspark.sql.DataFrameReader.table()
Vamos carregar a tabela no PySpark DataFrame usando a função spark.read.table(). Leva apenas um parâmetro que é o nome do caminho/tabela. Ele carrega diretamente a tabela no PySpark DataFrame e todas as funções SQL que são aplicadas ao PySpark DataFrame também podem ser aplicadas neste DataFrame carregado.
Sintaxe:
spark_app.read.table(path/'Table_name')Neste cenário, usamos a tabela anterior que foi criada a partir do PySpark DataFrame. Certifique-se de que você precisa implementar os snippets de código do cenário anterior em seu ambiente.
Exemplo:
Carregue a tabela “Agri_Table1” no DataFrame chamado “loaded_data”.
load_data = linuxhint_spark_app.read.table( 'Agri_Table1' )load_data.show()
Saída:
Podemos ver que a tabela é carregada no PySpark DataFrame.
Executando as consultas SQL
Agora, executamos algumas consultas SQL no DataFrame carregado usando a função spark.sql().
# Use o comando SELECT para exibir todas as colunas da tabela acima.linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).mostrar()
# Cláusula WHERE
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Seco'' ).mostrar()
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).mostrar()
Saída:
- A primeira consulta exibe todas as colunas e registros do DataFrame.
- A segunda consulta exibe os registros com base na coluna “Soil_status”. Existem apenas três registros com o elemento “Dry”.
- A última consulta retorna dois registros com “Acres” maiores que 2.000.
Pyspark.sql.DataFrameWriter.insertInto()
Usando a função insertInto(), podemos anexar o DataFrame à tabela existente. Podemos usar esta função junto com o selectExpr() para definir os nomes das colunas e depois inserir na tabela. Essa função também recebe tableName como parâmetro.
Sintaxe:
DataFrame_obj.write.insertInto('Table_name')Neste cenário, usamos a tabela anterior que foi criada a partir do PySpark DataFrame. Certifique-se de que você precisa implementar os snippets de código do cenário anterior em seu ambiente.
Exemplo:
Crie um novo DataFrame com dois registros e insira-os na tabela “Agri_Table1”.
importar pysparkde pyspark.sql importar SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()
# dados agrícolas com 2 linhas
agro =[{ 'Tipo de solo' : 'Areia' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 2500 , 'Soil_status' : 'Seco' ,
'País' : 'EUA' },
{ 'Tipo de solo' : 'Areia' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 1200 , 'Soil_status' : 'Molhado' ,
'País' : 'Japão' }]
# cria o dataframe a partir dos dados acima
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Acres' , 'País' , 'Irrigação_disponibilidade' , 'Tipo de solo' ,
'Soil_status' ).write.insertInto( 'Agri_Table1' )
# Exibe a Agri_Table1 final
linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).mostrar()
Saída:
Agora, o número total de linhas presentes no DataFrame é 7.
Conclusão
Agora você sabe como gravar o PySpark DataFrame na tabela usando a função write.saveAsTable(). Leva o nome da tabela e outros parâmetros opcionais. Em seguida, carregamos essa tabela no PySpark DataFrame usando a função spark.read.table(). Leva apenas um parâmetro que é o nome do caminho/tabela. Se você deseja anexar o novo DataFrame à tabela existente, use a função insertInto().