summaryrefslogtreecommitdiff
path: root/poky/meta/classes/cve-check.bbclass
blob: 1c8b2223a20832db0a64d8be705c8dc8301591f5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# This class is used to check recipes against public CVEs.
#
# In order to use this class just inherit the class in the
# local.conf file and it will add the cve_check task for
# every recipe. The task can be used per recipe, per image,
# or using the special cases "world" and "universe". The
# cve_check task will print a warning for every unpatched
# CVE found and generate a file in the recipe WORKDIR/cve
# directory. If an image is build it will generate a report
# in DEPLOY_DIR_IMAGE for all the packages used.
#
# Example:
#   bitbake -c cve_check openssl
#   bitbake core-image-sato
#   bitbake -k -c cve_check universe
#
# DISCLAIMER
#
# This class/tool is meant to be used as support and not
# the only method to check against CVEs. Running this tool
# doesn't guarantee your packages are free of CVEs.

# The product name that the CVE database uses.  Defaults to BPN, but may need to
# be overriden per recipe (for example tiff.bb sets CVE_PRODUCT=libtiff).
CVE_PRODUCT ??= "${BPN}"
CVE_VERSION ??= "${PV}"

CVE_CHECK_DB_DIR ?= "${DL_DIR}/CVE_CHECK"
CVE_CHECK_DB_FILE ?= "${CVE_CHECK_DB_DIR}/nvdcve_1.0.db"

CVE_CHECK_LOG ?= "${T}/cve.log"
CVE_CHECK_TMP_FILE ?= "${TMPDIR}/cve_check"

CVE_CHECK_DIR ??= "${DEPLOY_DIR}/cve"
CVE_CHECK_MANIFEST ?= "${DEPLOY_DIR_IMAGE}/${IMAGE_NAME}${IMAGE_NAME_SUFFIX}.cve"
CVE_CHECK_COPY_FILES ??= "1"
CVE_CHECK_CREATE_MANIFEST ??= "1"

# Whitelist for packages (PN)
CVE_CHECK_PN_WHITELIST ?= ""

# Whitelist for CVE. If a CVE is found, then it is considered patched.
# The value is a string containing space separated CVE values:
# 
# CVE_CHECK_WHITELIST = 'CVE-2014-2524 CVE-2018-1234'
# 
CVE_CHECK_WHITELIST ?= ""

python do_cve_check () {
    """
    Check recipe for patched and unpatched CVEs
    """

    if os.path.exists(d.getVar("CVE_CHECK_DB_FILE")):
        patched_cves = get_patches_cves(d)
        patched, unpatched = check_cves(d, patched_cves)
        if patched or unpatched:
            cve_data = get_cve_info(d, patched + unpatched)
            cve_write_data(d, patched, unpatched, cve_data)
    else:
        bb.note("No CVE database found, skipping CVE check")

}

addtask cve_check after do_unpack before do_build
do_cve_check[depends] = "cve-update-db-native:do_populate_cve_db"
do_cve_check[nostamp] = "1"

python cve_check_cleanup () {
    """
    Delete the file used to gather all the CVE information.
    """

    bb.utils.remove(e.data.getVar("CVE_CHECK_TMP_FILE"))
}

addhandler cve_check_cleanup
cve_check_cleanup[eventmask] = "bb.cooker.CookerExit"

python cve_check_write_rootfs_manifest () {
    """
    Create CVE manifest when building an image
    """

    import shutil

    if d.getVar("CVE_CHECK_COPY_FILES") == "1":
        deploy_file = os.path.join(d.getVar("CVE_CHECK_DIR"), d.getVar("PN"))
        if os.path.exists(deploy_file):
            bb.utils.remove(deploy_file)

    if os.path.exists(d.getVar("CVE_CHECK_TMP_FILE")):
        bb.note("Writing rootfs CVE manifest")
        deploy_dir = d.getVar("DEPLOY_DIR_IMAGE")
        link_name = d.getVar("IMAGE_LINK_NAME")
        manifest_name = d.getVar("CVE_CHECK_MANIFEST")
        cve_tmp_file = d.getVar("CVE_CHECK_TMP_FILE")

        shutil.copyfile(cve_tmp_file, manifest_name)

        if manifest_name and os.path.exists(manifest_name):
            manifest_link = os.path.join(deploy_dir, "%s.cve" % link_name)
            # If we already have another manifest, update symlinks
            if os.path.exists(os.path.realpath(manifest_link)):
                os.remove(manifest_link)
            os.symlink(os.path.basename(manifest_name), manifest_link)
            bb.plain("Image CVE report stored in: %s" % manifest_name)
}

