Source code for luma.core.virtual

# -*- coding: utf-8 -*-
# Copyright (c) 2017-2022 Richard Hull and contributors
# See LICENSE.rst for details.

from time import sleep
from textwrap import TextWrapper

from PIL import Image, ImageDraw, ImageFont

from luma.core import mixin, ansi_color
from luma.core.threadpool import threadpool
from luma.core.render import canvas
from luma.core.util import mutable_string, observable, perf_counter

pool = threadpool(4)

[docs]def calc_bounds(xy, entity): """ For an entity with width and height attributes, determine the bounding box if were positioned at ``(x, y)``. """ left, top = xy right, bottom = left + entity.width, top + entity.height return [left, top, right, bottom]
[docs]def range_overlap(a_min, a_max, b_min, b_max): """ Neither range is completely greater than the other. """ return (a_min < b_max) and (b_min < a_max)
[docs]class viewport(mixin.capabilities): """ The viewport offers a positionable window into a larger resolution pseudo-display, that also supports the concept of hotspots (which act like live displays). :param device: The device to project the enlarged pseudo-display viewport onto. :param width: The number of horizontal pixels. :type width: int :param height: The number of vertical pixels. :type height: int :param mode: The supported color model, one of ``"1"``, ``"RGB"`` or ``"RGBA"`` only. :type mode: str :param dither: By default, any color (other than black) will be `generally` treated as white when displayed on monochrome devices. However, this behaviour can be changed by adding ``dither=True`` and the image will be converted from RGB space into a 1-bit monochrome image where dithering is employed to differentiate colors at the expense of resolution. :type dither: bool """ def __init__(self, device, width, height, mode=None, dither=False): self.capabilities(width, height, rotate=0, mode=mode or device.mode) if hasattr(device, "segment_mapper"): self.segment_mapper = device.segment_mapper self._device = device self._backing_image =, self.size) self._position = (0, 0) self._hotspots = [] self._dither = dither
[docs] def display(self, image): assert image.mode == self.mode assert image.size == self.size self._backing_image.paste(image) self.refresh()
[docs] def set_position(self, xy): self._position = xy self.refresh()
[docs] def add_hotspot(self, hotspot, xy): """ Add the hotspot at ``(x, y)``. The hotspot must fit inside the bounds of the virtual device. If it does not then an ``AssertError`` is raised. """ (x, y) = xy assert 0 <= x <= self.width - hotspot.width assert 0 <= y <= self.height - hotspot.height # TODO: should it check to see whether hotspots overlap each other? # Is sensible to _allow_ them to overlap? self._hotspots.append((hotspot, xy))
[docs] def remove_hotspot(self, hotspot, xy): """ Remove the hotspot at ``(x, y)``: Any previously rendered image where the hotspot was placed is erased from the backing image, and will be "undrawn" the next time the virtual device is refreshed. If the specified hotspot is not found for ``(x, y)``, a ``ValueError`` is raised. """ self._hotspots.remove((hotspot, xy)) eraser =, hotspot.size) self._backing_image.paste(eraser, xy)
[docs] def is_overlapping_viewport(self, hotspot, xy): """ Checks to see if the hotspot at position ``(x, y)`` is (at least partially) visible according to the position of the viewport. """ l1, t1, r1, b1 = calc_bounds(xy, hotspot) l2, t2, r2, b2 = calc_bounds(self._position, self._device) return range_overlap(l1, r1, l2, r2) and range_overlap(t1, b1, t2, b2)
[docs] def refresh(self): should_wait = False for hotspot, xy in self._hotspots: if hotspot.should_redraw() and self.is_overlapping_viewport(hotspot, xy): pool.add_task(hotspot.paste_into, self._backing_image, xy) should_wait = True if should_wait: pool.wait_completion() im = self._backing_image.crop(box=self._crop_box()) if self._dither: im = im.convert(self._device.mode) self._device.display(im)
def _crop_box(self): (left, top) = self._position right = left + self._device.width bottom = top + self._device.height assert 0 <= left <= right <= self.width assert 0 <= top <= bottom <= self.height return (left, top, right, bottom)
[docs]class hotspot(mixin.capabilities): """ A hotspot (`a place of more than usual interest, activity, or popularity`) is a live display which may be added to a virtual viewport - if the hotspot and the viewport are overlapping, then the :func:`update` method will be automatically invoked when the viewport is being refreshed or its position moved (such that an overlap occurs). You would either: * create a ``hotspot`` instance, suppling a render function (taking an :py:mod:`PIL.ImageDraw` object, ``width`` & ``height`` dimensions. The render function should draw within a bounding box of ``(0, 0, width, height)``, and render a full frame. * sub-class ``hotspot`` and override the :func:`should_redraw` and :func:`update` methods. This might be more useful for slow-changing values where it is not necessary to update every refresh cycle, or your implementation is stateful. """ def __init__(self, width, height, draw_fn=None): self.capabilities(width, height, rotate=0) # TODO: set mode? self._fn = draw_fn
[docs] def paste_into(self, image, xy): im =, self.size) draw = ImageDraw.Draw(im) self.update(draw) image.paste(im, xy) del draw
[docs] def should_redraw(self): """ Override this method to return true or false on some condition (possibly on last updated member variable) so that for slow changing hotspots they are not updated too frequently. """ return True
[docs] def update(self, draw): if self._fn: self._fn(draw, self.width, self.height)
[docs]class snapshot(hotspot): """ A snapshot is a `type of` hotspot, but only updates once in a given interval, usually much less frequently than the viewport requests refresh updates. """ def __init__(self, width, height, draw_fn=None, interval=1.0): assert interval > 0 super(snapshot, self).__init__(width, height, draw_fn) self.interval = interval self.last_updated = -interval
[docs] def should_redraw(self): """ Only requests a redraw after ``interval`` seconds have elapsed. """ return perf_counter() - self.last_updated > self.interval
[docs] def paste_into(self, image, xy): super(snapshot, self).paste_into(image, xy) self.last_updated = perf_counter()
[docs]class terminal(object): """ Provides a terminal-like interface to a device (or a device-like object that has :class:`mixin.capabilities` characteristics). """ def __init__(self, device, font=None, color="white", bgcolor="black", tabstop=4, line_height=None, animate=True, word_wrap=False): self._device = device self.font = font or ImageFont.load_default() self.default_fgcolor = color self.default_bgcolor = bgcolor self.animate = animate self.tabstop = tabstop self.word_wrap = word_wrap self._cw, self._ch = (0, 0) for i in range(32, 128): left, top, w, h = self.font.getbbox(chr(i)) self._cw = max(w, self._cw) self._ch = max(h, self._ch) self._ch = line_height or self._ch self.width = device.width // self._cw self.height = device.height // self._ch self.size = (self.width, self.height) self.reset() self._backing_image =, self._device.size, self._bgcolor) self._canvas = ImageDraw.Draw(self._backing_image) self.clear() if self.word_wrap: = TextWrapper() = self.width = False = False = False = True
[docs] def clear(self): """ Clears the display and resets the cursor position to ``(0, 0)``. """ self._cx, self._cy = (0, 0) self._canvas.rectangle(self._device.bounding_box, fill=self.default_bgcolor) self.flush()
[docs] def println(self, text=""): """ Prints the supplied text to the device, scrolling where necessary. The text is always followed by a newline. :param text: The text to print. :type text: str """ if self.word_wrap: # find directives in complete text directives = ansi_color.find_directives(text, self) # strip ansi from text clean_text = ansi_color.strip_ansi_codes(text) # wrap clean text clean_lines = # print wrapped text index = 0 for line in clean_lines: line_length = len(line) y = 0 while y < line_length: method, args = directives[index] if method == self.putch: y += 1 method(*args) index += 1 self.newline() else: self.puts(text) self.newline()
[docs] def puts(self, text): """ Prints the supplied text, handling special character codes for carriage return (\\r), newline (\\n), backspace (\\b) and tab (\\t). ANSI color codes are also supported. If the ``animate`` flag was set to True (default), then each character is flushed to the device, giving the effect of 1970's teletype device. :param text: The text to print. :type text: str """ for method, args in ansi_color.find_directives(text, self): method(*args)
[docs] def putch(self, char): """ Prints the specific character, which must be a valid printable ASCII value in the range 32..127 only, or one of carriage return (\\r), newline (\\n), backspace (\\b) or tab (\\t). :param char: The character to print. """ if char == '\r': self.carriage_return() elif char == '\n': self.newline() elif char == '\b': self.backspace() elif char == '\t': else: left, top, w, h = self.font.getbbox(char) if self._cx + w >= self._device.width: self.newline() self.erase() self._canvas.text((self._cx, self._cy), text=char, font=self.font, fill=self._fgcolor) self._cx += w if self.animate: self.flush()
[docs] def carriage_return(self): """ Returns the cursor position to the left-hand side without advancing downwards. """ self._cx = 0
[docs] def tab(self): """ Advances the cursor position to the next (soft) tabstop. """ soft_tabs = self.tabstop - ((self._cx // self._cw) % self.tabstop) for _ in range(soft_tabs): self.putch(" ")
[docs] def newline(self): """ Advances the cursor position ot the left hand side, and to the next line. If the cursor is on the lowest line, the displayed contents are scrolled, causing the top line to be lost. """ self.carriage_return() if self._cy + (2 * self._ch) >= self._device.height: # Simulate a vertical scroll copy = self._backing_image.crop((0, self._ch, self._device.width, self._device.height)) self._backing_image.paste(copy, (0, 0)) self._canvas.rectangle((0, copy.height, self._device.width, self._device.height), fill=self.default_bgcolor) else: self._cy += self._ch self.flush() if self.animate: sleep(0.2)
[docs] def backspace(self): """ Moves the cursor one place to the left, erasing the character at the current position. Cannot move beyond column zero, nor onto the previous line. """ if self._cx + self._cw >= 0: self.erase() self._cx -= self._cw self.flush()
[docs] def erase(self): """ Erase the contents of the cursor's current position without moving the cursor's position. """ bounds = (self._cx, self._cy, self._cx + self._cw, self._cy + self._ch) self._canvas.rectangle(bounds, fill=self._bgcolor)
[docs] def flush(self): """ Cause the current backing store to be rendered on the nominated device. """ self._device.display(self._backing_image)
[docs] def foreground_color(self, value): """ Sets the foreground color. :param value: The new color value, either string name or RGB tuple. :type value: str or tuple """ self._fgcolor = value
[docs] def background_color(self, value): """ Sets the background color. :param value: The new color value, either string name or RGB tuple. :type value: str or tuple """ self._bgcolor = value
[docs] def reset(self): """ Resets the foreground and background color value back to the original when initialised. """ self._fgcolor = self.default_fgcolor self._bgcolor = self.default_bgcolor
[docs] def reverse_colors(self): """ Flips the foreground and background colors. """ self._bgcolor, self._fgcolor = self._fgcolor, self._bgcolor
[docs]class history(mixin.capabilities): """ Wraps a device (or emulator) to provide a facility to be able to make a savepoint (a point at which the screen display can be "rolled-back" to). This is mostly useful for displaying transient error/dialog messages which could be subsequently dismissed, reverting back to the previous display. """ def __init__(self, device): self.capabilities(device.width, device.height, rotate=0, mode=device.mode) if hasattr(device, "segment_mapper"): self.segment_mapper = device.segment_mapper self._savepoints = [] self._device = device self._last_image = None
[docs] def display(self, image): self._last_image = image.copy() self._device.display(image)
[docs] def savepoint(self): """ Copies the last displayed image. """ if self._last_image: self._savepoints.append(self._last_image) self._last_image = None
[docs] def restore(self, drop=0): """ Restores the last savepoint. If ``drop`` is supplied and greater than zero, then that many savepoints are dropped, and the next savepoint is restored. :param drop: :type drop: int """ assert drop >= 0 while drop > 0: self._savepoints.pop() drop -= 1 img = self._savepoints.pop() self.display(img)
def __len__(self): """ Indication of the number of savepoints retained. """ return len(self._savepoints)
[docs]class sevensegment(object): """ Abstraction that wraps a device, this class provides a ``text`` property which can be used to set and get a text value, which when combined with a ``segment_mapper`` sets the correct bit representation for seven-segment displays and propagates that onto the underlying device. :param device: A device instance. :param segment_mapper: An optional function that maps strings into the correct representation for the 7-segment physical layout. If not provided, the default mapper from compatible devices is used instead. :param undefined: The default character to substitute when an unrenderable character is supplied to the text property. :type undefined: char """ def __init__(self, device, undefined="_", segment_mapper=None): self.device = device self.undefined = undefined self.segment_mapper = segment_mapper or device.segment_mapper self._bufsize = device.width * device.height // 8 self.text = "" @property def text(self): """ Returns the current state of the text buffer. This may not reflect accurately what is displayed on the seven-segment device, as certain alpha-numerics and most punctuation cannot be rendered on the limited display. """ return self._text_buffer @text.setter def text(self, value): """ Updates the seven-segment display with the given value. If there is not enough space to show the full text, an ``OverflowException`` is raised. :param value: The value to render onto the device. Any characters which cannot be rendered will be converted into the ``undefined`` character supplied in the constructor. :type value: str """ self._text_buffer = observable(mutable_string(value), observer=self._flush) def _flush(self, buf): data = bytearray(self.segment_mapper(buf, notfound=self.undefined) ).ljust(self._bufsize, b'\0') if len(data) > self._bufsize: raise OverflowError( f"Device's capabilities insufficient for value '{buf}'") with canvas(self.device) as draw: for x, byte in enumerate(reversed(data)): for y in range(8): if byte & 0x01: draw.point((x, y), fill="white") byte >>= 1
[docs]class character(object): """ Abstraction that wraps a device, this class provides a ``text`` property which can be used to set and get a text value allowing the device to be treated as a character style display such as the HD44780 LCD If the device is actually a character style device, be careful to provide a font that adheres to the pixel dimensions of the display. :param device: A device instance. :param font: The font to be used to paint the characters within the ``text`` property. If the device contains a font (e.g. hd44780, ws0010) it will be used as the default if no font is provided. :type font: `PIL.ImageFont` object :param undefined: The default character to substitute when an unrenderable character is supplied to the text property. :type undefined: char .. versionadded:: 1.15.0 """ def __init__(self, device, font=None, undefined="_"): self.device = device self._undefined = undefined self.font = font if font else device.font if hasattr(device, 'font') else None assert self.font, 'No font available' self.text = '' @property def text(self): """ Returns the current state of the text buffer. This may not reflect accurately what is displayed on the device if the font does not have a symbol for a requested text value. """ return self._text_buffer @text.setter def text(self, value): """ Updates the display with the given value. :param value: The value to render onto the device. Any characters which cannot be rendered will be converted into the ``undefined`` character supplied in the constructor. Newline characters '\n' work as expected but no other control characters (e.g. \r) are honored. :type value: str """ self._text_buffer = observable(mutable_string(value), observer=self._flush) def _flush(self, buf): # Replace any characters that are not in the font with the undefined character buf = ''.join([i if i == '\n' or self.font.getlength(i) > 0 else self._undefined for i in buf]) # Draw text onto display's image using the provided font with canvas(self.device) as draw: # Place text draw.text((0, 0), buf, fill='white', font=self.font, spacing=0)