from meshtastic import logging, os
from Filters.base import Filter
from Siph.router import Router
from webui.serve import Server
from Transmission.transmission import Transmitter
import asyncio
import sys
import time
import psutil
import logging
import datetime
from Cryptography.DHEFern import DHEFern
from microdot import Request
import traceback
from ui import TUI
import threading, os

if __name__ == "__main__":
    # Module-level handles for the PierMesh service (Node) and the TUI so the
    # other threads/coroutines in this file can reach them and terminate the
    # associated process later.
    tuiOb = None
    nodeOb = None

    # Allow uploads of up to 512 KiB through the web UI
    Request.max_content_length = 512 * 1024
    Request.max_body_length = 512 * 1024

    # Pull startup parameters
    device, webPort, serverInfoFile, delay, nodeNickname = sys.argv[1:]

    # Set up file based logging
    logPath = "logs"
    # basicConfig opens the log file immediately and fails if the directory
    # is missing, so make sure it exists first.
    os.makedirs(logPath, exist_ok=True)
    fileName = datetime.datetime.now().strftime("%m%d%Y_%H%M%S")
    logFormat = "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
    logging.basicConfig(
        format=logFormat,
        level=logging.INFO,
        filename="{0}/{2}_{1}.log".format(logPath, fileName, nodeNickname),
    )


class Node:
    """
    Class that handles most of the PierMesh data

    Wires together the router, filter, cryptography and (once ``main`` has
    assigned them) the transmitter ``t`` and web server ``s``, and exposes the
    ``action_*`` coroutines that ``fListen`` dispatches to.
    """

    def __init__(self):
        self.toLog = []
        """
        We store logs to be processed here

        See Also
        --------
        logPassLoop: Loop to handle logging to file and TUI
        """
        # Map "action_<name>" methods to "<name>" so queued tasks can be
        # dispatched by name in fListen.
        prefix = "action_"
        self.actions = {
            name[len(prefix):]: getattr(self, name)
            for name in dir(self)
            if name.startswith(prefix)
        }
        self.cLog(20, "Past action mapping")
        self.r = Router(self.cLog, nfpath=serverInfoFile)
        self.cLog(20, "Router initialized")
        self.onodeID = str(self.r.serverInfo.get()["nodeID"])
        self.catch = self.r.c
        self.cache = self.r.cache
        # Web server; assigned in main() once the transmitter exists
        self.s = None
        # Task queue shared with the filter (the same list object is handed
        # to Filter below) and drained by fListen
        self.todo = []
        self.f = Filter(self.cache, self.onodeID, self.todo, self.cLog)
        self.cLog(20, "Filter initialized")
        # self.cLog(30, sys.argv)
        # Transmitter; assigned in main()
        self.t = None
        self.cLog(20, "Cryptography initializing")
        self.cryptographyInfo = DHEFern(self.cache, nodeNickname, self.cLog)
        self.cLog(20, "Cryptography initialized")
        self.processed = []
        self.proc = psutil.Process(os.getpid())
        # Long-running asyncio tasks, keyed by a short name (filled in main())
        self.mTasks = {}

    def cLog(self, priority: int, message: str):
        """
        Convenience function that logs to the ui and log files

        Parameters
        ----------
        priority: int
            Priority of message to be passed to logging
        message: str
            Message to log

        Returns
        -------
        None
        """
        logging.log(priority, message)
        self.toLog.append("[{0}]:\n{1}".format(datetime.datetime.now(), message))

    async def monitor(self):
        """
        Monitor and log ram and cpu usage

        Every cycle: terminate the process if the TUI reports it is done,
        wait ten seconds, sample memory/CPU, log the values and (when a TUI
        exists) push them to it.
        """
        while True:
            if tuiOb is not None and tuiOb.done:
                print("Terminating PierMesh service...")
                self.proc.terminate()
            await asyncio.sleep(10)
            memmb = round(self.proc.memory_info().rss / (1024 * 1024), 2)
            # cpu_percent(interval=1) blocks for a full second; run it in the
            # default executor so the event loop keeps serving other tasks.
            loop = asyncio.get_running_loop()
            cpup = await loop.run_in_executor(None, self.proc.cpu_percent, 1)
            self.cLog(
                20,
                " MEM: {0} mB | CPU: {1} %".format(
                    memmb,
                    cpup,
                ),
            )
            # Set cpu and memory usage in the TUI; it is created on another
            # thread, so it may not exist yet.
            if tuiOb is not None:
                tuiOb.do_set_cpu_percent(float(cpup))
                tuiOb.do_set_mem(memmb)

    async def fListen(self):
        """
        Loop to watch for tasks to do

        See Also
        --------
        Filters.base.sieve: Packet filtering/parsing

        Notes
        -----
        We use a common technique here that calls the function from our preloaded actions via dictionary entry

        A failing or unknown action is logged and skipped so that one bad
        task does not stop the loop.
        """
        while True:
            while self.todo:
                # NOTE(review): pop() takes the most recently queued task
                # first (LIFO) - confirm that ordering is intended.
                todoNow = self.todo.pop()
                try:
                    action = todoNow["action"]
                    self.cLog(20, "Action: " + action)
                    data = todoNow["data"]
                    await self.actions[action](data)
                except Exception:
                    self.cLog(30, traceback.format_exc())
            await asyncio.sleep(1)

    async def action_sendToPeer(self, data: dict):
        """
        Send data to a peer connected to the server

        Parameters
        ----------
        data: dict
            Data passed from the filter, this is a generic object so it's similar on all actions here

        See Also
        --------
        Filters.Protocols: Protocol based packet filtering

        webui.serve.Server: Runs a light Microdot web server with http/s and websocket functionality

        webui.serve.Server.sendToPeer: Function to actually execute the action
        """
        self.s.sendToPeer(data["recipient"], data["res"])

    async def action_sendCatch(self, data: dict):
        """
        Get catch and return the data to a peer

        Reads ``head``, ``body``, ``fins`` and ``recipient`` from ``data``.

        See Also
        --------
        Siph.router.Router: Routing class
        """
        res = self.r.getCatch(data["head"], data["body"], fins=data["fins"])
        self.s.sendToPeer(data["recipient"], res)

    async def action_map(self, data: dict):
        """
        Map new network data to internal network map

        Reads ``onodeID`` and ``mnodeID`` from ``data``.

        See Also
        --------
        Bubble.network.Network: Layered graph network representation
        """
        self.r.n.addLookup(data["onodeID"], data["mnodeID"])
        self.cLog(20, "Lookup addition done")
        self.r.n.addon(data["onodeID"])

    async def action_initNodeDH(self, data: dict):
        """
        Initialize diffie hellman key exchange

        Only starts the exchange when no key record exists for the node
        (``getRecord`` compares equal to ``False``).

        See Also
        --------
        Cryptography.DHEFern.DHEFern: End to end encryption functionality
        """
        # Kept as an explicit comparison: only a value equal to False counts
        # as "no record", not every falsy value.
        if self.cryptographyInfo.getRecord("key", data["onodeID"]) == False:
            await self.t.initNodeDH(
                self.cryptographyInfo, int(data["mnodeID"]), data["onodeID"]
            )

    async def action_keyDeriveDH(self, data: dict):
        """
        Derive key via diffie hellman key exchange

        Failures are logged at warning level rather than raised.
        """
        try:
            self.cryptographyInfo.keyDerive(
                data["publicKey"],
                self.cryptographyInfo.getSalt(),
                data["recipientNode"],
                data["params"],
            )
        except Exception:
            # Not a bare except: task cancellation and interpreter exit
            # must still propagate.
            self.cLog(30, traceback.format_exc())


