Logo Search packages:      
Sourcecode: tahoe-lafs version File versions  Download package

def allmydata::mutable::publish::Publish::publish (   self,
  newdata 
)

Publish the filenode's current contents.  Returns a Deferred that
fires (with None) when the publish has done as much work as it's ever
going to do, or errbacks with ConsistencyError if it detects a
simultaneous write.

Definition at line 119 of file publish.py.

00119                               :
        """Publish the filenode's current contents.  Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with ConsistencyError if it detects a
        simultaneous write.

        newdata is the content to publish: its len() is logged and recorded
        in the status object, and it is stored on self.newdata for the
        encoding steps that follow.

        The Deferred that is returned is self.done_deferred, which is created
        here; it is a different object from the internal Deferred chain that
        drives encryption, share generation and delivery.

        NOTE(review): the comment further down speaks of reporting an
        UncoordinatedWriteError, while this docstring names ConsistencyError;
        the code that fires done_deferred is outside this method -- confirm
        which exception callers actually see.
        """

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        #  2a: send queries to n+epsilon servers, to determine current shares
        #  2b: based upon responses, create target map
        # 3: send slot_testv_and_readv_and_writev messages
        # 4: as responses return, update share-dispatch table
        # 4a: may need to run recovery algorithm
        # 5: when enough responses are back, we're done

        # record size/state in the status object and remember the start time
        # so that per-phase timings can be computed later
        self.log("starting publish, datalen is %s" % len(newdata))
        self._status.set_size(len(newdata))
        self._status.set_status("Started")
        self._started = time.time()

        # this is the Deferred handed back to the caller at the end of this
        # method; nothing in this method fires it
        self.done_deferred = defer.Deferred()

        # NOTE(review): the preconditions in this method are enforced with
        # assert statements, which are removed when Python runs with -O
        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # peerlist computed by that process instead of computing our own.
        if self._servermap:
            # NOTE(review): the comment above says MODE_WRITE, but this check
            # also accepts a servermap last updated in MODE_CHECK
            assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        # in both branches self._servermap is now set, so hand it to the
        # status object
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        # unlike the keys above, the encrypted private key is stored without
        # being checked here
        self._encprivkey = self._node.get_encprivkey()

        # ask the client's storage broker for the servers associated with
        # this storage index (self._storage_index is set outside this method)
        sb = self._node._client.get_storage_broker()
        full_peerlist = sb.get_servers_for_index(self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.bad_peers = set() # peerids who have errbacked/refused requests

        # keep the data to publish and a fresh 16-byte random salt
        self.newdata = newdata
        self.salt = os.urandom(16)

        # NOTE(review): called only after newdata and salt are stored;
        # presumably it reads them, but its body is not visible here
        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # as a failsafe, refuse to iterate through self.loop more than a
        # thousand times.
        self.looplimit = 1000

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (peerid, shnum) tuples

        # the second table is our list of outstanding queries: those which
        # are in flight and may or may not be delivered, accepted, or
        # acknowledged. Items are added to this table when the request is
        # sent, and removed when the response returns (or errbacks).
        self.outstanding = set() # (peerid, shnum) tuples

        # the third is a table of successes: share which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (peerid, shnum) tuples

        # we also keep a mapping from peerid to RemoteReference. Each time we
        # pull a connection out of the full peerlist, we add it to this for
        # use later.
        self.connections = {}

        # maps (peerid, shnum) -> the old checkstring recorded in the
        # servermap's bad_shares table; filled in by the second loop below
        self.bad_share_checkstrings = {}

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        for (peerid, shnum) in self._servermap.servermap:
            self.goal.add( (peerid, shnum) )
            self.connections[peerid] = self._servermap.connections[peerid]
        # then we add in all the shares that were bad (corrupted, bad
        # signatures, etc). We want to replace these.
        for key, old_checkstring in self._servermap.bad_shares.items():
            (peerid, shnum) = key
            self.goal.add(key)
            self.bad_share_checkstrings[key] = old_checkstring
            self.connections[peerid] = self._servermap.connections[peerid]

        # create the shares. We'll discard these as they are delivered. SDMF:
        # we're allowed to hold everything in memory.

        # everything up to this point is accounted as "setup" time
        self._status.timings["setup"] = time.time() - self._started
        # callback chain: encrypt+encode -> generate shares -> record the
        # time at which pushing starts -> enter self.loop
        d = self._encrypt_and_encode()
        d.addCallback(self._generate_shares)
        def _start_pushing(res):
            # timestamp the start of delivery and pass the result through
            # unchanged to the next callback
            self._started_pushing = time.time()
            return res
        d.addCallback(_start_pushing)
        d.addCallback(self.loop) # trigger delivery
        # a failure from any stage added above is routed to _fatal_error
        d.addErrback(self._fatal_error)

        # the caller gets done_deferred, not the internal chain `d`
        # NOTE(review): presumably loop/_fatal_error fire done_deferred;
        # that code is not visible here
        return self.done_deferred

    def setup_encoding_parameters(self):


Generated by  Doxygen 1.6.0   Back to index