#!/usr/bin/env python3 import asyncio import argparse import gc import os import pygments from aiohttp import web from concurrent.futures import ProcessPoolExecutor from pygments import highlight from pygments.formatters import HtmlFormatter from pygments.lexers import guess_lexer, guess_lexer_for_filename from pygments.lexers.special import TextLexer from pygments.util import ClassNotFound def do_highlight(filename, data, formatter): try: lexer = guess_lexer_for_filename(filename, data) except ClassNotFound: try: lexer = guess_lexer(data) # SqlLexer always gives 0.01 if lexer.analyse_text(data) <= 0.01: lexer = TextLexer() except ClassNotFound: lexer = TextLexer() return ''.join([ f'', '', highlight(data, lexer, formatter) ]) def parse_args(): parser = argparse.ArgumentParser(description='syntax highlighting server', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--host', type=str, default='127.0.0.1', help='the host to listen on') parser.add_argument('--port', type=int, default=4872, help='the port to listen on') parser.add_argument('--style', type=str, default='pastie', help='pygments formatting style') parser.add_argument('--preload', type=bool, default=True, help='preload lexers to reduce fork memory usage') parser.add_argument('--workers', type=int, default=0, help='number of workers, 0 is one per cpu') return parser.parse_args() async def handle_highlight(request): loop = asyncio.get_running_loop() text = await request.text() result = await loop.run_in_executor( request.app['pool'], do_highlight, request.query['filename'], text, request.app['formatter']) return web.Response(text=result) def run(args, pool=None): app = web.Application() app['pool'] = pool app['formatter'] = HtmlFormatter(style=args.style, nobackground=True) app.add_routes([web.post('/highlight', handle_highlight)]) web.run_app(app, host=args.host, port=args.port) def noop(*args, **kws): pass def main(): args = parse_args() if args.preload: guess_lexer('') if args.workers == 0: workers = len(os.sched_getaffinity(0)) else: workers = args.workers with ProcessPoolExecutor(max_workers=workers) as pool: gc.collect() gc.freeze() pool.map(noop, [None] * workers) run(args, pool) if __name__ == '__main__': main()