Tema do conteúdo:
- Pandas_udf () com diferentes tipos de dados
- Pandas_udf() com diferentes tipos de dados usando Groupby() & Agg()
Se você quiser saber sobre o PySpark DataFrame e a instalação do módulo, consulte este artigo .
Pyspark.sql.functions.pandas_udf()
O pandas_udf () está disponível no módulo sql.functions no PySpark, que pode ser importado usando a palavra-chave “from”. Ele é usado para executar as operações vetorizadas em nosso PySpark DataFrame. Esta função é implementada como um decorador passando três parâmetros. Depois disso, podemos criar uma função definida pelo usuário que retorne os dados no formato vetorial (como usamos series/NumPy para isso) usando uma seta. Dentro desta função, podemos retornar o resultado.
Estrutura e Sintaxe:
Primeiro, vamos ver a estrutura e a sintaxe desta função:
@pandas_udf(tipo de dados)def function_name(operação) -> convert_format:
declaração de retorno
Aqui, o function_name é o nome da nossa função definida. O tipo de dados especifica o tipo de dados que é retornado por esta função. Podemos retornar o resultado usando a palavra-chave “return”. Todas as operações são realizadas dentro da função com a atribuição de seta.
Pandas_udf (Função e ReturnType)
- O primeiro parâmetro é a função definida pelo usuário que é passada para ele.
- O segundo parâmetro é usado para especificar o tipo de dados de retorno da função.
Dados:
Em todo este guia, usamos apenas um PySpark DataFrame para demonstração. Todas as funções definidas pelo usuário que definimos são aplicadas neste PySpark DataFrame. Certifique-se de criar este DataFrame em seu ambiente primeiro após a instalação do PySpark.
importar pyspark
de pyspark.sql importar SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Dica do Linux' ).getOrCreate()
de pyspark.sql.functions import pandas_udf
da importação de pyspark.sql.types *
importar pandas como panda
# detalhes vegetais
vegetal =[{ 'tipo' : 'vegetal' , 'nome' : 'tomate' , 'localizar_país' : 'EUA' , 'quantidade' : 800 },
{ 'tipo' : 'fruta' , 'nome' : 'banana' , 'localizar_país' : 'CHINA' , 'quantidade' : vinte },
{ 'tipo' : 'vegetal' , 'nome' : 'tomate' , 'localizar_país' : 'EUA' , 'quantidade' : 800 },
{ 'tipo' : 'vegetal' , 'nome' : 'Manga' , 'localizar_país' : 'JAPÃO' , 'quantidade' : 0 },
{ 'tipo' : 'fruta' , 'nome' : 'limão' , 'localizar_país' : 'ÍNDIA' , 'quantidade' : 1700 },
{ 'tipo' : 'vegetal' , 'nome' : 'tomate' , 'localizar_país' : 'EUA' , 'quantidade' : 1200 },
{ 'tipo' : 'vegetal' , 'nome' : 'Manga' , 'localizar_país' : 'JAPÃO' , 'quantidade' : 0 },
{ 'tipo' : 'fruta' , 'nome' : 'limão' , 'localizar_país' : 'ÍNDIA' , 'quantidade' : 0 }
]
# crie o dataframe de mercado a partir dos dados acima
market_df = linuxhint_spark_app.createDataFrame(vegetal)
market_df.show()
Saída:
Aqui, criamos este DataFrame com 4 colunas e 8 linhas. Agora, usamos o pandas_udf() para criar as funções definidas pelo usuário e aplicá-las a essas colunas.
Pandas_udf () com diferentes tipos de dados
Neste cenário, criamos algumas funções definidas pelo usuário com pandas_udf() e as aplicamos em colunas e exibimos os resultados usando o método select(). Em cada caso, usamos o pandas.Series enquanto realizamos as operações vetorizadas. Isso considera os valores da coluna como um array unidimensional e a operação é aplicada na coluna. No próprio decorador, especificamos o tipo de retorno da função.
Exemplo 1: Pandas_udf() com tipo String
Aqui, criamos duas funções definidas pelo usuário com o tipo de retorno string para converter os valores da coluna do tipo string em maiúsculas e minúsculas. Finalmente, aplicamos essas funções nas colunas “type” e “locate_country”.
# Converter a coluna de tipo para maiúsculas com pandas_udf@pandas_udf(StringType())
def type_upper_case(i: panda.Series) -> panda.Series:
return i.str.upper()
# Converter a coluna local_país para letras minúsculas com pandas_udf
@pandas_udf(StringType())
def country_lower_case(i: panda.Series) -> panda.Series:
return i.str.lower()
# Mostra as colunas usando select()
market_df.select( 'tipo' ,type_upper_case( 'tipo' ), 'localizar_país' ,
country_lower_case( 'localizar_país' )).mostrar()
Saída:
Explicação:
A função StringType() está disponível no módulo pyspark.sql.types. Já importamos este módulo ao criar o PySpark DataFrame.
- Primeiro, a UDF (função definida pelo usuário) retorna as strings em letras maiúsculas usando a função str.upper(). O str.upper() está disponível na Estrutura de Dados da Série (pois estamos convertendo para série com uma seta dentro da função) que converte a string dada para letras maiúsculas. Finalmente, esta função é aplicada à coluna “type” que é especificada dentro do método select(). Anteriormente, todas as strings na coluna de tipo estavam em letras minúsculas. Agora, eles são alterados para maiúsculas.
- Em segundo lugar, UDF retorna as strings em letras maiúsculas usando a função str.lower(). O str.lower() está disponível na estrutura de dados da série, que converte a string fornecida em minúsculas. Finalmente, esta função é aplicada à coluna “type” que é especificada dentro do método select(). Anteriormente, todas as strings na coluna de tipo estavam em letras maiúsculas. Agora, eles são alterados para minúsculas.
Exemplo 2: Pandas_udf() com tipo inteiro
Vamos criar uma UDF que converta a coluna inteira PySpark DataFrame na série Pandas e adicione 100 a cada valor. Passe a coluna “quantidade” para esta função dentro do método select().
# Adicionar 100@pandas_udf(IntegerType())
def add_100(i: panda.Series) -> panda.Series:
retornar i+ 100
# Passe a coluna de quantidade para a função e exibição acima.
market_df.select( 'quantidade' ,add_100( 'quantidade' )).mostrar()
Saída:
Explicação:
Dentro do UDF, iteramos todos os valores e os convertemos em Series. Depois disso, somamos 100 a cada valor da Série. Finalmente, passamos a coluna “quantidade” para esta função e podemos ver que 100 é adicionado a todos os valores.
Pandas_udf() com diferentes tipos de dados usando Groupby() & Agg()
Vejamos os exemplos para passar o UDF para as colunas agregadas. Aqui, os valores da coluna são agrupados primeiro usando a função groupby() e a agregação é feita usando a função agg(). Passamos nosso UDF dentro dessa função de agregação.
Sintaxe:
pyspark_dataframe_object.groupby( 'coluna_de_agrupamento' ).agg(UDF(pyspark_dataframe_object[ 'coluna' ]))
Aqui, os valores na coluna de agrupamento são agrupados primeiro. Em seguida, a agregação é feita em cada dado agrupado em relação ao nosso UDF.
Exemplo 1: Pandas_udf() com Aggregate Mean()
Aqui, criamos uma função definida pelo usuário com um tipo de retorno float. Dentro da função, calculamos a média usando a função mean(). Essa UDF é passada para a coluna “quantidade” para obter a quantidade média de cada tipo.
# retorna a média/média@pandas_udf( 'flutuador' )
def average_function(i: panda.Series) -> float:
return i.mean()
# Passe a coluna quantidade para a função agrupando a coluna tipo.
market_df.groupby( 'tipo' ).agg(average_function(market_df[ 'quantidade' ])).mostrar()
Saída:
Estamos agrupando com base nos elementos da coluna “tipo”. Dois grupos são formados – “frutas” e “vegetais”. Para cada grupo, a média é calculada e retornada.
Exemplo 2: Pandas_udf() com Aggregate Max() e Min()
Aqui, criamos duas funções definidas pelo usuário com o tipo de retorno inteiro (int). A primeira UDF retorna o valor mínimo e a segunda UDF retorna o valor máximo.
# pandas_udf que retornam o valor mínimo@pandas_udf( 'int' )
def min_(i: panda.Series) -> int:
return i.min()
# pandas_udf que retorna o valor máximo
@pandas_udf( 'int' )
def max_(i: panda.Series) -> int:
return i.max()
# Passe a coluna de quantidade para o min_ pandas_udf agrupando a localização_país.
market_df.groupby( 'localizar_país' ).agg(min_(mercado_df[ 'quantidade' ])).mostrar()
# Passe a coluna de quantidade para o max_ pandas_udf agrupando a localização_país.
market_df.groupby( 'localizar_país' ).agg(max_(mercado_df[ 'quantidade' ])).mostrar()
Saída:
Para retornar valores mínimos e máximos, utilizamos as funções min() e max() no tipo de retorno das UDFs. Agora, agrupamos os dados na coluna “locate_country”. Quatro grupos são formados (“CHINA”, “INDIA”, “JAPAN”, “USA”). Para cada grupo, retornamos a quantidade máxima. Da mesma forma, retornamos a quantidade mínima.
Conclusão
Basicamente, o pandas_udf() é usado para realizar as operações vetorizadas em nosso PySpark DataFrame. Vimos como criar o pandas_udf() e aplicá-lo ao PySpark DataFrame. Para melhor compreensão, discutimos os diferentes exemplos considerando todos os tipos de dados (string, float e integer). Pode ser possível usar o pandas_udf() com groupby() através da função agg().