Multiple changes:

- Move to a Python process for the LanguageLink client [fixes #1, presumably]
- Finish a first rough draft of the booklet [fixes #4]
This commit is contained in:
2022-06-11 18:43:17 +02:00
parent acded68da5
commit 6369d15e93
39 changed files with 3977 additions and 56 deletions

14
carp/src/Pipfile Normal file
View File

@@ -0,0 +1,14 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
flask = "*"
requests = "*"
msgpack = "*"
[requires]
python_version = "3"

View File

178
carp/src/bridge/bridge.py Normal file
View File

@@ -0,0 +1,178 @@
import argparse
import threading
import sys
import hooks
from bridge.hooks import *
from bridge.object_registry import registry
def pbbreak():
print("Breaking now")
breakpoint()
print("Continuing")
class EvalCommand:
statements = ""
bindings = {}
commandId = 0
def __init__(self, commandId, statements, bindings):
self.statements = statements
self.commandId = commandId
self.bindings = bindings
def execute_using_env(self, env):
try:
self.execute()
except Exception as err:
self.perform_proceed_action(notify_error(err,self))
def perform_proceed_action(self, actionDict):
actionSymbol = actionDict['action']
if actionSymbol == "IGNORE":
pass
if actionSymbol == "DROP_QUEUE":
bridge.globals.globalCommandList.drop_queue()
if actionSymbol == "REPLACE_COMMAND":
commandDict = actionDict["command"]
bridge.globals.globalCommandList.push_command_at_first(EvalCommand(
commandDict["commandId"],
commandDict["statements"],
commandDict["bindings"]))
def command_id(self):
return self.commandId
def execute(self):
bridge.globals.proc.evaluate(self.statements)
class Logger():
def log(self, msg):
print(str(msg), file=sys.stderr, flush=True)
class NoLogger():
def log(self, msg):
pass
# This List is thought to be multi-producer and single-consumer. For optimal results wait for push_command return value to push another command that depends on the previous one.
class CommandList:
currentCommandIndex = 0
commandList = []
listLock = threading.Lock()
consumeSemaphore = threading.Semaphore(value=0)
# This method locks the thread until the command has been succesfully appended to the list. Even though that it has a lock inside, we do not expect long waiting time.
def push_command(self, aCommand):
self.listLock.acquire()
self.commandList.append(aCommand)
commandIndex = len(self.commandList) - 1
self.listLock.release()
self.consumeSemaphore.release()
return commandIndex
def push_command_at_first(self, aCommand):
self.listLock.acquire()
self.commandList.insert(self.currentCommandIndex, aCommand)
self.listLock.release()
self.consumeSemaphore.release()
return self.currentCommandIndex
def drop_queue(self):
self.listLock.acquire()
self.consumeSemaphore = threading.Semaphore(value=0)
self.currentCommandIndex = len(self.commandList)
self.listLock.release()
def consume_command(self):
repeatMonitorFlag = True
while repeatMonitorFlag:
self.consumeSemaphore.acquire()
self.listLock.acquire()
repeatMonitorFlag = False
if(self.currentCommandIndex >= len(self.commandList)):
repeatMonitorFlag = True
self.listLock.release()
command = self.commandList[self.currentCommandIndex]
self.currentCommandIndex += 1
self.listLock.release()
return command
def get_current_command(self):
if self.currentCommandIndex == 0:
return None
self.listLock.acquire()
command = self.commandList[self.currentCommandIndex-1]
self.listLock.release()
return command
def get_command_list(self):
self.listLock.acquire()
listCopy = self.commandList.copy()
self.listLock.release()
return listCopy
#### UTILS FUNCTIONS
def clean_locals_env():
return locals()
def deserialize(text):
result = bridge.globals.msg_service.serializer.deserialize(text)
bridge.globals.logger.log("DESERIALISE (bridge): " + str(result))
if registry().isProxy(result):
result = registry().resolve(result['__pyid__'])
return result
def enqueue_command(data):
bridge.globals.globalCommandList.push_command(EvalCommand(
data["commandId"],
data["statements"],
{k: deserialize(v) for k, v in data["bindings"].items()}))
def run_bridge():
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--port", required=False,
help="port to be used for receiving instructions")
ap.add_argument("-o", "--pharo", required=True,
help="port to be used for sending notifications back to pharo")
ap.add_argument("-m", "--method", required=False,
help="identifier for communication protocol strategy http or msgpack")
ap.add_argument("--log", required=False, const=True, nargs="?",
help="enable logging")
args = vars(ap.parse_args())
bridge.globals.pharoPort = args["pharo"]
if args["log"]:
bridge.globals.logger = Logger()
else:
bridge.globals.logger = NoLogger()
bridge.globals.pyPort = args["port"]
bridge.globals.globalCommandList = CommandList()
globalCommandList = bridge.globals.globalCommandList
bridge.globals.proc = carp.start_carp_proc()
env = clean_locals_env()
msg_service = None
if args["port"] == None:
args["port"] = '0'
if args["method"] == None:
args["method"] = 'http'
if args["method"] == 'http':
from bridge import flask_platform
msg_service = flask_platform.build_service(int(args["port"]), int(args["pharo"]), enqueue_command)
elif args["method"] == 'msgpack':
from bridge import msgpack_socket_platform
msg_service = msgpack_socket_platform.build_service(int(args["port"]), int(args["pharo"]), enqueue_command)
else:
raise Exception("Invalid communication strategy.")
bridge.globals.msg_service = msg_service
msg_service.start()
bridge.globals.logger.log("PYTHON: Start consuming commands")
while True:
command = globalCommandList.consume_command()
bridge.globals.logger.log("PYTHON: Executing command " + command.command_id())
bridge.globals.logger.log("PYTHON: bindings: " + str(command.bindings))
bridge.globals.logger.log("PYTHON: " + command.statements)
command.execute_using_env(env)
bridge.globals.logger.log("PYTHON: Finished command execution")
if __name__ == "__main__":
run_bridge()

13
carp/src/bridge/carp.py Normal file
View File

@@ -0,0 +1,13 @@
import subprocess
class CarpProc:
def __init__(self):
self.proc = subprocess.Popen(['carp'])
def evaluate(self, statements):
self.proc.stdin.send(statements)
def start_carp_proc():
return CarpProc()

View File

@@ -0,0 +1,73 @@
from flask import Flask, request
import http.client
import json
import threading
import bridge.globals
import bridge.utils
from bridge import json_encoder
import sys
import logging
import requests
class FlaskMsgService:
def __init__(self, port, pharo_port, feed_callback):
self.serializer = json_encoder.JsonSerializer()
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
self.thread = None
self.port = port
self.pharo_port = pharo_port
self.feed_callback = feed_callback
self.app = Flask('carp_bridge')
self.app.use_reloader=False
self.session = requests.Session()
self.session.trust_env = True
@self.app.route("/ENQUEUE", methods=["POST"])
def eval_expression():
data = request.get_json(force=True)
self.feed_callback(data)
return "{}"
@self.app.route("/IS_ALIVE", methods=["POST"])
def status_endpoint():
return "{}"
def addMapping(self, key_type, mapping_function):
json_encoder.addMapping(key_type, mapping_function)
def _start(self):
try:
self.app.run(port=self.port)
except OSError as err:
bridge.globals.logger.log('Critical Error:' + str(err))
exit(42)
def start(self):
self.thread = threading.Thread(target=self._start, args=())
self.thread.daemon = True
self.thread.start()
def is_running(self):
return self.thread != None
def stop(self):
pass
def send_async_message(self, msg):
self.send_sync_message(msg)
def send_sync_message(self, msg):
msg['__sync'] = bridge.utils.random_str()
bridge.globals.logger.log("SYNC_MSG: " + json.dumps(msg))
response = self.session.post(
'http://localhost:' + str(self.pharo_port) + '/' + msg['type'],
data=json.dumps(msg),
headers={'content-type': 'application/json'},
allow_redirects=True).content.decode('utf-8')
bridge.globals.logger.log("SYNC_ANS: " + response)
return json.loads(response)
def build_service(port, pharo_port, feed_callback):
return FlaskMsgService(port, pharo_port, feed_callback)

View File

50
carp/src/bridge/hooks.py Normal file
View File

@@ -0,0 +1,50 @@
from flask import Flask, request
import http.client
import argparse
import sys
import traceback
import bridge.globals
def serialize(obj):
return bridge.globals.msg_service.serializer.serialize(obj)
def deserialize(text):
return bridge.globals.msg_service.serializer.deserialize(text)
def observer(commandId, observerId):
return lambda obj: notify_observer(obj, commandId, observerId)
#### NOTIFICATION FUNCTIONS
def notify(obj, notificationId):
bridge.globals.logger.log("PYTHON: Notify " + str(notificationId))
data = {}
data["type"] = "EVAL"
data["id"] = notificationId
data["value"] = serialize(obj)
bridge.globals.msg_service.send_async_message(data)
def notify_observer(obj, commandId, observerId):
bridge.globals.logger.log("PYTHON: Notify observer " + str(commandId) + " " + str(observerId))
data = {}
data["type"] = "CALLBACK"
data["commandId"] = commandId
data["observerId"] = observerId
data["value"] = serialize(obj)
rawValue = bridge.globals.msg_service.send_sync_message(data)['value']
return deserialize(rawValue)
def notify_error(ex, command):
bridge.globals.logger.log("Error on command: " + str(command.command_id()))
bridge.globals.logger.log("Exception: " + str(ex))
data = {}
data["type"] = "ERR"
data["errMsg"] = str(ex)
data["trace"] = traceback.format_exc(100)
data["commandId"] = command.command_id()
return bridge.globals.msg_service.send_sync_message(data)
def bridge_inspect(obj):
if hasattr(obj,'__dict__'):
return obj.__dict__
else:
return {}

View File

@@ -0,0 +1,28 @@
import json
import io
from bridge.object_registry import registry
mapper = {}
def addMapping(key_type, mapping_function):
mapper[key_type] = mapping_function
class JsonEncoder(json.JSONEncoder):
def __init__(self, *args, **kwargs):
json.JSONEncoder.__init__(self, *args, **kwargs)
self.mapper = mapper
def default(self, obj):
if type(obj) in self.mapper:
return self.mapper[type(obj)](obj)
return {
'__pyclass__': type(obj).__name__,
'__pyid__': registry().register(obj)
}
class JsonSerializer:
def serialize(self, obj):
return json.dumps(obj, cls=JsonEncoder)
def deserialize(self, text):
return json.loads(text)

View File

@@ -0,0 +1,41 @@
import msgpack
from bridge.object_registry import registry
primitive_types = [
type(None),
bool,
int,
bytes,
str,
dict,
list,
memoryview,
bytearray]
mapper = {}
def addMapping(key_type, mapping_function):
mapper[key_type] = mapping_function
class MsgPackSerializer:
def __init__(self):
self.primitive_types = primitive_types
def mapper(self):
return mapper
def default(self, obj):
if type(obj) in self.primitive_types:
return obj
if type(obj) in self.mapper():
return self.mapper()[type(obj)](obj)
return {
'__pyclass__': type(obj).__name__,
'__pyid__': registry().register(obj)
}
def serialize(self, obj):
return msgpack.packb(obj, default=self.default, use_bin_type=True)
def deserialize(self, binary):
return msgpack.unpackb(binary, raw=False)

View File

@@ -0,0 +1,120 @@
import msgpack
import socket
import _thread
import threading
import time
import sys
import bridge.globals
from bridge import stoppable_thread, msgpack_serializer
from uuid import uuid1
# Messages supported by this sockets must be Dictionaries. This is because we use special key __sync to know if it is
# a synchronized message or not. If it is we hook a semaphore to that id under the __sync key and after we receive the
# value we store there the return message and signal the semaphore.
class MsgPackSocketPlatform:
def __init__(self, port):
self.port = port
self.client = None
self.serializer = msgpack_serializer.MsgPackSerializer()
self.unpacker = msgpack.Unpacker(raw=False)
self.packer = msgpack.Packer(use_bin_type=True)
self.sync_table = {}
self.async_handlers = {}
def addMapping(self, key_type, mapping_function):
msgpack_serializer.addMapping(key_type, mapping_function)
def set_handler(self, msg_type, async_handler):
self.async_handlers[msg_type] = async_handler
def prim_handle(self):
try:
bridge.globals.logger.log("loop func")
data = self.client.recv(2048)
if len(data) == 0:
time.sleep(0.005)
else:
self.unpacker.feed(data)
for msg in self.unpacker:
bridge.globals.logger.log("prim handle message")
self.prim_handle_msg(msg)
except OSError:
bridge.globals.logger.log("OSError: " + str(err))
self.stop()
sys.exit()
exit(-1)
except Exception as err:
bridge.globals.logger.log("ERROR message: " + str(err))
def setup_func(self):
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.connect(('localhost', self.port))
def stop(self):
if self.thread is not None:
self.thread.stop()
if self.client is not None:
self.client.close()
self.client = None
def send_answer(self, msg, answer):
if answer['type'] != msg['type']:
raise Exception('Type mismatch')
answer['__sync'] = msg['__sync']
self.send_async_message(answer)
def is_running(self):
return self.client != None
def prim_handle_msg(self, raw_msg):
msg = raw_msg
msg_type = msg['type']
if msg_type in self.async_handlers:
self.async_handlers[msg['type']](msg)
elif is_sync_msg(msg):
sync_id = message_sync_id(msg)
semaphore = self.sync_table[sync_id]
self.sync_table[sync_id] = msg
semaphore.release()
else:
bridge.globals.logger.log("Error! Msg couldnt be handled")
raise Exception('Message couldn''t be handled')
def start(self):
self.thread = stoppable_thread.StoppableThread(
loop_func= self.prim_handle,
setup_func= self.setup_func)
self.thread.start()
time.sleep(.1)
def send_async_message(self, msg):
self.client.send(self.packer.pack(msg))
def send_sync_message(self, msg):
sync_id = mark_message_as_sync(msg)
semaphore = threading.Semaphore(value=0)
self.sync_table[sync_id] = semaphore
self.send_async_message(msg)
semaphore.acquire()
ans = self.sync_table[sync_id]
del self.sync_table[sync_id]
return ans
def is_sync_msg(msg):
return '__sync' in msg
def message_sync_id(msg):
return msg['__sync']
def mark_message_as_sync(msg):
sync_id = uuid1().hex
msg['__sync'] = sync_id
return sync_id
def build_service(port, pharo_port, feed_callback):
service = MsgPackSocketPlatform(pharo_port)
service.set_handler('ENQUEUE',feed_callback)
service.set_handler('IS_ALIVE', lambda msg: service.send_answer(msg, {'type': 'IS_ALIVE'}))
return service

View File

@@ -0,0 +1,88 @@
import bridge.globals
from uuid import uuid1
def ensure_global_registry():
if not hasattr(bridge.globals, 'ObjectRegistry'):
bridge.globals.ObjectRegistry = Registry()
def registry():
return bridge.globals.ObjectRegistry
primitive = (int, str, bool)
def is_primitive(obj):
return isinstance(obj, primitive)
class Registry():
def __init__(self):
self.idToObjMap = {}
self.objToIdMap = {}
def hasId(self, anId):
return anId in self.idToObjMap
def createNewObjId(self):
return uuid1().hex
def register(self, obj):
if obj is None or is_primitive(obj):
return 0
if id(obj) in self.objToIdMap:
return self.objToIdMap[id(obj)]
else:
return self._register(obj, self.createNewObjId())
def register_with_id(self, obj, newObjId):
if obj is None or is_primitive(obj):
return RegisterForbiddenObject(obj)
if id(obj) in self.objToIdMap:
objId = self.objToIdMap[id(obj)]
if objId == newObjId:
return newObjId
else:
raise RegisterWithDifferentIdError(obj, newObjId)
else:
return self._register(obj, newObjId)
def resolve(self, objId):
if objId in self.idToObjMap:
return self.idToObjMap[objId]
else:
raise ResolveUnknownObject(objId)
def _register(self, obj, newObjId):
self.idToObjMap[newObjId] = obj
self.objToIdMap[id(obj)] = newObjId
return newObjId
def clean(self, objId):
obj = self.idToObjMap[objId]
del self.idToObjMap[objId]
del self.objToIdMap[id(obj)]
def isProxy(self, anObject):
is_proxy = False
if isinstance(anObject, dict):
if len(anObject.keys()) == 2 and ('__pyclass__' in anObject) and ('__pyid__' in anObject):
is_proxy = True
return is_proxy
class RegistryError(Exception):
pass
class RegisterWithDifferentIdError(RegistryError):
def __init__(self, obj, newId):
RegistryError.__init__(self,"Attempting to register object {0} with ID {1} with different ID {2}.".format(type(obj).__name__, registry().register(obj), newId))
class ResolveUnknownObject(RegistryError):
def __init__(self, objId):
RegistryError.__init__(self,"Attempting to resolve unknown object with id {0}.".format(objId))
class RegisterForbiddenObject(RegistryError):
def __init__(self, obj):
RegistryError.__init__(self,"Attempting to register forbidden object of type {0}.".format(type(obj).__name__))
ensure_global_registry()

View File

@@ -0,0 +1,24 @@
import threading
class StoppableThread(threading.Thread):
def __init__(self, loop_func, setup_func):
threading.Thread.__init__(self)
self._stop_event = threading.Event()
self.daemon = True
self.loop_func = loop_func
self.setup_func = setup_func
# function using _stop function
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.isSet()
def run(self):
self.setup_func()
while True:
if self.stopped():
return
self.loop_func()

View File

@@ -0,0 +1,36 @@
import argparse
import threading
import time
# Calculate the factorial
def factorial(n, t):
time.sleep(n*t/4)
print("Start: " + str(t) + ": " + str(n))
if n == 1:
res = 1
if t == 1:
print("Feel free to break here")
else:
res = n * factorial(n-1, t)
return res
# Calculate the factorial and print the result
def factorial_thread(n, t):
time.sleep(2)
result = factorial(n, t)
print("Thread " + str(t) + " = "+str(result))
def launch_factorials(n):
threads = []
print("Calculate: "+str(n))
breakpoint()
for i in range(n):
threads.append(threading.Thread(target=factorial_thread, args=(n+i, i+1)))
threads[-1].start()
print("Wait for the results")
for thread in threads:
thread.join()
print("Done")

2
carp/src/bridge/tmp.py Normal file
View File

@@ -0,0 +1,2 @@
import time
time.sleep(3)

5
carp/src/bridge/utils.py Normal file
View File

@@ -0,0 +1,5 @@
from uuid import uuid1
def random_str():
return uuid1().hex

View File

@@ -0,0 +1,17 @@
(load "/Users/veitheller/Documents/Code/Github/carp/archive/socket/sockets.carp")
(use Socket)
(defn main []
(let-do [port (Maybe.from (from-string &(Maybe.from (nth &System.args 1) @"")) 0)]
(IO.println &(fmt "Starting server on localhost, port %d" port))
(Socket.with-server sock "127.0.0.1" port
(if (Socket.valid? &sock)
(Socket.while-connection &sock client
(IO.println &(read &client))
(send &client "hi")
(IO.println "yay"))
(IO.errorln "Server couldnt be started.")))))
(build)
(run)

4
carp/src/languagelink.py Normal file
View File

@@ -0,0 +1,4 @@
from bridge import bridge
from bridge.bridge_hooks import *
bridge.run_bridge()