Training courses

Kernel and Embedded Linux

Bootlin training courses

Embedded Linux, kernel,
Yocto Project, Buildroot, real-time,
graphics, boot time, debugging...

Bootlin logo

Elixir Cross Referencer

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.  If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

import argparse
import os
import sys
from subprocess import Popen, PIPE

from isc.utils import prefix, version

# Program name; used to build the description shown by the argument parser.
prog = "dnssec-checkds"


############################################################################
# SECRR class:
# Class for DS resource record
############################################################################
class SECRR:
    """A DS resource record parsed from one line of presentation-format text.

    The accepted layout is::

        <owner> [<ttl>] <class> DS <keyid> <keyalg> <hashalg> <digest ...>

    where ``<class>`` is one of IN, CH or HS and the digest may be split
    over several whitespace-separated chunks.  The input may be ``str`` or
    ASCII-encoded bytes.

    Two records compare equal when their ``repr()`` strings are equal; the
    TTL is not part of that string and therefore does not affect equality.

    Raises:
        ValueError: if the text is empty, has too few fields, has a
            non-numeric TTL/key id/algorithm, or is not a DS record.
    """

    # Digest type number -> human readable name, used when reporting.
    hashalgs = {1: "SHA-1", 2: "SHA-256", 3: "GOST", 4: "SHA-384"}
    rrname = ""
    rrclass = "IN"
    keyid = None
    keyalg = None
    hashalg = None
    digest = ""
    ttl = 0

    def __init__(self, rrtext):
        if not rrtext:
            raise ValueError("empty resource record text")

        # Subprocess output arrives as bytes, which must be decoded first.
        if not isinstance(rrtext, str):
            rrtext = rrtext.decode("ascii")
        fields = rrtext.split()
        if len(fields) < 7:
            raise ValueError("malformed DS record (too few fields): %r" % rrtext)

        self.rrtype = "DS"
        self.rrname = fields[0].lower()

        fields = fields[1:]
        if fields[0].upper() in ("IN", "CH", "HS"):
            # No TTL present: the class directly follows the owner name.
            self.rrclass = fields[0].upper()
            fields = fields[1:]
        else:
            self.ttl = int(fields[0])
            self.rrclass = fields[1].upper()
            fields = fields[2:]

        if fields[0].upper() != self.rrtype:
            raise ValueError(
                "%s does not match %s" % (fields[0].upper(), self.rrtype)
            )

        # The up-front length check assumes no TTL; when a TTL is present a
        # 7-field line has no digest left, so verify again after the shift.
        if len(fields) < 5:
            raise ValueError("malformed DS record (missing digest): %r" % rrtext)

        self.keyid, self.keyalg, self.hashalg = map(int, fields[1:4])
        # The digest may be split into several chunks; join and normalise.
        self.digest = "".join(fields[4:]).upper()

    def __repr__(self):
        return "%s %s %s %d %d %d %s" % (
            self.rrname,
            self.rrclass,
            self.rrtype,
            self.keyid,
            self.keyalg,
            self.hashalg,
            self.digest,
        )

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of comparing against an unrelated object's repr().
        if not isinstance(other, SECRR):
            return NotImplemented
        return repr(self) == repr(other)


############################################################################
# check:
# Fetch DS RRset for the given zone from the prepared DSset file if
# specified, or from the DNS if not; fetch DNSKEY RRset from the
# masterfile if specified, or from DNS if not.
# Generate a set of expected DS records from the DNSKEY RRset,
# and report on congruency.
############################################################################
def _parse_records(text):
    """Turn multi-line record text (str or bytes) into a list of SECRR."""
    records = []
    for line in text.splitlines():
        if not isinstance(line, str):
            line = line.decode("ascii")
        # Skip blank lines so stray whitespace in the input does not abort
        # the whole run.
        if line.strip():
            records.append(SECRR(line))
    return records


def _report_fields(rr):
    """Return the (type, name, keyalg, keyid, digest name) tuple for messages."""
    # Fall back to the numeric digest type when it has no known name,
    # rather than failing with KeyError while printing the report.
    digest_name = SECRR.hashalgs.get(rr.hashalg, str(rr.hashalg))
    return (rr.rrtype, rr.rrname.strip("."), rr.keyalg, rr.keyid, digest_name)


