Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cortexutils/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def build_taxonomy(self, level, namespace, predicate, value):
'namespace': namespace,
'predicate': predicate,
'value': value
}
}

def summary(self, raw):
"""Returns a summary, needed for 'short.html' template. Overwrite it for your needs!
Expand Down
107 changes: 107 additions & 0 deletions cortexutils/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Extractor:
def __init__(self, ignore=None):
    """Set up the extractor and build its regex collections.

    :param ignore: Stored unchanged on the instance as ``self.ignore``
    """
    self.ignore = ignore
    self.regex = self.__init_regex()
    self.ftregex = self.__init_ft_regex()
    # The full text matcher concatenates self.ftregex with self.asregex, so
    # the analyzer specific list has to exist on every instance.
    self.asregex = self.__init_analyzer_regex()

@staticmethod
def __init_regex():
Expand Down Expand Up @@ -117,6 +118,87 @@ def __init_regex():

return regex

@staticmethod
def __init_ft_regex():
    """Build the compiled patterns used to search free text for observables.

    Each entry pairs a compiled pattern with the observable types of its
    capture groups, listed in group order.

    :return: List of {types, regex} dicts
    :rtype: list
    """
    # --- Generic patterns ---

    # Dotted quad whose four octets are each limited to 0-255 and which is
    # not directly preceded or followed by another digit.
    ipv4_pattern = re.compile(
        r'(?:^|\D)((?:25[0-5]|2[0-4]\d|[1]\d\d|[1-9]\d|[0-9])\.(?:25[0-5]|2[0-4]\d|[1]\d\d|[1-9]\d|[0-9])\.(?:25[0-5]|2[0-4]\d|[1]\d\d|[1-9]\d|[0-9])\.(?:25[0-5]|2[0-4]\d|[1]\d\d|[1-9]\d|[0-9]))(?:\D|$)',
        re.MULTILINE
    )

    # http(s) URL; groups: whole URL, full host name, host name without its
    # first label, path.
    url_pattern = re.compile(
        r'((?:http|https):\/\/((?:(?:.*?)\.)?(.*?(?:\.\w+)+))\/?([a-zA-Z0-9\/\-\_\.\~\=\?]+\??)?)',
        re.MULTILINE
    )

    # Mail address; groups: whole address, part after the "@".
    mail_pattern = re.compile(
        r'((?:[a-zA-Z0-9\/\-\_\.\+]+)@{1}([a-zA-Z0-9\-\_]+\.[a-zA-Z0-9\-\_\.]+)+)',
        re.MULTILINE
    )

    # --- Mail specific patterns: none defined ---

    return [
        {'types': ['ip'], 'regex': ipv4_pattern},
        {'types': ['url', 'fqdn', 'domain', 'uri_path'], 'regex': url_pattern},
        {'types': ['mail', 'domain'], 'regex': mail_pattern},
    ]

@staticmethod
def __init_analyzer_regex():
    """Return the analyzer specific regexes; this implementation has none.

    :return: Empty list
    :rtype: list
    """
    return []

def __findftmatch(self, value):
    """Scan a text value with every full text regex and collect the hits.

    Every entry of ``self.ftregex``, plus ``self.asregex`` when the instance
    has one, is a dict with a compiled ``regex`` and a list of ``types``.
    For a pattern with several capture groups the n-th group is reported
    with the n-th type; for a single group the first type is used. Groups
    that captured nothing are skipped.

    :param value: The value to scan; non-string values are never scanned
    :type value: str or number
    :return: List of {type, value} dicts, or an empty string when nothing
             was found
    :rtype: list or str
    """
    # ``unicode`` is a Python 2 builtin and may also be aliased by an import
    # elsewhere in this module; fall back to ``str`` when it is not defined.
    try:
        text_types = (str, unicode)
    except NameError:
        text_types = (str,)
    if not isinstance(value, text_types):
        return ''

    # An instance without analyzer specific regexes must not break the scan.
    analyzer_regexes = getattr(self, 'asregex', None) or []
    regex_pack = list(self.ftregex) + list(analyzer_regexes)

    found_observables = []
    for entry in regex_pack:
        observable_types = entry.get('types')
        for hit in entry.get('regex').findall(value):
            # findall yields tuples for multi-group patterns and plain
            # strings otherwise; normalise both to a tuple of groups.
            groups = hit if isinstance(hit, tuple) else (hit,)
            # zip stops at the shorter sequence, so a pattern with more
            # groups than declared types cannot raise IndexError.
            for observable_type, observable_value in zip(observable_types, groups):
                # An optional group that took no part in the match comes
                # back as an empty string, which is not an observable.
                if not observable_value:
                    continue
                found_observables.append({
                    'type': observable_type,
                    'value': observable_value
                })

    # Callers only test the length, so the historical empty-string result
    # for "nothing found" is kept.
    return found_observables if found_observables else ''

def __checktype(self, value):
"""Checks if the given value is a known datatype

Expand Down Expand Up @@ -167,6 +249,10 @@ def check_iterable(self, iterable):
'type': dt,
'value': iterable
})
#Check full text for regex matches
matches = self.__findftmatch(iterable)
if len(matches) > 0:
Comment thread
nadouani marked this conversation as resolved.
results.extend(matches)
elif isinstance(iterable, list):
for item in iterable:
if isinstance(item, list) or isinstance(item, dict):
Expand All @@ -178,6 +264,10 @@ def check_iterable(self, iterable):
'type': dt,
'value': item
})
#Check full text for regex matches
matches = self.__findftmatch(item)
if len(matches) > 0:
Comment thread
nadouani marked this conversation as resolved.
results.extend(matches)
elif isinstance(iterable, dict):
for _, item in iterable.items():
if isinstance(item, list) or isinstance(item, dict):
Expand All @@ -189,7 +279,24 @@ def check_iterable(self, iterable):
'type': dt,
'value': item
})
#Check full text for regex matches
matches = self.__findftmatch(item)
if len(matches) > 0:
Comment thread
nadouani marked this conversation as resolved.
results.extend(matches)
else:
raise TypeError('Not supported type.')

#Deduplicate results for a cleaner result
results = self.deduplicate(results)
return results

def deduplicate(self, list_of_objects):
    """Drop repeated observables while keeping first-seen order.

    Two entries count as duplicates when both their ``type`` and their
    ``value`` compare equal; the first occurrence is the one kept.

    :param list_of_objects: Dicts that each carry a ``type`` and a ``value`` key
    :type list_of_objects: list
    :return: New list without duplicate entries
    :rtype: list
    """
    dedup_list = []
    for candidate in list_of_objects:
        # any() stops at the first equal entry instead of scanning the rest.
        already_kept = any(
            candidate['type'] == kept['type'] and candidate['value'] == kept['value']
            for kept in dedup_list
        )
        if not already_kept:
            dedup_list.append(candidate)
    return dedup_list