Como ler e gravar dados de uma tabela no PySpark

Como Ler E Gravar Dados De Uma Tabela No Pyspark



O processamento de dados no PySpark é mais rápido se os dados forem carregados na forma de tabela. Com isso, utilizando as Expressões SQL, o processamento será rápido. Portanto, converter o PySpark DataFrame/RDD em uma tabela antes de enviá-lo para processamento é a melhor abordagem. Hoje, veremos como ler os dados da tabela no PySpark DataFrame, gravar o PySpark DataFrame na tabela e inserir um novo DataFrame na tabela existente usando as funções integradas. Vamos!

Pyspark.sql.DataFrameWriter.saveAsTable()

Primeiro, veremos como gravar o PySpark DataFrame existente na tabela usando a função write.saveAsTable(). Leva o nome da tabela e outros parâmetros opcionais como modos, partionBy, etc., para gravar o DataFrame na tabela. Ele é armazenado como um arquivo parquet.

Sintaxe:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
  1. O Table_name é o nome da tabela que é criada a partir do dataframe_obj.
  2. Podemos anexar/sobrescrever os dados da tabela usando o parâmetro mode.
  3. O partitionBy usa as colunas únicas/múltiplas para criar partições com base nos valores dessas colunas fornecidas.

Exemplo 1:

Crie um PySpark DataFrame com 5 linhas e 4 colunas. Grave este Dataframe em uma tabela chamada “Agri_Table1”.



importar pyspark

de pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

# dados agrícolas com 5 linhas e 5 colunas

agro =[{ 'Tipo de solo' : 'Preto' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 2500 , 'Soil_status' : 'Seco' ,
'País' : 'EUA' },

{ 'Tipo de solo' : 'Preto' , 'Irrigação_disponibilidade' : 'Sim' , 'Acres' : 3500 , 'Soil_status' : 'Molhado' ,
'País' : 'Índia' },

{ 'Tipo de solo' : 'Vermelho' , 'Irrigação_disponibilidade' : 'Sim' , 'Acres' : 210 , 'Soil_status' : 'Seco' ,
'País' : 'REINO UNIDO' },

{ 'Tipo de solo' : 'Outro' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 1000 , 'Soil_status' : 'Molhado' ,
'País' : 'EUA' },

{ 'Tipo de solo' : 'Areia' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 500 , 'Soil_status' : 'Seco' ,
'País' : 'Índia' }]



# cria o dataframe a partir dos dados acima

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Escreva o DataFrame acima na tabela.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Saída:







Podemos ver que um arquivo parquet é criado com os dados anteriores do PySpark.



Exemplo 2:

Considere o DataFrame anterior e escreva o “Agri_Table2” na tabela particionando os registros com base nos valores da coluna “Country”.

# Escreva o DataFrame acima na tabela com o parâmetro partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'País' ])

Saída:

Existem três valores exclusivos na coluna 'País' - 'Índia', 'Reino Unido' e 'EUA'. Então, três partições são criadas. Cada partição contém os arquivos de parquet.

Pyspark.sql.DataFrameReader.table()

Vamos carregar a tabela no PySpark DataFrame usando a função spark.read.table(). Leva apenas um parâmetro que é o nome do caminho/tabela. Ele carrega diretamente a tabela no PySpark DataFrame e todas as funções SQL que são aplicadas ao PySpark DataFrame também podem ser aplicadas neste DataFrame carregado.

Sintaxe:

spark_app.read.table(path/'Table_name')

Neste cenário, usamos a tabela anterior que foi criada a partir do PySpark DataFrame. Certifique-se de que você precisa implementar os snippets de código do cenário anterior em seu ambiente.

Exemplo:

Carregue a tabela “Agri_Table1” no DataFrame chamado “loaded_data”.

load_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

load_data.show()

Saída:

Podemos ver que a tabela é carregada no PySpark DataFrame.

Executando as consultas SQL

Agora, executamos algumas consultas SQL no DataFrame carregado usando a função spark.sql().

# Use o comando SELECT para exibir todas as colunas da tabela acima.

linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).mostrar()

# Cláusula WHERE

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Seco'' ).mostrar()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).mostrar()

Saída:

  1. A primeira consulta exibe todas as colunas e registros do DataFrame.
  2. A segunda consulta exibe os registros com base na coluna “Soil_status”. Existem apenas três registros com o elemento “Dry”.
  3. A última consulta retorna dois registros com “Acres” maiores que 2.000.

Pyspark.sql.DataFrameWriter.insertInto()

Usando a função insertInto(), podemos anexar o DataFrame à tabela existente. Podemos usar esta função junto com o selectExpr() para definir os nomes das colunas e depois inserir na tabela. Essa função também recebe tableName como parâmetro.

Sintaxe:

DataFrame_obj.write.insertInto('Table_name')

Neste cenário, usamos a tabela anterior que foi criada a partir do PySpark DataFrame. Certifique-se de que você precisa implementar os snippets de código do cenário anterior em seu ambiente.

Exemplo:

Crie um novo DataFrame com dois registros e insira-os na tabela “Agri_Table1”.

importar pyspark

de pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

# dados agrícolas com 2 linhas

agro =[{ 'Tipo de solo' : 'Areia' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 2500 , 'Soil_status' : 'Seco' ,
'País' : 'EUA' },

{ 'Tipo de solo' : 'Areia' , 'Irrigação_disponibilidade' : 'Não' , 'Acres' : 1200 , 'Soil_status' : 'Molhado' ,
'País' : 'Japão' }]

# cria o dataframe a partir dos dados acima

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Acres' , 'País' , 'Irrigação_disponibilidade' , 'Tipo de solo' ,
'Soil_status' ).write.insertInto( 'Agri_Table1' )

# Exibe a Agri_Table1 final

linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).mostrar()

Saída:

Agora, o número total de linhas presentes no DataFrame é 7.

Conclusão

Agora você sabe como gravar o PySpark DataFrame na tabela usando a função write.saveAsTable(). Leva o nome da tabela e outros parâmetros opcionais. Em seguida, carregamos essa tabela no PySpark DataFrame usando a função spark.read.table(). Leva apenas um parâmetro que é o nome do caminho/tabela. Se você deseja anexar o novo DataFrame à tabela existente, use a função insertInto().