# Copyright 2017-2023 Lawrence Livermore National Security, LLC and other
# Hatchet Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT
import pandas as pd
import numpy as np
import os
import caliperreader as cr
import hatchet.graphframe
from hatchet.node import Node
from hatchet.graph import Graph
from hatchet.frame import Frame
from hatchet.util.timer import Timer
class CaliperNativeReader:
    """Read in a native `.cali` file using Caliper's python reader."""

    def __init__(self, filename_or_caliperreader, native, string_attributes):
        """Read in a native cali using Caliper's python reader.

        Args:
            filename_or_caliperreader (str or CaliperReader): name of a `cali` file OR
                a CaliperReader object
            native (bool): use native metric names or user-readable metric names
            string_attributes (str or list): Adds existing string attributes from within
                the caliper file to the dataframe
        """
        self.filename_or_caliperreader = filename_or_caliperreader
        self.filename_ext = ""
        self.use_native_metric_names = native
        self.string_attributes = string_attributes

        # dataframe of per-node metadata, populated in read()
        self.df_nodes = {}
        # attribute names whose values are metrics (is_value() is True)
        self.metric_cols = []
        # every attribute name kept from the records (metrics + requested strings)
        self.record_data_cols = []
        self.node_dicts = []

        # bookkeeping shared by create_graph()/read_metrics(): callpath -> Node,
        # nid -> node dict, callpath -> nid, and the next nid to hand out
        self.callpath_to_node = {}
        self.idx_to_node = {}
        self.callpath_to_idx = {}
        self.global_nid = 0

        # set to True when records carry a "min#min#aggregate.slot" ordering
        self.node_ordering = False
        self.default_metric = None

        self.timer = Timer()

        if isinstance(self.filename_or_caliperreader, str):
            # remember the extension so read() can validate it is ".cali"
            _, self.filename_ext = os.path.splitext(filename_or_caliperreader)

        # normalize a single attribute name to a one-element list
        if isinstance(self.string_attributes, str):
            self.string_attributes = [self.string_attributes]
