piermesh/src/run.py

281 lines
8.4 KiB
Python
Raw Normal View History

2024-07-28 11:21:15 +00:00
from meshtastic import logging, os
from Filters.base import Filter
from Bubble.router import Router
from webui.serve import Server
from Transmission.transmission import Transmitter
import asyncio
import sys
import time
import psutil
import logging
import datetime
from Cryptography.DHEFern import DHEFern
from microdot import Request
import traceback
from ui import TUI
import threading, os
if __name__ == "__main__":
# Global objects for the PierMesh service and the TUI so we can terminate the associated processes later
global nodeOb, tuiOb
tuiOb = None
nodeOb = None
# Enable 500 kB files in the webui
Request.max_content_length = 1024 * 1024 * 0.5
Request.max_body_length = 1024 * 1024 * 0.5
# Pull startup parameters
device, webPort, serverInfoFile, delay, nodeNickname = sys.argv[1:]
# Set up file based logging
logPath = "logs"
fileName = datetime.datetime.now().strftime("%m%d%Y_%H%M%S")
logFormat = "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
logging.basicConfig(
format=logFormat,
level=logging.INFO,
filename="{0}/{2}_{1}.log".format(logPath, fileName, nodeNickname),
)
class Node:
"""
Class that handles most of the PierMesh data
`🔗 Source <https://git.utopic.work/PierMesh/piermesh/src/branch/main/src/run.py>`_
"""
def __init__(self):
self.toLog = []
"""
We store logs to be processed here
See Also
--------
logPassLoop: Loop to handle logging to file and TUI
"""
actionsList = [f for f in dir(self) if "action" in f]
self.actions = {}
for a in actionsList:
self.actions[a.split("_")[1]] = getattr(self, a)
self.cLog(20, "Past action mapping")
self.r = Router(self.cLog, nfpath=serverInfoFile)
self.cLog(20, "Router initialized")
self.onodeID = str(self.r.serverInfo.get()["nodeID"])
self.catch = self.r.c
self.cache = self.r.cache
self.s = None
self.todo = []
self.f = Filter(self.cache, self.onodeID, self.todo, self.cLog)
self.cLog(20, "Filter initialized")
# self.cLog(30, sys.argv)
self.t = None
self.cLog(20, "Cryptography initializing")
self.cryptographyInfo = DHEFern(self.cache, nodeNickname, self.cLog)
self.cLog(20, "Cryptography initialized")
self.processed = []
self.proc = psutil.Process(os.getpid())
self.mTasks = {}
def cLog(self, priority: int, message: str):
"""
Convenience function that logs to the ui and log files
Parameters
----------
priority: int
Priority of message to be passed to logging
message: str
Message to log
Returns
-------
None
"""
logging.log(priority, message)
self.toLog.append("[{0}]:\n{1}".format(datetime.datetime.now(), message))
async def monitor(self):
"""
Monitor and log ram and cpu usage
"""
while True:
if tuiOb != None:
if tuiOb.done:
print("Terminating PierMesh service...")
self.proc.terminate()
await asyncio.sleep(10)
memmb = self.proc.memory_info().rss / (1024 * 1024)
memmb = round(memmb, 2)
cpup = self.proc.cpu_percent(interval=1)
self.cLog(
20,
" MEM: {0} mB | CPU: {1} %".format(
memmb,
cpup,
),
)
# Set cpu and memory usage in the TUI
tuiOb.do_set_cpu_percent(float(cpup))
tuiOb.do_set_mem(memmb)
async def fListen(self):
"""
Loop to watch for tasks to do
See Also
--------
Filters.base.sieve: Packet filtering/parsing
Notes
-----
We use a common technique here that calls the function from our preloaded actions via dictionary entry
"""
while True:
while len(self.todo) >= 1:
todoNow = self.todo.pop()
action = todoNow["action"]
self.cLog(20, "Action: " + action)
data = todoNow["data"]
await self.actions[action](data)
await asyncio.sleep(1)
async def action_sendToPeer(self, data: dict):
"""
Send data to a peer connected to the server
Parameters
----------
data: dict
Data passed from the filter, this is a generic object so it's similar on all actions here
See Also
--------
Filters.Protocols: Protocol based packet filtering
webui.serve.Server: Runs a light Microdot web server with http/s and websocket functionality
webui.serve.Server.sendToPeer: Function to actually execute the action
"""
self.s.sendToPeer(data["recipient"], data["res"])
async def action_sendCatch(self, data: dict):
"""
Get catch and return the data to a peer
See Also
--------
Bubble.router.Router: Routing class
"""
res = self.r.getCatch(data["head"], data["body"], fins=data["fins"])
self.s.sendToPeer(data["recipient"], res)
async def action_map(self, data: dict):
"""
Map new network data to internal network map
See Also
--------
Bubble.network.Network: Layered graph etwork representation
"""
self.r.n.addLookup(data["onodeID"], data["mnodeID"])
self.cLog(20, "Lookup addition done")
self.r.n.addon(data["onodeID"])
async def action_initNodeDH(self, data: dict):
"""
Initialize diffie hellman key exchange
See Also
--------
Cryptography.DHEFern.DHEFern: End to end encryption functionality
"""
if self.cryptographyInfo.getRecord("key", data["onodeID"]) == False:
await self.t.initNodeDH(
self.cryptographyInfo, int(data["mnodeID"]), data["onodeID"]
)
async def action_keyDeriveDH(self, data: dict):
"""
Derive key via diffie hellman key exchange
"""
try:
self.cryptographyInfo.keyDerive(
data["publicKey"],
self.cryptographyInfo.getSalt(),
data["recipientNode"],
data["params"],
)
except:
self.cLog(30, traceback.format_exc())
async def logPassLoop():
"""
Loop to pass logs up to the TUI
See Also
--------
tui.TUI: TUI implementation
"""
global tuiOb, nodeOb
while True:
await asyncio.sleep(1)
if tuiOb == None or nodeOb == None:
await asyncio.sleep(1)
elif tuiOb.done == True:
print("Terminating PierMesh service...")
nodeOb.proc.terminate()
else:
ctoLog = [l for l in nodeOb.toLog]
for l in ctoLog:
tuiOb.do_write_line(l)
nodeOb.toLog.pop()
async def main():
"""
Main method for running the PierMesh service
"""
global nodeOb
try:
n = Node()
nodeOb = n
nodeOb.cLog(20, "Starting up")
nodeOb.cLog(20, "Staggering {0} seconds, please wait".format(sys.argv[4]))
time.sleep(int(sys.argv[4]))
n.t = Transmitter(
sys.argv[1], n.f, n.onodeID, n.cache, n.catch, n.cryptographyInfo, n.cLog
)
n.s = Server(n.t, n.catch, n.onodeID, n.r.n, n.cLog)
n.mTasks["list"] = asyncio.create_task(n.fListen())
await asyncio.sleep(1)
n.mTasks["pct"] = asyncio.create_task(n.t.progressCheck())
await asyncio.sleep(1)
n.mTasks["mon"] = asyncio.create_task(n.monitor())
await asyncio.sleep(1)
n.mTasks["announce"] = asyncio.create_task(n.t.announce())
await asyncio.sleep(1)
await n.s.app.start_server(port=int(sys.argv[2]), debug=True)
except KeyboardInterrupt:
sys.exit()
if __name__ == "__main__":
try:
t = threading.Thread(target=asyncio.run, args=(main(),))
t.start()
lplt = threading.Thread(target=asyncio.run, args=(logPassLoop(),))
lplt.start()
tuiOb = TUI()
tuiOb.nodeOb = nodeOb
tuiOb.run()
except KeyboardInterrupt:
nodeOb.cLog(30, "Terminating PierMesh service...")
except Exception:
nodeOb.cLog(30, sys.gettrace())