Vai al contenuto

Pattern Essenziali di Scripting in Nextflow

Traduzione assistita da IA - scopri di più e suggerisci miglioramenti

Nextflow è un linguaggio di programmazione che gira sulla Java Virtual Machine. Sebbene Nextflow sia costruito su Groovy e ne condivida gran parte della sintassi, Nextflow è molto più di un semplice "Groovy con estensioni" -- è un linguaggio autonomo con una sintassi e una libreria standard completamente specificate.

Si può scrivere molto codice Nextflow senza andare oltre la sintassi di base per variabili, map e liste. La maggior parte dei tutorial Nextflow si concentra sull'orchestrazione del flusso di lavoro (canali, processi e flusso dei dati), e si può arrivare sorprendentemente lontano con solo quello.

Tuttavia, quando si ha bisogno di manipolare dati, analizzare nomi di file complessi, implementare logica condizionale o costruire flussi di lavoro robusti per la produzione, è utile pensare a due aspetti distinti del codice: dataflow (canali, operatori, processi e flussi di lavoro) e scripting (il codice all'interno di closure, funzioni e script di processo). Sebbene questa distinzione sia in qualche modo arbitraria—è tutto codice Nextflow—fornisce un modello mentale utile per capire quando si sta orchestrando la pipeline rispetto a quando si stanno manipolando i dati. Padroneggiare entrambi migliora notevolmente la capacità di scrivere flussi di lavoro chiari e manutenibili.

Obiettivi di apprendimento

Questa side quest vi porta in un percorso pratico dai concetti di base ai pattern pronti per la produzione. Trasformeremo un semplice flusso di lavoro di lettura CSV in una sofisticata pipeline bioinformatica, facendola evolvere passo dopo passo attraverso sfide realistiche:

  • Comprendere i confini: Distinguere tra operazioni di dataflow e scripting, e capire come lavorano insieme
  • Manipolazione dei dati: Estrarre, trasformare e selezionare sottoinsiemi di map e collezioni usando operatori potenti
  • Elaborazione di stringhe: Analizzare schemi di denominazione di file complessi con pattern regex e padroneggiare l'interpolazione di variabili
  • Funzioni riutilizzabili: Estrarre logica complessa in funzioni con nome per flussi di lavoro più puliti e manutenibili
  • Logica dinamica: Costruire processi che si adattano a diversi tipi di input e usare closure per l'allocazione dinamica delle risorse
  • Routing condizionale: Instradare intelligentemente i campioni attraverso diversi processi in base alle loro caratteristiche di metadati
  • Operazioni sicure: Gestire i dati mancanti con operatori null-safe e validare gli input con messaggi di errore chiari
  • Handler basati sulla configurazione: Usare gli handler degli eventi del flusso di lavoro per logging, notifiche e gestione del ciclo di vita

Prerequisiti

Prima di affrontare questa side quest, dovreste:

  • Aver completato il tutorial Hello Nextflow o un corso equivalente per principianti.
  • Essere a proprio agio con i concetti e i meccanismi di base di Nextflow (processi, canali, operatori, lavorare con file e metadati)
  • Avere una familiarità di base con i costrutti di programmazione comuni (variabili, map, liste)

Questo tutorial spiegherà i concetti di programmazione man mano che li incontriamo, quindi non è necessaria una vasta esperienza di programmazione. Inizieremo con i concetti fondamentali e arriveremo ai pattern avanzati.


0. Iniziamo

Aprire il codespace di formazione

Se non lo avete ancora fatto, assicuratevi di aprire l'ambiente di formazione come descritto nella sezione Configurazione dell'ambiente.

Open in GitHub Codespaces

Spostarsi nella directory del progetto

Spostiamoci nella directory dove si trovano i file per questo tutorial.

cd side-quests/essential_scripting_patterns

Esaminare i materiali

Troverete un file principale del flusso di lavoro e una directory data contenente file di dati di esempio.

Directory contents
.
├── collect.nf
├── data
│   ├── samples.csv
│   └── sequences
│       ├── SAMPLE_001_S1_L001_R1_001.fastq
│       ├── SAMPLE_002_S2_L001_R1_001.fastq
│       └── SAMPLE_003_S3_L001_R1_001.fastq
├── main.nf
├── modules
│   ├── fastp.nf
│   ├── generate_report.nf
│   └── trimgalore.nf
└── nextflow.config

Il nostro CSV di campioni contiene informazioni sui campioni biologici che necessitano di elaborazioni diverse in base alle loro caratteristiche:

samples.csv
sample_id,organism,tissue_type,sequencing_depth,file_path,quality_score
SAMPLE_001,human,liver,30000000,data/sequences/SAMPLE_001_S1_L001_R1_001.fastq,38.5
SAMPLE_002,mouse,brain,25000000,data/sequences/SAMPLE_002_S2_L001_R1_001.fastq,35.2
SAMPLE_003,human,kidney,45000000,data/sequences/SAMPLE_003_S3_L001_R1_001.fastq,42.1

Useremo questo dataset realistico per esplorare tecniche di programmazione pratiche che incontrerete nei veri flussi di lavoro bioinformatici.

Lista di controllo per la preparazione

Pensate di essere pronti a tuffarvi?

  • Comprendo l'obiettivo di questo corso e i suoi prerequisiti
  • Il mio codespace è attivo e funzionante
  • Ho impostato correttamente la mia directory di lavoro

Se riuscite a spuntare tutte le caselle, siete pronti per partire.


1. Dataflow vs Scripting: Comprendere i Confini

1.1. Identificare Cosa è Cosa

Quando si scrivono flussi di lavoro Nextflow, è importante distinguere tra dataflow (come i dati si muovono attraverso canali e processi) e scripting (il codice che manipola i dati e prende decisioni). Costruiamo un flusso di lavoro che dimostri come lavorano insieme.

1.1.1. Flusso di Lavoro Nextflow di Base

Iniziamo con un semplice flusso di lavoro che legge solo il file CSV (lo abbiamo già fatto per voi in main.nf):

main.nf
1
2
3
4
5
workflow {
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .view()
}

Il blocco workflow definisce la struttura della nostra pipeline, mentre channel.fromPath() crea un canale da un percorso di file. L'operatore .splitCsv() elabora il file CSV e converte ogni riga in una struttura dati map.

Eseguiamo questo flusso di lavoro per vedere i dati CSV grezzi:

nextflow run main.nf
Output del comando
Launching `main.nf` [marvelous_tuckerman] DSL2 - revision: 6113e05c17

[sample_id:SAMPLE_001, organism:human, tissue_type:liver, sequencing_depth:30000000, file_path:data/sequences/SAMPLE_001_S1_L001_R1_001.fastq, quality_score:38.5]
[sample_id:SAMPLE_002, organism:mouse, tissue_type:brain, sequencing_depth:25000000, file_path:data/sequences/SAMPLE_002_S2_L001_R1_001.fastq, quality_score:35.2]
[sample_id:SAMPLE_003, organism:human, tissue_type:kidney, sequencing_depth:45000000, file_path:data/sequences/SAMPLE_003_S3_L001_R1_001.fastq, quality_score:42.1]

1.1.2. Aggiungere l'Operatore Map

Ora aggiungeremo scripting per trasformare i dati, usando l'operatore .map() con cui probabilmente avete già familiarità. Questo operatore accetta una 'closure' dove possiamo scrivere codice per trasformare ogni elemento.

Nota

Una closure è un blocco di codice che può essere passato e eseguito in seguito. Pensatela come una funzione che definite inline. Le closure sono scritte con le parentesi graffe { } e possono accettare parametri. Sono fondamentali per il funzionamento degli operatori Nextflow e se scrivete Nextflow da un po', potreste già averle usate senza rendervene conto!

Ecco come appare quell'operazione map:

main.nf
2
3
4
5
6
7
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            return row
        }
        .view()
main.nf
2
3
4
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .view()

Questa è la nostra prima closure - una funzione anonima che potete passare come argomento (simile alle lambda in Python o alle arrow function in JavaScript). Le closure sono essenziali per lavorare con gli operatori Nextflow.

La closure { row -> return row } accetta un parametro row (potrebbe avere qualsiasi nome: item, sample, ecc.).

Quando l'operatore .map() elabora ogni elemento del canale, passa quell'elemento alla vostra closure. Qui, row contiene una riga CSV alla volta.

Applicate questa modifica ed eseguite il flusso di lavoro:

nextflow run main.nf

Vedrete lo stesso output di prima, perché stiamo semplicemente restituendo l'input invariato. Questo conferma che l'operatore map funziona correttamente. Ora iniziamo a trasformare i dati.

1.1.3. Creare una Struttura Dati Map

Ora scriveremo logica di scripting all'interno della nostra closure per trasformare ogni riga di dati. È qui che elaboriamo i singoli elementi di dati piuttosto che orchestrare il flusso dei dati.

main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            // Scripting per la trasformazione dei dati
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            return sample_meta
        }
        .view()
main.nf
2
3
4
5
6
7
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            return row
        }
        .view()

La map sample_meta è una struttura dati chiave-valore (come i dizionari in Python, gli oggetti in JavaScript, o gli hash in Ruby) che memorizza informazioni correlate: ID campione, organismo, tipo di tessuto, profondità di sequenziamento e punteggio di qualità.

Usiamo metodi di manipolazione delle stringhe come .toLowerCase() e .replaceAll() per pulire i nostri dati, e metodi di conversione del tipo come .toInteger() e .toDouble() per convertire i dati stringa del CSV nei tipi numerici appropriati.

Applicate questa modifica ed eseguite il flusso di lavoro:

nextflow run main.nf
Output del comando
[id:sample_001, organism:human, tissue:liver, depth:30000000, quality:38.5]
[id:sample_002, organism:mouse, tissue:brain, depth:25000000, quality:35.2]
[id:sample_003, organism:human, tissue:kidney, depth:45000000, quality:42.1]

1.1.4. Aggiungere Logica Condizionale

Aggiungiamo ora più scripting - questa volta usando un operatore ternario per prendere decisioni basate sui valori dei dati.

Apportate la seguente modifica:

main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return sample_meta + [priority: priority]
        }
        .view()
main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            return sample_meta
        }
        .view()

L'operatore ternario è una scorciatoia per un'istruzione if/else che segue il pattern condizione ? valore_se_vero : valore_se_falso. Questa riga significa: "Se la qualità è maggiore di 40, usa 'high', altrimenti usa 'normal'". Il suo cugino, l'operatore Elvis (?:), fornisce valori predefiniti quando qualcosa è null o vuoto - esploreremo quel pattern più avanti in questo tutorial.

L'operatore di addizione per map + crea una nuova map piuttosto che modificare quella esistente. Questa riga crea una nuova map che contiene tutte le coppie chiave-valore di sample_meta più la nuova chiave priority.

Nota

