4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -34,7 +34,9 @@
#### Experts

#### Outputs
- `intelmq.bots.outputs.misp.output_feed`: Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski).
- `intelmq.bots.outputs.misp.output_feed`:
  - Handle failures if the saved current event file is missing or incorrect (PR by Kamil Mankowski).
  - Allow saving messages in bulk instead of refreshing the feed immediately (PR#2505 by Kamil Mankowski).
- `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar).

### Documentation
7 changes: 7 additions & 0 deletions docs/user/bots.md
@@ -4595,6 +4595,13 @@ The PyMISP library >= 2.4.119.1 is required, see
(optional, string) The output bot creates one event per interval; all data in this time frame is part of this event. Default: "1 hour".

**`bulk_save_count`**

(optional, int) If set to a non-zero value, the bot does not refresh the MISP feed immediately, but caches
incoming messages until the given number of them has accumulated. Use this if your bot processes a high number of messages
and constant saving to disk is a problem. Reloading or restarting the bot, as well as generating
a new MISP event based on `interval_event`, triggers saving regardless of the cache size.
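
A minimal runtime configuration sketch showing the parameter in context (the bot id, organisation values, and paths are placeholders to adapt to your setup):

```yaml
mispfeed-output:
  group: Output
  module: intelmq.bots.outputs.misp.output_feed
  parameters:
    misp_org_name: Example Org
    misp_org_uuid: "00000000-0000-0000-0000-000000000000"
    output_dir: /opt/intelmq/var/lib/bots/mispfeed-output
    interval_event: "1 hour"
    bulk_save_count: 500  # refresh the feed only after 500 cached messages
```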

**Usage in MISP**

Configure the destination directory of this feed as feed in MISP, either as local location, or served via a web server.
108 changes: 66 additions & 42 deletions intelmq/bots/outputs/misp/output_feed.py
@@ -5,12 +5,13 @@
# -*- coding: utf-8 -*-
import datetime
import json
import re
from pathlib import Path
from uuid import uuid4
import re

from intelmq.lib.bot import OutputBot
from intelmq.lib.exceptions import MissingDependencyError
from intelmq.lib.mixins import CacheMixin
from intelmq.lib.utils import parse_relative

try:
@@ -19,19 +20,14 @@
except ImportError:
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
MISPEvent = None
import_fail_reason = 'import'
except SyntaxError:
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
MISPEvent = None
import_fail_reason = 'syntax'

import_fail_reason = "import"

# NOTE: This module is compatible with Python 3.6+


class MISPFeedOutputBot(OutputBot):
class MISPFeedOutputBot(OutputBot, CacheMixin):
"""Generate an output in the MISP Feed format"""

interval_event: str = "1 hour"
bulk_save_count: int = None
misp_org_name = None
misp_org_uuid = None
output_dir: str = "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path
@@ -45,13 +41,8 @@ def check_output_dir(dirname):
return True

def init(self):
if MISPEvent is None and import_fail_reason == 'syntax':
raise MissingDependencyError("pymisp",
version='>=2.4.117.3',
additional_text="Python versions below 3.6 are "
"only supported by pymisp <= 2.4.119.1.")
elif MISPEvent is None:
raise MissingDependencyError('pymisp', version='>=2.4.117.3')
if MISPEvent is None:
raise MissingDependencyError("pymisp", version=">=2.4.117.3")

self.current_event = None

@@ -71,59 +62,92 @@ def init(self):
try:
with (self.output_dir / '.current').open() as f:
self.current_file = Path(f.read())
self.current_event = MISPEvent()
self.current_event.load_file(self.current_file)

last_min_time, last_max_time = re.findall('IntelMQ event (.*) - (.*)', self.current_event.info)[0]
last_min_time = datetime.datetime.strptime(last_min_time, '%Y-%m-%dT%H:%M:%S.%f')
last_max_time = datetime.datetime.strptime(last_max_time, '%Y-%m-%dT%H:%M:%S.%f')
if last_max_time < datetime.datetime.now():
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta
self.current_event = None
else:
self.min_time_current = last_min_time
self.max_time_current = last_max_time

if self.current_file.exists():
self.current_event = MISPEvent()
self.current_event.load_file(self.current_file)

last_min_time, last_max_time = re.findall(
"IntelMQ event (.*) - (.*)", self.current_event.info
)[0]
last_min_time = datetime.datetime.strptime(
last_min_time, "%Y-%m-%dT%H:%M:%S.%f"
)
last_max_time = datetime.datetime.strptime(
last_max_time, "%Y-%m-%dT%H:%M:%S.%f"
)
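# Reuse the saved event only while its interval window is still open; otherwise start a new window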
if last_max_time < datetime.datetime.now():
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta
self.current_event = None
else:
self.min_time_current = last_min_time
self.max_time_current = last_max_time
except:
self.logger.exception("Loading current event %s failed. Skipping it.", self.current_event)
self.logger.exception(
"Loading current event %s failed. Skipping it.", self.current_event
)
self.current_event = None
else:
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta

def process(self):

if not self.current_event or datetime.datetime.now() > self.max_time_current:
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta
self.current_event = MISPEvent()
self.current_event.info = ('IntelMQ event {begin} - {end}'
''.format(begin=self.min_time_current.isoformat(),
end=self.max_time_current.isoformat()))
self.current_event.info = "IntelMQ event {begin} - {end}".format(
begin=self.min_time_current.isoformat(),
end=self.max_time_current.isoformat(),
)
self.current_event.set_date(datetime.date.today())
self.current_event.Orgc = self.misp_org
self.current_event.uuid = str(uuid4())
self.current_file = self.output_dir / f'{self.current_event.uuid}.json'
with (self.output_dir / '.current').open('w') as f:
self.current_file = self.output_dir / f"{self.current_event.uuid}.json"
with (self.output_dir / ".current").open("w") as f:
f.write(str(self.current_file))

# On startup or when timeout occurs, clean the queue to ensure we do not
# keep events forever because there was not enough generated
self._generate_feed()

event = self.receive_message().to_dict(jsondict_as_string=True)

obj = self.current_event.add_object(name='intelmq_event')
for object_relation, value in event.items():
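# With bulk saving enabled, buffer the event in the Redis cache queue and regenerate the feed
# only once bulk_save_count messages have accumulated; otherwise write the event out immediately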
cache_size = None
if self.bulk_save_count:
cache_size = self.cache_put(event)

if cache_size is None:
self._generate_feed(event)
elif cache_size >= self.bulk_save_count:
self._generate_feed()

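# The message is acknowledged in both cases; buffered events stay in the Redis queue until the
# next feed generation (bulk threshold reached or a new interval event started)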
self.acknowledge_message()

def _add_message_to_feed(self, message: dict):
obj = self.current_event.add_object(name="intelmq_event")
for object_relation, value in message.items():
try:
obj.add_attribute(object_relation, value=value)
except NewAttributeError:
# This entry isn't listed in the harmonization file, ignoring.
pass

feed_output = self.current_event.to_feed(with_meta=False)
def _generate_feed(self, message: dict = None):
if message:
self._add_message_to_feed(message)

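# Drain any messages buffered in the Redis queue into the current MISP event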
message = self.cache_pop()
while message:
self._add_message_to_feed(message)
message = self.cache_pop()

with self.current_file.open('w') as f:
feed_output = self.current_event.to_feed(with_meta=False)
with self.current_file.open("w") as f:
json.dump(feed_output, f)

feed_meta_generator(self.output_dir)
self.acknowledge_message()

@staticmethod
def check(parameters):
4 changes: 4 additions & 0 deletions intelmq/lib/bot.py
@@ -279,6 +279,10 @@ def catch_shutdown():
def harmonization(self):
return self._harmonization

@property
def bot_id(self):
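# Expose the bot id so that mixins such as CacheMixin can use it, e.g. as the cache queue name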
return self.__bot_id_full

def __handle_sigterm_signal(self, signum: int, stack: Optional[object]):
"""
Calls when a SIGTERM is received. Stops the bot.
29 changes: 27 additions & 2 deletions intelmq/lib/mixins/cache.py
@@ -1,17 +1,29 @@
""" CacheMixin for IntelMQ
"""CacheMixin for IntelMQ

SPDX-FileCopyrightText: 2021 Sebastian Waldbauer
SPDX-License-Identifier: AGPL-3.0-or-later

CacheMixin is used for caching/storing data in redis.
"""

import json
from typing import Any, Optional
import redis
import intelmq.lib.utils as utils


class CacheMixin:
"""Provides caching possibilities for bots

For key-value cache, use methods:
cache_exists
cache_get
cache_set

To store dict elements in a cache queue named after bot id, use methods:
cache_put
cache_pop
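
A minimal usage sketch from inside a bot that uses this mixin (values are illustrative):
    size = self.cache_put({"source.ip": "192.0.2.1"})
    message = self.cache_pop()  # a dict, or None when the queue is empty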
"""
__redis: redis.Redis = None
redis_cache_host: str = "127.0.0.1"
redis_cache_port: int = 6379
@@ -31,7 +43,9 @@ def __init__(self, **kwargs):
"socket_timeout": 5,
}

self.__redis = redis.Redis(db=self.redis_cache_db, password=self.redis_cache_password, **kwargs)
self.__redis = redis.Redis(
db=self.redis_cache_db, password=self.redis_cache_password, **kwargs
)
super().__init__()

def cache_exists(self, key: str):
@@ -51,6 +65,17 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None):
if self.redis_cache_ttl:
self.__redis.expire(key, self.redis_cache_ttl)

def cache_put(self, value: dict) -> int:
# Returns the length of the list after pushing
size = self.__redis.lpush(self.bot_id, json.dumps(value))
return size

def cache_pop(self) -> Optional[dict]:
data = self.__redis.rpop(self.bot_id)
if data is None:
return None
return json.loads(data)

def cache_flush(self):
"""
Flushes the currently opened database by calling FLUSHDB.