def check(zone, args):
    """Compare the published DS records for *zone* with the expected ones.

    The published DS records are read from ``args.dssetfile`` when it is
    set, otherwise from the output of ``args.dig``.  The expected DS records
    are produced by running ``args.dsfromkey`` (once, with one ``-a`` option
    per entry of ``args.algo``) on ``args.masterfile`` when it is set,
    otherwise on the DNSKEY answer obtained with ``args.dig``.

    A line is printed for every mismatch and every match.

    Returns:
        True when every published DS has an expected counterpart and vice
        versa; False on any mismatch or when no expected records were
        produced.
    """
    if args.dssetfile:
        # Context manager so the file is closed once it has been read.
        with open(args.dssetfile) as dsset:
            ds_text = dsset.read()
    else:
        cmd = [args.dig, "+noall", "+answer", "-t", "ds", "-q", zone]
        ds_text, _ = Popen(cmd, stdout=PIPE).communicate()

    rrlist = sorted(
        _parse_records(ds_text),
        key=lambda rr: (rr.keyid, rr.keyalg, rr.hashalg),
    )

    cmd = [args.dsfromkey]
    for algo in args.algo:
        cmd += ["-a", algo]

    if args.masterfile:
        cmd += ["-f", args.masterfile, zone]
        key_text, _ = Popen(cmd, stdout=PIPE).communicate()
    else:
        # No master file: fetch the DNSKEY answer and feed it to
        # dsfromkey on its standard input ("-f -").
        intods, _ = Popen(
            [args.dig, "+noall", "+answer", "-t", "dnskey", "-q", zone],
            stdout=PIPE,
        ).communicate()
        cmd += ["-f", "-", zone]
        key_text, _ = Popen(cmd, stdin=PIPE, stdout=PIPE).communicate(intods)

    klist = _parse_records(key_text)

    if not klist:
        print("No DNSKEY records found in zone apex")
        return False

    match = True
    for rr in rrlist:
        if rr not in klist:
            print(
                "KSK for %s %s/%03d/%05d (%s) missing from child"
                % _report_fields(rr)
            )
            match = False
    # The "missing" and "found" passes stay separate so that all problems
    # are listed before the successful matches.
    for rr in klist:
        if rr not in rrlist:
            print(
                "%s for KSK %s/%03d/%05d (%s) missing from parent"
                % _report_fields(rr)
            )
            match = False
    for rr in klist:
        if rr in rrlist:
            print(
                "%s for KSK %s/%03d/%05d (%s) found in parent"
                % _report_fields(rr)
            )

    return match


############################################################################
# parse_args:
# Read command line arguments, return the parsed 'args' structure
############################################################################
def parse_args():
    """Define the command-line interface, parse it and return the result.

    The returned namespace has the attributes ``zone`` (with leading and
    trailing dots removed), ``algo``, ``dig``, ``dsfromkey``,
    ``masterfile`` and ``dssetfile``.
    """
    parser = argparse.ArgumentParser(description=prog + ": checks DS coverage")

    # dnssec-dsfromkey is looked up under "bin" on Windows ("nt") and
    # under "sbin" everywhere else; dig is always looked up under "bin".
    if os.name == "nt":
        sbin_name = "bin"
    else:
        sbin_name = "sbin"
    dig_path = os.path.join(prefix("bin"), "dig")
    dsfromkey_path = os.path.join(prefix(sbin_name), "dnssec-dsfromkey")

    # (names, keyword settings) for each argument, in registration order.
    option_table = [
        (("zone",), {"type": str, "help": "zone to check"}),
        (
            ("-a", "--algo"),
            {
                "dest": "algo",
                "action": "append",
                "default": [],
                "type": str,
                "help": "DS digest algorithm",
            },
        ),
        (
            ("-d", "--dig"),
            {
                "dest": "dig",
                "default": dig_path,
                "type": str,
                "help": "path to 'dig'",
            },
        ),
        (
            ("-D", "--dsfromkey"),
            {
                "dest": "dsfromkey",
                "default": dsfromkey_path,
                "type": str,
                "help": "path to 'dnssec-dsfromkey'",
            },
        ),
        (
            ("-f", "--file"),
            {"dest": "masterfile", "type": str, "help": "zone master file"},
        ),
        (
            ("-s", "--dsset"),
            {"dest": "dssetfile", "type": str, "help": "prepared DSset file"},
        ),
        (("-v", "--version"), {"action": "version", "version": version}),
    ]
    for names, settings in option_table:
        parser.add_argument(*names, **settings)

    parsed = parser.parse_args()
    parsed.zone = parsed.zone.strip(".")
    return parsed


############################################################################
# Main
############################################################################
def main():
    """Parse the command line, run the DS check and exit.

    The process exit status is 0 when check() returns a true value and 1
    otherwise.
    """
    args = parse_args()
    match = check(args.zone, args)
    # Use sys.exit() rather than the bare exit() helper: exit() is only
    # installed by the site module and is missing when it is not loaded.
    sys.exit(0 if match else 1)