Non modificate mai le map passate nelle closure - create sempre nuove map usando + (ad esempio). In Nextflow, gli stessi dati spesso fluiscono attraverso più operazioni simultaneamente. Modificare una map in-place può causare effetti collaterali imprevedibili quando altre operazioni fanno riferimento allo stesso oggetto. Creare nuove map garantisce che ogni operazione abbia la propria copia pulita.

Eseguite il flusso di lavoro modificato:

nextflow run main.nf
Output del comando
[id:sample_001, organism:human, tissue:liver, depth:30000000, quality:38.5, priority:normal]
[id:sample_002, organism:mouse, tissue:brain, depth:25000000, quality:35.2, priority:normal]
[id:sample_003, organism:human, tissue:kidney, depth:45000000, quality:42.1, priority:high]

Abbiamo aggiunto con successo logica condizionale per arricchire i nostri metadati con un livello di priorità basato sui punteggi di qualità.

1.1.5. Selezionare Sottoinsiemi di Map con .subMap()

Mentre l'operatore + aggiunge chiavi a una map, a volte è necessario fare il contrario - estrarre solo chiavi specifiche. Il metodo .subMap() è perfetto per questo.

Aggiungiamo una riga per creare una versione semplificata dei nostri metadati che contenga solo i campi di identificazione:

main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            // Scripting per la trasformazione dei dati
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def id_only = sample_meta.subMap(['id', 'organism', 'tissue'])
            println "ID fields only: ${id_only}"

            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return sample_meta + [priority: priority]
        }
        .view()
main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            // Scripting per la trasformazione dei dati
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return sample_meta + [priority: priority]
        }
        .view()

Eseguite il flusso di lavoro modificato:

nextflow run main.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `main.nf` [peaceful_cori] DSL2 - revision: 4cc4a8340f

ID fields only: [id:sample_001, organism:human, tissue:liver]
ID fields only: [id:sample_002, organism:mouse, tissue:brain]
ID fields only: [id:sample_003, organism:human, tissue:kidney]
[id:sample_001, organism:human, tissue:liver, depth:30000000, quality:38.5, priority:normal]
[id:sample_002, organism:mouse, tissue:brain, depth:25000000, quality:35.2, priority:normal]
[id:sample_003, organism:human, tissue:kidney, depth:45000000, quality:42.1, priority:high]

Questo mostra sia i metadati completi visualizzati dall'operazione view() sia il sottoinsieme estratto che abbiamo stampato con println.

Il metodo .subMap() accetta una lista di chiavi e restituisce una nuova map contenente solo quelle chiavi. Se una chiave non esiste nella map originale, semplicemente non viene inclusa nel risultato.

Questo è particolarmente utile quando è necessario creare diverse versioni di metadati per diversi processi - alcuni potrebbero aver bisogno di metadati completi mentre altri necessitano solo di campi di identificazione minimi.

Ora rimuovete quelle istruzioni println per ripristinare il flusso di lavoro allo stato precedente, poiché non ne avremo bisogno in seguito.

Suggerimento: Riepilogo delle Operazioni sulle Map

  • Aggiungere chiavi: map1 + [new_key: value] - Crea una nuova map con chiavi aggiuntive
  • Estrarre chiavi: map1.subMap(['key1', 'key2']) - Crea una nuova map con solo le chiavi specificate
  • Entrambe le operazioni creano nuove map - Le map originali rimangono invariate

1.1.6. Combinare Map e Restituire Risultati

Finora, abbiamo restituito solo quella che la comunità Nextflow chiama la 'meta map', e abbiamo ignorato i file a cui quei metadati si riferiscono. Ma se state scrivendo flussi di lavoro Nextflow, probabilmente volete fare qualcosa con quei file.

Produciamo una struttura di canale composta da una tupla di 2 elementi: la map dei metadati arricchita e il percorso del file corrispondente. Questo è un pattern comune in Nextflow per passare dati ai processi.

main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return tuple( sample_meta + [priority: priority], file(row.file_path) )
        }
        .view()
main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return sample_meta + [priority: priority]
        }
        .view()

Applicate questa modifica ed eseguite il flusso di lavoro:

nextflow run main.nf
Output del comando
[[id:sample_001, organism:human, tissue:liver, depth:30000000, quality:38.5, priority:normal], /workspaces/training/side-quests/essential_scripting_patterns/data/sequences/SAMPLE_001_S1_L001_R1_001.fastq]
[[id:sample_002, organism:mouse, tissue:brain, depth:25000000, quality:35.2, priority:normal], /workspaces/training/side-quests/essential_scripting_patterns/data/sequences/SAMPLE_002_S2_L001_R1_001.fastq]
[[id:sample_003, organism:human, tissue:kidney, depth:45000000, quality:42.1, priority:high], /workspaces/training/side-quests/essential_scripting_patterns/data/sequences/SAMPLE_003_S3_L001_R1_001.fastq]

Questa struttura a tupla [meta, file] è un pattern comune in Nextflow per passare sia i metadati che i file associati ai processi.

Nota

Map e Metadati: Le map sono fondamentali per lavorare con i metadati in Nextflow. Per una spiegazione più dettagliata su come lavorare con le map dei metadati, consultate la side quest Lavorare con i metadati.

Il nostro flusso di lavoro dimostra il pattern fondamentale: le operazioni di dataflow (workflow, channel.fromPath(), .splitCsv(), .map(), .view()) orchestrano come i dati si muovono attraverso la pipeline, mentre lo scripting (map [key: value], metodi sulle stringhe, conversioni di tipo, operatori ternari) all'interno della closure .map() gestisce la trasformazione dei singoli elementi di dati.

1.2. Comprendere i Diversi Tipi: Channel vs List

Fin qui tutto bene, riusciamo a distinguere tra operazioni di dataflow e scripting. Ma cosa succede quando lo stesso nome di metodo esiste in entrambi i contesti?

Un esempio perfetto è il metodo collect, che esiste sia per i tipi di canale che per i tipi List nella libreria standard Nextflow. Il metodo collect() su una List trasforma ogni elemento, mentre l'operatore collect() su un canale raccoglie tutte le emissioni del canale in un canale a elemento singolo.

Dimostriamolo con alcuni dati di esempio, iniziando col rinfrescarci su cosa fa l'operatore collect() del canale. Date un'occhiata a collect.nf:

collect.nf
1
2
3
4
5
6
7
def sample_ids = ['sample_001', 'sample_002', 'sample_003']

// channel.collect() - raggruppa più emissioni del canale in una sola
ch_input = channel.fromList(sample_ids)
ch_input.view { sample -> "Individual channel item: ${sample}" }
ch_collected = ch_input.collect()
ch_collected.view { list -> "channel.collect() result: ${list} (${list.size()} items grouped into 1)" }

Passi:

  • Definire una List di ID campione
  • Creare un canale con fromList() che emette ogni ID campione separatamente
  • Stampare ogni elemento con view() mentre scorre
  • Raccogliere tutti gli elementi in una singola lista con l'operatore collect() del canale
  • Stampare il risultato raccolto (elemento singolo contenente tutti gli ID campione) con un secondo view()

Abbiamo cambiato la struttura del canale, ma non abbiamo cambiato i dati stessi.

Eseguiamo il flusso di lavoro per confermarlo:

nextflow run collect.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `collect.nf` [loving_mendel] DSL2 - revision: e8d054a46e

Individual channel item: sample_001
Individual channel item: sample_002
Individual channel item: sample_003
channel.collect() result: [sample_001, sample_002, sample_003] (3 items grouped into 1)

view() restituisce un output per ogni emissione del canale, quindi sappiamo che questo singolo output contiene tutti e 3 gli elementi originali raggruppati in una lista.

Ora vediamo il metodo collect su una List in azione. Modificate collect.nf per applicare il metodo collect della List alla lista originale di ID campione:

main.nf
def sample_ids = ['sample_001', 'sample_002', 'sample_003']

// channel.collect() - raggruppa più emissioni del canale in una sola
ch_input = channel.fromList(sample_ids)
ch_input.view { sample -> "Individual channel item: ${sample}" }
ch_collected = ch_input.collect()
ch_collected.view { list -> "channel.collect() result: ${list} (${list.size()} items grouped into 1)" }

// List.collect() - trasforma ogni elemento, preserva la struttura
def formatted_ids = sample_ids.collect { id ->
    id.toUpperCase().replace('SAMPLE_', 'SPECIMEN_')
}
println "List.collect() result: ${formatted_ids} (${sample_ids.size()} items transformed into ${formatted_ids.size()})"
main.nf
1
2
3
4
5
6
7
def sample_ids = ['sample_001', 'sample_002', 'sample_003']

// channel.collect() - raggruppa più emissioni del canale in una sola
ch_input = channel.fromList(sample_ids)
ch_input.view { sample -> "Individual channel item: ${sample}" }
ch_collected = ch_input.collect()
ch_collected.view { list -> "channel.collect() result: ${list} (${list.size()} items grouped into 1)" }

In questo nuovo frammento:

  • Definiamo una nuova variabile formatted_ids che usa il metodo collect della List per trasformare ogni ID campione nella lista originale
  • Stampiamo il risultato usando println

Eseguite il flusso di lavoro modificato:

nextflow run collect.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `collect.nf` [cheeky_stonebraker] DSL2 - revision: 2d5039fb47

List.collect() result: [SPECIMEN_001, SPECIMEN_002, SPECIMEN_003] (3 items transformed into 3)
Individual channel item: sample_001
Individual channel item: sample_002
Individual channel item: sample_003
channel.collect() result: [sample_001, sample_002, sample_003] (3 items grouped into 1)

Questa volta, NON abbiamo cambiato la struttura dei dati, abbiamo ancora 3 elementi nella lista, ma ABBIAMO trasformato ogni elemento usando il metodo collect della List per produrre una nuova lista con valori modificati. Questo è simile all'uso dell'operatore map su un canale, ma opera su una struttura dati List piuttosto che su un canale.

collect è un caso estremo che usiamo qui per sottolineare un punto. La lezione chiave è che quando si scrivono flussi di lavoro, bisogna sempre distinguere tra strutture dati (List, Map, ecc.) e canali (costrutti di dataflow). Le operazioni possono condividere nomi ma comportarsi in modo completamente diverso a seconda del tipo su cui vengono chiamate.

1.3. L'Operatore Spread (*.) - Scorciatoia per l'Estrazione di Proprietà

Correlato al metodo collect della List è l'operatore spread (*.), che fornisce un modo conciso per estrarre proprietà dalle collezioni. È essenzialmente zucchero sintattico per un pattern collect comune.

Aggiungiamo una dimostrazione al nostro file collect.nf:

collect.nf
def sample_ids = ['sample_001', 'sample_002', 'sample_003']

// channel.collect() - raggruppa più emissioni del canale in una sola
ch_input = channel.fromList(sample_ids)
ch_input.view { sample -> "Individual channel item: ${sample}" }
ch_collected = ch_input.collect()
ch_collected.view { list -> "channel.collect() result: ${list} (${list.size()} items grouped into 1)" }

