FlowBench

flowbench.base module

Base class for the FlowBench dataset.

Author: PoSeiDon Team License: MIT

class flowbench.base.BaseBench(root, name=None, binary_labels=True, feature_option='v1', anomaly_cat='all', force_reprocess=False, shift_ts_by_node=True, include_hops=True, transform=None, pre_transform=None, pre_filter=None, **kwargs)

Bases: InMemoryDataset, ABC

Base class for the FlowBench dataset.

Parameters:
  • root (str) – Root for processing the dataset.

  • name (str, optional) – Name of workflow. Defaults to “1000genome”.

  • binary_labels (bool, optional) – Specify the problem as binary classification if True. Defaults to True.

  • feature_option (str, optional) – Specify the feature options. More detailed options are available in README.md. Defaults to “v1”.

  • anomaly_cat (str, optional) – Specify the anomaly category. Defaults to “all”.

  • force_reprocess (bool, optional) – Force to reprocess the parsed data if True. Defaults to False.

  • transform (callable, optional) – Module for transform operations. Defaults to None.

  • pre_transform (callable, optional) – Module for pre_transform operations. Defaults to None.

  • pre_filter (callable, optional) – Module for pre_filter operations. Defaults to None.

bytes_features = ['stage_in_bytes', 'stage_out_bytes']
delay_features = ['wms_delay', 'queue_delay', 'runtime', 'post_script_delay', 'stage_in_delay', 'stage_out_delay']
features = ['auxiliary', 'compute', 'transfer', 'is_clustered', 'ready', 'pre_script_start', 'pre_script_end', 'submit', 'execute_start', 'execute_end', 'post_script_start', 'post_script_end', 'wms_delay', 'pre_script_delay', 'queue_delay', 'runtime', 'post_script_delay', 'stage_in_delay', 'stage_in_bytes', 'stage_out_delay', 'stage_out_bytes', 'kickstart_executables_cpu_time', 'kickstart_status', 'kickstart_executables_exitcode']
kickstart_features = ['kickstart_executables_cpu_time', 'kickstart_status', 'kickstart_executables_exitcode', 'kickstart_online_iowait', 'kickstart_online_bytes_read', 'kickstart_online_bytes_written', 'kickstart_online_read_system_calls', 'kickstart_online_write_system_calls', 'kickstart_online_utime', 'kickstart_online_stime', 'kickstart_online_bytes_read_per_second', 'kickstart_online_bytes_written_per_second']
property processed_file_names

The names of the files in the self.processed_dir folder that must be present in order to skip processing.

Returns:

List of file names.

Return type:

list

ts_features = ['ready', 'submit', 'execute_start', 'execute_end', 'post_script_start', 'post_script_end']
url = '../data'

flowbench.dataset module

FlowBench dataset interface for graph and tabular data.

Author: PoSeiDon Team License: MIT

class flowbench.dataset.FlowDataset(root, name='1000genome', binary_labels=True, node_level=True, feature_option='v1', anomaly_cat='all', force_reprocess=False, transform=None, pre_transform=None, pre_filter=None, **kwargs)

Bases: InMemoryDataset

FlowBench dataset interface for graph and tabular data.

Parameters:
  • root (str) – Root for processing the dataset.

  • name (str, optional) – Name of workflow. Defaults to “1000genome”.

  • binary_labels (bool, optional) – Specify the problem as binary classification if True. Defaults to True.

  • feature_option (str, optional) – Specify the feature options. More detailed options are available in README.md. Defaults to “v1”.

  • anomaly_cat (str, optional) – Specify the anomaly category. Defaults to “all”.

  • force_reprocess (bool, optional) – Force to reprocess the parsed data if True. Defaults to False.

  • transform (callable, optional) – Module for transform operations. Defaults to None.

  • pre_transform (callable, optional) – Module for pre_transform operations. Defaults to None.

  • pre_filter (callable, optional) – Module for pre_filter operations. Defaults to None.

bytes_features = ['stage_in_bytes', 'stage_out_bytes']
delay_features = ['wms_delay', 'queue_delay', 'runtime', 'post_script_delay', 'stage_in_delay', 'stage_out_delay']
download()

Downloads the dataset to the self.raw_dir folder. Overrides torch_geometric.data.Dataset.

features = ['auxiliary', 'compute', 'transfer', 'is_clustered', 'ready', 'pre_script_start', 'pre_script_end', 'submit', 'execute_start', 'execute_end', 'post_script_start', 'post_script_end', 'wms_delay', 'pre_script_delay', 'queue_delay', 'runtime', 'post_script_delay', 'stage_in_delay', 'stage_in_bytes', 'stage_out_delay', 'stage_out_bytes', 'kickstart_executables_cpu_time', 'kickstart_status', 'kickstart_executables_exitcode']
kickstart_features = ['kickstart_executables_cpu_time', 'kickstart_status', 'kickstart_executables_exitcode', 'kickstart_online_iowait', 'kickstart_online_bytes_read', 'kickstart_online_bytes_written', 'kickstart_online_read_system_calls', 'kickstart_online_write_system_calls', 'kickstart_online_utime', 'kickstart_online_stime', 'kickstart_online_bytes_read_per_second', 'kickstart_online_bytes_written_per_second']
property num_edges_per_graph
property num_node_attributes
property num_node_labels
property num_nodes_per_graph
process()

Processes the raw data into graphs and saves them in self.processed_dir.

property processed_dir
property processed_file_names

The names of the files in the self.processed_dir folder that must be present in order to skip processing.

Returns:

List of file names.

Return type:

list

property raw_dir
property raw_file_names

A list of files in the self.raw_dir which need to be found in order to skip the download.

Returns:

List of file names.

Return type:

list

real_dir = '/home/runner/work/FlowBench/FlowBench/flowbench'
relative_path = '/home/runner/work/FlowBench/FlowBench/flowbench/../data'
to_graph_level()

Convert the node-level data to the graph-level setting.

ts_features = ['ready', 'submit', 'execute_start', 'execute_end', 'post_script_start', 'post_script_end']
url = 'file:///home/runner/work/FlowBench/FlowBench/flowbench/../data'
class flowbench.dataset.MergeFlowDataset(root, name='all', binary_labels=True, node_level=True, feature_option='v1', anomaly_cat='all', force_reprocess=False, transform=None, pre_transform=None, pre_filter=None, **kwargs)

Bases: InMemoryDataset

A merged dataset of multiple FlowBench datasets.

process()

Processes the dataset to the self.processed_dir folder.

property processed_dir
property processed_file_names

The names of the files in the self.processed_dir folder that must be present in order to skip processing.

Returns:

List of file names.

Return type:

list

flowbench.dataset.filter_dataset(dataset, anomaly_cat)

Filter the dataset based on the anomaly category.

Parameters:
  • dataset (FlowBench) – The dataset to be filtered.

  • anomaly_cat (str) – The anomaly category to be filtered.

flowbench.losses module

class flowbench.losses.FocalLoss(gamma=0, alpha=None, size_average=True)

Bases: Module

Focal loss for classification tasks.

\[FL(p_t) = -\alpha_t (1 - p_t)^{\gamma} \log(p_t)\]
Parameters:
  • gamma (float, optional) – Focusing parameter. Default: 0

  • alpha (float, optional) – Weighting parameter. Default: None

  • size_average (bool, optional) – Average the loss over the batch. Default: True
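The formula above can be illustrated with a minimal NumPy sketch, independent of the flowbench implementation (the helper focal_loss_np is hypothetical):

```python
import numpy as np

def focal_loss_np(p_t, alpha=1.0, gamma=2.0):
    """Focal loss for the probability p_t assigned to the true class:
    FL(p_t) = -alpha * (1 - p_t)**gamma * log(p_t)."""
    p_t = np.asarray(p_t, dtype=float)
    return -alpha * (1.0 - p_t) ** gamma * np.log(p_t)

# With gamma = 0 the focal loss reduces to plain cross-entropy; gamma > 0
# down-weights well-classified examples (high p_t) by the factor (1 - p_t)**gamma.
easy, hard = 0.9, 0.1
ratio_easy = focal_loss_np(easy, gamma=2) / focal_loss_np(easy, gamma=0)
ratio_hard = focal_loss_np(hard, gamma=2) / focal_loss_np(hard, gamma=0)
```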

forward(input, target)

Define the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

flowbench.losses.focal_loss(input, target, alpha, gamma, size_average, **kwargs)

Compute the focal loss between the input and target tensors.

Parameters:
  • input (Tensor) – The input tensor.

  • target (Tensor) – The target tensor.

  • alpha (Tensor) – The alpha tensor for class balancing.

  • gamma (float) – The gamma value for focal loss.

  • size_average (bool) – Whether to average the loss.

Returns:

The computed focal loss.

Return type:

Tensor

flowbench.metrics module

flowbench.metrics.eval_accuracy(y_true, y_pred, **kwargs)

Accuracy score for binary classification.

Parameters:
  • y_true (array-like) – Ground truth (true labels vector).

  • y_pred (array-like) – Predicted labels, as returned by a classifier.

Returns:

Accuracy score.

Return type:

float

flowbench.metrics.eval_average_precision(y_true, y_pred, **kwargs)

Average precision score for binary classification.

Parameters:
  • y_true (array-like) – Ground truth (true labels vector).

  • y_pred (array-like) – Predicted labels, as returned by a classifier.

Returns:

Average precision score.

Return type:

float

flowbench.metrics.eval_f1(y_true, y_pred, **kwargs)

F1 score for binary classification.

Parameters:
  • y_true (array-like) – Ground truth (true labels vector).

  • y_pred (array-like) – Predicted labels, as returned by a classifier.

Returns:

F1 score.

Return type:

float

flowbench.metrics.eval_metrics(y_true, y_pred, metric=None, average='weighted', **kwargs)

Evaluate model predictions with one or all of the supported metrics.

Parameters:
  • y_true (np.array) – True y labels.

  • y_pred (np.array) – Predicted y labels.

  • metric (str, optional) – One of [‘acc’, ‘f1’, ‘prec’, ‘roc_auc’, ‘conf_mat’]. Defaults to None, which evaluates all metrics.

  • average (str, optional) – This parameter is required for multiclass/multilabel targets. Defaults to “weighted”.

Returns:

metric results

Return type:

dict or float
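With multiclass targets, average='weighted' follows the usual scikit-learn convention: per-class scores are averaged with weights proportional to each class's support. A minimal NumPy sketch of a weighted F1 (the helper weighted_f1 is hypothetical, for illustration only; it is not the flowbench implementation):

```python
import numpy as np

def weighted_f1(y_true, y_pred):
    """F1 per class, averaged with weights proportional to class support."""
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    classes, support = np.unique(y_true, return_counts=True)
    f1s = []
    for c in classes:
        tp = np.sum((y_pred == c) & (y_true == c))
        fp = np.sum((y_pred == c) & (y_true != c))
        fn = np.sum((y_pred != c) & (y_true == c))
        prec = tp / (tp + fp) if tp + fp else 0.0
        rec = tp / (tp + fn) if tp + fn else 0.0
        f1s.append(2 * prec * rec / (prec + rec) if prec + rec else 0.0)
    # Weighted average: classes with more samples contribute more.
    return np.average(f1s, weights=support)
```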

flowbench.metrics.eval_precision(y_true, y_pred, **kwargs)

Precision score for binary classification.

Parameters:
  • y_true (array-like) – Ground truth (true labels vector).

  • y_pred (array-like) – Predicted labels, as returned by a classifier.

Returns:

Precision score.

Return type:

float

flowbench.metrics.eval_precision_at_k(y_true, y_pred, k=None)

Precision score for top k instances with the highest outlier scores.

Parameters:
  • y_true (array-like) – Ground truth (true labels vector).

  • y_pred (array-like) – Predicted labels, as returned by a classifier.

  • k (int, optional) – Top K instances to be considered.

Returns:

Precision at k (between 0 and 1).

Return type:

float
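The idea behind precision at k can be sketched as follows, assuming higher scores in y_pred mean "more anomalous" (an illustration of the metric, not the flowbench implementation):

```python
import numpy as np

def precision_at_k(y_true, y_score, k):
    """Fraction of true outliers among the k highest-scoring instances."""
    y_true, y_score = np.asarray(y_true), np.asarray(y_score)
    top_k = np.argsort(y_score)[::-1][:k]  # indices of the k largest scores
    return y_true[top_k].mean()
```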

flowbench.metrics.eval_recall(y_true, y_pred, **kwargs)

Recall score for binary classification.

Parameters:
  • y_true (array-like) – Ground truth (true labels vector).

  • y_pred (array-like) – Predicted labels, as returned by a classifier.

Returns:

Recall score.

Return type:

float

flowbench.metrics.eval_recall_at_k(y_true, y_pred, k=None)

Recall score for top k instances with the highest outlier scores.

Parameters:
  • y_true (array-like) – Ground truth (true labels vector).

  • y_pred (array-like) – Predicted labels, as returned by a classifier.

  • k (int, optional) – Top K instances to be considered.

Returns:

Recall score at k (between 0 and 1).

Return type:

float

flowbench.metrics.eval_roc_auc(y_true, y_pred, **kwargs)

ROC-AUC score for binary classification.

Parameters:
  • y_true (array-like) – Ground truth (true labels vector).

  • y_pred (array-like) – Predicted labels, as returned by a classifier.

Returns:

ROC-AUC score.

Return type:

float

flowbench.transforms module

Transformations for graphs.

  • Column-wise standardization of node features.

  • Column-wise min-max normalization of node features.

class flowbench.transforms.CustomizeNormalizeFeatures(attrs: List[str] = ['x'], dim: int = 1)

Bases: BaseTransform

Normalizes the attributes given in attrs so that they sum up to one along the chosen axis (functional name: customize_normalize_features).

Parameters:
  • attrs (List[str]) – The names of attributes to normalize. (default: ["x"])

  • dim (int) – The axis along which to normalize: 0 (column-wise) or 1 (row-wise). Defaults to 1.
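The effect on a feature matrix can be sketched in NumPy (an illustration of the transform's semantics, not the flowbench code):

```python
import numpy as np

x = np.array([[1.0, 2.0],
              [3.0, 2.0]])

# dim=0: each column sums to one; dim=1: each row sums to one.
col_normalized = x / x.sum(axis=0, keepdims=True)
row_normalized = x / x.sum(axis=1, keepdims=True)
```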

class flowbench.transforms.MinMaxNormalizeFeatures(attrs: List[str] = ['x'], min: int = 0, max: int = 1)

Bases: BaseTransform

Min-max normalizes the attributes given in attrs to scale between 0 and 1 (functional name: minmax_normalize_features).

Parameters:
  • attrs (List[str], optional) – The names of attributes to normalize. Defaults to ["x"].

  • min (int, optional) – Minimum of the target range. Defaults to 0.

  • max (int, optional) – Maximum of the target range. Defaults to 1.
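Column-wise min-max scaling into [0, 1] can be sketched in NumPy (an illustration of the transform's semantics, not the flowbench code):

```python
import numpy as np

x = np.array([[1.0, 10.0],
              [3.0, 30.0],
              [5.0, 50.0]])

# Scale each column so its minimum maps to 0 and its maximum to 1.
x_min = x.min(axis=0, keepdims=True)
x_max = x.max(axis=0, keepdims=True)
x_scaled = (x - x_min) / (x_max - x_min)
```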

class flowbench.transforms.MyFilter

Bases: object

Filter class for the dataset.

Parameters:

data (Data) – The data object.

Example

A filter whose __call__ returns data.num_nodes > 1 keeps only the graphs with more than one node.
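A pre_filter is a callable applied to each graph during processing; graphs for which it returns False are dropped. A minimal sketch, using SimpleNamespace as a stand-in for torch_geometric.data.Data so the example stays self-contained:

```python
from types import SimpleNamespace

class MyFilter:
    """Keep only graphs with more than one node."""

    def __call__(self, data):
        return data.num_nodes > 1

# Stand-ins for Data objects; real usage passes MyFilter() as pre_filter.
graphs = [SimpleNamespace(num_nodes=1), SimpleNamespace(num_nodes=5)]
kept = [g for g in graphs if MyFilter()(g)]
```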

flowbench.utils module

flowbench.utils.create_dir(path)

Create a directory where the processed data will be stored.

Parameters:

path (str) – Path to create the folder.

flowbench.utils.parse_adj(workflow)

Process the adjacency file of a workflow.

Parameters:

workflow (str) – Workflow name.

Raises:

NotImplementedError – Raised for the workflow “all”, which is not processed as a single adjacency file.

Returns:

(dict, list)

dict: Dictionary of nodes; list: List of directed edges.

Return type:

tuple
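Assuming the adjacency information boils down to parent→child pairs of job names, the returned (dict, list) pair could look like the following sketch (the input format and the helper build_adjacency are hypothetical; see the source for the actual parser):

```python
def build_adjacency(pairs):
    """Map job names to integer ids and edges to id pairs."""
    nodes = {}
    for parent, child in pairs:
        for name in (parent, child):
            nodes.setdefault(name, len(nodes))  # assign ids in first-seen order
    edges = [(nodes[p], nodes[c]) for p, c in pairs]
    return nodes, edges

nodes, edges = build_adjacency([("create_dir", "individuals"),
                                ("individuals", "merge")])
```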

flowbench.version module

Module contents

FlowBench: A benchmarking framework for scientific workflows.

Author: PoSeiDon Team License: MIT

flowbench.list_workflows()

List of available workflows in FlowBench.

Returns:

List of workflow names.

Return type:

list