1
+ # Standard Python Libraries
1
2
import json
2
3
import logging
3
- import time
4
- import random
4
+ from typing import Optional
5
5
import urllib .request
6
6
7
- from rich .progress import track
8
-
7
+ # Third-Party Libraries
9
8
from cyhy_db .models import KEVDoc
9
+ from jsonschema import SchemaError , ValidationError , validate
10
+ from rich .progress import track
10
11
11
12
# TODO rename this file to something better
12
13
14
+ ALLOWED_URL_SCHEMES = ["http" , "https" ]
15
+
13
16
logger = logging .getLogger (__name__ )
14
17
15
- # def sync(url: str = DEFAULT_KEV_URL) -> None:
16
- # """Synchronize the KEV data from the given URL."""
17
18
18
- # for _ in track(
19
- # range(100),
20
- # description="KEV Syncing",
21
- # ):
22
- # time.sleep(random.uniform(0.01, 1))
19
+ async def fetch_kev_data (
20
+ kev_json_url : str , kev_schema_url : Optional [str ] = None
21
+ ) -> dict :
22
+ """Fetch the KEV data from the given URL."""
23
23
24
+ # Create a Request object so we can test the safety of the URL
25
+ key_json_request = urllib .request .Request (kev_json_url )
26
+ if key_json_request .type not in ALLOWED_URL_SCHEMES :
27
+ raise ValueError ("Invalid URL scheme in json URL: %s" % key_json_request .type )
24
28
25
- async def fetch_kev_data ( url : str ) -> dict :
26
- """Fetch the KEV data from the given URL."""
29
+ # Below we disable the bandit blacklist (B310) for urllib.request.urlopen()
30
+ # because the URL scheme has already been checked against ALLOWED_URL_SCHEMES.
27
31
28
- # We disable the bandit blacklist for the urllib.request.urlopen() function
29
- # because the URL is either the default (safe) URL or one provided in the
30
- # Lambda configuration so we can assume it is safe.
31
- with urllib .request .urlopen (url ) as response : # nosec B310
32
+ with urllib .request .urlopen (kev_json_url ) as response : # nosec B310
32
33
if response .status != 200 :
33
34
raise Exception ("Failed to retrieve KEV JSON." )
34
35
35
36
kev_json = json .loads (response .read ().decode ("utf-8" ))
36
37
37
- # TODO: Check the data against the schema
38
- # https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities_schema.json
39
-
40
- # Sanity check the JSON data
41
- if "vulnerabilities" not in kev_json :
42
- raise ValueError (
43
- "JSON does not look like valid KEV data. Missing vulnerabilities."
44
- )
38
+ # If a schema URL was provided, we will validate the JSON data against it
39
+ if kev_schema_url :
40
+ # Create a Request object so we can test the safety of the URL
41
+ key_schema_request = urllib .request .Request (kev_schema_url )
42
+ if key_schema_request .type not in ALLOWED_URL_SCHEMES :
43
+ raise ValueError (
44
+ "Invalid URL scheme in schema URL: %s" % key_json_request .type
45
+ )
46
+ with urllib .request .urlopen (kev_schema_url ) as response : # nosec B310
47
+ if response .status != 200 :
48
+ raise Exception ("Failed to retrieve KEV JSON schema." )
49
+ kev_schema = json .loads (response .read ().decode ("utf-8" ))
50
+ try :
51
+ validate (instance = kev_json , schema = kev_schema )
52
+ logger .info ("KEV JSON is valid against the schema." )
53
+ except ValidationError as e :
54
+ logger .error ("JSON validation error: %s" , e .message )
55
+ except SchemaError as e :
56
+ logger .error ("Schema error: %s" , e .message )
45
57
46
58
reported_vuln_count = kev_json .get ("count" )
47
- if reported_vuln_count is None :
48
- raise ValueError ("JSON does not look like valid KEV data. Missing count." )
49
-
50
59
actual_vuln_count = len (kev_json ["vulnerabilities" ])
51
60
if reported_vuln_count != actual_vuln_count :
52
61
logger .warning (
@@ -67,8 +76,8 @@ async def create_kev_doc(kev_json: dict) -> str:
67
76
"""Add the provided KEV to the database and return its id."""
68
77
cve_id = kev_json .get ("cveID" )
69
78
if not cve_id :
70
- raise ValueError ("JSON does not look like valid KEV data ." )
71
- known_ransomware = kev_json . get ( "knownRansomwareCampaignUse" ) .lower () == "known"
79
+ raise ValueError ("cveID not found in KEV JSON ." )
80
+ known_ransomware = kev_json [ "knownRansomwareCampaignUse" ] .lower () == "known"
72
81
kev_doc = KEVDoc (id = cve_id , known_ransomware = known_ransomware )
73
82
await kev_doc .save ()
74
83
logger .debug ("Created KEV document with id: %s" , cve_id )
@@ -83,7 +92,10 @@ async def remove_outdated_kevs() -> None:
83
92
84
93
async def process_kev_json (kev_json : dict ) -> None :
85
94
"""Process the KEV JSON data."""
86
- for kev in kev_json ["vulnerabilities" ]:
95
+ for kev in track (
96
+ kev_json ["vulnerabilities" ],
97
+ description = "Creating KEV docs" ,
98
+ ):
87
99
try :
88
100
await create_kev_doc (kev )
89
101
except Exception as e :
0 commit comments