Source code for yubico_client.yubico

# -*- coding: utf-8 -*-
# Name: Yubico Python Client
# Description: Python class for verifying Yubico One Time Passwords (OTPs).
# Author: Tomaz Muraus (
# License: BSD
# Copyright (c) 2010-2019, Tomaž Muraus
# Copyright (c) 2012, Yubico AB
# All rights reserved.

import re
import os
import sys
import time
import hmac
import base64
import hashlib
import threading
import logging

import requests

from yubico_client import __version__
from yubico_client.otp import OTP
from yubico_client.yubico_exceptions import (StatusCodeError,
                                             InvalidClientIdError,
                                             InvalidValidationResponse,
                                             SignatureVerificationError)
from yubico_client.py3 import b
from yubico_client.py3 import urlencode
from yubico_client.py3 import unquote

# Module-level logger shared by the client class and its worker threads.
logger = logging.getLogger('yubico.client')

# Path to the standard CA bundle file locations for most of the operating
# systems


# How long to wait before the time out occurs

# How many seconds can pass between the first and last OTP generation so the
# OTP is still considered valid

                    'BACKEND_ERROR', 'NOT_ENOUGH_ANSWERS',

# Dotted version string built from the components of the package version.
CLIENT_VERSION = '.'.join(str(component) for component in __version__)
# "major.minor.micro" of the running interpreter (used in the User-Agent).
PYTHON_VERSION = '%s.%s.%s' % (sys.version_info[0], sys.version_info[1],
                               sys.version_info[2])