ROOTFS_POSTPROCESS_COMMAND_prepend = "${@'cve_check_write_rootfs_manifest; ' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"
do_rootfs[recrdeptask] += "${@'do_cve_check' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}"

def get_patches_cves(d):
    """
    Get patches that solve CVEs using the "CVE: " tag.
    """

    import re

    pn = d.getVar("PN")
    cve_match = re.compile("CVE:( CVE\-\d{4}\-\d+)+")

    # Matches last CVE-1234-211432 in the file name, also if written
    # with small letters. Not supporting multiple CVE id's in a single
    # file name.
    cve_file_name_match = re.compile(".*([Cc][Vv][Ee]\-\d{4}\-\d+)")

    patched_cves = set()
    bb.debug(2, "Looking for patches that solves CVEs for %s" % pn)
    for url in src_patches(d):
        patch_file = bb.fetch.decodeurl(url)[2]

        # Check patch file name for CVE ID
        fname_match = cve_file_name_match.search(patch_file)
        if fname_match:
            cve = fname_match.group(1).upper()
            patched_cves.add(cve)
            bb.debug(2, "Found CVE %s from patch file name %s" % (cve, patch_file))

        with open(patch_file, "r", encoding="utf-8") as f:
            try:
                patch_text = f.read()
            except UnicodeDecodeError:
                bb.debug(1, "Failed to read patch %s using UTF-8 encoding"
                        " trying with iso8859-1" %  patch_file)
                f.close()
                with open(patch_file, "r", encoding="iso8859-1") as f:
                    patch_text = f.read()

        # Search for one or more "CVE: " lines
        text_match = False
        for match in cve_match.finditer(patch_text):
            # Get only the CVEs without the "CVE: " tag
            cves = patch_text[match.start()+5:match.end()]
            for cve in cves.split():
                bb.debug(2, "Patch %s solves %s" % (patch_file, cve))
                patched_cves.add(cve)
                text_match = True

        if not fname_match and not text_match:
            bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file)

    return patched_cves

def check_cves(d, patched_cves):
    """
    Connect to the NVD database and find unpatched cves.
    """
    import ast, csv, tempfile, subprocess, io
    from distutils.version import LooseVersion

    cves_unpatched = []
    # CVE_PRODUCT can contain more than one product (eg. curl/libcurl)
    products = d.getVar("CVE_PRODUCT").split()
    # If this has been unset then we're not scanning for CVEs here (for example, image recipes)
    if not products:
        return ([], [])
    pv = d.getVar("CVE_VERSION").split("+git")[0]

    # If the recipe has been whitlisted we return empty lists
    if d.getVar("PN") in d.getVar("CVE_CHECK_PN_WHITELIST").split():
        bb.note("Recipe has been whitelisted, skipping check")
        return ([], [])

    old_cve_whitelist =  d.getVar("CVE_CHECK_CVE_WHITELIST")
    if old_cve_whitelist:
        bb.warn("CVE_CHECK_CVE_WHITELIST is deprecated, please use CVE_CHECK_WHITELIST.")
    cve_whitelist = d.getVar("CVE_CHECK_WHITELIST").split()

    import sqlite3
    db_file = d.getVar("CVE_CHECK_DB_FILE")
    conn = sqlite3.connect(db_file)

    for product in products:
        c = conn.cursor()
        if ":" in product:
            vendor, product = product.split(":", 1)
            c.execute("SELECT * FROM PRODUCTS WHERE PRODUCT IS ? AND VENDOR IS ?", (product, vendor))
        else:
            c.execute("SELECT * FROM PRODUCTS WHERE PRODUCT IS ?", (product,))

        for row in c:
            cve = row[0]
            version_start = row[3]
            operator_start = row[4]
            version_end = row[5]
            operator_end = row[6]

            if cve in cve_whitelist:
                bb.note("%s-%s has been whitelisted for %s" % (product, pv, cve))
                # TODO: this should be in the report as 'whitelisted'
                patched_cves.add(cve)
            elif cve in patched_cves:
                bb.note("%s has been patched" % (cve))
            else:
                to_append = False
                if (operator_start == '=' and pv == version_start):
                    to_append = True
                else:
                    if operator_start:
                        try:
                            to_append_start =  (operator_start == '>=' and LooseVersion(pv) >= LooseVersion(version_start))
                            to_append_start |= (operator_start == '>' and LooseVersion(pv) > LooseVersion(version_start))
                        except:
                            bb.warn("%s: Failed to compare %s %s %s for %s" %
                                    (product, pv, operator_start, version_start, cve))
                            to_append_start = False
                    else:
                        to_append_start = False

                    if operator_end:
                        try:
                            to_append_end  = (operator_end == '<=' and LooseVersion(pv) <= LooseVersion(version_end))
                            to_append_end |= (operator_end == '<' and LooseVersion(pv) < LooseVersion(version_end))
                        except:
                            bb.warn("%s: Failed to compare %s %s %s for %s" %
                                    (product, pv, operator_end, version_end, cve))
                            to_append_end = False
                    else:
                        to_append_end = False

                    if operator_start and operator_end:
                        to_append = to_append_start and to_append_end
                    else:
                        to_append = to_append_start or to_append_end

                if to_append:
                    bb.note("%s-%s is vulnerable to %s" % (product, pv, cve))
                    cves_unpatched.append(cve)
                else:
                    bb.note("%s-%s is not vulnerable to %s" % (product, pv, cve))
                    patched_cves.add(cve)
    conn.close()

    return (list(patched_cves), cves_unpatched)

