Skip to content

Instantly share code, notes, and snippets.

@jonasfj
Created May 7, 2018 15:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jonasfj/5dc713ea4175da7bd066cfca89826cdc to your computer and use it in GitHub Desktop.
Save jonasfj/5dc713ea4175da7bd066cfca89826cdc to your computer and use it in GitHub Desktop.
import requests
from relengapi.lib.permissions import p
def scope_satisfied(req, scopes):
    """
    Return True if any scope in ``scopes`` satisfies the required scope ``req``.

    A scope satisfies ``req`` when it is equal to ``req``, or when it ends
    with ``*`` and ``req`` starts with everything that precedes that ``*``.
    An empty ``scopes`` satisfies nothing.
    """
    return any(
        s == req or (s.endswith('*') and req.startswith(s[:-1]))
        for s in scopes
    )
def permissions_from_scopes(scopes):
    """
    Return the permissions from ``p`` that are granted by ``scopes``.

    A permission ``perm`` is granted when ``scopes`` satisfies the scope
    ``'relengapi:' + perm`` (see ``scope_satisfied``).
    """
    # NOTE(review): assumes iterating `p` yields (perm, doc) pairs and that
    # perm can be concatenated to a str -- confirm against relengapi.
    return [
        perm for (perm, _doc) in p
        # The original called an undefined `has_scope`; the scope-matching
        # helper in this file is `scope_satisfied`.
        if scope_satisfied('relengapi:' + perm, scopes)
    ]
class TaskclusterUser(auth.BaseUser):
    """
    User identified by a taskcluster clientId.

    The user's permissions are computed once, at construction time, from the
    scopes passed in.
    """
    # NOTE(review): `auth` is not imported in the visible part of this file;
    # presumably `from relengapi.lib import auth` -- confirm.

    type = 'taskcluster'

    def __init__(self, clientId, scopes):
        self._clientId = clientId
        self._permissions = permissions_from_scopes(scopes)

    def get_id(self):
        """Return the clientId this user was created with."""
        return self._clientId

    def get_permissions(self):
        """Return the permissions derived from the scopes given at construction."""
        return self._permissions
authBaseUrl = 'https://auth.taskcluster.net/v1'


def _auth_session():
    """
    Create a requests session whose calls to ``authBaseUrl`` are retried.

    A transport adapter only affects requests sent through the session it is
    mounted on, so the caller must send its request through the returned
    session; the module-level ``requests.post`` would bypass the adapter.
    """
    session = requests.Session()
    # Ensure requests are retried up-to 7 times
    session.mount(authBaseUrl, requests.adapters.HTTPAdapter(max_retries=7))
    return session


# NOTE(review): `auth` and `BadRequest` are not imported in the visible part
# of this file; presumably relengapi.lib.auth and
# werkzeug.exceptions.BadRequest -- confirm.
@auth.request_loader
def _taskclusterAuthenticateHawk(request):
    """
    Authenticate a request carrying a Hawk ``Authorization`` header.

    The header, together with the request's method, resource, host and port,
    is posted to the ``/authenticate-hawk`` end-point under ``authBaseUrl``.

    Returns:
        None if the request has no Hawk authorization header, otherwise a
        TaskclusterUser built from the ``clientId`` and ``scopes`` in the
        response.

    Raises:
        BadRequest: if the response status is not ``'auth-success'``.
        requests.HTTPError: if the auth end-point returns an HTTP error
            status (via ``raise_for_status``).
    """
    authHeader = request.headers.get('Authorization')
    # If there is no hawk authorization header we return
    if not authHeader or not authHeader.lower().startswith('hawk '):
        return None

    # Find host and port
    hostPort = request.host.split(':')
    host = hostPort[0]
    forwardedPort = request.headers.get('X-Forwarded-Port')
    if forwardedPort:
        # Needed if running behind a reverse proxy which terminates HTTPS
        port = int(forwardedPort)
    elif len(hostPort) > 1:
        port = int(hostPort[1])
    elif request.scheme == 'https':
        port = 443
    else:
        port = 80

    # full_path always ends in '?' when there is no query string, which is
    # not the resource the client signed; fall back to the bare path then.
    resource = request.full_path if request.query_string else request.path

    # Call authenticateHawk through a session that has the retrying adapter
    # mounted, so the retries actually take effect.
    with _auth_session() as session:
        r = session.post(authBaseUrl + '/authenticate-hawk', timeout=30, json={
            'method': request.method.lower(),
            'resource': resource,
            'host': host,
            'port': port,
            'authorization': authHeader,
        })
        r.raise_for_status()
        data = r.json()

    # If authentication failed we return a bad request
    if data['status'] != 'auth-success':
        raise BadRequest('authenticateHawk failed: ' + data['message'])

    # If authentication was successful, we return a TaskclusterUser object
    return TaskclusterUser(data['clientId'], data['scopes'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment