Convertendo PySpark DataFrame para CSV

Convertendo Pyspark Dataframe Para Csv



Vejamos os quatro cenários diferentes de conversão do PySpark DataFrame em CSV. Diretamente, usamos o método write.csv() para converter o PySpark DataFrame em CSV. Usando a função to_csv(), convertemos o PySpark Pandas DataFrame em CSV. Também pode ser possível convertendo-o para a matriz NumPy.

Tema do conteúdo:

Se você quiser saber sobre o PySpark DataFrame e a instalação do módulo, consulte este artigo .







PySpark DataFrame para CSV convertendo para Pandas DataFrame

O to_csv() é um método disponível no módulo Pandas que converte o Pandas DataFrame em CSV. Primeiro, precisamos converter nosso PySpark DataFrame para Pandas DataFrame. O método toPandas() é usado para fazer isso. Vamos ver a sintaxe de to_csv() junto com seus parâmetros.



Sintaxe:



pandas_dataframe_obj.to_csv(caminho/ 'file_name.csv' , cabeçalho ,índice,colunas,modo...)
  1. Precisamos especificar o nome do arquivo CSV. Se você deseja armazenar o CSV baixado em um local específico no seu PC, também pode especificar o caminho junto com o nome do arquivo.
  2. As colunas serão incluídas se o cabeçalho estiver definido como 'Verdadeiro'. Se você não precisar de colunas, defina o cabeçalho como 'Falso'.
  3. Os índices são especificados se o índice for definido como “True”. Se você não precisar de índices, defina o índice como “False”.
  4. O parâmetro Columns obtém uma lista de nomes de coluna na qual podemos especificar quais colunas específicas são extraídas para o arquivo CSV.
  5. Podemos adicionar os registros ao CSV usando o parâmetro mode. Anexar – “a” é usado para fazer isso.

Exemplo 1: com os parâmetros de cabeçalho e índice

Crie o PySpark DataFrame “skills_df” com 3 linhas e 4 colunas. Converta este DataFrame em CSV convertendo-o primeiro no Pandas DataFrame.





importar pyspark

de pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

# dados de habilidades com 3 linhas e 4 colunas

habilidades =[{ 'eu ia' : 123 , 'pessoa' : 'Mel' , 'habilidade' : 'pintura' , 'prêmio' : 25000 },

{ 'eu ia' : 112 , 'pessoa' : 'Mouni' , 'habilidade' : 'dança' , 'prêmio' : 2000 },

{ 'eu ia' : 153 , 'pessoa' : 'Tulasi' , 'habilidade' : 'leitura' , 'prêmio' : 1200 }

]

# cria o dataframe de habilidades a partir dos dados acima

skills_df = linuxhint_spark_app.createDataFrame(skills)

skills_df.show()

# Converter skills_df para pandas DataFrame

pandas_skills_df= skills_df.toPandas()

print(pandas_skills_df)

# Converta este DataFrame para csv com cabeçalho e índice

pandas_skills_df.to_csv( 'pandas_skills1.csv' , cabeçalho =Verdadeiro, índice=Verdadeiro)

Saída:



Podemos ver que o PySpark DataFrame é convertido em Pandas DataFrame. Vamos ver se ele é convertido para CSV com nomes de colunas e índices:

Exemplo 2: anexar os dados ao CSV

Crie mais um PySpark DataFrame com 1 registro e anexe-o ao CSV que é criado como parte do nosso primeiro exemplo. Certifique-se de que precisamos definir o cabeçalho como “False” junto com o parâmetro mode. Caso contrário, os nomes das colunas também serão anexados como uma linha.

importar pyspark

de pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

habilidades =[{ 'eu ia' : 90 , 'pessoa' : 'Bhargav' , 'habilidade' : 'leitura' , 'prêmio' : 12000 }

]

# cria o dataframe de habilidades a partir dos dados acima

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Converter skills_df para pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Adicione este DataFrame ao arquivo pandas_skills1.csv

pandas_skills_df.to_csv( 'pandas_skills1.csv' , modo= 'a' , cabeçalho =Falso)

Saída CSV:

Podemos ver que uma nova linha foi adicionada ao arquivo CSV.