// List.collect() - trasforma ogni elemento, preserva la struttura
def formatted_ids = sample_ids.collect { id ->
    id.toUpperCase().replace('SAMPLE_', 'SPECIMEN_')
}
println "List.collect() result: ${formatted_ids} (${sample_ids.size()} items transformed into ${formatted_ids.size()})"

// Operatore spread - accesso conciso alle proprietà
def sample_data = [[id: 's1', quality: 38.5], [id: 's2', quality: 42.1], [id: 's3', quality: 35.2]]
def all_ids = sample_data*.id
println "Spread operator result: ${all_ids}"
collect.nf
def sample_ids = ['sample_001', 'sample_002', 'sample_003']

// channel.collect() - raggruppa più emissioni del canale in una sola
ch_input = channel.fromList(sample_ids)
ch_input.view { sample -> "Individual channel item: ${sample}" }
ch_collected = ch_input.collect()
ch_collected.view { list -> "channel.collect() result: ${list} (${list.size()} items grouped into 1)" }

// List.collect() - trasforma ogni elemento, preserva la struttura
def formatted_ids = sample_ids.collect { id ->
    id.toUpperCase().replace('SAMPLE_', 'SPECIMEN_')
}
println "List.collect() result: ${formatted_ids} (${sample_ids.size()} items transformed into ${formatted_ids.size()})"

Eseguite il flusso di lavoro aggiornato:

Test spread operator
nextflow run collect.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `collect.nf` [cranky_galileo] DSL2 - revision: 5f3c8b2a91

List.collect() result: [SPECIMEN_001, SPECIMEN_002, SPECIMEN_003] (3 items transformed into 3)
Spread operator result: [s1, s2, s3]
Individual channel item: sample_001
Individual channel item: sample_002
Individual channel item: sample_003
channel.collect() result: [sample_001, sample_002, sample_003] (3 items grouped into 1)

L'operatore spread *. è una scorciatoia per un pattern collect comune:

// Questi sono equivalenti:
def ids = samples*.id
def ids = samples.collect { it.id }

// Funziona anche con le chiamate ai metodi:
def names = files*.getName()
def names = files.collect { it.getName() }

L'operatore spread è particolarmente utile quando è necessario estrarre una singola proprietà da una lista di oggetti - è più leggibile che scrivere per intero la closure collect.

Suggerimento: Quando Usare Spread vs Collect

  • Usate spread (*.) per l'accesso semplice alle proprietà: samples*.id, files*.name
  • Usate collect per trasformazioni o logica complessa: samples.collect { it.id.toUpperCase() }, samples.collect { [it.id, it.quality > 40] }

Takeaway

In questa sezione, avete imparato:

  • Dataflow vs scripting: Gli operatori dei canali orchestrano come i dati fluiscono attraverso la pipeline, mentre lo scripting trasforma i singoli elementi di dati
  • Comprendere i tipi: Lo stesso nome di metodo (come collect) può comportarsi diversamente a seconda del tipo su cui viene chiamato (Channel vs List)
  • Il contesto è importante: Siate sempre consapevoli se state lavorando con canali (dataflow) o strutture dati (scripting)

Comprendere questi confini è essenziale per il debug, la documentazione e la scrittura di flussi di lavoro manutenibili.

Successivamente ci addentreremo nelle capacità di elaborazione delle stringhe, essenziali per gestire dati del mondo reale.


2. Elaborazione di Stringhe e Generazione Dinamica di Script

Padroneggiare l'elaborazione delle stringhe separa i flussi di lavoro fragili dalle pipeline robuste. Questa sezione tratta l'analisi di nomi di file complessi, la generazione dinamica di script e l'interpolazione di variabili.

2.1. Pattern Matching e Espressioni Regolari

I file bioinformatici spesso hanno convenzioni di denominazione complesse che codificano metadati. Estraiamoli automaticamente usando il pattern matching con espressioni regolari.

Torneremo al nostro flusso di lavoro main.nf e aggiungeremo logica di pattern matching per estrarre informazioni aggiuntive sui campioni dai nomi dei file. I file FASTQ nel nostro dataset seguono le convenzioni di denominazione in stile Illumina con nomi come SAMPLE_001_S1_L001_R1_001.fastq.gz. Potrebbero sembrare criptici, ma in realtà codificano metadati utili come l'ID campione, il numero di lane e la direzione di lettura. Useremo le capacità regex per analizzare questi nomi.

Apportate la seguente modifica al vostro flusso di lavoro main.nf esistente:

main.nf
        .map { row ->
            // Scripting per la trasformazione dei dati
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def fastq_path = file(row.file_path)

            def m = (fastq_path.name =~ /^(.+)_S(\d+)_L(\d{3})_(R[12])_(\d{3})\.fastq(?:\.gz)?$/)
            def file_meta = m ? [
                sample_num: m[0][2].toInteger(),
                lane: m[0][3],
                read: m[0][4],
                chunk: m[0][5]
            ] : [:]

            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return tuple(sample_meta + file_meta + [priority: priority], fastq_path)
        }
main.nf
        .map { row ->
            // Scripting per la trasformazione dei dati
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return tuple(sample_meta + [priority: priority], file(row.file_path))
        }

Questo dimostra i concetti chiave di elaborazione delle stringhe:

  1. Letterali di espressioni regolari usando la sintassi ~/pattern/ - questo crea un pattern regex senza dover fare l'escape dei backslash
  2. Pattern matching con l'operatore =~ - questo tenta di far corrispondere una stringa a un pattern regex
  3. Oggetti Matcher che catturano i gruppi con [0][1], [0][2], ecc. - [0] si riferisce all'intera corrispondenza, [1], [2], ecc. si riferiscono ai gruppi catturati tra parentesi

Analizziamo il pattern regex ^(.+)_S(\d+)_L(\d{3})_(R[12])_(\d{3})\.fastq(?:\.gz)?$:

Pattern Corrisponde a Cattura
^(.+) Nome campione dall'inizio Gruppo 1: nome campione
_S(\d+) Numero campione _S1, _S2, ecc. Gruppo 2: numero campione
_L(\d{3}) Numero lane _L001 Gruppo 3: lane (3 cifre)
_(R[12]) Direzione lettura _R1 o _R2 Gruppo 4: direzione lettura
_(\d{3}) Numero chunk _001 Gruppo 5: chunk (3 cifre)
\.fastq(?:\.gz)?$ Estensione file .fastq o .fastq.gz Non catturato (?: è non-catturante)

Questo analizza le convenzioni di denominazione in stile Illumina per estrarre automaticamente i metadati.

Eseguite il flusso di lavoro modificato:

Test pattern matching
nextflow run main.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `main.nf` [clever_pauling] DSL2 - revision: 605d2058b4

[[id:sample_001, organism:human, tissue:liver, depth:30000000, quality:38.5, sample_num:1, lane:001, read:R1, chunk:001, priority:normal], /workspaces/training/side-quests/essential_scripting_patterns/data/sequences/SAMPLE_001_S1_L001_R1_001.fastq]
[[id:sample_002, organism:mouse, tissue:brain, depth:25000000, quality:35.2, sample_num:2, lane:001, read:R1, chunk:001, priority:normal], /workspaces/training/side-quests/essential_scripting_patterns/data/sequences/SAMPLE_002_S2_L001_R1_001.fastq]
[[id:sample_003, organism:human, tissue:kidney, depth:45000000, quality:42.1, sample_num:3, lane:001, read:R1, chunk:001, priority:high], /workspaces/training/side-quests/essential_scripting_patterns/data/sequences/SAMPLE_003_S3_L001_R1_001.fastq]

Questo mostra i metadati arricchiti dai nomi dei file.

2.2. Generazione Dinamica di Script nei Processi

I blocchi script dei processi sono essenzialmente stringhe multi-riga che vengono passate alla shell. Potete usare logica condizionale (if/else, operatori ternari) per generare dinamicamente diverse stringhe di script in base alle caratteristiche dell'input. Questo è essenziale per gestire tipi di input diversi—come letture di sequenziamento single-end vs paired-end—senza duplicare le definizioni dei processi.

Aggiungiamo un processo al nostro flusso di lavoro che dimostri questo pattern. Aprite modules/fastp.nf e date un'occhiata:

modules/fastp.nf
process FASTP {
    container 'community.wave.seqera.io/library/fastp:0.24.0--62c97b06e8447690'

    input:
    tuple val(meta), path(reads)

    output:
    tuple val(meta), path("*_trimmed*.fastq.gz"), emit: reads

    script:
    """
    fastp \\
        --in1 ${reads[0]} \\
        --in2 ${reads[1]} \\
        --out1 ${meta.id}_trimmed_R1.fastq.gz \\
        --out2 ${meta.id}_trimmed_R2.fastq.gz \\
        --json ${meta.id}.fastp.json \\
        --html ${meta.id}.fastp.html \\
        --thread $task.cpus
    """
}

Il processo accetta file FASTQ come input ed esegue lo strumento fastp per tagliare gli adattatori e filtrare le letture di bassa qualità. Purtroppo, chi ha scritto questo processo non ha previsto le letture single-end che abbiamo nel nostro dataset di esempio. Aggiungiamolo al nostro flusso di lavoro e vediamo cosa succede:

Prima, includete il modulo alla prima riga del vostro flusso di lavoro main.nf:

main.nf
include { FASTP } from './modules/fastp.nf'

Poi modificate il blocco workflow per connettere il canale ch_samples al processo FASTP:

main.nf
workflow {

    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def fastq_path = file(row.file_path)

            def m = (fastq_path.name =~ /^(.+)_S(\d+)_L(\d{3})_(R[12])_(\d{3})\.fastq(?:\.gz)?$/)
            def file_meta = m ? [
                sample_num: m[0][2].toInteger(),
                lane: m[0][3],
                read: m[0][4],
                chunk: m[0][5]
            ] : [:]

            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return tuple(sample_meta + file_meta + [priority: priority], fastq_path)
        }

    ch_fastp = FASTP(ch_samples)
}
main.nf
workflow {

    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def fastq_path = file(row.file_path)

            def m = (fastq_path.name =~ /^(.+)_S(\d+)_L(\d{3})_(R[12])_(\d{3})\.fastq(?:\.gz)?$/)
            def file_meta = m ? [
                sample_num: m[0][2].toInteger(),
                lane: m[0][3],
                read: m[0][4],
                chunk: m[0][5]
            ] : [:]

            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return [sample_meta + file_meta + [priority: priority], file(row.file_path)]
        }
        .view()
}

Eseguite questo flusso di lavoro modificato:

nextflow run main.nf
Output del comando
ERROR ~ Error executing process > 'FASTP (3)'

Caused by:
  Process `FASTP (3)` terminated with an error exit status (255)


Command executed:

  fastp \
      --in1 SAMPLE_003_S3_L001_R1_001.fastq \
      --in2 null \
      --out1 sample_003_trimmed_R1.fastq.gz \
      --out2 sample_003_trimmed_R2.fastq.gz \
      --json sample_003.fastp.json \
      --html sample_003.fastp.html \
      --thread 2

