piermesh/src/run.py

304 lines
9.2 KiB
Python
Raw Normal View History

2024-07-29 03:47:27 +00:00
# PierMesh libraries
from Sponge.base import Filter
from Siph.map import Network
from Components.daisy import Catch
from Components.daisy import Cache
2024-07-28 11:21:15 +00:00
from webui.serve import Server
from Transmission.transmission import Transmitter
2024-07-29 03:47:27 +00:00
from Cryptography.DHEFern import DHEFern
from ui import TUI
import logging
import os
2024-07-28 11:21:15 +00:00
import asyncio
import sys
import time
import psutil
import datetime
import traceback
2024-07-29 03:47:27 +00:00
import threading
import random
from microdot import Request
2024-07-28 11:21:15 +00:00
if __name__ == "__main__":
    # Module-level handles for the PierMesh service and the TUI so the
    # associated processes can be terminated later. They are already module
    # globals here, so no `global` statement is needed at this scope.
    tuiOb = None
    nodeOb = None

    # Enable 500 kB files in the webui (integer byte counts)
    Request.max_content_length = 1024 * 1024 // 2
    Request.max_body_length = 1024 * 1024 // 2

    # Pull startup parameters; fail with a readable message instead of an
    # unpacking ValueError when the argument count is wrong
    startupArgs = sys.argv[1:]
    if len(startupArgs) != 5:
        sys.exit(
            "usage: run.py <device> <webPort> <serverInfoFile> <delay> <nodeNickname>"
        )
    device, webPort, serverInfoFile, delay, nodeNickname = startupArgs

    # Set up file based logging
    logPath = "logs"
    # logging's file handler does not create missing directories, so make
    # sure the target directory exists before configuring it
    os.makedirs(logPath, exist_ok=True)
    fileName = datetime.datetime.now().strftime("%m%d%Y_%H%M%S")
    logFormat = "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
    logging.basicConfig(
        format=logFormat,
        level=logging.INFO,
        filename="{0}/{2}_{1}.log".format(logPath, fileName, nodeNickname),
    )