[docs] def read_metrics(self, ctx="path"):
all_metrics = []
records = self.filename_or_caliperreader.records
# read metadata from the caliper reader
for record in records:
node_dict = {}
if ctx in record:
# only parse records that have spot.channel=regionprofile or no
# spot.channel attribute
if (
"spot.channel" in record
and record["spot.channel"] == "regionprofile"
) or "spot.channel" not in record:
# get the node label and callpath for the record
if isinstance(record[ctx], list):
# specify how to parse cupti records
if "cupti.activity.kind" in record:
if record["cupti.activity.kind"] == "kernel":
node_label = record["cupti.kernel.name"]
node_callpath = tuple(record[ctx] + [node_label])
elif record["cupti.activity.kind"] == "memcpy":
node_label = record["cupti.activity.kind"]
node_callpath = tuple(record[ctx] + [node_label])
else:
node_label = record[ctx][-1]
node_callpath = tuple(record[ctx])
else:
node_label = record[ctx]
node_callpath = tuple([record[ctx]])
if "spot.channel" in record:
node_dict["spot.channel"] = record["spot.channel"]
# get node nid based on callpath
node_dict["nid"] = self.callpath_to_idx.get(node_callpath)
for item in record.keys():
if item != ctx:
if (
self.filename_or_caliperreader.attribute(
item
).attribute_type()
== "double"
):
node_dict[item] = float(record[item])
if item not in self.record_data_cols:
self.record_data_cols.append(item)
elif (
self.filename_or_caliperreader.attribute(
item
).attribute_type()
== "int"
):
node_dict[item] = int(record[item])
if item not in self.record_data_cols:
self.record_data_cols.append(item)
elif (
self.filename_or_caliperreader.attribute(
item
).attribute_type()
== "string"
):
if item in self.string_attributes:
node_dict[item] = record[item]
if item not in self.record_data_cols:
self.record_data_cols.append(item)
all_metrics.append(node_dict)
# make list of metric columns
for col in self.record_data_cols:
if self.filename_or_caliperreader.attribute(col).is_value():
self.metric_cols.append(col)
df_metrics = pd.DataFrame.from_dict(data=all_metrics)
df_new = df_metrics.groupby(df_metrics["nid"]).aggregate("first").reset_index()
return df_new
[docs] def create_graph(self, ctx="path"):
list_roots = []
def _create_parent(child_node, parent_callpath):
"""We may encounter a parent node in the callpath before we see it
as a child node. In this case, we need to create a hatchet node for
the parent.
This function recursively creates parent nodes in a callpath
until it reaches the already existing parent in that callpath.
"""
parent_node = self.callpath_to_node.get(parent_callpath)
if parent_node:
# return if arrives at the parent
parent_node.add_child(child_node)
child_node.add_parent(parent_node)
return
else:
# else create the parent and add parent/child
# if root node, end recursive call to create parent nodes
if not parent_callpath:
list_roots.append(child_node)
node_dict = dict(
{
"name": child_node.frame["name"],
"node": child_node,
"nid": self.global_nid,
}
)
self.idx_to_node[self.global_nid] = node_dict
self.global_nid += 1
else:
grandparent_callpath = parent_callpath[:-1]
parent_name = parent_callpath[-1]
parent_node = Node(
Frame({"type": "function", "name": parent_name}), None
)
self.callpath_to_node[parent_callpath] = parent_node
self.callpath_to_idx[parent_callpath] = self.global_nid
node_dict = dict(
{
"name": parent_name,
"node": parent_node,
"nid": self.global_nid,
},
)
self.idx_to_node[self.global_nid] = node_dict
self.global_nid += 1
parent_node.add_child(child_node)
child_node.add_parent(parent_node)
_create_parent(parent_node, grandparent_callpath)
parent_hnode = None
records = self.filename_or_caliperreader.records
order = -1
for record in records:
node_label = ""
if ctx in record:
if (
"spot.channel" in record
and record["spot.channel"] == "regionprofile"
) or "spot.channel" not in record:
# if it's a list, then it's a callpath
if isinstance(record[ctx], list):
# specify how to parse cupti records
if "cupti.activity.kind" in record:
if record["cupti.activity.kind"] == "kernel":
node_label = record["cupti.kernel.name"]
node_callpath = tuple(record[ctx] + [node_label])
parent_callpath = node_callpath[:-1]
node_type = "kernel"
elif record["cupti.activity.kind"] == "memcpy":
node_label = record["cupti.activity.kind"]
node_callpath = tuple(record[ctx] + [node_label])
parent_callpath = node_callpath[:-1]
node_type = "memcpy"
else:
Exception("Haven't seen this activity kind yet")
else:
node_label = record[ctx][-1]
node_callpath = tuple(record[ctx])
parent_callpath = node_callpath[:-1]
node_type = "function"
hnode = self.callpath_to_node.get(node_callpath)
if not hnode:
# set the _hatchet_nid by the node order column if it exists, else -1
if "min#min#aggregate.slot" in record:
self.node_ordering = True
order = record["min#min#aggregate.slot"]
frame = Frame({"type": node_type, "name": node_label})
order = int(order)
hnode = Node(frame, hnid=order)
self.callpath_to_node[node_callpath] = hnode
# get parent from node callpath
parent_hnode = self.callpath_to_node.get(parent_callpath)
# create parent if it doesn't exist
# else if parent already exists, add child-parent
if not parent_hnode:
_create_parent(hnode, parent_callpath)
else:
parent_hnode.add_child(hnode)
hnode.add_parent(parent_hnode)
self.callpath_to_idx[node_callpath] = self.global_nid
node_dict = dict(
{
"name": node_label,
"node": hnode,
"nid": self.global_nid,
},
)
self.idx_to_node[self.global_nid] = node_dict
self.global_nid += 1
# if it's a string, then it's a root
else:
root_label = record[ctx]
root_callpath = tuple([root_label])
if root_callpath not in self.callpath_to_node:
# create the root since it doesn't exist
frame = Frame({"type": "function", "name": root_label})
graph_root = Node(frame, None)
# store callpaths to identify the root
self.callpath_to_node[root_callpath] = graph_root
self.callpath_to_idx[root_callpath] = self.global_nid
list_roots.append(graph_root)
node_dict = dict(
{
"name": root_label,
"node": graph_root,
"nid": self.global_nid,
}
)
self.idx_to_node[self.global_nid] = node_dict
self.global_nid += 1
return list_roots
def _parse_metadata(self, mdata):
"""Convert Caliper Metadata values into correct Python objects.
Args:
mdata (dict[str: str]): metadata to convert
Return:
(dict[str: str]): modified metadata
"""
parsed_mdata = {}
for k, v in mdata.items():
# environment information service brings in different metadata types
if isinstance(v, list):
parsed_mdata[k] = v
continue
# If the value is an int, convert it to an int.
try:
parsed_mdata[k] = int(v)
except ValueError:
# If the value is a float, convert it to a float
try:
parsed_mdata[k] = float(v)
except ValueError:
# If the value is a list or tuple, convert it to a list or
# tuple
if v.startswith("[") and v.endswith("]"):
parsed_mdata[k] = [
elem.strip() for elem in v.strip("][").split(",")
]
elif v.startswith("(") and v.endswith(")"):
parsed_mdata[k] = [
elem.strip() for elem in v.strip(")(").split(",")
]
# If the value is a string, just save it as-is
else:
parsed_mdata[k] = v
return parsed_mdata
[docs] def read(self):
"""Read the caliper records to extract the calling context tree."""
if isinstance(self.filename_or_caliperreader, str):
if self.filename_ext != ".cali":
raise ValueError("from_caliperreader() needs a .cali file")
else:
cali_file = self.filename_or_caliperreader
self.filename_or_caliperreader = cr.CaliperReader()
self.filename_or_caliperreader.read(cali_file)
with self.timer.phase("graph construction"):
list_roots = self.create_graph()
self.df_nodes = pd.DataFrame(data=list(self.idx_to_node.values()))
# create a graph object once all the nodes have been added
graph = Graph(list_roots)
if self.node_ordering:
graph.node_ordering = True
graph.enumerate_traverse()
with self.timer.phase("read metrics"):
df_fixed_data = self.read_metrics()
metrics = pd.DataFrame.from_dict(data=df_fixed_data)
# add missing intermediate nodes to the df_fixed_data dataframe
if "mpi.rank" in df_fixed_data.columns:
num_ranks = metrics["mpi.rank"].max() + 1
rank_list = range(0, num_ranks)
# create a standard dict to be used for filling all missing rows
default_metric_dict = {}
for idx, col in enumerate(self.record_data_cols):
if self.filename_or_caliperreader.attribute(col).is_value():
default_metric_dict[list(self.record_data_cols)[idx]] = 0
else:
default_metric_dict[list(self.record_data_cols)[idx]] = None
default_metric_dict["nid"] = np.nan
# create a list of dicts, one dict for each missing row
missing_nodes = []
for iteridx, row in self.df_nodes.iterrows():
# check if df_nodes row exists in df_fixed_data
metric_rows = df_fixed_data.loc[metrics["nid"] == row["nid"]]
if "mpi.rank" not in self.metric_cols:
if metric_rows.empty:
# add a single row
node_dict = dict(default_metric_dict)
missing_nodes.append(node_dict)
else:
if metric_rows.empty:
# add a row per MPI rank
for rank in rank_list:
node_dict = dict(default_metric_dict)
node_dict["nid"] = row["nid"]
node_dict["mpi.rank"] = rank
missing_nodes.append(node_dict)
elif len(metric_rows) < num_ranks:
# add a row for each missing MPI rank
present_ranks = metric_rows["mpi.rank"].values
missing_ranks = [x for x in rank_list if x not in present_ranks]
for rank in missing_ranks:
node_dict = dict(default_metric_dict)
node_dict["nid"] = row["nid"]
node_dict["mpi.rank"] = rank
missing_nodes.append(node_dict)
df_missing = pd.DataFrame.from_dict(data=missing_nodes)
df_metrics = pd.concat([df_fixed_data, df_missing], sort=False)
# rename columns to user-readable metric names (i.e., aliases)
if not self.use_native_metric_names:
for col in df_metrics.columns:
if col == "nid":
continue
alias = self.filename_or_caliperreader.attribute(col).get(
"attribute.alias"
)
if alias:
# update column name in metrics dataframe
df_metrics.rename(columns={col: alias}, inplace=True)
# also update list of metric columns
self.metric_cols = [
alias if item == col else item for item in self.metric_cols
]
# dict mapping old to new column names to make columns consistent with
# other readers
old_to_new = {
"mpi.rank": "rank",
"module#cali.sampler.pc": "module",
"sum#time.duration": "time",
"sum#avg#sum#time.duration": "time",
"inclusive#sum#time.duration": "time (inc)",
"sum#avg#inclusive#sum#time.duration": "time (inc)",
}
# change column names
new_cols = []
for col in df_metrics.columns:
if col in old_to_new:
new_cols.append(old_to_new[col])
else:
new_cols.append(col)
df_metrics.columns = new_cols
# create list of exclusive and inclusive metric columns
ignore_columns = ["mpi.rank", "aggregate.slot", "Node order"]
exc_metrics = []
inc_metrics = []
for column in self.metric_cols:
# ignore certain columns as an exc or inc metric
if column in ignore_columns:
continue
# add new column names to list of metrics if inc or inclusive in
# old column names
if "(inc)" in column or "inclusive" in column:
if column in old_to_new:
column = old_to_new[column]
inc_metrics.append(column)
else:
if column in old_to_new:
column = old_to_new[column]
exc_metrics.append(column)
with self.timer.phase("data frame"):
# merge the metrics and node dataframes on the nid column
dataframe = pd.merge(df_metrics, self.df_nodes, on="nid")
dataframe["nid"] = dataframe["nid"].astype(pd.Int64Dtype())
# set the index to be a MultiIndex
indices = ["node"]
if "rank" in dataframe.columns:
indices.append("rank")
dataframe.set_index(indices, inplace=True)
dataframe.sort_index(inplace=True)
# set the default metric
if self.default_metric is None:
if "time (inc)" in dataframe.columns:
self.default_metric = "time"
elif "avg#inclusive#sum#time.duration" in dataframe.columns:
self.default_metric = "avg#inclusive#sum#time.duration"
elif len(inc_metrics) > 0:
self.default_metric = inc_metrics[0]
elif len(exc_metrics) > 0:
self.default_metric = exc_metrics[0]
metadata = self.filename_or_caliperreader.globals
parsed_metadata = self._parse_metadata(metadata)
return hatchet.graphframe.GraphFrame(
graph,
dataframe,
exc_metrics,
inc_metrics,
self.default_metric,
metadata=parsed_metadata,
)