Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions tests/translators_loggers/Dockerfile.makeflow
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# docker build --platform amd64 -t wfcommons-dev -f Dockerfile.parsl .
# docker run -it --rm -v `pwd`:/home/wfcommons wfcommons-dev /bin/bash

FROM amd64/ubuntu:noble

LABEL org.containers.image.authors="[email protected]"

# update repositories
RUN apt-get update

# set timezone
RUN echo "America/Los_Angeles" > /etc/timezone && export DEBIAN_FRONTEND=noninteractive && apt-get install -y tzdata

# install useful stuff
RUN apt-get -y install pkg-config
RUN apt-get -y install git
RUN apt-get -y install wget
RUN apt-get -y install curl
RUN apt-get -y install make
RUN apt-get -y install cmake
RUN apt-get -y install cmake-data
RUN apt-get -y install sudo
RUN apt-get -y install vim --fix-missing
RUN apt-get -y install gcc
RUN apt-get -y install gcc-multilib

# Python stuff
RUN apt-get -y install python3 python3-pip
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1
RUN python3 -m pip install --break-system-packages pathos pandas filelock
RUN python3 -m pip install --break-system-packages networkx scipy matplotlib
RUN python3 -m pip install --break-system-packages pyyaml jsonschema requests
RUN python3 -m pip install --break-system-packages --upgrade setuptools

# Stress-ng
RUN apt-get -y install stress-ng

# Add wfcommons user
RUN useradd -ms /bin/bash wfcommons
RUN adduser wfcommons sudo
RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
ENV PATH="$PATH:/home/wfcommons/.local/bin/"

USER wfcommons
WORKDIR /home/wfcommons

# Install Miniforge
RUN wget -O Miniforge3.sh "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh" && \
bash Miniforge3.sh -b -p "${HOME}/conda" && \
rm Miniforge3.sh

# Make sure conda is available in each shell session
RUN echo "source ${HOME}/conda/etc/profile.d/conda.sh" >> ${HOME}/.bashrc
RUN echo "conda activate" >> ${HOME}/.bashrc

# Install necessary packages
RUN . ${HOME}/conda/etc/profile.d/conda.sh && \
conda activate base && \
conda install -c conda-forge ndcctools && \
conda clean --all -f -y

2 changes: 1 addition & 1 deletion tests/translators_loggers/build_docker_docker_images.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

set -e

for backend in "dask" "parsl" "nextflow" "airflow" "bash" "taskvine" "cwl" "pegasus" "swiftt"; do
for backend in "dask" "parsl" "nextflow" "airflow" "bash" "taskvine" "makeflow" "cwl" "pegasus" "swiftt"; do
echo "Building $backend Docker image..."
docker build --platform linux/amd64 -t wfcommons/wfcommons-testing-$backend -f Dockerfile.$backend .
done
15 changes: 14 additions & 1 deletion tests/translators_loggers/test_translators_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import sys
import json
import time
import networkx
import re

from tests.test_helpers import _create_fresh_local_dir
from tests.test_helpers import _remove_local_dir_if_it_exists
Expand All @@ -31,6 +31,7 @@
from wfcommons.wfbench import AirflowTranslator
from wfcommons.wfbench import BashTranslator
from wfcommons.wfbench import TaskVineTranslator
from wfcommons.wfbench import MakeflowTranslator
from wfcommons.wfbench import CWLTranslator
from wfcommons.wfbench import PegasusTranslator
from wfcommons.wfbench import SwiftTTranslator
Expand Down Expand Up @@ -105,6 +106,7 @@ def _additional_setup_swiftt(container):
"airflow": noop,
"bash": noop,
"taskvine": _additional_setup_taskvine,
"makeflow": noop,
"cwl": noop,
"pegasus": _additional_setup_pegasus,
"swiftt": _additional_setup_swiftt,
Expand Down Expand Up @@ -161,6 +163,14 @@ def run_workflow_taskvine(container, num_tasks, str_dirpath):
assert (exit_code == 0)
assert (output.decode().count("completed") == num_tasks)

def run_workflow_makeflow(container, num_tasks, str_dirpath):
# Run the workflow (with full logging)
exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && makeflow --log-verbose ./workflow.makeflow"], stdout=True, stderr=True)
# Check sanity
assert (exit_code == 0)
num_completed_jobs = len(re.findall(r'job \d+ completed', output.decode()))
assert (num_completed_jobs == num_tasks)

def run_workflow_cwl(container, num_tasks, str_dirpath):
# Run the workflow!
# Note that the input file is hardcoded and Blast-specific
Expand Down Expand Up @@ -194,6 +204,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
"airflow": run_workflow_airflow,
"bash": run_workflow_bash,
"taskvine": run_workflow_taskvine,
"makeflow": run_workflow_makeflow,
"cwl": run_workflow_cwl,
"pegasus": run_workflow_pegasus,
"swiftt": run_workflow_swiftt,
Expand All @@ -206,6 +217,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
"airflow": AirflowTranslator,
"bash": BashTranslator,
"taskvine": TaskVineTranslator,
"makeflow": MakeflowTranslator,
"cwl": CWLTranslator,
"pegasus": PegasusTranslator,
"swiftt": SwiftTTranslator,
Expand All @@ -224,6 +236,7 @@ class TestTranslators:
"airflow",
"bash",
"taskvine",
"makeflow",
"cwl",
"pegasus",
])
Expand Down
2 changes: 1 addition & 1 deletion wfcommons/wfbench/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# (at your option) any later version.

