Source code for hatchet.readers.timemory_reader

# Copyright 2017-2023 Lawrence Livermore National Security, LLC and other
# Hatchet Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

import json
import pandas as pd
import os
import glob
from hatchet.graphframe import GraphFrame
from ..node import Node
from ..graph import Graph
from ..frame import Frame
from ..util.timer import Timer


class TimemoryReader:
    """Read in timemory JSON output"""

    def __init__(self, input, select=None, **_kwargs):
        """Arguments:
        input (str or file-stream or dict or None):
            Valid argument types are:

            1. Filename for a timemory JSON tree file
            2. Open file stream to one of these files
            3. Dictionary from timemory JSON tree

        select (list of str):
            A list of strings which match the component enumeration names,
            e.g. ["cpu_clock"].

        per_thread (boolean):
            Ensures that when applying filters to the graphframe, frames
            with identical name/file/line/etc. info but from different
            threads are not combined

        per_rank (boolean):
            Ensures that when applying filters to the graphframe, frames
            with identical name/file/line/etc. info but from different
            ranks are not combined
        """
        self.graph_dict = {"timemory": {}}
        self.input = input
        self.default_metric = None
        self.timer = Timer()
        self.metric_cols = []
        self.properties = {}
        self.include_tid = True
        self.include_nid = True
        self.multiple_ranks = False
        self.multiple_threads = False
        self.callpath_to_node_dict = {}  # (callpath, rank, thread): <node_dict>
        self.callpath_to_node = {}  # (callpath): <node>

        # the per_thread and per_rank settings make sure that
        # squashing doesn't collapse the threads/ranks
        self.per_thread = _kwargs["per_thread"] if "per_thread" in _kwargs else False
        self.per_rank = _kwargs["per_rank"] if "per_rank" in _kwargs else False

        if select is None:
            self.select = select
        elif isinstance(select, list):
            if select:

                def _get_select(val):
                    if isinstance(val, str):
                        return val.lower()
                    elif callable(val):
                        return val().lower()
                    raise TypeError(
                        "Items in select must be string or callable: {}".format(
                            type(val).__name__
                        )
                    )

                self.select = [_get_select(v) for v in select]
        else:
            raise TypeError("select must be None or list of string")
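
    # A minimal sketch of how the constructor above normalizes `select`:
    # string entries are lower-cased, and callable entries are invoked and
    # then lower-cased. The component names and the `data` argument below
    # are hypothetical:
    #
    #     TimemoryReader(data, select=["CPU_CLOCK"])           # select -> ["cpu_clock"]
    #     TimemoryReader(data, select=[lambda: "Wall_Clock"])  # select -> ["wall_clock"]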

    def create_graph(self):
        """Create graph and dataframe"""
        list_roots = []

        def remove_keys(_dict, _keys):
            """Remove keys from dictionary"""
            if isinstance(_keys, str):
                if _keys in _dict:
                    del _dict[_keys]
            else:
                for _key in _keys:
                    _dict = remove_keys(_dict, _key)
            return _dict

        def add_metrics(_dict):
            """Add any keys to metric_cols which don't already exist"""
            for key, itr in _dict.items():
                if key not in self.metric_cols:
                    self.metric_cols.append(key)

        def process_regex(_data):
            """Process the regex data for func/file/line info"""
            _tmp = {}
            if _data is not None and len(_data.groups()) > 0:
                for _key in ("head", "func", "file", "line", "tail"):
                    try:
                        _val = _data.group(_key)
                        if _val:
                            _tmp[_key] = _val
                    except Exception:
                        pass
            return _tmp if _tmp else None

        def perform_regex(_prefix):
            """Performs a search for standard configurations of function + file + line"""
            import re

            _tmp = None
            for _pattern in [
                # [func][file]
                r"(^\[)(?P<func>.*)(\]\[)(?P<file>.*)(\]$)",
                # label [func/file:line]
                r"(?P<head>.+)([ \t]+)\[(?P<func>\S+)([/])(?P<file>\S+):(?P<line>[0-9]+)\]$",
                # func@file:line/tail
                # func/file:line/tail
                r"(?P<func>\S+)([@/])(?P<file>\S+):(?P<line>[0-9]+)[/]*(?P<tail>.*)",
                # func@file/tail
                # func/file/tail
                r"(?P<func>\S+)([@/])(?P<file>\S+)([/])(?P<tail>.*)",
                # func:line/tail
                r"(?P<func>\S+):(?P<line>[0-9]+)([/]*)(?P<tail>.*)",
            ]:
                _tmp = process_regex(re.search(_pattern, _prefix))
                if _tmp:
                    break
            return _tmp if _tmp else None

        def get_name_line_file(_prefix):
            """Get the standard set of dictionary entries.
            Also parses the prefix for func-file-line info,
            which is typically in one of the forms:
                <FUNC>@<FILE>:<LINE>/...
                <FUNC>/<FILE>:<LINE>/...
                <SOURCE> [<FUNC>/<FILE>:<LINE>]
            """
            _keys = {
                "type": "region",
                "name": _prefix,
            }
            _extra = {"file": "<unknown>", "line": "0"}
            _pdict = perform_regex(_prefix)
            if _pdict is not None:
                if "head" in _pdict:
                    _keys["name"] = _pdict["head"].rstrip()
                    _extra["line"] = _pdict["line"]
                    _extra["file"] = _pdict["file"]
                else:
                    _keys["name"] = _pdict["func"]
                    _extra["file"] = (
                        _pdict["file"] if "file" in _pdict else "<unknown file>"
                    )
                    if "line" in _pdict:
                        _extra["line"] = _pdict["line"]
                    if "tail" in _pdict:
                        _keys["name"] = "{}/{}".format(_keys["name"], _pdict["tail"])
            return (_keys, _extra)

        def format_labels(_labels):
            """Formats multi-dimensional metrics which refer to multiple
            metrics stored in a 1D list.
            Example: PAPI_TOT_CYC, PAPI_TOT_INS, and PAPI_L2_TCM are stored as
            ["Total_cycles", "Instr_completed", "L2_cache_misses"].
            After formatting: ['Total-cycles', 'Instr-completed', 'L2-cache-misses']
            """
            _ret = []
            if isinstance(_labels, str):
                # put in a list if the label is a string.
                _ret = [_labels]
            elif isinstance(_labels, dict):
                for _key, _item in _labels.items():
                    _ret.append(_key.strip().replace(" ", "-").replace("_", "-"))
            elif isinstance(_labels, list) or isinstance(_labels, tuple):
                for _item in _labels:
                    _ret.append(_item.strip().replace(" ", "-").replace("_", "-"))
            return _ret

        def match_labels_and_values(_metric_stats, _metric_label, _metric_type):
            """Match metric labels with values and add '(inc)' if the metric
            type is inclusive.

            _metric_stats example 1: {'sum': 0.010, 'min': 0.001, ...}
            _metric_stats example 2: {'sum': [0.010, 0.020, 0.030], ...}
            _metric_label example 1: wall_clock
            _metric_label example 2: ['Total-cycles', 'Instr-completed', 'L2-cache-misses']
            _metric_type: ' (inc)' or ''
            """
            _ret = {}
            for _key, _item in _metric_stats.items():
                if isinstance(_item, dict):
                    for i, (k, v) in enumerate(_item.items()):
                        _ret["{}.{}{}".format(_key, _metric_label[i], _metric_type)] = v
                # match with metric labels if the _metric_stats item is a list.
                elif isinstance(_item, list):
                    for i in range(len(_item)):
                        _ret[
                            "{}.{}{}".format(_key, _metric_label[i], _metric_type)
                        ] = _item[i]
                # otherwise, the _metric_stats item is neither a dict nor a list.
                else:
                    _ret["{}.{}{}".format(_key, _metric_label, _metric_type)] = _item
            return _ret

        def collapse_ids(_obj, _expect_scalar=False):
            """node/rank/thread id may be an int, an array of ints, or None.
            When the entry is a list of integers (which happens when metric
            values are aggregates of multiple ranks/threads), this function
            generates a consistent form which is NOT numerical to prevent
            `groupby(...).sum()` operations from producing something
            nonsensical (i.e. adding together thread-ids) but ensures the
            entry is still hashable (a list of integers is not hashable and
            will cause `groupby(...).sum()` to throw an error)

            Arguments:
                _obj (int or list of ints):
                    The node/rank/thread id(s) for the metric. If a list is
                    provided, this implies that the metric values are
                    aggregations from multiple nodes/ranks/threads

                _expect_scalar (bool):
                    Setting this value to true means that `_obj` is expected
                    to be an integer and the return value should be converted
                    to an integer. If this value is true and an array of ints
                    is passed, an error will be thrown

            Return Value:
                if _expect_scalar is False: string
                if _expect_scalar is True: int
            """
            if isinstance(_obj, list):
                if len(_obj) == 1:
                    return int(_obj[0])
                else:
                    if _expect_scalar:
                        raise ValueError(
                            f"collapse_ids expected per-rank or per-thread values but list of ids ({_obj}) implies that data is aggregated across multiple ranks or threads"
                        )
                    return ",".join([f"{x}" for x in _obj]).strip(",")
            elif _obj is not None:
                return f"{_obj}" if _expect_scalar else int(_obj)
            return None

        def parse_node(_metric_name, _node_data, _hparent, _rank, _parent_callpath):
            """Create callpath_to_node_dict for one node and then call the
            function recursively on all children.
            """
            # If the hash is zero, that indicates that the node is a dummy
            # for the root or is used for synchronizing data between
            # multiple threads
            # TODO: do we have some intermediate nodes that have hash = 0?
            if _node_data["node"]["hash"] == 0:
                if "children" in _node_data:
                    for _child in _node_data["children"]:
                        parse_node(
                            _metric_name,
                            _child,
                            _hparent,
                            _rank,
                            _parent_callpath,
                        )
                return

            _prop = self.properties[_metric_name]
            _frame_attrs, _extra = get_name_line_file(_node_data["node"]["prefix"])
            callpath = _parent_callpath + (_frame_attrs["name"],)

            # check if the node already exists.
            _hnode = self.callpath_to_node.get(callpath)
            if _hnode is None:
                # connect with the parent during node creation.
                _hnode = Node(Frame(_frame_attrs), _hparent)
                self.callpath_to_node[callpath] = _hnode
                if _hparent is None:
                    # if the parent is None, this is a root node.
                    list_roots.append(_hnode)
                else:
                    # if the parent is not None, add as a child.
                    _hparent.add_child(_hnode)

            # by placing the thread-id or rank-id in _frame_attrs, the hash
            # for the Frame(_keys) effectively circumvents Hatchet's
            # default behavior of combining similar thread/rank entries
            _tid_dict = _frame_attrs if self.per_thread else _extra
            _rank_dict = _frame_attrs if self.per_rank else _extra

            # handle the rank
            _rank_dict["rank"] = collapse_ids(_rank, self.per_rank)
            if _rank_dict["rank"] is None:
                del _rank_dict["rank"]
                self.include_nid = False

            # extract some relevant data
            _tid_dict["thread"] = collapse_ids(
                _node_data["node"]["tid"], self.per_thread
            )
            _extra["pid"] = collapse_ids(_node_data["node"]["pid"], False)
            _extra["count"] = _node_data["node"]["inclusive"]["entry"]["laps"]

            # check if there are multiple threads
            # TODO: move this outside if we don't have per-thread data in timemory
            if not self.multiple_threads:
                if _tid_dict["thread"] != 0:
                    self.multiple_threads = True

            # this is the name for the metrics
            _labels = None if "type" not in _prop else _prop["type"]
            # if the labels are not a single string, they are multi-dimensional
            _metrics_in_vector = True if not isinstance(_labels, str) else False

            # remove some spurious data from inclusive/exclusive stats
            _remove = ["cereal_class_version", "count"]
            _inc_stats = remove_keys(_node_data["node"]["inclusive"]["stats"], _remove)
            _exc_stats = remove_keys(_node_data["node"]["exclusive"]["stats"], _remove)

            # if multi-dimensional, create alternative "sum.<...>", etc. labels + data.
            # add " (inc)" to the end of every column that represents an inclusive stat
            if _metrics_in_vector:
                # Example of a multi-dimensional output: if we have 3 papi events
                # PAPI_TOT_CYC, PAPI_TOT_INS, PAPI_L2_TCM:
                # _metric_labels: ["Total_cycles", "Instr_completed", "L2_cache_misses"]
                # _exc_stats -> "sum": [8301.0, 4910.0, 275.0],
                _metric_labels = format_labels(_labels)
                _exc_stats = match_labels_and_values(_exc_stats, _metric_labels, "")
                _inc_stats = match_labels_and_values(
                    _inc_stats, _metric_labels, " (inc)"
                )
            else:
                # add metric name and type.
                # Example: sum -> sum.wall_clock (inc)
                _inc_stats = match_labels_and_values(_inc_stats, _metric_name, " (inc)")
                # Example: sum -> sum.wall_clock
                _exc_stats = match_labels_and_values(_exc_stats, _metric_name, "")

            # add the inclusive and exclusive columns to the list of relevant column names
            add_metrics(_exc_stats)
            add_metrics(_inc_stats)

            # we use callpath_to_node_dict instead of directly using
            # node_dicts to be able to merge metrics.
            # We use its values later as node_dicts.
            # (callpath, rank, thread): <node_dict>
            callpath_rank_thread = tuple((callpath, _rank, _tid_dict["thread"]))
            node_dict = self.callpath_to_node_dict.get(callpath_rank_thread)

            # check if we saw this (callpath, rank, thread) before
            if node_dict is None:
                # if not, create a new dict.
                self.callpath_to_node_dict[callpath_rank_thread] = dict(
                    {"node": _hnode, **_frame_attrs},
                    **_extra,
                    **_exc_stats,
                    **_inc_stats,
                )
            else:
                # if so, don't create a new dict; just add the new metrics to
                # the existing node_dict using update(). We do this to combine
                # different metrics in a single dataframe.
                self.callpath_to_node_dict[callpath_rank_thread].update(
                    dict(**_exc_stats, **_inc_stats)
                )

            # recursion
            if "children" in _node_data:
                for _child in _node_data["children"]:
                    parse_node(_metric_name, _child, _hnode, _rank, callpath)

        def read_graph(_metric_name, ranks_data, _rank):
            """The layout of the graph at this stage is subject to slightly
            different structures based on whether distributed memory
            parallelism (DMP) (e.g. MPI, UPC++) was supported and active.

            Returns True if any rank data was read.
            """
            rank = None
            total_ranks = len(ranks_data)
            for i in range(total_ranks):
                # rank_data stores all the graph/cct data of a rank,
                # starting from the first node in the cct.
                rank_data = ranks_data[i]
                rank = None if _rank is None else i + _rank
                if isinstance(rank_data, list):
                    for data in rank_data:
                        if len(data["children"]) != 0:
                            # the empty tuple represents the parent callpath for the root node.
                            # the third parameter is the parent node; it's None for the root node.
                            parse_node(
                                _metric_name,
                                data,
                                None,
                                rank,
                                tuple(),
                            )
                else:
                    if len(rank_data["children"]) != 0:
                        # the empty tuple represents the parent callpath for the root node.
                        # the third parameter is the parent node; it's None for the root node.
                        parse_node(_metric_name, rank_data, None, rank, tuple())

            if total_ranks > 0:
                return True
            return False

        def read_properties(properties, _metric_name, _metric_data):
            """Read in the properties for a component. This contains
            information on the type of the component, a description, a
            unit_value relative to the standard, a unit label, whether the
            data is only relevant per-thread, the number of MPI and/or UPC++
            ranks (some results can theoretically use both UPC++ and MPI),
            the number of threads in the application, and the total number
            of processes.
            """
            if _metric_name not in properties:
                properties[_metric_name] = {}
            try:
                properties[_metric_name]["properties"] = remove_keys(
                    _metric_data["properties"], "cereal_class_version"
                )
            except KeyError:
                pass
            for p in (
                "type",
                "description",
                "unit_value",
                "unit_repr",
                "thread_scope_only",
                "mpi_size",
                "upcxx_size",
                "thread_count",
                "process_count",
            ):
                if (
                    p not in properties[_metric_name]
                    or properties[_metric_name][p] is None
                ):
                    if p in _metric_data:
                        properties[_metric_name][p] = _metric_data[p]
                    else:
                        properties[_metric_name][p] = None

        # graph_dict["timemory"] stores all metric data;
        # each metric's data is another item in this dict.
        for metric_name, metric_data in self.graph_dict["timemory"].items():
            # strip out the namespace if provided
            metric_name = (
                metric_name.replace("tim::", "").replace("component::", "").lower()
            )

            # check for selection
            if self.select is not None and metric_name not in self.select:
                continue

            # read in properties
            read_properties(self.properties, metric_name, metric_data)

            # if no DMP was used
            if "graph" in metric_data:
                read_graph(metric_name, metric_data["graph"], None)
            else:
                # read in MPI results
                if "mpi" in metric_data:
                    self.multiple_ranks = read_graph(metric_name, metric_data["mpi"], 0)
                # if MPI and UPC++, report ranks offset by MPI_Comm_size
                _rank = self.properties[metric_name]["mpi_size"]
                _rank = 0 if _rank is None else int(_rank)
                if "upc" in metric_data:
                    self.multiple_ranks = read_graph(
                        metric_name, metric_data["upc"], _rank
                    )
                elif "upcxx" in metric_data:
                    self.multiple_ranks = read_graph(
                        metric_name, metric_data["upcxx"], _rank
                    )

        # create the graph of the roots
        graph = Graph(list_roots)
        graph.enumerate_traverse()

        # separate out the inclusive vs. exclusive columns
        exc_metrics = []
        inc_metrics = []
        for column in self.metric_cols:
            if column.endswith(" (inc)"):
                inc_metrics.append(column)
            else:
                exc_metrics.append(column)

        # set the default metric
        if self.default_metric is None:
            if len(exc_metrics) > 0:
                if "sum.wall_clock" in exc_metrics:
                    self.default_metric = "sum.wall_clock"
                elif "sum.cpu_clock" in exc_metrics:
                    self.default_metric = "sum.cpu_clock"
                else:
                    self.default_metric = exc_metrics[0]
            elif len(inc_metrics) > 0:
                self.default_metric = inc_metrics[0]
            else:
                self.default_metric = "sum"

        node_dicts = list(self.callpath_to_node_dict.values())
        dataframe = pd.DataFrame(data=node_dicts)

        indices = []
        # Set the indices according to the rank/thread numbers.
        if self.multiple_ranks and self.multiple_threads:
            indices = ["node", "rank", "thread"]
        elif self.multiple_ranks:
            dataframe.drop(columns=["thread"], inplace=True)
            indices = ["node", "rank"]
        elif self.multiple_threads:
            dataframe.drop(columns=["rank"], inplace=True)
            indices = ["node", "thread"]
        else:
            indices = ["node"]

        dataframe.set_index(indices, inplace=True)
        dataframe.sort_index(inplace=True)

        # Fill in the missing ranks.
        # After unstacking and iterating over the rows, there will be NaN
        # values for some ranks. Find the first rank that has a non-NA value
        # and use it for the other rows/ranks of the multiindex.
        # TODO: iterrows() is not the best way to iterate over rows.
        if self.multiple_ranks or self.multiple_threads:
            dataframe = dataframe.unstack()
            for idx, row in dataframe.iterrows():
                # There is always a valid name for an index.
                # Take that valid name and assign it to the other ranks/rows.
                name = row["name"][row["name"].first_valid_index()]
                dataframe.loc[idx, "name"] = name

                # Sometimes there is no file information.
                if row["file"].first_valid_index() is not None:
                    file = row["file"][row["file"].first_valid_index()]
                    dataframe.loc[idx, "file"] = file

                # Sometimes there is no type information.
                if row["type"].first_valid_index() is not None:
                    _type = row["type"][row["type"].first_valid_index()]
                    dataframe.loc[idx, "type"] = _type

            # Fill the rest with 0
            dataframe.fillna(0, inplace=True)

            # Stack the dataframe
            dataframe = dataframe.stack()

        return GraphFrame(
            graph, dataframe, exc_metrics, inc_metrics, self.default_metric
        )
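
    # A minimal sketch of the prefix parsing performed by the
    # `get_name_line_file` helper inside `create_graph` above. The prefixes
    # below are hypothetical; file and line fall back to "<unknown>" and "0"
    # when no pattern matches:
    #
    #     "foo@bar.cpp:42"           -> name="foo",  file="bar.cpp",    line="42"
    #     "main [run/driver.cpp:10]" -> name="main", file="driver.cpp", line="10"
    #     "some plain label"         -> name="some plain label", file="<unknown>", line="0"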

    def read(self):
        """Read timemory json."""
        # check if the input is a dictionary.
        if isinstance(self.input, dict):
            self.graph_dict = self.input
        # check if the input is a directory and get the '.tree.json' files if so.
        elif isinstance(self.input, str) and os.path.isdir(self.input):
            tree_files = glob.glob(self.input + "/*.tree.json")
            # read all files that end with .tree.json.
            for file in tree_files:
                with open(file, "r") as f:
                    # add all metrics to the same dict even though timemory
                    # creates a separate file for each metric.
                    self.graph_dict["timemory"].update(json.load(f)["timemory"])
        # check if the input is a filename that ends in json
        elif isinstance(self.input, str) and self.input.endswith("json"):
            with open(self.input, "r") as f:
                self.graph_dict = json.load(f)
        # otherwise, treat a non-string input as an open file stream.
        elif not isinstance(self.input, str):
            self.graph_dict = json.loads(self.input.read())
        else:
            raise TypeError("input must be dict, directory, json file, or string")
        return self.create_graph()
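
# A minimal usage sketch, assuming a timemory output file named
# "output.tree.json" (the filename and selected component are hypothetical):
#
#     from hatchet.readers.timemory_reader import TimemoryReader
#
#     gf = TimemoryReader("output.tree.json", select=["wall_clock"],
#                         per_thread=True).read()
#     print(gf.dataframe)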