Source code for k3d.helpers

"""Utilities module."""

import base64
import itertools
import logging
import msgpack
import numpy as np
import os
import zlib
from traitlets import TraitError
from typing import Any
from typing import Dict as TypingDict
from typing import List as TypingList
from typing import Optional, Tuple, Union
from urllib.request import urlopen

from ._protocol import get_protocol

# Set up module-level logger
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        "[%(asctime)s] %(levelname)s in %(module)s: %(message)s"
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
logger.setLevel(logging.INFO)


# pylint: disable=unused-argument
# noinspection PyUnusedLocal
[docs] def array_to_json( ar: np.ndarray, compression_level: int = 0, force_contiguous: bool = True ) -> Union[str, TypingDict[str, Any]]: """ Return the serialization of a numpy array. Parameters ---------- ar : ndarray A numpy array. compression_level : int, optional Level of compression [-1, 9], by default 0. force_contiguous : bool, optional Make the array contiguous in memory, by default True. Returns ------- dict or str Binary data of the array with its dtype and shape, or a base64 string if protocol is 'text'. Raises ------ ValueError If the dtype is unsupported. """ if ar.dtype.kind not in ["u", "i", "f"]: # ints and floats logger.error(f"Unsupported dtype: {ar.dtype}") raise ValueError(f"Unsupported dtype: {ar.dtype}") if ar.dtype == np.float64: # WebGL does not support float64 logger.info("Converting float64 array to float32 for WebGL compatibility.") ar = ar.astype(np.float32) elif ar.dtype == np.int64: # JS does not support int64 logger.info("Converting int64 array to int32 for JS compatibility.") ar = ar.astype(np.int32) # make sure it's contiguous if force_contiguous and not ar.flags["C_CONTIGUOUS"]: ar = np.ascontiguousarray(ar) if compression_level > 0: ret = { "compressed_data": zlib.compress(ar.flatten(), compression_level), "dtype": str(ar.dtype), "shape": ar.shape, } else: ret = { "data": memoryview(ar.flatten()), "dtype": str(ar.dtype), "shape": ar.shape, } if get_protocol() == "text": return "base64_" + base64.b64encode( msgpack.packb(ret, use_bin_type=True) ).decode("ascii") else: return ret
# noinspection PyUnusedLocal
[docs] def json_to_array( value: Optional[TypingDict[str, Any]], obj: Optional[Any] = None ) -> Optional[np.ndarray]: """ Return numpy array from serialization. Parameters ---------- value : dict Binary data of an array with its dtype and shape. obj : dict, optional Object, by default None. Returns ------- ndarray or None Numpy array or None. """ if value: if "data" in value: return np.frombuffer(value["data"], dtype=value["dtype"]).reshape( value["shape"] ) else: return np.frombuffer( zlib.decompress(value["compressed_data"]), dtype=value["dtype"] ).reshape(value["shape"]) return None
[docs] def to_json( name: str, input: Any, obj: Optional[Any] = None, compression_level: int = 0 ) -> Any: """ Return JSON object serialization. Parameters ---------- name : str Name of the property. input : Any Input data to serialize. obj : Any, optional Object containing property, by default None. compression_level : int, optional Compression level, by default 0. Returns ------- Any Serialized JSON object. """ if hasattr(obj, "compression_level"): compression_level = obj.compression_level if isinstance(input, dict): property = obj[name] ret = {} for key, value in input.items(): ret[str(key)] = to_json(key, value, property, compression_level) return ret elif isinstance(input, np.ndarray) and input.dtype is np.dtype(object): return to_json(name, input.tolist(), obj, compression_level) elif isinstance(input, list): property = obj[name] return [ to_json(idx, v, property, compression_level) for idx, v in enumerate(input) ] elif isinstance(input, bytes): return array_to_json(np.frombuffer(input, dtype=np.uint8), compression_level) elif isinstance(input, np.ndarray): return array_to_json(input, compression_level) elif isinstance(input, np.number): return input.tolist() else: return input
[docs] def from_json(input: Any, obj: Optional[Any] = None) -> Any: """ Return JSON object deserialization. Parameters ---------- input : Any Input data to deserialize. obj : Any, optional Object, by default None. Returns ------- Any Deserialized object. """ if isinstance(input, str) and input[0:7] == "base64_": input = msgpack.unpackb(base64.b64decode(input[7:])) if ( isinstance(input, dict) and "dtype" in input and ("data" in input or "compressed_data" in input) and "shape" in input ): return json_to_array(input, obj) elif isinstance(input, list): return [from_json(i, obj) for i in input] elif isinstance(input, dict): ret = {} for key, value in input.items(): ret[key] = from_json(value, obj) return ret else: return input
[docs] def array_serialization_wrap(name: str) -> TypingDict[str, Any]: """ Return a wrap of the serialization and deserialization functions for array objects. Parameters ---------- name : str Name of the property. Returns ------- dict Dictionary with 'to_json' and 'from_json' functions. """ return { "to_json": (lambda input, obj: to_json(name, input, obj)), "from_json": from_json, }
[docs] def callback_serialization_wrap(name: str) -> TypingDict[str, Any]: """ Return a wrap of the serialization and deserialization functions for mouse actions. Parameters ---------- name : str Name of the property. Returns ------- dict Dictionary with 'to_json' and 'from_json' functions. """ return { "to_json": (lambda input, obj: obj[name] is not None), "from_json": from_json, }
[docs] def download(url: str) -> str: """ Retrieve the file at url, save it locally and return its name. Parameters ---------- url : str URL. Returns ------- str File path. """ basename = os.path.basename(url) if os.path.exists(basename): logger.info(f"File already exists locally: {basename}") return basename try: with urlopen(url) as response, open(basename, "wb") as output: output.write(response.read()) logger.info(f"Downloaded file from {url} to {basename}") except Exception as e: logger.error(f"Failed to download {url}: {e}") raise return basename
[docs] def minmax(arr: np.ndarray) -> TypingList[float]: """Return the minimum and maximum value of an array. Parameters ---------- arr : array_like Array of numbers. Returns ------- list Array of two numbers. """ return [float(np.nanmin(arr)), float(np.nanmax(arr))]
[docs] def check_attribute_color_range( attribute: Union[np.ndarray, TypingDict[str, np.ndarray]], color_range: Union[TypingList[float], Tuple[float, ...]] = (), ) -> TypingList[float]: """Return color range versus provided attribute. Parameters ---------- attribute : list or dict (for timeseries) Array of numbers. color_range : tuple, optional Two numbers, by default (). Returns ------- tuple Color range. """ if color_range is None: color_range = [] if len(color_range) == 2: return color_range elif type(attribute) is dict: t = [minmax(attribute[k]) for k in attribute.keys()] color_range = [min([v[0] for v in t]), max([v[1] for v in t])] elif attribute.size == 0: return color_range else: color_range = minmax(attribute) if color_range[0] == color_range[1]: color_range[1] += 1.0 return color_range
[docs] def map_colors( attribute: np.ndarray, color_map: Union[TypingList[TypingList[float]], np.ndarray], color_range: Union[TypingList[float], Tuple[float, ...]] = (), ) -> np.ndarray: """Return color mapping according to an attribute and a colormap. The attribute represents the data on which the colormap will be applied. The color range allows to constraint the colormap between two values. Parameters ---------- attribute : ndarray Array of numbers. color_map : array_like Array of numbers. color_range : tuple, optional Two numbers, by default (). Returns ------- ndarray Color mapping. """ a_min, a_max = check_attribute_color_range(attribute, color_range) map_array = np.asarray(color_map) map_array = map_array.reshape((map_array.size // 4, 4)) # normalizing attribute for range lookup attribute = (attribute - a_min) / (a_max - a_min) red, green, blue = [ np.array( 255 * np.interp(attribute, xp=map_array[:, 0], fp=map_array[:, i + 1]), dtype=np.int32, ) for i in range(3) ] colors = (red << 16) + (green << 8) + blue return colors
[docs] def bounding_corners( bounds: Union[TypingList[float], np.ndarray], z_bounds: Tuple[float, float] = (0, 1) ) -> np.ndarray: """Return corner point coordinates for bounds array. `z_bounds` assigns Z points coordinates if bounds contains less than 5 items. Parameters ---------- bounds : array_like Array of numbers. z_bounds : tuple, optional Two numbers, by default (0, 1). Returns ------- ndarray Corner points coordinates. """ return np.array( list(itertools.product(bounds[:2], bounds[2:4], bounds[4:] or z_bounds)) )
[docs] def min_bounding_dimension(bounds: Union[TypingList[float], np.ndarray]) -> float: """Return the minimal dimension along axis in a bounds array. `bounds` must be of the form [min_x, max_x, min_y, max_y, min_z, max_z]. Parameters ---------- bounds : array_like Array of numbers. Returns ------- number Minimum value of the array. """ return min(abs(x1 - x0) for x0, x1 in zip(bounds, bounds[1:]))
[docs] def shape_validation(*dimensions): """Create a validator callback ensuring array shape. Returns ------- function Shape validator function. Raises ------ TraitError Expected an array of shape _ and got _. """ def validator(trait, value): if np.shape(value) != dimensions: raise TraitError( "Expected an array of shape %s and got %s" % (dimensions, value.shape) ) return value return validator
[docs] def sparse_voxels_validation(): """Check sparse voxels for array shape and values. Returns ------- function Sparse voxels validator function. Raises ------ TraitError Expected an array of shape (N, 4) and got _. TraitError Voxel coordinates and values must be non-negative. """ def validator(trait, value): if len(value.shape) != 2 or value.shape[1] != 4: raise TraitError( "Expected an array of shape (N, 4) and got %s" % (value.shape,) ) if (value.astype(np.int16) < 0).any(): raise TraitError("Voxel coordinates and values must be non-negative") return value return validator
[docs] def quad(w: float, h: float) -> Tuple[np.ndarray, np.ndarray]: """Return the vertices and indices of a `w` * `h` quadrilateral. Parameters ---------- w : number Quadrilateral width. h : number Quadrilateral height. Returns ------- tuple Array of vertices and indices. """ w /= 2 h /= 2 vertices = np.array([-w, -h, -0, w, -h, 0, w, h, 0, -w, h, 0], dtype=np.float32) indices = np.array([0, 1, 2, 0, 2, 3], dtype=np.uint32) return vertices, indices
[docs] def get_bounding_box(model_matrix, boundary=[-0.5, 0.5, -0.5, 0.5, -0.5, 0.5]): """Return the boundaries of a model matrix. Parameters ---------- model_matrix : ndarray Matrix of numbers. Must have four columns. boundary : list, optional Array of numbers, by default [-0.5, 0.5, -0.5, 0.5, -0.5, 0.5]. Must be of the form [min_x, max_x, min_y, max_y, min_z, max_z]. Returns ------- ndarray Model matrix boundaries. """ b_min = np.array([boundary[0], boundary[2], boundary[4], 0]) b_max = np.array([boundary[1], boundary[3], boundary[5], 0]) b_min = model_matrix.dot(b_min) b_max = model_matrix.dot(b_max) return np.dstack([b_min[0:3], b_max[0:3]]).flatten()
[docs] def get_bounding_box_points(arr, model_matrix): """Return the minimum and maximum coordinates on x, y, z axes. Parameters ---------- arr : ndarray Array of vertices [x, y, z]. model_matrix : ndarray Matrix of numbers. Must have four columns. Returns ------- ndarray Array of numbers [min_x, max_x, min_y, max_y, min_z, max_z]. """ d = arr.flatten() if d.shape[0] < 3: d = np.array([0, 0, 0]) # fmt: off boundary = np.array([ np.nanmin(d[0::3]), np.nanmax(d[0::3]), np.nanmin(d[1::3]), np.nanmax(d[1::3]), np.nanmin(d[2::3]), np.nanmax(d[2::3]), ]) # fmt: on return get_bounding_box(model_matrix, boundary)
[docs] def get_bounding_box_point(position): """Return the boundaries of a position. Parameters ---------- position : array_like Array of numbers. Returns ------- ndarray Array of numbers. """ return np.dstack([np.array(position), np.array(position)]).flatten()
def unify_color_map(cm): cm[0::4] = (cm[0::4] - np.min(cm[0::4])) / (np.max(cm[0::4]) - np.min(cm[0::4])) return cm def contour(data, bounds, values, clustering_factor=0): import pyvista grid = pyvista.ImageData( dimensions=data.shape[::-1], spacing=(bounds[1::2] - bounds[::2]) / np.array(data.shape[::-1]), origin=bounds[::2], ) grid.point_data["values"] = data.flatten() mesh = grid.contour(values, grid.point_data["values"], method="flying_edges") mesh.compute_normals(inplace=True) if clustering_factor > 1: import pyacvd clus = pyacvd.Clustering(mesh) clus.cluster(mesh.n_points // clustering_factor) return clus.create_mesh() else: return mesh