99 changes: 84 additions & 15 deletions crates/floresta-electrum/src/electrum_protocol.rs
@@ -1,4 +1,3 @@
use std::borrow::Borrow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
@@ -112,7 +111,7 @@ pub struct ElectrumServer<Blockchain: BlockchainInterface> {
/// We accumulate those addresses here and then periodically
/// scan, since a wallet will often send multiple addresses, but
/// in different requests.
pub addresses_to_scan: Vec<sha256::Hash>,
pub addresses_to_scan: Vec<ScriptBuf>,
}

impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
@@ -275,6 +274,76 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
let hash = get_arg!(request, sha256::Hash, 0);
self.client_addresses.insert(hash, client);

let history = self.address_cache.read().await.get_address_history(&hash);
match history {
Some(transactions) if !transactions.is_empty() => {
let res = get_status(transactions);
json_rpc_res!(request, res)
}
_ => {
json_rpc_res!(request, null)
}
}
}
"blockchain.scripthash.unsubscribe" => {
let address = get_arg!(request, sha256::Hash, 0);
self.client_addresses.remove(&address);
json_rpc_res!(request, true)
}

// these endpoints are experimental and aren't implemented by any other implementation yet
"blockchain.scriptpubkey.get_balance" => {
let script = get_arg!(request, ScriptBuf, 0);
let hash = get_spk_hash(&script);

if !self.address_cache.read().await.is_address_cached(&hash) {
self.address_cache.write().await.cache_address_hash(hash);
self.addresses_to_scan.push(script);
let res = json!({
"confirmed": 0,
"unconfirmed": 0
});
return json_rpc_res!(request, res);
}

let balance = self.address_cache.read().await.get_address_balance(&hash);
let result = json!({
"confirmed": balance,
"unconfirmed": 0
});
json_rpc_res!(request, result)
}
"blockchain.scriptpubkey.get_history" => {
let script = get_arg!(request, ScriptBuf, 0);
let hash = get_spk_hash(&script);

if !self.address_cache.read().await.is_address_cached(&hash) {
self.address_cache.write().await.cache_address_hash(hash);
self.addresses_to_scan.push(script);
return json_rpc_res!(request, null);
}

self.address_cache
.read()
.await
.get_address_history(&hash)
.map(|transactions| {
let res = Self::process_history(&transactions);
json_rpc_res!(request, res)
})
.unwrap_or_else(|| {
Ok(json!({
"jsonrpc": "2.0",
"result": null,
"id": request.id
}))
})
}
"blockchain.scriptpubkey.subscribe" => {
let script = get_arg!(request, ScriptBuf, 0);
let hash = get_spk_hash(&script);
self.client_addresses.insert(hash, client);

let history = self.address_cache.read().await.get_address_history(&hash);
match history {
Some(transactions) if !transactions.is_empty() => {
@@ -285,16 +354,19 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
json_rpc_res!(request, null)
}
None => {
self.addresses_to_scan.push(hash);
self.addresses_to_scan.push(script);
json_rpc_res!(request, null)
}
}
}
"blockchain.scripthash.unsubscribe" => {
let address = get_arg!(request, sha256::Hash, 0);
self.client_addresses.remove(&address);
"blockchain.scriptpubkey.unsubscribe" => {
let script = get_arg!(request, ScriptBuf, 0);
let hash = get_spk_hash(&script);
self.client_addresses.remove(&hash);
json_rpc_res!(request, true)
}

// end of experimental endpoints
"blockchain.transaction.broadcast" => {
let tx = get_arg!(request, String, 0);
let hex: Vec<_> =
@@ -361,8 +433,8 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
{
"genesis_hash": genesis_hash,
"hosts": {"127.0.0.1": {"tcp_port": 50001}},
"protocol_max": "1.4",
"protocol_min": "1.0",
"protocol_max": "1.5",
"protocol_min": "1.4",
"pruning": null,
"server_version": format!("Floresta {}", env!("CARGO_PKG_VERSION")),
"hash_function": "sha256"
@@ -407,10 +479,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
// rescan for new addresses, if any
if !self.addresses_to_scan.is_empty() {
info!("Catching up with addresses {:?}", self.addresses_to_scan);
let addresses: Vec<sha256::Hash> = self.addresses_to_scan.drain(..).collect();
for address in addresses.iter().copied() {
self.address_cache.write().await.cache_address_hash(address);
}
let addresses: Vec<_> = self.addresses_to_scan.drain(..).collect();
self.rescan_for_addresses(addresses).await?;
}
}
@@ -424,7 +493,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
/// more bandwidth-intensive method of actually downloading blocks.
async fn rescan_for_addresses(
&mut self,
addresses: Vec<sha256::Hash>,
addresses: Vec<ScriptBuf>,
) -> Result<(), super::error::Error> {
// If compact block filters are enabled, use them. Otherwise, fall back
// to the "old-school" rescanning.
@@ -443,13 +512,13 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
async fn rescan_with_block_filters(
&self,
cfilters: &Arc<NetworkFilters<KvFilterStore>>,
addresses: Vec<sha256::Hash>,
addresses: Vec<ScriptBuf>,
) -> Result<(), super::error::Error> {
// By default, we look from 1..tip
let height = self.chain.get_height().unwrap_or(0) as u64;
let mut _addresses = addresses
.iter()
.map(|hash| hash.borrow())
.map(|address| address.as_bytes())
.collect::<Vec<_>>();

// TODO (Davidson): Let users select what the starting and end height is
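A note on the experimental blockchain.scriptpubkey.* methods added above: unlike the standard scripthash endpoints, they take a raw scriptPubKey as their parameter rather than a sha256 script hash. The sketch below is not part of this PR; it only shows how a client might call blockchain.scriptpubkey.get_balance over the server's newline-delimited JSON-RPC transport. It assumes the server listens on 127.0.0.1:50001 (the port advertised in server.features above) and that the ScriptBuf parameter is passed as a hex-encoded scriptPubKey, matching ScriptBuf's human-readable serde encoding; the script used is just a placeholder.

use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // Connect to a locally running Floresta Electrum server.
    let mut stream = TcpStream::connect("127.0.0.1:50001")?;

    // The experimental method takes a raw scriptPubKey (hex), not a sha256
    // script hash. The script below is a placeholder P2WPKH output.
    let request = concat!(
        r#"{"jsonrpc":"2.0","id":1,"method":"blockchain.scriptpubkey.get_balance","#,
        r#""params":["0014841b80d2cc75f5345c482af96294d04fdd66b2b7"]}"#,
        "\n" // requests and responses are newline-delimited JSON
    );
    stream.write_all(request.as_bytes())?;

    // A script the server has never seen returns zero balances and is queued
    // for the periodic rescan; query again later for the synced balance.
    let mut response = String::new();
    BufReader::new(stream).read_line(&mut response)?;
    println!("{response}");
    Ok(())
}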
4 changes: 4 additions & 0 deletions crates/floresta-watch-only/src/lib.rs
@@ -372,6 +372,10 @@ impl<D: AddressCacheDatabase> AddressCache<D> {
let known_descs = self.database.descs_get()?;
Ok(known_descs.contains(desc))
}
/// Tells whether an address is already cached
pub fn is_address_cached(&self, script_hash: &Hash) -> bool {
self.address_map.contains_key(script_hash)
}
pub fn push_descriptor(&self, descriptor: &str) -> Result<(), WatchOnlyError<D::Error>> {
Ok(self.database.desc_save(descriptor)?)
}
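The is_address_cached helper added above is what lets the new scriptpubkey endpoints decide whether a script needs a first-time scan: on the first request for an unknown script, the server caches its hash right away and pushes the script onto addresses_to_scan, so the periodic rescan can pick up every newly seen script in one batch. Below is a rough, self-contained illustration of that flow, not part of this PR, with a toy in-memory cache standing in for AddressCache; the script is a placeholder, and the hash mirrors what get_spk_hash presumably computes, a sha256 over the raw scriptPubKey bytes.

use std::collections::HashMap;

use bitcoin::hashes::{sha256, Hash};
use bitcoin::ScriptBuf;

// Toy stand-in for AddressCache: maps a script hash to its (empty) history.
#[derive(Default)]
struct ToyCache {
    address_map: HashMap<sha256::Hash, Vec<String>>,
}

impl ToyCache {
    fn is_address_cached(&self, script_hash: &sha256::Hash) -> bool {
        self.address_map.contains_key(script_hash)
    }

    fn cache_address_hash(&mut self, script_hash: sha256::Hash) {
        self.address_map.entry(script_hash).or_default();
    }
}

fn main() {
    let mut cache = ToyCache::default();
    let mut addresses_to_scan: Vec<ScriptBuf> = Vec::new();

    // Placeholder P2WPKH scriptPubKey.
    let script = ScriptBuf::from_hex("0014841b80d2cc75f5345c482af96294d04fdd66b2b7")
        .expect("valid hex script");
    let hash = sha256::Hash::hash(script.as_bytes());

    // First sight of the script: cache the hash and queue the script so the
    // periodic rescan handles it in a single batch.
    if !cache.is_address_cached(&hash) {
        cache.cache_address_hash(hash);
        addresses_to_scan.push(script);
    }

    assert!(cache.is_address_cached(&hash));
    assert_eq!(addresses_to_scan.len(), 1);
}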