Command exit status:
  255

Command output:
  (empty)

Potete vedere che il processo sta cercando di eseguire fastp con un valore null per il secondo file di input, il che lo fa fallire. Questo perché il nostro dataset contiene letture single-end, ma il processo è codificato per aspettarsi letture paired-end (due file di input alla volta).

Risolvete questo aggiungendo logica condizionale al blocco script: del processo FASTP. Un'istruzione if/else controlla il numero di file di lettura e adatta il comando di conseguenza.

main.nf
    script:
    // Semplice rilevamento single-end vs paired-end
    def is_single = reads instanceof List ? reads.size() == 1 : true

    if (is_single) {
        def input_file = reads instanceof List ? reads[0] : reads
        """
        fastp \\
            --in1 ${input_file} \\
            --out1 ${meta.id}_trimmed.fastq.gz \\
            --json ${meta.id}.fastp.json \\
            --html ${meta.id}.fastp.html \\
            --thread $task.cpus
        """
    } else {
        """
        fastp \\
            --in1 ${reads[0]} \\
            --in2 ${reads[1]} \\
            --out1 ${meta.id}_trimmed_R1.fastq.gz \\
            --out2 ${meta.id}_trimmed_R2.fastq.gz \\
            --json ${meta.id}.fastp.json \\
            --html ${meta.id}.fastp.html \\
            --thread $task.cpus
        """
    }
main.nf
        script:
        """
        fastp \\
            --in1 ${reads[0]} \\
            --in2 ${reads[1]} \\
            --out1 ${meta.id}_trimmed_R1.fastq.gz \\
            --out2 ${meta.id}_trimmed_R2.fastq.gz \\
            --json ${meta.id}.fastp.json \\
            --html ${meta.id}.fastp.html \\
            --thread $task.cpus
        """
    }

Ora il flusso di lavoro può gestire sia le letture single-end che paired-end. La logica condizionale controlla il numero di file di input e costruisce il comando appropriato per fastp. Vediamo se funziona:

nextflow run main.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `main.nf` [adoring_rosalind] DSL2 - revision: 04b1cd93e9

executor >  local (3)
[31/a8ad4d] process > FASTP (3) [100%] 3 of 3 ✔

Ottimo! Se controlliamo i comandi effettivamente eseguiti (personalizzate con il vostro hash dell'attività):

Check commands executed
cat work/31/a8ad4d95749e685a6d842d3007957f/.command.sh

Possiamo vedere che Nextflow ha correttamente scelto il comando giusto per le letture single-end:

.command.sh
#!/bin/bash -ue
fastp \
    --in1 SAMPLE_003_S3_L001_R1_001.fastq \
    --out1 sample_003_trimmed.fastq.gz \
    --json sample_003.fastp.json \
    --html sample_003.fastp.html \
    --thread 2

Un altro uso comune della logica di script dinamica si può vedere nel modulo Genomics di Nextflow for Science. In quel modulo, il processo GATK chiamato può accettare più file di input, ma ognuno deve essere preceduto da -V per formare una riga di comando corretta. Il processo usa lo scripting per trasformare una collezione di file di input (all_gvcfs) negli argomenti di comando corretti:

command line manipulation for GATK
1
2
3
4
5
6
7
8
    script:
    def gvcfs_line = all_gvcfs.collect { gvcf -> "-V ${gvcf}" }.join(' ')
    """
    gatk GenomicsDBImport \
        ${gvcfs_line} \
        -L ${interval_list} \
        --genomicsdb-workspace-path ${cohort_name}_gdb
    """

Questi pattern di utilizzo dello scripting nei blocchi script dei processi sono estremamente potenti e possono essere applicati in molti scenari - dalla gestione di tipi di input variabili alla costruzione di argomenti complessi della riga di comando da collezioni di file, rendendo i vostri processi veramente adattabili ai diversi requisiti dei dati del mondo reale.

2.3. Interpolazione di Variabili: Nextflow e Variabili Shell

Gli script dei processi mescolano variabili Nextflow, variabili shell e sostituzioni di comandi, ognuna con una sintassi di interpolazione diversa. Usare la sintassi sbagliata causa errori. Esploriamoli con un processo che crea un report di elaborazione.

Date un'occhiata al file del modulo modules/generate_report.nf:

modules/generate_report.nf
process GENERATE_REPORT {

    publishDir 'results/reports', mode: 'copy'

    input:
    tuple val(meta), path(reads)

    output:
    path "${meta.id}_report.txt"

    script:
    """
    echo "Processing ${reads}" > ${meta.id}_report.txt
    echo "Sample: ${meta.id}" >> ${meta.id}_report.txt
    """
}

Questo processo scrive un semplice report con l'ID campione e il nome del file. Ora eseguiamolo per vedere cosa succede quando dobbiamo mescolare diversi tipi di variabili.

Includete il processo nel vostro main.nf e aggiungetelo al flusso di lavoro:

main.nf
include { FASTP } from './modules/fastp.nf'
include { GENERATE_REPORT } from './modules/generate_report.nf'

workflow {
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def fastq_path = file(row.file_path)

            def m = (fastq_path.name =~ /^(.+)_S(\d+)_L(\d{3})_(R[12])_(\d{3})\.fastq(?:\.gz)?$/)
            def file_meta = m ? [
                sample_num: m[0][2].toInteger(),
                lane: m[0][3],
                read: m[0][4],
                chunk: m[0][5]
            ] : [:]

            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return tuple(sample_meta + file_meta + [priority: priority], fastq_path)
        }

    ch_fastp = FASTP(ch_samples)
    GENERATE_REPORT(ch_samples)
}
main.nf
include { FASTP } from './modules/fastp.nf'

workflow {
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def fastq_path = file(row.file_path)

            def m = (fastq_path.name =~ /^(.+)_S(\d+)_L(\d{3})_(R[12])_(\d{3})\.fastq(?:\.gz)?$/)
            def file_meta = m ? [
                sample_num: m[0][2].toInteger(),
                lane: m[0][3],
                read: m[0][4],
                chunk: m[0][5]
            ] : [:]

            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return tuple(sample_meta + file_meta + [priority: priority], fastq_path)
        }

    ch_fastp = FASTP(ch_samples)
}

Ora eseguite il flusso di lavoro e controllate i report generati in results/reports/. Dovrebbero contenere informazioni di base su ogni campione.

Output del comando
<!-- TODO: output -->

Ma cosa succede se vogliamo aggiungere informazioni su quando e dove è avvenuta l'elaborazione? Modifichiamo il processo per usare variabili shell e un po' di sostituzione di comandi per includere l'utente corrente, il nome host e la data nel report:

modules/generate_report.nf
    script:
    """
    echo "Processing ${reads}" > ${meta.id}_report.txt
    echo "Sample: ${meta.id}" >> ${meta.id}_report.txt
    echo "Processed by: ${USER}" >> ${meta.id}_report.txt
    echo "Hostname: $(hostname)" >> ${meta.id}_report.txt
    echo "Date: $(date)" >> ${meta.id}_report.txt
    """
modules/generate_report.nf
    script:
    """
    echo "Processing ${reads}" > ${meta.id}_report.txt
    echo "Sample: ${meta.id}" >> ${meta.id}_report.txt
    """

Se eseguite questo, noterete un errore - Nextflow cerca di interpretare ${USER} come una variabile Nextflow che non esiste.

Output del comando
Error modules/generate_report.nf:15:27: `USER` is not defined
│  15 |     echo "Processed by: ${USER}" >> ${meta.id}_report.txt
╰     |                           ^^^^

ERROR ~ Script compilation failed

Dobbiamo fare l'escape in modo che Bash possa gestirlo invece.

Risolvete questo facendo l'escape delle variabili shell e delle sostituzioni di comandi con un backslash (\):

modules/generate_report.nf
    script:
    """
    echo "Processing ${reads}" > ${meta.id}_report.txt
    echo "Sample: ${meta.id}" >> ${meta.id}_report.txt
    echo "Processed by: \${USER}" >> ${meta.id}_report.txt
    echo "Hostname: \$(hostname)" >> ${meta.id}_report.txt
    echo "Date: \$(date)" >> ${meta.id}_report.txt
    """
modules/generate_report.nf
    script:
    """
    echo "Processing ${reads}" > ${meta.id}_report.txt
    echo "Sample: ${meta.id}" >> ${meta.id}_report.txt
    echo "Processed by: ${USER}" >> ${meta.id}_report.txt
    echo "Hostname: $(hostname)" >> ${meta.id}_report.txt
    echo "Date: $(date)" >> ${meta.id}_report.txt
    """

Ora funziona! Il backslash (\) dice a Nextflow "non interpretare questo, passalo a Bash."

Takeaway

In questa sezione, avete imparato le tecniche di elaborazione delle stringhe:

  • Espressioni regolari per l'analisi dei file: Usare l'operatore =~ e i pattern regex (~/pattern/) per estrarre metadati da convenzioni di denominazione di file complesse
  • Generazione dinamica di script: Usare logica condizionale (if/else, operatori ternari) per generare diverse stringhe di script in base alle caratteristiche dell'input
  • Interpolazione di variabili: Capire quando Nextflow interpreta le stringhe rispetto a quando lo fa la shell
  • ${var} - Variabili Nextflow (interpolate da Nextflow al momento della compilazione del flusso di lavoro)
  • \${var} - Variabili d'ambiente shell (con escape, passate a bash a runtime)
  • \$(cmd) - Sostituzione di comandi shell (con escape, eseguita da bash a runtime)

Questi pattern di elaborazione e generazione di stringhe sono essenziali per gestire i diversi formati di file e le convenzioni di denominazione che incontrerete nei flussi di lavoro bioinformatici del mondo reale.


3. Creare Funzioni Riutilizzabili

La logica complessa del flusso di lavoro inline negli operatori dei canali o nelle definizioni dei processi riduce la leggibilità e la manutenibilità. Le funzioni vi permettono di estrarre questa logica in componenti con nome e riutilizzabili.

La nostra operazione map è diventata lunga e complessa. Estraiamola in una funzione riutilizzabile usando la parola chiave def.

Per illustrare come appare con il nostro flusso di lavoro esistente, apportate la modifica seguente, usando def per definire una funzione riutilizzabile chiamata separateMetadata:

main.nf
include { FASTP } from './modules/fastp.nf'
include { GENERATE_REPORT } from './modules/generate_report.nf'

def separateMetadata(row) {
    def sample_meta = [
        id: row.sample_id.toLowerCase(),
        organism: row.organism,
        tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
        depth: row.sequencing_depth.toInteger(),
        quality: row.quality_score.toDouble()
    ]
    def fastq_path = file(row.file_path)

    def m = (fastq_path.name =~ /^(.+)_S(\d+)_L(\d{3})_(R[12])_(\d{3})\.fastq(?:\.gz)?$/)
    def file_meta = m ? [
        sample_num: m[0][2].toInteger(),
        lane: m[0][3],
        read: m[0][4],
        chunk: m[0][5]
    ] : [:]

    def priority = sample_meta.quality > 40 ? 'high' : 'normal'
    return tuple(sample_meta + file_meta + [priority: priority], fastq_path)
}

