1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
#!/usr/bin/env python3
import asyncio
import argparse
import gc
import os
import pygments
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexers import guess_lexer, guess_lexer_for_filename
from pygments.lexers.special import TextLexer
from pygments.util import ClassNotFound
def _select_lexer(filename, data):
    """Pick a Pygments lexer for *data*, preferring a match on *filename*.

    Falls back to content-based guessing when the filename matches nothing,
    and to ``TextLexer`` when the content guess fails or scores too low.
    """
    try:
        return guess_lexer_for_filename(filename, data)
    except ClassNotFound:
        pass

    try:
        candidate = guess_lexer(data)
        # SqlLexer always gives 0.01, so a score that low is not a real match.
        if candidate.analyse_text(data) <= 0.01:
            return TextLexer()
        return candidate
    except ClassNotFound:
        return TextLexer()


def do_highlight(filename, data, formatter):
    """Render *data* as highlighted markup and return it as one string.

    The result is, in order: an HTML comment naming the Pygments version and
    the lexer that was chosen, a ``<style>`` element holding the formatter's
    style definitions for the ``.highlight`` selector, and the output of
    ``pygments.highlight``.

    Args:
        filename: Name used for the filename-based lexer lookup.
        data: Source text to highlight.
        formatter: Pygments formatter; must provide ``get_style_defs``.
    """
    lexer = _select_lexer(filename, data)

    banner = f'<!-- Pygments {pygments.__version__}: {lexer.name} ({lexer.__class__.__name__}) -->'
    css = formatter.get_style_defs('.highlight')
    markup = highlight(data, lexer, formatter)
    return banner + '<style>' + css + '</style>' + markup
def _parse_bool(value):
    """Convert a command-line string into a real boolean.

    ``type=bool`` is a trap with argparse: ``bool('False')`` is ``True``
    because every non-empty string is truthy. This parser understands the
    usual spellings instead (case-insensitive, surrounding spaces ignored).

    Raises:
        argparse.ArgumentTypeError: if *value* is not a recognised spelling.
    """
    normalized = value.strip().lower()
    if normalized in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    # The empty string stays falsy, as it was under ``type=bool``.
    if normalized in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError(
        f'expected a boolean value, got {value!r}')


def parse_args():
    """Parse the server's command-line options from ``sys.argv``.

    Returns:
        argparse.Namespace with ``host`` (str), ``port`` (int), ``style``
        (str), ``preload`` (bool) and ``workers`` (int).
    """
    parser = argparse.ArgumentParser(
        description='syntax highlighting server',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--host', type=str, default='127.0.0.1',
                        help='the host to listen on')
    parser.add_argument('--port', type=int, default=4872,
                        help='the port to listen on')
    parser.add_argument('--style', type=str, default='pastie',
                        help='pygments formatting style')
    # _parse_bool rather than bool, so "--preload False" really disables it.
    parser.add_argument('--preload', type=_parse_bool, default=True,
                        help='preload lexers to reduce fork memory usage')
    parser.add_argument('--workers', type=int, default=0,
                        help='number of workers, 0 is one per cpu')
    return parser.parse_args()
async def handle_highlight(request):
    """Handle ``POST /highlight``: highlight the request body.

    The ``filename`` query parameter is passed to ``do_highlight`` together
    with the request body and the formatter stored on the application. The
    work runs in the executor stored under ``request.app['pool']``; when that
    is ``None`` the event loop's default executor is used.

    Returns:
        web.Response whose text is the string produced by ``do_highlight``.

    Raises:
        web.HTTPBadRequest: if the ``filename`` query parameter is missing
            (previously this surfaced as an unhandled ``KeyError``).
    """
    filename = request.query.get('filename')
    if filename is None:
        raise web.HTTPBadRequest(
            text="missing required query parameter 'filename'")

    text = await request.text()
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        request.app['pool'], do_highlight,
        filename, text, request.app['formatter'])
    return web.Response(text=result)
def run(args, pool=None):
    """Build the aiohttp application and serve it until it stops.

    Args:
        args: Parsed options; ``style``, ``host`` and ``port`` are read.
        pool: Executor stored on the app for the request handler, or
            ``None``.
    """
    html_formatter = HtmlFormatter(style=args.style, nobackground=True)

    server = web.Application()
    server['pool'] = pool
    server['formatter'] = html_formatter
    server.add_routes([
        web.post('/highlight', handle_highlight),
    ])

    web.run_app(server, host=args.host, port=args.port)
def noop(*args, **kws):
    """Accept any arguments, do nothing, and return ``None``."""
    return None
def main():
    """Parse options, pick a worker count, and start the server.

    With exactly one worker the server runs without a process pool.
    Otherwise a ``ProcessPoolExecutor`` is created and handed to ``run``.
    """
    args = parse_args()

    if args.preload:
        # Per the --preload help text, this is meant to load the lexers in
        # this process before any worker processes exist.
        guess_lexer('')

    if args.workers == 0:
        try:
            # Counts only the CPUs this process is allowed to run on.
            workers = len(os.sched_getaffinity(0))
        except AttributeError:
            # sched_getaffinity is missing on some platforms; fall back to
            # the total CPU count (cpu_count() itself may return None).
            workers = os.cpu_count() or 1
    else:
        workers = args.workers

    if workers == 1:
        run(args)
        return

    with ProcessPoolExecutor(max_workers=workers) as pool:
        # Collect garbage, then freeze the surviving objects so the
        # collector ignores them from here on.
        gc.collect()
        gc.freeze()
        # Submit one no-op per worker; presumably this is to get the worker
        # processes started right after the freeze - TODO confirm.
        pool.map(noop, [None] * workers)
        run(args, pool)
# Start the server only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    main()
|