async def logPassLoop():
    """
    Loop to pass logs up to the TUI

    Once per second, flushes the lines queued in ``nodeOb.toLog`` to the TUI
    in the order they were logged. Terminates the process when the TUI
    reports it is done.

    See Also
    --------
    tui.TUI: TUI implementation
    """
    global tuiOb, nodeOb
    while True:
        await asyncio.sleep(1)
        if tuiOb is None or nodeOb is None:
            await asyncio.sleep(1)
        elif tuiOb.done:
            print("Terminating PierMesh service...")
            nodeOb.proc.terminate()
        else:
            # Flush only what is queued right now, removing each line from
            # the FRONT as it is written. Lines appended by other threads
            # during the flush stay queued for the next pass instead of being
            # popped off the end unwritten.
            for _ in range(len(nodeOb.toLog)):
                tuiOb.do_write_line(nodeOb.toLog.pop(0))


async def main():
    """
    Main method for running the PierMesh service

    Builds the Node, waits the configured stagger delay, attaches the
    transmitter and web server, starts the background tasks and finally runs
    the web server.
    """
    global nodeOb
    try:
        n = Node()
        nodeOb = n
        nodeOb.cLog(20, "Starting up")
        nodeOb.cLog(20, "Staggering {0} seconds, please wait".format(sys.argv[4]))
        # Non-blocking sleep: we are inside a coroutine
        await asyncio.sleep(int(sys.argv[4]))
        n.t = Transmitter(
            sys.argv[1], n.f, n.onodeID, n.cache, n.catch, n.cryptographyInfo, n.cLog
        )
        n.s = Server(n.t, n.catch, n.onodeID, n.r.n, n.cLog)
        n.mTasks["list"] = asyncio.create_task(n.fListen())
        await asyncio.sleep(1)
        n.mTasks["pct"] = asyncio.create_task(n.t.progressCheck())
        await asyncio.sleep(1)
        n.mTasks["mon"] = asyncio.create_task(n.monitor())
        await asyncio.sleep(1)
        n.mTasks["announce"] = asyncio.create_task(n.t.announce())
        await asyncio.sleep(1)
        await n.s.app.start_server(port=int(sys.argv[2]), debug=True)
    except KeyboardInterrupt:
        sys.exit()


if __name__ == "__main__":
    try:
        # The service and the log forwarder each get their own thread and
        # event loop; the TUI runs on the main thread.
        t = threading.Thread(target=asyncio.run, args=(main(),))
        t.start()
        lplt = threading.Thread(target=asyncio.run, args=(logPassLoop(),))
        lplt.start()
        tuiOb = TUI()
        # NOTE(review): nodeOb is assigned inside main() on the service
        # thread, so it may still be None at this point - confirm the TUI
        # tolerates that.
        tuiOb.nodeOb = nodeOb
        tuiOb.run()
    except KeyboardInterrupt:
        if nodeOb is not None:
            nodeOb.cLog(30, "Terminating PierMesh service...")
        else:
            logging.log(30, "Terminating PierMesh service...")
    except Exception:
        # Log the real traceback (sys.gettrace() only returns the tracing
        # hook), and fall back to plain logging if the node never came up.
        trace = traceback.format_exc()
        if nodeOb is not None:
            nodeOb.cLog(30, trace)
        else:
            logging.log(30, trace)