PySpark Pandas_Udf()

Pyspark Pandas Udf



A transformação do PySpark DataFrame é possível usando a função pandas_udf(). É uma função definida pelo usuário que é aplicada no PySpark DataFrame com seta. Podemos realizar as operações vetorizadas usando o pandas_udf(). Pode ser implementado passando esta função como decorador. Vamos mergulhar neste guia para conhecer a sintaxe, parâmetros e diferentes exemplos.

Tema do conteúdo:

Se você quiser saber sobre o PySpark DataFrame e a instalação do módulo, consulte este artigo .







Pyspark.sql.functions.pandas_udf()

O pandas_udf () está disponível no módulo sql.functions no PySpark, que pode ser importado usando a palavra-chave “from”. Ele é usado para executar as operações vetorizadas em nosso PySpark DataFrame. Esta função é implementada como um decorador passando três parâmetros. Depois disso, podemos criar uma função definida pelo usuário que retorne os dados no formato vetorial (como usamos series/NumPy para isso) usando uma seta. Dentro desta função, podemos retornar o resultado.



Estrutura e Sintaxe:



Primeiro, vamos ver a estrutura e a sintaxe desta função:

@pandas_udf(tipo de dados)
def function_name(operação) -> convert_format:
declaração de retorno

Aqui, o function_name é o nome da nossa função definida. O tipo de dados especifica o tipo de dados que é retornado por esta função. Podemos retornar o resultado usando a palavra-chave “return”. Todas as operações são realizadas dentro da função com a atribuição de seta.





Pandas_udf (Função e ReturnType)

  1. O primeiro parâmetro é a função definida pelo usuário que é passada para ele.
  2. O segundo parâmetro é usado para especificar o tipo de dados de retorno da função.

Dados:

Em todo este guia, usamos apenas um PySpark DataFrame para demonstração. Todas as funções definidas pelo usuário que definimos são aplicadas neste PySpark DataFrame. Certifique-se de criar este DataFrame em seu ambiente primeiro após a instalação do PySpark.



importar pyspark

de pyspark.sql importar SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()

de pyspark.sql.functions import pandas_udf

da importação de pyspark.sql.types *

importar pandas como panda

# detalhes vegetais

vegetal =[{ 'tipo' : 'vegetal' , 'nome' : 'tomate' , 'localizar_país' : 'EUA' , 'quantidade' : 800 },

{ 'tipo' : 'fruta' , 'nome' : 'banana' , 'localizar_país' : 'CHINA' , 'quantidade' : vinte },

{ 'tipo' : 'vegetal' , 'nome' : 'tomate' , 'localizar_país' : 'EUA' , 'quantidade' : 800 },

{ 'tipo' : 'vegetal' , 'nome' : 'Manga' , 'localizar_país' : 'JAPÃO' , 'quantidade' : 0 },

{ 'tipo' : 'fruta' , 'nome' : 'limão' , 'localizar_país' : 'ÍNDIA' , 'quantidade' : 1700 },

{ 'tipo' : 'vegetal' , 'nome' : 'tomate' , 'localizar_país' : 'EUA' , 'quantidade' : 1200 },

{ 'tipo' : 'vegetal' , 'nome' : 'Manga' , 'localizar_país' : 'JAPÃO' , 'quantidade' : 0 },

{ 'tipo' : 'fruta' , 'nome' : 'limão' , 'localizar_país' : 'ÍNDIA' , 'quantidade' : 0 }

]

# crie o dataframe de mercado a partir dos dados acima

market_df = linuxhint_spark_app.createDataFrame(vegetal)

market_df.show()

Saída:

Aqui, criamos este DataFrame com 4 colunas e 8 linhas. Agora, usamos o pandas_udf() para criar as funções definidas pelo usuário e aplicá-las a essas colunas.

Pandas_udf () com diferentes tipos de dados

Neste cenário, criamos algumas funções definidas pelo usuário com pandas_udf() e as aplicamos em colunas e exibimos os resultados usando o método select(). Em cada caso, usamos o pandas.Series enquanto realizamos as operações vetorizadas. Isso considera os valores da coluna como um array unidimensional e a operação é aplicada na coluna. No próprio decorador, especificamos o tipo de retorno da função.

Exemplo 1: Pandas_udf() com tipo String

Aqui, criamos duas funções definidas pelo usuário com o tipo de retorno string para converter os valores da coluna do tipo string em maiúsculas e minúsculas. Finalmente, aplicamos essas funções nas colunas “type” e “locate_country”.

# Converter a coluna de tipo para maiúsculas com pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

return i.str.upper()

# Converter a coluna local_país para letras minúsculas com pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

return i.str.lower()

# Mostra as colunas usando select()

market_df.select( 'tipo' ,type_upper_case( 'tipo' ), 'localizar_país' ,
country_lower_case( 'localizar_país' )).mostrar()

