Sunday, September 21, 2025

AI: Sample Dataset – National Vulnerability Database (NVD) CVE

Any AI system needs seed data, such as critical business data. With the help of ChatGPT, I created a Python app that downloads records from the National Vulnerability Database (NVD) CVE feed. The data is a structured dump of security vulnerability records retrieved from the REST endpoint:

https://services.nvd.nist.gov/rest/json/cves/2.0
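For a quick look at the feed, a single page can be fetched with the standard library alone. This is a minimal sketch using the same query parameters the full script below relies on (pubStartDate, pubEndDate, startIndex, resultsPerPage); the window dates here are only an example, and the optional apiKey header raises the rate limit:

import json
import urllib.parse
import urllib.request

# Fetch one page of CVEs published in January 2024 (example window).
params = urllib.parse.urlencode({
    "pubStartDate": "2024-01-01T00:00:00.000",
    "pubEndDate": "2024-01-31T23:59:59.999",
    "startIndex": 0,
    "resultsPerPage": 50,
})
req = urllib.request.Request(
    f"https://services.nvd.nist.gov/rest/json/cves/2.0?{params}",
    headers={"Accept": "application/json"},  # add "apiKey": "<your key>" for a higher rate limit
)
with urllib.request.urlopen(req, timeout=60) as r:
    page = json.loads(r.read().decode("utf-8"))

vulns = page.get("vulnerabilities", [])
print(page.get("totalResults", 0), "CVEs in this window;", len(vulns), "returned on this page")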

The file size for all records in 2024 is approximately 100 MB. Be aware that, per Microsoft's pricing documentation for Azure AI Search, the Free tier is limited to 50 MB of storage, so a full year of records will not fit within it.

Each record retrieved from the National Vulnerability Database's CVE feed is in this format:

{
  "cve": {
    "id": "CVE-2024-12345",
    "published": "2024-03-05T17:00Z",
    "lastModified": "2024-05-01T12:34Z",
    "descriptions": [
      {"lang": "en", "value": "Buffer overflow in XYZ software before 1.2.3 allows remote attackers to execute code."}
    ],
    "metrics": {
      "cvssMetricV31": [
        {
          "cvssData": {
            "baseScore": 9.8,
            "baseSeverity": "CRITICAL"
          }
        }
      ]
    },
    "weaknesses": [
      {"description": [{"lang": "en", "value": "CWE-120 Buffer Copy without Checking Size of Input"}]}
    ],
    "references": [
      {"url": "https://vendor.com/security/advisory/123"}
    ]
  }
}
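
Since the full 2024 dump is roughly twice the Azure AI Search Free tier limit mentioned above, one practical option is to keep only the higher-severity records. This is a minimal sketch, assuming a JSONL file with one raw record per line in the format shown above (the file names are placeholders):

import json

KEEP = {"CRITICAL", "HIGH"}

with open("cves-2024-raw.jsonl", encoding="utf-8") as src, \
     open("cves-2024-high.jsonl", "w", encoding="utf-8") as dst:
    for line in src:
        record = json.loads(line)
        # Severity sits under cve.metrics.cvssMetricV31[0].cvssData.baseSeverity (see the sample above).
        metrics = record.get("cve", {}).get("metrics", {})
        cvss = (metrics.get("cvssMetricV31") or [{}])[0].get("cvssData", {})
        if cvss.get("baseSeverity") in KEEP:
            dst.write(line)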

The NVD endpoint rate-limits clients that send too many requests and restricts how much data a single request can return. To work around this:

  • Records are retrieved in publication-date windows of at most 120 days (the API's maximum date range), paging through each window with startIndex and resultsPerPage.
  • A checkpoint file, checkpoint.json, records the current window and page so a run can resume from the point of failure (an example appears below).
  • Delays (time.sleep) are inserted between page retrievals and between retries, with exponential backoff on HTTP 429/403/503 errors.

On any failure, simply wait a few minutes and re-run the script; it resumes from the last checkpoint instead of starting over.
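
For reference, the checkpoint file is small and human-readable. After a page has been written it looks roughly like this (the values shown here are illustrative):

{
  "year": 2024,
  "window": {
    "start": "2024-01-01T00:00:00.000",
    "end": "2024-04-29T23:59:59.000"
  },
  "next_index": 4000,
  "out": "cves-2024.jsonl",
  "updated": "2025-09-21T10:15:00Z"
}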

The Python code is as follows:

#!/usr/bin/env python3
import argparse
import datetime as dt
import json
import os
import random
import sys
import time
import urllib.request
import urllib.parse
import urllib.error
from typing import Optional, List, Dict, Tuple

API_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"
UA = "Mozilla/5.0 (cve-extractor/1.0)"
MAX_WINDOW_DAYS = 120
DEFAULT_PAGE_SIZE = 1000
MAX_RETRIES = 6
BACKOFF_BASE = 1.6
JITTER_MAX = 0.5
DEFAULT_CHECKPOINT = "checkpoint.json"

def _get_default_year() -> int:
    """Gets most recent full years (all days in year) versus say the current year which is partial."""
    LAST_MONTH_OF_YEAR = 12
    LAST_DAY_OF_DECEMBER = 31
    today = dt.date.today()
    year = today.year
    if today.month < LAST_MONTH_OF_YEAR or today.day < LAST_DAY_OF_DECEMBER:
        year -= 1
    return year

def _windows_for_year(year: int) -> List[Tuple[str, str]]:
    """Yield (start_iso, end_iso) windows of ≤120 days across the year."""
    start = dt.datetime(year, 1, 1, 0, 0, 0, 0)
    year_end = dt.datetime(year, 12, 31, 23, 59, 59, 999000)
    step = dt.timedelta(days=MAX_WINDOW_DAYS)
    cur = start
    out = []
    while cur <= year_end:
        end = min(cur + step - dt.timedelta(seconds=1), year_end)
        s = cur.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
        e = end.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
        out.append((s, e))
        cur = end + dt.timedelta(seconds=1)
    return out

def _request_json(url: str, api_key: Optional[str], retries: int = MAX_RETRIES) -> dict:
    """HTTP GET with retries/backoff; honors Retry-After for 429/403/503."""
    headers = {"User-Agent": UA, "Accept": "application/json"}
    if api_key:
        headers["apiKey"] = api_key
    last_err = None
    for attempt in range(retries):
        req = urllib.request.Request(url, headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            last_err = e
            if e.code in (429, 403, 503):
                retry_after = 0.0
                try:
                    ra = e.headers.get("Retry-After")
                    if ra:
                        retry_after = float(ra)
                except Exception:
                    retry_after = 0.0
                backoff = max(retry_after, (BACKOFF_BASE ** attempt) + random.uniform(0, JITTER_MAX))
                sys.stderr.write(f"HTTP {e.code} -> backoff {backoff:.2f}s (attempt {attempt+1}/{retries})\n")
                time.sleep(backoff)
                continue
            raise
        except urllib.error.URLError as e:
            last_err = e
            backoff = (BACKOFF_BASE ** attempt) + random.uniform(0, JITTER_MAX)
            sys.stderr.write(f"Network error '{e.reason}' -> retry in {backoff:.2f}s (attempt {attempt+1}/{retries})\n")
            time.sleep(backoff)
            continue
    raise last_err

def _flatten_vuln(v: dict) -> Dict[str, object]:
    """Flatten one NVD v2 vulnerability object to a compact record for RAG."""
    cve = v.get("cve", {})
    cve_id = cve.get("id")
    # description (english)
    desc = ""
    for d in cve.get("descriptions", []):
        if d.get("lang") == "en":
            desc = d.get("value", "")
            break
    # metrics: prefer v3.1 → v3.0 → v2
    severity = None
    score = None
    metrics = cve.get("metrics", {}) if isinstance(cve.get("metrics", {}), dict) else {}
    for key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
        arr = metrics.get(key) or []
        if arr:
            m = arr[0]
            if key.startswith("cvssMetricV3"):
                cd = m.get("cvssData", {})
                severity, score = cd.get("baseSeverity"), cd.get("baseScore")
            else:
                severity = m.get("baseSeverity")
                score = m.get("cvssData", {}).get("baseScore", m.get("baseScore"))
            break
    # CWEs
    cwes = []
    for w in cve.get("weaknesses", []):
        for d in w.get("description", []):
            if d.get("lang") == "en" and d.get("value"):
                cwes.append(d["value"])
    # references
    refs = [r.get("url") for r in cve.get("references", []) if r.get("url")]
    return {
        "id": cve_id,
        "title": (desc.split("\n", 1)[0].strip() if desc else cve_id),
        "description": desc,
        "severity": severity,
        "score": score,
        "cwes": cwes,
        "references": refs,
        "published": cve.get("published"),
        "last_modified": cve.get("lastModified"),
        "source": "nvd",
    }

def _write_jsonl_records(path: str, records: List[dict], flatten: bool):
    """Append JSONL records to file (create if not exists)."""
    mode = "a" if os.path.exists(path) else "w"
    with open(path, mode, encoding="utf-8") as f:
        for v in records:
            rec = _flatten_vuln(v) if flatten else v
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")

# -----------------------
# checkpointing
# -----------------------
def _load_checkpoint(path: str) -> Optional[dict]:
    if not os.path.exists(path):
        return None
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def _save_checkpoint(path: str, year: int, window: Tuple[str, str], next_index: int, out_path: str):
    tmp = path + ".tmp"
    data = {
        "year": year,
        "window": {"start": window[0], "end": window[1]},
        "next_index": next_index,
        "out": out_path,
        "updated": dt.datetime.utcnow().isoformat(timespec="seconds") + "Z",
    }
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp, path)

def _clear_checkpoint(path: str):
    try:
        os.remove(path)
    except FileNotFoundError:
        pass

# -----------------------
# main fetch (API + pagination + checkpoint)
# -----------------------
def fetch_year_to_jsonl(year: int,
                        out_path: str,
                        api_key: Optional[str],
                        page_size: int,
                        page_delay: float,
                        window_delay: float,
                        flatten: bool,
                        checkpoint_path: Optional[str],
                        resume: bool):
    # Prepare windows
    windows = _windows_for_year(year)

    # Checkpoint load
    start_window_idx = 0
    resume_index = 0
    if resume and checkpoint_path:
        cp = _load_checkpoint(checkpoint_path)
        if cp and cp.get("year") == year and cp.get("out") == out_path:
            w = cp.get("window") or {}
            if "start" in w and "end" in w and "next_index" in cp:
                try:
                    start_window_idx = windows.index((w["start"], w["end"]))
                    resume_index = int(cp["next_index"])
                    sys.stderr.write(f"Resuming from window {start_window_idx+1}/{len(windows)} "
                                     f"@ startIndex={resume_index}\n")
                except ValueError:
                    sys.stderr.write("Checkpoint window not found in computed windows; starting fresh.\n")

    # For a clean restart (new out file) consider removing existing file;
    # here we append to allow true resume.
    for w_idx, (start_iso, end_iso) in enumerate(windows[start_window_idx:], start=start_window_idx):
        sys.stderr.write(f"Window {w_idx+1}/{len(windows)}: {start_iso} → {end_iso}\n")
        start_index = resume_index if w_idx == start_window_idx else 0

        while True:
            qs = urllib.parse.urlencode({
                "pubStartDate": start_iso,
                "pubEndDate":   end_iso,
                "startIndex":   start_index,
                "resultsPerPage": page_size,
            })
            url = f"{API_BASE}?{qs}"
            data = _request_json(url, api_key)
            batch = data.get("vulnerabilities", []) or []
            total = int(data.get("totalResults", 0))

            if batch:
                _write_jsonl_records(out_path, batch, flatten=flatten)
                start_index += len(batch)
                # Save checkpoint after each page
                if checkpoint_path:
                    _save_checkpoint(checkpoint_path, year, (start_iso, end_iso), start_index, out_path)
                sys.stderr.write(
                    f"  window {w_idx+1}/{len(windows)} "
                    f"page {(start_index - 1)//page_size + 1}: +{len(batch):,} / {total:,} in this window\n"
                )
                # Page pacing
                if page_delay > 0:
                    time.sleep(page_delay)
            else:
                break

            if start_index >= total:
                break

        # reset resume index for next window
        resume_index = 0
        # Small delay between windows
        if window_delay > 0:
            time.sleep(window_delay)

    if checkpoint_path:
        _clear_checkpoint(checkpoint_path)

def main():
    default_year = _get_default_year()
    ap = argparse.ArgumentParser(description='Download and export NVD CVEs for a year to JSONL (with resume).')
    ap.add_argument('--year', type=int, default=default_year, help='CVE year (e.g., 2024)')
    ap.add_argument('--out', type=str, default=None, help='Output JSONL path (default: cves-<year>.jsonl)')
    ap.add_argument('--api-key', type=str, default=None, help='Optional NVD API key (header: apiKey)')
    ap.add_argument('--raw', action='store_true', help='Write raw NVD objects instead of flattened records')
    ap.add_argument('--page-size', type=int, default=DEFAULT_PAGE_SIZE, help='API resultsPerPage (default: 1000)')
    ap.add_argument('--page-delay', type=float, default=0.8, help='Seconds to sleep between pages')
    ap.add_argument('--window-delay', type=float, default=1.5, help='Seconds to sleep between 120-day windows')
    ap.add_argument('--checkpoint', type=str, default=DEFAULT_CHECKPOINT, help='Checkpoint file path')
    ap.add_argument('--no-resume', 
                    action='store_false', 
                    dest='resume',
                    help='Do not resume from checkpoint (default: resume)')
    ap.add_argument('--rate', type=int, default=0, help='Target records/hour (overrides --page-delay if set)')
    ap.add_argument('--limit', 
                    type=int, 
                    default=0, 
                    help='(Deprecated here) Not applied because we stream-write pages')

    args = ap.parse_args()
    out_path = args.out or f"cves-{args.year}.jsonl"

    # If a rate is specified, compute per-page sleep to approximate that rate.
    page_delay = args.page_delay
    if args.rate and args.rate > 0:
        # seconds per page = 3600 * page_size / rate
        page_delay = (3600.0 * args.page_size) / float(args.rate)
        sys.stderr.write(f"Rate target {args.rate} recs/hour → page_delay ≈ {page_delay:.2f}s\n")

    try:
        fetch_year_to_jsonl(
            year=args.year,
            out_path=out_path,
            api_key=args.api_key,
            page_size=args.page_size,
            page_delay=page_delay,
            window_delay=args.window_delay,
            flatten=(not args.raw),
            checkpoint_path=args.checkpoint,
            resume=args.resume,
        )
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)

    print(f"Done. Output: {out_path}")

if __name__ == '__main__':
    main()
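
Assuming the script is saved as nvd_cve_extract.py (the file name is arbitrary), a typical run looks like the following; supplying an NVD API key noticeably raises the allowed request rate:

# Download all CVEs published in 2024 into cves-2024.jsonl (flattened records).
python nvd_cve_extract.py --year 2024 --api-key YOUR_NVD_API_KEY

# After an interruption, simply re-run the same command; it resumes from checkpoint.json.
python nvd_cve_extract.py --year 2024 --api-key YOUR_NVD_API_KEY

# Keep the raw NVD objects instead of the flattened records.
python nvd_cve_extract.py --year 2024 --raw --out cves-2024-raw.jsonl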