workflow {
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map{ row -> separateMetadata(row) }

    ch_fastp = FASTP(ch_samples)
    GENERATE_REPORT(ch_samples)
}
main.nf
include { FASTP } from './modules/fastp.nf'
include { GENERATE_REPORT } from './modules/generate_report.nf'

workflow {
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row ->
            def sample_meta = [
                id: row.sample_id.toLowerCase(),
                organism: row.organism,
                tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
                depth: row.sequencing_depth.toInteger(),
                quality: row.quality_score.toDouble()
            ]
            def fastq_path = file(row.file_path)

            def m = (fastq_path.name =~ /^(.+)_S(\d+)_L(\d{3})_(R[12])_(\d{3})\.fastq(?:\.gz)?$/)
            def file_meta = m ? [
                sample_num: m[0][2].toInteger(),
                lane: m[0][3],
                read: m[0][4],
                chunk: m[0][5]
            ] : [:]

            def priority = sample_meta.quality > 40 ? 'high' : 'normal'
            return tuple(sample_meta + file_meta + [priority: priority], fastq_path)
        }

    ch_fastp = FASTP(ch_samples)
    GENERATE_REPORT(ch_samples)
}

Estraendo questa logica in una funzione, abbiamo ridotto la logica effettiva del flusso di lavoro a qualcosa di molto più pulito:

minimal workflow
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map{ row -> separateMetadata(row) }

    ch_fastp = FASTP(ch_samples)
    GENERATE_REPORT(ch_samples)

Questo rende la logica del flusso di lavoro molto più facile da leggere e capire a colpo d'occhio. La funzione separateMetadata incapsula tutta la logica complessa per l'analisi e l'arricchimento dei metadati, rendendola riutilizzabile e testabile.

Eseguite il flusso di lavoro per assicurarvi che funzioni ancora:

nextflow run main.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `main.nf` [admiring_panini] DSL2 - revision: 8cc832e32f

executor >  local (6)
[8c/2e3f91] process > FASTP (3)           [100%] 3 of 3 ✔
[7a/1b4c92] process > GENERATE_REPORT (3) [100%] 3 of 3 ✔

L'output dovrebbe mostrare entrambi i processi completati con successo. Il flusso di lavoro è ora molto più pulito e facile da mantenere, con tutta la logica complessa di elaborazione dei metadati incapsulata nella funzione separateMetadata.

Takeaway

In questa sezione, avete imparato la creazione di funzioni:

  • Definire funzioni con def: La parola chiave per creare funzioni con nome (come def in Python o function in JavaScript)
  • Scope delle funzioni: Le funzioni definite a livello di script sono accessibili in tutto il vostro flusso di lavoro Nextflow
  • Valori di ritorno: Le funzioni restituiscono automaticamente l'ultima espressione, oppure usate return esplicito
  • Codice più pulito: Estrarre la logica complessa in funzioni è una pratica fondamentale di ingegneria del software in qualsiasi linguaggio

Successivamente, esploreremo come usare le closure nelle direttive dei processi per l'allocazione dinamica delle risorse.


4. Direttive Dinamiche delle Risorse con le Closure

Finora abbiamo usato lo scripting nel blocco script dei processi. Ma le closure (introdotte nella Sezione 1.1) sono anche incredibilmente utili nelle direttive dei processi, specialmente per l'allocazione dinamica delle risorse. Aggiungiamo direttive di risorse al nostro processo FASTP che si adattano in base alle caratteristiche del campione.

4.1. Allocazione delle risorse specifica per campione

Attualmente, il nostro processo FASTP usa le risorse predefinite. Rendiamolo più intelligente allocando più CPU per i campioni ad alta profondità. Modificate modules/fastp.nf per includere una direttiva cpus dinamica e una direttiva memory statica:

