#!/usr/bin/env python # This script requires the pytop module. # You can get it with "pip install pyotp" # Source: https://github.com/pyotp/pyotp # # This trigger will check to see if the userid is in a the LOCL_PASSWD_FILE first and # authenticate with that if found. If the users isn't in the local file, it checks to # see if the user is a service user, and if so, it will authenticate against just the auth server. # Finally, it will check the user's password and authenticator token if the other two conditions # don't match. # # The trigger table entry is: # authtrigger auth-check auth "/p4/common/bin/triggers/vip_authcheckTrigger.py %user% %serverport%" import os import re import ldap import sys import subprocess import pyotp #### Configuration values P4="/p4/common/bin/p4" # authtype can be p4 or ad authtype="p4" # Services users are users that by-pass two factor authentication. SVC_USER_FILE="/p4/common/bin/triggers/serviceusers.txt" # Local users by-pass two factor and external auth # Format is username:password, one per line. LOCAL_PASSWD_FILE="/p4/common/bin/triggers/local.passwd" # These variables are for authtype ad TIMEOUT=20 AD_HOST="ldap://domain.company.com" DOMAIN="MYDOMAIN\\" # These variables are for authtype p4 # You must set up a p4d server for user passwords and keys, if you # are using ad for passwords, your auth server should be your production server. # This user must have at least admin access and an unlimited login timeout. p4admin = os.environ['P4USER'] # This is the Perforce server where user's password and/or keys are stored. authserver = "localhost:1667" # These are user accounts you wish to temporarily block access block_users = [] #### End Configuration # localUserExists # checks to see if the user exists in the local password file. returns True or False def localUserExists(username): exists = False if LOCAL_PASSWD_FILE is not None and os.path.isfile(LOCAL_PASSWD_FILE): f = open(LOCAL_PASSWD_FILE) for line in f: if line.startswith("%s:"%username): exists = True break f.close() return exists # getLocalPassword # retrieves the local password entry (if there is one) for the specified user def getLocalPassword(username): password = None if LOCAL_PASSWD_FILE is not None and os.path.isfile(LOCAL_PASSWD_FILE): f = open(LOCAL_PASSWD_FILE) for line in f: line = line.strip() if line.startswith("%s:"%username): parts = line.split(":",2) password = parts[1] break f.close() return password # checkLDAPPassword # checks the user and password against the LDAP server def checkLDAPPassword(userid, password): # the following is needed to allow Python to accept a non-CA cert #ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) con = ldap.initialize(AD_HOST) con.set_option(ldap.OPT_NETWORK_TIMEOUT, TIMEOUT) con.set_option(ldap.OPT_TIMEOUT, TIMEOUT) try: # try to bind aduser = con.simple_bind_s(DOMAIN + userid, password) return 0 except Exception as e: # if bind throws an exception, then access is DENIED return 1 # Check P4 Password def checkP4Password(authserver, p4admin, userid, password): password = password[:-6] password = password.replace(r'$',r'\$') try: if re.search(r'"', password): if re.search(r"'", password): print("You cannot have both a \" and a ' in your password.") sys.exit(1) password = password.replace(r'"','\"') return_code = subprocess.call("echo '%s'|p4 -p %s -u %s login" % (password, authserver, userid), shell=True) else: password = password.replace(r"'","\'") return_code = subprocess.call('echo "%s"|p4 -p %s -u %s login' % (password, authserver, userid), shell=True) return return_code except Exception as e: print("Problem with call to auth server. Please contact IT") return 1 # Check token def otpCheck(authserver, p4admin, userid, password): # Get the user's secret key from Perforce try: otpkey = (subprocess.check_output("p4 -p %s -u %s key %s" % (authserver, p4admin, userid), shell=True)).strip() except: print("Error connecting to p4 key server %s." % authserver) sys.exit(1) # Create a time based otp object with the user's key totp = pyotp.TOTP(otpkey) current_key = totp.now() userkey = password[-6:] if(int(current_key) == int(userkey)): return 0 else: return 1 def getsvcusers(serviceusers): svcuserfile = open(SVC_USER_FILE, "r") for line in svcuserfile.readlines(): line = line.lower() serviceusers.append(line.strip()) svcuserfile.close() # doAuthenticate # main routine for performing the authentication logic def doAuthenticate(userid): svcusers = [] # read password from STDIN -- this is passed by a perforce client when the user does a 'p4 login' and is prompted for a password # default behavior is to deny access (return code 1) result=1 svc_user=0 password = sys.stdin.read().strip() if(password == ""): print("Blank password not allowed.") sys.exit(1) # check to see if the user account exists in the local password file # if user exists in local password file, we will not consult LDAP if(localUserExists(userid)): # get the local password, if there is one localPassword = getLocalPassword(userid) # user exists in local password file if(localPassword is None or len(localPassword.strip()) == 0): print("Local password is missing.") else: # retrieved from the local password file if(password == localPassword): result = 0 else: if(authtype == "p4"): passresult = checkP4Password(authserver, p4admin, userid, password) else: passresult = checkLDAPPassword(userid, password) getsvcusers(svcusers) if(userid.lower() in svcusers): svc_user=1 result = passresult else: otpresult = otpCheck(authserver, p4admin, userid, password) if(otpresult or passresult): result = 1 else: result = 0 # now check the result, returning "authentication failed" error if result is not 0 if (result): if (svc_user): print("Invalid login, please use only your password to login.") else: print("Invalid login, you must enter your password and Google Authenticator token to login.") # exit with the result code sys.exit(result) if __name__ == '__main__': if len(sys.argv) < 2: print( "Usage: %s username serverport" % sys.argv[0]) sys.exit(1) userid = sys.argv[1] if userid.lower() in block_users: print("Your account is currently blocked due to an automated process trying to log in.") sys.exit(1) doAuthenticate(userid)