Exemplo 3: Com o Parâmetro Colunas

Vamos ter o mesmo DataFrame e convertê-lo em CSV com duas colunas: “pessoa” e “prêmio”.

importar pyspark

de pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

# dados de habilidades com 3 linhas e 4 colunas

habilidades =[{ 'eu ia' : 123 , 'pessoa' : 'Mel' , 'habilidade' : 'pintura' , 'prêmio' : 25000 },

{ 'eu ia' : 112 , 'pessoa' : 'Mouni' , 'habilidade' : 'dança' , 'prêmio' : 2000 },

{ 'eu ia' : 153 , 'pessoa' : 'Tulasi' , 'habilidade' : 'leitura' , 'prêmio' : 1200 }

]

# cria o dataframe de habilidades a partir dos dados acima

skills_df = linuxhint_spark_app.createDataFrame(skills)

# Converter skills_df para pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Converta este DataFrame para csv com colunas específicas

pandas_skills_df.to_csv( 'pandas_skills2.csv' , colunas=[ 'pessoa' , 'prêmio' ])

Saída CSV:

Podemos ver que apenas as colunas “pessoa” e “prêmio” existem no arquivo CSV.

PySpark Pandas DataFrame para CSV usando o método To_Csv ()

O to_csv() é um método disponível no módulo Pandas que converte o Pandas DataFrame em CSV. Primeiro, precisamos converter nosso PySpark DataFrame para Pandas DataFrame. O método toPandas() é usado para fazer isso. Vamos ver a sintaxe de to_csv() junto com seus parâmetros:

Sintaxe:

pyspark_pandas_dataframe_obj.to_csv(caminho/ 'file_name.csv' , cabeçalho ,índice,colunas,...)
  1. Precisamos especificar o nome do arquivo CSV. Se você deseja armazenar o CSV baixado em um local específico no seu PC, também pode especificar o caminho junto com o nome do arquivo.
  2. As colunas serão incluídas se o cabeçalho estiver definido como 'Verdadeiro'. Se você não precisar de colunas, defina o cabeçalho como 'Falso'.
  3. Os índices são especificados se o índice for definido como “True”. Se você não precisar de índices, defina o índice como “False”.
  4. O parâmetro de colunas obtém uma lista de nomes de colunas na qual podemos especificar quais colunas específicas são extraídas para o arquivo CSV.

Exemplo 1: Com o Parâmetro Colunas

Crie um PySpark Pandas DataFrame com 3 colunas e converta-o em CSV usando to_csv() com as colunas “pessoa” e “prêmio”.

do pyspark importar pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'eu ia' :[ 90 , 78 , 90 , 57 ], 'pessoa' :[ 'Mel' , 'Mouni' , 'ele mesmo' , 'radha' ], 'prêmio' :[ 1 , 2 , 3 , 4 ]})

print(pyspark_pandas_dataframe)

# Converta este DataFrame para csv com colunas específicas

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , colunas=[ 'pessoa' , 'prêmio' ])

Saída:

Podemos ver que o PySpark Pandas DataFrame é convertido em CSV com duas partições. Cada partição contém 2 registros. Além disso, as colunas no CSV são apenas 'pessoa' e 'prêmio'.

Arquivo de partição 1:

Arquivo de Partição 2:

Exemplo 2: com o parâmetro de cabeçalho

Use o DataFrame anterior e especifique o parâmetro do cabeçalho definindo-o como “True”.

do pyspark importar pandas

pyspark_pandas_dataframe=pandas.DataFrame({ 'eu ia' :[ 90 , 78 , 90 , 57 ], 'pessoa' :[ 'Mel' , 'Mouni' , 'ele mesmo' , 'radha' ], 'prêmio' :[ 1 , 2 , 3 , 4 ]})

# Converta este DataFrame para csv com cabeçalho.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , cabeçalho =Verdadeiro)

Saída CSV:

Podemos ver que o PySpark Pandas DataFrame é convertido em CSV com duas partições. Cada partição contém 2 registros com nomes de coluna.

Arquivo de partição 1:

Arquivo de Partição 2:

PySpark Pandas DataFrame para CSV convertendo para matriz NumPy

