OpenStack: generalize to support multiple auth protocols, and add V2 protocol.

Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
This commit is contained in:
David-Sarah Hopwood 2013-02-20 06:21:25 +00:00 committed by Daira Hopwood
parent f311418382
commit fa2cf092e3
3 changed files with 194 additions and 89 deletions

View File

@ -10,10 +10,9 @@ from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, FileBodyProducer, ResponseDone
from twisted.web.http_headers import Headers
from zope.interface import implements
from zope.interface import implements, Interface
from allmydata.util import log
from allmydata.util.assertutil import _assert
from allmydata.node import InvalidValueError
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin
@ -22,10 +21,12 @@ from allmydata.storage.backends.cloud.cloud_common import IContainer, \
# Enabling this will cause secrets to be logged.
UNSAFE_DEBUG = False
#AUTH_PATH = "v1.0"
AUTH_PATH = "v2.0/tokens"
DEFAULT_AUTH_URLS = {
"rackspace.com": "https://identity.api.rackspacecloud.com/v1.0",
"rackspace.co.uk": "https://lon.identity.api.rackspacecloud.com/v1.0",
"rackspace.com": "https://identity.api.rackspacecloud.com/" + AUTH_PATH,
"rackspace.co.uk": "https://lon.identity.api.rackspacecloud.com/" + AUTH_PATH,
}
USER_AGENT = "Tahoe-LAFS OpenStack client"
@ -42,22 +43,17 @@ def configure_openstack_container(storedir, config):
container_name = config.get_config("storage", "openstack.container")
reauth_period = 23*60*60 #seconds
auth_client = AuthenticationClient(api_key, provider, auth_service_url, username, reauth_period)
AuthenticatorClass = {"v1.0": AuthenticatorV1, "v2.0/tokens": AuthenticatorV2}[AUTH_PATH]
authenticator = AuthenticatorClass(auth_service_url, username, api_key)
auth_client = AuthenticationClient(authenticator, reauth_period)
return OpenStackContainer(auth_client, container_name)
class UnexpectedAuthenticationResponse(Exception):
def __init__(self, msg, response_code, response_headers):
Exception.__init__(self, msg)
self.response_code = response_code
self.response_headers = response_headers
class AuthenticationInfo(object):
def __init__(self, storage_url, cdn_management_url, auth_token):
self.storage_url = storage_url
self.cdn_management_url = cdn_management_url
def __init__(self, auth_token, public_storage_url, internal_storage_url=None):
self.auth_token = auth_token
self.public_storage_url = public_storage_url
self.internal_storage_url = internal_storage_url
def _http_request(what, agent, method, url, request_headers, body=None, need_response_body=False):
@ -104,28 +100,123 @@ def _get_header(response, name):
return hs[0]
class AuthenticationClient(object):
class IAuthenticator(Interface):
def make_auth_request():
"""Returns (method, url, headers, body, need_response_body)."""
def parse_auth_response(response, body):
"""Returns AuthenticationInfo."""
class AuthenticatorV1(object):
implements(IAuthenticator)
"""
I implement a client for the Rackspace authentication service.
It is not clear whether this is also implemented by other OpenStack providers.
Authenticates according to V1 protocol as documented by Rackspace:
<http://docs.rackspace.com/files/api/v1/cf-devguide/content/Authentication-d1e639.html>.
"""
def __init__(self, api_key, provider, auth_service_url, username, reauth_period, override_reactor=None):
self._api_key = api_key
def __init__(self, auth_service_url, username, api_key):
self._auth_service_url = auth_service_url
self._username = username
self._api_key = api_key
def make_auth_request(self):
request_headers = {
'X-Auth-User': [self._username],
'X-Auth-Key': [self._api_key],
}
return ('GET', self._auth_service_url, request_headers, None, False)
def parse_auth_response(self, response, body):
auth_token = _get_header(response, 'X-Auth-Token')
storage_url = _get_header(response, 'X-Storage-Url')
#cdn_management_url = _get_header(response, 'X-CDN-Management-Url')
return AuthenticationInfo(auth_token, storage_url)
class AuthenticatorV2(object):
implements(IAuthenticator)
"""
Authenticates according to V2 protocol as documented by Rackspace:
<http://docs.rackspace.com/auth/api/v2.0/auth-client-devguide/content/POST_authenticate_v2.0_tokens_.html>.
"""
def __init__(self, auth_service_url, username, api_key):
self._auth_service_url = auth_service_url
self._username = username
self._api_key = api_key
#self._password = password
def make_auth_request(self):
# I suspect that 'RAX-KSKEY:apiKeyCredentials' is Rackspace-specific.
request = {
'auth': {
# 'passwordCredentials': {
# 'username': self._username,
# 'password': self._password,
# }
'RAX-KSKEY:apiKeyCredentials': {
'username': self._username,
'apiKey': self._api_key,
}
}
}
json = simplejson.dumps(request)
request_headers = {
'Content-Type': ['application/json'],
}
return ('POST', self._auth_service_url, request_headers, json, True)
def parse_auth_response(self, response, body):
try:
decoded_body = simplejson.loads(body)
except simplejson.decoder.JSONDecodeError, e:
raise CloudServiceError(None, response.code,
message="could not decode auth response: %s" % (e,))
try:
# Scrabble around in the annoyingly complicated response body for the credentials we need.
access = decoded_body['access']
token = access['token']
auth_token = token['id']
user = access['user']
default_region = user.get('RAX-AUTH:defaultRegion', '')
serviceCatalog = access['serviceCatalog']
for service in serviceCatalog:
if service['type'] == 'object-store':
endpoints = service['endpoints']
for endpoint in endpoints:
if not default_region or endpoint['region'] == default_region:
public_storage_url = endpoint['publicURL']
internal_storage_url = endpoint['internalURL']
return AuthenticationInfo(auth_token, public_storage_url, internal_storage_url)
except KeyError, e:
raise CloudServiceError(None, response.code,
message="missing field in auth response: %s" % (e,))
raise CloudServiceError(None, response.code,
message="could not find a suitable storage endpoint in auth response")
class AuthenticationClient(object):
"""
I implement a generic authentication client.
The construction of the auth request and parsing of the response is delegated to an authenticator.
"""
def __init__(self, authenticator, reauth_period, override_reactor=None):
self._authenticator = authenticator
self._reauth_period = reauth_period
self._reactor = override_reactor or reactor
self._agent = Agent(self._reactor)
self._delayed = None
_assert(provider.startswith("rackspace"), provider=provider)
self._authenticate = self._authenticate_to_rackspace
self._shutdown = False
# Not authorized yet.
self._auth_info = None
self._auth_lock = defer.DeferredLock()
d = self.get_auth_info()
d.addBoth(lambda ign: None)
self._reauthenticate()
def get_auth_info(self):
# It is intentional that this returns the previous auth_info while a reauthentication is in progress.
@ -137,44 +228,21 @@ class AuthenticationClient(object):
def get_auth_info_locked(self, suppress_errors=False):
d = self._auth_lock.run(self._authenticate)
d.addCallback(lambda ign: self._auth_info)
if suppress_errors:
d.addErrback(lambda ign: self._auth_info)
return d
def _authenticate_to_rackspace(self, ign=None):
# <http://docs.rackspace.com/files/api/v1/cf-devguide/content/Authentication-d1e639.html>
def _authenticate(self):
(method, url, request_headers, body, need_response_body) = self._authenticator.make_auth_request()
# Agent.request adds a Host header automatically based on the URL.
request_headers = {
'User-Agent': ['Tahoe-LAFS authentication client'],
'X-Auth-User': [self._username],
'X-Auth-Key': [self._api_key],
}
log.msg(format="OpenStack auth GET %(url)s %(headers)s",
url=self._auth_service_url, headers=repr(request_headers), level=log.OPERATIONAL)
d = defer.succeed(None)
d.addCallback(lambda ign: self._agent.request('GET', self._auth_service_url, Headers(request_headers), None))
def _got_response(response):
log.msg(format="OpenStack auth response: %(code)d %(phrase)s",
code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
_check_response_code(response)
def _get_header(name):
hs = response.headers.getRawHeaders(name)
if len(hs) == 0:
raise UnexpectedAuthenticationResponse("missing response header %r" % (name,),
response.code, response.headers)
return hs[0]
storage_url = _get_header('X-Storage-Url')
cdn_management_url = _get_header('X-CDN-Management-Url')
auth_token = _get_header('X-Auth-Token')
d = _http_request("auth", self._agent, method, url, request_headers, body, need_response_body)
def _got_response( (response, body) ):
self._auth_info = self._authenticator.parse_auth_response(response, body)
if UNSAFE_DEBUG:
print "Auth response is %s %s %s" % (storage_url, cdn_management_url, auth_token)
self._auth_info = AuthenticationInfo(storage_url, cdn_management_url, auth_token)
print "Auth response is %s %s" % (self._auth_info.auth_token, self._auth_info.public_storage_url)
self._delayed = self._reactor.callLater(self._reauth_period, self.get_auth_info_locked, suppress_errors=True)
if not self._shutdown:
if self._delayed:
self._delayed.cancel()
self._delayed = self._reactor.callLater(self._reauth_period, self._reauthenticate)
d.addCallback(_got_response)
def _failed(f):
self._auth_info = None
@ -184,8 +252,15 @@ class AuthenticationClient(object):
d.addErrback(_failed)
return d
def _reauthenticate(self):
self._delayed = None
d = self.get_auth_info_locked()
d.addBoth(lambda ign: None)
return d
def shutdown(self):
"""Used by unit tests to avoid unclean reactor errors."""
self._shutdown = True
if self._delayed:
self._delayed.cancel()
@ -208,7 +283,7 @@ class DataCollector(Protocol):
if reason.check(ResponseDone):
eventually_callback(self._done)("".join(self._data))
else:
def _failed(): raise CloudServiceError(reason.getErrorMessage())
def _failed(): raise CloudServiceError(None, 0, message=reason.getErrorMessage())
eventually_errback(self._done)(defer.execute(_failed))
def when_done(self):
@ -230,10 +305,11 @@ class OpenStackContainer(ContainerRetryMixin):
return ("<%s %r>" % (self.__class__.__name__, self._container_name,))
def _make_container_url(self, auth_info):
return "%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''))
return "%s/%s" % (auth_info.public_storage_url, urllib.quote(self._container_name, safe=''))
def _make_object_url(self, auth_info, object_name):
return "%s/%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''), urllib.quote(object_name))
return "%s/%s/%s" % (auth_info.public_storage_url, urllib.quote(self._container_name, safe=''),
urllib.quote(object_name))
def _create(self):
"""
@ -329,7 +405,19 @@ class OpenStackContainer(ContainerRetryMixin):
"""
Retrieve object metadata only.
"""
raise NotImplementedError
d = self._auth_client.get_auth_info()
def _do_head(auth_info):
request_headers = {
'X-Auth-Token': [auth_info.auth_token],
}
url = self._make_object_url(auth_info, object_name)
return _http_request("head object", self._agent, 'HEAD', url, request_headers)
d.addCallback(_do_head)
def _got_head_response( (response, body) ):
print response
raise NotImplementedError
d.addCallback(_got_head_response)
return d
def _delete_object(self, object_name):
"""

View File

@ -254,9 +254,11 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
"s3.bucket = test\n")
self.failUnlessRaises(MissingConfigEntry, client.Client, basedir)
@mock.patch('allmydata.storage.backends.cloud.openstack.openstack_container.AuthenticatorV2')
@mock.patch('allmydata.storage.backends.cloud.openstack.openstack_container.AuthenticationClient')
@mock.patch('allmydata.storage.backends.cloud.openstack.openstack_container.OpenStackContainer')
def test_openstack_config_good_defaults(self, mock_OpenStackContainer, mock_AuthenticationClient):
def test_openstack_config_good_defaults(self, mock_OpenStackContainer, mock_AuthenticationClient,
mock_Authenticator):
basedir = "client.Basic.test_openstack_config_good_defaults"
os.mkdir(basedir)
self._write_secret(basedir, "openstack_api_key")
@ -270,9 +272,11 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
fileutil.write(os.path.join(basedir, "tahoe.cfg"), config)
c = client.Client(basedir)
mock_AuthenticationClient.assert_called_with("dummy", "rackspace.com",
"https://identity.api.rackspacecloud.com/v1.0",
"alex", 23*60*60)
mock_Authenticator.assert_called_with("https://identity.api.rackspacecloud.com/v2.0/tokens",
"alex", "dummy")
authclient_call_args = mock_AuthenticationClient.call_args_list
self.failUnlessEqual(len(authclient_call_args), 1)
self.failUnlessEqual(authclient_call_args[0][0][1:], (23*60*60,))
container_call_args = mock_OpenStackContainer.call_args_list
self.failUnlessEqual(len(container_call_args), 1)
self.failUnlessEqual(container_call_args[0][0][1:], ("test",))

