1- # hmmscan --cpu 32 --tblout "${BUG}_genes_COG.tbl" "$COG_DB" "${BUG}_translated_gene_seqs.fasta"
2- write_compressed_parquet <- function (df , path ) {
1+ #' Write a data frame to a compressed Parquet file
2+ #'
3+ #' @param df A data frame or tibble to write.
4+ #' @param path Output file path (`.parquet` extension).
5+ #'
6+ #' @keywords internal
7+ .write_compressed_parquet <- function (df , path ) {
38 arrow :: write_parquet(
49 df ,
510 path ,
@@ -11,14 +16,16 @@ write_compressed_parquet <- function(df, path) {
1116
1217.runHMMER <- function (duckdb_path ,
1318 output_path ,
14- threads = 0 ,
19+ threads = 8L ,
1520 database_path ,
21+ docker_image = " staphb/hmmer" ,
1622 split_jobs = TRUE ,
17- num_of_splits = 20 ) {
23+ num_of_splits = 20L ,
24+ n_workers = 4L ) {
1825 # Fail fast if Docker is missing
19- # if (!nzchar(Sys.which("docker"))) {
20- # stop("Docker is not available on your PATH but is required to run CD-HIT .")
21- # }
26+ if (! nzchar(Sys.which(" docker" ))) {
27+ stop(" Docker is not available on your PATH but is required to run HMMER ." )
28+ }
2229
2330 duckdb_path <- .docker_path(duckdb_path )
2431 if (missing(output_path ) || output_path %in% c(" ." , " results" , " results/" )) {
@@ -33,14 +40,14 @@ write_compressed_parquet <- function(df, path) {
3340 prot_seqs <- DBI :: dbReadTable(con , " protein_cluster_seq" ) | >
3441 tibble :: as_tibble()
3542
36- # robust database name
43+ # derive a clean label from the database filename
3744 database <- tools :: file_path_sans_ext(basename(database_path ))
3845
39- # Chunking function
40- chunk_count <- num_of_splits
46+ # clamp splits to the number of sequences available
47+ chunk_count <- min(as.integer( num_of_splits ), nrow( prot_seqs ))
4148
4249 split_fasta <- function (seqs , prefix ) {
43- records <- paste0(" > " , seqs $ name , " \n " , seqs $ sequence )
50+ records <- paste0(" >" , seqs $ name , " \n " , seqs $ sequence )
4451 chunk_size <- ceiling(length(records ) / chunk_count )
4552 chunks <- split(records , ceiling(seq_along(records ) / chunk_size ))
4653
@@ -52,12 +59,8 @@ write_compressed_parquet <- function(df, path) {
5259
5360 split_fasta(prot_seqs , " protein" )
5461
55- # readr::write_lines(paste0("> ", prot_seqs$name, "\n", prot_seqs$sequence),
56- # file.path(output_path, "proteins_for_hmmer.fasta"))
57-
58- # Generate job list for ARG and COG
5962 job_list <- expand.grid(
60- chunk = sprintf(" %02d" , 1 : chunk_count ),
63+ chunk = sprintf(" %02d" , seq_len( chunk_count ) ),
6164 db = database ,
6265 stringsAsFactors = FALSE
6366 ) | >
@@ -68,16 +71,6 @@ write_compressed_parquet <- function(df, path) {
6871 ) | >
6972 dplyr :: select(JOB_NAME , FASTA , DB )
7073
71- # readr::write_tsv(job_list, file.path(output_path, paste0("hmmer_jobs_",database,".txt")))
72-
73- # number of parallel jobs (NOT threads per hmmscan)
74- n_workers <- 4
75-
76- # threads per hmmscan
77- threads <- 8
78-
79- future :: plan(future :: multisession , workers = n_workers )
80-
8174 .runHmmerJob <- function (JOB_NAME , FASTA , DB ) {
8275 hmmer_input <- file.path(output_path , FASTA )
8376 hmmer_output <- file.path(output_path , paste0(JOB_NAME , " .tbl" ))
@@ -93,83 +86,89 @@ write_compressed_parquet <- function(df, path) {
9386 mount_cont <- " /work"
9487
9588 cmd_args <- c(
96- " exec " ,
97- " -B " , paste0(mount_host , " :" , mount_cont ),
98- " -B " , paste0(db_host_dir , " :" , db_cont_dir ),
99- " /scratch/alpine/aghosh5@xsede.org/software/hmmer_latest.sif " ,
89+ " run " , " --rm " ,
90+ " -v " , paste0(mount_host , " :" , mount_cont ),
91+ " -v " , paste0(db_host_dir , " :" , db_cont_dir ),
92+ docker_image ,
10093 " hmmscan" ,
10194 " --cpu" , as.character(threads ),
10295 " --tblout" , .to_container(hmmer_output , mount_host , mount_cont ),
10396 db_cont_path ,
10497 .to_container(hmmer_input , mount_host , mount_cont )
10598 )
10699
107- message(" Running hmmer via Docker..." )
100+ message(" Running hmmscan via Docker..." )
108101 output <- tryCatch(
109102 {
110- system2(" apptainer " , args = cmd_args , stdout = TRUE , stderr = TRUE )
103+ system2(" docker " , args = cmd_args , stdout = TRUE , stderr = TRUE )
111104 },
112105 error = function (e ) {
113- stop(" hmmer execution failed: " , e $ message )
106+ stop(" hmmscan execution failed: " , e $ message )
114107 }
115108 )
116109
117110 if (! file.exists(hmmer_output )) {
118- stop(" hmmer failed: output file not found. Check stderr:\n " , paste(output , collapse = " \n " ))
111+ stop(" hmmscan failed: output file not found. Check stderr:\n " , paste(output , collapse = " \n " ))
119112 }
120113
121- message(" hmmer completed successfully." )
114+ message(" hmmscan completed successfully." )
122115
123- hmmer_tbl <- .parse_hmmer_output(hmmer_output )
124-
125- hmmer_tbl <- hmmer_tbl | >
116+ hmmer_tbl <- parse_hmmer_output(hmmer_output ) | >
126117 dplyr :: select(" name" , " query_name" , " description" )
127118
128- hmmer_tbl_filename <- file.path(dirname(hmmer_output ), paste0(tools :: file_path_sans_ext(basename(hmmer_output )), " .parquet" ))
119+ hmmer_tbl_filename <- file.path(
120+ dirname(hmmer_output ),
121+ paste0(tools :: file_path_sans_ext(basename(hmmer_output )), " .parquet" )
122+ )
129123
130- hmmer_tbl | >
131- write_compressed_parquet(hmmer_tbl_filename )
124+ .write_compressed_parquet(hmmer_tbl , hmmer_tbl_filename )
132125
133- return ( hmmer_tbl_filename )
126+ hmmer_tbl_filename
134127 }
135128
136- results <- furrr :: future_pwalk(
137- job_list ,
138- .runHmmerJob ,
139- .progress = TRUE
129+ parquet_files <- .with_future_plan(
130+ future :: multisession(workers = n_workers ),
131+ furrr :: future_pmap_chr(job_list , .runHmmerJob , .progress = TRUE )
140132 )
141- parquet_files <- results | >
142- dplyr :: mutate(parquet = paste0(JOB_NAME , " .parquet" )) | >
143- dplyr :: pull(parquet )
144133
145134 final_parquet <- file.path(output_path , paste0(" protein_" , database , " .parquet" ))
146135
147- dataset <- arrow :: open_dataset(
148- file.path(output_path , parquet_files ),
149- format = " parquet"
150- )
151-
152- arrow :: write_parquet(
153- dataset ,
154- file.path(output_path , paste0(" protein_" , database , " .parquet" ))
155- )
156-
157- message(" Combined parquet written" )
136+ purrr :: map(parquet_files , arrow :: read_parquet ) | >
137+ dplyr :: bind_rows() | >
138+ .write_compressed_parquet(final_parquet )
158139
159- # arrow::read_parquet("/scratch/alpine/aghosh5@xsede.org/AMR/data/Campylobacter_jejuni/protein_COG_count. parquet") |> DBI::dbWriteTable(conn=con, name="protein_COG_count ")
140+ message( " Combined parquet written. " )
160141
161142 arrow :: read_parquet(final_parquet ) | >
162143 DBI :: dbWriteTable(conn = con , name = tools :: file_path_sans_ext(basename(final_parquet )), overwrite = TRUE )
163144}
164145
165146
166- #' Read a file created as HMMER output
167- #' modified the rhmmer
168- #' @param file Filename
169- #' @return data.frame
170- #' @export
147+ #' Parse HMMER tabular output into a tibble
148+ #'
149+ #' Reads a HMMER `--tblout` file and returns a tidy tibble with one row per
150+ #' target-query hit. Comment lines are stripped and the free-text description
151+ #' field is reunited from the remaining whitespace-delimited columns.
152+ #'
153+ #' @param file Path to a HMMER `.tbl` output file produced with `--tblout`.
154+ #'
155+ #' @return A tibble with 19 columns matching the HMMER per-sequence hit table:
156+ #' `name`, `accession`, `query_name`, `query_accession`, `sequence_evalue`,
157+ #' `sequence_score`, `sequence_bias`, `best_evalue`, `best_score`,
158+ #' `best_bias`, `number_exp`, `number_reg`, `number_clu`, `number_ov`,
159+ #' `number_env`, `number_dom`, `number_rep`, `number_inc`, `description`.
160+ #'
161+ #' @references Adapted from the rhmmer package
162+ #' (<https://github.com/arendsee/rhmmer>).
171163 #'
172- .parse_hmmer_output <- function (file ) {
164+ #' @examples
165+ #' \dontrun{
166+ #' hits <- parse_hmmer_output("results/Ecoli/protein_chunk_01_COG.tbl")
167+ #' hits |> dplyr::filter(sequence_evalue < 1e-5)
168+ #' }
169+ #'
170+ #' @export
171+ parse_hmmer_output <- function (file ) {
173172 col_types <- readr :: cols(
174173 name = readr :: col_character(),
175174 accession = readr :: col_character(),
@@ -180,7 +179,7 @@ write_compressed_parquet <- function(df, path) {
180179 sequence_bias = readr :: col_double(),
181180 best_evalue = readr :: col_double(),
182181 best_score = readr :: col_double(),
183- best_bis = readr :: col_double(),
182+ best_bias = readr :: col_double(),
184183 number_exp = readr :: col_double(),
185184 number_reg = readr :: col_integer(),
186185 number_clu = readr :: col_integer(),
@@ -225,21 +224,43 @@ write_compressed_parquet <- function(df, path) {
225224 table
226225}
227226
227+ #' Map HMMER protein annotations to genome-level count matrix and load into DuckDB
228+ #'
229+ #' Reads a Parquet file of HMMER hits (produced by [.runHMMER()]), joins the
230+ #' annotations to the protein-cluster count matrix already in DuckDB, aggregates
231+ #' counts per genome and annotation, and writes the result both as a Parquet file
232+ #' and as a new table in the DuckDB database.
233+ #'
234+ #' @param annotated_parquet Path to the combined HMMER results Parquet file
235+ #' (e.g. `"results/Ecoli/protein_COG.parquet"`). The filename stem is used as
236+ #' the table name in DuckDB.
237+ #' @param duckdb_path Path to the per-selection DuckDB database containing a
238+ #' `protein_count` table (created by [CDHIT2duckdb()]).
239+ #'
240+ #' @return Invisibly returns the path to the written count Parquet file.
241+ #'
242+ #' @seealso [CDHIT2duckdb()], [runDataProcessing()]
243+ #'
244+ #' @examples
245+ #' \dontrun{
246+ #' proteinAnnotations2Duckdb(
247+ #'   annotated_parquet = "results/Ecoli/protein_COG.parquet",
248+ #'   duckdb_path = "data/Ecoli/Eco.duckdb"
249+ #' )
250+ #' }
251+ #'
252+ #' @export
228253proteinAnnotations2Duckdb <- function (annotated_parquet , duckdb_path ) {
229254 annotated_parquet <- .docker_path(annotated_parquet )
255+ duckdb_path <- .docker_path(duckdb_path )
230256
231- # robust database name
257+ # derive table name from the annotation filename stem
232258 database <- tools :: file_path_sans_ext(basename(annotated_parquet ))
233259
234- duckdb_path <- .docker_path(duckdb_path )
235-
236260 con <- DBI :: dbConnect(duckdb :: duckdb(), duckdb_path )
237261 on.exit(try(DBI :: dbDisconnect(con , shutdown = FALSE ), silent = TRUE ), add = TRUE )
238262
239- protein_count <- DBI :: dbReadTable(con , " protein_count" ) | >
240- tibble :: as_tibble()
241-
242- protein_long <- protein_count | >
263+ protein_long <- DBI :: dbReadTable(con , " protein_count" ) | >
243264 tibble :: as_tibble() | >
244265 tidyr :: pivot_longer(
245266 cols = - genome_id ,
@@ -248,34 +269,32 @@ proteinAnnotations2Duckdb <- function(annotated_parquet, duckdb_path) {
248269 ) | >
249270 dplyr :: filter(count > 0 )
250271
251- # protein_COG <- arrow::read_parquet("data/Campylobacter_jejuni/protein_COG.parquet")
252- Annotation <- arrow :: read_parquet(annotated_parquet )
272+ annotation <- arrow :: read_parquet(annotated_parquet )
253273
254- protein_long_annot <- protein_long | >
274+ genome_annot_matrix <- protein_long | >
275+ # protein IDs are stored with "." separator in DuckDB but "|" in HMMER output
255276 dplyr :: mutate(query_name = stringr :: str_replace(query_name , " ^fig\\ ." , " fig|" )) | >
256277 dplyr :: inner_join(
257- Annotation | >
258- dplyr :: select(name , query_name ),
278+ dplyr :: select(annotation , name , query_name ),
259279 by = " query_name"
260- )
261- genome_annot_counts <- protein_long_annot | >
280+ ) | >
262281 dplyr :: group_by(genome_id , name ) | >
263- dplyr :: summarise(count = sum(count ), .groups = " drop" )
264-
265- genome_annot_matrix <- genome_annot_counts | >
282+ dplyr :: summarise(count = sum(count ), .groups = " drop" ) | >
266283 tidyr :: pivot_wider(
267284 names_from = name ,
268285 values_from = count ,
269286 values_fill = 0
270287 )
271288
272- arrow :: write_parquet(
273- genome_annot_matrix ,
274- file.path(dirname(duckdb_path ), paste0(database , " _count" , " .parquet" ))
275- )
289+ count_path <- file.path(dirname(duckdb_path ), paste0(database , " _count.parquet" ))
290+ arrow :: write_parquet(genome_annot_matrix , count_path )
276291
277- count_path <- file.path(dirname(duckdb_path ), paste0(database , " _count" , " .parquet" ))
292+ DBI :: dbWriteTable(
293+ conn = con ,
294+ name = tools :: file_path_sans_ext(basename(count_path )),
295+ value = genome_annot_matrix ,
296+ overwrite = TRUE
297+ )
278298
279- arrow :: read_parquet(count_path ) | >
280- DBI :: dbWriteTable(conn = con , name = tools :: file_path_sans_ext(basename(count_path )), overwrite = TRUE )
299+ invisible (count_path )
281300}
0 commit comments