class Yubico(object):  # pylint: disable=too-many-instance-attributes
    """
    Client which verifies Yubico One Time Passwords (OTPs) by querying one
    or more validation server URLs in parallel.
    """

    def __init__(self, client_id, key=None, verify_cert=True,
                 translate_otp=True, api_urls=DEFAULT_API_URLS,
                 ca_certs_bundle_path=None, max_retries=3,
                 retry_delay=0.5):
        """
        :param client_id: Client id which is sent as the ``id`` query
                          parameter.

        :param key: Optional base64 encoded secret key. When provided,
                    requests are signed and response signatures are verified.
        :type key: ``str``

        :param verify_cert: Passed to the HTTP request as the ``verify``
                            option when no CA bundle path is available.
        :type verify_cert: ``bool``

        :param translate_otp: Passed through to the ``OTP`` constructor.
        :type translate_otp: ``bool``

        :param api_urls: A single URL or a list / tuple of URLs of the
                         validation servers. Every URL must start with
                         ``http://`` or ``https://``.
        :type api_urls: ``str``, ``list`` or ``tuple``

        :param ca_certs_bundle_path: Optional path to a CA bundle file. Must
                                     point to an existing file.
        :type ca_certs_bundle_path: ``str``

        :param max_retries: Number of times to try to retry the request if
                            server returns 5xx status code.
        :type max_retries: ``int``

        :param retry_delay: How long to wait (in seconds) between each retry
                            attempt.
        :type retry_delay: ``float``

        :raises ValueError: If ``ca_certs_bundle_path`` is provided but is not
                            an existing file, or if a URL has an invalid
                            scheme.
        :raises TypeError: If ``api_urls`` is not a string, list or tuple.
        """
        if ca_certs_bundle_path and \
           not self._is_valid_ca_bundle_file(ca_certs_bundle_path):
            raise ValueError('Invalid value provided for ca_certs_bundle_path'
                             ' argument')

        self.client_id = client_id

        if key is not None:
            key = base64.b64decode(key.encode('ascii'))

        self.key = key
        self.verify_cert = verify_cert
        self.translate_otp = translate_otp
        self.api_urls = self._init_request_urls(api_urls=api_urls)
        self.ca_certs_bundle_path = ca_certs_bundle_path
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def verify(self, otp, timestamp=False, sl=None, timeout=None,
               return_response=False):
        """
        Verify a provided OTP.

        :param otp: OTP to verify.
        :type otp: ``str``

        :param timestamp: True to include request timestamp and session
                          counter in the response. Defaults to False.
        :type timestamp: ``bool``

        :param sl: A value indicating percentage of syncing required by
                   client.
        :type sl: ``int`` or ``str``

        :param timeout: Number of seconds to wait for sync responses.
        :type timeout: ``int``

        :param return_response: True to return a response object instead of
                                the status code. Defaults to False.
        :type return_response: ``bool``

        :return: True if the provided OTP is valid, or the parsed response
                 dictionary when ``return_response`` is True.

        :raises Exception: ``NO_VALID_ANSWERS`` if no server produced a valid
                           answer before the timeout expired. Exceptions
                           raised by :meth:`verify_response` (replayed OTP,
                           signature mismatch, invalid client id, unexpected
                           response content) are propagated. When only a
                           single server is queried, the exception recorded
                           by its worker thread is re-raised as well.
        """
        ca_bundle_path = self._get_ca_bundle_path()

        otp = OTP(otp, self.translate_otp)
        rand_str = b(os.urandom(30))
        nonce = base64.b64encode(rand_str, b('xz'))[:25].decode('utf-8')
        query_string = self.generate_query_string(otp.otp, nonce, timestamp,
                                                  sl, timeout)

        threads = []
        timeout = timeout or DEFAULT_TIMEOUT
        for url in self.api_urls:
            thread = URLThread(url='%s?%s' % (url, query_string),
                               timeout=timeout,
                               verify_cert=self.verify_cert,
                               ca_bundle_path=ca_bundle_path,
                               max_retries=self.max_retries,
                               retry_delay=self.retry_delay)
            thread.start()
            threads.append(thread)

        # Wait for a first positive or negative response
        start_time = time.time()

        # If there's only one server to talk to, raise thread exceptions.
        # Otherwise we end up ignoring a good answer from a different
        # server later.
        raise_exceptions = (len(threads) == 1)

        while threads and (start_time + timeout) > time.time():
            # Iterate over a snapshot: finished threads are removed from
            # ``threads`` below and removing from a list while iterating it
            # would skip the following entry.
            for thread in list(threads):
                if thread.is_alive():
                    continue

                if thread.exception and raise_exceptions:
                    raise thread.exception

                if thread.response:
                    status = self.verify_response(thread.response,
                                                  otp.otp, nonce,
                                                  return_response)

                    if status:
                        return status if return_response else True

                threads.remove(thread)

            time.sleep(0.1)

        # Timeout or no valid response received
        raise Exception('NO_VALID_ANSWERS')

    def verify_multi(self, otp_list, max_time_window=DEFAULT_MAX_TIME_WINDOW,
                     sl=None, timeout=None):
        """
        Verify a provided list of OTPs.

        :param otp_list: OTPs to verify. At least two are required and all
                         of them must share the same device id.
        :type otp_list: ``list``

        :param max_time_window: Maximum number of seconds which can pass
                                between the first and last OTP generation for
                                the OTP to still be considered valid.
        :type max_time_window: ``int``

        :param sl: Passed through to :meth:`verify`.
        :param timeout: Passed through to :meth:`verify`.

        :return: True if all OTPs were verified and fall inside the time
                 window, False if verification of an OTP returned a false
                 value.

        :raises ValueError: If fewer than two OTPs are provided.
        :raises Exception: If the OTPs belong to different devices, if the
                           last OTP is older than the first one or if the time
                           window is exceeded.
        """
        # Create the OTP objects
        otps = [OTP(otp, self.translate_otp) for otp in otp_list]

        if len(otp_list) < 2:
            raise ValueError('otp_list needs to contain at least two OTPs')

        # Check that all the OTPs contain same device id
        device_ids = {otp.device_id for otp in otps}

        if len(device_ids) != 1:
            raise Exception('OTPs contain different device ids')

        # Now we verify the OTPs and save the server response for each OTP.
        # We need the server response, to retrieve the timestamp.
        # It's possible to retrieve this value locally, without querying the
        # server but in this case, user would need to provide his AES key.
        for otp in otps:
            response = self.verify(otp.otp, True, sl, timeout,
                                   return_response=True)

            if not response:
                return False

            otp.timestamp = int(response['timestamp'])

        delta = otps[-1].timestamp - otps[0].timestamp

        # OTPs have an 8Hz timestamp counter so we need to divide it to get
        # seconds
        delta = delta / 8

        if delta < 0:
            raise Exception('delta is smaller than zero. First OTP appears to '
                            'be older than the last one')

        if delta > max_time_window:
            raise Exception('More than %s seconds have passed between '
                            'generating the first and the last OTP.' %
                            (max_time_window))

        return True

    def verify_response(self, response, otp, nonce, return_response=False):
        """
        Check a raw validation server response.

        :param response: Response body returned by the server.
        :type response: ``str``

        :param otp: OTP which was sent to the server.
        :param nonce: Nonce which was sent to the server.

        :param return_response: True to return the response parameters as a
                                dictionary instead of ``True``.
        :type return_response: ``bool``

        :return: True (or the parameter dictionary when ``return_response``
                 is True) if the status is ``OK``. False if the response
                 contains no status or the status is anything other than the
                 ones handled explicitly.

        :raises InvalidValidationResponse: If more than one status is present
                                           or the echoed OTP / nonce does not
                                           match the one which was sent.
        :raises SignatureVerificationError: If a key is configured and the
                                            response signature doesn't match.
        :raises InvalidClientIdError: On ``NO_SUCH_CLIENT`` status.
        :raises StatusCodeError: On ``REPLAYED_OTP`` status.
        """
        # Collect every status value so that a response carrying several of
        # them can actually be detected and rejected.
        statuses = re.findall(r'status=([A-Z0-9_]+)', response)

        if not statuses:
            return False

        if len(statuses) > 1:
            message = 'More than one status= returned. Possible attack!'
            raise InvalidValidationResponse(message, response)

        status = statuses[0]

        signature, parameters = \
            self.parse_parameters_from_response(response)

        # Secret key is specified, so we verify the response message
        # signature
        if self.key:
            generated_signature = \
                self.generate_message_signature(parameters)

            # Signature located in the response does not match the one we
            # have generated
            if signature != generated_signature:
                logger.warning("signature mismatch for parameters=%r",
                               parameters)
                raise SignatureVerificationError(generated_signature,
                                                 signature)

        param_dict = self.get_parameters_as_dictionary(parameters)

        if 'otp' in param_dict and param_dict['otp'] != otp:
            message = 'Unexpected OTP in response. Possible attack!'
            raise InvalidValidationResponse(message, response, param_dict)

        if 'nonce' in param_dict and param_dict['nonce'] != nonce:
            message = 'Unexpected nonce in response. Possible attack!'
            raise InvalidValidationResponse(message, response, param_dict)

        if status == 'OK':
            return param_dict if return_response else True

        if status == 'NO_SUCH_CLIENT':
            raise InvalidClientIdError(self.client_id)

        if status == 'REPLAYED_OTP':
            raise StatusCodeError(status)

        return False

    def generate_query_string(self, otp, nonce, timestamp=False, sl=None,
                              timeout=None):
        """
        Returns a query string which is sent to the validation servers.

        The string is signed (``h`` parameter appended) when a secret key is
        configured.

        :raises Exception: If ``sl`` is neither an integer between 0 and 100
                           nor ``"fast"`` / ``"secure"``.
        """
        data = [('id', self.client_id),
                ('otp', otp),
                ('nonce', nonce)]

        if timestamp:
            data.append(('timestamp', '1'))

        if sl is not None:
            if sl not in range(0, 101) and sl not in ['fast', 'secure']:
                raise Exception('sl parameter value must be between 0 and '
                                '100 or string "fast" or "secure"')

            data.append(('sl', sl))

        if timeout:
            data.append(('timeout', timeout))

        query_string = urlencode(data)

        if self.key:
            hmac_signature = self.generate_message_signature(query_string)
            # "+" would be decoded as a space by the receiver, so escape it.
            query_string += '&h=%s' % (hmac_signature.replace('+', '%2B'))

        return query_string

    def generate_message_signature(self, query_string):
        """
        Returns a HMAC-SHA-1 signature for the given query string.

        The key / value pairs are sorted before signing and the digest is
        returned base64 encoded.
        """
        # split for sorting
        pairs = sorted(pair.split('=', 1) for pair in query_string.split('&'))
        pairs_string = '&'.join('='.join(pair) for pair in pairs)

        digest = hmac.new(self.key, b(pairs_string), hashlib.sha1).digest()
        signature = base64.b64encode(digest).decode('utf-8')

        return signature

    def parse_parameters_from_response(self, response):
        """
        Returns a response signature and query string generated from the
        server response. 'h' aka signature argument is stripped from the
        returned query string.

        :return: ``(signature, query_string)`` tuple. ``signature`` is None
                 if the response contains no ``h`` parameter.
        """
        lines = response.splitlines()
        pairs = sorted(line.strip().split('=', 1)
                       for line in lines if '=' in line)

        signature = next((unquote(v) for k, v in pairs if k == 'h'), None)

        # already quoted
        query_string = '&'.join(k + '=' + v for k, v in pairs if k != 'h')

        return (signature, query_string)

    def get_parameters_as_dictionary(self, query_string):
        """
        Returns query string parameters as a dictionary. Values are
        unquoted.
        """
        pairs = (x.split('=', 1) for x in query_string.split('&'))
        return dict((k, unquote(v)) for k, v in pairs)

    def _init_request_urls(self, api_urls):
        """
        Returns a list of the API URLs.

        :raises TypeError: If ``api_urls`` is not a string, list or tuple.
        :raises ValueError: If a URL doesn't start with ``http://`` or
                            ``https://``.
        """
        if not isinstance(api_urls, (str, list, tuple)):
            raise TypeError('api_urls needs to be string or iterable!')

        if isinstance(api_urls, str):
            api_urls = (api_urls,)

        api_urls = list(api_urls)

        for url in api_urls:
            if not url.startswith('http://') and \
               not url.startswith('https://'):
                raise ValueError('URL "%s" contains an invalid or missing'
                                 ' scheme' % (url))

        return list(api_urls)

    def _get_ca_bundle_path(self):
        """
        Return a path to the CA bundle which is used for verifying the hosts
        SSL certificate.

        :return: User provided path if set, otherwise the first existing
                 file from the common locations, or None if there is none.
        """
        if self.ca_certs_bundle_path:
            # User provided a custom path
            return self.ca_certs_bundle_path

        # Return first bundle which is available
        for file_path in COMMON_CA_LOCATIONS:
            if self._is_valid_ca_bundle_file(file_path=file_path):
                return file_path

        return None

    def _is_valid_ca_bundle_file(self, file_path):
        """
        Return True if ``file_path`` points to an existing regular file.
        """
        # isfile() is already False for paths which don't exist.
        return os.path.isfile(file_path)
