Skip to content

Commit

Permalink
Merge pull request #1 from RJ-SMTR/pipelines1.0
Browse files Browse the repository at this point in the history
acrescenta tratamento para relatorio no projeto antigo
  • Loading branch information
borisaraujo authored Mar 21, 2024
2 parents 4f3206f + 3a78556 commit 80a3ca8
Showing 1 changed file with 73 additions and 2 deletions.
75 changes: 73 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,73 @@ def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(crontab(hour=11, minute=0),
daily_report.s(), name='Daily Report')

def get_data_old_project(**kwargs):

query = '''
query (
$name: String!,
$project: String!,
$exclude: String!,
$min_start_time: timestamptz!,
$max_start_time: timestamptz!
){
flow (
where: {
name: {_like: $name, _nlike: $exclude},
project: {name: {_eq: $project}},
}
){
id,
name,
is_schedule_active,
archived,
created,
version,
schedule,
flow_runs (where: {
scheduled_start_time: {
_gte: $min_start_time,
_lt: $max_start_time,
}
}){
id,
flow_id,
state,
scheduled_start_time,
idempotency_key
}
}
}
'''

args = {
'name': f'%{kwargs["name"]}%',
'exclude': kwargs['exclude'],
'project': kwargs['project_name'],
'min_start_time': kwargs['start_time'],
'max_start_time': kwargs['end_time']
}

client = Client(
api_server=kwargs['api_server'],
api_key=kwargs['api_key'],
tenant_id=kwargs['tenant_id'],
)

result = client.graphql(query=query, variables=args)

flows = pd.json_normalize(result['data']['flow'])
flows = flows.rename(columns={'id': 'flow_id'})
flow_runs = pd.DataFrame()
for _, flow in flows.iterrows():
df_flow_run = pd.DataFrame(flow['flow_runs'])
df_flow_run['flow_name'] = flow['name']
df_flow_run['version'] = flow['version']
flow_runs = pd.concat([flow_runs, df_flow_run])
return flows, flow_runs

def get_data(**kwargs):

query = '''
query (
$tenant_id: uuid!,
Expand Down Expand Up @@ -310,10 +375,16 @@ def generate_daily_report(now: datetime, config: dict):
'exclude': '%(subflow)%',
'start_time': start_time.isoformat(),
'end_time': now.isoformat(),
'name': config['name'],
}


flows, flow_runs = get_data(**query_args)
if config['project_name'] == 'main':
flows, flow_runs = get_data_old_project(**query_args)
elif config['project_name'] == 'production':
flows, flow_runs = get_data(**query_args)
else:
return pd.DataFrame()

if flow_runs.empty:
return flow_runs

Expand Down

0 comments on commit 80a3ca8

Please sign in to comment.