Temos a opção de converter o PySpark Pandas DataFrame em CSV convertendo na matriz Numpy. O to_numpy() é um método disponível no módulo PySpark Pandas que converte o PySpark Pandas DataFrame no array NumPy.

Sintaxe:

pyspark_pandas_dataframe_obj.to_numpy()

Não terá nenhum parâmetro.

Usando o método Tofile()

Depois de converter para o array NumPy, podemos usar o método tofile() para converter NumPy para CSV. Aqui, ele armazena cada registro em uma nova célula colunar no arquivo CSV.

Sintaxe:

array_obj.to_numpy(filename/path,sep=’ ’)

Leva o nome do arquivo ou caminho de um CSV e um separador.

Exemplo:

Crie PySpark Pandas DataFrame com 3 colunas e 4 registros e converta-o em CSV convertendo-o primeiro em uma matriz NumPy.

de pandas de importação de pyspark

pyspark_pandas_dataframe=pandas.DataFrame({ 'eu ia' :[ 90 , 78 , 90 , 57 ], 'pessoa' :[ 'Mel' , 'Mouni' , 'ele mesmo' , 'radha' ], 'prêmio' :[ 1 , 2 , 3 , 4 ]})

# Converta o DataFrame acima em array numpy

convertido = pyspark_pandas_dataframe.to_numpy()

imprimir(convertido)

# Usando tofile()

convertido.paraarquivo( 'convertido1.csv' , set = ',' )

Saída:

[[ 90 'Mel' 1 ]

[ 78 'Mouni' 2 ]

[ 90 'ele mesmo' 3 ]

[ 57 'radha' 4 ]]

Podemos ver que o PySpark Pandas DataFrame é convertido em uma matriz NumPy (12 valores). Se você puder ver os dados CSV, ele armazena cada valor de célula em uma nova coluna.

PySpark DataFrame para CSV usando o método Write.Csv()

O método write.csv() pega o nome/caminho do arquivo onde precisamos salvar o arquivo CSV como parâmetro.

Sintaxe:

dataframe_object.coalesce( 1 ).write.csv( 'nome do arquivo' )

Na verdade, o CSV é salvo como partições (mais de uma). Para nos livrarmos disso, mesclamos todos os arquivos CSV particionados em um. Neste cenário, usamos a função coalesce(). Agora, podemos ver apenas um arquivo CSV com todas as linhas do PySpark DataFrame.

Exemplo:

Considere o PySpark DataFrame com 4 registros com 4 colunas. Grave este DataFrame em CSV com o arquivo chamado “market_details”.

importar pyspark

de pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

# dados de mercado com 4 linhas e 4 colunas

mercado =[{ 'm_id' : 'mz-001' , 'm_name' : 'ABC' , 'm_city' : 'Délhi' , 'm_state' : 'Délhi' },

{ 'm_id' : 'mz-002' , 'm_name' : 'XYZ' , 'm_city' : 'patna' , 'm_state' : 'sorte' },

{ 'm_id' : 'mz-003' , 'm_name' : 'PQR' , 'm_city' : 'florida' , 'm_state' : 'um' },

{ 'm_id' : 'mz-004' , 'm_name' : 'ABC' , 'm_city' : 'Délhi' , 'm_state' : 'sorte' }

]



# crie o dataframe de mercado a partir dos dados acima

market_df = linuxhint_spark_app.createDataFrame(mercado)

# Dados de mercado reais

market_df.show()

# escreva.csv()

market_df.coalesce( 1 ).write.csv( 'detalhes_do_mercado' )

Saída:

Vamos verificar o arquivo:

Abra o último arquivo para ver os registros.

Conclusão

Aprendemos os quatro cenários diferentes que convertem o PySpark DataFrame em CSV com exemplos considerando diferentes parâmetros. Ao trabalhar com o PySpark DataFrame, você tem duas opções para converter esse DataFrame em CSV: uma maneira é usar o método write() e outra é usar o método to_csv() convertendo para Pandas DataFrame. Se você estiver trabalhando com o PySpark Pandas DataFrame, também poderá utilizar to_csv() e tofile() convertendo para o array NumPy.