class Node:
    """
    Class that handles most of the PierMesh data

    `🔗 Source <https://git.utopic.work/PierMesh/piermesh/src/branch/main/src/run.py>`_
    """

    def __init__(self):
        """
        Build the node: action dispatch table, network map, catch/cache
        stores, packet filter and cryptography state.

        Notes
        -----
        Reads the module-level ``serverInfoFile`` and ``nodeNickname`` startup
        parameters. ``server`` and ``oTransmitter`` are left as ``None`` here
        and are assigned by ``main`` after construction.
        """
        self.toLog = []
        """
        We store logs to be processed here

        See Also
        --------
        logPassLoop: Loop to handle logging to file and TUI
        """
        # Dispatch table: every method named "action_<name>" is registered
        # under "<name>". Matching on the prefix (rather than the substring
        # "action") keeps unrelated attributes out of the table.
        self.actions = {
            name.split("_", 1)[1]: getattr(self, name)
            for name in dir(self)
            if name.startswith("action_")
        }
        self.cLog(20, "Past action mapping")
        self.network = Network()
        self.catch = Catch(walk=True)
        self.cache = Cache(walk=True)
        # Load the persisted server info, creating it with a random node ID
        # the first time this node runs
        self.serverInfo = self.cache.get(serverInfoFile)
        if self.serverInfo == False:
            self.cache.create(serverInfoFile, {"nodeID": random.randrange(0, 1000000)})
            self.serverInfo = self.cache.get(serverInfoFile)
        self.network.addin(self.serverInfo.get()["nodeID"])
        self.cLog(20, "Siph network stack initialized")
        self.onodeID = str(self.serverInfo.get()["nodeID"])
        self.server = None
        # Shared with the filter, which receives this same list object
        self.todo = []
        self.sponge = Filter(self.cache, self.onodeID, self.todo, self.cLog)
        self.cLog(20, "Filter initialized")
        self.oTransmitter = None
        self.cLog(20, "Cryptography initializing")
        self.cryptographyInfo = DHEFern(self.cache, nodeNickname, self.cLog)
        self.cLog(20, "Cryptography initialized")
        self.processed = []
        # Handle on our own process, used for resource stats and termination
        self.proc = psutil.Process(os.getpid())
        # Named asyncio tasks created by main()
        self.mTasks = {}

    def cLog(self, priority: int, message: str):
        """
        Convenience function that logs to the ui and log files

        Parameters
        ----------
        priority: int
            Priority of message to be passed to logging

        message: str
            Message to log

        Returns
        -------
        None
        """
        logging.log(priority, message)
        # Queue a timestamped copy for the TUI
        self.toLog.append("[{0}]:\n{1}".format(datetime.datetime.now(), message))

    async def monitor(self):
        """
        Monitor and log ram and cpu usage

        Notes
        -----
        Each cycle terminates the process if the TUI reports it is done,
        sleeps 10 seconds, logs memory and CPU usage, and pushes both values
        to the TUI when one exists.
        """
        while True:
            if tuiOb is not None and tuiOb.done:
                print("Terminating PierMesh service...")
                self.proc.terminate()
            await asyncio.sleep(10)
            memmb = round(self.proc.memory_info().rss / (1024 * 1024), 2)
            # NOTE(review): psutil documents a positive interval as a blocking
            # call, so this holds the event loop for one second per cycle
            cpup = self.proc.cpu_percent(interval=1)
            self.cLog(
                20,
                " MEM: {0} mB | CPU: {1} %".format(
                    memmb,
                    cpup,
                ),
            )
            # Set cpu and memory usage in the TUI; it is created by another
            # thread and may not exist yet, so read it once and guard on None
            tui = tuiOb
            if tui is not None:
                tui.do_set_cpu_percent(float(cpup))
                tui.do_set_mem(memmb)

    async def spongeListen(self):
        """
        Loop to watch for tasks to do

        See Also
        --------
        Filters.base.sieve: Packet filtering/parsing

        Notes
        -----
        We use a common technique here that calls the function from our preloaded actions via dictionary entry
        """
        while True:
            while len(self.todo) >= 1:
                # NOTE(review): pop() takes the most recently queued task
                # first (LIFO); confirm that ordering is intended
                todoNow = self.todo.pop()
                action = todoNow["action"]
                self.cLog(20, "Action: " + action)
                data = todoNow["data"]
                handler = self.actions.get(action)
                if handler is None:
                    # An unknown action must not kill the listener task
                    self.cLog(30, "Unknown action: " + action)
                    continue
                await handler(data)
            await asyncio.sleep(1)

    async def action_sendToPeer(self, data: dict):
        """
        Send data to a peer connected to the server

        Parameters
        ----------
        data: dict
            Data passed from the filter, this is a generic object so it's similar on all actions here

        See Also
        --------
        Filters.Protocols: Protocol based packet filtering

        webui.serve.Server: Runs a light Microdot web server with http/s and websocket functionality

        webui.serve.Server.sendToPeer: Function to actually execute the action
        """
        self.server.sendToPeer(data["recipient"], data["res"])

    async def action_sendCatch(self, data: dict):
        """
        Get catch and return the data to a peer

        See Also
        --------
        Bubble.router.Router: Routing class
        """
        res = self.catch.get(data["head"], data["body"], fins=data["fins"])
        self.server.sendToPeer(data["recipient"], res)

    async def action_map(self, data: dict):
        """
        Map new network data to internal network map

        See Also
        --------
        Siph.network.Network: Layered graph network representation
        """
        self.network.addLookup(data["onodeID"], data["mnodeID"])
        self.cLog(20, "Lookup addition done")
        self.network.addon(data["onodeID"])

    async def action_initNodeDH(self, data: dict):
        """
        Initialize diffie hellman key exchange

        See Also
        --------
        Cryptography.DHEFern.DHEFern: End to end encryption functionality
        """
        # Only start an exchange when no key record exists for this node yet
        if self.cryptographyInfo.getRecord("key", data["onodeID"]) == False:
            await self.oTransmitter.initNodeDH(
                self.cryptographyInfo, int(data["mnodeID"]), data["onodeID"]
            )

    async def action_keyDeriveDH(self, data: dict):
        """
        Derive key via diffie hellman key exchange

        Notes
        -----
        Failures are logged at warning level with the traceback and are not
        re-raised.
        """
        try:
            self.cryptographyInfo.keyDerive(
                data["publicKey"],
                self.cryptographyInfo.getSalt(),
                data["recipientNode"],
                data["params"],
            )
        except Exception:
            # Not a bare except: interpreter-exit and cancellation signals
            # must still propagate
            self.cLog(30, traceback.format_exc())
