#!/usr/bin/env python """Validate the SAML response. This script serves as an example of how to validate a SAML response via the SSO trigger mechanism. It is expected to be run by the Perforce server as an auth-check-sso trigger. The one command line argument is the Perforce user name or email address. The SAML response is read from standard input and the validated nameId is compared to the name/email. """ # Copyright and license info is available in the LICENSE file. import argparse import base64 import json import os import six import sys from six.moves.urllib import parse as urlparse from onelogin.saml2.auth import OneLogin_Saml2_Auth from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser from onelogin.saml2.response import OneLogin_Saml2_Response from onelogin.saml2.settings import OneLogin_Saml2_Settings # # The saml-agent uses chromium which generates its own stdout messages; # the work around is that the agent adds a header for the pertinent data. # SAML_HEADER = 'saml-response: ' COOKIES_HEADER = 'cookies: ' def load_settings(idp_url): """Build out the settings for SAML library.""" # Elaborate process to construct settings that have everything python3-saml # is expecting, and has the correct IdP certificate. if idp_url: settings = OneLogin_Saml2_IdPMetadataParser.parse_remote(idp_url) else: settings = dict() base_path = os.path.dirname(sys.argv[0]) with open(os.path.join(base_path, 'settings.json')) as fobj: settings.update(json.load(fobj)) with open(os.path.join(base_path, 'advanced_settings.json')) as fobj: settings.update(json.load(fobj)) # Set the path for finding the 'certs' directory, otherwise it defaults # to the installation location of the pysaml module. return OneLogin_Saml2_Settings(settings, custom_base_path=base_path) def make_fake_request(settings, response): """Build a fake request object for python3-saml to use.""" sp = settings.get_sp_data() acs_url = sp['assertionConsumerService']['url'] if acs_url is None: sys.stderr.write('missing SP ACS URL in configuration\n') sys.exit(2) result = urlparse.urlparse(acs_url) return { 'server_port': result.port, 'script_name': result.path, 'http_host': result.hostname, 'https': 'off' if result.scheme == 'http' else 'on', 'post_data': { 'SAMLResponse': response } } def encode_nameid(name_id): """Produce a base64 encoded version of name_id.""" if six.PY3: return base64.urlsafe_b64encode(name_id.encode('utf8')).decode('utf8') else: return base64.urlsafe_b64encode(name_id) def load_request_id(base_path, name_id): """Load the previously saved request identifier for this user.""" requests_dir = os.path.expanduser(os.path.join(base_path, 'requests')) key_path = os.path.join(requests_dir, encode_nameid(name_id)) if not os.path.exists(key_path): return None with open(key_path) as fobj: request_id = fobj.read() # remove the file to prevent replay attacks os.unlink(key_path) return request_id def save_cookies(base_path, name_id, cookie_json): """Save the JSON formatted cookies to a file.""" cookies_dir = os.path.expanduser(os.path.join(base_path, 'cookies')) if not os.path.exists(cookies_dir): os.makedirs(cookies_dir) cookie_path = os.path.join(cookies_dir, encode_nameid(name_id)) with open(cookie_path, 'w') as fobj: fobj.write(cookie_json) def save_session_index(base_path, name_id, value): """Save the SAML session index for the logout trigger to use.""" session_dir = os.path.expanduser(os.path.join(base_path, 'sessions')) if not os.path.exists(session_dir): os.makedirs(session_dir) session_path = os.path.join(session_dir, encode_nameid(name_id)) with open(session_path, 'w') as fobj: fobj.write(value) def main(): """Read the SAML response and verify user match.""" parser = argparse.ArgumentParser(description='Validate a SAML response.') parser.add_argument('--idpUrl', help='URL of IdP metadata') parser.add_argument('--base', default='~/saml', help='base path for trigger to store files') parser.add_argument('identifier', help='Perforce user User or Email field value') args = parser.parse_args() # Read the SAML response from standard input. resp_b64 = '' while True: # read until the end of the input so we can get all the data raw_line = sys.stdin.readline() if raw_line == '': break if len(raw_line.strip()) == 0: # skip empty lines continue if raw_line.startswith(SAML_HEADER): try: resp_b64 = raw_line[len(SAML_HEADER):] # python3-saml expects the response to be base64 encoded; do # a quick sanity check to avoid getting an odd exception. base64.b64decode(resp_b64) except TypeError: sys.stderr.write('input must contain base64 encoded SAML response\n') sys.exit(2) elif raw_line.startswith(COOKIES_HEADER): save_cookies(args.base, args.identifier, raw_line[len(COOKIES_HEADER):]) if resp_b64 == '': sys.stderr.write("expected SAML response prefixed with '{0}'\n".format(SAML_HEADER)) sys.exit(2) settings = load_settings(args.idpUrl) try: fake_request = make_fake_request(settings, resp_b64) auth = OneLogin_Saml2_Auth(fake_request, settings) request_id = load_request_id(args.base, args.identifier) auth.process_response(request_id=request_id) if auth.is_authenticated(): if auth.get_nameid() == args.identifier: save_session_index(args.base, args.identifier, auth.get_session_index()) sys.exit(0) sys.stderr.write('name_id mismatch\n') else: sys.stderr.write('error: {}\n'.format('\n'.join(auth.get_errors()))) sys.stderr.write('reason: {}\n'.format(auth.get_last_error_reason())) sys.exit(1) except Exception as e: sys.stderr.write('{err}\n'.format(err=e)) sys.exit(1) if __name__ == "__main__": main()