# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import argparse
import os
import sys
from subprocess import Popen, PIPE
from isc.utils import prefix, version
# Program name; used as the prefix of the argparse description in parse_args().
prog = "dnssec-checkds"
############################################################################
# SECRR class:
# Class for DS resource record
############################################################################
class SECRR:
    """A DS resource record parsed from one line of presentation-format text.

    The accepted layout is ``name [ttl] class DS keyid keyalg hashalg digest``.
    If the token after the owner name is not IN, CH or HS it is taken to be
    the TTL and the token after it is taken to be the class.  The digest may
    be split over several whitespace-separated tokens; they are concatenated
    and upper-cased.
    """

    # Digest type number -> printable name, used when reporting records.
    hashalgs = {1: "SHA-1", 2: "SHA-256", 3: "GOST", 4: "SHA-384"}
    # Class-level defaults; __init__ overrides them per instance.
    rrname = ""
    rrclass = "IN"
    keyid = None
    keyalg = None
    hashalg = None
    digest = ""
    ttl = 0

    def __init__(self, rrtext):
        """Parse *rrtext* (str, or ASCII-encoded bytes) into record fields.

        Raises:
            Exception: if the text is empty, has too few fields, is not a
                DS record, or carries no digest.
            ValueError: if the TTL, key id, key algorithm or digest type is
                not an integer.
        """
        if not rrtext:
            raise Exception("empty DS record text")

        # Only non-str input (e.g. bytes read from a subprocess pipe) has to
        # be decoded; 'str' has no decode method in python3.
        if not isinstance(rrtext, str):
            rrtext = rrtext.decode("ascii")
        fields = rrtext.split()
        if len(fields) < 7:
            raise Exception("DS record has too few fields: %s" % rrtext)

        self.rrtype = "DS"
        self.rrname = fields[0].lower()
        fields = fields[1:]

        if fields[0].upper() in ["IN", "CH", "HS"]:
            self.rrclass = fields[0].upper()
            fields = fields[1:]
        else:
            self.ttl = int(fields[0])
            self.rrclass = fields[1].upper()
            fields = fields[2:]

        if fields[0].upper() != self.rrtype:
            raise Exception("%s does not match %s" % (fields[0].upper(), self.rrtype))

        # The length check above allows a TTL-bearing record with no digest
        # at all (name ttl class DS keyid keyalg hashalg); reject it here
        # instead of silently storing an empty digest.
        if len(fields) < 5:
            raise Exception("DS record has no digest: %s" % rrtext)

        self.keyid, self.keyalg, self.hashalg = map(int, fields[1:4])
        self.digest = "".join(fields[4:]).upper()

    def __repr__(self):
        """Return the record as text, without the TTL."""
        return "%s %s %s %d %d %d %s" % (
            self.rrname,
            self.rrclass,
            self.rrtype,
            self.keyid,
            self.keyalg,
            self.hashalg,
            self.digest,
        )

    def __eq__(self, other):
        """Two records are equal when their text forms match (TTL ignored)."""
        if not isinstance(other, SECRR):
            return NotImplemented
        return repr(self) == repr(other)

    def __hash__(self):
        """Hash on the same text form that __eq__ compares."""
        return hash(repr(self))
