# storage_client.py (tahoe-lafs)
"""
I contain the client-side code which speaks to storage servers, in particular
the foolscap-based server implemented in src/allmydata/storage/*.py .
"""

# roadmap:
#
# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
# create it, change uploader/servermap to get rrefs from it. ServerFarm calls
# IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
# to clients. webapi status pages call broker.get_info_about_serverid.
#
# 2: move get_info methods to the descriptor, webapi status pages call
# broker.get_descriptor_for_serverid().get_info
#
# 3?later?: store descriptors in UploadResults/etc instead of serverids,
# webapi status pages call descriptor.get_info and don't use storage_broker
# or Client
#
# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
# optional. This closes #467
#
# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
# clients. Clients stop doing callRemote(), use NativeStorageClient methods
# instead (which might do something else, i.e. http or whatever). The
# introducer and tahoe.cfg only create NativeStorageClients for now.
#
# 6: implement other sorts of IStorageClient classes: S3, etc

import sha, time
from zope.interface import implements, Interface
from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker
from allmydata.util import idlib, log
from allmydata.util.assertutil import _assert, precondition
from allmydata.util.rrefutil import add_version_to_remote_reference

# who is responsible for de-duplication?
#  both?
#  IC remembers the unpacked announcements it receives, to provide for late
#  subscribers and to remove duplicates

# if a client subscribes after startup, will they receive old announcements?
#  yes

# who will be responsible for signature checking?
#  make it be IntroducerClient, so they can push the filter outwards and
#  reduce inbound network traffic

# what should the interface between StorageFarmBroker and IntroducerClient
# look like?
#  don't pass signatures: only pass validated blessed-objects

