#!/usr/bin/env python3
"""Syntax highlighting server.

Serves ``POST /highlight?filename=<name>``. The request body is the text to
highlight and the response body is the HTML produced by Pygments. The
highlighting call itself is dispatched to an executor (a process pool when
the server is started through ``main()``).
"""

import argparse

import pygments
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexers import guess_lexer, guess_lexer_for_filename
from pygments.lexers.special import TextLexer
from pygments.util import ClassNotFound

# Accepted spellings for boolean command-line values (compared lower-cased).
# The empty string stays falsy, as it was under the previous ``type=bool``.
_TRUE_STRINGS = frozenset({'1', 'true', 't', 'yes', 'y', 'on'})
_FALSE_STRINGS = frozenset({'', '0', 'false', 'f', 'no', 'n', 'off'})


def _str_to_bool(value):
    """Convert a command-line string into a bool.

    ``type=bool`` cannot be used with argparse for this: ``bool('False')`` is
    ``True`` because every non-empty string is truthy.

    Raises:
        argparse.ArgumentTypeError: if *value* is not a recognised spelling.
    """
    normalized = value.strip().lower()
    if normalized in _TRUE_STRINGS:
        return True
    if normalized in _FALSE_STRINGS:
        return False
    raise argparse.ArgumentTypeError(
        f'expected a boolean value (true/false), got {value!r}')


def _select_lexer(filename, data):
    """Pick a lexer for *data*, falling back to plain text.

    The filename-based guess is tried first, then a content-only guess.
    A content-only guess scoring 0.01 or less is treated as "no idea" and
    replaced by ``TextLexer``.
    """
    try:
        return guess_lexer_for_filename(filename, data)
    except ClassNotFound:
        pass

    try:
        lexer = guess_lexer(data)
    except ClassNotFound:
        return TextLexer()

    # SqlLexer always gives 0.01
    if lexer.analyse_text(data) <= 0.01:
        return TextLexer()
    return lexer


def do_highlight(filename, data, formatter):
    """Return *data* rendered by *formatter* using the best-matching lexer.

    Args:
        filename: name used for the filename-based lexer guess.
        data: the source text to highlight.
        formatter: the Pygments formatter used to render the output.

    Returns:
        The formatted output of ``pygments.highlight`` as a string.
    """
    lexer = _select_lexer(filename, data)
    return highlight(data, lexer, formatter)


def parse_args():
    """Parse the server's command-line options and return the namespace."""
    parser = argparse.ArgumentParser(
        description='syntax highlighting server',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--host', type=str, default='127.0.0.1',
                        help='the host to listen on')
    parser.add_argument('--port', type=int, default=4872,
                        help='the port to listen on')
    parser.add_argument('--style', type=str, default='pastie',
                        help='pygments formatting style')
    parser.add_argument('--preload', type=_str_to_bool, default=True,
                        help='preload lexers to reduce fork memory usage')
    parser.add_argument('--max-workers', type=int, default=0,
                        help='number of workers, 0 is one per cpu')
    return parser.parse_args()


async def handle_highlight(request):
    """Handle ``POST /highlight``: highlight the request body.

    The ``filename`` query parameter is required; a request without it is
    answered with 400 Bad Request. The highlighting call is run in the
    executor stored under ``request.app['pool']``.
    """
    # Imported here, as in the rest of this file, so the module itself does
    # not require aiohttp at import time.
    import asyncio
    from aiohttp import web

    filename = request.query.get('filename')
    if filename is None:
        raise web.HTTPBadRequest(
            text="missing required query parameter 'filename'")

    text = await request.text()
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        request.app['pool'],
        do_highlight,
        filename,
        text,
        request.app['formatter'])
    return web.Response(text=result)


def run(args, pool=None):
    """Build the aiohttp application and serve it until shutdown.

    Args:
        args: parsed options; ``style``, ``host`` and ``port`` are read.
        pool: executor handed to ``loop.run_in_executor`` for highlighting.
            ``None`` makes asyncio use the loop's default executor.
    """
    from aiohttp import web

    app = web.Application()
    app['pool'] = pool
    app['formatter'] = HtmlFormatter(style=args.style, nobackground=True)
    app.add_routes([web.post('/highlight', handle_highlight)])
    web.run_app(app, host=args.host, port=args.port)


def main():
    """Script entry point: parse options, start the pool, run the server."""
    args = parse_args()

    if args.preload:
        # Only the side effect of running one guess in the parent process
        # matters here, so a failed guess must not abort startup.
        try:
            guess_lexer('')
        except ClassNotFound:
            pass

    # 0 means "let ProcessPoolExecutor choose", which it does for None.
    max_workers = args.max_workers or None

    from concurrent.futures import ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        run(args, pool)


if __name__ == '__main__':
    main()