#!/usr/bin/env python3 import logging import re import os import signal import sys from itertools import chain import multiprocessing from multiprocessing import Pool from multiprocessing.util import Finalize from pathlib import Path from urllib.parse import urlparse from fontTools.subset import Options, Subsetter, load_font from selenium import webdriver from config import config logging.basicConfig(format='[%(relativeCreated)d] %(message)s') logger = logging.getLogger('websubset') logger.setLevel(logging.INFO) options = Options() options.parse_opts(config['ftsubset_options']) if options.with_zopfli: from fontTools.ttLib import sfnt sfnt.USE_ZOPFLI = True if config['screenshots']: from io import BytesIO from PIL import Image, ImageChops def gen_font_face(font): if 'fontfile' not in font: return '' return ''.join([ '@font-face{', 'font-family:"', font['family'], '";', 'font-weight:', font['weight'], ';', 'font-style:', font['style'], ';', 'src: url("', font["fontfile"], '") format("opentype");', '}']) REPLACE_FONTS_SCRIPT = ''.join([ "let style = document.createElement('style'); style.innerHTML = '", ''.join(gen_font_face(font) for font in config['fonts']), "'; document.body.appendChild(style);"]) EXTRACT_SCRIPT = r''' let whitelist = new Set(arguments[0]); let walker = document.createTreeWalker(document.body, NodeFilter.SHOW_TEXT); let node, dict = new Map(); while (node = walker.nextNode()) { let cs = getComputedStyle(node.parentNode); let css = k => cs.getPropertyValue(k); if (css('display') == 'none') continue; let k = css('font-family').replace(/"/g, '').replace(/,.*/, '') + ';' + css('font-weight') + ';' + css('font-style'); if (!whitelist.has(k)) continue; const t = node.nodeValue; if (dict.has(k)) for (let c of t) dict.get(k).add(c); else dict.set(k, new Set(t)); } let dict2 = {}; dict.forEach((v, k) => dict2[k] = [...v.keys()].join('')); return dict2; ''' DRIVER = None def stop_driver(): global DRIVER if DRIVER: DRIVER.quit() DRIVER = None def hook_sys(name): orig_hook = getattr(sys, name) def my_hook(*args, **kwargs): stop_driver() orig_hook(*args, **kwargs) setattr(sys, name, my_hook) def hook_sig(signum): orig_handler = signal.getsignal(signum) if orig_handler is None: raise Exception('{signum} handler is None') def term_handler(*_): stop_driver() signal.signal(signum, orig_handler) os.kill(os.getpid(), signum) signal.signal(signum, term_handler) def start_wworker(driver_name): hook_sig(signal.SIGTERM) global DRIVER if driver_name == 'chrome': chrome_options = webdriver.chrome.options.Options() chrome_options.headless = True chrome_options.experimental_options["prefs"] = { "profile.default_content_setting_values.images": 2 } DRIVER = webdriver.Chrome(options=chrome_options, desired_capabilities={'detach': True}) elif driver_name == 'firefox': firefox_profile = webdriver.FirefoxProfile() firefox_profile.set_preference('permissions.default.image', 2) firefox_options = webdriver.firefox.options.Options() firefox_options.headless = True DRIVER = webdriver.Firefox(firefox_profile=firefox_profile, options=firefox_options) else: raise Exception('unknown driver name') Finalize(DRIVER, stop_driver, exitpriority=16) def is_uri(path): parsed = urlparse(path) return parsed.scheme and parsed.netloc def make_uri(path): if is_uri(path): return path else: return Path(path).resolve().as_uri() def extract(path, whitelist, screenshots): logger.info('fetching %s', path) DRIVER.get(make_uri(path)) if screenshots: logger.info('replacing fonts for %s', path) DRIVER.execute_script(REPLACE_FONTS_SCRIPT) logger.info('taking pre-screenshot for %s', path) height = DRIVER.execute_script('return document.body.parentNode.scrollHeight') DRIVER.set_window_size(2000, height) screenshot = DRIVER.find_element_by_tag_name('body').screenshot_as_png else: screenshot = None logger.info('extracting text from %s', path) return (path, DRIVER.execute_script(EXTRACT_SCRIPT, whitelist), screenshot) def get_fontdesc(fonts, fontspec): font_match = dict(zip(('family', 'weight', 'style'), fontspec.split(';'))) for font in fonts: if font_match.items() <= font.items(): return font return None def subset(fontdesc, text, fts_opts): fontfile = fontdesc['fontfile'] logger.info('subsetting %s', fontfile) font = load_font(fontfile, fts_opts, dontLoadGlyphNames=True) subsetter = Subsetter(options=fts_opts) if 'extratext' in fontdesc: text += fontdesc['extratext'] subsetter.populate(text=text) subsetter.subset(font) ret = [] for flavor in ['woff', 'woff2']: if 'outfile' in fontdesc and flavor in fontdesc['outfile']: outfile = fontdesc['outfile'][flavor] else: outfile = re.sub(r'\.otf$', f'.subset.{flavor}', fontfile) if outfile == fontfile: raise Exception('cannot overwrite font file') ret.append((font, flavor, outfile)) return ret def write_subset(font, flavor, outfile): logger.info('writing %s', outfile) font.flavor = flavor font.save(outfile) def verify(path, screenshot_begin_png): logger.info('refetching %s', path) DRIVER.get(make_uri(path)) logger.info('taking post-screenshot for %s', path) height = DRIVER.execute_script('return document.body.parentNode.scrollHeight') DRIVER.set_window_size(2000, height) screenshot_end_png = DRIVER.find_element_by_tag_name('body').screenshot_as_png logger.info('checking screenshot for %s', path) screenshot_begin = Image.open(BytesIO(screenshot_begin_png)) screenshot_end = Image.open(BytesIO(screenshot_end_png)) if ImageChops.difference(screenshot_begin, screenshot_end).getbbox(): raise Exception(f'screenshots do not match for {path}') class LocalPool: def __init__(self, driver_name): start_wworker(driver_name) def starmap(self, func, args, *_): return [func(*arg) for arg in args] def close(self): stop_driver() def main(argv): ncpus = len(os.sched_getaffinity(0)) nfontfiles = sum('fontfile' in font for font in config['fonts']) nwworkers = min(len(argv) - 1, ncpus) nfworkers = min(nfontfiles * 2, ncpus) logger.info('starting %d web workers, %d font workers', nwworkers, nfworkers) with Pool(nfworkers) as fpool, \ Pool(nwworkers, start_wworker, (config['selenium_driver_name'],)) as wpool: all_font_texts = {} screenshots = [] whitelist = [ ';'.join((f['family'], f['weight'], f['style'])) for f in config['fonts']] extract_args = ((arg, whitelist, config['screenshots']) for arg in sys.argv[1:]) extracted = wpool.starmap(extract, extract_args) for path, font_texts, screenshot in extracted: if config['screenshots']: screenshots.append((path, screenshot)) for fontspec, text in font_texts.items(): if fontspec in all_font_texts: all_font_texts[fontspec] |= set(text) else: all_font_texts[fontspec] = set(text) if not config['screenshots']: logger.info('shutting down web workers early') wpool.close() subset_args = ( (get_fontdesc(config['fonts'], fontspec), ''.join(text), options) for fontspec, text in all_font_texts.items()) subsetted = fpool.starmap(subset, subset_args) fpool.starmap(write_subset, chain(*subsetted)) if config['screenshots']: wpool.starmap(verify, screenshots, 1) if multiprocessing.active_children(): logger.info('waiting for workers') for proc in multiprocessing.active_children(): proc.join() logger.info('exiting successfully') if __name__ == '__main__': main(sys.argv)