class StorageFarmBroker:
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.
    """
    # The docstring must be the first statement in the class body to become
    # __doc__; the implements() class advice works from anywhere in the body.
    implements(IStorageBroker)

    def __init__(self, tub, permute_peers):
        """Create a broker with no known servers.

        @param tub: handed to each descriptor's start_connecting() when an
                    announcement arrives
        @param permute_peers: must be true; the non-permuted mode is not
                              implemented yet
        """
        self.tub = tub
        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        # self.descriptors maps serverid -> IServerDescriptor, and keeps
        # track of all the storage servers that we've heard about. Each
        # descriptor manages its own Reconnector, and will give us a
        # RemoteReference when we ask them for it.
        self.descriptors = {}
        # self.test_servers are statically configured from unit tests
        self.test_servers = {} # serverid -> rref
        self.introducer_client = None

    # these two are used in unit tests
    def test_add_server(self, serverid, rref):
        """Statically register an rref under serverid (unit tests only)."""
        self.test_servers[serverid] = rref

    def test_add_descriptor(self, serverid, dsc):
        """Statically register a descriptor under serverid (unit tests
        only). start_connecting() is not called on it."""
        self.descriptors[serverid] = dsc

    def use_introducer(self, introducer_client):
        """Remember introducer_client and subscribe to its "storage"
        announcements, which are delivered to _got_announcement."""
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_announcement(self, serverid, ann_d):
        """Handle one "storage" announcement.

        A duplicate of the announcement we already hold for this serverid is
        ignored. A different announcement replaces the old descriptor, which
        is told to stop connecting. In both the new and the replacement case
        a fresh NativeStorageClientDescriptor is created and started.

        @param serverid: a str of length 20
        @param ann_d: the announcement dictionary; its "service-name" must
                      be "storage"
        """
        precondition(isinstance(serverid, str), serverid)
        precondition(len(serverid) == 20, serverid)
        assert ann_d["service-name"] == "storage"
        old = self.descriptors.get(serverid)
        if old is not None:
            if old.get_announcement() == ann_d:
                return # duplicate
            # replacement
            del self.descriptors[serverid]
            old.stop_connecting()
            # now we forget about them and start using the new one
        dsc = NativeStorageClientDescriptor(serverid, ann_d)
        self.descriptors[serverid] = dsc
        dsc.start_connecting(self.tub, self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.

    def _trigger_connections(self):
        """Ask every known descriptor to retry its connection now."""
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in self.descriptors.values():
            dsc.try_to_connect()

    def get_servers_for_index(self, peer_selection_index):
        """Return a list of (peerid, versioned-rref) tuples for all
        currently-available servers, sorted by the SHA-1 digest of
        peer_selection_index concatenated with the peerid."""
        # first cut: return a list of (peerid, versioned-rref) tuples
        # __init__ only requires a true value, so test truthiness here too
        # rather than equality with True.
        assert self.permute_peers
        servers = self.get_all_servers()
        key = peer_selection_index
        return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())

    def get_all_servers(self):
        """Return a frozenset of (peerid, versioned-rref) tuples.

        Statically-added test servers are always included. A server known
        through a descriptor is included only while its descriptor has an
        rref, and in that case its rref takes precedence over a test server
        registered under the same serverid.
        """
        servers = {}
        for serverid,rref in self.test_servers.items():
            servers[serverid] = rref
        for serverid,dsc in self.descriptors.items():
            rref = dsc.get_rref()
            if rref:
                servers[serverid] = rref
        result = frozenset(servers.items())
        all_serverids = self.get_all_serverids()
        _assert(len(result) <= len(all_serverids), result, all_serverids)
        return result

    def get_all_serverids(self):
        """Return a frozenset of every serverid we know about, whether or
        not we currently hold an rref for it."""
        serverids = set()
        serverids.update(self.test_servers.keys())
        serverids.update(self.descriptors.keys())
        return frozenset(serverids)

    def get_all_descriptors(self):
        """Return a list of all descriptors, sorted by serverid."""
        return sorted(self.descriptors.values(),
                      key=lambda dsc: dsc.get_serverid())

    def get_nickname_for_serverid(self, serverid):
        """Return the nickname reported by the descriptor for serverid, or
        None if we have no descriptor for it."""
        if serverid in self.descriptors:
            return self.descriptors[serverid].get_nickname()
        return None


class IServerDescriptor(Interface):
    """Interface for an object that represents one storage server.

    NOTE(review): StorageFarmBroker in this file also calls
    get_announcement(), get_serverid(), stop_connecting() and
    try_to_connect() on its descriptors, none of which are declared here --
    confirm whether they belong in this interface.
    """
    def start_connecting(tub, trigger_cb):
        """Start trying to connect to the server, using the given tub.
        trigger_cb is a no-argument callable supplied by the broker."""
    def get_nickname():
        """Return the nickname of the server."""
    def get_rref():
        """Return the reference to the server, or a false value when there
        is none available."""

class NativeStorageClientDescriptor:
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference and their host address;
    their version information is available as the 'version' attribute of
    that RemoteReference. I remember information about when we were last
    connected too, even if we aren't currently connected.

    @ivar serverid: the server's id, as given to the constructor
    @ivar announcement: the announcement dictionary given to the constructor
    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar rref: the RemoteReference, if connected, otherwise None
    @ivar remote_host: the IAddress, if connected, otherwise None

    The server's self-reported nickname (unicode) is not stored separately:
    get_nickname() decodes it from the announcement on each call.
    """
    implements(IServerDescriptor)

    # passed to add_version_to_remote_reference() as the default version
    # information
    VERSION_DEFAULTS = {
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        { "maximum-immutable-share-size": 2**32,
          "tolerates-immutable-read-overrun": False,
          "delete-mutable-shares-with-zero-length-writev": False,
          },
        "application-version": "unknown: no get_version()",
        }

    def __init__(self, serverid, ann_d, min_shares=1):
        """Record the announcement; no connection is attempted until
        start_connecting() is called.

        @param serverid: the server's id
        @param ann_d: the announcement dictionary; "FURL" and "nickname" are
                      read from it later
        @param min_shares: stored as self.min_shares, not otherwise used by
                           this class
        """
        self.serverid = serverid
        self.announcement = ann_d
        self.min_shares = min_shares

        self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
        self.announcement_time = time.time()
        self.last_connect_time = None
        self.last_loss_time = None
        self.remote_host = None
        self.rref = None
        # both stay None until start_connecting() is called
        self._reconnector = None
        self._trigger_cb = None

    def get_serverid(self):
        return self.serverid

    def get_nickname(self):
        """Return the announced nickname, decoded from UTF-8."""
        return self.announcement["nickname"].decode("utf-8")
    def get_announcement(self):
        return self.announcement
    def get_remote_host(self):
        return self.remote_host
    def get_last_connect_time(self):
        return self.last_connect_time
    def get_last_loss_time(self):
        return self.last_loss_time
    def get_announcement_time(self):
        return self.announcement_time

    def start_connecting(self, tub, trigger_cb):
        """Ask the tub to connect to the announced FURL, keeping the object
        it returns so the attempt can later be reset or stopped.

        @param tub: its connectTo() is called with the FURL and our
                    connection handler
        @param trigger_cb: no-argument callable, scheduled via eventually()
                           each time a connection is made; may be false to
                           disable
        """
        furl = self.announcement["FURL"]
        self._trigger_cb = trigger_cb
        self._reconnector = tub.connectTo(furl, self._got_connection)

    def _got_connection(self, rref):
        """Handle a new connection: notify the broker, then fetch the
        server's version information before publishing the rref."""
        lp = log.msg(format="got connection to %(serverid)s, getting versions",
                     serverid=self.serverid_s,
                     facility="tahoe.storage_broker", umid="coUECQ")
        if self._trigger_cb:
            eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addErrback(log.err, format="storageclient._got_connection",
                     serverid=self.serverid_s, umid="Sdq3pg")

    def _got_versioned_service(self, rref, lp):
        """Record the versioned rref as our live connection and arrange to
        hear about its loss."""
        log.msg(format="%(serverid)s provided version info %(version)s",
                serverid=self.serverid_s, version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)

        self.last_connect_time = time.time()
        self.remote_host = rref.getPeer()
        self.rref = rref
        rref.notifyOnDisconnect(self._lost)

    def get_rref(self):
        """Return the RemoteReference if connected, otherwise None."""
        return self.rref

    def _lost(self):
        """Disconnect handler: forget the rref and remote host, and record
        when the connection was lost."""
        log.msg(format="lost connection to %(serverid)s",
                serverid=self.serverid_s,
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        self.rref = None
        self.remote_host = None

    def stop_connecting(self):
        # used when this descriptor has been superseded by another
        # A descriptor that was never started (e.g. one registered through
        # StorageFarmBroker.test_add_descriptor) has nothing to stop.
        if self._reconnector is not None:
            self._reconnector.stopConnecting()

    def try_to_connect(self):
        # used when the broker wants us to hurry up
        # Likewise a no-op until start_connecting() has been called.
        if self._reconnector is not None:
            self._reconnector.reset()

class UnknownServerTypeError(Exception):
    """Exception for a server of an unknown type. It is not raised anywhere
    in the code visible in this module."""

# Source listing generated by Doxygen 1.6.0.