View File

@ -413,8 +413,8 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u
USERNAME = "username"
CONTAINER = "container"
API_KEY = "api_key"
STORAGE_URL = "https://storage.example/a"
CDN_MANAGEMENT_URL = "https://cdn.example/a"
PUBLIC_STORAGE_URL = "https://public.storage.example/a"
INTERNAL_STORAGE_URL = "https://internal.storage.example/a"
AUTH_TOKEN = "auth_token"
TEST_SHARE_PREFIX = "shares/te/"
@ -450,17 +450,18 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u
self.failUnlessIsInstance(headers, Headers)
for (key, values) in expected_headers.iteritems():
self.failUnlessEqual(headers.getRawHeaders(key), values)
self.failUnlessEqual(headers.getRawHeaders(key), values, str((headers, key)))
d = defer.succeed(None)
if bodyProducer is None:
self.failUnlessEqual(expected_body, "")
else:
self.failUnless(IBodyProducer.providedBy(bodyProducer))
self.failUnlessIn(bodyProducer.length, (len(expected_body), UNKNOWN_LENGTH))
body = StringIO()
d = bodyProducer.startProducing(FileConsumer(body))
d.addCallback(lambda ign: self.failUnlessEqual(body.getvalue(), expected_body))
d.addCallback(lambda ign: self.failUnlessIn(bodyProducer.length,
(len(expected_body), UNKNOWN_LENGTH)))
d.addCallback(lambda ign: MockResponse(response_code, response_phrase, response_headers, response_body))
return d
@ -474,15 +475,27 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u
response_code, response_phrase, response_headers, response_body)
def _make_server(self, name):
self._set_request('GET', self.AUTH_SERVICE_URL, {
'X-Auth-User': [self.USERNAME],
'X-Auth-Key': [self.API_KEY],
}, "",
204, "No Content", {
'X-Storage-Url': [self.STORAGE_URL],
'X-CDN-Management-Url': [self.CDN_MANAGEMENT_URL],
'X-Auth-Token': [self.AUTH_TOKEN],
}, "")
# This is for the v1 auth protocol.
#self._set_request('GET', self.AUTH_SERVICE_URL, {
# 'X-Auth-User': [self.USERNAME],
# 'X-Auth-Key': [self.API_KEY],
# }, "",
# 204, "No Content", {
# 'X-Storage-Url': [self.STORAGE_URL],
# 'X-Auth-Token': [self.AUTH_TOKEN],
# }, "")
self._set_request('POST', self.AUTH_SERVICE_URL, {
'Content-Type': ['application/json'],
}, '{"auth": {"RAX-KSKEY:apiKeyCredentials": {"username": "username", "apiKey": "api_key"}}}',
200, "OK", {
}, '''
{"access": {"token": {"id": "%s"},
"serviceCatalog": [{"endpoints": [{"region": "FOO", "publicURL": "%s", "internalURL": "%s"}],
"type": "object-store"}],
"user": {"RAX-AUTH:defaultRegion": "", "name": "%s"}
}
}''' % (self.AUTH_TOKEN, self.PUBLIC_STORAGE_URL, self.INTERNAL_STORAGE_URL, self.USERNAME))
storage_config = {
'openstack.provider': self.PROVIDER,
@ -527,8 +540,8 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u
d = self.container._auth_client.get_auth_info()
def _check(auth_info):
self.failUnlessEqual(auth_info.storage_url, self.STORAGE_URL)
self.failUnlessEqual(auth_info.cdn_management_url, self.CDN_MANAGEMENT_URL)
self.failUnlessEqual(auth_info.public_storage_url, self.PUBLIC_STORAGE_URL)
self.failUnlessEqual(auth_info.internal_storage_url, self.INTERNAL_STORAGE_URL)
self.failUnlessEqual(auth_info.auth_token, self.AUTH_TOKEN)
d.addCallback(_check)
d.addBoth(self._shutdown)
@ -538,26 +551,26 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u
self._patch_agent()
# Set up the requests that we expect to receive.
self._set_request('GET', "/".join((self.STORAGE_URL, self.CONTAINER, "unexpected")), {
self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, "unexpected")), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
404, "Not Found", {}, "")
self._set_request('PUT', "/".join((self.STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
self._set_request('PUT', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
'X-Auth-Token': [self.AUTH_TOKEN],
'Content-Type': ['application/octet-stream'],
'Content-Length': [len(self.TEST_SHARE_DATA)],
#'Content-Length': [len(self.TEST_SHARE_DATA)],
}, self.TEST_SHARE_DATA,
204, "No Content", {}, "")
quoted_prefix = urllib.quote(self.TEST_SHARE_PREFIX, safe='')
self._set_request('GET', "%s/%s?format=json&prefix=%s"
% (self.STORAGE_URL, self.CONTAINER, quoted_prefix), {
% (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
200, "OK", {}, self.TEST_LISTING_JSON)
self._set_request('GET', "/".join((self.STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
200, "OK", {}, self.TEST_SHARE_DATA)
@ -588,14 +601,14 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u
d.addCallback(lambda res: self.failUnlessEqual(res, self.TEST_SHARE_DATA))
def _set_up_delete(ign):
self._set_request('DELETE', "/".join((self.STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
self._set_request('DELETE', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
204, "No Content", {}, "")
# this changes the response to the request set up above
self._set_request('GET', "%s/%s?format=json&prefix=%s"
% (self.STORAGE_URL, self.CONTAINER, quoted_prefix), {
% (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
200, "OK", {}, "[]")