Source code for hatchet.readers.caliper_reader

# Copyright 2017-2023 Lawrence Livermore National Security, LLC and other
# Hatchet Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

import json
import sys
import re
import subprocess
import os
import math

import pandas as pd
import numpy as np

import hatchet.graphframe
from hatchet.node import Node
from hatchet.graph import Graph
from hatchet.frame import Frame
from hatchet.util.timer import Timer
from hatchet.util.executable import which

unknown_label_counter = 0


class CaliperReader:
    """Read in a Caliper file (`cali` or split JSON) or file-like object."""

    def __init__(self, filename_or_stream, query=""):
        """Read from Caliper files (`cali` or split JSON).

        Args:
            filename_or_stream (str or file-like): name of a `cali` or
                `cali-query` split JSON file, OR an open file object
            query (str): cali-query arguments (for cali file)
        """
        self.filename_or_stream = filename_or_stream
        self.filename_ext = ""
        self.query = query
        self.node_ordering = False

        self.json_data = {}
        self.json_cols = {}
        self.json_cols_mdata = {}
        self.json_nodes = {}

        self.metadata = {}

        self.idx_to_label = {}
        self.idx_to_node = {}

        self.timer = Timer()
        self.nid_col_name = "nid"

        if isinstance(self.filename_or_stream, str):
            _, self.filename_ext = os.path.splitext(filename_or_stream)
    def read_json_sections(self):
        # if cali-query exists, extract data from .cali to a file-like object
        if self.filename_ext == ".cali":
            cali_query = which("cali-query")
            if not cali_query:
                raise ValueError("from_caliper() needs cali-query to query .cali file")
            cali_json = subprocess.Popen(
                [cali_query, "-q", self.query, self.filename_or_stream],
                stdout=subprocess.PIPE,
            )
            self.filename_or_stream = cali_json.stdout

        # if filename_or_stream is a str, then open the file, otherwise
        # directly load the file-like object
        if isinstance(self.filename_or_stream, str):
            with open(self.filename_or_stream) as cali_json:
                json_obj = json.load(cali_json)
        else:
            json_obj = json.loads(self.filename_or_stream.read().decode("utf-8"))

        # read various sections of the Caliper JSON file
        self.json_data = json_obj["data"]
        self.json_cols = json_obj["columns"]
        self.json_cols_mdata = json_obj["column_metadata"]
        self.json_nodes = json_obj["nodes"]

        # read run metadata: all top-level elements in the json object that aren't
        # one of the above sections are run metadata
        skip = ["data", "columns", "column_metadata", "nodes"]
        keys = [k for k in json_obj.keys() if k not in skip]
        self.metadata = {k: json_obj[k] for k in keys}

        # decide which column to use as the primary path hierarchy
        # first preference to callpath if available
        if "source.function#callpath.address" in self.json_cols:
            self.path_col_name = "source.function#callpath.address"
            self.node_type = "function"
            if "path" in self.json_cols:
                self.both_hierarchies = True
            else:
                self.both_hierarchies = False
        elif "path" in self.json_cols:
            self.path_col_name = "path"
            self.node_type = "region"
            self.both_hierarchies = False
        else:
            sys.exit("No hierarchy column in input file")

        # remove data entries containing None in `path` column (null in json file)
        # first, get column where `path` data is
        # then, parse json_data list of lists to identify lists containing None in
        # `path` column
        path_col = self.json_cols.index(self.path_col_name)
        entries_to_remove = []
        for sublist in self.json_data:
            if sublist[path_col] is None:
                entries_to_remove.append(sublist)
        # then, remove them from the json_data list
        for i in entries_to_remove:
            self.json_data.remove(i)

        # change column names
        for idx, item in enumerate(self.json_cols):
            if item == self.path_col_name:
                # this column is just a pointer into the nodes section
                self.json_cols[idx] = self.nid_col_name
            # make other columns consistent with other readers
            if item == "mpi.rank":
                self.json_cols[idx] = "rank"
            if item == "module#cali.sampler.pc":
                self.json_cols[idx] = "module"
            if item == "sum#time.duration" or item == "sum#avg#sum#time.duration":
                self.json_cols[idx] = "time"
            if (
                item == "inclusive#sum#time.duration"
                or item == "sum#avg#inclusive#sum#time.duration"
            ):
                self.json_cols[idx] = "time (inc)"

        # make list of metric columns
        self.metric_columns = []
        for idx, item in enumerate(self.json_cols_mdata):
            if self.json_cols[idx] != "rank" and item["is_value"] is True:
                self.metric_columns.append(self.json_cols[idx])
    def create_graph(self):
        list_roots = []
        global unknown_label_counter
        order = -1

        # find nodes in the nodes section that represent the path hierarchy
        for idx, node in enumerate(self.json_nodes):
            node_label = node["label"]
            if node_label == "":
                node_label = "UNKNOWN " + str(unknown_label_counter)
                unknown_label_counter += 1
            self.idx_to_label[idx] = node_label

            if node["column"] == self.path_col_name:
                # if there is a node ordering, assign it to the _hatchet_nid
                if "Node order" in self.json_cols:
                    self.node_ordering = True
                    order = self.json_data[idx][0]

                if "parent" not in node:
                    # since this node does not have a parent, this is a root
                    graph_root = Node(
                        Frame({"type": self.node_type, "name": node_label}), hnid=order
                    )
                    list_roots.append(graph_root)

                    node_dict = {
                        self.nid_col_name: idx,
                        "name": node_label,
                        "node": graph_root,
                    }
                    self.idx_to_node[idx] = node_dict
                else:
                    parent_hnode = (self.idx_to_node[node["parent"]])["node"]
                    hnode = Node(
                        Frame({"type": self.node_type, "name": node_label}),
                        hnid=order,
                    )
                    parent_hnode.add_child(hnode)

                    node_dict = {
                        self.nid_col_name: idx,
                        "name": node_label,
                        "node": hnode,
                    }
                    self.idx_to_node[idx] = node_dict

        return list_roots
    def read(self):
        """Read the caliper JSON file to extract the calling context tree."""
        with self.timer.phase("read json"):
            self.read_json_sections()

        with self.timer.phase("graph construction"):
            list_roots = self.create_graph()

        # create a dataframe of metrics from the data section
        self.df_json_data = pd.DataFrame(self.json_data, columns=self.json_cols)

        # when an nid has multiple entries due to the secondary hierarchy
        # we need to aggregate them for each (nid, rank)
        groupby_cols = [self.nid_col_name]
        if "rank" in self.json_cols:
            groupby_cols.append("rank")
        if "sourceloc#cali.sampler.pc" in self.json_cols:
            groupby_cols.append("sourceloc#cali.sampler.pc")

        if self.both_hierarchies is True:
            # create dict that stores aggregation function for each column
            agg_dict = {}
            for idx, item in enumerate(self.json_cols_mdata):
                col = self.json_cols[idx]
                if col != "rank" and col != "nid":
                    if item["is_value"] is True:
                        agg_dict[col] = np.sum
                    else:
                        agg_dict[col] = lambda x: x.iloc[0]

            grouped = self.df_json_data.groupby(groupby_cols).aggregate(agg_dict)
            self.df_json_data = grouped.reset_index()

        # map non-numeric columns to their mappings in the nodes section
        for idx, item in enumerate(self.json_cols_mdata):
            if item["is_value"] is False and self.json_cols[idx] != self.nid_col_name:
                if self.json_cols[idx] == "sourceloc#cali.sampler.pc":
                    # split source file and line number into two columns
                    self.df_json_data["file"] = self.df_json_data[
                        self.json_cols[idx]
                    ].apply(
                        lambda x: re.match(
                            r"(.*):(\d+)", self.json_nodes[x]["label"]
                        ).group(1)
                    )
                    self.df_json_data["line"] = self.df_json_data[
                        self.json_cols[idx]
                    ].apply(
                        lambda x: re.match(
                            r"(.*):(\d+)", self.json_nodes[x]["label"]
                        ).group(2)
                    )
                    self.df_json_data.drop(self.json_cols[idx], axis=1, inplace=True)
                    sourceloc_idx = idx
                elif self.json_cols[idx] == "path":
                    # we will only reach here if path is the "secondary"
                    # hierarchy in the data
                    self.df_json_data["path"] = self.df_json_data["path"].apply(
                        lambda x: None
                        if (math.isnan(x))
                        else self.json_nodes[int(x)]["label"]
                    )
                else:
                    self.df_json_data[self.json_cols[idx]] = self.df_json_data[
                        self.json_cols[idx]
                    ].apply(lambda x: self.json_nodes[x]["label"])

        # since we split sourceloc, we should update json_cols and
        # json_cols_mdata
        if "sourceloc#cali.sampler.pc" in self.json_cols:
            self.json_cols.pop(sourceloc_idx)
            self.json_cols_mdata.pop(sourceloc_idx)
            self.json_cols.append("file")
            self.json_cols.append("line")
            self.json_cols_mdata.append({"is_value": False})
            self.json_cols_mdata.append({"is_value": False})

        max_nid = self.df_json_data[self.nid_col_name].max()

        if "line" in self.df_json_data.columns:
            # split nodes that have multiple file:line numbers to have a child
            # each with a unique file:line number
            unique_nodes = self.df_json_data.groupby(self.nid_col_name)
            df_concat = [self.df_json_data]

            for nid, super_node in unique_nodes:
                line_groups = super_node.groupby("line")
                # only need to do something if there are more than one
                # file:line number entries for the node
                if len(line_groups.size()) > 1:
                    sn_hnode = self.idx_to_node[nid]["node"]

                    for line, line_group in line_groups:
                        # create the node label
                        file_path = (line_group.head(1))["file"].item()
                        file_name = os.path.basename(file_path)
                        node_label = file_name + ":" + line

                        # create a new hatchet node
                        max_nid += 1
                        idx = max_nid
                        hnode = Node(
                            Frame(
                                {"type": "statement", "file": file_path, "line": line}
                            ),
                            sn_hnode,
                        )
                        sn_hnode.add_child(hnode)

                        node_dict = {
                            self.nid_col_name: idx,
                            "name": node_label,
                            "node": hnode,
                        }
                        self.idx_to_node[idx] = node_dict

                        # change nid of the original node to new node in place
                        for index, row in line_group.iterrows():
                            self.df_json_data.loc[index, "nid"] = max_nid

                    # add new row for original node
                    node_copy = super_node.head(1).copy()
                    for cols in self.metric_columns:
                        node_copy[cols] = 0
                    df_concat.append(node_copy)

            # concatenate all the newly created dataframes with
            # self.df_json_data
            self.df_fixed_data = pd.concat(df_concat)
        else:
            self.df_fixed_data = self.df_json_data

        # create a dataframe with all nodes in the call graph
        self.df_nodes = pd.DataFrame.from_dict(data=list(self.idx_to_node.values()))

        # add missing intermediate nodes to the df_fixed_data dataframe
        if "rank" in self.json_cols:
            self.num_ranks = self.df_fixed_data["rank"].max() + 1
            rank_list = range(0, self.num_ranks)

        # create a standard dict to be used for filling all missing rows
        default_metric_dict = {}
        for idx, item in enumerate(self.json_cols_mdata):
            if self.json_cols[idx] != self.nid_col_name:
                if item["is_value"] is True:
                    default_metric_dict[self.json_cols[idx]] = 0
                else:
                    default_metric_dict[self.json_cols[idx]] = None

        # create a list of dicts, one dict for each missing row
        missing_nodes = []
        for iteridx, row in self.df_nodes.iterrows():
            # check if df_nodes row exists in df_fixed_data
            metric_rows = self.df_fixed_data.loc[
                self.df_fixed_data[self.nid_col_name] == row[self.nid_col_name]
            ]
            if "rank" not in self.json_cols:
                if metric_rows.empty:
                    # add a single row
                    node_dict = dict(default_metric_dict)
                    node_dict[self.nid_col_name] = row[self.nid_col_name]
                    missing_nodes.append(node_dict)
            else:
                if metric_rows.empty:
                    # add a row per MPI rank
                    for rank in rank_list:
                        node_dict = dict(default_metric_dict)
                        node_dict[self.nid_col_name] = row[self.nid_col_name]
                        node_dict["rank"] = rank
                        missing_nodes.append(node_dict)
                elif len(metric_rows) < self.num_ranks:
                    # add a row for each missing MPI rank
                    present_ranks = metric_rows["rank"].values
                    missing_ranks = [x for x in rank_list if x not in present_ranks]
                    for rank in missing_ranks:
                        node_dict = dict(default_metric_dict)
                        node_dict[self.nid_col_name] = row[self.nid_col_name]
                        node_dict["rank"] = rank
                        missing_nodes.append(node_dict)

        self.df_missing = pd.DataFrame.from_dict(data=missing_nodes)
        self.df_metrics = pd.concat([self.df_fixed_data, self.df_missing])

        # create a graph object once all the nodes have been added
        graph = Graph(list_roots)
        if self.node_ordering:
            graph.node_ordering = True
            # we do not want to expose the "Node order" column to the user
            self.df_metrics = self.df_metrics.drop(columns="Node order")
        graph.enumerate_traverse()

        # merge the metrics and node dataframes on the nid column
        with self.timer.phase("data frame"):
            dataframe = pd.merge(self.df_metrics, self.df_nodes, on=self.nid_col_name)

        # set the index to be a MultiIndex
        indices = ["node"]
        if "rank" in self.json_cols:
            indices.append("rank")
        dataframe.set_index(indices, inplace=True)
        dataframe.sort_index(inplace=True)

        # create list of exclusive and inclusive metric columns
        exc_metrics = []
        inc_metrics = []
        for column in self.metric_columns:
            if "(inc)" in column:
                inc_metrics.append(column)
            else:
                exc_metrics.append(column)

        return hatchet.graphframe.GraphFrame(
            graph, dataframe, exc_metrics, inc_metrics, metadata=self.metadata
        )
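
The class above is driven through its read() method, which returns a hatchet.graphframe.GraphFrame. Below is a minimal usage sketch, not part of the module itself: the file name "profile.json" is a hypothetical cali-query split-JSON file, and the available metric columns depend on what that file records.

# Illustrative usage sketch (hypothetical input file "profile.json").
from hatchet.readers.caliper_reader import CaliperReader

reader = CaliperReader("profile.json")
gf = reader.read()  # returns a hatchet.graphframe.GraphFrame

# The dataframe is indexed by node (and rank, if the profile has MPI ranks);
# exclusive metrics appear as "time" and inclusive metrics as "time (inc)".
print(gf.dataframe.head())
print(gf.tree())  # render the calling context tree as text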