Training courses

Kernel and Embedded Linux

Bootlin training courses

Embedded Linux, kernel,
Yocto Project, Buildroot, real-time,
graphics, boot time, debugging...

Bootlin logo

Elixir Cross Referencer

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.  If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

############################################################################
# rndc.py
# This module implements the RNDC control protocol.
############################################################################

from collections import OrderedDict
import time
import struct
import hashlib
import hmac
import base64
import random
import socket


class rndc(object):
    """RNDC protocol client library"""

    __algos = {
        "md5": 157,
        "sha1": 161,
        "sha224": 162,
        "sha256": 163,
        "sha384": 164,
        "sha512": 165,
    }

    def __init__(self, host, algo, secret):
        """Creates a persistent connection to RNDC and logs in
        host - (ip, port) tuple
        algo - HMAC algorithm: one of md5, sha1, sha224, sha256, sha384, sha512
               (with optional prefix 'hmac-')
        secret - HMAC secret, base64 encoded"""
        self.host = host
        algo = algo.lower()
        if algo.startswith("hmac-"):
            algo = algo[5:]
        self.algo = algo
        self.hlalgo = getattr(hashlib, algo)
        self.secret = base64.b64decode(secret)
        self.ser = random.randint(0, 1 << 24)
        self.nonce = None
        self.__connect_login()

    def call(self, cmd):
        """Call a RNDC command, all parsing is done on the server side
        cmd - a complete string with a command (eg 'reload zone example.com')
        """
        return dict(self.__command(type=cmd)["_data"])

    def __serialize_dict(self, data, ignore_auth=False):
        rv = bytearray()
        for k, v in data.items():
            if ignore_auth and k == "_auth":
                continue
            rv += struct.pack("B", len(k)) + k.encode("ascii")
            if type(v) == str:
                rv += struct.pack(">BI", 1, len(v)) + v.encode("ascii")
            elif type(v) == bytes:
                rv += struct.pack(">BI", 1, len(v)) + v
            elif type(v) == bytearray:
                rv += struct.pack(">BI", 1, len(v)) + v
            elif type(v) == OrderedDict:
                sd = self.__serialize_dict(v)
                rv += struct.pack(">BI", 2, len(sd)) + sd
            else:
                raise NotImplementedError(
                    "Cannot serialize element of type %s" % type(v)
                )
        return rv

    def __prep_message(self, *args, **kwargs):
        self.ser += 1
        now = int(time.time())
        data = OrderedDict(*args, **kwargs)

        d = OrderedDict()
        d["_auth"] = OrderedDict()
        d["_ctrl"] = OrderedDict()
        d["_ctrl"]["_ser"] = str(self.ser)
        d["_ctrl"]["_tim"] = str(now)
        d["_ctrl"]["_exp"] = str(now + 60)
        if self.nonce is not None:
            d["_ctrl"]["_nonce"] = self.nonce
        d["_data"] = data

        msg = self.__serialize_dict(d, ignore_auth=True)
        hash = hmac.new(self.secret, msg, self.hlalgo).digest()
        bhash = base64.b64encode(hash)
        if self.algo == "md5":
            d["_auth"]["hmd5"] = struct.pack("22s", bhash)
        else:
            d["_auth"]["hsha"] = bytearray(
                struct.pack("B88s", self.__algos[self.algo], bhash)
            )
        msg = self.__serialize_dict(d)
        msg = struct.pack(">II", len(msg) + 4, 1) + msg
        return msg

    def __verify_msg(self, msg):
        if self.nonce is not None and msg["_ctrl"]["_nonce"] != self.nonce:
            return False
        if self.algo == "md5":
            bhash = msg["_auth"]["hmd5"]
        else:
            bhash = msg["_auth"]["hsha"][1:]
        if type(bhash) == bytes:
            bhash = bhash.decode("ascii")
        bhash += "=" * (4 - (len(bhash) % 4))
        remote_hash = base64.b64decode(bhash)
        my_msg = self.__serialize_dict(msg, ignore_auth=True)
        my_hash = hmac.new(self.secret, my_msg, self.hlalgo).digest()
        return my_hash == remote_hash

    def __command(self, *args, **kwargs):
        msg = self.__prep_message(*args, **kwargs)
        sent = self.socket.send(msg)
        if sent != len(msg):
            raise IOError("Cannot send the message")

        header = self.socket.recv(8)
        if len(header) != 8:
            # What should we throw here? Bad auth can cause this...
            raise IOError("Can't read response header")

        length, version = struct.unpack(">II", header)
        if version != 1:
            raise NotImplementedError("Wrong message version %d" % version)

        # it includes the header
        length -= 4
        data = self.socket.recv(length, socket.MSG_WAITALL)
        if len(data) != length:
            raise IOError("Can't read response data")

        if type(data) == str:
            data = bytearray(data)
        msg = self.__parse_message(data)
        if not self.__verify_msg(msg):
            raise IOError("Authentication failure")

        return msg

    def __connect_login(self):
        self.socket = socket.create_connection(self.host)
        self.nonce = None
        msg = self.__command(type="null")
        self.nonce = msg["_ctrl"]["_nonce"]

    def __parse_element(self, input):
        pos = 0
        labellen = input[pos]
        pos += 1
        label = input[pos : pos + labellen].decode("ascii")
        pos += labellen
        type = input[pos]
        pos += 1
        datalen = struct.unpack(">I", input[pos : pos + 4])[0]
        pos += 4
        data = input[pos : pos + datalen]
        pos += datalen
        rest = input[pos:]

        if type == 1:  # raw binary value
            return label, data, rest
        elif type == 2:  # dictionary
            d = OrderedDict()
            while len(data) > 0:
                ilabel, value, data = self.__parse_element(data)
                d[ilabel] = value
            return label, d, rest
        # TODO type 3 - list
        else:
            raise NotImplementedError("Unknown element type %d" % type)

    def __parse_message(self, input):
        rv = OrderedDict()
        hdata = None
        while len(input) > 0:
            label, value, input = self.__parse_element(input)
            rv[label] = value
        return rv