55# -*- coding: utf-8 -*-
66import datetime
77import json
8+ import re
89from pathlib import Path
910from uuid import uuid4
10- import re
1111
1212from intelmq .lib .bot import OutputBot
1313from intelmq .lib .exceptions import MissingDependencyError
14+ from intelmq .lib .mixins import CacheMixin
1415from intelmq .lib .utils import parse_relative
1516
1617try :
1920except ImportError :
2021 # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
2122 MISPEvent = None
22- import_fail_reason = 'import'
23- except SyntaxError :
24- # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
25- MISPEvent = None
26- import_fail_reason = 'syntax'
27-
23+ import_fail_reason = "import"
2824
29- # NOTE: This module is compatible with Python 3.6+
3025
31-
32- class MISPFeedOutputBot (OutputBot ):
26+ class MISPFeedOutputBot (OutputBot , CacheMixin ):
3327 """Generate an output in the MISP Feed format"""
28+
3429 interval_event : str = "1 hour"
30+ delay_save_event_count : int = None
3531 misp_org_name = None
3632 misp_org_uuid = None
3733 output_dir : str = "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path
@@ -45,13 +41,8 @@ def check_output_dir(dirname):
4541 return True
4642
4743 def init (self ):
48- if MISPEvent is None and import_fail_reason == 'syntax' :
49- raise MissingDependencyError ("pymisp" ,
50- version = '>=2.4.117.3' ,
51- additional_text = "Python versions below 3.6 are "
52- "only supported by pymisp <= 2.4.119.1." )
53- elif MISPEvent is None :
54- raise MissingDependencyError ('pymisp' , version = '>=2.4.117.3' )
44+ if MISPEvent is None :
45+ raise MissingDependencyError ("pymisp" , version = ">=2.4.117.3" )
5546
5647 self .current_event = None
5748
@@ -71,59 +62,90 @@ def init(self):
7162 try :
7263 with (self .output_dir / '.current' ).open () as f :
7364 self .current_file = Path (f .read ())
74- self .current_event = MISPEvent ()
75- self .current_event .load_file (self .current_file )
76-
77- last_min_time , last_max_time = re .findall ('IntelMQ event (.*) - (.*)' , self .current_event .info )[0 ]
78- last_min_time = datetime .datetime .strptime (last_min_time , '%Y-%m-%dT%H:%M:%S.%f' )
79- last_max_time = datetime .datetime .strptime (last_max_time , '%Y-%m-%dT%H:%M:%S.%f' )
80- if last_max_time < datetime .datetime .now ():
81- self .min_time_current = datetime .datetime .now ()
82- self .max_time_current = self .min_time_current + self .timedelta
83- self .current_event = None
84- else :
85- self .min_time_current = last_min_time
86- self .max_time_current = last_max_time
65+
66+ if self .current_file .exists ():
67+ self .current_event = MISPEvent ()
68+ self .current_event .load_file (self .current_file )
69+
70+ last_min_time , last_max_time = re .findall (
71+ "IntelMQ event (.*) - (.*)" , self .current_event .info
72+ )[0 ]
73+ last_min_time = datetime .datetime .strptime (
74+ last_min_time , "%Y-%m-%dT%H:%M:%S.%f"
75+ )
76+ last_max_time = datetime .datetime .strptime (
77+ last_max_time , "%Y-%m-%dT%H:%M:%S.%f"
78+ )
79+ if last_max_time < datetime .datetime .now ():
80+ self .min_time_current = datetime .datetime .now ()
81+ self .max_time_current = self .min_time_current + self .timedelta
82+ self .current_event = None
83+ else :
84+ self .min_time_current = last_min_time
85+ self .max_time_current = last_max_time
8786 except :
88- self .logger .exception ("Loading current event %s failed. Skipping it." , self .current_event )
87+ self .logger .exception (
88+ "Loading current event %s failed. Skipping it." , self .current_event
89+ )
8990 self .current_event = None
9091 else :
9192 self .min_time_current = datetime .datetime .now ()
9293 self .max_time_current = self .min_time_current + self .timedelta
9394
9495 def process (self ):
95-
9696 if not self .current_event or datetime .datetime .now () > self .max_time_current :
9797 self .min_time_current = datetime .datetime .now ()
9898 self .max_time_current = self .min_time_current + self .timedelta
9999 self .current_event = MISPEvent ()
100- self .current_event .info = ('IntelMQ event {begin} - {end}'
101- '' .format (begin = self .min_time_current .isoformat (),
102- end = self .max_time_current .isoformat ()))
100+ self .current_event .info = "IntelMQ event {begin} - {end}" "" .format (
101+ begin = self .min_time_current .isoformat (),
102+ end = self .max_time_current .isoformat (),
103+ )
103104 self .current_event .set_date (datetime .date .today ())
104105 self .current_event .Orgc = self .misp_org
105106 self .current_event .uuid = str (uuid4 ())
106- self .current_file = self .output_dir / f' { self .current_event .uuid } .json'
107- with (self .output_dir / ' .current' ).open ('w' ) as f :
107+ self .current_file = self .output_dir / f" { self .current_event .uuid } .json"
108+ with (self .output_dir / " .current" ).open ("w" ) as f :
108109 f .write (str (self .current_file ))
109110
111+ # On startup or when timeout occurs, clean the queue to ensure we do not
112+ # keep events forever because there was not enough generated
113+ self ._generate_feed ()
114+
110115 event = self .receive_message ().to_dict (jsondict_as_string = True )
111116
112- obj = self .current_event .add_object (name = 'intelmq_event' )
113- for object_relation , value in event .items ():
117+ cache_size = None
118+ if self .delay_save_event_count :
119+ cache_size = self .cache_put (event )
120+
121+ if cache_size is None :
122+ self ._generate_feed (event )
123+ elif cache_size >= self .delay_save_event_count :
124+ self ._generate_feed ()
125+
126+ self .acknowledge_message ()
127+
128+ def _add_message_to_feed (self , message : dict ):
129+ obj = self .current_event .add_object (name = "intelmq_event" )
130+ for object_relation , value in message .items ():
114131 try :
115132 obj .add_attribute (object_relation , value = value )
116133 except NewAttributeError :
117134 # This entry isn't listed in the harmonization file, ignoring.
118135 pass
119136
120- feed_output = self .current_event .to_feed (with_meta = False )
137+ def _generate_feed (self , message : dict = None ):
138+ if message :
139+ self ._add_message_to_feed (message )
140+
141+ while message := self .cache_pop ():
142+ self ._add_message_to_feed (message )
121143
122- with self .current_file .open ('w' ) as f :
144+ feed_output = self .current_event .to_feed (with_meta = False )
145+ with self .current_file .open ("w" ) as f :
123146 json .dump (feed_output , f )
124147
125148 feed_meta_generator (self .output_dir )
126- self .acknowledge_message ()
127149
128150 @staticmethod
129151 def check (parameters ):
0 commit comments