"""
Helper to search, retrieve and parse CVE's and CWE's.
Example:
CVE.find_all_cve('vim', 'vim')
CVE.find_cve('CVE-2019-20079')
CWE.find_all_cwe()
"""
import csv
import io
import time
import typing as tp
import zipfile
from datetime import datetime
import requests
import requests_cache # type: ignore
from packaging.version import LegacyVersion, Version
from packaging.version import parse as version_parse
from tabulate import tabulate
from varats.utils.settings import vara_cfg
[docs]class CVE:
"""
CVE representation with the major fields.
Mainly a data object to store everything. Uses the API at
https://cve.circl.lu/api/search/ to find entries.
"""
def __init__(
self, cve_id: str, score: float, published: datetime,
vector: tp.FrozenSet[str], references: tp.FrozenSet[str], summary: str,
vulnerable_versions: tp.FrozenSet[tp.Union[LegacyVersion, Version]]
) -> None:
self.__cve_id = cve_id
self.__score = score
self.__published = published
self.__vector = vector
self.__references = references
self.__summary = summary
self.__vulnerable_versions = vulnerable_versions
@property
def cve_id(self) -> str:
"""The CVE ID."""
return self.__cve_id
@property
def score(self) -> float:
"""The score of this CVE."""
return self.__score
@property
def published(self) -> datetime:
"""The date when this CVE was published."""
return self.__published
@property
def vector(self) -> tp.FrozenSet[str]:
"""The CVE vector."""
return self.__vector
@property
def references(self) -> tp.FrozenSet[str]:
"""A set of external references/urls."""
return self.__references
@property
def summary(self) -> str:
"""The summary of the CVE."""
return self.__summary
@property
def vulnerable_versions(
self
) -> tp.FrozenSet[tp.Union[LegacyVersion, Version]]:
"""The set of vulnerable version numbers."""
return self.__vulnerable_versions
@property
def url(self) -> str:
"""The URL to the Mitre entry."""
return f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={self.cve_id}"
def __str__(self) -> str:
return tabulate([
["ID", self.cve_id],
["Score", self.score],
["Published", self.published],
["Vector", '/'.join(self.vector)],
["URL", self.url],
["Summary", self.summary[:128]],
],
tablefmt="grid")
def __repr__(self) -> str:
return self.__str__()
def __eq__(self, other: object) -> bool:
if not isinstance(other, CVE):
return NotImplemented
return self.cve_id == other.cve_id
def __hash__(self) -> int:
return hash(self.cve_id)
[docs]class CWE:
"""
CWE representation with the major fields.
Mainly a data object to store everything.
"""
def __init__(self, cwe_id: str, name: str, description: str) -> None:
self.__cwe_id = cwe_id
self.__name = name
self.__description = description
@property
def cwe_id(self) -> str:
"""The CWE ID."""
return self.__cwe_id
@property
def name(self) -> str:
"""The name of this CWE."""
return self.__name
@property
def description(self) -> str:
"""The CWE description."""
return self.__description
@property
def url(self) -> str:
"""The URL to the Mitre entry."""
id_num = self.cwe_id.split('-')[1]
return f'https://cwe.mitre.org/data/definitions/{id_num}.html'
def __str__(self) -> str:
return tabulate([
["ID", self.cwe_id],
["Name", self.name[:128]],
["URL", self.url],
["Description", self.description[:128]],
],
tablefmt="grid")
def __repr__(self) -> str:
return self.__str__()
def __eq__(self, other: object) -> bool:
if not isinstance(other, CWE):
return NotImplemented
return self.cwe_id == other.cwe_id
def __hash__(self) -> int:
return hash(self.cwe_id)
def __fetch_url(source_url: str) -> requests.Response:
response = requests.get(source_url)
# Sometimes the rate limit is hit so keep repeating
while response.status_code == 429:
time.sleep(3)
response = requests.get(source_url)
if response.status_code != 200:
raise ValueError(
f'Could not retrieve CVE information '
f'(Code: {response.status_code})!'
)
return response
def __fetch_cve_data(source_url: str) -> tp.Dict[str, tp.Any]:
return tp.cast(tp.Dict[str, tp.Any], __fetch_url(source_url).json())
def __parse_cve(cve_data: tp.Dict[str, tp.Any]) -> CVE:
vulnerable_configurations = cve_data.get('vulnerable_configuration', [])
if vulnerable_configurations and isinstance(
vulnerable_configurations[0], str
):
vulnerable_versions = frozenset([
version_parse(x.replace(':*', '').split(':')[-1])
for x in vulnerable_configurations
])
else:
vulnerable_versions = frozenset([
version_parse(x['title'].replace(':*', '').split(':')[-1])
for x in vulnerable_configurations
])
return CVE(
cve_id=cve_data.get('id', None),
score=cve_data.get('cvss', None),
published=datetime.strptime(
cve_data.get('Published', None), '%Y-%m-%dT%H:%M:%S'
),
vector=frozenset(cve_data.get('cvss-vector', '').split('/')),
references=cve_data.get('references', None),
summary=cve_data.get('summary', None),
vulnerable_versions=vulnerable_versions
)
[docs]def find_all_cve(vendor: str, product: str) -> tp.FrozenSet[CVE]:
"""
Find all CVE's for a given vendor and product combination.
Args:
vendor: vendor to search for
product: product to search for
Return:
a set of :class:`CVE` objects.
"""
if not vendor or not product:
raise ValueError('Missing a vendor or product to search CVE\'s for!')
response_data = __fetch_cve_data(
f'http://cve.circl.lu/api/search/{vendor}/{product}'
)
cve_list: tp.Set[CVE] = set()
for entry in response_data['results']:
try:
cve_list.add(__parse_cve(entry))
except KeyError as error_msg:
cve_id = entry.get('id')
print(f'Error parsing {cve_id}: {error_msg}!')
return frozenset(cve_list)
[docs]def find_cve(cve_id: str) -> CVE:
"""
Find a CVE by its ID (CVE-YYYY-XXXXX).
Args:
cve_id: CVE id to search for
Return:
a CVE object
"""
if not cve_id:
raise ValueError('Missing a CVE ID!')
cve_data = __fetch_cve_data(f'https://cve.circl.lu/api/cve/{cve_id}')
if not cve_data:
raise ValueError(
f'Could not find CVE information for {cve_id}, '
f'maybe it is a wrong number?'
)
return __parse_cve(cve_data)
def __find_all_cwe() -> tp.FrozenSet[CWE]:
source_urls: tp.FrozenSet[str] = frozenset([
'https://cwe.mitre.org/data/csv/699.csv.zip',
'https://cwe.mitre.org/data/csv/1194.csv.zip',
'https://cwe.mitre.org/data/csv/1000.csv.zip'
])
cwe_list: tp.Set[CWE] = set()
# Download each zip file, extract it and parse its entries
for source_url in source_urls:
response = __fetch_url(source_url)
zip_file = zipfile.ZipFile(io.BytesIO(response.content))
with zip_file.open(zip_file.namelist()[0], 'r') as csv_file:
reader = csv.DictReader(
io.TextIOWrapper(csv_file), delimiter=',', quotechar='"'
)
for entry in reader:
cwe_id = entry.get('CWE-ID')
cwe_list.add(
CWE(
cwe_id=f'CWE-{cwe_id}',
name=entry.get('Name', ''),
description=entry.get('Description', '')
)
)
return frozenset(cwe_list)
__CWE_LIST: tp.Optional[tp.FrozenSet[CWE]] = None
[docs]def find_all_cwe() -> tp.FrozenSet[CWE]:
"""
Create a set of all CWE's. The set with CWE numbers is downloaded from.
@https://cwe.mitre.org/data/downloads.html.
Return:
a set of CWE objects
"""
# pylint: disable=W0603
global __CWE_LIST
if not __CWE_LIST:
__CWE_LIST = __find_all_cwe()
return __CWE_LIST
[docs]def find_cwe(
cwe_id: str = '', cwe_name: str = '', cwe_description: str = ''
) -> CWE:
"""
Find a CWE by its attributes (ID (CWE-XXX), name, description).
Args:
cwe_id: the ID of the CWE to search for
cwe_name: the name of the CWE to search for
cwe_description: the description of the CWE to search for
Return:
a CWE if one is found, otherwise raise a ``ValueError``
"""
for cwe in find_all_cwe():
if ((cwe_id and cwe.cwe_id == cwe_id) or
(cwe_name and cwe.name == cwe_name) or
(cwe_description and cwe.description == cwe_description)):
return cwe
raise ValueError(
f'Could not find CWE ({cwe_id}, {cwe_name}, {cwe_description})!'
)
# Cache all requests to limit external requests for a week
requests_cache.install_cache(
f"{str(vara_cfg()['data_cache'])}/requests_cache", expire_after=604800
)