#!/usr/bin/env python
"""Validate the SAML response.
This script serves as an example of how to validate a SAML response via the
SSO trigger mechanism. It is expected to be run by the Perforce server as an
auth-check-sso trigger. The one command line argument is the Perforce user
name or email address. The SAML response is read from standard input and the
validated nameId is compared to the name/email.
"""
# Copyright and license info is available in the LICENSE file.
import argparse
import base64
import json
import os
import six
import sys
import time
from six.moves.urllib import parse as urlparse
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser
from onelogin.saml2.response import OneLogin_Saml2_Response
from onelogin.saml2.settings import OneLogin_Saml2_Settings
#
# The saml-agent uses chromium, which generates its own stdout messages;
# the workaround is that the agent adds a header for the pertinent data.
#
# Prefix of the input line that carries the base64 encoded SAML response.
SAML_HEADER = 'saml-response: '
# Prefix of the input line that carries the cookies to be saved to a file.
COOKIES_HEADER = 'cookies: '
def load_settings(idp_url):
    """Build out the settings for SAML library.

    The settings are assembled in layers: the IdP metadata fetched from
    ``idp_url`` (when given), then ``settings.json``, then
    ``advanced_settings.json``. Top-level keys from a later layer replace
    those of an earlier one. Both JSON files are read from the directory
    portion of ``sys.argv[0]``.

    Args:
        idp_url: URL of the IdP metadata, or a false value to skip the
            remote fetch and rely on the JSON files alone.

    Returns:
        A ``OneLogin_Saml2_Settings`` built from the merged settings, with
        its custom base path set to the script directory.

    Raises:
        Exception: whatever the metadata fetch raised on its final attempt.
            Errors from opening or parsing the JSON files also propagate.
    """
    # Elaborate process to construct settings that have everything python3-saml
    # is expecting, and has the correct IdP certificate.
    if idp_url:
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            try:
                settings = OneLogin_Saml2_IdPMetadataParser.parse_remote(idp_url)
                break
            except Exception:
                # Fetch errors may be transient, so pause and try again. Once
                # the attempts are used up, surface the real error rather
                # than failing later on an unbound 'settings' variable.
                if attempt == max_attempts:
                    raise
                time.sleep(1)
    else:
        settings = dict()
    base_path = os.path.dirname(sys.argv[0])
    with open(os.path.join(base_path, 'settings.json')) as fobj:
        settings.update(json.load(fobj))
    with open(os.path.join(base_path, 'advanced_settings.json')) as fobj:
        settings.update(json.load(fobj))
    # Set the path for finding the 'certs' directory, otherwise it defaults
    # to the installation location of the pysaml module.
    return OneLogin_Saml2_Settings(settings, custom_base_path=base_path)
def make_fake_request(settings, response):
    """Assemble the request dictionary that python3-saml expects.

    The host, port, path and scheme are taken from the service provider's
    assertion consumer service (ACS) URL found in the settings.

    Args:
        settings: object whose ``get_sp_data()`` returns the SP settings.
        response: the SAML response, placed under ``post_data``.

    Returns:
        A dict with the keys ``server_port``, ``script_name``,
        ``http_host``, ``https`` and ``post_data``.

    Exits with status 2 when the configured ACS URL is None.
    """
    acs_url = settings.get_sp_data()['assertionConsumerService']['url']
    if acs_url is None:
        sys.stderr.write('missing SP ACS URL in configuration\n')
        sys.exit(2)
    parts = urlparse.urlparse(acs_url)
    fake_request = dict()
    fake_request['server_port'] = parts.port
    fake_request['script_name'] = parts.path
    fake_request['http_host'] = parts.hostname
    # Anything other than plain http is treated as secure.
    if parts.scheme == 'http':
        fake_request['https'] = 'off'
    else:
        fake_request['https'] = 'on'
    fake_request['post_data'] = {'SAMLResponse': response}
    return fake_request
def encode_nameid(name_id):
    """Produce a base64 encoded version of name_id.

    The URL-safe base64 alphabet is used, so the result never contains
    '/' or '+'.

    Args:
        name_id: the identifier, either as text or as a byte string. Text
            is encoded as UTF-8 before being base64 encoded.

    Returns:
        The encoded identifier as a native ``str``.
    """
    # Decide by the value's type rather than the interpreter version: byte
    # strings pass through as-is, anything else is text and is encoded first.
    raw = name_id if isinstance(name_id, bytes) else name_id.encode('utf8')
    encoded = base64.urlsafe_b64encode(raw)
    # On Python 3 the encoder returns bytes; base64 output is pure ASCII.
    return encoded if isinstance(encoded, str) else encoded.decode('ascii')
def load_request_id(base_path, name_id):
    """Fetch and consume the request identifier stored for this user.

    The identifier is kept in a file named after the encoded name_id inside
    the 'requests' directory under ``base_path`` ('~' is expanded).

    Args:
        base_path: directory under which the 'requests' directory lives.
        name_id: user identifier that determines the file name.

    Returns:
        The full contents of the file, or None when no such file exists.
    """
    key_path = os.path.join(
        os.path.expanduser(os.path.join(base_path, 'requests')),
        encode_nameid(name_id),
    )
    if os.path.exists(key_path):
        with open(key_path) as handle:
            saved_id = handle.read()
        # the file is single-use: deleting it prevents replay attacks
        os.unlink(key_path)
        return saved_id
    return None
