=======================================
LEXIS Workflow Definition Documentation
=======================================

Overview
--------

The LEXIS Workflow Definition is a YAML-based format for defining computational workflows with job dependencies, data flow, and resource requirements. A workflow consists of multiple jobs that can run sequentially or in parallel across different computing clusters.

Main Structure of the YAML File
-------------------------------

::

    id: string
    desc: string
    project_shortname: string
    metadata: mapping (optional)
      start_date: string
      end_date: string
      schedule: string
      catchup: boolean
    jobs: mapping
      JOB_NAME_1: mapping
        requirements: mapping
          ...
        data_inputs: list (optional)
          ...
        data_outputs: list (optional)
          ...
        depends_on: list (optional)
          ...
      JOB_NAME_2: mapping
        requirements: mapping
          ...
        data_inputs: list (optional)
          ...
        data_outputs: list (optional)
          ...
        depends_on: list (optional)
          ...
      ...

The workflow (YAML) file contains:

- **id**: Unique workflow identifier.
- **desc**: Human-readable description.
- **project_shortname**: LEXIS project identifier.
- **metadata**: Optional workflow-level settings:

  - *start_date*: The timestamp (``YYYY-MM-DDTHH:mm:ss``) from which the scheduler will attempt to backfill.
  - *end_date*: A timestamp (``YYYY-MM-DDTHH:mm:ss``) beyond which the DAG will not run. Leave it unset (``None``) for open-ended scheduling.
  - *schedule*: Cron-like string used to schedule the DAG. Airflow-style presets are also accepted: ``@once``, ``@continuous``, ``@hourly``, ``@daily``, ``@weekly``, ``@monthly``, ``@quarterly``, ``@yearly``, ``@annually``.
  - *catchup*: Whether the scheduler performs catchup (backfills missed runs) or only runs the latest interval. Defaults to ``false``.

- **jobs**: Dictionary (mapping) of job definitions.

Job Definition
--------------

Each job consists of a mandatory ``requirements`` section and the optional ``data_inputs``, ``data_outputs``, and ``depends_on`` sections.

Requirements
~~~~~~~~~~~~

::

    requirements:
      command_template_name: string
      locations: list
        - location_name: string
          location_resource: string
          location_node_type: string
          location_no_comp_units: integer
          location_parallelization_parameters: list (optional)
            - parallel_max_cores: integer (optional)
              parallel_mpi_processes: integer (optional)
              parallel_open_mp_threads: integer (optional)
      walltime_limit: integer
      template_parameter_values: list (optional)
      environment_variables: list (optional)
        - some_variable: some_value
      is_extra_long_job: boolean (optional)
      max_retries: integer (optional)

- **command_template_name**: Command template to be executed.
- **environment_variables**: Environment variables used for the job execution.
- **is_extra_long_job**: Flag for special long-running queues. Defaults to ``false``.
- **locations**: List of target computing locations (clusters):

  - *location_name*: Name of the target location (cluster).
  - *location_resource*: Computational resource related to the target location (cluster).
  - *location_node_type*: Node type (partition) related to the target location (cluster).
  - *location_no_comp_units*: Number of CPUs/GPUs (depending on the location's node type/partition) to be used for the job execution.
  - *location_parallelization_parameters*: Per-node parallel (MPI/OpenMP) configuration:

    - *parallel_max_cores*: Maximum number of CPUs per node. In total, it must not exceed ``location_no_comp_units``.
    - *parallel_mpi_processes*: Number of MPI processes per node.
    - *parallel_open_mp_threads*: Number of OpenMP threads per node.

- **walltime_limit**: Maximum execution time in seconds.
- **max_retries**: Number of retry attempts if the job execution fails. Defaults to ``1``.

Other optional requirements are described in the LEXIS Datatypes specification.
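The documented defaults and the core-count rule can be illustrated with a minimal Python sketch. It operates on plain dictionaries, as a YAML loader would return them, and is not part of the LEXIS tooling: the names ``normalize_metadata`` and ``normalize_requirements`` are hypothetical, treating every field not marked ``(optional)`` as required is an assumption, and the core check uses the simplest reading of the rule (a per-node ``parallel_max_cores`` may not exceed ``location_no_comp_units``)::

    from copy import deepcopy
    from typing import Any, Optional

    # Defaults stated in this document.
    REQUIREMENT_DEFAULTS = {"max_retries": 1, "is_extra_long_job": False}
    METADATA_DEFAULTS = {"catchup": False}
    # Assumption: fields not marked "(optional)" in the schema are required.
    REQUIRED_FIELDS = ("command_template_name", "locations", "walltime_limit")


    def normalize_metadata(metadata: Optional[dict]) -> dict:
        """Return workflow-level metadata with documented defaults filled in."""
        meta = dict(metadata or {})
        for key, value in METADATA_DEFAULTS.items():
            meta.setdefault(key, value)
        return meta


    def normalize_requirements(requirements: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of a job's requirements with defaults applied.

        Raises ValueError when a required field is missing, the walltime is not
        positive, or a per-node core cap exceeds the requested units.
        """
        req = deepcopy(requirements)  # never mutate the caller's dictionary
        for key, value in REQUIREMENT_DEFAULTS.items():
            req.setdefault(key, value)

        missing = [field for field in REQUIRED_FIELDS if field not in req]
        if missing:
            raise ValueError(f"missing required field(s): {missing}")
        if req["walltime_limit"] <= 0:
            raise ValueError("walltime_limit must be a positive number of seconds")

        for loc in req["locations"]:
            units = loc["location_no_comp_units"]
            for params in loc.get("location_parallelization_parameters", []):
                max_cores = params.get("parallel_max_cores")
                if max_cores is not None and max_cores > units:
                    raise ValueError(
                        f"{loc['location_name']}: parallel_max_cores={max_cores} "
                        f"exceeds location_no_comp_units={units}"
                    )
        return req


    if __name__ == "__main__":
        req = normalize_requirements({
            "command_template_name": "cleaner",
            "locations": [{
                "location_name": "cpu_cluster",
                "location_resource": "CPU-XX-X",
                "location_node_type": "cpu-part",
                "location_no_comp_units": 128,
            }],
            "walltime_limit": 1800,
        })
        print(req["max_retries"], req["is_extra_long_job"])  # 1 False

Running the module prints ``1 False``: the omitted ``max_retries`` and ``is_extra_long_job`` fall back to their documented defaults. Any further defaults the real translator may apply are not modelled here.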
Data Inputs
~~~~~~~~~~~

::

    data_inputs: list
      - source: string
        storage: mapping
          location_resource: string
          location_name: string
        target: string

Each data input entry contains:

- **source**: Source data to be transferred. The value must start with one of the following prefixes:

  - *ddi://* or *irods://*: Transfer a dataset from the DDI. The prefix is followed by a dataset ID and a relative path within the dataset, i.e. ``ddi://DATASET_ID/RELATIVE_PATH``.
  - *dspace://*: Transfer a dataset from an external DSpace repository. Note that DSpace is a flat storage space. The prefix is followed by the DSpace ID, i.e. ``dspace://DSPACE_ID``.
  - *job://*: Transfer data from one job to another. The prefix is followed by the ``job_name`` defined in the YAML file and a relative path, i.e. ``job://JOB_NAME/RELATIVE_PATH``. In the examples below, the path component is the ``$name`` of an upstream output (e.g. ``job://preprocess/cleaned_data``).

- **storage**: Storage resource and storage location used when fetching the dataset. The storage name and resource can be found in the LEXIS Portal, e.g. in the dataset details or in the list of project resources.
- **target**: Relative path on the target cluster.

Data Outputs
~~~~~~~~~~~~

::

    data_outputs:
      # Internal job-to-job transfer
      - source: string
        metadata: mapping
          $name: string
      # External transfer to iRODS (DDI)
      - source: string
        target: string
        storage: mapping
          location_resource: string
          location_name: string
        metadata: mapping
          title: string
          access: string
          datacite: mapping (mandatory if the dataset is public)
            titles: list
              - title: string
            creators: list
              - name: string
            publicationYear: integer
            publisher: mapping
              name: string
            types: mapping
              resourceType: string
              resourceTypeGeneral: string
            schemaVersion: string (e.g. http://datacite.org/schema/kernel-4)
      # External transfer to DSpace
      - source: string
        target: string
        storage: mapping
          location_resource: string
          location_name: string

**Internal job-to-job transfer**:

- **source**: Relative path in the job directory.
- **metadata**: Annotation for the output data:

  - *$name*: Name of the dataset, used only for internal job-to-job transfer.

**Important**: Properties beginning with **$** (such as **$name**) are internal references used only for job-to-job data transfer within the workflow. They are not translated into the final DAG and do not appear in the dataset metadata after the workflow executes.

**External transfer to iRODS (DDI)**:

- **source**: Relative path in the job directory.
- **target**: Relative path within a new or existing iRODS dataset:

  - *ddi://DATASET_ID/RELATIVE_PATH*: Upload data to an existing dataset.
  - *ddi://~/RELATIVE_PATH*: Upload data to a new dataset.

- **storage**: Storage resource and storage name (location) used to store the output dataset. The names can be found among your LEXIS project's resources in the LEXIS Portal. Must be one of the iRODS-related resources.
- **metadata**: Output annotations, including the dataset title and access level:

  - *title*: Title of the dataset.
  - *access*: Access policy for the dataset. One of the following:

    - *public*: Public dataset. The ``datacite`` object must be defined.
    - *project*: Access restricted to the LEXIS project.
    - *user*: Access restricted to the user who uploads the dataset.

  - *datacite*: DataCite metadata object.

**External transfer to DSpace**:

- **source**: Relative path in the job directory.
- **target**: DSpace collection ID, e.g. ``dspace://85743``.
- **storage**: Storage resource and storage name (location) used to store the output dataset. The names can be found in your LEXIS project in the LEXIS Portal. Must be one of the DSpace-related resources.

Dependencies
~~~~~~~~~~~~

::

    depends_on: list
      - JOB_NAME_1
      - JOB_NAME_2

Explicit job dependencies. This field is optional, because data flow (``job://`` inputs) already creates implicit dependencies between jobs.
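To illustrate how ``job://`` inputs create implicit dependencies, the following hypothetical sketch (not the LEXIS translator's code) parses each input ``source`` by prefix, merges the resulting upstream jobs with any explicit ``depends_on`` entries, and derives one valid execution order with Python's standard ``graphlib.TopologicalSorter``. It assumes the workflow has already been loaded into dictionaries; ``parse_source``, ``job_dependencies``, and ``execution_order`` are illustrative names::

    from graphlib import TopologicalSorter
    from typing import Any, NamedTuple

    # Prefixes accepted by data input sources, per this document.
    SOURCE_PREFIXES = ("ddi://", "irods://", "dspace://", "job://")


    class Source(NamedTuple):
        scheme: str  # "ddi", "irods", "dspace" or "job"
        head: str    # dataset ID, DSpace ID or job name
        path: str    # remainder after the first "/" ("" if absent)


    def parse_source(uri: str) -> Source:
        """Split a data input source into scheme, identifier and path."""
        if not uri.startswith(SOURCE_PREFIXES):
            raise ValueError(f"unsupported source prefix: {uri!r}")
        scheme, rest = uri.split("://", 1)
        head, _, path = rest.partition("/")
        return Source(scheme, head, path)


    def job_dependencies(jobs: dict[str, dict[str, Any]]) -> dict[str, set[str]]:
        """Map each job to its upstream jobs: explicit depends_on plus job:// inputs."""
        deps: dict[str, set[str]] = {}
        for name, job in jobs.items():
            upstream = set(job.get("depends_on", []))
            for item in job.get("data_inputs", []):
                src = parse_source(item["source"])
                if src.scheme == "job":
                    upstream.add(src.head)  # implicit dependency created by data flow
            unknown = upstream - set(jobs)
            if unknown:
                raise ValueError(f"{name}: undefined upstream job(s) {sorted(unknown)}")
            deps[name] = upstream
        return deps


    def execution_order(jobs: dict[str, dict[str, Any]]) -> list[str]:
        """Return one valid run order; raises graphlib.CycleError on cycles."""
        return list(TopologicalSorter(job_dependencies(jobs)).static_order())


    if __name__ == "__main__":
        pipeline = {
            "preprocess": {"data_inputs": [{"source": "ddi://DATASET_ID/raw_data.tar"}]},
            "analyze": {"data_inputs": [{"source": "job://preprocess/cleaned_data"}]},
            "report": {"data_inputs": [{"source": "job://analyze/analysis_results"}]},
        }
        print(execution_order(pipeline))  # ['preprocess', 'analyze', 'report']

Jobs with no path between them, such as ``process_chunk1`` to ``process_chunk3`` in the Parallel Processing Example, may appear in any relative order; a scheduler such as Airflow is free to run them concurrently.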
Simple Sequential Example
-------------------------

::

    id: simple_pipeline
    desc: Basic three-step processing pipeline
    project_shortname: myproject
    jobs:
      preprocess:
        requirements:
          command_template_name: cleaner
          locations:
            - location_name: cpu_cluster
              location_resource: CPU-XX-X
              location_node_type: cpu-part
              location_no_comp_units: 128
          walltime_limit: 1800
        data_inputs:
          - source: ddi://DATASET_ID/raw_data.tar
            target: input/
        data_outputs:
          - source: clean/
            metadata:
              $name: cleaned_data
      analyze:
        requirements:
          command_template_name: analyzer
          locations:
            - location_name: gpu_cluster
              location_resource: GPU-XX-X
              location_node_type: gpu-part
              location_no_comp_units: 16
          walltime_limit: 3600
        data_inputs:
          - source: job://preprocess/cleaned_data
            target: data/
        data_outputs:
          - source: results/
            metadata:
              $name: analysis_results
      report:
        requirements:
          command_template_name: reporter
          locations:
            - location_name: cpu_cluster
              location_resource: CPU-XX-X
              location_node_type: cpu-part
              location_no_comp_units: 128
          walltime_limit: 1200
        data_inputs:
          - source: job://analyze/analysis_results
            target: input/
        data_outputs:
          - source: report.pdf
            target: ddi://~/final_report.pdf

Parallel Processing Example
---------------------------

::

    id: parallel_workflow
    desc: Parallel data processing with final merge
    project_shortname: parallel_proj
    jobs:
      split_data:
        requirements:
          command_template_name: splitter
          locations:
            - location_name: prep_cluster
              location_resource: CPU-XX-X
              location_node_type: cpu-part
              location_no_comp_units: 128
          walltime_limit: 1800
        data_inputs:
          - source: ddi://DATASET_ID/big_dataset.tar
            target: source/
        data_outputs:
          - source: chunk1/
            metadata:
              $name: data_chunk_1
          - source: chunk2/
            metadata:
              $name: data_chunk_2
          - source: chunk3/
            metadata:
              $name: data_chunk_3
      process_chunk1:
        requirements:
          command_template_name: processor
          locations:
            - location_name: worker1
              location_resource: CPU-XX-X
              location_node_type: cpu-part
              location_no_comp_units: 64
          walltime_limit: 2400
        data_inputs:
          - source: job://split_data/data_chunk_1
            target: input/
        data_outputs:
          - source: output/
            metadata:
              $name: result_1
      process_chunk2:
        requirements:
          command_template_name: processor
          locations:
            - location_name: worker2
              location_resource: CPU-XX-X
              location_node_type: cpu-part
              location_no_comp_units: 64
          walltime_limit: 2400
        data_inputs:
          - source: job://split_data/data_chunk_2
            target: input/
        data_outputs:
          - source: output/
            metadata:
              $name: result_2
      process_chunk3:
        requirements:
          command_template_name: processor
          locations:
            - location_name: worker3
              location_resource: CPU-XX-X
              location_node_type: cpu-part
              location_no_comp_units: 64
          walltime_limit: 2400
        data_inputs:
          - source: job://split_data/data_chunk_3
            target: input/
        data_outputs:
          - source: output/
            metadata:
              $name: result_3
      merge_results:
        requirements:
          command_template_name: merger
          locations:
            - location_name: merge_cluster
              location_resource: CPU-XX-X
              location_node_type: cpu-part
              location_no_comp_units: 128
          walltime_limit: 1800
        data_inputs:
          - source: job://process_chunk1/result_1
            target: input1/
          - source: job://process_chunk2/result_2
            target: input2/
          - source: job://process_chunk3/result_3
            target: input3/
        data_outputs:
          - source: merged/
            target: ddi://~/final_results.tar

Integration with Airflow
------------------------

Workflows integrate with Apache Airflow through the translator::

    from logging import DEBUG

    from lexis.operators.aai import LexisAAIOperator
    from yaml2dag.lexis_workflow_definition import LexisWorkflowTranslator

    # Translate the YAML workflow definition into an Airflow DAG.
    t = LexisWorkflowTranslator(
        yaml_path="workflow.yaml",
        log_level=DEBUG,
    )
    # Keep the result in a module-level variable so Airflow can discover the DAG.
    dag = t.build(override_tags=["resourcetest"])

Best Practices
--------------

1. Use descriptive job and output names.
2. Use **$name** metadata for internal data flow references.
3. Test workflows with small datasets first.
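In the same spirit as testing with small datasets, a static check before submission can catch broken references early. The sketch below is hypothetical and not part of ``yaml2dag``: ``preflight_warnings`` is an illustrative name, it operates on the dictionary a YAML loader would return, and it assumes (as in the examples above) that the path part of a ``job://`` source is the ``$name`` of an output declared by the upstream job. It also flags ``public`` outputs that lack a ``datacite`` object::

    from typing import Any


    def preflight_warnings(workflow: dict[str, Any]) -> list[str]:
        """Return warnings for a parsed workflow definition (empty list: none found).

        Assumes the path part of ``job://JOB/REF`` is the ``$name`` of an output
        declared by JOB, as in the examples above.
        """
        warnings: list[str] = []
        jobs = workflow.get("jobs", {})

        # $name values each job publishes for internal job-to-job transfer.
        published = {
            name: {out.get("metadata", {}).get("$name")
                   for out in job.get("data_outputs", [])} - {None}
            for name, job in jobs.items()
        }

        for name, job in jobs.items():
            for item in job.get("data_inputs", []):
                source = item.get("source", "")
                if not source.startswith("job://"):
                    continue
                upstream, _, ref = source[len("job://"):].partition("/")
                if upstream not in jobs:
                    warnings.append(f"{name}: unknown upstream job {upstream!r}")
                elif ref not in published[upstream]:
                    warnings.append(f"{name}: {upstream!r} declares no output with $name {ref!r}")

            # Public datasets must carry a DataCite object.
            for out in job.get("data_outputs", []):
                meta = out.get("metadata", {})
                if meta.get("access") == "public" and "datacite" not in meta:
                    warnings.append(f"{name}: public output {out.get('source')!r} has no datacite object")
        return warnings


    if __name__ == "__main__":
        workflow = {"jobs": {
            "preprocess": {"data_outputs": [{"source": "clean/", "metadata": {"$name": "cleaned_data"}}]},
            "analyze": {"data_inputs": [{"source": "job://preprocess/cleaned_data", "target": "data/"}]},
        }}
        print(preflight_warnings(workflow))  # []

Because inputs that reference a plain relative path instead of a ``$name`` are reported as well, the result is best read as a list of warnings rather than hard errors.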