def get_cve_info(d, cves):
    """
    Get CVE information from the database.

    Unfortunately the only way to get CVE info is set the output to
    html (hard to parse) or query directly the database.
    """

    try:
        import sqlite3
    except ImportError:
        from pysqlite2 import dbapi2 as sqlite3

    cve_data = {}
    db_file = d.getVar("CVE_CHECK_DB_FILE")
    placeholder = ",".join("?" * len(cves))
    query = "SELECT * FROM NVD WHERE id IN (%s)" % placeholder
    conn = sqlite3.connect(db_file)
    cur = conn.cursor()
    for row in cur.execute(query, tuple(cves)):
        cve_data[row[0]] = {}
        cve_data[row[0]]["summary"] = row[1]
        cve_data[row[0]]["scorev2"] = row[2]
        cve_data[row[0]]["scorev3"] = row[3]
        cve_data[row[0]]["modified"] = row[4]
        cve_data[row[0]]["vector"] = row[5]
    conn.close()

    return cve_data

def cve_write_data(d, patched, unpatched, cve_data):
    """
    Write CVE information in WORKDIR; and to CVE_CHECK_DIR, and
    CVE manifest if enabled.
    """

    cve_file = d.getVar("CVE_CHECK_LOG")
    nvd_link = "https://web.nvd.nist.gov/view/vuln/detail?vulnId="
    write_string = ""
    unpatched_cves = []
    bb.utils.mkdirhier(os.path.dirname(cve_file))

    for cve in sorted(cve_data):
        write_string += "PACKAGE NAME: %s\n" % d.getVar("PN")
        write_string += "PACKAGE VERSION: %s\n" % d.getVar("PV")
        write_string += "CVE: %s\n" % cve
        if cve in patched:
            write_string += "CVE STATUS: Patched\n"
        else:
            unpatched_cves.append(cve)
            write_string += "CVE STATUS: Unpatched\n"
        write_string += "CVE SUMMARY: %s\n" % cve_data[cve]["summary"]
        write_string += "CVSS v2 BASE SCORE: %s\n" % cve_data[cve]["scorev2"]
        write_string += "CVSS v3 BASE SCORE: %s\n" % cve_data[cve]["scorev3"]
        write_string += "VECTOR: %s\n" % cve_data[cve]["vector"]
        write_string += "MORE INFORMATION: %s%s\n\n" % (nvd_link, cve)

    if unpatched_cves:
        bb.warn("Found unpatched CVE (%s), for more information check %s" % (" ".join(unpatched_cves),cve_file))

    with open(cve_file, "w") as f:
        bb.note("Writing file %s with CVE information" % cve_file)
        f.write(write_string)

    if d.getVar("CVE_CHECK_COPY_FILES") == "1":
        cve_dir = d.getVar("CVE_CHECK_DIR")
        bb.utils.mkdirhier(cve_dir)
        deploy_file = os.path.join(cve_dir, d.getVar("PN"))
        with open(deploy_file, "w") as f:
            f.write(write_string)

    if d.getVar("CVE_CHECK_CREATE_MANIFEST") == "1":
        with open(d.getVar("CVE_CHECK_TMP_FILE"), "a") as f:
            f.write("%s" % write_string)