diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index 9344f633be..6fd9f434ef 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -58,7 +58,9 @@ use rand::prelude::*; use rand_chacha::ChaCha8Rng; use rayon::prelude::*; use rayon::ThreadPoolBuilder; -use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun}; +use sc_client_api::{ + AuxStore, Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, LockImportRun, +}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; use sp_api::ProvideRuntimeApi; @@ -830,7 +832,7 @@ where fn finalize_block( client: &Client, - telemetry: Option, + telemetry: Option<&TelemetryHandle>, hash: Block::Hash, number: NumberFor, ) where @@ -916,6 +918,7 @@ where + HeaderBackend + LockImportRun + Finalizer + + BlockchainEvents + AuxStore + Send + Sync @@ -1055,12 +1058,12 @@ where } } + let max_segment_index_before = segment_headers_store.max_segment_index(); (best_archived_block_hash, best_archived_block_number) = archive_block( &mut archiver, segment_headers_store.clone(), &*client, &sync_oracle, - telemetry.clone(), subspace_link.object_mapping_notification_sender.clone(), subspace_link.archived_segment_notification_sender.clone(), best_archived_block_hash, @@ -1068,6 +1071,54 @@ where create_object_mappings, ) .await?; + + let max_segment_index = segment_headers_store.max_segment_index(); + if max_segment_index_before != max_segment_index { + let maybe_block_number_to_finalize = max_segment_index + // Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments + .and_then(|max_segment_index| { + max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS) + }) + .and_then(|segment_index| { + segment_headers_store.get_segment_header(segment_index) + }) + .map(|segment_header| segment_header.last_archived_block().number) + // Make sure not to finalize block number that does not yet exist (segment + // headers store may contain future blocks during initial sync) + .map(|block_number| block_number_to_archive.min(block_number.into())) + // Do not finalize blocks twice + .filter(|block_number| *block_number > client.info().finalized_number); + + if let Some(block_number_to_finalize) = maybe_block_number_to_finalize { + { + let mut import_notification = client.every_import_notification_stream(); + + // Drop notification to drop acknowledgement and allow block import to + // proceed + drop(block_importing_notification); + + while let Some(notification) = import_notification.next().await { + // Wait for importing block to finish importing + if notification.header.number() == &importing_block_number { + break; + } + } + } + + // Block is not guaranteed to be present this deep if we have only synced recent + // blocks + if let Some(block_hash_to_finalize) = + client.block_hash(block_number_to_finalize)? + { + finalize_block( + &*client, + telemetry.as_ref(), + block_hash_to_finalize, + block_number_to_finalize, + ); + } + } + } } Ok(()) @@ -1081,7 +1132,6 @@ async fn archive_block( segment_headers_store: SegmentHeadersStore, client: &Client, sync_oracle: &SubspaceSyncOracle, - telemetry: Option, object_mapping_notification_sender: SubspaceNotificationSender, archived_segment_notification_sender: SubspaceNotificationSender, best_archived_block_hash: Block::Hash, @@ -1157,7 +1207,6 @@ where encoded_block.len() as f32 / 1024.0 ); - let mut new_segment_headers = Vec::new(); let block_outcome = archiver.add_block( encoded_block, block_object_mappings, @@ -1175,36 +1224,6 @@ where send_archived_segment_notification(&archived_segment_notification_sender, archived_segment) .await; - - new_segment_headers.push(segment_header); - } - - if !new_segment_headers.is_empty() { - let maybe_block_number_to_finalize = segment_headers_store - .max_segment_index() - // Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments - .and_then(|max_segment_index| { - max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS) - }) - .and_then(|segment_index| segment_headers_store.get_segment_header(segment_index)) - .map(|segment_header| segment_header.last_archived_block().number) - // Make sure not to finalize block number that does not yet exist (segment - // headers store may contain future blocks during initial sync) - .map(|block_number| block_number_to_archive.min(block_number.into())) - // Do not finalize blocks twice - .filter(|block_number| *block_number > client.info().finalized_number); - - if let Some(block_number_to_finalize) = maybe_block_number_to_finalize { - // Block is not guaranteed to be present this deep if we have only synced recent blocks - if let Some(block_hash_to_finalize) = client.block_hash(block_number_to_finalize)? { - finalize_block( - client, - telemetry.clone(), - block_hash_to_finalize, - block_number_to_finalize, - ); - } - } } Ok((block_hash_to_archive, block_number_to_archive))