############################################################################
# check:
# Fetch DS RRset for the given zone from the DNS; fetch DNSKEY
# RRset from the masterfile if specified, or from DNS if not.
# Generate a set of expected DS records from the DNSKEY RRset,
# and report on congruency.
############################################################################
def check(zone, args):
    """Compare the DS RRset published for *zone* with the zone's DNSKEYs.

    The published DS records are read from ``args.dssetfile`` when it is
    set, otherwise from the output of ``args.dig``.  The expected DS records
    are produced by running ``args.dsfromkey`` (once, with a ``-a`` option
    for each entry of ``args.algo``) on ``args.masterfile`` when it is set,
    otherwise on the DNSKEY records returned by ``args.dig``.

    One line is printed per record that is missing on either side or found
    in both.

    Returns:
        True when every published DS record has a matching expected record
        and vice versa; False otherwise, including when no expected records
        could be generated.
    """
    if args.dssetfile:
        # Close the file deterministically instead of leaking the handle.
        with open(args.dssetfile) as dsset:
            published = dsset.read()
    else:
        cmd = [args.dig, "+noall", "+answer", "-t", "ds", "-q", zone]
        published, _ = Popen(cmd, stdout=PIPE).communicate()
    rrlist = sorted(
        _parse_ds_lines(published),
        key=lambda rr: (rr.keyid, rr.keyalg, rr.hashalg),
    )

    cmd = [args.dsfromkey]
    for algo in args.algo:
        cmd += ["-a", algo]

    if args.masterfile:
        cmd += ["-f", args.masterfile, zone]
        expected, _ = Popen(cmd, stdout=PIPE).communicate()
    else:
        # Feed the DNSKEY answer from dig to dsfromkey on its stdin.
        intods, _ = Popen(
            [args.dig, "+noall", "+answer", "-t", "dnskey", "-q", zone], stdout=PIPE
        ).communicate()
        cmd += ["-f", "-", zone]
        expected, _ = Popen(cmd, stdin=PIPE, stdout=PIPE).communicate(intods)
    klist = _parse_ds_lines(expected)

    if not klist:
        print("No DNSKEY records found in zone apex")
        return False

    match = True
    for rr in rrlist:
        if rr not in klist:
            _report("KSK for %s %s/%03d/%05d (%s) missing from child", rr)
            match = False
    for rr in klist:
        if rr not in rrlist:
            _report("%s for KSK %s/%03d/%05d (%s) missing from parent", rr)
            match = False
    for rr in klist:
        if rr in rrlist:
            _report("%s for KSK %s/%03d/%05d (%s) found in parent", rr)

    return match


def _parse_ds_lines(text):
    """Return a SECRR for every non-blank line of *text* (str or bytes)."""
    records = []
    for line in text.splitlines():
        if not isinstance(line, str):
            line = line.decode("ascii")
        # A blank line carries no record; SECRR would raise on it.
        if not line.strip():
            continue
        records.append(SECRR(line))
    return records


def _report(template, rr):
    """Print *template* filled in with the identifying fields of *rr*."""
    # Fall back to the numeric digest type when it has no known name, so an
    # unlisted type cannot abort the report with a KeyError.
    hashname = SECRR.hashalgs.get(rr.hashalg, str(rr.hashalg))
    print(
        template % (rr.rrtype, rr.rrname.strip("."), rr.keyalg, rr.keyid, hashname)
    )
############################################################################
# parse_args:
# Read command line arguments and return the parsed 'args' structure
############################################################################
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    The default path of 'dig' is "dig" joined onto prefix("bin").  The
    default path of 'dnssec-dsfromkey' is joined onto prefix("sbin"), or
    onto prefix("bin") when os.name is "nt".  The zone name is returned
    with leading and trailing dots removed.
    """
    parser = argparse.ArgumentParser(description=prog + ": checks DS coverage")

    sbindir = "sbin" if os.name != "nt" else "bin"
    bindir = "bin"

    # One (flags, keyword settings) entry per argument, registered in order.
    option_table = [
        (("zone",), dict(type=str, help="zone to check")),
        (
            ("-a", "--algo"),
            dict(
                dest="algo",
                action="append",
                default=[],
                type=str,
                help="DS digest algorithm",
            ),
        ),
        (
            ("-d", "--dig"),
            dict(
                dest="dig",
                default=os.path.join(prefix(bindir), "dig"),
                type=str,
                help="path to 'dig'",
            ),
        ),
        (
            ("-D", "--dsfromkey"),
            dict(
                dest="dsfromkey",
                default=os.path.join(prefix(sbindir), "dnssec-dsfromkey"),
                type=str,
                help="path to 'dnssec-dsfromkey'",
            ),
        ),
        (
            ("-f", "--file"),
            dict(dest="masterfile", type=str, help="zone master file"),
        ),
        (
            ("-s", "--dsset"),
            dict(dest="dssetfile", type=str, help="prepared DSset file"),
        ),
        (("-v", "--version"), dict(action="version", version=version)),
    ]
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    parsed = parser.parse_args()
    parsed.zone = parsed.zone.strip(".")
    return parsed
############################################################################
# Main
############################################################################
def main():
    """Parse the command line, run the DS check and exit with its status.

    The process exits with status 0 when check() reports a match and with
    status 1 otherwise.
    """
    args = parse_args()
    match = check(args.zone, args)
    # Use sys.exit() rather than the exit() builtin: the latter is injected
    # by the site module for interactive use and is not guaranteed to exist.
    sys.exit(0 if match else 1)