def save_cookies(base_path, name_id, cookie_json):
    """Save the JSON formatted cookies to a file.

    The file is written to the 'cookies' directory under ``base_path``
    ('~' is expanded) and is named after the encoded ``name_id``. An
    existing file for the same user is overwritten.

    Args:
        base_path: directory under which the 'cookies' directory lives.
        name_id: user identifier that determines the file name.
        cookie_json: text to write to the file, as received.

    Raises:
        OSError: if the directory cannot be created.
    """
    cookies_dir = os.path.expanduser(os.path.join(base_path, 'cookies'))
    try:
        os.makedirs(cookies_dir)
    except OSError:
        # Either the directory already exists (possibly created just now by
        # a concurrent run), which is fine, or creation genuinely failed.
        if not os.path.isdir(cookies_dir):
            raise
    cookie_path = os.path.join(cookies_dir, encode_nameid(name_id))
    with open(cookie_path, 'w') as fobj:
        fobj.write(cookie_json)
def save_session_index(base_path, name_id, value):
    """Save the SAML session index for the logout trigger to use.

    The file is written to the 'sessions' directory under ``base_path``
    ('~' is expanded) and is named after the encoded ``name_id``. An
    existing file for the same user is overwritten.

    Args:
        base_path: directory under which the 'sessions' directory lives.
        name_id: user identifier that determines the file name.
        value: the session index text, or None when there is none.

    Raises:
        OSError: if the directory cannot be created.
    """
    session_dir = os.path.expanduser(os.path.join(base_path, 'sessions'))
    try:
        os.makedirs(session_dir)
    except OSError:
        # Either the directory already exists (possibly created just now by
        # a concurrent run), which is fine, or creation genuinely failed.
        if not os.path.isdir(session_dir):
            raise
    session_path = os.path.join(session_dir, encode_nameid(name_id))
    # NOTE(review): a missing session index (None) is stored as an empty
    # file so that write() does not raise and fail an otherwise valid
    # login -- confirm the logout trigger tolerates an empty value.
    with open(session_path, 'w') as fobj:
        fobj.write('' if value is None else value)
def main():
    """Read the SAML response and verify user match.

    Standard input is scanned to the end for header-prefixed lines: the
    line starting with SAML_HEADER carries the base64 encoded SAML response
    (the last such line wins) and a line starting with COOKIES_HEADER
    carries cookies, which are saved for the user. The response is then
    validated and its nameId compared to the ``identifier`` argument.

    Exit status:
        0 -- the response is authenticated and the nameId matches.
        1 -- validation failed, the nameId did not match, or an error was
             raised while processing the response.
        2 -- the input lacked a usable SAML response, or the configuration
             has no SP ACS URL.
    """
    parser = argparse.ArgumentParser(description='Validate a SAML response.')
    parser.add_argument('--idpUrl', help='URL of IdP metadata')
    parser.add_argument('--base', default='~/saml', help='base path for trigger to store files')
    parser.add_argument('--no-id', action='store_true', help='disable request identifier verification')
    parser.add_argument('identifier', help='Perforce user User or Email field value')
    args = parser.parse_args()

    # Read the SAML response from standard input; read until the end of the
    # input so we can get all the data.
    resp_b64 = ''
    for raw_line in sys.stdin:
        if not raw_line.strip():
            # skip empty lines
            continue
        if raw_line.startswith(SAML_HEADER):
            resp_b64 = raw_line[len(SAML_HEADER):]
            try:
                # python3-saml expects the response to be base64 encoded; do
                # a quick sanity check to avoid getting an odd exception.
                base64.b64decode(resp_b64)
            except (TypeError, ValueError):
                # Python 2 raises TypeError for malformed input, whereas
                # Python 3 raises binascii.Error, a subclass of ValueError.
                sys.stderr.write('input must contain base64 encoded SAML response\n')
                sys.exit(2)
        elif raw_line.startswith(COOKIES_HEADER):
            save_cookies(args.base, args.identifier, raw_line[len(COOKIES_HEADER):])
    if resp_b64 == '':
        sys.stderr.write("expected SAML response prefixed with '{0}'\n".format(SAML_HEADER))
        sys.exit(2)

    settings = load_settings(args.idpUrl)
    try:
        fake_request = make_fake_request(settings, resp_b64)
        auth = OneLogin_Saml2_Auth(fake_request, settings)
        if args.no_id:
            request_id = None
        else:
            request_id = load_request_id(args.base, args.identifier)
        auth.process_response(request_id=request_id)
        if auth.is_authenticated():
            if auth.get_nameid() == args.identifier:
                save_session_index(args.base, args.identifier, auth.get_session_index())
                # SystemExit is not an Exception subclass, so the handler
                # below does not intercept this successful exit.
                sys.exit(0)
            sys.stderr.write('name_id mismatch\n')
        else:
            sys.stderr.write('error: {}\n'.format('\n'.join(auth.get_errors())))
            sys.stderr.write('reason: {}\n'.format(auth.get_last_error_reason()))
        sys.exit(1)
    except Exception as e:
        sys.stderr.write('{err}\n'.format(err=e))
        sys.exit(1)
# Run the validation only when this file is executed as a script.
if __name__ == "__main__":
    main()
| # | Change | User | Description | Committed | |
|---|---|---|---|---|---|
| #5 | 25196 | Nathan Fiedler | Retry and then raise the error if unsuccessful. | ||
| #4 | 25195 | Nathan Fiedler | Retry IdP metadata fetch a couple of times. To address ephemeral errors when retrieving the IdP metadata from the remote side, retry the request 3 times with a 1 second pause in between. | ||
| #3 | 24953 | Nathan Fiedler | Add note about Swarm, and flag to disable ID verification. When using the SAML triggers with Swarm, it is necessary to add the --no-id option to the saml_validate.py invocation, which disables the request identifier verification. This is because the current release of Swarm generates the login URL with a different request identifier, and the trigger cannot validate the response. | ||
| #2 | 24933 | Nathan Fiedler | Add copyright and license details for SAML triggers. | ||
| #1 | 24785 | Nathan Fiedler | Add SAML triggers. |