class URLThread(threading.Thread):
    """
    Worker thread which sends a single HTTP GET request to a validation
    server and stores the outcome on the instance.

    After the thread has finished, ``response`` holds the decoded response
    body (or None on failure) and ``exception`` holds the SSL error which
    occurred, if any.
    """
    # pylint: disable=too-many-instance-attributes

    def __init__(self, url, timeout, verify_cert, ca_bundle_path=None,
                 max_retries=3, retry_delay=0.5):
        """
        :param url: Full URL (including the query string) to request.
        :param timeout: Timeout passed to the HTTP request.
        :param verify_cert: Value used for the ``verify`` request option when
                            no CA bundle path is given.
        :param ca_bundle_path: Optional CA bundle path. When set, it is used
                               as the ``verify`` request option instead.
        :param max_retries: Maximum number of request attempts which are made
                            while the server keeps returning 500, 502, 503 or
                            504.
        :param retry_delay: Number of seconds to sleep after such a response.
        """
        super(URLThread, self).__init__()
        self.url = url
        self.timeout = timeout
        self.verify_cert = verify_cert
        self.ca_bundle_path = ca_bundle_path
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        self.exception = None
        self.request = None
        self.response = None

    def run(self):
        """
        Perform the request, retrying on 500 / 502 / 503 / 504 responses.

        Errors are logged rather than raised; only SSL errors are recorded
        on ``self.exception``.
        """
        logger.debug('Sending HTTP request to %s (thread=%s)',
                     self.url, self.name)

        verify = self.verify_cert

        if self.ca_bundle_path is not None:
            verify = self.ca_bundle_path
            logger.debug('Using custom CA bunde: %s', self.ca_bundle_path)

        headers = {
            'User-Agent': ('yubico-python-client/%s (Python v%s)' %
                           (CLIENT_VERSION, PYTHON_VERSION))
        }

        try:
            retry = 0
            done = False

            while retry < self.max_retries and not done:
                retry += 1
                self.request = requests.get(
                    url=self.url, timeout=self.timeout,
                    verify=verify, headers=headers
                )

                status_code = self.request.status_code
                logger.debug('HTTP %d from %s (thread=%s)',
                             status_code, self.url, self.name)

                if status_code in (500, 502, 503, 504):
                    logger.debug('Retrying HTTP request (attempt_count=%s,'
                                 'max_retries=%s)', retry, self.max_retries)
                    time.sleep(self.retry_delay)
                else:
                    done = True

            self.response = self.request.content.decode("utf-8")
        except requests.exceptions.SSLError as e:
            logger.error('SSL error talking to %s (thread=%s): %s',
                         self.url, self.name, str(e))
            self.exception = e
            self.response = None
        except Exception as e:  # pylint: disable=broad-except
            logger.error('Failed to retrieve response: %s', str(e))
            self.response = None

        logger.debug('Received response from %s (thread=%s): %s',
                     self.url, self.name, self.response)