Como implementar streaming de dados em tempo real em Python

Como Implementar Streaming De Dados Em Tempo Real Em Python



Dominar a implementação de streaming de dados em tempo real em Python atua como uma habilidade essencial no mundo atual, envolvido em dados. Este guia explora as etapas principais e ferramentas essenciais para utilizar streaming de dados em tempo real com autenticidade em Python. Desde a seleção de uma estrutura adequada como Apache Kafka ou Apache Pulsar até escrever um código Python para consumo, processamento e visualização eficaz de dados sem esforço, adquiriremos as habilidades necessárias para construir canais de dados ágeis e eficientes em tempo real.

Exemplo 1: Implementação de streaming de dados em tempo real em Python

Implementar um streaming de dados em tempo real em Python é crucial na era e no mundo atual orientado a dados. Neste exemplo detalhado, percorreremos o processo de construção de um sistema de streaming de dados em tempo real usando Apache Kafka e Python no Google Colab.







Para inicializar o exemplo antes de começarmos a codificar, é essencial construir um ambiente específico no Google Colab. A primeira coisa que precisamos fazer é instalar as bibliotecas necessárias. Usamos a biblioteca “kafka-python” para integração do Kafka.



! pip instalar kafka-python


Este comando instala a biblioteca “kafka-python” que fornece as funções Python e as ligações para Apache Kafka. A seguir, importamos as bibliotecas necessárias para o nosso projeto. A importação das bibliotecas necessárias, incluindo “KafkaProducer” e “KafkaConsumer”, são as classes da biblioteca “kafka-python” que nos permitem interagir com os corretores Kafka. JSON é a biblioteca Python para trabalhar com os dados JSON que usamos para serializar e desserializar as mensagens.



de importação de kafka KafkaProducer, KafkaConsumer
importar JSON


Criação de um Produtor Kafka





Isto é importante porque um produtor Kafka envia os dados para um tópico Kafka. Em nosso exemplo, criamos um produtor para enviar dados simulados em tempo real para um tópico chamado “tópico em tempo real”.

Criamos uma instância “KafkaProducer” que especifica o endereço do corretor Kafka como “localhost:9092”. Em seguida, utilizamos o “value_serializer”, função que serializa os dados antes de enviá-los ao Kafka. No nosso caso, uma função lambda codifica os dados como JSON codificado em UTF-8. Agora, vamos simular alguns dados em tempo real e enviá-los para o tópico Kafka.



produtor = KafkaProdutor ( servidores_bootstrap = 'localhost:9092' ,
valor_serializador = lambda v: json.dumps ( em ) .codificar ( 'utf-8' ) )
# Dados simulados em tempo real
dados = { 'sensor_id' : 1 , 'temperatura' : 25,5 , 'umidade' : 60,2 }
# Enviando dados para o tópico
produtor.enviar ( 'tópico em tempo real' , dados )


Nestas linhas, definimos um dicionário de “dados” que representa os dados de um sensor simulado. Em seguida, usamos o método “enviar” para publicar esses dados no “tópico em tempo real”.

Então, queremos criar um consumidor Kafka, e um consumidor Kafka lê os dados de um tópico Kafka. Criamos um consumidor para consumir e processar as mensagens no “tópico em tempo real”. Criamos uma instância “KafkaConsumer”, especificando o tópico que queremos consumir, por exemplo, (tópico em tempo real) e o endereço do corretor Kafka. Então, o “value_deserializer” é uma função que desserializa os dados recebidos do Kafka. No nosso caso, uma função lambda decodifica os dados como JSON codificado em UTF-8.

consumidor = KafkaConsumidor ( 'tópico em tempo real' ,
servidores_bootstrap = 'localhost:9092' ,
valor_deserializador =lambda x: json.loads ( x.decodificar ( 'utf-8' ) ) )


Usamos um loop iterativo para consumir e processar continuamente as mensagens do tópico.

# Leitura e processamento de dados em tempo real
para mensagem em consumidor:
dados = mensagem.valor
imprimir ( f 'Dados recebidos: {dados}' )


Recuperamos o valor de cada mensagem e os dados simulados do sensor dentro do loop e os imprimimos no console. A execução do produtor e consumidor Kafka envolve a execução desse código no Google Colab e a execução das células do código individualmente. O produtor envia os dados simulados para o tópico Kafka e o consumidor lê e imprime os dados recebidos.


Análise da saída à medida que o código é executado

Observaremos dados em tempo real que estão sendo produzidos e consumidos. O formato dos dados pode variar dependendo da nossa simulação ou da fonte de dados real. Neste exemplo detalhado, cobrimos todo o processo de configuração de um sistema de streaming de dados em tempo real usando Apache Kafka e Python no Google Colab. Explicaremos cada linha de código e seu significado na construção deste sistema. O streaming de dados em tempo real é um recurso poderoso e este exemplo serve como base para aplicações mais complexas do mundo real.

Exemplo 2: Implementando um streaming de dados em tempo real em Python usando dados do mercado de ações