modules/fastp.nf
1
2
3
4
5
6
7
8
process FASTP {
    container 'community.wave.seqera.io/library/fastp:0.24.0--62c97b06e8447690'

    cpus { meta.depth > 40000000 ? 2 : 1 }
    memory 2.GB

    input:
    tuple val(meta), path(reads)
modules/fastp.nf
1
2
3
4
5
process FASTP {
    container 'community.wave.seqera.io/library/fastp:0.24.0--62c97b06e8447690'

    input:
    tuple val(meta), path(reads)

La closure { meta.depth > 40000000 ? 2 : 1 } usa l'operatore ternario (trattato nella Sezione 1.1) e viene valutata per ogni attività, consentendo l'allocazione delle risorse per campione. I campioni ad alta profondità (>40M letture) ottengono 2 CPU, mentre gli altri ottengono 1 CPU.

Nota: Accesso alle Variabili di Input nelle Direttive

La closure può accedere a qualsiasi variabile di input (come meta qui) perché Nextflow valuta queste closure nel contesto dell'esecuzione di ogni attività.

Eseguite di nuovo il flusso di lavoro con l'opzione -ansi-log false per rendere più facile vedere gli hash delle attività.

nextflow run main.nf -ansi-log false
Output del comando
N E X T F L O W  ~  version 25.10.2
Launching `main.nf` [fervent_albattani] DSL2 - revision: fa8f249759
[bd/ff3d41] Submitted process > FASTP (2)
[a4/a3aab2] Submitted process > FASTP (1)
[48/6db0c9] Submitted process > FASTP (3)
[ec/83439d] Submitted process > GENERATE_REPORT (3)
[bd/15d7cc] Submitted process > GENERATE_REPORT (2)
[42/699357] Submitted process > GENERATE_REPORT (1)

Potete controllare il comando docker esatto che è stato eseguito per vedere l'allocazione delle CPU per qualsiasi attività:

Check docker command
cat work/48/6db0c9e9d8aa65e4bb4936cd3bd59e/.command.run | grep "docker run"

Dovreste vedere qualcosa come:

docker command
    docker run -i --cpu-shares 4096 --memory 2048m -e "NXF_TASK_WORKDIR" -v /workspaces/training/side-quests/essential_scripting_patterns:/workspaces/training/side-quests/essential_scripting_patterns -w "$NXF_TASK_WORKDIR" --name $NXF_BOXID community.wave.seqera.io/library/fastp:0.24.0--62c97b06e8447690 /bin/bash -ue /workspaces/training/side-quests/essential_scripting_patterns/work/48/6db0c9e9d8aa65e4bb4936cd3bd59e/.command.sh

In questo esempio abbiamo scelto un esempio che ha richiesto 2 CPU (--cpu-shares 2048), perché era un campione ad alta profondità, ma dovreste vedere diverse allocazioni di CPU a seconda della profondità del campione. Provate anche per le altre attività.

4.2. Strategie di retry

Un altro pattern potente è l'uso di task.attempt per le strategie di retry. Per mostrare perché questo è utile, inizieremo riducendo l'allocazione di memoria a FASTP a meno di quanto ne ha bisogno. Cambiate la direttiva memory in modules/fastp.nf a 1.GB:

modules/fastp.nf
1
2
3
4
5
6
7
8
process FASTP {
    container 'community.wave.seqera.io/library/fastp:0.24.0--62c97b06e8447690'

    cpus { meta.depth > 40000000 ? 4 : 2 }
    memory 1.GB

    input:
    tuple val(meta), path(reads)
modules/fastp.nf
1
2
3
4
5
6
7
8
process FASTP {
    container 'community.wave.seqera.io/library/fastp:0.24.0--62c97b06e8447690'

    cpus { meta.depth > 40000000 ? 4 : 2 }
    memory 2.GB

    input:
    tuple val(meta), path(reads)

... ed eseguite di nuovo il flusso di lavoro:

nextflow run main.nf
Output del comando
Command exit status:
  137

Command output:
  (empty)

Command error:
  Detecting adapter sequence for read1...
  No adapter detected for read1

  .command.sh: line 7:   101 Killed                  fastp --in1 SAMPLE_002_S2_L001_R1_001.fastq --out1 sample_002_trimmed.fastq.gz --json sample_002.fastp.json --html sample_002.fastp.html --thread 2

Questo indica che il processo è stato terminato per aver superato i limiti di memoria.

Questo è uno scenario molto comune nei flussi di lavoro del mondo reale - a volte non si sa quanta memoria un'attività avrà bisogno finché non la si esegue.

Per rendere il nostro flusso di lavoro più robusto, possiamo implementare una strategia di retry che aumenta l'allocazione di memoria ad ogni tentativo, usando ancora una volta una closure Groovy. Modificate la direttiva memory per moltiplicare la memoria base per task.attempt, e aggiungete le direttive errorStrategy 'retry' e maxRetries 2:

modules/fastp.nf
process FASTP {
    container 'community.wave.seqera.io/library/fastp:0.24.0--62c97b06e8447690'

    cpus { meta.depth > 40000000 ? 4 : 2 }
    memory { 1.GB * task.attempt }
    errorStrategy 'retry'
    maxRetries 2

    input:
    tuple val(meta), path(reads)
modules/fastp.nf
1
2
3
4
5
6
7
8
process FASTP {
    container 'community.wave.seqera.io/library/fastp:0.24.0--62c97b06e8447690'

    cpus { meta.depth > 40000000 ? 4 : 2 }
    memory 2.GB

    input:
    tuple val(meta), path(reads)

Ora se il processo fallisce per memoria insufficiente, Nextflow riproverà con più memoria:

  • Primo tentativo: 1 GB (task.attempt = 1)
  • Secondo tentativo: 2 GB (task.attempt = 2)

... e così via, fino al limite maxRetries.

Takeaway

Le direttive dinamiche con le closure vi permettono di:

  • Allocare risorse in base alle caratteristiche dell'input
  • Implementare strategie di retry automatiche con risorse crescenti
  • Combinare più fattori (metadati, numero di tentativo, priorità)
  • Usare logica condizionale per calcoli complessi delle risorse

Questo rende i vostri flussi di lavoro sia più efficienti (senza sovra-allocare) che più robusti (retry automatico con più risorse).


5. Logica Condizionale e Controllo dei Processi

In precedenza, abbiamo usato .map() con lo scripting per trasformare i dati del canale. Ora useremo la logica condizionale per controllare quali processi vengono eseguiti in base ai dati—essenziale per flussi di lavoro flessibili che si adattano a diversi tipi di campioni.

Gli operatori di dataflow di Nextflow accettano closure valutate a runtime, consentendo alla logica condizionale di guidare le decisioni del flusso di lavoro in base al contenuto del canale.

5.1. Routing con .branch()

Ad esempio, fingiamo che i nostri campioni di sequenziamento debbano essere tagliati con FASTP solo se sono campioni umani con una copertura superiore a una certa soglia. I campioni di topo o a bassa copertura dovrebbero essere eseguiti con Trimgalore invece (questo è un esempio artificioso, ma illustra il punto).

Abbiamo fornito un semplice processo Trimgalore in modules/trimgalore.nf, date un'occhiata se volete, ma i dettagli non sono importanti per questo esercizio. Il punto chiave è che vogliamo instradare i campioni in base ai loro metadati.

Includete il nuovo modulo da modules/trimgalore.nf:

main.nf
include { FASTP } from './modules/fastp.nf'
include { TRIMGALORE } from './modules/trimgalore.nf'
main.nf
include { FASTP } from './modules/fastp.nf'

... e poi modificate il vostro flusso di lavoro main.nf per ramificare i campioni in base ai loro metadati e instradarli attraverso il processo di trimming appropriato, in questo modo:

main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row -> separateMetadata(row) }

    trim_branches = ch_samples
        .branch { meta, reads ->
            fastp: meta.organism == 'human' && meta.depth >= 30000000
            trimgalore: true
        }

    ch_fastp = FASTP(trim_branches.fastp)
    ch_trimgalore = TRIMGALORE(trim_branches.trimgalore)
    GENERATE_REPORT(ch_samples)
main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row -> separateMetadata(row) }

    ch_fastp = FASTP(ch_samples)
    GENERATE_REPORT(ch_samples)

Eseguite questo flusso di lavoro modificato:

nextflow run main.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `main.nf` [adoring_galileo] DSL2 - revision: c9e83aaef1

executor >  local (6)
[1d/0747ac] process > FASTP (2)           [100%] 2 of 2 ✔
[cc/c44caf] process > TRIMGALORE (1)      [100%] 1 of 1 ✔
[34/bd5a9f] process > GENERATE_REPORT (1) [100%] 3 of 3 ✔

Qui abbiamo usato piccole ma potenti espressioni condizionali all'interno dell'operatore .branch{} per instradare i campioni in base ai loro metadati. I campioni umani con alta copertura passano attraverso FASTP, mentre tutti gli altri campioni passano attraverso TRIMGALORE.

5.2. Usare .filter() con la Truthiness

Un altro pattern potente per controllare l'esecuzione del flusso di lavoro è l'operatore .filter(), che usa una closure per determinare quali elementi dovrebbero continuare nella pipeline. All'interno della closure del filter, scriverete espressioni booleane che decidono quali elementi passano.

Nextflow (come molti linguaggi dinamici) ha un concetto di "truthiness" che determina quali valori vengono valutati come true o false in contesti booleani:

  • Truthy: Valori non-null, stringhe non vuote, numeri non-zero, collezioni non vuote
  • Falsy: null, stringhe vuote "", zero 0, collezioni vuote [] o [:], false

Questo significa che meta.id da solo (senza != null esplicito) controlla se l'ID esiste e non è vuoto. Usiamo questo per filtrare i campioni che non soddisfano i nostri requisiti di qualità.

Aggiungete quanto segue prima dell'operazione di branch:

main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row -> separateMetadata(row) }

    // Filtrare i campioni non validi o di bassa qualità
    ch_valid_samples = ch_samples
        .filter { meta, reads ->
            meta.id && meta.organism && meta.depth >= 25000000
        }

    trim_branches = ch_valid_samples
        .branch { meta, reads ->
            fastp: meta.organism == 'human' && meta.depth >= 30000000
            trimgalore: true
        }
main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map { row -> separateMetadata(row) }

    trim_branches = ch_samples
        .branch { meta, reads ->
            fastp: meta.organism == 'human' && meta.depth >= 30000000
            trimgalore: true
        }

Eseguite di nuovo il flusso di lavoro:

nextflow run main.nf
Output del comando
N E X T F L O W  ~  version 25.10.2
Launching `main.nf` [lonely_williams] DSL2 - revision: d0b3f121ec
[94/b48eac] Submitted process > FASTP (2)
[2c/d2b28f] Submitted process > GENERATE_REPORT (2)
[65/2e3be4] Submitted process > GENERATE_REPORT (1)
[94/b48eac] NOTE: Process `FASTP (2)` terminated with an error exit status (137) -- Execution is retried (1)
[3e/0d8664] Submitted process > TRIMGALORE (1)
[6a/9137b0] Submitted process > FASTP (1)
[6a/9137b0] NOTE: Process `FASTP (1)` terminated with an error exit status (137) -- Execution is retried (1)
[83/577ac0] Submitted process > GENERATE_REPORT (3)
[a2/5117de] Re-submitted process > FASTP (1)
[1f/a1a4ca] Re-submitted process > FASTP (2)

Poiché abbiamo scelto un filtro che esclude alcuni campioni, sono state eseguite meno attività.

L'espressione del filtro meta.id && meta.organism && meta.depth >= 25000000 combina la truthiness con confronti espliciti:

  • meta.id && meta.organism controlla che entrambi i campi esistano e non siano vuoti (usando la truthiness)
  • meta.depth >= 25000000 garantisce una profondità di sequenziamento sufficiente con un confronto esplicito

Nota: La Truthiness in Pratica

L'espressione meta.id && meta.organism è più concisa che scrivere:

meta.id != null && meta.id != '' && meta.organism != null && meta.organism != ''

Questo rende la logica di filtraggio molto più pulita e facile da leggere.

Takeaway

In questa sezione, avete imparato a usare la logica condizionale per controllare l'esecuzione del flusso di lavoro usando le interfacce closure degli operatori Nextflow come .branch{} e .filter{}, sfruttando la truthiness per scrivere espressioni condizionali concise.

La nostra pipeline ora instrada intelligentemente i campioni attraverso i processi appropriati, ma i flussi di lavoro in produzione devono gestire i dati non validi con grazia. Rendiamo il nostro flusso di lavoro robusto contro valori mancanti o null.


6. Operatori di Navigazione Sicura ed Elvis

La nostra funzione separateMetadata attualmente assume che tutti i campi CSV siano presenti e validi. Ma cosa succede con dati incompleti? Scopriamolo.

6.1. Il Problema: Accedere a Proprietà che Non Esistono

Diciamo che vogliamo aggiungere supporto per informazioni opzionali sul run di sequenziamento. In alcuni laboratori, i campioni potrebbero avere un campo aggiuntivo per l'ID del run di sequenziamento o il numero di batch, ma il nostro CSV attuale non ha questa colonna. Proviamo ad accedervi comunque.

Modificate la funzione separateMetadata per includere un campo run_id:

main.nf
def separateMetadata(row) {
    def sample_meta = [
        id: row.sample_id.toLowerCase(),
        organism: row.organism,
        tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
        depth: row.sequencing_depth.toInteger(),
        quality: row.quality_score.toDouble()
    ]
    def run_id = row.run_id.toUpperCase()
main.nf
def separateMetadata(row) {
    def sample_meta = [
        id: row.sample_id.toLowerCase(),
        organism: row.organism,
        tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
        depth: row.sequencing_depth.toInteger(),
        quality: row.quality_score.toDouble()
    ]

Ora eseguite il flusso di lavoro:

nextflow run main.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `main.nf` [trusting_torvalds] DSL2 - revision: b56fbfbce2

ERROR ~ Cannot invoke method toUpperCase() on null object

-- Check script 'main.nf' at line: 13 or see '.nextflow.log' file for more details

Questo si blocca con una NullPointerException.

Il problema è che row.run_id restituisce null perché la colonna run_id non esiste nel nostro CSV. Quando proviamo a chiamare .toUpperCase() su null, si blocca. È qui che l'operatore di navigazione sicura ci salva.

6.2. Operatore di Navigazione Sicura (?.)

L'operatore di navigazione sicura (?.) restituisce null invece di lanciare un'eccezione quando viene chiamato su un valore null. Se l'oggetto prima di ?. è null, l'intera espressione viene valutata come null senza eseguire il metodo.

Aggiornate la funzione per usare la navigazione sicura:

main.nf
def separateMetadata(row) {
    def sample_meta = [
        id: row.sample_id.toLowerCase(),
        organism: row.organism,
        tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
        depth: row.sequencing_depth.toInteger(),
        quality: row.quality_score.toDouble()
    ]
    def run_id = row.run_id?.toUpperCase()
main.nf
def separateMetadata(row) {
    def sample_meta = [
        id: row.sample_id.toLowerCase(),
        organism: row.organism,
        tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
        depth: row.sequencing_depth.toInteger(),
        quality: row.quality_score.toDouble()
    ]
    def run_id = row.run_id.toUpperCase()

Eseguite di nuovo:

nextflow run main.nf
Output del comando
<!-- TODO: output -->

Nessun crash! Il flusso di lavoro ora gestisce il campo mancante con grazia. Quando row.run_id è null, l'operatore ?. impedisce la chiamata a .toUpperCase(), e run_id diventa null invece di causare un'eccezione.

6.3. Operatore Elvis (?:) per i Valori Predefiniti

L'operatore Elvis (?:) fornisce valori predefiniti quando il lato sinistro è "falsy" (come spiegato in precedenza). Prende il nome da Elvis Presley perché ?: assomiglia ai suoi famosi capelli e occhi visti di lato!

Ora che usiamo la navigazione sicura, run_id sarà null per i campioni senza quel campo. Usiamo l'operatore Elvis per fornire un valore predefinito e aggiungerlo alla nostra map sample_meta:

main.nf
def separateMetadata(row) {
    def sample_meta = [
        id: row.sample_id.toLowerCase(),
        organism: row.organism,
        tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
        depth: row.sequencing_depth.toInteger(),
        quality: row.quality_score.toDouble()
    ]
    def run_id = row.run_id?.toUpperCase() ?: 'UNSPECIFIED'
    sample_meta.run = run_id
main.nf
def separateMetadata(row) {
    def sample_meta = [
        id: row.sample_id.toLowerCase(),
        organism: row.organism,
        tissue: row.tissue_type.replaceAll('_', ' ').toLowerCase(),
        depth: row.sequencing_depth.toInteger(),
        quality: row.quality_score.toDouble()
    ]
    def run_id = row.run_id?.toUpperCase()

Aggiungete anche un operatore view() nel flusso di lavoro per vedere i risultati:

main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map{ row -> separateMetadata(row) }
        .view()
main.nf
    ch_samples = channel.fromPath("./data/samples.csv")
        .splitCsv(header: true)
        .map{ row -> separateMetadata(row) }

ed eseguite il flusso di lavoro:

nextflow run main.nf
Output del comando
[[id:sample_001, organism:human, tissue:liver, depth:30000000, quality:38.5, run:UNSPECIFIED, sample_num:1, lane:001, read:R1, chunk:001, priority:normal], /workspaces/training/side-quests/essential_scripting_patterns/data/sequences/SAMPLE_001_S1_L001_R1_001.fastq]
[[id:sample_002, organism:mouse, tissue:brain, depth:25000000, quality:35.2, run:UNSPECIFIED, sample_num:2, lane:001, read:R1, chunk:001, priority:normal], /workspaces/training/side-quests/essential_scripting_patterns/data/sequences/SAMPLE_002_S2_L001_R1_001.fastq]
[[id:sample_003, organism:human, tissue:kidney, depth:45000000, quality:42.1, run:UNSPECIFIED, sample_num:3, lane:001, read:R1, chunk:001, priority:high], /workspaces/training/side-quests/essential_scripting_patterns/data/sequences/SAMPLE_003_S3_L001_R1_001.fastq]

Perfetto! Ora tutti i campioni hanno un campo run con il loro ID run effettivo (in maiuscolo) o il valore predefinito 'UNSPECIFIED'. La combinazione di ?. e ?: fornisce sia sicurezza (nessun crash) che valori predefiniti sensati.

Rimuovete ora l'operatore .view() ora che abbiamo confermato che funziona.

Suggerimento: Combinare Navigazione Sicura ed Elvis

Il pattern value?.method() ?: 'default' è comune nei flussi di lavoro in produzione:

  • value?.method() - Chiama il metodo in modo sicuro, restituisce null se value è null
  • ?: 'default' - Fornisce un fallback se il risultato è null

Questo pattern gestisce i dati mancanti/incompleti con grazia.

Usate questi operatori in modo coerente nelle funzioni, nelle closure degli operatori (.map{}, .filter{}), negli script dei processi e nei file di configurazione. Prevengono i crash quando si gestiscono dati del mondo reale.

Takeaway

  • Navigazione sicura (?.): Previene i crash sui valori null - restituisce null invece di lanciare un'eccezione
  • Operatore Elvis (?:): Fornisce valori predefiniti - value ?: 'default'
  • Combinazione: value?.method() ?: 'default' è il pattern comune

Questi operatori rendono i flussi di lavoro resilienti ai dati incompleti - essenziale per il lavoro nel mondo reale.


7. Validazione con error() e log.warn

A volte è necessario fermare immediatamente il flusso di lavoro se i parametri di input non sono validi. In Nextflow, potete usare funzioni integrate come error() e log.warn, così come costrutti di programmazione standard come le istruzioni if e la logica booleana, per implementare la logica di validazione. Aggiungiamo la validazione al nostro flusso di lavoro.

Create una funzione di validazione prima del blocco del flusso di lavoro, chiamatela dal flusso di lavoro e cambiate la creazione del canale per usare un parametro per il percorso del file CSV. Se il parametro manca o il file non esiste, chiamate error() per fermare l'esecuzione con un messaggio chiaro.

main.nf
include { FASTP } from './modules/fastp.nf'
include { TRIMGALORE } from './modules/trimgalore.nf'
include { GENERATE_REPORT } from './modules/generate_report.nf'

def validateInputs() {
    // Controlla che il parametro di input sia fornito
    if (!params.input) {
        error("Input CSV file path not provided. Please specify --input <file.csv>")
    }

    // Controlla che il file CSV esista
    if (!file(params.input).exists()) {
        error("Input CSV file not found: ${params.input}")
    }
}
...
workflow {
    validateInputs()
    ch_samples = channel.fromPath(params.input)
main.nf
1
2
3
4
5
6
7
include { FASTP } from './modules/fastp.nf'
include { TRIMGALORE } from './modules/trimgalore.nf'
include { GENERATE_REPORT } from './modules/generate_report.nf'

...
workflow {
    ch_samples = channel.fromPath("./data/samples.csv")

Ora provate a eseguire senza il file CSV:

nextflow run main.nf
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `main.nf` [confident_coulomb] DSL2 - revision: 07059399ed

WARN: Access to undefined parameter `input` -- Initialise it to a default value eg. `params.input = some_value`
Input CSV file path not provided. Please specify --input <file.csv>

Il flusso di lavoro si ferma immediatamente con un messaggio di errore chiaro invece di fallire misteriosamente in seguito.

Ora eseguitelo con un file inesistente:

nextflow run main.nf --input ./data/nonexistent.csv
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `main.nf` [cranky_gates] DSL2 - revision: 26839ae3eb

Input CSV file not found: ./data/nonexistent.csv

Infine, eseguitelo con il file corretto:

nextflow run main.nf --input ./data/samples.csv
Output del comando
<!-- TODO: output -->

Questa volta viene eseguito con successo.

Potete anche aggiungere la validazione all'interno della funzione separateMetadata. Usiamo il non-fatale log.warn per emettere avvisi per i campioni con bassa profondità di sequenziamento, ma permettiamo comunque al flusso di lavoro di continuare:

main.nf
1
2
3
4
5
6
7
8
9
    def priority = sample_meta.quality > 40 ? 'high' : 'normal'

    // Validare che i dati abbiano senso
    if (sample_meta.depth < 30000000) {
        log.warn "Low sequencing depth for ${sample_meta.id}: ${sample_meta.depth}"
    }

    return tuple(sample_meta + file_meta + [priority: priority], fastq_path)
}
main.nf
1
2
3
4
    def priority = sample_meta.quality > 40 ? 'high' : 'normal'

    return tuple(sample_meta + file_meta + [priority: priority], fastq_path)
}

Eseguite di nuovo il flusso di lavoro con il CSV originale:

nextflow run main.nf --input ./data/samples.csv
Output del comando
N E X T F L O W   ~  version 25.10.2

Launching `main.nf` [awesome_goldwasser] DSL2 - revision: a31662a7c1

executor >  local (5)
[ce/df5eeb] process > FASTP (2)           [100%] 2 of 2 ✔
[-        ] process > TRIMGALORE          -
[d1/7d2b4b] process > GENERATE_REPORT (3) [100%] 3 of 3 ✔
WARN: Low sequencing depth for sample_002: 25000000

Vediamo un avviso sulla bassa profondità di sequenziamento per uno dei campioni.

Takeaway

  • error(): Ferma immediatamente il flusso di lavoro con un messaggio chiaro
  • log.warn: Emette avvisi senza fermare il flusso di lavoro
  • Validazione anticipata: Controllare gli input prima dell'elaborazione per fallire velocemente con errori utili
  • Funzioni di validazione: Creare logica di validazione riutilizzabile che può essere chiamata all'avvio del flusso di lavoro

Una validazione appropriata rende i flussi di lavoro più robusti e facili da usare, catturando i problemi in anticipo con messaggi di errore chiari.


8. Handler degli Eventi del Flusso di Lavoro

Finora, abbiamo scritto codice negli script del flusso di lavoro e nelle definizioni dei processi. Ma c'è un'altra caratteristica importante che dovreste conoscere: gli handler degli eventi del flusso di lavoro.

Gli handler degli eventi sono closure che vengono eseguite in punti specifici del ciclo di vita del vostro flusso di lavoro. Sono perfetti per aggiungere logging, notifiche o operazioni di pulizia. Questi handler devono essere definiti nel vostro script del flusso di lavoro insieme alla definizione del flusso di lavoro.

8.1. L'Handler onComplete

L'handler degli eventi più comunemente usato è onComplete, che viene eseguito quando il vostro flusso di lavoro termina (che abbia avuto successo o meno). Aggiungiamone uno per riassumere i risultati della nostra pipeline.

Aggiungete l'handler degli eventi al vostro file main.nf, all'interno della definizione del flusso di lavoro:

main.nf
    ch_fastp = FASTP(trim_branches.fastp)
    ch_trimgalore = TRIMGALORE(trim_branches.trimgalore)
    GENERATE_REPORT(ch_samples)

    workflow.onComplete = {
        println ""
        println "Pipeline execution summary:"
        println "=========================="
        println "Completed at: ${workflow.complete}"
        println "Duration    : ${workflow.duration}"
        println "Success     : ${workflow.success}"
        println "workDir     : ${workflow.workDir}"
        println "exit status : ${workflow.exitStatus}"
        println ""
    }
}
main.nf
    ch_fastp = FASTP(trim_branches.fastp)
    ch_trimgalore = TRIMGALORE(trim_branches.trimgalore)
    GENERATE_REPORT(ch_samples)
}

Questa closure viene eseguita quando il flusso di lavoro si completa. All'interno, avete accesso all'oggetto workflow che fornisce proprietà utili sull'esecuzione.

Eseguite il vostro flusso di lavoro e vedrete questo riepilogo apparire alla fine!

nextflow run main.nf --input ./data/samples.csv -ansi-log false
Output del comando
N E X T F L O W  ~  version 25.10.2
Launching `main.nf` [marvelous_boltzmann] DSL2 - revision: a31662a7c1
WARN: Low sequencing depth for sample_002: 25000000
[9b/d48e40] Submitted process > FASTP (2)
[6a/73867a] Submitted process > GENERATE_REPORT (2)
[79/ad0ac5] Submitted process > GENERATE_REPORT (1)
[f3/bda6cb] Submitted process > FASTP (1)
[34/d5b52f] Submitted process > GENERATE_REPORT (3)

Pipeline execution summary:
==========================
Completed at: 2025-10-10T12:14:24.885384+01:00
Duration    : 2.9s
Success     : true
workDir     : /workspaces/training/side-quests/essential_scripting_patterns/work
exit status : 0

Rendiamolo più utile aggiungendo logica condizionale:

main.nf
    ch_fastp = FASTP(trim_branches.fastp)
    ch_trimgalore = TRIMGALORE(trim_branches.trimgalore)
    GENERATE_REPORT(ch_samples)

    workflow.onComplete = {
        println ""
        println "Pipeline execution summary:"
        println "=========================="
        println "Completed at: ${workflow.complete}"
        println "Duration    : ${workflow.duration}"
        println "Success     : ${workflow.success}"
        println "workDir     : ${workflow.workDir}"
        println "exit status : ${workflow.exitStatus}"
        println ""

        if (workflow.success) {
            println "✅ Pipeline completed successfully!"
        } else {
            println "❌ Pipeline failed!"
            println "Error: ${workflow.errorMessage}"
        }
    }
}
main.nf
    ch_fastp = FASTP(trim_branches.fastp)
    ch_trimgalore = TRIMGALORE(trim_branches.trimgalore)
    GENERATE_REPORT(ch_samples)

    workflow.onComplete = {
        println ""
        println "Pipeline execution summary:"
        println "=========================="
        println "Completed at: ${workflow.complete}"
        println "Duration    : ${workflow.duration}"
        println "Success     : ${workflow.success}"
        println "workDir     : ${workflow.workDir}"
        println "exit status : ${workflow.exitStatus}"
        println ""
    }
}

Ora otteniamo un riepilogo ancora più informativo, incluso un messaggio di successo/fallimento e la directory di output se specificata:

Output del comando
N E X T F L O W  ~  version 25.10.2
Launching `main.nf` [boring_linnaeus] DSL2 - revision: a31662a7c1
WARN: Low sequencing depth for sample_002: 25000000
[e5/242efc] Submitted process > FASTP (2)
[3b/74047c] Submitted process > GENERATE_REPORT (3)
[8a/7a57e6] Submitted process > GENERATE_REPORT (1)
[a8/b1a31f] Submitted process > GENERATE_REPORT (2)
[40/648429] Submitted process > FASTP (1)

Pipeline execution summary:
==========================
Completed at: 2025-10-10T12:16:00.522569+01:00
Duration    : 3.6s
Success     : true
workDir     : /workspaces/training/side-quests/essential_scripting_patterns/work
exit status : 0

✅ Pipeline completed successfully!

Potete anche scrivere il riepilogo su un file usando le operazioni sui file:

main.nf - Writing summary to file
workflow {
    // ... il codice del vostro flusso di lavoro ...

    workflow.onComplete = {
        def summary = """
        Pipeline Execution Summary
        ===========================
        Completed: ${workflow.complete}
        Duration : ${workflow.duration}
        Success  : ${workflow.success}
        Command  : ${workflow.commandLine}
        """

        println summary

        // Scrivere su un file di log
        def log_file = file("${workflow.launchDir}/pipeline_summary.txt")
        log_file.text = summary
    }
}

8.2. L'Handler onError

Oltre a onComplete, c'è un altro handler degli eventi che potete usare: onError, che viene eseguito solo se il flusso di lavoro fallisce:

main.nf - onError handler
workflow {
    // ... il codice del vostro flusso di lavoro ...

    workflow.onError = {
        println "="* 50
        println "Pipeline execution failed!"
        println "Error message: ${workflow.errorMessage}"
        println "="* 50

        // Scrivere un log di errore dettagliato
        def error_file = file("${workflow.launchDir}/error.log")
        error_file.text = """
        Workflow Error Report
        =====================
        Time: ${new Date()}
        Error: ${workflow.errorMessage}
        Error report: ${workflow.errorReport ?: 'No detailed report available'}
        """

        println "Error details written to: ${error_file}"
    }
}

Potete usare più handler insieme nel vostro script del flusso di lavoro:

main.nf - Combined handlers
workflow {
    // ... il codice del vostro flusso di lavoro ...

    workflow.onError = {
        println "Workflow failed: ${workflow.errorMessage}"
    }

    workflow.onComplete = {
        def duration_mins = workflow.duration.toMinutes().round(2)
        def status = workflow.success ? "SUCCESS ✅" : "FAILED ❌"

        println """
        Pipeline finished: ${status}
        Duration: ${duration_mins} minutes
        """
    }
}

Takeaway

In questa sezione, avete imparato:

  • Closure degli handler degli eventi: Closure nel vostro script del flusso di lavoro che vengono eseguite in diversi punti del ciclo di vita
  • Handler onComplete: Per riepiloghi dell'esecuzione e report dei risultati
  • Handler onError: Per la gestione degli errori e il logging dei fallimenti
  • Proprietà dell'oggetto workflow: Accedere a workflow.success, workflow.duration, workflow.errorMessage, ecc.

Gli handler degli eventi mostrano come potete usare tutta la potenza del linguaggio Nextflow all'interno dei vostri script del flusso di lavoro per aggiungere capacità sofisticate di logging e notifica.


Congratulazioni, ce l'avete fatta!

Nel corso di questa side quest, avete costruito una pipeline completa di elaborazione dei campioni che si è evoluta dalla gestione di base dei metadati a un flusso di lavoro sofisticato e pronto per la produzione. Ogni sezione ha costruito sulla precedente, dimostrando come i costrutti di programmazione trasformino semplici flussi di lavoro in potenti sistemi di elaborazione dei dati, con i seguenti vantaggi:

  • Codice più chiaro: Comprendere il dataflow vs lo scripting vi aiuta a scrivere flussi di lavoro più organizzati
  • Gestione robusta: Gli operatori di navigazione sicura ed Elvis rendono i flussi di lavoro resilienti ai dati mancanti
  • Elaborazione flessibile: La logica condizionale permette ai vostri flussi di lavoro di elaborare diversi tipi di campioni in modo appropriato
  • Risorse adattive: Le direttive dinamiche ottimizzano l'uso delle risorse in base alle caratteristiche dell'input

Questa progressione rispecchia l'evoluzione nel mondo reale delle pipeline bioinformatiche, dai prototipi di ricerca che gestiscono pochi campioni ai sistemi di produzione che elaborano migliaia di campioni in laboratori e istituzioni. Ogni sfida che avete risolto e ogni pattern che avete imparato riflette problemi reali che gli sviluppatori affrontano quando scalano i flussi di lavoro Nextflow.

Applicare questi pattern nel vostro lavoro vi permetterà di costruire flussi di lavoro robusti e pronti per la produzione.

Pattern chiave

  1. Dataflow vs Scripting: Avete imparato a distinguere tra operazioni di dataflow (orchestrazione dei canali) e scripting (codice che manipola i dati), incluse le differenze cruciali tra operazioni su tipi diversi come collect su Channel vs List.

    • Dataflow: orchestrazione dei canali
    channel.fromPath('*.fastq').splitCsv(header: true)
    
    • Scripting: elaborazione dei dati sulle collezioni
    sample_data.collect { it.toUpperCase() }
    
  2. Elaborazione Avanzata delle Stringhe: Avete padroneggiato le espressioni regolari per l'analisi dei nomi dei file, la generazione dinamica di script nei processi e l'interpolazione di variabili (Nextflow vs Bash vs Shell).

    • Pattern matching
    filename =~ ~/^(\w+)_(\w+)_(\d+)\.fastq$/
    
    • Funzione con ritorno condizionale
    def parseSample(filename) {
        def matcher = filename =~ pattern
        return matcher ? [valid: true, data: matcher[0]] : [valid: false]
    }
    
    • Collezione di file in argomenti della riga di comando (nel blocco script del processo)
    script:
    def file_args = input_files.collect { file -> "--input ${file}" }.join(' ')
    """
    analysis_tool ${file_args} --output results.txt
    """
    
  3. Creare Funzioni Riutilizzabili: Avete imparato a estrarre logica complessa in funzioni con nome che possono essere chiamate dagli operatori dei canali, rendendo i flussi di lavoro più leggibili e manutenibili.

    • Definire una funzione con nome
    def separateMetadata(row) {
        def sample_meta = [ /* code hidden for brevity */ ]
        def fastq_path = file(row.file_path)
        def m = (fastq_path.name =~ /^(.+)_S(\d+)_L(\d{3})_(R[12])_(\d{3})\.fastq(?:\.gz)?$/)
        def file_meta = m ? [ /* code hidden for brevity */ ] : [:]
        def priority = sample_meta.quality > 40 ? 'high' : 'normal'
    
        return tuple(sample_meta + file_meta + [priority: priority], fastq_path)
    }
    
    • Chiamare la funzione con nome in un flusso di lavoro
    workflow {
        ch_samples = channel.fromPath("./data/samples.csv")
            .splitCsv(header: true)
            .map{ row -> separateMetadata(row) }
    
        ch_fastp = FASTP(ch_samples)
    }
    
  4. Direttive Dinamiche delle Risorse con le Closure: Avete esplorato l'uso delle closure nelle direttive dei processi per l'allocazione adattiva delle risorse in base alle caratteristiche dell'input.

    • Closure con nome e composizione
    def enrichData = normalizeId >> addQualityCategory >> addFlags
    def processor = generalFunction.curry(fixedParam)
    
    • Closure con accesso allo scope
    def collectStats = { data -> stats.count++; return data }
    
  5. Logica Condizionale e Controllo dei Processi: Avete aggiunto routing intelligente usando gli operatori .branch() e .filter(), sfruttando la truthiness per espressioni condizionali concise.

    • Usare .branch() per instradare i dati attraverso diversi rami del flusso di lavoro
    trim_branches = ch_samples
    .branch { meta, reads ->
        fastp: meta.organism == 'human' && meta.depth >= 30000000
        trimgalore: true
    }
    
    ch_fastp = FASTP(trim_branches.fastp)
    ch_trimgalore = TRIMGALORE(trim_branches.trimgalore)
    
    • Valutazione booleana con la Groovy Truth
    if (sample.files) println "Has files"
    
    • Usare filter() per selezionare sottoinsiemi di dati con la 'truthiness'
    ch_valid_samples = ch_samples
        .filter { meta, reads ->
            meta.id && meta.organism && meta.depth >= 25000000
        }
    
  6. Operatori di Navigazione Sicura ed Elvis: Avete reso la pipeline robusta contro i dati mancanti usando ?. per l'accesso null-safe alle proprietà e ?: per fornire valori predefiniti.

    def id = data?.sample?.id ?: 'unknown'
    
  7. Validazione con error() e log.warn: Avete imparato a validare gli input in anticipo e a fallire velocemente con messaggi di errore chiari.

    try {
        def errors = validateSample(sample)
        if (errors) throw new RuntimeException("Invalid: ${errors.join(', ')}")
    } catch (Exception e) {
        println "Error: ${e.message}"
    }
    
  8. Handler degli Eventi di Configurazione: Avete imparato a usare gli handler degli eventi del flusso di lavoro (onComplete e onError) per logging, notifiche e gestione del ciclo di vita.

    • Usare onComplete per il logging e le notifiche
    workflow.onComplete = {
        println "Success     : ${workflow.success}"
        println "exit status : ${workflow.exitStatus}"
    
        if (workflow.success) {
            println "✅ Pipeline completed successfully!"
        } else {
            println "❌ Pipeline failed!"
            println "Error: ${workflow.errorMessage}"
        }
    }
    
    • Usare onError per agire specificamente in caso di fallimento
    workflow.onError = {
        // Scrivere un log di errore dettagliato
        def error_file = file("${workflow.launchDir}/error.log")
        error_file.text = """
        Time: ${new Date()}
        Error: ${workflow.errorMessage}
        Error report: ${workflow.errorReport ?: 'No detailed report available'}
        """
    
        println "Error details written to: ${error_file}"
    }
    

Risorse aggiuntive

Assicuratevi di consultare queste risorse quando avete bisogno di esplorare funzionalità più avanzate.

Trarrete beneficio dalla pratica e dall'espansione delle vostre competenze per:

  • Scrivere flussi di lavoro più puliti con una correttaseparazione tra dataflow e scripting
  • Padroneggiare l'interpolazione di variabili per evitare le insidie comuni con le variabili Nextflow, Bash e shell
  • Usare le direttive dinamiche delle risorse per flussi di lavoro efficienti e adattativi
  • Trasformare le collezioni di file in argomenti della riga di comando correttamente formattati
  • Gestire con grazia le diverse convenzioni di denominazione dei file e i formati di input usando regex ed elaborazione delle stringhe
  • Costruire codice riutilizzabile e manutenibile usando pattern avanzati di closure e programmazione funzionale
  • Elaborare e organizzare dataset complessi usando le operazioni sulle collezioni
  • Aggiungere validazione, gestione degli errori e logging per rendere i vostri flussi di lavoro pronti per la produzione
  • Implementare la gestione del ciclo di vita del flusso di lavoro con gli handler degli eventi

Cosa c'è dopo?

Tornate al menu delle Side Quest o cliccate il pulsante in basso a destra della pagina per passare all'argomento successivo nell'elenco.