OpenStack: mostly complete implementation of OpenStackContainer.

Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
This commit is contained in:
David-Sarah Hopwood 2013-02-13 22:08:46 +00:00 committed by Daira Hopwood
parent b9a9f9f30b
commit ed6ee84786
1 changed files with 220 additions and 42 deletions

View File

@ -1,6 +1,13 @@
import urllib, simplejson
from cStringIO import StringIO
from collections import deque
from twisted.internet import defer, reactor
from twisted.web.client import Agent
from allmydata.util.deferredutil import eventually_callback, eventually_errback
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, FileBodyProducer, ResponseDone
from twisted.web.http_headers import Headers
from zope.interface import implements
@ -9,7 +16,10 @@ from allmydata.util import log
from allmydata.util.assertutil import _assert
from allmydata.node import InvalidValueError
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
ContainerRetryMixin, ContainerListMixin
ContainerRetryMixin
# move this
from allmydata.storage.backends.cloud.mock_cloud import ContainerItem, ContainerListing
# Enabling this will cause secrets to be logged.
@ -145,61 +155,229 @@ class AuthenticationClient(object):
self._delayed.cancel()
class OpenStackContainer(ContainerRetryMixin, ContainerListMixin):
implements(IContainer)
"""
I represent a real OpenStack container.
"""
class OpenStackError(Exception):
pass
def __init__(self, auth_client, container_name):
class Discard(Protocol):
# see http://twistedmatrix.com/trac/ticket/5488
def makeConnection(self, producer):
producer.stopProducing()
class DataCollector(Protocol):
def __init__(self):
self._data = deque()
self._done = defer.Deferred()
def dataReceived(self, bytes):
print 'Got %d bytes' % (len(bytes),)
self._data.append(bytes)
def connectionLost(self, reason):
print 'Finished receiving body: %s' % (reason.getErrorMessage(),)
#reason.raiseException()
if reason.check(ResponseDone):
eventually_callback(self._done)("".join(self._data))
else:
def _failed(): raise OpenStackError(reason.getErrorMessage())
eventually_errback(self._done)(defer.execute(_failed))
def when_done(self):
"""CAUTION: this always returns the same Deferred."""
return self._done
class OpenStackContainer(ContainerRetryMixin):
implements(IContainer)
USER_AGENT = 'Tahoe-LAFS OpenStack client'
def __init__(self, auth_client, container_name, override_reactor=None):
self._auth_client = auth_client
self._container_name = container_name
#self.client = OpenStackClient(auth_client)
#self.ServiceError = OpenStackError
self._reactor = override_reactor or reactor
self._agent = Agent(self._reactor)
self.ServiceError = OpenStackError
def __repr__(self):
return ("<%s>" % (self.__class__.__name__,))
return ("<%s %r>" % (self.__class__.__name__, self._container_name,))
def _make_container_url(self, auth_info):
return "%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''))
def _make_object_url(self, auth_info, object_name):
return "%s/%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''), urllib.quote(object_name))
def _create(self):
"""
Create this container.
"""
raise NotImplementedError
def _delete(self):
"""
Delete this container.
The cloud service may require the container to be empty before it can be deleted.
"""
raise NotImplementedError
def _list_objects(self, prefix=''):
"""
Get a ContainerListing that lists objects in this container.
prefix: (str) limit the returned keys to those starting with prefix.
"""
d = self._auth_client.get_auth_info()
def _do_list(auth_info):
request_headers = {
'User-Agent': [self.USER_AGENT],
'X-Auth-Token': [auth_info.auth_token],
}
url = self._make_container_url(auth_info)
if prefix:
url += "?format=json&prefix=%s" % (urllib.quote(prefix, safe=''),)
log.msg(format="OpenStack list GET %(url)s", url=url, level=log.OPERATIONAL)
return self._agent.request('GET', url, Headers(request_headers), None)
d.addCallback(_do_list)
def _got_list_response(response):
log.msg(format="OpenStack list GET response: %(code)d %(phrase)s",
code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
if response.code < 200 or response.code >= 300:
raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
response.code, response.headers)
collector = DataCollector()
response.deliverBody(collector)
return collector.when_done()
d.addCallback(_got_list_response)
def _parse_list(json):
items = simplejson.loads(json)
log.msg(format="OpenStack list GET read %(length)d bytes, parsed as %(items)d items",
length=len(json), items=len(items), level=log.OPERATIONAL)
def _make_containeritem(item):
try:
key = item['name']
size = item['bytes']
modification_date = item['last_modified']
etag = item['hash']
storage_class = 'STANDARD'
except KeyError, e:
raise OpenStackError(str(e))
else:
return ContainerItem(key, modification_date, etag, size, storage_class)
contents = map(_make_containeritem, items)
return ContainerListing(self._container_name, prefix, None, 10000, False, contents=contents)
d.addCallback(_parse_list)
return d
def _put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
"""
Put an object in this bucket.
Any existing object of the same name will be replaced.
"""
d = self._auth_client.get_auth_info()
def _do_put(auth_info):
content_length = len(data)
request_headers = {
'User-Agent': [self.USER_AGENT],
'X-Auth-Token': [auth_info.auth_token],
'Content-Type': [content_type],
'Content-Length': [content_length],
}
producer = FileBodyProducer(StringIO(data))
url = self._make_object_url(auth_info, object_name)
log.msg(format="OpenStack PUT %(url)s %(content_length)d",
url=url, content_length=content_length, level=log.OPERATIONAL)
return self._agent.request('PUT', url, Headers(request_headers), producer)
d.addCallback(_do_put)
def _got_put_response(response):
log.msg(format="OpenStack PUT response: %(code)d %(phrase)s",
code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
if response.code < 200 or response.code >= 300:
raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
response.code, response.headers)
response.deliverBody(Discard())
d.addCallback(_got_put_response)
return d
def _get_object(self, object_name):
"""
Get an object from this container.
"""
d = self._auth_client.get_auth_info()
def _do_get(auth_info):
request_headers = {
'User-Agent': [self.USER_AGENT],
'X-Auth-Token': [auth_info.auth_token],
}
url = self._make_object_url(auth_info, object_name)
log.msg(format="OpenStack GET %(url)s", url=url, level=log.OPERATIONAL)
return self._agent.request('GET', url, Headers(request_headers), None)
d.addCallback(_do_get)
def _got_get_response(response):
log.msg(format="OpenStack GET response: %(code)d %(phrase)s",
code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
if response.code < 200 or response.code >= 300:
raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
response.code, response.headers)
collector = DataCollector()
response.deliverBody(collector)
return collector.when_done()
d.addCallback(_got_get_response)
return d
def _head_object(self, object_name):
"""
Retrieve object metadata only.
"""
raise NotImplementedError
def _delete_object(self, object_name):
"""
Delete an object from this container.
Once deleted, there is no method to restore or undelete an object.
"""
d = self._auth_client.get_auth_info()
def _do_delete(auth_info):
request_headers = {
'User-Agent': [self.USER_AGENT],
'X-Auth-Token': [auth_info.auth_token],
}
url = self._make_object_url(auth_info, object_name)
log.msg(format="OpenStack DELETE %(url)s", url=url, level=log.OPERATIONAL)
return self._agent.request('DELETE', url, Headers(request_headers), None)
d.addCallback(_do_delete)
def _got_delete_response(response):
log.msg(format="OpenStack DELETE response: %(code)d %(phrase)s",
code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
if response.code < 200 or response.code >= 300:
raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
response.code, response.headers)
response.deliverBody(Discard())
d.addCallback(_got_delete_response)
return d
def create(self):
return self._do_request('create bucket', self.client.create, self.container_name)
return self._do_request('create container', self._create)
def delete(self):
return self._do_request('delete bucket', self.client.delete, self.container_name)
return self._do_request('delete container', self._delete)
def list_some_objects(self, **kwargs):
return self._do_request('list objects', self.client.get_bucket, self.container_name, **kwargs)
def list_objects(self, prefix=''):
return self._do_request('list objects', self._list_objects, prefix)
def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
return self._do_request('PUT object', self.client.put_object, self.container_name,
object_name, data, content_type, metadata)
return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata)
def get_object(self, object_name):
return self._do_request('GET object', self.client.get_object, self.container_name, object_name)
return self._do_request('GET object', self._get_object, object_name)
def head_object(self, object_name):
return self._do_request('HEAD object', self.client.head_object, self.container_name, object_name)
return self._do_request('HEAD object', self._head_object, object_name)
def delete_object(self, object_name):
return self._do_request('DELETE object', self.client.delete_object, self.container_name, object_name)
def put_policy(self, policy):
"""
Set access control policy on a bucket.
"""
query = self.client.query_factory(
action='PUT', creds=self.client.creds, endpoint=self.client.endpoint,
bucket=self.container_name, object_name='?policy', data=policy)
return self._do_request('PUT policy', query.submit)
def get_policy(self):
query = self.client.query_factory(
action='GET', creds=self.client.creds, endpoint=self.client.endpoint,
bucket=self.container_name, object_name='?policy')
return self._do_request('GET policy', query.submit)
def delete_policy(self):
query = self.client.query_factory(
action='DELETE', creds=self.client.creds, endpoint=self.client.endpoint,
bucket=self.container_name, object_name='?policy')
return self._do_request('DELETE policy', query.submit)
return self._do_request('DELETE object', self._delete_object, object_name)