<p>Here is my take on this problem. Requiring that the user scripts run inside vanilla CPython means you either need to write an interpreter for your mini language, or compile it to Python bytecode (or use Python as your source language) and then "sanitize" the bytecode before executing it.</p>
<p>I've gone for a quick example based on the assumption that users can write their scripts in Python, and that the source and bytecode can be sufficiently sanitized by filtering unsafe syntax out of the parse tree, removing unsafe opcodes from the bytecode, or both.</p>
<p>The second part of the solution requires that the user script's bytecode be periodically interrupted by a watchdog task, which ensures that the script does not exceed some opcode limit, and that all of this runs on vanilla CPython.</p>
<p>Here is a summary of my attempt, which mostly focuses on the second part of the problem:</p>
<ul>
<li>User scripts are written in Python.</li>
<li>Use <strong>byteplay</strong> to filter and modify the bytecode.</li>
<li>Instrument the user's bytecode to insert an opcode counter and calls to a function which context switches to the watchdog task.</li>
<li>Use <strong>greenlet</strong> to execute the user's bytecode, with yields switching between the user's script and the watchdog coroutine.</li>
<li>The watchdog enforces a preset limit on the number of opcodes which can be executed before raising an error.</li>
</ul>
<p>Hopefully this at least goes in the right direction. 
I'm interested to hear more about your solution when you arrive at it.</p>
<p>Source code for <code>lowperf.py</code>:</p>
<pre><code># std
import ast
import dis
import sys
from pprint import pprint

# vendor
import byteplay
import greenlet

# bytecode snippet to increment our global opcode counter
INCREMENT = [
    (byteplay.LOAD_GLOBAL, '__op_counter'),
    (byteplay.LOAD_CONST, 1),
    (byteplay.INPLACE_ADD, None),
    (byteplay.STORE_GLOBAL, '__op_counter')
    ]

# bytecode snippet to perform a yield to our watchdog tasklet.
YIELD = [
    (byteplay.LOAD_GLOBAL, '__yield'),
    (byteplay.LOAD_GLOBAL, '__op_counter'),
    (byteplay.CALL_FUNCTION, 1),
    (byteplay.POP_TOP, None)
    ]

def instrument(orig):
    """
    Instrument bytecode.  We place a call to our yield function before
    jumps and returns.  You could choose alternate places depending on
    your use case.
    """
    line_count = 0
    res = []
    for op, arg in orig.code:
        line_count += 1

        # NOTE: you could put an advanced bytecode filter here.

        # whenever a code block is loaded we must instrument it
        if op == byteplay.LOAD_CONST and isinstance(arg, byteplay.Code):
            code = instrument(arg)
            res.append((op, code))
            continue

        # 'setlineno' opcode is a safe place to increment our global
        # opcode counter.
        if op == byteplay.SetLineno:
            res += INCREMENT
            line_count += 1

        # append the opcode and its argument
        res.append((op, arg))

        # if we're at a jump or return, or we've processed 10 lines of
        # source code, insert a call to our yield function.  you could
        # choose other places to yield more appropriate for your app.
        if op in (byteplay.JUMP_ABSOLUTE, byteplay.RETURN_VALUE) \
                or line_count &gt; 10:
            res += YIELD
            line_count = 0

    # finally, build and return new code object
    return byteplay.Code(res, orig.freevars, orig.args, orig.varargs,
        orig.varkwargs, orig.newlocals, orig.name, orig.filename,
        orig.firstlineno, orig.docstring)

def transform(path):
    """
    Transform the Python source into a form safe to execute and return
    the bytecode.
    """
    # NOTE: you could call ast.parse(data, path) here to get an
    # abstract syntax tree, then filter that tree down before compiling
    # it into bytecode.  i've skipped that step as it is pretty verbose.
    data = open(path, 'rb').read()
    suite = compile(data, path, 'exec')
    orig = byteplay.Code.from_code(suite)
    return instrument(orig)

def execute(path, limit = 40):
    """
    This transforms the user's source code into bytecode, instrumenting
    it, then kicks off the watchdog and user script tasklets.
    """
    code = transform(path)
    target = greenlet.greenlet(run_task)

    def watcher_task(op_count):
        """
        Task which is yielded to by the user script, making sure it doesn't
        use too many resources.
        """
        while 1:
            if op_count &gt; limit:
                raise RuntimeError("script used too many resources")
            op_count = target.switch()

    watcher = greenlet.greenlet(watcher_task)
    target.switch(code, watcher.switch)

def run_task(code, yield_func):
    "This is the greenlet task which runs our user's script."
    globals_ = {'__yield': yield_func, '__op_counter': 0}
    eval(code.to_code(), globals_, globals_)

execute(sys.argv[1])
</code></pre>
<p>Here is a sample user script <code>user.py</code>:</p>
<pre><code>def otherfunc(b):
    return b * 7

def myfunc(a):
    for i in range(0, 20):
        print i, otherfunc(i + a + 3)

myfunc(2)
</code></pre>
<p>Here is a sample run:</p>
<pre><code>% python lowperf.py user.py
0 35
1 42
2 49
3 56
4 63
5 70
6 77
7 84
8 91
9 98
10 105
11 112
Traceback (most recent call last):
  File "lowperf.py", line 114, in &lt;module&gt;
    execute(sys.argv[1])
  File "lowperf.py", line 105, in execute
    target.switch(code, watcher.switch)
  File "lowperf.py", line 101, in watcher_task
    raise RuntimeError("script used too many resources")
RuntimeError: script used too many resources
</code></pre>
 
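<p>The note in <code>transform</code> mentions filtering the abstract syntax tree before compiling, a step the listing skips. A minimal sketch of what that could look like is below. It is not part of the original answer: the names <code>FORBIDDEN_NODES</code>, <code>FORBIDDEN_NAMES</code>, <code>UnsafeSyntax</code>, <code>Checker</code> and <code>check_source</code> are hypothetical, the deny-list policy is only illustrative and is not a complete sandbox, and it is written against Python 3's <code>ast</code> module, whereas the listing above targets Python 2.</p>

```python
import ast

# Hypothetical policy: statement types a user script may not contain.
FORBIDDEN_NODES = (ast.Import, ast.ImportFrom, ast.Global, ast.Nonlocal)

# Hypothetical policy: builtins a user script may not reference by name.
FORBIDDEN_NAMES = {"eval", "exec", "compile", "open", "__import__",
                   "getattr", "setattr", "delattr", "globals", "locals"}


class UnsafeSyntax(Exception):
    """Raised when the user script contains a construct we reject."""


class Checker(ast.NodeVisitor):
    """Walk the tree and raise UnsafeSyntax on the first rejected node."""

    def generic_visit(self, node):
        # Reached for every node type without a dedicated visit_* method.
        if isinstance(node, FORBIDDEN_NODES):
            raise UnsafeSyntax("%s is not allowed (line %d)"
                               % (type(node).__name__, node.lineno))
        super().generic_visit(node)

    def visit_Name(self, node):
        # Reject deny-listed builtins and any double-underscore name.
        if node.id in FORBIDDEN_NAMES or node.id.startswith("__"):
            raise UnsafeSyntax("name %r is not allowed (line %d)"
                               % (node.id, node.lineno))
        self.generic_visit(node)

    def visit_Attribute(self, node):
        # Reject private and dunder attribute access such as obj.__class__.
        if node.attr.startswith("_"):
            raise UnsafeSyntax("attribute %r is not allowed (line %d)"
                               % (node.attr, node.lineno))
        self.generic_visit(node)


def check_source(source, filename="<user>"):
    """Parse the source, validate it, and return the tree for compile()."""
    tree = ast.parse(source, filename)
    Checker().visit(tree)
    return tree
```

<p>Since <code>compile()</code> also accepts an AST object, the tree returned by <code>check_source</code> could be passed to <code>compile(tree, path, 'exec')</code> in place of the raw source. A deny-list like this is easy to get around, so an allow-list of permitted node types would likely be the safer design in a real deployment.</p>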
