Source code for bkr.server.authentication

from turbogears.database import session
from turbogears import controllers, expose, flash, widgets, validate, \
        error_handler, validators, redirect, paginate, identity
from turbogears.widgets import AutoCompleteField
from turbogears.identity import IdentityException
from turbogears.identity.saprovider import SqlAlchemyIdentity
from cherrypy import request, response
from kid import Element
from bkr.server.xmlrpccontroller import RPCRoot
from bkr.server.helpers import *
from bkr.server.model import *
from xmlrpclib import ProtocolError
import cherrypy
import time
import re
import logging
import string

log = logging.getLogger(__name__)

__all__ = ['Auth']

def proxy_identity(current_identity, proxy_user_name):
    if 'proxy_auth' not in current_identity.permissions:
        raise IdentityException('%s does not have proxy_auth permission' % current_identity.user_name)
    proxy_user = User.by_user_name(proxy_user_name)
    if not proxy_user:
        log.warning('Attempted to proxy as nonexistent user %s', proxy_user_name)
        return None
    log.info("Associating proxy user (%s) with visit (%s)",
            proxy_user_name, current_identity.visit_key)
    # XXX shouldn't assume a particular implementation class here:
    proxied_identity = SqlAlchemyIdentity(current_identity.visit_key, proxy_user)
    proxied_identity.visit_link.proxied_by_user = current_identity.user
    return proxied_identity

class Auth(RPCRoot):
    # For XMLRPC methods in this class.
    exposed = True

    KRB_AUTH_PRINCIPAL = get("identity.krb_auth_principal")
    KRB_AUTH_KEYTAB = get("identity.krb_auth_keytab")

    @cherrypy.expose
    @identity.require(identity.not_anonymous())
    def who_am_i(self):
        """
        Returns an XML-RPC structure (dict) with information about the 
        currently logged in user.
        Provided for testing purposes.

        .. versionadded:: 0.6.0
        .. versionchanged:: 0.6.1
           Formerly returned only the username.
        """
        retval = {'username': identity.current.user.user_name}
        if identity.current.visit_link.proxied_by_user is not None:
            retval['proxied_by_username'] = identity.current.visit_link.proxied_by_user.user_name
        return retval

    @cherrypy.expose
    def renew_session(self, *args, **kw):
        """
        Renew session, here to support kobo.
        """
        if identity.current.anonymous:
            return True
        return False

    @cherrypy.expose
    def login_password(self, username, password, proxy_user=None):
        """
        Authenticates the current session using the given username and password.

        The caller may act as a proxy on behalf of another user by passing the 
        *proxy_user* argument. This requires that the caller has 'proxy_auth' 
        permission.

        :param proxy_user: username on whose behalf the caller is proxying
        :type proxy_user: string or None
        """
        visit_key = turbogears.visit.current().key
        user = identity.current_provider.validate_identity(username, password, visit_key)
        if user is None:
            raise IdentityException("Invalid username or password")
        if proxy_user:
            proxied_user = proxy_identity(user, proxy_user)
            if proxied_user is None:
                raise IdentityException('Failed to authenticate on behalf of %s' % proxy_user)
        return identity.current.visit_key

    @cherrypy.expose
    def login_krbV(self, krb_request, proxy_user=None):
        """
        Authenticates the current session using Kerberos.

        The caller may act as a proxy on behalf of another user by passing the 
        *proxy_user* argument. This requires that the caller has 'proxy_auth' 
        permission.
        
        :param krb_request: KRB_AP_REQ message containing client credentials, 
            as produced by :c:func:`krb5_mk_req`
        :type krb_request: base64-encoded string
        :param proxy_user: username on whose behalf the caller is proxying
        :type proxy_user: string or None

        This method is also available under the alias :meth:`auth.login_krbv`, 
        for compatibility with `Kobo`_.
        """
        import krbV
        import base64

        context = krbV.default_context()
        server_principal = krbV.Principal(name=self.KRB_AUTH_PRINCIPAL, context=context)
        server_keytab = krbV.Keytab(name=self.KRB_AUTH_KEYTAB, context=context)
    
        auth_context = krbV.AuthContext(context=context)
        auth_context.flags = krbV.KRB5_AUTH_CONTEXT_DO_SEQUENCE | krbV.KRB5_AUTH_CONTEXT_DO_TIME
        auth_context.addrs = (socket.gethostbyname(cherrypy.request.remote_host), 0, cherrypy.request.remote_addr, 0)
    
        # decode and read the authentication request
        decoded_request = base64.decodestring(krb_request)
        auth_context, opts, server_principal, cache_credentials = context.rd_req(decoded_request, server=server_principal, keytab=server_keytab, auth_context=auth_context, options=krbV.AP_OPTS_MUTUAL_REQUIRED)
        cprinc = cache_credentials[2]
    
        # remove @REALM
        username = cprinc.name.split("@")[0]
        visit_key = turbogears.visit.current().key
        user = identity.current_provider.validate_identity(
                user_name=username, password=None,
                visit_key=visit_key, krb=True)
        if user is None:
            raise IdentityException()
        if proxy_user:
            proxied_identity = proxy_identity(user, proxy_user)
            if proxied_identity is None:
                raise IdentityException('Failed to authenticate on behalf of %s' % proxy_user)
        return identity.current.visit_key

    # Alias kerberos login
    login_krbv = login_krbV

    @cherrypy.expose
    def logout(self, *args):
        """
        Invalidates the current session.
        """
        identity.current.logout()
        return True

# this is just a hack for sphinx autodoc
auth = Auth