from .bench import WorkflowBenchmark
from .translator import AirflowTranslator, DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator, CWLTranslator, BashTranslator, PyCompssTranslator
from .translator import AirflowTranslator, DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator, MakeflowTranslator, CWLTranslator, BashTranslator, PyCompssTranslator
1 change: 1 addition & 0 deletions wfcommons/wfbench/translator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
from .pycompss import PyCompssTranslator
from .swift_t import SwiftTTranslator
from .taskvine import TaskVineTranslator
from .makeflow import MakeflowTranslator
118 changes: 118 additions & 0 deletions wfcommons/wfbench/translator/makeflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024-2025 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

import pathlib
import shutil

from logging import Logger
from typing import Optional, Union

from .abstract_translator import Translator
from ...common import Workflow

this_dir = pathlib.Path(__file__).resolve().parent

class MakeflowTranslator(Translator):
"""
A WfFormat parser for creating Makeflow workflow applications.

:param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
:type workflow: Union[Workflow, pathlib.Path],
:param logger: The logger where to log information/warning or errors (optional).
:type logger: Logger
"""
def __init__(self,
workflow: Union[Workflow, pathlib.Path],
logger: Optional[Logger] = None) -> None:
"""Create an object of the translator."""
super().__init__(workflow, logger)
self._script = ""

def translate(self, output_folder: pathlib.Path) -> None:
"""
Translate a workflow benchmark description (WfFormat) into an actual workflow application.

:param output_folder: The path to the folder in which the workflow benchmark will be generated.
:type output_folder: pathlib.Path
"""

# Generate code
self._generate_code()

# write benchmark files
output_folder.mkdir(parents=True)
with open(output_folder.joinpath("workflow.makeflow"), "w") as fp:
fp.write(self._script)

# additional files
self._copy_binary_files(output_folder)
self._generate_input_files(output_folder)

# README file
self._write_readme_file(output_folder)

def _generate_code(self):
"""
Generate the Makeflow code

:return: the code
:rtype: str
"""
self._script = "# Makeflow workflow specification\n\n"
for task_name, task in self.workflow.tasks.items():
make_clause = ""
# output files
for output_file in task.output_files:
make_clause += f"data/{output_file.file_id} "
make_clause += ": "
# input files
for input_file in task.input_files:
make_clause += f"data/{input_file.file_id} "
make_clause += "\n"
# Command
make_clause += "\t"
make_clause += task.program + " "

input_spec = "\"["
for file in task.input_files:
input_spec += f"\\\\\"data/{file.file_id}\\\\\","
input_spec = input_spec[:-1] + "]\""

output_spec = "\"{"
for file in task.output_files:
output_spec += f"\\\\\"data/{file.file_id}\\\\\":{str(file.size)},"
output_spec = output_spec[:-1] + "}\""

args = []
for a in task.args:
if "--output-files" in a:
args.append(f"--output-files {output_spec}")
elif "--input-files" in a:
args.append(f"--input-files {input_spec}")
else:
args.append(a)

args = " ".join(f"{a}" for a in args)
make_clause += args + "\n"
self._script += make_clause + "\n\n"
return

def _write_readme_file(self, output_folder: pathlib.Path) -> None:
"""
Write the README file.

:param output_folder: The path of the output folder.
:type output_folder: pathlib.Path
"""
readme_file_path = output_folder.joinpath("README")
with open(readme_file_path, "w") as out:
out.write(f"In directory {str(output_folder)}:\n")
out.write(f" - The Makeflow input file: workflow.makeflow\n")
out.write(f" - Run the workflow: makeflow workflow.makeflow\n")
9 changes: 5 additions & 4 deletions wfcommons/wfbench/translator/taskvine.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(self,
logger: Optional[Logger] = None) -> None:
"""Create an object of the translator."""
super().__init__(workflow, logger)
self._script = ""
self.parsed_tasks = []
self.task_counter = 1
self.output_files_map = {}
Expand All @@ -45,16 +46,16 @@ def translate(self, output_folder: pathlib.Path) -> None:
:param output_folder: The path to the folder in which the workflow benchmark will be generated.
:type output_folder: pathlib.Path
"""
self.script = "# workflow tasks\n"
self._script = "# workflow tasks\n"

# add tasks per level
self.next_level = self.root_task_names.copy()
while self.next_level:
self.next_level = self._add_level_tasks(self.next_level)
self.script += "wait_for_tasks_completion()\n"
self._script += "wait_for_tasks_completion()\n"

# generate code
run_workflow_code = self._merge_codelines("templates/taskvine_template.py", self.script)
run_workflow_code = self._merge_codelines("templates/taskvine_template.py", self._script)

# generate Flowcept code
if self.workflow.workflow_id is not None:
Expand Down Expand Up @@ -156,7 +157,7 @@ def _add_task(self, task_name: str, parent_task: Optional[str] = None) -> list[s
args = " ".join(f"{a}" for a in args)

# write task
self.script += f"t_{self.task_counter} = vine.Task('{task.program} {args}')\n" \
self._script += f"t_{self.task_counter} = vine.Task('{task.program} {args}')\n" \
f"t_{self.task_counter}.set_cores(1)\n{task_script}"

self.task_counter += 1
Expand Down