Vamos fazer outro exemplo único de implementação de streaming de dados em tempo real em Python usando um cenário diferente; desta vez, vamos nos concentrar nos dados do mercado de ações. Criamos um sistema de streaming de dados em tempo real que captura as alterações no preço das ações e as processa usando Apache Kafka e Python no Google Colab. Conforme demonstrado no exemplo anterior, começamos configurando nosso ambiente no Google Colab. Primeiro, instalamos as bibliotecas necessárias:

! pip instalar kafka-python yfinance


Aqui, adicionamos a biblioteca “yfinance” que nos permite obter dados do mercado de ações em tempo real. A seguir, importamos as bibliotecas necessárias. Continuamos a usar as classes “KafkaProducer” e “KafkaConsumer” da biblioteca “kafka-python” para interação Kafka. Importamos JSON para trabalhar com os dados JSON. Também usamos “yfinance” para obter dados do mercado de ações em tempo real. Também importamos a biblioteca “time” para adicionar um atraso para simular as atualizações em tempo real.

de importação de kafka KafkaProducer, KafkaConsumer
importar JSON
importar e financiar como sim
importar tempo


Agora, criamos um produtor Kafka para dados de estoque. Nosso produtor Kafka obtém dados de estoque em tempo real e os envia para um tópico Kafka chamado “preço das ações”.

produtor = KafkaProdutor ( servidores_bootstrap = 'localhost:9092' ,
valor_serializador = lambda v: json.dumps ( em ) .codificar ( 'utf-8' ) )

enquanto Verdadeiro:
estoque = yf.Ticker ( 'AAPL' ) # Exemplo: ações da Apple Inc.
stock_data = estoque.histórico ( período = '1d' )
último_preço = dados_estoque [ 'Fechar' ] .iloc [ - 1 ]
dados = { 'símbolo' : 'AAPL' , 'preço' : último preço }
produtor.enviar ( 'preço das ações' , dados )
hora de dormir ( 10 ) # Simule atualizações em tempo real a cada 10 segundos


Criamos uma instância “KafkaProducer” com o endereço do corretor Kafka neste código. Dentro do loop, usamos “yfinance” para obter o preço mais recente das ações da Apple Inc. (“AAPL”). Em seguida, extraímos o último preço de fechamento e enviamos para o tópico “preço da ação”. Eventualmente, introduzimos um atraso para simular as atualizações em tempo real a cada 10 segundos.

Vamos criar um consumidor Kafka para ler e processar os dados do preço das ações do tópico “preço das ações”.

consumidor = KafkaConsumidor ( 'preço das ações' ,
servidores_bootstrap = 'localhost:9092' ,
valor_deserializador =lambda x: json.loads ( x.decodificar ( 'utf-8' ) ) )

para mensagem em consumidor:
stock_data = mensagem.valor
imprimir ( f 'Dados de estoque recebidos: {stock_data['symbol']} - Preço: {stock_data['price']}' )


Este código é semelhante à configuração do consumidor do exemplo anterior. Ele lê e processa continuamente as mensagens do tópico “preço das ações” e imprime o símbolo da ação e o preço no console. Executamos as células de código sequencialmente, por exemplo, uma por uma no Google Colab para executar o produtor e o consumidor. O produtor recebe e envia atualizações do preço das ações em tempo real enquanto o consumidor lê e exibe esses dados.

! pip instalar kafka-python yfinance
de importação de kafka KafkaProducer, KafkaConsumer
importar JSON
importar e financiar como sim
importar tempo
produtor = KafkaProdutor ( servidores_bootstrap = 'localhost:9092' ,
valor_serializador = lambda v: json.dumps ( em ) .codificar ( 'utf-8' ) )

enquanto Verdadeiro:
estoque = yf.Ticker ( 'AAPL' ) # ações da Apple Inc.
stock_data = estoque.histórico ( período = '1d' )
último_preço = dados_estoque [ 'Fechar' ] .iloc [ - 1 ]

dados = { 'símbolo' : 'AAPL' , 'preço' : último preço }

produtor.enviar ( 'preço das ações' , dados )

hora de dormir ( 10 ) # Simule atualizações em tempo real a cada 10 segundos
consumidor = KafkaConsumidor ( 'preço das ações' ,
servidores_bootstrap = 'localhost:9092' ,
valor_deserializador =lambda x: json.loads ( x.decodificar ( 'utf-8' ) ) )

para mensagem em consumidor:
stock_data = mensagem.valor
imprimir ( f 'Dados de estoque recebidos: {stock_data['symbol']} - Preço: {stock_data['price']}' )


Na análise da saída após a execução do código, observaremos as atualizações em tempo real dos preços das ações da Apple Inc.

Conclusão

Neste exemplo único, demonstramos a implementação de streaming de dados em tempo real em Python usando Apache Kafka e a biblioteca “yfinance” para capturar e processar os dados do mercado de ações. Explicamos detalhadamente cada linha do código. O streaming de dados em tempo real pode ser aplicado a vários campos para construir aplicações do mundo real em finanças, IoT e muito mais.