Script em Python para conectar com o Postgresql (do CloudFoundry) para leitura e escrita de dados no Insights Hub(opcional).
Este projeto foi desenvolvido para facilitar a integração entre a plataforma MindSphere (aka Insights Hub) e um banco de dados PostgreSQL, permitindo uma coleta de dados eficiente e automatizada. É ideal para usuários que precisam de uma solução robusta para gerenciamento de dados de IoT.
- Extração automática de dados da API do MindSphere.
- Armazenamento de dados em um banco de dados PostgreSQL.
- Agendamento de tarefas para automação de processos.
- Configuração flexível para diferentes ambientes de execução.
- Python
- PostgreSQL
- APScheduler
- CloudFoundry
- Python 3.6 ou superior;
- Acesso ao CloudFoundry;
- Acesso ao Insights Hub (opcional)
Clone o repositório para sua máquina local:
git clone https://github.com/Abyss-Whisper/Python-Postgresql-CloudFoundry.git
cd Python-Postgresql-CloudFoundry
O manifest.yml
é o Arquivo que será usado para orientar o CloudFoundry a inicializar a aplicação: como nome
, tamanho da aplicação
, instancias
.
Recomenda-se mudar apenas o nome
do app, pois esse é que será registrado como seu app
.
Para configurar todo o CloudFoundry, não é tão complexo, mas requer atenção:
Você primeiro precisa entender quais pacotes têm na sua Org, para isso, use o seguinte comando:
cf marketplace
De acordo com o offering
, plans
iremos decidir qual o offering (Postgresql) e o plano fica a seu critério.
Após decidir qual o plan
irá ser usado, vamos criar a instancia:
cf create-service <service_orffering> <service_plan> <service_name>
Após a criação do service
, vamos verificar se deu certo a criação:
cf services
Agora, vá na pasta da sua aplicação que você copiou do Repositório, e copie o caminho e use o cd
, para trocar o caminho da pasta. Agora, use o cf push
:
cf push
A sua aplicação constará erro, pois não linkamos o banco de dados
nem as credenciais. O motivo de darmos o push
, é para termos nosso app no CloudFoundry e depois linkarmos ele com o PostgreSQL.
Agora, nós iremos bindar
a aplicação com o PostgreSQL.
cf bind-service <app_name> <service_name>
Quando a bind
for feita, agora iremos adquirir os dados de credenciais do PostgreSQL, com o seguinte comando:
cf env <app_name>
Edite de acordo com as suas credenciais. Abra o arquivo main.py
para editar as opções nessaa seguinte parte do código:
def connect_to_db():
try:
conn = psycopg2.connect(
dbname="<database_name>",
user="<database_username>",
password="<database_password",
host="<database_host>"
)
print('conectado')
return conn
except Exception as e:
print(f"Erro ao conectar ao banco de dados: {e}")
return None
Abaixo, iremos mostrar algumas funções que podem ser usadas e EDITADAS para o seu caso:
#criando a tabela (caso não exista ainda)
def create_table(conn):
try:
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS <table_name> (
id SERIAL PRIMARY KEY,
variavel TEXT NOT NULL,
valor INTEGER NOT NULL,
timestamp TEXT NOT NULL);
""");
conn.commit()
print('tabela criada')
cur.close()
except Exception as e:
print(f"Erro ao criar tabela: {e}")
#editando a tabela
def edit_table(conn):
try:
cur = conn.cursor()
cur.execute("""
ALTER TABLE <table_name>
ADD timestamp VARCHAR(30)
""")
conn.commit()
cur.close()
print('tabela editada')
except Exception as e:
print(f"Erro ao criar tabela: {e}")
#zerar a tabela
def delete_table(conn):
try:
cur = conn.cursor()
cur.execute("""
DELETE FROM <table_name>;
""")
conn.commit()
cur.close()
print('tabela zerada')
except Exception as e:
print(f"Erro ao zerar tabela: {e}")
#inserir dados manual
def insert_data(conn, nome, valor):
try:
cur = conn.cursor()
cur.execute("INSERT INTO <table_name> (variavel, valor) VALUES (%s, %s)", (nome, valor))
conn.commit()
cur.close()
except Exception as e:
print(f"Erro ao inserir dados: {e}")
#inserção com timeseries Insights Hub
def insert_timeseries(conn, data):
try:
cur = conn.cursor()
for item in data:
timestamp = datetime.fromisoformat(item['_time'].rstrip('Z'))
for key, value in item.items():
if key != '_time':
cur.execute("INSERT INTO <table_name> (variavel, valor, timestamp) VALUES (%s, %s, %s)", (key, value, timestamp))
conn.commit()
cur.close()
except Exception as e:
print(f"Erro ao inserir dados: {e}")
conn.rollback()
def read_data(conn):
try:
cur = conn.cursor()
cur.execute("SELECT * FROM <table_name>")
rows = cur.fetchall()
for row in rows:
print(row)
cur.close()
except Exception as e:
print(f"Erro ao ler dados: {e}")
Caso você queira fazer essa função utilizando o Insights Hub também, pode continuar o tutorial, mas caso não, pule para o próximo passo ``
(TUTORIAL EM ANDAMENTO)
Como não há tutorial ainda, o arquivo config.py
serve apenas para as APIs do Insights Hub.
- No arquivo
Procfile
é onde está indicado qual arquivo será usado para inicializar, no nosso caso omain.py
; - A variavel
data2
, é responsável para armazenar os dados aAPI GET
do Insights Hub. Troque esse endpoint caso queira; - A função
insert_timeseries
, serve para inserir dados requisitados do Insights Hub para o Postgresql (dados já tratados); - A função
scheduled_task
, serve para as tarefas de agendamento, ou seja, para o nosso caso, ele é usado para executar a tarefa de:ler dados do Insights Hub > Conectar no PostgreSQL > Inserir dados na Tabela > Retornar esses dados da tabela
Caso queira rodar apenas na primeira para criar a tabela, rode a última main
, e habilite as funções: create_table(conn)
, insert_timeseries(conn, data2)
, read_data(conn)
, conn.close()
. E então, dê o
cf push
if __name__ == "__main__":
conn = connect_to_db()
create_table(conn)
#delete_table(conn)
insert_timeseries(conn, data2)
read_data(conn)
conn.close()
- Para verificar se está tudo certo, rode o seguinte comando:
cf logs <app_name> --recent
- Na função
scheduled_task
, a gente pode editar o momento em que ela será executada:
if __name__ == "__main__":
conn = connect_to_db()
if conn is not None:
scheduler = BackgroundScheduler()
scheduler.add_job(scheduled_task, 'interval', minutes=3) # Executa a cada 3 minutos
scheduler.start()
# Desliga o agendador quando o aplicativo encerrar
atexit.register(lambda: scheduler.shutdown())
# Mantém o script em execução
while True:
pass
#create_table(conn)
insert_timeseries(conn, data2)
read_data(conn)
conn.close()
- E use o
cf push
- Ao ver que o app, foi publicado, verifique usando o
cf apps
: