# Source listing: tahoe-lafs package
#
# no_network.py

# This contains a test harness that creates a full Tahoe grid in a single
# process (actually in a single MultiService) which does not use the network.
# It does not use an Introducer, and there are no foolscap Tubs. Each storage
# server puts real shares on disk, but is accessed through loopback
# RemoteReferences instead of over serialized SSL. It is not as complete as
# the common.SystemTestMixin framework (which does use the network), but
# should be considerably faster: on my laptop, it takes 50-80ms to start up,
# whereas SystemTestMixin takes close to 2s.

# This should be useful for tests which want to examine and/or manipulate the
# uploaded shares, checker/verifier/repairer tests, etc. The clients have no
# Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
# or the control.furl.

import os.path
import sha
from zope.interface import implements
from twisted.application import service
from twisted.internet import reactor
from twisted.python.failure import Failure
from foolscap.api import Referenceable, fireEventually, RemoteException
from base64 import b32encode
from allmydata import uri as tahoe_uri
from allmydata.client import Client
from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.util import fileutil, idlib, hashutil
from allmydata.test.common_web import HTTPClientGETFactory
from allmydata.interfaces import IStorageBroker

class IntentionalError(Exception):
    """Raised by LocalWrapper calls when the wrapper has been marked broken."""

class Marker:
    """Opaque token; each instance is a distinct key for a disconnect callback."""

class LocalWrapper:
    """In-process stand-in for a foolscap RemoteReference.

    callRemote() dispatches to the ``remote_<methname>`` method of the
    wrapped object. The call is not made synchronously: it runs as a
    callback on the Deferred returned by fireEventually().

    Attributes:
      original: the object whose ``remote_*`` methods are invoked.
      broken: when true, every call fails with IntentionalError (delivered
              to the caller wrapped in a RemoteException).
      post_call_notifier: optional callable; if set when callRemote() is
              invoked, it is appended as a callback and receives
              ``(result, methname)``. Its return value becomes the result
              seen by the caller.
      disconnectors: maps Marker -> (f, args, kwargs) as registered through
              notifyOnDisconnect(). Nothing in this class invokes them.
    """

    def __init__(self, original):
        self.original = original
        self.broken = False
        self.post_call_notifier = None
        self.disconnectors = {}

    def callRemoteOnly(self, methname, *args, **kwargs):
        """Start a call and discard its Deferred; always returns None."""
        self.callRemote(methname, *args, **kwargs)
        return None

    def callRemote(self, methname, *args, **kwargs):
        """Invoke ``remote_<methname>`` on the wrapped object.

        Returns a Deferred that fires with the method's result. Any failure
        (including IntentionalError when ``self.broken`` is set, or a missing
        ``remote_`` method) is re-wrapped as a RemoteException failure.
        """
        # this is ideally a Membrane, but that's too hard. We do a shallow
        # wrapping of inbound arguments, and per-methodname wrapping of
        # selected return values.
        def wrap(a):
            if isinstance(a, Referenceable):
                return LocalWrapper(a)
            else:
                return a
        args = tuple([wrap(a) for a in args])
        kwargs = dict([(k, wrap(kwargs[k])) for k in kwargs])

        def _call():
            # 'broken' is checked when the call actually runs, not when it
            # is queued
            if self.broken:
                raise IntentionalError("I was asked to break")
            meth = getattr(self.original, "remote_" + methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())

        def _wrap_exception(f):
            return Failure(RemoteException(f))
        d.addErrback(_wrap_exception)

        def _return_membrane(res):
            # rather than complete the difficult task of building a
            # fully-general Membrane (which would locate all Referenceable
            # objects that cross the simulated wire and replace them with
            # wrappers), we special-case certain methods that we happen to
            # know will return Referenceables.
            if methname == "allocate_buckets":
                (alreadygot, allocated) = res
                # values are replaced in place; the key set is not changed
                for shnum in allocated:
                    allocated[shnum] = LocalWrapper(allocated[shnum])
            if methname == "get_buckets":
                for shnum in res:
                    res[shnum] = LocalWrapper(res[shnum])
            return res
        d.addCallback(_return_membrane)
        if self.post_call_notifier:
            d.addCallback(self.post_call_notifier, methname)
        return d

    def notifyOnDisconnect(self, f, *args, **kwargs):
        """Record a disconnect callback and return the Marker that names it."""
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m

    def dontNotifyOnDisconnect(self, marker):
        """Forget a registered callback; raises KeyError if marker is unknown."""
        del self.disconnectors[marker]

def wrap_storage_server(original):
    """Wrap a storage server in a LocalWrapper that also carries ``.version``.

    Upload/download code reads ``rref.version``, which on a real connection
    is filled in by rrefutil.add_version_to_remote_reference. Since no
    network is involved here, the version is taken directly from the
    server's ``remote_get_version()`` and attached to the wrapper.
    """
    wrapped = LocalWrapper(original)
    # synchronous, direct call on the server object (not via callRemote)
    version = original.remote_get_version()
    wrapped.version = version
    return wrapped

class NoNetworkStorageBroker:
    """IStorageBroker that answers from its owning client's ``_servers``.

    ``self.client`` is not set here; whoever creates the broker assigns it.
    ``client._servers`` is iterated as a collection of tuples whose first
    element is concatenated with the lookup key (i.e. a serverid string).
    """
    implements(IStorageBroker)

    def get_servers_for_index(self, key):
        """Return the servers as a list ordered by SHA-1 of key + serverid."""
        def _permuted_position(server):
            serverid = server[0]
            return sha.new(key + serverid).digest()
        candidates = self.client._servers
        return sorted(candidates, key=_permuted_position)

    def get_all_servers(self):
        """Return a frozenset snapshot of every known server entry."""
        everything = self.client._servers
        return frozenset(everything)

    def get_nickname_for_serverid(self, serverid):
        """Nicknames are not tracked in this harness; always None."""
        return None

class NoNetworkClient(Client):
    """Client subclass with every network-facing initializer stubbed out.

    The ``_servers`` attribute is not set by this class; it is assigned by
    the NoNetworkGrid which creates us.
    """

    # NOTE(review): presumably a small key size so mutable-file tests run
    # quickly -- confirm against the base class default.
    DEFAULT_MUTABLE_KEYSIZE = 522

    # --- initializers that are deliberately no-ops -----------------------

    def create_tub(self):
        """No Tub is created."""

    def setup_logging(self):
        """Logging setup is skipped."""

    def init_introducer_client(self):
        """No introducer client is created."""

    def init_stub_client(self):
        """Skipped."""

    def init_storage(self):
        """Skipped; this class sets up no storage of its own."""

    def init_control(self):
        """Skipped."""

    def init_helper(self):
        """Skipped."""

    def init_key_gen(self):
        """Skipped."""

    # --- the one initializer that does something -------------------------

    def init_client_storage_broker(self):
        """Install a NoNetworkStorageBroker that points back at this client."""
        broker = NoNetworkStorageBroker()
        self.storage_broker = broker
        broker.client = self

    # --- service lifecycle ------------------------------------------------

    def startService(self):
        # call MultiService directly rather than the parent class's override
        service.MultiService.startService(self)

    def stopService(self):
        # result of MultiService.stopService is intentionally not returned
        service.MultiService.stopService(self)

    def when_tub_ready(self):
        """Always raises: there is no Tub to wait for."""
        raise NotImplementedError("NoNetworkClient has no Tub")

class SimpleStats:
    """Minimal in-memory stats collector: named counters plus producers."""

    def __init__(self):
        self.counters = {}
        self.stats_producers = []

    def count(self, name, delta=1):
        """Add *delta* to the counter *name*, creating it at 0 if absent."""
        counters = self.counters
        counters.setdefault(name, 0)
        counters[name] += delta

    def register_producer(self, stats_producer):
        """Remember an object whose get_stats() is merged into our output."""
        self.stats_producers.append(stats_producer)

    def get_stats(self):
        """Return ``{'counters': ..., 'stats': ...}``.

        'counters' is the live counter dict (not a copy). 'stats' is a new
        dict built by merging each producer's get_stats() in registration
        order, so later producers override earlier ones on key collisions.
        """
        merged = {}
        for producer in self.stats_producers:
            merged.update(producer.get_stats())
        return {'counters': self.counters, 'stats': merged}

class NoNetworkGrid(service.MultiService):
    """A MultiService holding storage servers and clients in one process.

    Attributes:
      basedir: root directory; servers live under ``servers/`` and clients
               under ``clients/`` inside it.
      servers_by_number: maps server index -> StorageServer.
      servers_by_id: maps serverid -> LocalWrapper around that server.
      all_servers: frozenset of ``servers_by_id.items()``; rebuilt on every
               add_server() and pushed to each client's ``_servers``.
      clients: list of client instances, in creation order.
    """

    def __init__(self, basedir, num_clients=1, num_servers=10,
                 client_config_hooks=None):
        """Create the servers first, then the clients.

        client_config_hooks maps a client index to a callable taking the
        client's directory. The hook runs after the default tahoe.cfg has
        been written; it may edit that file and return nothing, or return a
        ready-made client instance to use instead of NoNetworkClient.
        """
        service.MultiService.__init__(self)
        if client_config_hooks is None:
            client_config_hooks = {}
        self.basedir = basedir
        fileutil.make_dirs(basedir)

        self.servers_by_number = {}
        self.servers_by_id = {}
        # must exist even when num_servers == 0, because the client loop
        # below reads it
        self.all_servers = frozenset()
        self.clients = []

        for i in range(num_servers):
            ss = self.make_server(i)
            self.add_server(i, ss)

        for i in range(num_clients):
            clientid = hashutil.tagged_hash("clientid", str(i))[:20]
            clientdir = os.path.join(basedir, "clients",
                                     idlib.shortnodeid_b2a(clientid))
            fileutil.make_dirs(clientdir)
            self._write_default_client_config(clientdir, i)
            c = None
            if i in client_config_hooks:
                # this hook can either modify tahoe.cfg, or return an
                # entirely new Client instance
                c = client_config_hooks[i](clientdir)
            if not c:
                c = NoNetworkClient(clientdir)
            c.nodeid = clientid
            c.short_nodeid = b32encode(clientid).lower()[:8]
            c._servers = self.all_servers # can be updated later
            c.setServiceParent(self)
            self.clients.append(c)

    def _write_default_client_config(self, clientdir, i):
        # Write the stock tahoe.cfg for client number i into clientdir.
        f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
        try:
            f.write("[node]\n")
            f.write("nickname = client-%d\n" % i)
            f.write("web.port = tcp:0:interface=127.0.0.1\n")
            f.write("[storage]\n")
            f.write("enabled = false\n")
        finally:
            # close even if a write fails, so the handle is not leaked
            f.close()

    def make_server(self, i):
        """Build (but do not attach) the StorageServer for index *i*.

        The serverid is derived from the index, and the server's directory
        is ``<basedir>/servers/<short serverid>``.
        """
        serverid = hashutil.tagged_hash("serverid", str(i))[:20]
        serverdir = os.path.join(self.basedir, "servers",
                                 idlib.shortnodeid_b2a(serverid))
        fileutil.make_dirs(serverdir)
        ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats())
        return ss

    def add_server(self, i, ss):
        """Attach server *ss* under index *i* and update every client."""
        # to deal with the fact that all StorageServers are named 'storage',
        # we interpose a middleman
        middleman = service.MultiService()
        middleman.setServiceParent(self)
        ss.setServiceParent(middleman)
        serverid = ss.my_nodeid
        self.servers_by_number[i] = ss
        self.servers_by_id[serverid] = wrap_storage_server(ss)
        self.all_servers = frozenset(self.servers_by_id.items())
        for c in self.clients:
            c._servers = self.all_servers

