Source code for k3d.helpers

"""Utilities module."""
import base64
import itertools
import msgpack
import numpy as np
import os
import zlib
from traitlets import TraitError
from urllib.request import urlopen

from ._protocol import get_protocol

# import logging
# from pprint import pprint, pformat
# logger = logging.getLogger()
# logger.setLevel(logging.INFO)
# fh = logging.FileHandler('k3d.log')
# fh.setLevel(logging.INFO)
# logger.addHandler(fh)

# pylint: disable=unused-argument
# noinspection PyUnusedLocal
[docs]def array_to_json(ar, compression_level=0, force_contiguous=True): """Return the serialization of a numpy array. Parameters ---------- ar : ndarray A numpy array. compression_level : int, optional Level of compression [-1, 9], by default 0. force_contiguous : bool, optional Make the array contiguous in memory, by default True. Returns ------- dict Binary data of the array with its dtype and shape. Raises ------ ValueError Unsupported dtype. """ if ar.dtype.kind not in ["u", "i", "f"]: # ints and floats raise ValueError("Unsupported dtype: %s" % ar.dtype) if ar.dtype == np.float64: # WebGL does not support float64 ar = ar.astype(np.float32) elif ar.dtype == np.int64: # JS does not support int64 ar = ar.astype(np.int32) # make sure it's contiguous if force_contiguous and not ar.flags["C_CONTIGUOUS"]: ar = np.ascontiguousarray(ar) if compression_level > 0: ret = { "compressed_data": zlib.compress(ar.flatten(), compression_level), "dtype": str(ar.dtype), "shape": ar.shape, } else: ret = { "data": memoryview(ar.flatten()), "dtype": str(ar.dtype), "shape": ar.shape, } if get_protocol() == 'text': return 'base64_' + base64.b64encode(msgpack.packb(ret, use_bin_type=True)).decode('ascii') else: return ret
# noinspection PyUnusedLocal
[docs]def json_to_array(value, obj=None): """Return numpy array from serialization. Parameters ---------- value : dict Binary data of an array with its dtype and shape. obj : dict, optional Object, by default None. Returns ------- ndarray Numpy array or None. """ if value: if "data" in value: return np.frombuffer(value["data"], dtype=value["dtype"]).reshape( value["shape"] ) else: return np.frombuffer( zlib.decompress(value["compressed_data"]), dtype=value["dtype"] ).reshape(value["shape"]) return None
[docs]def to_json(name, input, obj=None, compression_level=0): """Return JSON object serialization.""" if hasattr(obj, "compression_level"): compression_level = obj.compression_level if isinstance(input, dict): property = obj[name] ret = {} for key, value in input.items(): ret[str(key)] = to_json(key, value, property, compression_level) return ret elif isinstance(input, np.ndarray) and input.dtype is np.dtype(object): return to_json(name, input.tolist(), obj, compression_level) elif isinstance(input, list): property = obj[name] return [ to_json(idx, v, property, compression_level) for idx, v in enumerate(input) ] elif isinstance(input, bytes): return array_to_json(np.frombuffer(input, dtype=np.uint8), compression_level) elif isinstance(input, np.ndarray): return array_to_json(input, compression_level) else: return input
[docs]def from_json(input, obj=None): """Return JSON object deserialization.""" if isinstance(input, str) and input[0:7] == 'base64_': input = msgpack.unpackb(base64.b64decode(input[7:])) if isinstance(input, dict) \ and "dtype" in input \ and ("data" in input or "compressed_data" in input) \ and "shape" in input: return json_to_array(input, obj) elif isinstance(input, list): return [from_json(i, obj) for i in input] elif isinstance(input, dict): ret = {} for key, value in input.items(): ret[key] = from_json(value, obj) return ret else: return input
[docs]def array_serialization_wrap(name): """Return a wrap of the serialization and deserialization functions for array objects.""" return { "to_json": (lambda input, obj: to_json(name, input, obj)), "from_json": from_json, }
[docs]def callback_serialization_wrap(name): """Return a wrap of the serialization and deserialization functions for mouse actions.""" return { "to_json": (lambda input, obj: obj[name] is not None), "from_json": from_json, }
[docs]def download(url): """Retrieve the file at url, save it locally and return its name. Parameters ---------- url : str URL. Returns ------- str File path. """ basename = os.path.basename(url) if os.path.exists(basename): return basename with urlopen(url) as response, open(basename, "wb") as output: output.write( return basename
[docs]def minmax(arr): """Return the minimum and maximum value of an array. Parameters ---------- arr : array_like Array of numbers. Returns ------- list Array of two numbers. """ return [float(np.nanmin(arr)), float(np.nanmax(arr))]
[docs]def check_attribute_color_range(attribute, color_range=()): """Return color range versus provided attribute. Parameters ---------- attribute : list or dict (for timeseries) Array of numbers. color_range : tuple, optional Two numbers, by default (). Returns ------- tuple Color range. """ if len(color_range) == 2: return color_range elif type(attribute) is dict: t = [minmax(attribute[k]) for k in attribute.keys()] color_range = [min([v[0] for v in t]), max([v[1] for v in t])] elif attribute.size == 0: return color_range else: color_range = minmax(attribute) if color_range[0] == color_range[1]: color_range[1] += 1.0 return color_range
[docs]def map_colors(attribute, color_map, color_range=()): """Return color mapping according to an attribute and a colormap. The attribute represents the data on which the colormap will be apply. The color range allows to constraint the colormap between two values. Parameters ---------- attribute : ndarray Array of numbers. color_map : array_like Array of numbers. color_range : tuple, optional Two numbers, by default (). Returns ------- ndarray Color mapping. """ a_min, a_max = check_attribute_color_range(attribute, color_range) map_array = np.asarray(color_map) map_array = map_array.reshape((map_array.size // 4, 4)) # normalizing attribute for range lookup attribute = (attribute - a_min) / (a_max - a_min) red, green, blue = [ np.array( 255 * np.interp(attribute, xp=map_array[:, 0], fp=map_array[:, i + 1]), dtype=np.int32) for i in range(3) ] colors = (red << 16) + (green << 8) + blue return colors
[docs]def bounding_corners(bounds, z_bounds=(0, 1)): """Return corner point coordinates for bounds array. `z_bounds` assigns Z points coordinates if bounds contains less than 5 items. Parameters ---------- bounds : array_like Array of numbers. z_bounds : tuple, optional Two numbers, by default (0, 1). Returns ------- ndarray Corner points coordinates. """ return np.array( list(itertools.product(bounds[:2], bounds[2:4], bounds[4:] or z_bounds)) )
[docs]def min_bounding_dimension(bounds): """Return the minimal dimension along axis in a bounds array. `bounds` must be of the form [min_x, max_x, min_y, max_y, min_z, max_z]. Parameters ---------- bounds : array_like Array of numbers. Returns ------- number Minimum value of the array. """ return min(abs(x1 - x0) for x0, x1 in zip(bounds, bounds[1:]))
[docs]def shape_validation(*dimensions): """Create a validator callback ensuring array shape. Returns ------- function Shape validator function. Raises ------ TraitError Expected an array of shape _ and got _. """ def validator(trait, value): if np.shape(value) != dimensions: raise TraitError( "Expected an array of shape %s and got %s" % ( dimensions, value.shape) ) return value return validator
[docs]def sparse_voxels_validation(): """Check sparse voxels for array shape and values. Returns ------- function Sparse voxels validator function. Raises ------ TraitError Expected an array of shape (N, 4) and got _. TraitError Voxel coordinates and values must be non-negative. """ def validator(trait, value): if len(value.shape) != 2 or value.shape[1] != 4: raise TraitError( "Expected an array of shape (N, 4) and got %s" % (value.shape,) ) if (value.astype(np.int16) < 0).any(): raise TraitError( "Voxel coordinates and values must be non-negative") return value return validator
[docs]def quad(w, h): """Return the vertices and indices of a `w` * `h` quadrilateral. Parameters ---------- w : number Quadrilateral width. h : number Quadrilateral height. Returns ------- tuple Array of vertices and indices. """ w /= 2 h /= 2 vertices = np.array([-w, -h, -0, w, -h, 0, w, h, 0, -w, h, 0], dtype=np.float32) indices = np.array([0, 1, 2, 0, 2, 3], dtype=np.uint32) return vertices, indices
[docs]def get_bounding_box(model_matrix, boundary=[-0.5, 0.5, -0.5, 0.5, -0.5, 0.5]): """Return the boundaries of a model matrix. Parameters ---------- model_matrix : ndarray Matrix of numbers. Must have four columns. boundary : list, optional Array of numbers, by default [-0.5, 0.5, -0.5, 0.5, -0.5, 0.5]. Must be of the form [min_x, max_x, min_y, max_y, min_z, max_z]. Returns ------- ndarray Model matrix boundaries. """ b_min = np.array([boundary[0], boundary[2], boundary[4], 0]) b_max = np.array([boundary[1], boundary[3], boundary[5], 0]) b_min = b_max = return np.dstack([b_min[0:3], b_max[0:3]]).flatten()
[docs]def get_bounding_box_points(arr, model_matrix): """Return the minimum and maximum coordinates on x, y, z axes. Parameters ---------- arr : ndarray Array of vertices [x, y, z]. model_matrix : ndarray Matrix of numbers. Must have four columns. Returns ------- ndarray Array of numbers [min_x, max_x, min_y, max_y, min_z, max_z]. """ d = arr.flatten() if d.shape[0] < 3: d = np.array([0, 0, 0]) # fmt: off boundary = np.array([ np.min(d[0::3]), np.max(d[0::3]), np.min(d[1::3]), np.max(d[1::3]), np.min(d[2::3]), np.max(d[2::3]), ]) # fmt: on return get_bounding_box(model_matrix, boundary)
[docs]def get_bounding_box_point(position): """Return the boundaries of a position. Parameters ---------- position : array_like Array of numbers. Returns ------- ndarray Array of numbers. """ return np.dstack([np.array(position), np.array(position)]).flatten()