Coverage for src/ensae_teaching_cs/td_1a/flask_helper.py: 56%
27 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-18 22:15 +0100
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-18 22:15 +0100
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Helpers for :epkg:`Flask`.
5"""
6import ctypes
7import traceback
8import threading
9from flask import Response
def Text2Response(text):
    """
    Wraps a text into a plain text response.

    @param      text    text to send back
    @return             :epkg:`Flask` ``Response`` whose mimetype is ``text/plain``
    """
    plain_response = Response(text, mimetype='text/plain')
    return plain_response
def Exception2Response(e):
    """
    Converts an exception into plain text and displays its stack trace.

    @param      e       Exception
    @return             plain text response (see @see fn Text2Response)

    The stack trace is taken from the exception *e* itself when it carries
    one, so the function reports the right stack even when it is called
    outside of the ``except`` block which caught *e*. Otherwise it falls
    back on the exception currently being handled, if any.
    """
    tb = getattr(e, "__traceback__", None)
    if tb is not None:
        # Same text as traceback.format_exc() produces while *e* is being
        # handled, but it does not depend on the current handling context.
        text = "".join(traceback.format_exception(type(e), e, tb))
    else:
        # *e* was never raised (or is not an exception): keep the historical
        # behaviour and report the exception currently being handled.
        text = traceback.format_exc()
    return Text2Response(f"Exception: {e}\nSTACK:\n{text}")
class FlaskInThread(threading.Thread):
    """
    Defines a thread for the server.

    :param app: :epkg:`Flask` application
    :param host: host given to ``app.run``
    :param port: port given to ``app.run``
    :param debug: debug flag given to ``app.run``

    The thread is created as a daemon thread.
    """

    def __init__(self, app, host="localhost", port=8081, debug=False):
        """
        Stores the application and the parameters used by method *run*.
        """
        super().__init__()
        self._app = app
        self._host = host
        self._port = port
        self.daemon = True
        self.debug = debug

    def run(self):
        """
        Starts the server.
        """
        self._app.run(host=self._host, port=self._port, debug=self.debug)

    def shutdown(self):
        """
        Shuts down the server, the function could work if:

        * method run keeps a pointer on a server instance
          (the one owning method
          `serve_forever
          <https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.serve_forever>`_)
        * module `werkzeug <http://werkzeug.pocoo.org/>`_
          returns this instance in function
          `serving.run_simple
          <https://github.com/mitsuhiko/werkzeug/blob/master/werkzeug/serving.py>`_
        * module `Flask <http://flask.pocoo.org/>`_
          returns this instance in method
          `app.Flask.run
          <https://github.com/mitsuhiko/flask/blob/master/flask/app.py>`_

        It would then call ``server.shutdown()`` followed by
        ``server.server_close()``. It is not implemented for the time being.

        :raises NotImplementedError: always
        """
        raise NotImplementedError()

    def raise_exception(self):
        """
        Asks the interpreter to raise ``SystemExit`` in this thread
        in order to stop it.

        The function does nothing if the thread was not started.
        """
        # PyThreadState_SetAsyncExc expects the Python thread identifier
        # (threading.get_ident), not the kernel one (native_id).
        thread_id = self.ident
        if thread_id is None:
            # The thread was never started, there is nothing to interrupt.
            return
        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
        # The identifier is an unsigned long in the C API, ctypes would
        # otherwise convert a python integer into a C int.
        res = set_async_exc(ctypes.c_ulong(thread_id),
                            ctypes.py_object(SystemExit))
        if res > 1:
            # More than one thread state was modified: None is converted
            # into NULL which clears the pending exception.
            set_async_exc(ctypes.c_ulong(thread_id), None)