Training courses

Kernel and Embedded Linux

Bootlin training courses

Embedded Linux, kernel,
Yocto Project, Buildroot, real-time,
graphics, boot time, debugging...

Bootlin logo

Elixir Cross Referencer

# -*- coding: utf-8 -*-
# $OpenLDAP$
## This work is part of OpenLDAP Software <http://www.openldap.org/>.
##
## Copyright 2016-2021 Ondřej Kuzník, Symas Corp.
## Copyright 2021 The OpenLDAP Foundation.
## All rights reserved.
##
## Redistribution and use in source and binary forms, with or without
## modification, are permitted only as authorized by the OpenLDAP
## Public License.
##
## A copy of this license is available in the file LICENSE in the
## top-level directory of the distribution or, alternatively, at
## <http://www.OpenLDAP.org/license.html>.

from __future__ import print_function

import hashlib
import hmac
import os
import struct
import sys
import time

import ldap
from ldap.cidict import cidict as CIDict
from ldap.ldapobject import LDAPObject

if len(sys.argv) > 1 and sys.argv[1] == "--check":
    raise SystemExit(0)


def get_digits(h, digits):
    offset = h[19] & 15
    number = struct.unpack(">I", h[offset:offset+4])[0] & 0x7fffffff
    number %= (10 ** digits)
    return ("%0*d" % (digits, number)).encode()


def get_hotp_token(secret, interval_no):
    msg = struct.pack(">Q", interval_no)
    h = hmac.new(secret, msg, hashlib.sha1).digest()
    return get_digits(bytearray(h), 6)


def get_interval(period=30):
    return int(time.time() // period)


def get_token_for(connection, dn, typ="totp"):
    result = connection.search_s(dn, ldap.SCOPE_BASE)
    dn, attrs = result[0]
    attrs = CIDict(attrs)

    tokendn = attrs['oath'+typ+'token'][0].decode()

    result = connection.search_s(tokendn, ldap.SCOPE_BASE)
    dn, attrs = result[0]
    attrs = CIDict(attrs)

    return dn, attrs


def main():
    uri = os.environ["URI1"]

    managerdn = os.environ['MANAGERDN']
    passwd = os.environ['PASSWD']

    babsdn = os.environ['BABSDN']
    babspw = b"bjensen"

    bjornsdn = os.environ['BJORNSDN']
    bjornspw = b"bjorn"

    connection = LDAPObject(uri)

    start = time.time()
    connection.bind_s(managerdn, passwd)
    end = time.time()

    if end - start > 1:
        print("It takes more than a second to connect and bind, "
              "skipping potentially unstable test", file=sys.stderr)
        raise SystemExit(0)

    dn, token_entry = get_token_for(connection, babsdn)

    paramsdn = token_entry['oathTOTPParams'][0].decode()
    result = connection.search_s(paramsdn, ldap.SCOPE_BASE)
    _, attrs = result[0]
    params = CIDict(attrs)

    secret = token_entry['oathSecret'][0]
    period = int(params['oathTOTPTimeStepPeriod'][0].decode())

    bind_conn = LDAPObject(uri)

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no-3)

    print("Testing old tokens are not useable")
    bind_conn.bind_s(babsdn, babspw+token)
    try:
        bind_conn.bind_s(babsdn, babspw+token)
    except ldap.INVALID_CREDENTIALS:
        pass
    else:
        raise SystemExit("Bind with an old token should have failed")

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no)

    print("Testing token can only be used once")
    bind_conn.bind_s(babsdn, babspw+token)
    try:
        bind_conn.bind_s(babsdn, babspw+token)
    except ldap.INVALID_CREDENTIALS:
        pass
    else:
        raise SystemExit("Bind with a reused token should have failed")

    token = get_hotp_token(secret, interval_no+1)
    try:
        bind_conn.bind_s(babsdn, babspw+token)
    except ldap.INVALID_CREDENTIALS:
        raise SystemExit("Bind should have succeeded")

    dn, token_entry = get_token_for(connection, babsdn)
    last = int(token_entry['oathTOTPLastTimeStep'][0].decode())
    if last != interval_no+1:
        SystemExit("Unexpected counter value %d (expected %d)" %
                   (last, interval_no+1))

    print("Resetting counter and testing secret sharing between accounts")
    connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])])

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no)

    try:
        bind_conn.bind_s(bjornsdn, bjornspw+token)
    except ldap.INVALID_CREDENTIALS:
        raise SystemExit("Bind should have succeeded")

    try:
        bind_conn.bind_s(babsdn, babspw+token)
    except ldap.INVALID_CREDENTIALS:
        pass
    else:
        raise SystemExit("Bind with a reused token should have failed")

    print("Testing token is retired even with a wrong password")
    connection.modify_s(dn, [(ldap.MOD_REPLACE, 'oathTOTPLastTimeStep', [])])

    interval_no = get_interval(period)
    token = get_hotp_token(secret, interval_no)

    try:
        bind_conn.bind_s(babsdn, b"not the password"+token)
    except ldap.INVALID_CREDENTIALS:
        pass
    else:
        raise SystemExit("Bind with an incorrect password should have failed")

    try:
        bind_conn.bind_s(babsdn, babspw+token)
    except ldap.INVALID_CREDENTIALS:
        pass
    else:
        raise SystemExit("Bind with a reused token should have failed")

    token = get_hotp_token(secret, interval_no+1)
    try:
        bind_conn.bind_s(babsdn, babspw+token)
    except ldap.INVALID_CREDENTIALS:
        raise SystemExit("Bind should have succeeded")


if __name__ == "__main__":
    sys.exit(main())