Skip to content
Snippets Groups Projects

Rewrite get-file to exit faster when not enough blocks are available

Open DISSOUBRAY Nathan requested to merge timeout_get_file into master
1 file
+ 130
− 34
Compare changes
  • Side-by-side
  • Inline
+ 130
− 34
@@ -1080,6 +1080,7 @@ where {
"Got provider list for file {}: {:?}",
file_hash, provider_list
);
let number_of_providers = provider_list.len();
// Check where to write the blocks
let (block_dir_sender, block_dir_recv) = oneshot::channel();
@@ -1150,6 +1151,7 @@ where {
async fn download_first_k_blocks<F, G, P>(
mut info_receiver: UnboundedReceiver<Result<PeerBlockInfo>>,
number_of_providers: usize,
powers_path: PathBuf,
block_hashes_on_disk: &mut Vec<String>,
cmd_sender: UnboundedSender<DragoonCommand>,
@@ -1162,13 +1164,84 @@ where {
P: DenseUVPolynomial<F>,
for<'a, 'b> &'a P: Div<&'b P, Output = P>,
{
// returns true if loop should be continued, false if return should be used
fn receive_blocks<F, G, P>(
response: Result<Option<BlockResponse>>,
file_hash: &String,
powers: &Powers<F, G>,
block_dir: &Path,
number_of_blocks_written: &mut u32,
block_hashes_on_disk: &mut Vec<String>,
block_k: &mut u32,
) -> Result<bool>
where
F: PrimeField,
G: CurveGroup<ScalarField = F>,
P: DenseUVPolynomial<F>,
for<'a, 'b> &'a P: Div<&'b P, Output = P>,
{
//TODO change this unwrap
let maybe_block_response = response.unwrap();
if let Some(block_response) = maybe_block_response {
let block: Block<F, G> = match Block::deserialize_with_mode(
&block_response.block_data[..],
Compress::Yes,
Validate::Yes,
) {
Ok(block) => block,
Err(e) => {
error!(
"Could not deserialize a block in get-file, got error: {}",
e
);
return Ok(true);
}
};
debug!(
"Got a block for the file {} : {} ",
file_hash, block_response.block_hash
);
let number_of_blocks_to_reconstruct_file = block.shard.k;
debug!(
"Number of blocks to reconstruct file {} : {}",
file_hash, number_of_blocks_to_reconstruct_file
);
if verify::<F, G, P>(&block, powers)? {
//TODO check if the new block is not linearly dependant with the other blocks already on disk
debug!(
"Block {} for file {} was verified successfully; Now dumping to disk",
block_response.block_hash, file_hash
);
let _ = fs::dump(&block, block_dir, None, Compress::Yes)?;
*number_of_blocks_written += 1;
block_hashes_on_disk.push(block_response.block_hash);
if *block_k == 0 {
*block_k = number_of_blocks_to_reconstruct_file
}
if *number_of_blocks_written >= number_of_blocks_to_reconstruct_file {
debug!("Received exactly {} blocks, pausing block download and trying to reconstruct the file {}", number_of_blocks_to_reconstruct_file, file_hash);
//TODO properly stop downloads ? drop/close receiver ?
return Ok(false);
}
} else {
//TODO ask the block again ? change provider ?
todo!()
}
} else {
error!("No block response was sent when using get file, the node might have saved it to disk")
}
Ok(true)
}
let mut already_request_block = vec![];
let powers = get_powers(powers_path).await?;
let powers: Powers<F, G> = get_powers(powers_path).await?;
let mut number_of_blocks_written: u32 = 0;
let (block_sender, mut block_receiver) = mpsc::unbounded_channel();
let mut number_of_provider_response = 0;
let mut number_of_blocks_to_reconstruct_file = 0;
'download_first_k_blocks: loop {
'receive_all_info: loop {
tokio::select! {
biased;
Some(response) = info_receiver.recv() => {
@@ -1177,6 +1250,7 @@ where {
let response = response.map_err(|e| -> anyhow::Error {
format_err!("Could not retrieve peer block block info: {}", e)
})?;
number_of_provider_response += 1;
let PeerBlockInfo { peer_id_base_58, file_hash, block_hashes, .. } = response;
debug!("Got block list from {} for file {} : {:?}", peer_id_base_58, file_hash, block_hashes);
let blocks_to_request: Vec<String> = block_hashes
@@ -1196,44 +1270,59 @@ where {
}
}
},
Some(response) = block_receiver.recv() => {
//TODO change this unwrap
let maybe_block_response = response.unwrap();
if let Some(block_response) = maybe_block_response {
let block: Block<F,G> = match Block::deserialize_with_mode(&block_response.block_data[..], Compress::Yes, Validate::Yes) {
Ok(block) => block,
Err(e) => {error!("Could not deserialize a block in get-file, got error: {}", e);
continue 'download_first_k_blocks}
};
debug!("Got a block for the file {} : {} ", file_hash, block_response.block_hash);
let number_of_blocks_to_reconstruct_file = block.shard.k;
debug!("Number of blocks to reconstruct file {} : {}", file_hash, number_of_blocks_to_reconstruct_file);
if verify::<F,G,P>(&block, &powers)? {
//TODO check if the new block is not linearly dependant with the other blocks already on disk
debug!("Block {} for file {} was verified successfully; Now dumping to disk", block_response.block_hash, file_hash);
let _ = fs::dump(&block, &block_dir, None, Compress::Yes)?;
number_of_blocks_written += 1;
block_hashes_on_disk.push(block_response.block_hash);
if number_of_blocks_written >= number_of_blocks_to_reconstruct_file {
debug!("Received exactly {} blocks, pausing block download and trying to reconstruct the file {}", number_of_blocks_to_reconstruct_file, file_hash);
//TODO properly stop downloads ? drop/close receiver ?
break 'download_first_k_blocks;
if number_of_provider_response >= number_of_providers {
drop(block_sender);
if already_request_block.len() >= (number_of_blocks_to_reconstruct_file as usize){
break 'receive_all_info;
}
else {
return Err(format_err!("Getting the required amount of blocks to make the file failed, there are not enough blocks in all known peers to decode the file"));
}
}
else {
//TODO ask the block again ? change provider ?
todo!()
},
Some(response) = block_receiver.recv() => {
if let Ok(continue_loop) = receive_blocks(
response,
&file_hash,
&powers,
&block_dir,
&mut number_of_blocks_written,
block_hashes_on_disk,
&mut number_of_blocks_to_reconstruct_file,
) {
if !continue_loop {
return Ok(()) //this means we already have k blocks
}
}
else {
error!("No block response was sent when using get file, the node might have saved it to disk")
}
}
}
}
'finish_k_block_download: loop {
if let Some(response) = block_receiver.recv().await {
if let Ok(continue_loop) = receive_blocks(
response,
&file_hash,
&powers,
&block_dir,
&mut number_of_blocks_written,
block_hashes_on_disk,
&mut number_of_blocks_to_reconstruct_file,
) {
if !continue_loop {
break 'finish_k_block_download;
}
}
} else {
// meaning all sender have been dropped but we didn't reach k block
// there is no need to even try to decode the file
return Err(format_err!("Getting the required amount of blocks to make the file failed, all peers have already answered, not enough blocks to make the file"));
}
}
Ok(())
}
@@ -1243,6 +1332,7 @@ where {
timeout_duration,
download_first_k_blocks::<F, G, P>(
info_receiver,
number_of_providers,
powers_path,
&mut block_hashes_on_disk,
cmd_sender,
@@ -1271,7 +1361,7 @@ where {
}
}
let _ = Self::decode_blocks::<F, G>(
let res = Self::decode_blocks::<F, G>(
block_dir.clone(),
&block_hashes_on_disk,
output_filename.clone(),
@@ -1281,8 +1371,14 @@ where {
//TODO if it fails, keep requesting block info, try to check which matrix is invertible taking k-1 blocks already on disk and one more that isn't
//TODO if it fails, do the same with k-2, etc...
//TODO when a combination of the blocks that works is found, request the missing blocks
Ok([file_dir, PathBuf::from(output_filename)].iter().collect())
//Ok(PathBuf::from(format!("{:?}/{}", file_dir, output_filename)))
if res.is_err() {
Err(format_err!(
"Could not decode blocks due to the following: {:?}",
res
))
} else {
Ok([file_dir, PathBuf::from(output_filename)].iter().collect())
}
}
async fn dial(&mut self, multiaddr: String) -> Result<()> {
Loading