async def logPassLoop():
    """
    Loop to pass logs up to the TUI

    Notes
    -----
    Reads the module-level ``tuiOb`` and ``nodeOb``. While either is still
    ``None`` the loop just waits. Once the TUI reports it is done the
    PierMesh process is terminated. Otherwise queued log lines are written to
    the TUI oldest first.

    See Also
    --------
    ui.TUI: TUI implementation
    """
    while True:
        await asyncio.sleep(1)
        if tuiOb is None or nodeOb is None:
            await asyncio.sleep(1)
        elif tuiOb.done:
            print("Terminating PierMesh service...")
            nodeOb.proc.terminate()
        else:
            # Only drain what is queued right now; entries appended while we
            # are writing are picked up on the next pass
            pending = len(nodeOb.toLog)
            for _ in range(pending):
                # Remove from the front so the entry removed is exactly the
                # entry written, even if the node appends concurrently
                tuiOb.do_write_line(nodeOb.toLog.pop(0))
async def main():
    """
    Main method for running the PierMesh service

    Notes
    -----
    Builds the node, waits the stagger delay from ``sys.argv[4]``, wires up
    the transmitter (device from ``sys.argv[1]``) and the web server, starts
    the background tasks one second apart, then serves the web UI on the
    port given in ``sys.argv[2]``.
    """
    global nodeOb
    try:
        nodeOb = Node()
        nodeOb.cLog(20, "Starting up")
        nodeOb.cLog(20, "Staggering {0} seconds, please wait".format(sys.argv[4]))
        # Non-blocking wait: do not stall the event loop inside a coroutine
        await asyncio.sleep(int(sys.argv[4]))
        nodeOb.oTransmitter = Transmitter(
            sys.argv[1],
            nodeOb.sponge,
            nodeOb.onodeID,
            nodeOb.cache,
            nodeOb.catch,
            nodeOb.cryptographyInfo,
            nodeOb.cLog,
        )
        nodeOb.server = Server(
            nodeOb.oTransmitter,
            nodeOb.catch,
            nodeOb.onodeID,
            nodeOb.network,
            nodeOb.cLog,
        )
        # Keep references to the tasks on the node, staggering each start
        nodeOb.mTasks["list"] = asyncio.create_task(nodeOb.spongeListen())
        await asyncio.sleep(1)
        nodeOb.mTasks["pct"] = asyncio.create_task(nodeOb.oTransmitter.progressCheck())
        await asyncio.sleep(1)
        nodeOb.mTasks["mon"] = asyncio.create_task(nodeOb.monitor())
        await asyncio.sleep(1)
        nodeOb.mTasks["announce"] = asyncio.create_task(nodeOb.oTransmitter.announce())
        await asyncio.sleep(1)
        await nodeOb.server.app.start_server(port=int(sys.argv[2]), debug=True)
    except KeyboardInterrupt:
        sys.exit()
    except Exception:
        # Record startup/runtime failures in the log file before propagating,
        # since the TUI owns the terminal while the service runs
        logging.error(traceback.format_exc())
        raise
if __name__ == "__main__":
    try:
        # The service and the log forwarder each get their own thread and
        # event loop; the TUI runs in the main thread
        mainThread = threading.Thread(target=asyncio.run, args=(main(),))
        mainThread.start()
        lplThread = threading.Thread(target=asyncio.run, args=(logPassLoop(),))
        lplThread.start()
        tuiOb = TUI()
        # NOTE(review): nodeOb is assigned inside main() on another thread and
        # may still be None at this point — confirm the TUI tolerates that
        tuiOb.nodeOb = nodeOb
        tuiOb.run()
    except KeyboardInterrupt:
        # nodeOb may not have been constructed yet; fall back to plain logging
        if nodeOb is not None:
            nodeOb.cLog(30, "Terminating PierMesh service...")
        else:
            logging.log(30, "Terminating PierMesh service...")
    except Exception:
        # format_exc() yields the traceback text; sys.gettrace() only returns
        # the active trace function
        if nodeOb is not None:
            nodeOb.cLog(30, traceback.format_exc())
        else:
            logging.log(30, traceback.format_exc())