Saída:

Explicação:

A função StringType() está disponível no módulo pyspark.sql.types. Já importamos este módulo ao criar o PySpark DataFrame.

  1. Primeiro, a UDF (função definida pelo usuário) retorna as strings em letras maiúsculas usando a função str.upper(). O str.upper() está disponível na Estrutura de Dados da Série (pois estamos convertendo para série com uma seta dentro da função) que converte a string dada para letras maiúsculas. Finalmente, esta função é aplicada à coluna “type” que é especificada dentro do método select(). Anteriormente, todas as strings na coluna de tipo estavam em letras minúsculas. Agora, eles são alterados para maiúsculas.
  2. Em segundo lugar, UDF retorna as strings em letras maiúsculas usando a função str.lower(). O str.lower() está disponível na estrutura de dados da série, que converte a string fornecida em minúsculas. Finalmente, esta função é aplicada à coluna “type” que é especificada dentro do método select(). Anteriormente, todas as strings na coluna de tipo estavam em letras maiúsculas. Agora, eles são alterados para minúsculas.

Exemplo 2: Pandas_udf() com tipo inteiro

Vamos criar uma UDF que converta a coluna inteira PySpark DataFrame na série Pandas e adicione 100 a cada valor. Passe a coluna “quantidade” para esta função dentro do método select().

# Adicionar 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

retornar i+ 100

# Passe a coluna de quantidade para a função e exibição acima.

market_df.select( 'quantidade' ,add_100( 'quantidade' )).mostrar()

Saída:

Explicação:

Dentro do UDF, iteramos todos os valores e os convertemos em Series. Depois disso, somamos 100 a cada valor da Série. Finalmente, passamos a coluna “quantidade” para esta função e podemos ver que 100 é adicionado a todos os valores.

Pandas_udf() com diferentes tipos de dados usando Groupby() & Agg()

Vejamos os exemplos para passar o UDF para as colunas agregadas. Aqui, os valores da coluna são agrupados primeiro usando a função groupby() e a agregação é feita usando a função agg(). Passamos nosso UDF dentro dessa função de agregação.

Sintaxe:

pyspark_dataframe_object.groupby( 'coluna_de_agrupamento' ).agg(UDF
(pyspark_dataframe_object[ 'coluna' ]))

Aqui, os valores na coluna de agrupamento são agrupados primeiro. Em seguida, a agregação é feita em cada dado agrupado em relação ao nosso UDF.

Exemplo 1: Pandas_udf() com Aggregate Mean()

Aqui, criamos uma função definida pelo usuário com um tipo de retorno float. Dentro da função, calculamos a média usando a função mean(). Essa UDF é passada para a coluna “quantidade” para obter a quantidade média de cada tipo.

# retorna a média/média

@pandas_udf( 'flutuador' )

def average_function(i: panda.Series) -> float:

return i.mean()

# Passe a coluna quantidade para a função agrupando a coluna tipo.

market_df.groupby( 'tipo' ).agg(average_function(market_df[ 'quantidade' ])).mostrar()

Saída:

Estamos agrupando com base nos elementos da coluna “tipo”. Dois grupos são formados – “frutas” e “vegetais”. Para cada grupo, a média é calculada e retornada.

Exemplo 2: Pandas_udf() com Aggregate Max() e Min()

Aqui, criamos duas funções definidas pelo usuário com o tipo de retorno inteiro (int). A primeira UDF retorna o valor mínimo e a segunda UDF retorna o valor máximo.

# pandas_udf que retornam o valor mínimo

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

return i.min()

# pandas_udf que retorna o valor máximo

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

return i.max()

# Passe a coluna de quantidade para o min_ pandas_udf agrupando a localização_país.

market_df.groupby( 'localizar_país' ).agg(min_(mercado_df[ 'quantidade' ])).mostrar()

# Passe a coluna de quantidade para o max_ pandas_udf agrupando a localização_país.

market_df.groupby( 'localizar_país' ).agg(max_(mercado_df[ 'quantidade' ])).mostrar()

Saída:

Para retornar valores mínimos e máximos, utilizamos as funções min() e max() no tipo de retorno das UDFs. Agora, agrupamos os dados na coluna “locate_country”. Quatro grupos são formados (“CHINA”, “INDIA”, “JAPAN”, “USA”). Para cada grupo, retornamos a quantidade máxima. Da mesma forma, retornamos a quantidade mínima.

Conclusão

Basicamente, o pandas_udf() é usado para realizar as operações vetorizadas em nosso PySpark DataFrame. Vimos como criar o pandas_udf() e aplicá-lo ao PySpark DataFrame. Para melhor compreensão, discutimos os diferentes exemplos considerando todos os tipos de dados (string, float e integer). Pode ser possível usar o pandas_udf() com groupby() através da função agg().