1313use std:: { io:: Cursor , sync:: Arc } ;
1414
1515use futures:: future;
16+ use futures:: stream:: FuturesUnordered ;
1617use itertools:: Itertools ;
1718use manifest:: ManifestReader ;
1819use manifest_list:: read_snapshot;
@@ -313,9 +314,8 @@ async fn datafiles(
313314 None => Box :: new ( manifests. iter ( ) ) ,
314315 } ;
315316
316- // Collect a vector of data files by creating a stream over the manifst files, fetch their content and return a flatten stream over their entries.
317- Ok ( stream:: iter ( iter)
318- . then ( move |file| {
317+ let stream: FuturesUnordered < _ > = iter
318+ . map ( move |file| {
319319 let object_store = object_store. clone ( ) ;
320320 async move {
321321 let manifest_path = & file. manifest_path ;
@@ -329,35 +329,51 @@ async fn datafiles(
329329 Ok :: < _ , Error > ( ( bytes, manifest_path, file. sequence_number ) )
330330 }
331331 } )
332- . flat_map_unordered ( None , move |result| {
333- let ( bytes, path, sequence_number) = result. unwrap ( ) ;
332+ . collect ( ) ;
334333
335- let reader = ManifestReader :: new ( bytes) . unwrap ( ) ;
336- stream:: iter ( reader) . try_filter_map ( move |mut x| {
337- future:: ready ( {
338- let sequence_number = if let Some ( sequence_number) = x. sequence_number ( ) {
339- * sequence_number
340- } else {
341- * x. sequence_number_mut ( ) = Some ( sequence_number) ;
342- sequence_number
343- } ;
334+ // // Collect a vector of data files by creating a stream over the manifst files, fetch their content and return a flatten stream over their entries.
335+ // let stream = stream::iter(iter).then(move |file| {
336+ // let object_store = object_store.clone();
337+ // async move {
338+ // let manifest_path = &file.manifest_path;
339+ // let path: Path = util::strip_prefix(manifest_path).into();
340+ // let bytes = Cursor::new(Vec::from(
341+ // object_store
342+ // .get(&path)
343+ // .and_then(|file| file.bytes())
344+ // .await?,
345+ // ));
346+ // Ok::<_, Error>((bytes, manifest_path, file.sequence_number))
347+ // }
348+ // });
349+ //
350+ Ok ( stream. flat_map_unordered ( None , move |result| {
351+ let ( bytes, path, sequence_number) = result. unwrap ( ) ;
344352
345- let filter = match sequence_number_range {
346- ( Some ( start) , Some ( end) ) => {
347- start < sequence_number && sequence_number <= end
348- }
349- ( Some ( start) , None ) => start < sequence_number,
350- ( None , Some ( end) ) => sequence_number <= end,
351- _ => true ,
352- } ;
353- if filter {
354- Ok ( Some ( ( path. to_owned ( ) , x) ) )
355- } else {
356- Ok ( None )
357- }
358- } )
353+ let reader = ManifestReader :: new ( bytes) . unwrap ( ) ;
354+ stream:: iter ( reader) . try_filter_map ( move |mut x| {
355+ future:: ready ( {
356+ let sequence_number = if let Some ( sequence_number) = x. sequence_number ( ) {
357+ * sequence_number
358+ } else {
359+ * x. sequence_number_mut ( ) = Some ( sequence_number) ;
360+ sequence_number
361+ } ;
362+
363+ let filter = match sequence_number_range {
364+ ( Some ( start) , Some ( end) ) => start < sequence_number && sequence_number <= end,
365+ ( Some ( start) , None ) => start < sequence_number,
366+ ( None , Some ( end) ) => sequence_number <= end,
367+ _ => true ,
368+ } ;
369+ if filter {
370+ Ok ( Some ( ( path. to_owned ( ) , x) ) )
371+ } else {
372+ Ok ( None )
373+ }
359374 } )
360- } ) )
375+ } )
376+ } ) )
361377}
362378
363379/// delete all datafiles, manifests and metadata files, does not remove table from catalog
0 commit comments