# ipfs-embed
A small, fast and reliable ipfs implementation designed for embedding into complex p2p applications.
- node discovery via mdns
- provider discovery via kademlia
- exchange blocks via bitswap
- lru eviction policy
- aliases, an abstraction of named recursive pins
- temporary recursive pins for building dags, preventing races with the garbage collector
- efficiently syncing large dags of blocks
Some compatibility with go-ipfs can be enabled with the `compat` feature flag.
```rust
use ipfs_embed::{Config, DefaultParams, Ipfs};
use libipld::DagCbor;
use libipld::store::Store;

#[derive(Clone, DagCbor, Debug, Eq, PartialEq)]
struct Identity {
    id: u64,
    name: String,
    age: u8,
}

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cache_size = 10;
    let ipfs = Ipfs::<DefaultParams>::new(Config::new(None, cache_size)).await?;
    ipfs.listen_on("/ip4/0.0.0.0/tcp/0".parse()?).await?;

    let identity = Identity {
        id: 0,
        name: "David Craven".into(),
        age: 26,
    };

    let cid = ipfs.insert(&identity)?;
    let identity2 = ipfs.get(&cid)?;
    assert_eq!(identity, identity2);
    println!("identity cid is {}", cid);

    Ok(())
}
```
Below are some notes on the history of ipfs-embed. The information is no longer accurate for the current implementation.
Ipfs is a p2p network for locating and providing chunks of content addressed data called blocks.
Content addressing means that the data is located via its hash as opposed to location addressing.
Unsurprisingly this is done using a distributed hash table. To avoid storing large amounts of data in the dht, the dht stores which peers have a block. After determining the peers that are providing a block, the block is requested from those peers.
To verify that the peer is sending the requested block and not an infinite stream of garbage, blocks need to have a finite size. In practice we'll assume a maximum block size of 1MB.
Encoding arbitrary data into 1MB blocks imposes two requirements on the codec. It needs to have a canonical representation to ensure that the same data results in the same hash, and it needs to support linking to other content addressed blocks. Codecs having these two properties are called ipld codecs.
A property that follows from content addressing (representing edges as hashes of the node) is that arbitrary graphs of blocks are not possible. A graph of blocks is guaranteed to be directed and acyclic.
{"a":3}
{"a":3,}
{"/":"QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u"}
Let's start with a naive model of a persistent block store.
```rust
trait BlockStorage {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    fn insert(&mut self, cid: &Cid, data: &[u8]) -> Result<()>;
    fn remove(&mut self, cid: &Cid) -> Result<()>;
}
```
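As an illustration (not part of ipfs-embed), a minimal in-memory implementation of this naive trait might look as follows, assuming a hashable `Cid` type and the `Result` alias used above:

```rust
use std::collections::HashMap;

// A hypothetical in-memory store implementing the naive trait.
struct MemStore {
    blocks: HashMap<Cid, Vec<u8>>,
}

impl BlockStorage for MemStore {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        Ok(self.blocks.get(cid).cloned())
    }

    fn insert(&mut self, cid: &Cid, data: &[u8]) -> Result<()> {
        self.blocks.insert(cid.clone(), data.to_vec());
        Ok(())
    }

    fn remove(&mut self, cid: &Cid) -> Result<()> {
        self.blocks.remove(cid);
        Ok(())
    }
}
```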
Since content addressed blocks form a directed acyclic graph, blocks can't simply be deleted. A block may be referenced by multiple nodes, so some form of reference counting and garbage collection is required to determine when a block can safely be deleted. In the interest of being a good peer on the p2p network, we may want to keep old blocks around that other peers may want. So thinking of it as a reference counted cache may be a more appropriate model. We end up with something like this:
```rust
trait BlockStorage {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
    fn evict(&mut self) -> Result<()>;
    fn pin(&mut self, cid: &Cid) -> Result<()>;
    fn unpin(&mut self, cid: &Cid) -> Result<()>;
}
```
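A hypothetical usage sketch, assuming the trait above: references must already be present when a block linking to them is inserted, and pins protect a dag from eviction:

```rust
// leaves carry no references
store.insert(&leaf_cid, b"leaf data", &[])?;
// the node references the leaf, so the leaf is inserted first
store.insert(&node_cid, &node_bytes, &[leaf_cid])?;
// pinning the root protects the whole dag from eviction
store.pin(&node_cid)?;
// evict unpinned, unreferenced blocks when the store grows too large
store.evict()?;
```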
To mutate a block we need to perform three steps: get the block, modify and insert the modified block, and finally remove the old one. We also need a map from keys to cids, so even more steps are required. Any of these steps can fail, leaving the block store in an inconsistent state and leading to data leakage. To prevent data leakage every api consumer would have to implement a write-ahead log. To resolve these issues we extend the store with named pins called aliases.
```rust
trait BlockStorage {
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>>;
    fn insert(&mut self, cid: &Cid, data: &[u8], references: &[Cid]) -> Result<()>;
    fn evict(&mut self) -> Result<()>;
    fn alias(&mut self, alias: &[u8], cid: Option<&Cid>) -> Result<()>;
    fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>>;
}
```
Assuming that each operation is atomic and durable, we have the minimal set of operations required to store dags of content addressed blocks.
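A hypothetical usage sketch, assuming the trait above: repointing an alias atomically replaces the dag it keeps alive, so consumers no longer need their own write-ahead log:

```rust
// insert the updated block; the old version lingers until eviction
store.insert(&new_cid, &new_bytes, &[child_cid])?;
// atomically repoint the alias from the old dag to the new one
store.alias(b"my-document", Some(&new_cid))?;
// readers find the current root by resolving the alias
let current = store.resolve(b"my-document")?;
```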
The `Ipfs` frontend ties the storage and network layers together:

```rust
impl Ipfs {
    pub fn new(storage: Arc<S>, network: Arc<N>) -> Self { .. }

    pub fn local_peer_id(&self) -> &PeerId { .. }

    pub async fn listeners(&self) -> Vec<Multiaddr> { .. }

    pub async fn external_addresses(&self) -> Vec<Multiaddr> { .. }

    pub async fn pinned(&self, cid: &Cid) -> Result<Option<bool>> { .. }

    pub async fn get(&self, cid: &Cid) -> Result<Block> {
        if let Some(block) = self.storage.get(cid)? {
            return Ok(block);
        }
        self.network.get(cid).await?;
        if let Some(block) = self.storage.get(cid)? {
            return Ok(block);
        }
        log::error!("block evicted too soon");
        Err(BlockNotFound(*cid))
    }

    pub async fn insert(&self, cid: &Cid) -> Result<()> {
        self.storage.insert(cid)?;
        self.network.provide(cid)?;
        Ok(())
    }

    pub async fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
        if let Some(cid) = cid {
            self.network.sync(cid).await?;
        }
        self.storage.alias(alias, cid).await?;
        Ok(())
    }

    pub async fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> {
        Ok(self.storage.resolve(alias)?)
    }
}
```
We'll be looking at some patterns used in the `chain` example. The `chain` example uses `ipfs-embed` to store a chain of blocks. A block is defined as:
```rust
#[derive(Debug, Default, DagCbor)]
pub struct Block {
    prev: Option<Cid>,
    id: u32,
    loopback: Option<Cid>,
    payload: Vec<u8>,
}
```
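A hypothetical sketch of how such a chain is extended, reusing the `ipfs.insert` api from the example at the top (each block links to its predecessor by cid):

```rust
// the genesis block has no predecessor
let genesis = Block { prev: None, id: 0, loopback: None, payload: vec![] };
let genesis_cid = ipfs.insert(&genesis)?;

// every subsequent block links back to the previous block's cid
let block1 = Block {
    prev: Some(genesis_cid),
    id: 1,
    loopback: None,
    payload: b"hello".to_vec(),
};
let block1_cid = ipfs.insert(&block1)?;
```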
We have two different dbs in this example: the one managed by `ipfs-embed` that stores blocks and aliases, and one specific to the example that maps the block index to the block cid, so that we can look up blocks quickly without having to traverse the entire chain. To guarantee atomicity we define two aliases and perform the syncing in two steps. This ensures that the synced chain always has its blocks indexed.
```rust
const TMP_ROOT: &str = alias!(tmp_root);
const ROOT: &str = alias!(root);

ipfs.alias(TMP_ROOT, Some(new_root)).await?;
for _ in prev_root_id..new_root_id {
    // index block may error for various reasons
}
ipfs.alias(ROOT, Some(new_root)).await?;
```
The recursive syncing algorithm performs worst when syncing a chain, as every block needs to be synced one after the other, without being able to take advantage of any parallelism. To resolve this issue we increase the linking of the chain by including loopbacks, which increase the branching of the dag.
An algorithm was proposed by @rklaehn for this purpose:
```rust
fn loopback(block: usize) -> Option<usize> {
    let x = block.trailing_zeros();
    if x > 1 && block > 0 {
        Some(block - (1 << (x - 1)))
    } else {
        None
    }
}
```
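To illustrate (a hypothetical demo, not part of the example): a block whose index has `k >= 2` trailing zeros gets a loopback jumping `2^(k - 1)` blocks back, so long chains gain shortcut edges that the sync algorithm can follow in parallel:

```rust
fn main() {
    // assumes the loopback function defined above
    for block in [4, 6, 8, 12, 16, 32] {
        println!("{} -> {:?}", block, loopback(block));
    }
    // prints:
    // 4 -> Some(2)
    // 6 -> None
    // 8 -> Some(4)
    // 12 -> Some(10)
    // 16 -> Some(8)
    // 32 -> Some(16)
}
```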
Syncing can take a long time and doesn't allow selecting the subset of data that is needed. For this purpose there is an experimental `alias_with_syncer` api that allows customizing the syncing behaviour. In the chain example it is used to perform block validation, ensuring that the blocks are valid, although this api is likely to change in the future.
```rust
pub struct ChainSyncer<S: StoreParams, T: Storage<S>> {
    index: sled::Db,
    storage: BitswapStorage<S, T>,
}

impl<S: StoreParams, T: Storage<S>> BitswapSync for ChainSyncer<S, T>
where
    S::Codecs: Into<DagCborCodec>,
{
    fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>> {
        if let Some(data) = self.storage.get(cid) {
            let ipld_block = libipld::Block::<S>::new_unchecked(*cid, data);
            if let Ok(block) = ipld_block.decode::<DagCborCodec, Block>() {
                return Box::new(block.prev.into_iter().chain(block.loopback.into_iter()));
            }
        }
        Box::new(std::iter::empty())
    }

    fn contains(&self, cid: &Cid) -> bool {
        self.storage.contains(cid)
    }
}
```
This historical block store was built on sled, a performant embeddable key-value database; the trees in the sketches below are sled trees. (The current implementation uses SQLite, a performant embeddable SQL database, instead.)
```rust
type Id = u64;
type Atime = u64;

#[derive(Clone)]
struct BlockCache {
    // Cid -> Id
    lookup: Tree,
    // Id -> Cid
    cid: Tree,
    // Id -> Vec<u8>
    data: Tree,
    // Id -> Vec<Id>
    refs: Tree,
    // Id -> Atime
    atime: Tree,
    // Atime -> Id
    lru: Tree,
}

impl BlockCache {
    // Updates the atime and lru trees and returns the data from the data tree.
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { .. }

    // Returns an iterator of blocks sorted by least recently used.
    fn lru(&self) -> impl Iterator<Item = Result<Id>> {
        self.lru.iter().values()
    }

    // Inserts into all trees.
    fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> { .. }

    // Removes from all trees.
    fn remove(&self, id: &Id) -> Result<()> { .. }

    // Returns the recursive set of references.
    fn closure(&self, cid: &Cid) -> Result<Vec<Id>> { .. }

    // A stream of insert/remove events, useful for plugging in a network layer.
    fn subscribe(&self) -> impl Stream<Item = StorageEvent> { .. }
}
```
Given the description of operations and how the store is structured in terms of trees, these operations are straightforward to implement.
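For example, computing the recursive closure can be sketched as a worklist traversal (`lookup_id` and `refs_of` are hypothetical helpers wrapping the lookup and refs trees):

```rust
// Returns the recursive set of references of a block.
fn closure(&self, cid: &Cid) -> Result<Vec<Id>> {
    // Cid -> Id via the lookup tree
    let root = self.lookup_id(cid)?;
    let mut seen = std::collections::HashSet::new();
    let mut todo = vec![root];
    while let Some(id) = todo.pop() {
        // follow each id's references exactly once
        if seen.insert(id) {
            // Id -> Vec<Id> via the refs tree
            todo.extend(self.refs_of(&id)?);
        }
    }
    Ok(seen.into_iter().collect())
}
```

On top of the cache, the block storage adds aliases, a live-id filter and a persisted closure tree: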
```rust
#[derive(Clone)]
struct BlockStorage {
    cache: BlockCache,
    // Vec<u8> -> Id
    alias: Tree,
    // Bag of live ids
    filter: Arc<Mutex<CuckooFilter>>,
    // Id -> Vec<Id>
    closure: Tree,
}

impl BlockStorage {
    // get from cache
    fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        self.cache.get(cid)
    }

    // insert to cache
    fn insert(&self, cid: &Cid, data: &[u8]) -> Result<()> {
        self.cache.insert(cid, data)
    }

    // returns the value of the alias tree
    fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> { .. }

    // remove the lru block that is not in the bag of live ids and remove
    // its closure from the closure tree
    fn evict(&self) -> Result<()> { .. }

    // aliasing is an expensive operation, the implementation is sketched
    // in pseudocode
    fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
        // precompute the closures
        let prev_id = self.alias.get(alias)?;
        let prev_closure = self.closure.get(&prev_id)?;
        let new_id = self.cache.lookup(&cid);
        let new_closure = self.cache.closure(&cid);
        // lock the filter, preventing evictions
        let mut filter = self.filter.lock().unwrap();
        // make sure the new closure wasn't evicted in the mean time
        for id in &new_closure {
            if !self.cache.contains_id(&id) {
                return Err("cannot alias, missing references");
            }
        }
        // update the live set
        for id in &new_closure {
            filter.add(id);
        }
        for id in &prev_closure {
            filter.delete(id);
        }
        // perform the transaction
        let res = (&self.alias, &self.closure).transaction(|(talias, tclosure)| {
            if prev_id.is_some() {
                talias.remove(alias)?;
            }
            if let Some(id) = new_id.as_ref() {
                talias.insert(alias, id)?;
                tclosure.insert(id, &new_closure)?;
            }
            Ok(())
        });
        // if the transaction failed, revert the live set to its previous state
        if res.is_err() {
            for id in &prev_closure {
                filter.add(id);
            }
            for id in &new_closure {
                filter.delete(id);
            }
        }
        res
    }
}
```
Bitswap is a very simple protocol. It was adapted and simplified for ipfs-embed. The message format can be represented by the following enums.
```rust
pub enum BitswapRequest {
    Have(Cid),
    Block(Cid),
}

pub enum BitswapResponse {
    Have(bool),
    Block(Vec<u8>),
}
```
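As an illustration, answering incoming requests could be sketched like this (the `Blocks` trait is a hypothetical stand-in for the local block store):

```rust
trait Blocks {
    fn contains(&self, cid: &Cid) -> bool;
    fn get(&self, cid: &Cid) -> Option<Vec<u8>>;
}

fn handle_request(store: &impl Blocks, request: BitswapRequest) -> BitswapResponse {
    match request {
        // a have request only checks for local presence of the block
        BitswapRequest::Have(cid) => BitswapResponse::Have(store.contains(&cid)),
        // a block request returns the data, or a negative have when missing
        BitswapRequest::Block(cid) => match store.get(&cid) {
            Some(data) => BitswapResponse::Block(data),
            None => BitswapResponse::Have(false),
        },
    }
}
```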
The mechanism for locating providers can be abstracted: a dht can be plugged in, or a centralized db query. The bitswap api looks as follows:
```rust
pub enum Query {
    Get(Cid),
    Sync(Cid),
}

pub enum BitswapEvent {
    GetProviders(Cid),
    QueryComplete(Query, Result<()>),
}

impl Bitswap {
    pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { .. }

    pub fn get(&mut self, cid: Cid) { .. }

    pub fn cancel_get(&mut self, cid: Cid) { .. }

    pub fn add_provider(&mut self, cid: Cid, peer_id: PeerId) { .. }

    pub fn complete_get_providers(&mut self, cid: Cid) { .. }

    pub fn poll(&mut self, cx: &mut Context) -> BitswapEvent { .. }
}
```
So what happens when you create a get request? First, all the providers in the initial set are queried with a have request. As an optimization, in every batch of queries one block request is sent instead. If the get query finds a block it returns a query complete. If the block wasn't found in the initial set, a `GetProviders(Cid)` event is emitted. This is where the bitswap consumer tries to locate providers, for example by performing a dht lookup. These providers are registered by calling the `add_provider` method. Once provider location completes, it is signaled by calling `complete_get_providers`. The query manager then performs bitswap requests using the new provider set, which results in the block being found or a block-not-found error.
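A hypothetical sketch of a consumer driving this state machine (the `dht` handle and `handle_result` are stand-ins, not part of the api):

```rust
loop {
    match bitswap.poll(cx) {
        BitswapEvent::GetProviders(cid) => {
            // the initial provider set was exhausted, so locate more
            // providers, e.g. via a kademlia lookup
            for peer_id in dht.providers(&cid) {
                bitswap.add_provider(cid.clone(), peer_id);
            }
            // signal that provider location has completed
            bitswap.complete_get_providers(cid);
        }
        BitswapEvent::QueryComplete(query, result) => {
            // the query either found the block or failed with a
            // block-not-found error
            handle_result(query, result);
        }
    }
}
```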
Often we want to sync an entire dag of blocks. We can efficiently sync dags of blocks by adding a sync query that runs get queries in parallel for all the references of a block. The set of providers that had a block is used as the initial set in a reference query. For this we extend our api with the following calls.
```rust
/// Bitswap sync trait for customizing the syncing behaviour.
pub trait BitswapSync {
    /// Returns the list of blocks that need to be synced.
    fn references(&self, cid: &Cid) -> Box<dyn Iterator<Item = Cid>>;

    /// Returns if a cid needs to be synced.
    fn contains(&self, cid: &Cid) -> bool;
}

impl Bitswap {
    pub fn sync(&mut self, cid: Cid, syncer: Arc<dyn BitswapSync>) { .. }

    pub fn cancel_sync(&mut self, cid: Cid) { .. }
}
```
Note that we can customize the syncing behaviour arbitrarily by selecting a subset of blocks we want to sync. See design patterns for more information.
License: MIT OR Apache-2.0