Skip to content

Commit

Permalink
[backend/frontend] add configuration to have better chances to get RS…
Browse files Browse the repository at this point in the history
…S Feed contents (#8736)

Co-authored-by: Jeremy Cloarec <jeremy.cloarec@filigran.io>
  • Loading branch information
aHenryJard and JeremyCloarec authored Jan 24, 2025
1 parent 22acaa8 commit bcf47c5
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ const IngestionCsv = () => {
},
uri: {
label: 'URL',
width: '30%',
width: '25%',
isSortable: true,
},
ingestion_running: {
label: 'Status',
width: '20%',
width: '15%',
isSortable: false,
},
last_execution_date: {
label: 'Last run',
width: '15%',
isSortable: false,
},
current_state_hash: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ const IngestionRss = () => {
},
uri: {
label: 'URL',
width: '30%',
width: '25%',
isSortable: true,
},
ingestion_running: {
label: 'Status',
width: '20%',
width: '15%',
isSortable: false,
},
last_execution_date: {
label: 'Last run',
width: '15%',
isSortable: false,
},
current_state_date: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const ingestionCsvLineFragment = graphql`
uri
ingestion_running
current_state_hash
last_execution_date
}
`;

Expand All @@ -70,7 +71,7 @@ export const IngestionCsvLineComponent: FunctionComponent<IngestionCsvLineProps>
paginationOptions,
}) => {
const classes = useStyles();
const { t_i18n } = useFormatter();
const { t_i18n, fldt } = useFormatter();
const data = useFragment(ingestionCsvLineFragment, node);
const [stateHash, setStateHash] = useState(data.current_state_hash ? data.current_state_hash : '-');
return (
Expand Down Expand Up @@ -106,6 +107,11 @@ export const IngestionCsvLineComponent: FunctionComponent<IngestionCsvLineProps>
<div
className={classes.bodyItem}
style={{ width: dataColumns.current_state_hash.width }}
>
{fldt(data.last_execution_date) || '-'}
</div>
<div
className={classes.bodyItem}
>
{stateHash}
</div>
Expand Down Expand Up @@ -185,11 +191,21 @@ export const IngestionCsvLineDummy = ({ dataColumns }: { dataColumns: DataColumn
height="100%"
/>
</div>
<div
className={classes.bodyItem}
>
<Skeleton
animation="wave"
variant="rectangular"
width={100}
height="100%"
/>
</div>
</div>
}
/>
<ListItemSecondaryAction classes={{ root: classes.itemIconDisabled }}>
<MoreVert />
<MoreVert/>
</ListItemSecondaryAction>
</ListItem>
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,17 @@ class IngestionRssLineLineComponent extends Component {
status={!!node.ingestion_running}
/>
</div>
<div
className={classes.bodyItem}
style={{ width: dataColumns.last_execution_date.width }}
>
{nsdt(node.last_execution_date) || '-'}
</div>
<div
className={classes.bodyItem}
style={{ width: dataColumns.current_state_date.width }}
>
{nsdt(node.current_state_date)}
{nsdt(node.current_state_date) || '-'}
</div>
</div>
}
Expand Down Expand Up @@ -141,6 +147,7 @@ const IngestionRssLineFragment = createFragmentContainer(
uri
ingestion_running
current_state_date
last_execution_date
}
`,
},
Expand Down Expand Up @@ -200,6 +207,17 @@ class IngestionRssDummyComponent extends Component {
height="100%"
/>
</div>
<div
className={classes.bodyItem}
style={{ width: dataColumns.last_execution_date.width }}
>
<Skeleton
animation="wave"
variant="rectangular"
width={100}
height="100%"
/>
</div>
<div
className={classes.bodyItem}
style={{ width: dataColumns.current_state_date.width }}
Expand All @@ -215,7 +233,7 @@ class IngestionRssDummyComponent extends Component {
}
/>
<ListItemSecondaryAction classes={{ root: classes.itemIconDisabled }}>
<MoreVert />
<MoreVert/>
</ListItemSecondaryAction>
</ListItem>
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11447,6 +11447,7 @@ type IngestionRss implements InternalObject & BasicObject {
defaultMarkingDefinitions: [MarkingDefinition]
report_types: [String!]
current_state_date: DateTime
last_execution_date: DateTime
ingestion_running: Boolean
}

Expand Down Expand Up @@ -11608,6 +11609,7 @@ type IngestionCsv implements InternalObject & BasicObject {
ingestion_running: Boolean
current_state_hash: String
current_state_date: DateTime
last_execution_date: DateTime
markings: [String!]
}

Expand Down
9 changes: 8 additions & 1 deletion opencti-platform/opencti-graphql/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,14 @@
"ingestion_manager": {
"enabled": true,
"lock_key": "ingestion_manager_lock",
"interval": 30000
"interval": 30000,
"rss_feed": {
"min_interval_minutes": 5,
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0"
},
"csv_feed": {
"min_interval_minutes": 5
}
},
"retention_manager": {
"enabled": true,
Expand Down
4 changes: 4 additions & 0 deletions opencti-platform/opencti-graphql/src/generated/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10956,6 +10956,7 @@ export type IngestionCsv = BasicObject & InternalObject & {
entity_type: Scalars['String']['output'];
id: Scalars['ID']['output'];
ingestion_running?: Maybe<Scalars['Boolean']['output']>;
last_execution_date?: Maybe<Scalars['DateTime']['output']>;
markings?: Maybe<Array<Scalars['String']['output']>>;
name: Scalars['String']['output'];
parent_types: Array<Maybe<Scalars['String']['output']>>;
Expand Down Expand Up @@ -11010,6 +11011,7 @@ export type IngestionRss = BasicObject & InternalObject & {
entity_type: Scalars['String']['output'];
id: Scalars['ID']['output'];
ingestion_running?: Maybe<Scalars['Boolean']['output']>;
last_execution_date?: Maybe<Scalars['DateTime']['output']>;
name: Scalars['String']['output'];
parent_types: Array<Scalars['String']['output']>;
report_types?: Maybe<Array<Scalars['String']['output']>>;
Expand Down Expand Up @@ -35964,6 +35966,7 @@ export type IngestionCsvResolvers<ContextType = any, ParentType extends Resolver
entity_type?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
id?: Resolver<ResolversTypes['ID'], ParentType, ContextType>;
ingestion_running?: Resolver<Maybe<ResolversTypes['Boolean']>, ParentType, ContextType>;
last_execution_date?: Resolver<Maybe<ResolversTypes['DateTime']>, ParentType, ContextType>;
markings?: Resolver<Maybe<Array<ResolversTypes['String']>>, ParentType, ContextType>;
name?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
parent_types?: Resolver<Array<Maybe<ResolversTypes['String']>>, ParentType, ContextType>;
Expand Down Expand Up @@ -35996,6 +35999,7 @@ export type IngestionRssResolvers<ContextType = any, ParentType extends Resolver
entity_type?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
id?: Resolver<ResolversTypes['ID'], ParentType, ContextType>;
ingestion_running?: Resolver<Maybe<ResolversTypes['Boolean']>, ParentType, ContextType>;
last_execution_date?: Resolver<Maybe<ResolversTypes['DateTime']>, ParentType, ContextType>;
name?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
parent_types?: Resolver<Array<ResolversTypes['String']>, ParentType, ContextType>;
report_types?: Resolver<Maybe<Array<ResolversTypes['String']>>, ParentType, ContextType>;
Expand Down
52 changes: 41 additions & 11 deletions opencti-platform/opencti-graphql/src/manager/ingestionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import { v4 as uuidv4 } from 'uuid';
import { clearIntervalAsync, setIntervalAsync } from 'set-interval-async/fixed';
import type { SetIntervalAsyncTimer } from 'set-interval-async/fixed';
import type { Moment } from 'moment';
import { AxiosError } from 'axios';
import { lockResource } from '../database/redis';
import conf, { booleanConf, logApp } from '../config/conf';
import { TYPE_LOCK_ERROR, UnsupportedError } from '../config/errors';
import { executionContext, SYSTEM_USER } from '../utils/access';
import { type GetHttpClient, getHttpClient, OpenCTIHeaders } from '../utils/http-client';
import { isEmptyField, isNotEmptyField } from '../database/utils';
import { FROM_START_STR, now, sanitizeForMomentParsing, utcDate } from '../utils/format';
import { FROM_START_STR, now, sanitizeForMomentParsing, sinceNowInMinutes, utcDate } from '../utils/format';
import { generateStandardId } from '../schema/identifier';
import { ENTITY_TYPE_CONTAINER_REPORT } from '../schema/stixDomainObject';
import { pushToWorkerForConnector } from '../database/rabbitmq';
Expand Down Expand Up @@ -47,6 +48,9 @@ import type { CsvMapperParsed } from '../modules/internal/csvMapper/csvMapper-ty
// If the lock is free, every API as the right to take it.
const SCHEDULE_TIME = conf.get('ingestion_manager:interval') || 30000;
const INGESTION_MANAGER_KEY = conf.get('ingestion_manager:lock_key') || 'ingestion_manager_lock';
const RSS_FEED_MIN_INTERVAL_MINUTES = conf.get('ingestion_manager:rss_feed:min_interval_minutes') || 5;
const RSS_FEED_USER_AGENT = conf.get('ingestion_manager:rss_feed:user_agent') || 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0';
const CSV_FEED_MIN_INTERVAL_MINUTES = conf.get('ingestion_manager:csv_feed:min_interval_minutes') || 5;

let running = false;

Expand All @@ -61,6 +65,11 @@ const asArray = (data: unknown) => {
return [];
};

const shouldExecuteIngestion = (ingestion: BasicStoreEntityIngestionRss | BasicStoreEntityIngestionCsv, min_interval_minutes: number) => {
const { last_execution_date } = ingestion;
return !last_execution_date || sinceNowInMinutes(last_execution_date) >= min_interval_minutes;
};

interface UpdateInfo {
state?: any
buffering?: boolean
Expand Down Expand Up @@ -176,7 +185,7 @@ const rssItemV2Convert = (turndownService: TurndownService, channel: RssElement,
const rssHttpGetter = (): Getter => {
const httpClientOptions: GetHttpClient = {
responseType: 'text',
headers: { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0' }
headers: { 'User-Agent': RSS_FEED_USER_AGENT }
};
const httpClient = getHttpClient(httpClientOptions);
return async (uri: string) => {
Expand Down Expand Up @@ -242,11 +251,14 @@ const rssDataHandler = async (context: AuthContext, httpRssGet: Getter, turndown
await pushBundleToConnectorQueue(context, ingestion, bundle);
// Update the state
lastPubDate = R.last(items)?.pubDate;
await patchRssIngestion(context, SYSTEM_USER, ingestion.internal_id, { current_state_date: lastPubDate });
logApp.info('[OPENCTI-MODULE] lastPubDate:', { lastPubDate });
await patchRssIngestion(context, SYSTEM_USER, ingestion.internal_id, { current_state_date: lastPubDate, last_execution_date: now() });
// Patch the related connector
const state = { current_state_date: lastPubDate };
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { state });
} else {
logApp.info('[OPENCTI-MODULE] Rss ingestion execution done, but no new item to ingest.');
await patchRssIngestion(context, SYSTEM_USER, ingestion.internal_id, { last_execution_date: now() });
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id);
}
};
Expand All @@ -263,16 +275,26 @@ const rssExecutor = async (context: AuthContext, turndownService: TurndownServic
const ingestionPromises = [];
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
// If ingestion have remaining messages in the queue, dont fetch any new data
// If ingestion have remaining messages in the queue, or if last execution was done before RSS_FEED_MIN_INTERVAL_MINUTES minutes, dont fetch any new data
const { messages_number, messages_size } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0) {
if (messages_number === 0 && shouldExecuteIngestion(ingestion, RSS_FEED_MIN_INTERVAL_MINUTES)) {
const ingestionPromise = rssDataHandler(context, httpGet, turndownService, ingestion)
.catch((e) => {
logApp.warn('[OPENCTI-MODULE] INGESTION - RSS ingestion execution', { cause: e, name: ingestion.name });
if (e instanceof AxiosError) {
if (e?.response?.headers) {
if (e.response.headers['cf-mitigated']) {
logApp.warn(`[OPENCTI-MODULE] INGESTION Rss - Cloudflare challenge fail for ${ingestion.uri}`);
}
}
}
// In case of error we need also to take in account the min_interval_minutes with last_execution_date update.
patchRssIngestion(context, SYSTEM_USER, ingestion.internal_id, { last_execution_date: now() });
});
ingestionPromises.push(ingestionPromise);
} else {
// Update the state
logApp.info(`[OPENCTI-MODULE] INGESTION Rss, skipping ${ingestion.name} - queue already filled with messages (${messages_number}) or last run is more recent than ${RSS_FEED_MIN_INTERVAL_MINUTES} minutes.`);
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_size });
ingestionPromises.push(ingestionPromise);
}
Expand Down Expand Up @@ -492,8 +514,8 @@ export const processCsvLines = async (
objectsInBundleCount = objectCount;
await reportExpectation(context, ingestionUser, work.id);// csv file ends = 1 operation done.

logApp.info(`[OPENCTI-MODULE] INGESTION - Sent: ${bundleCount} bundles for ${objectsInBundleCount} objects.`);
const state = { current_state_hash: hashedIncomingData, added_after_start: utcDate(addedLast) };
logApp.info(`[OPENCTI-MODULE] INGESTION Csv - Sent: ${bundleCount} bundles for ${objectsInBundleCount} objects.`);
const state = { current_state_hash: hashedIncomingData, added_after_start: utcDate(addedLast), last_execution_date: now() };
await patchCsvIngestion(context, SYSTEM_USER, ingestion.internal_id, state);
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { state });
}
Expand All @@ -505,8 +527,14 @@ const csvDataHandler = async (context: AuthContext, ingestion: BasicStoreEntityI
const csvMapper = await findById(context, user, ingestion.csv_mapper_id);
const csvMapperParsed = parseCsvMapper(csvMapper);
csvMapperParsed.user_chosen_markings = ingestion.markings ?? [];
const { csvLines, addedLast } = await fetchCsvFromUrl(csvMapperParsed, ingestion);
await processCsvLines(context, ingestion, csvMapperParsed, csvLines, addedLast);
try {
const { csvLines, addedLast } = await fetchCsvFromUrl(csvMapperParsed, ingestion);
await processCsvLines(context, ingestion, csvMapperParsed, csvLines, addedLast);
} catch (e: any) {
logApp.error(`[OPENCTI-MODULE] INGESTION Csv - Error trying to fetch csv feed for: ${ingestion.name}`);
logApp.error(e, { ingestion });
throw e;
}
};

const csvExecutor = async (context: AuthContext) => {
Expand All @@ -520,16 +548,18 @@ const csvExecutor = async (context: AuthContext) => {
const ingestionPromises = [];
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
// If ingestion have remaining messages in the queue, dont fetch any new data
const { messages_number, messages_size } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0) {
if (messages_number === 0 && shouldExecuteIngestion(ingestion, CSV_FEED_MIN_INTERVAL_MINUTES)) {
const ingestionPromise = csvDataHandler(context, ingestion)
.catch((e) => {
logApp.warn('[OPENCTI-MODULE] INGESTION - CSV ingestion execution', { cause: e, name: ingestion.name });
// In case of error we need also to take in account the min_interval_minutes with last_execution_date update.
patchCsvIngestion(context, SYSTEM_USER, ingestion.internal_id, { last_execution_date: now() });
});
ingestionPromises.push(ingestionPromise);
} else {
// Update the state
logApp.info(`[OPENCTI-MODULE] INGESTION csv, skipping ${ingestion.name} - queue already filled with messages (${messages_number}) or last run is more recent than ${RSS_FEED_MIN_INTERVAL_MINUTES} minutes.`);
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_size });
ingestionPromises.push(ingestionPromise);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type IngestionCsv implements InternalObject & BasicObject {
ingestion_running: Boolean
current_state_hash: String
current_state_date: DateTime
last_execution_date: DateTime
markings: [String!]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const INGESTION_CSV_DEFINITION: ModuleDefinition<StoreEntityIngestionCsv, StixIn
{ name: 'current_state_hash', label: 'Current_state_hash', type: 'string', format: 'short', mandatoryType: 'external', editDefault: true, multiple: false, upsert: true, isFilterable: true },
{ name: 'markings', label: 'Markings', type: 'string', format: 'short', mandatoryType: 'external', editDefault: false, multiple: true, upsert: true, isFilterable: false },
{ name: 'authentication_type', label: 'Authentication type', type: 'string', format: 'short', mandatoryType: 'no', editDefault: false, multiple: false, upsert: true, isFilterable: true },
{ name: 'last_execution_date', label: 'Last execution date', type: 'date', mandatoryType: 'no', editDefault: false, multiple: false, upsert: false, isFilterable: true },
{ name: 'authentication_value', label: 'Authentication value', type: 'string', format: 'short', mandatoryType: 'no', editDefault: false, multiple: false, upsert: true, isFilterable: true },
],
relations: [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type IngestionRss implements InternalObject & BasicObject {
defaultMarkingDefinitions: [MarkingDefinition]
report_types: [String!]
current_state_date: DateTime
last_execution_date: DateTime
ingestion_running: Boolean
}
enum IngestionRssOrdering {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const INGESTION_RSS_DEFINITION: ModuleDefinition<StoreEntityIngestionRss, StixIn
{ name: 'created_by_ref', label: 'Created by', type: 'string', format: 'short', mandatoryType: 'external', editDefault: true, multiple: false, upsert: true, isFilterable: false },
{ name: 'object_marking_refs', label: 'Marking', type: 'string', format: 'short', mandatoryType: 'external', editDefault: true, multiple: true, upsert: true, isFilterable: false },
{ name: 'current_state_date', label: 'Current state date', type: 'date', mandatoryType: 'no', editDefault: true, multiple: false, upsert: true, isFilterable: true },
{ name: 'last_execution_date', label: 'Last execution date', type: 'date', mandatoryType: 'no', editDefault: false, multiple: false, upsert: false, isFilterable: true },
{ name: 'ingestion_running', label: 'Ingestion running', type: 'boolean', mandatoryType: 'external', editDefault: true, multiple: false, upsert: true, isFilterable: true },
],
relations: [],
Expand Down
Loading

0 comments on commit bcf47c5

Please sign in to comment.