Source code for hatchet.external.console

# -*- coding: utf-8 -*-

# Copyright (c) 2014-2018, Joe Rickerby and contributors
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from ..version import __version__

import pandas as pd
import numpy as np
import warnings
from ..util.colormaps import ColorMaps


class ConsoleRenderer:
    """Render a graph and its per-node metrics as an indented text tree.

    Args:
        unicode: When true, box-drawing/arrow glyphs are used and
            :meth:`render` returns ``str``.  Otherwise ASCII stand-ins are
            used and :meth:`render` returns the text encoded as UTF-8 bytes.
        color: When true, ANSI color escapes are emitted and a legend is
            appended to the output.
    """

    def __init__(self, unicode=False, color=False):
        self.unicode = unicode
        self.color = color
        # Nodes whose children have already been emitted.  The list lives on
        # the instance, so it persists across render() calls.
        self.visited = []

    def render(self, roots, dataframe, **kwargs):
        """Render every tree rooted in ``roots`` and return the text.

        Args:
            roots: Iterable of root nodes, or ``None`` for an empty graph.
                Nodes must expose ``_hatchet_nid``, ``_depth`` and
                ``children``.
            dataframe: pandas DataFrame indexed by node (optionally with
                ``rank`` / ``thread`` index levels) holding the metric, name,
                annotation and context columns.
            **kwargs: All of the following keys are required:
                ``render_header``, ``metric_column``, ``annotation_column``,
                ``precision``, ``name_column``, ``expand_name``,
                ``context_column``, ``rank``, ``thread``, ``depth``,
                ``highlight_name``, ``colormap``, ``invert_colormap``,
                ``colormap_annotations``, ``min_value``, ``max_value``.

        Returns:
            ``str`` when ``self.unicode`` is true (or when ``roots`` is
            ``None``), otherwise UTF-8 encoded ``bytes``.

        Raises:
            ValueError: ``metric_column`` is neither a string nor a non-empty
                list.
            KeyError: A requested metric column is not in ``dataframe``.
        """
        self.render_header = kwargs["render_header"]

        result = self.render_preamble() if self.render_header else ""

        if roots is None:
            result += "The graph is empty.\n\n"
            return result

        self.metric_columns = kwargs["metric_column"]
        self.annotation_column = kwargs["annotation_column"]
        self.precision = kwargs["precision"]
        self.name = kwargs["name_column"]
        self.expand = kwargs["expand_name"]
        self.context = kwargs["context_column"]
        self.rank = kwargs["rank"]
        self.thread = kwargs["thread"]
        self.depth = kwargs["depth"]
        self.highlight = kwargs["highlight_name"]
        self.colormap = kwargs["colormap"]
        self.invert_colormap = kwargs["invert_colormap"]
        self.colormap_annotations = kwargs["colormap_annotations"]
        self.min_value = kwargs["min_value"]
        self.max_value = kwargs["max_value"]

        if self.color:
            # Work on a private instance: assigning ``colormap`` on the class
            # itself would leak the palette into every other renderer.
            self.colors = self.colors_enabled()
            # set the colormap based on user input
            self.colors.colormap = ColorMaps().get_colors(
                self.colormap, self.invert_colormap
            )

            if self.annotation_column and self.colormap_annotations:
                self.colors_annotations = self.colors_enabled()
                if isinstance(self.colormap_annotations, dict):
                    # The dict maps annotation text directly to a color.
                    self.colors_annotations_mapping = self.colormap_annotations
                elif isinstance(self.colormap_annotations, (str, list)):
                    if isinstance(self.colormap_annotations, str):
                        self.colors_annotations.colormap = ColorMaps().get_colors(
                            self.colormap_annotations, False
                        )
                    else:
                        self.colors_annotations.colormap = self.colormap_annotations
                    # Sorted unique annotation strings; a value's position in
                    # this list selects its color (modulo the palette size).
                    self.colors_annotations_mapping = sorted(
                        dataframe[self.annotation_column].apply(str).unique()
                    )
        else:
            self.colors = self.colors_disabled

        if isinstance(self.metric_columns, str):
            self.primary_metric = self.metric_columns
            self.second_metric = None
        elif isinstance(self.metric_columns, list) and self.metric_columns:
            if len(self.metric_columns) > 2:
                warnings.warn(
                    "More than 2 metrics specified in metric_column=. "
                    "Tree() will only show 2 metrics at a time. "
                    "The remaining metrics will not be shown.",
                    SyntaxWarning,
                )
            self.primary_metric = self.metric_columns[0]
            self.second_metric = (
                self.metric_columns[1] if len(self.metric_columns) > 1 else None
            )
        else:
            # Without this check the primary metric would be left unset (or
            # stale from an earlier call) and fail later with an opaque error.
            raise ValueError(
                "metric_column must be a column name or a non-empty list of "
                "column names, got {!r}".format(self.metric_columns)
            )

        missing_metric_message = (
            "metric_column={} does not exist in the dataframe, please select a "
            "valid column. See a list of the available metrics with "
            "GraphFrame.show_metric_columns()."
        )
        requested_metrics = [self.primary_metric]
        if self.second_metric is not None:
            requested_metrics.append(self.second_metric)
        for metric in requested_metrics:
            if metric not in dataframe.columns:
                raise KeyError(missing_metric_message.format(metric))

        # grab the min and max value for the primary metric, ignoring inf and
        # nan values
        if "rank" in dataframe.index.names:
            # Select by level name so the lookup does not depend on where the
            # rank level sits inside the MultiIndex.
            metric_series = dataframe.xs(self.rank, level="rank")[self.primary_metric]
        else:
            metric_series = dataframe[self.primary_metric]
        isfinite_mask = np.isfinite(metric_series.values)
        filtered_series = pd.Series(
            metric_series.values[isfinite_mask], metric_series.index[isfinite_mask]
        )

        # ``is not None`` rather than truthiness: 0 is a legitimate bound.
        self.max_metric = (
            self.max_value if self.max_value is not None else filtered_series.max()
        )
        self.min_metric = (
            self.min_value if self.min_value is not None else filtered_series.min()
        )

        if self.unicode:
            self.lr_arrows = {"◀": u"◀ ", "▶": u"▶ "}
        else:
            self.lr_arrows = {"◀": u"< ", "▶": u"> "}

        for root in sorted(roots, key=lambda n: n._hatchet_nid):
            result += self.render_frame(root, dataframe)

        if self.color is True:
            result += self.render_legend()

        if self.unicode:
            return result
        return result.encode("utf-8")

    # pylint: disable=W1401
    def render_preamble(self):
        """Return the ASCII-art banner, tagged with the package version."""
        lines = [
            r"    __          __       __         __ ",
            r"   / /_  ____ _/ /______/ /_  ___  / /_",
            r"  / __ \/ __ `/ __/ ___/ __ \/ _ \/ __/",
            r" / / / / /_/ / /_/ /__/ / / /  __/ /_  ",
            r"/_/ /_/\__,_/\__/\___/_/ /_/\___/\__/  {:>2}".format("v" + __version__),
            r"",
            r"",
        ]

        return "\n".join(lines)

    def render_legend(self):
        """Return the color legend for the primary metric.

        One line is produced per colormap band, followed by a key for the
        name highlight and the left/right "only in" markers.
        """
        metric_range = self.max_metric - self.min_metric

        def render_label(index, low, high):
            # ``low``/``high`` are fractions of the metric range covered by
            # the colormap entry at ``index``.
            return (
                self.colors.colormap[index]
                + u"█ "
                + self.colors.end
                + "{:.2f}".format(low * metric_range + self.min_metric)
                + " - "
                + "{:.2f}".format(high * metric_range + self.min_metric)
                + "\n"
            )

        legend = (
            "\n"
            + "\033[4m"
            + "Legend"
            + self.colors.end
            + " (Metric: "
            + str(self.primary_metric)
            + " Min: {:.2f}".format(self.min_metric)
            + " Max: {:.2f}".format(self.max_metric)
            + ")\n"
        )

        # Same thresholds as _ansi_color_for_metric, highest band first.
        bands = ((0.9, 1.0), (0.7, 0.9), (0.5, 0.7), (0.3, 0.5), (0.1, 0.3), (0.0, 0.1))
        for index, (low, high) in enumerate(bands):
            legend += render_label(index, low, high)

        legend += "\n" + self._ansi_color_for_name("name") + "name" + self.colors.end
        legend += " User code "
        legend += self.colors.left + self.lr_arrows["◀"] + self.colors.end
        legend += " Only in left graph "
        legend += self.colors.right + self.lr_arrows["▶"] + self.colors.end
        legend += " Only in right graph\n"

        return legend

    def render_frame(self, node, dataframe, indent=u"", child_indent=u""):
        """Render ``node`` and, recursively, its children.

        Args:
            node: Node to render; must expose ``_depth``, ``_hatchet_nid``
                and ``children``.
            dataframe: DataFrame holding the row for ``node``.
            indent: Prefix for this node's own line.
            child_indent: Prefix shared by every line below this node.

        Returns:
            The rendered text, or ``""`` when the node lies at or beyond the
            configured depth limit.
        """
        if not node._depth < self.depth:
            return ""

        # set dataframe index based on whether rank and thread are part of
        # the MultiIndex
        index_names = dataframe.index.names
        has_rank = "rank" in index_names
        has_thread = "thread" in index_names
        if has_rank and has_thread:
            df_index = (node, self.rank, self.thread)
        elif has_rank:
            df_index = (node, self.rank)
        elif has_thread:
            df_index = (node, self.thread)
        else:
            df_index = node

        node_metric = dataframe.loc[df_index, self.primary_metric]

        metric_precision = "{:." + str(self.precision) + "f}"
        metric_str = (
            self._ansi_color_for_metric(node_metric)
            + metric_precision.format(node_metric)
            + self.colors.end
        )

        if self.second_metric is not None:
            metric_str += u" {c.faint}{second_metric:.{precision}f}{c.end}".format(
                second_metric=dataframe.loc[df_index, self.second_metric],
                precision=self.precision,
                c=self.colors,
            )

        if self.annotation_column is not None:
            annotation_content = str(dataframe.loc[df_index, self.annotation_column])

            # The annotation palette/mapping only exist when render() ran with
            # colors enabled, so never take the colored path otherwise.
            if self.color and self.colormap_annotations:
                if isinstance(self.colormap_annotations, dict):
                    color_annotation = self.colors_annotations_mapping[
                        annotation_content
                    ]
                else:
                    palette = self.colors_annotations.colormap
                    color_annotation = palette[
                        self.colors_annotations_mapping.index(annotation_content)
                        % len(palette)
                    ]
                metric_str += " [{}".format(color_annotation)
                metric_str += "{}".format(annotation_content)
                metric_str += "{}]".format(self.colors_annotations.end)
            else:
                metric_str += " [{}]".format(annotation_content)

        node_name = dataframe.loc[df_index, self.name]
        if self.expand is False and len(node_name) > 39:
            # Keep the first and last 18 characters around an ellipsis.
            node_name = node_name[:18] + "..." + node_name[(len(node_name) - 18) :]
        name_str = self._ansi_color_for_name(node_name) + node_name + self.colors.end

        result = u"{indent}{metric_str} {name_str}".format(
            indent=indent, metric_str=metric_str, name_str=name_str
        )

        # 0 is "", 1 is "L", and 2 is "R"
        if "_missing_node" in dataframe.columns:
            left_or_right = dataframe.loc[df_index, "_missing_node"]
            # Default to no marker so an unexpected value cannot leave the
            # decorator unbound.
            lr_decorator = u""
            if left_or_right == 1:
                lr_decorator = u" {c.left}{decorator}{c.end}".format(
                    decorator=self.lr_arrows["◀"], c=self.colors
                )
            elif left_or_right == 2:
                lr_decorator = u" {c.right}{decorator}{c.end}".format(
                    decorator=self.lr_arrows["▶"], c=self.colors
                )
            result += lr_decorator

        if self.context in dataframe.columns:
            result += u" {c.faint}{context}{c.end}\n".format(
                context=dataframe.loc[df_index, self.context], c=self.colors
            )
        else:
            result += u"\n"

        # Every prefix is three columns wide so that the guides of nested
        # levels line up under their parent's branch marker.
        if self.unicode:
            indents = {"├": u"├─ ", "│": u"│  ", "└": u"└─ ", " ": u"   "}
        else:
            indents = {"├": u"|- ", "│": u"|  ", "└": u"`- ", " ": u"   "}

        # ensures that we never revisit nodes in the case of
        # large complex graphs
        if node not in self.visited:
            self.visited.append(node)
            sorted_children = sorted(node.children, key=lambda n: n._hatchet_nid)
            if sorted_children:
                last_child = sorted_children[-1]

            for child in sorted_children:
                if child is not last_child:
                    c_indent = child_indent + indents["├"]
                    cc_indent = child_indent + indents["│"]
                else:
                    c_indent = child_indent + indents["└"]
                    cc_indent = child_indent + indents[" "]
                result += self.render_frame(
                    child, dataframe, indent=c_indent, child_indent=cc_indent
                )

        return result

    def _ansi_color_for_metric(self, metric):
        """Return the color escape for ``metric`` relative to the min/max range."""
        metric_range = self.max_metric - self.min_metric

        if metric_range != 0:
            proportion_of_total = (metric - self.min_metric) / metric_range
        else:
            proportion_of_total = metric / 1

        # Bands are checked from the highest threshold down; the position of
        # the first threshold exceeded is the colormap index.
        for index, threshold in enumerate((0.9, 0.7, 0.5, 0.3, 0.1)):
            if proportion_of_total > threshold:
                return self.colors.colormap[index]
        if proportion_of_total >= 0:
            return self.colors.colormap[5]
        # Reached for values below the minimum and for NaN.
        return self.colors.blue

    def _ansi_color_for_name(self, node_name):
        """Return the highlight escape for ``node_name``, or ``""`` for none."""
        if self.highlight is False:
            return ""

        if "<unknown procedure>" in node_name or "<unknown file>" in node_name:
            return ""
        return self.colors.bg_white_255 + self.colors.dark_gray_255

    class colors_enabled:
        """ANSI escape sequences used when color output is on."""

        # Filled in by render() with the palette for the chosen colormap.
        colormap = []

        blue = "\033[34m"
        cyan = "\033[36m"

        bg_white_255 = "\033[48;5;246m"
        dark_gray_255 = "\033[38;5;232m"

        left = "\033[38;5;160m"
        right = "\033[38;5;28m"

        faint = "\033[2m"
        end = "\033[0m"

    class colors_disabled:
        """Stand-in palette whose every attribute is the empty string."""

        colormap = ["", "", "", "", "", "", ""]

        def __getattr__(self, key):
            return ""

    # Replace the class with a singleton instance so that ``__getattr__``
    # applies to attribute lookups on ``ConsoleRenderer.colors_disabled``.
    colors_disabled = colors_disabled()