class GridTestMixin:
    """Test-case mixin that runs a NoNetworkGrid and pokes at its shares.

    A "share" below is a ``(shnum, serverid, sharefile)`` triple as produced
    by find_shares(), where sharefile is the path of the share on disk.
    """

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def set_up_grid(self, num_clients=1, num_servers=10,
                    client_config_hooks=None):
        """Create the grid as ``self.g`` and record each client's web port.

        self.basedir must be set before calling this. Sets
        ``self.client_webports`` and ``self.client_baseurls``, both indexed
        by client number.
        """
        if client_config_hooks is None:
            client_config_hooks = {}
        self.g = NoNetworkGrid(self.basedir,
                               num_clients=num_clients,
                               num_servers=num_servers,
                               client_config_hooks=client_config_hooks)
        self.g.setServiceParent(self.s)
        self.client_webports = [c.getServiceNamed("webish").listener._port.getHost().port
                                for c in self.g.clients]
        self.client_baseurls = ["http://localhost:%d/" % p
                                for p in self.client_webports]

    def get_clientdir(self, i=0):
        """Return the base directory of client number *i*."""
        return self.g.clients[i].basedir

    def get_serverdir(self, i):
        """Return the storage directory of server number *i*."""
        return self.g.servers_by_number[i].storedir

    def iterate_servers(self):
        """Yield ``(number, server, storedir)`` in ascending server number."""
        for i in sorted(self.g.servers_by_number.keys()):
            ss = self.g.servers_by_number[i]
            yield (i, ss, ss.storedir)

    def find_shares(self, uri):
        """Return a sorted list of shares found on disk for *uri*.

        Each server's ``shares/<storage-index dir>`` directory is listed;
        entries whose names are not integers are ignored.
        """
        si = tahoe_uri.from_string(uri).get_storage_index()
        prefixdir = storage_index_to_dir(si)
        shares = []
        for ss in self.g.servers_by_number.values():
            serverid = ss.my_nodeid
            basedir = os.path.join(ss.storedir, "shares", prefixdir)
            if not os.path.exists(basedir):
                continue
            for f in os.listdir(basedir):
                try:
                    shnum = int(f)
                except ValueError:
                    # not a share file
                    continue
                shares.append((shnum, serverid, os.path.join(basedir, f)))
        return sorted(shares)

    def delete_share(self, share):
        """Remove the file of one ``(shnum, serverid, sharefile)`` share."""
        (shnum, serverid, sharefile) = share
        os.unlink(sharefile)

    def delete_shares_numbered(self, uri, shnums):
        """Remove every share of *uri* whose share number is in *shnums*."""
        for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
            if i_shnum in shnums:
                os.unlink(i_sharefile)

    def _rewrite_sharefile(self, sharefile, corruptor):
        # Replace the contents of sharefile with corruptor(old contents).
        # Both handles are closed explicitly so the new bytes are on disk
        # before the caller continues.
        f = open(sharefile, "rb")
        try:
            sharedata = f.read()
        finally:
            f.close()
        corruptdata = corruptor(sharedata)
        f = open(sharefile, "wb")
        try:
            f.write(corruptdata)
        finally:
            f.close()

    def corrupt_share(self, share, corruptor_function):
        """Overwrite one share with ``corruptor_function(original_bytes)``."""
        (shnum, serverid, sharefile) = share
        self._rewrite_sharefile(sharefile, corruptor_function)

    def corrupt_shares_numbered(self, uri, shnums, corruptor):
        """Apply *corruptor* to every share of *uri* numbered in *shnums*."""
        for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
            if i_shnum in shnums:
                self._rewrite_sharefile(i_sharefile, corruptor)

    def GET(self, urlpath, followRedirect=False, return_response=False,
            method="GET", clientnum=0, **kwargs):
        """Issue an HTTP request to client *clientnum*'s web port.

        Returns a Deferred. If return_response=True, this fires with (data,
        statuscode, respheaders) instead of just data.
        """
        assert not isinstance(urlpath, unicode)
        url = self.client_baseurls[clientnum] + urlpath
        factory = HTTPClientGETFactory(url, method=method,
                                       followRedirect=followRedirect, **kwargs)
        reactor.connectTCP("localhost", self.client_webports[clientnum],factory)
        d = factory.deferred
        if return_response:
            def _got_data(data):
                return (data, factory.status, factory.response_headers)
            d.addCallback(_got_data)
        return d

# Listing generated by Doxygen 1.6.0