Distributed SHM Server (daoServer)
daoServer is a Python daemon that bridges local shared memory (SHM) segments
with a Redis-backed distributed registry, letting any machine on the network
subscribe to live ZMQ streams of another machine’s SHM data.
Overview
Without daoServer each DAO process can only access SHM segments on the same
host. daoServer lifts that restriction:
- Each participating machine runs a daoServer instance.
- On start-up the server discovers every *.im.shm file on the local host and publishes its name, shape, and dtype to a shared Redis registry.
- When a client requests a remote SHM segment the server allocates a ZMQ PUB socket and starts streaming frame updates. The client creates a local mirror SHM segment that is kept up-to-date by a background subscriber thread.
- Both sides exchange heartbeat keys in Redis. When a client goes silent its mirror subscription is torn down and the ZMQ publisher is stopped if no other clients remain.
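The teardown rule in the last step can be modelled in a few lines. This is a minimal in-memory sketch, not daoServer's own code: the `PublisherState` class and its method names are hypothetical, and the 75 s expiry is borrowed from the heartbeat key described under Redis Key Schema.

```python
import time


class PublisherState:
    """Toy model of one ZMQ publisher and the clients subscribed to it.

    Hypothetical helper for illustration; daoServer keeps this state in Redis.
    """

    def __init__(self, ttl_s=75.0):
        self.ttl_s = ttl_s
        self.last_seen = {}   # client name -> time of its last heartbeat
        self.running = True   # stands in for the live ZMQ PUB socket

    def heartbeat(self, client, now=None):
        """Record a heartbeat from a client."""
        self.last_seen[client] = time.monotonic() if now is None else now

    def evict_silent_clients(self, now=None):
        """Drop clients whose last heartbeat is older than the TTL.

        Stops the publisher when no clients remain; returns the evicted names.
        """
        now = time.monotonic() if now is None else now
        silent = [c for c, t in self.last_seen.items() if now - t > self.ttl_s]
        for client in silent:
            del self.last_seen[client]
        if not self.last_seen:
            self.running = False
        return silent
```

Passing `now` explicitly keeps the model deterministic, which makes the eviction rule easy to unit-test without waiting for real time to pass.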
Note
The diagram above will render once the file docs/source/_static/daoServer_architecture.png
is added to the repository. In the meantime the text below describes the same topology.
Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│                             Redis Registry                              │
│                                                                         │
│  dao:machines:<host>:ip / hostname / role / last_connect                │
│  dao:machines:<host>:shm:<name>:dimensions / dtype / connections        │
│  dao:connections:<host>:<shm>   (pending connection request)            │
│  dao:heartbeats:<host>:<shm>:<client>                                   │
└──────────────────┬──────────────────────────────┬───────────────────────┘
                   │                              │
          ┌────────┴────────┐            ┌────────┴────────┐
          │    Machine A    │            │    Machine B    │
          │   (daoServer)   │ ZMQ PUB →  │   (daoServer)   │
          │                 │ ─────────► │                 │
          │ /tmp/wfs.im.shm │            │  /tmp/remote_   │
          │                 │            │  MachineA_wfs   │
          └─────────────────┘            └─────────────────┘
Prerequisites
In addition to the standard DAO dependencies the following Python packages are required:
pip install redis pyzmq numpy
The redis package talks to a running Redis server. Install the server itself
with your system package manager if it is not already present:
# Ubuntu / Debian
sudo apt install redis
# RedHat / CentOS / Fedora
sudo yum install redis
# macOS
brew install redis
Redis must be reachable from every machine that participates in the distributed
registry. A single Redis instance is sufficient for a small cluster; point all
daoServer instances at the same host and port.
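Before starting the daemon it can help to confirm that the Redis port is reachable from each machine. The sketch below only checks that a TCP connection can be opened; it does not speak the Redis protocol. The function name `redis_port_open` is hypothetical, and 6379 is the standard Redis port.

```python
import socket


def redis_port_open(host, port=6379, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable hostnames.
        return False
```

A `False` result usually points at a firewall rule or at Redis listening only on the loopback interface.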
Redis Key Schema
All keys share the dao: namespace prefix.
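| Key pattern | Description |
|---|---|
| dao:machines:<host>:ip | IPv4 address of the registered machine |
| dao:machines:<host>:hostname | Hostname string |
| dao:machines:<host>:role | Node role (master or minion) |
| dao:machines:<host>:last_connect | ISO-8601 UTC timestamp of the last heartbeat |
| dao:machines:<host>:shm:<name>:dimensions | JSON array of the segment’s array shape |
| dao:machines:<host>:shm:<name>:dtype | NumPy dtype string |
| dao:machines:<host>:shm:<name>:connections | JSON array of active client connection records (server IP, port, …) |
| dao:connections:<host>:<shm> | Transient request key written by a client; deleted once the server starts publishing |
| dao:heartbeats:<host>:<shm>:<client> | ISO-8601 UTC timestamp refreshed by the client every 10 s; auto-expires after 75 s |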
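When inspecting the registry by hand it is convenient to build key names programmatically. The helpers below are a sketch that follows the patterns shown in the Architecture diagram; the function names are hypothetical and are not part of the daoServer API.

```python
PREFIX = "dao"  # namespace prefix shared by all keys


def machine_key(host, field):
    """Key for a per-machine field: ip, hostname, role or last_connect."""
    return f"{PREFIX}:machines:{host}:{field}"


def shm_key(host, name, field):
    """Key for a per-segment field: dimensions, dtype or connections."""
    return f"{PREFIX}:machines:{host}:shm:{name}:{field}"


def connection_key(host, shm):
    """Key a client writes to request a connection to shm on host."""
    return f"{PREFIX}:connections:{host}:{shm}"


def heartbeat_key(host, shm, client):
    """Key a client refreshes to signal that it is still subscribed."""
    return f"{PREFIX}:heartbeats:{host}:{shm}:{client}"
```

For example, `shm_key("rts-machine", "wfs", "dtype")` yields `dao:machines:rts-machine:shm:wfs:dtype`, which can be passed to `redis-cli GET`.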
Master / Minion Roles
Every cluster needs exactly one master node. The master is responsible for evicting stale machines (those that have not sent a heartbeat within the configured TTL). All other nodes are minions.
Role assignment:
- Pass --role master or --role minion explicitly on the command line.
- Omit --role to let daoServer auto-detect: if no master is found in Redis the node promotes itself to master and logs a warning.
Only the master can call deregister_machine() for a different host or run
cleanup_stale_machines().
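The role-assignment rules can be summarised as a small decision function. This is a sketch under stated assumptions, not the daemon's implementation: `resolve_role` is a hypothetical name, and the fallback to minion when a master already exists is assumed rather than documented above.

```python
import logging

log = logging.getLogger("daoServer.example")


def resolve_role(requested, registered_roles):
    """Pick a role for this node.

    requested: "master", "minion", or None to auto-detect.
    registered_roles: roles already present in the registry.
    """
    if requested is not None:
        if requested not in ("master", "minion"):
            raise ValueError(f"unknown role: {requested!r}")
        return requested
    if "master" not in registered_roles:
        log.warning("no master found in registry; promoting this node to master")
        return "master"
    return "minion"  # assumption: a node that finds a master joins as a minion
```

Because two nodes starting at the same moment could both see an empty registry, passing the role explicitly is the safer choice in scripted deployments.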
Running the Daemon
Command-line interface
python -m daoServer [OPTIONS]
# or directly
python daoServer.py [OPTIONS]
Options
| Flag | Default | Description |
|---|---|---|
| --redis-host | | Hostname or IP of the Redis server |
| | | Redis TCP port |
| --role | auto | Node role; omit to auto-detect |
| | | Interval between heartbeat writes to Redis |
| | | First ZMQ port allocated for SHM publishing |
| --discover | — | Scan local subnets for Redis / daoServer instances and exit |
Scanning the network
Before starting the daemon you can check which Redis instances are visible on your local subnets:
python daoServer.py --discover
Example output:
Found 2 Redis instance(s):
192.168.64.2:6379 [dao] machines: daovm, rts-machine
192.168.64.3:6379 machines: —
A [dao] tag means the instance already has daoServer keys registered.
Typical multi-machine setup
On the Redis host (or any machine that should be master):
# Start Redis
redis-server &
# Start daoServer as master, pointing at local Redis
python daoServer.py --role master --redis-host localhost
On every other machine:
python daoServer.py --role minion --redis-host 192.168.64.2
Python API
discover_redis — subnet scanner
from daoServer import discover_redis
results = discover_redis(port=6379, timeout=0.3, max_workers=64)
for r in results:
    print(r["host"], r["dao"], r["machines"])
Probes every host on each local /24 subnet. Returns a list of dicts:
[
    {"host": "192.168.64.2", "port": 6379, "dao": True, "machines": ["rts"]},
    {"host": "192.168.64.5", "port": 6379, "dao": False, "machines": []},
]
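To give a sense of how much work a /24 probe involves, the sketch below enumerates the candidate addresses for one subnet using the standard `ipaddress` module. How discover_redis enumerates hosts internally is not shown here; `hosts_on_slash24` and the sample address are illustrative assumptions.

```python
import ipaddress


def hosts_on_slash24(local_ip):
    """Return every usable host address on the /24 subnet containing local_ip."""
    # strict=False lets us pass a host address instead of the network address.
    network = ipaddress.ip_network(f"{local_ip}/24", strict=False)
    return [str(addr) for addr in network.hosts()]


candidates = hosts_on_slash24("192.168.64.7")
```

A /24 has 254 usable host addresses (the network and broadcast addresses are excluded), so with timeout=0.3 and max_workers=64 a full sweep of one subnet needs only a few rounds of parallel probes.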
DaoServer class
Constructor
from daoServer import DaoServer
server = DaoServer(
    redis_host="192.168.64.2",
    redis_port=6379,
    role="minion",  # or "master", or None for auto-detect
    heartbeat_interval=30.0,
    base_pub_port=5560,
)
Raises ConnectionError if Redis is unreachable.
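On machines that boot before Redis is up, it may be useful to retry construction instead of failing on the first ConnectionError. The wrapper below is a generic sketch: `connect_with_retry` is a hypothetical helper, and it takes a zero-argument factory so that it makes no assumptions about the constructor's signature.

```python
import time


def connect_with_retry(factory, attempts=5, delay_s=2.0, sleep=time.sleep):
    """Call factory() until it stops raising ConnectionError.

    factory: zero-argument callable, e.g. lambda: DaoServer(redis_host="...").
    Re-raises the last ConnectionError once all attempts are used up.
    """
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except ConnectionError:
            if attempt == attempts:
                raise
            sleep(delay_s)  # fixed pause between attempts
```

Typical use would be `server = connect_with_retry(lambda: DaoServer(redis_host="192.168.64.2"))`.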
Listing registered machines
machines = server.list_machines()
for m in machines:
    print(m["hostname"], m["role"], m["shm_segments"])
Each entry contains:
| Key | Description |
|---|---|
| hostname | Machine hostname string |
| | IPv4 address |
| role | Node role (master or minion) |
| | ISO-8601 UTC string of the last heartbeat |
| shm_segments | List of SHM segment names registered for this host |
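A common use of this list is finding which hosts expose a given segment. The sketch below relies only on the `hostname` and `shm_segments` keys used in the example above; the helper name and the sample values are made up for illustration.

```python
def hosts_with_segment(machines, shm_name):
    """Return hostnames whose shm_segments list contains shm_name."""
    return [m["hostname"] for m in machines if shm_name in m["shm_segments"]]


# Sample data shaped like list_machines() entries (values are invented).
sample = [
    {"hostname": "rts-machine", "role": "master", "shm_segments": ["wfs", "dm_commands"]},
    {"hostname": "daovm", "role": "minion", "shm_segments": []},
]
```

With a live server this becomes `hosts_with_segment(server.list_machines(), "wfs")`.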
Accessing a remote SHM segment
# Get a local shm object that mirrors wfs@rts-machine
mirror = server.get_remote_shm("rts-machine", "wfs")
# Read the latest frame
data = mirror.get_data()
# When done, release the subscription
server.release_remote_shm("rts-machine", "wfs")
get_remote_shm blocks until the remote server starts publishing (up to
30 s). It raises:
- KeyError: hostname or SHM name not found in Redis.
- TimeoutError: remote server did not respond within 30 s.
- NotImplementedError: unsupported transport type in the connection record.
The returned object is a standard daoShm.shm instance; call get_data(),
get_data(check=True) etc. exactly as with a locally-created segment.
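Because every get_remote_shm call should be paired with release_remote_shm, a context manager is one way to guarantee the release even when the code in between raises. This is a sketch: `remote_shm` is a hypothetical wrapper built only on the two methods shown above.

```python
from contextlib import contextmanager


@contextmanager
def remote_shm(server, host, name, **kwargs):
    """Yield a mirror of name@host and release the subscription on exit."""
    # If this call raises (KeyError, TimeoutError, ...) there is nothing to release.
    mirror = server.get_remote_shm(host, name, **kwargs)
    try:
        yield mirror
    finally:
        server.release_remote_shm(host, name)


# Intended use with a live DaoServer instance:
# with remote_shm(server, "rts-machine", "wfs") as mirror:
#     data = mirror.get_data()
```

Extra keyword arguments are forwarded unchanged to get_remote_shm.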
Bidirectional mirroring
When bidirectional=True both ends run a publisher and a subscriber so
writes to the local mirror propagate back to the remote SHM:
mirror = server.get_remote_shm("rts-machine", "dm_commands", bidirectional=True)
# Write a new DM command vector — it will appear in the remote SHM
import numpy as np
mirror.set_data(np.zeros(140, dtype=np.float32))
Warning
Bidirectional mode doubles the ZMQ traffic. Both machines must be able to
reach each other’s base_pub_port range. Use unidirectional mode (the
default) wherever write-back is not needed.
Lifecycle
Use start() / stop() when embedding DaoServer in your own daemon:
import threading, signal
from daoServer import DaoServer
server = DaoServer(redis_host="192.168.64.2", role="minion")
def _stop(sig, frame):
    server.stop()
signal.signal(signal.SIGINT, _stop)
signal.signal(signal.SIGTERM, _stop)
# start() blocks until stop() is called
threading.Thread(target=server.start, daemon=True).start()
# ... do your own work here ...
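For a clean shutdown the embedding program may want to wait for the server thread to finish after calling stop(). The two helpers below are a sketch with hypothetical names; they rely only on the documented behaviour that start() blocks until stop() is called.

```python
import threading


def run_in_background(server):
    """Run server.start() in a daemon thread and return the thread."""
    thread = threading.Thread(target=server.start, daemon=True)
    thread.start()
    return thread


def shutdown(server, thread, timeout_s=5.0):
    """Ask the server to stop and wait for its thread.

    Returns True if the thread exited within timeout_s seconds.
    """
    server.stop()
    thread.join(timeout=timeout_s)
    return not thread.is_alive()
```

Joining with a timeout avoids hanging the host process forever if the server thread is stuck, while still giving it a chance to deregister cleanly.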
start() spawns the following background daemon threads:
| Thread name | Purpose |
|---|---|
| | Refreshes |
| | Syncs local |
| | Watches for incoming connection requests and starts ZMQ publishers |
| | Evicts stale clients and stops publishers with no active subscribers |
| | Deregisters machines that have not heartbeated within the TTL (default 120 s) |
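The eviction rule for stale machines (no heartbeat within the TTL, 120 s by default) boils down to a timestamp comparison. The sketch below shows one way to express it for ISO-8601 UTC strings; `is_stale` is a hypothetical helper, and treating timestamps without an offset as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone


def is_stale(last_connect_iso, now=None, ttl_s=120.0):
    """Return True if an ISO-8601 timestamp is more than ttl_s seconds old."""
    now = datetime.now(timezone.utc) if now is None else now
    last = datetime.fromisoformat(last_connect_iso)
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)  # assumption: naive stamps are UTC
    return (now - last) > timedelta(seconds=ttl_s)
```

Note that `datetime.fromisoformat` accepts a trailing `Z` only from Python 3.11 onwards; an explicit `+00:00` offset works on older versions as well.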
Manual registration / deregistration
These are called automatically by start() / stop(). They can also be
used directly for scripting or testing:
server.register_machine() # write local SHM metadata to Redis
server.deregister_machine() # remove all Redis keys for this host
# Master may deregister another host
server.deregister_machine("stale-rts-machine")
Network discovery via class method
discover() is a static method so it can be called without constructing a
DaoServer first:
results = DaoServer.discover()
if results:
    server = DaoServer(redis_host=results[0]["host"])
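When several Redis instances answer, taking the first result may pick one that holds no daoServer keys. A possible refinement, sketched below, prefers instances whose `dao` flag is set. The helper name `pick_redis` and its fallback defaults are assumptions; the dict keys match the discover_redis output shown earlier.

```python
def pick_redis(results, default_host="localhost", default_port=6379):
    """Choose a Redis endpoint from discovery results.

    Preference order: an instance that already holds dao keys, then any
    instance found, then the supplied default.
    """
    tagged = [r for r in results if r["dao"]]
    fallback = [{"host": default_host, "port": default_port}]
    chosen = (tagged or results or fallback)[0]
    return chosen["host"], chosen["port"]
```

The returned pair can be passed straight to the constructor as `redis_host` and `redis_port`.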
End-to-End Examples
Unidirectional: read a wavefront sensor on a remote machine
Machine A (WFS host — 192.168.64.2):
python daoServer.py --role master --redis-host localhost
Machine B (consumer):
import threading
import time

from daoServer import DaoServer

server = DaoServer(redis_host="192.168.64.2", role="minion")

# start() blocks until stop() is called, so run it in a background thread
t = threading.Thread(target=server.start, daemon=True)
t.start()

# Mirror the remote WFS shared memory
wfs_mirror = server.get_remote_shm("machineA", "wfs")

for _ in range(100):
    frame = wfs_mirror.get_data(check=True)  # blocks until new frame
    print("frame shape:", frame.shape, " mean:", frame.mean())
    time.sleep(0)  # yield to other threads

server.release_remote_shm("machineA", "wfs")
server.stop()
Bidirectional: write DM commands back to the RTS
Machine A (RTS — runs the DM loop):
python daoServer.py --role master --redis-host localhost
Machine B (external controller):
from daoServer import DaoServer
import numpy as np, threading
server = DaoServer(redis_host="192.168.64.2", role="minion")
threading.Thread(target=server.start, daemon=True).start()
dm = server.get_remote_shm("rts-machine", "dm_commands", bidirectional=True)
# Push a new command vector — it is immediately visible on the RTS
new_cmd = np.ones(140, dtype=np.float32) * 0.01
dm.set_data(new_cmd)
server.release_remote_shm("rts-machine", "dm_commands")
server.stop()
Troubleshooting
| Symptom | Resolution |
|---|---|
| | Ensure |
| | The remote |
| | The remote server received the connection request but could not open the SHM file. Check that the segment exists in |
| Mirror receives zeros indefinitely | The remote SHM exists but nothing is writing to it. The initial-publish nudge fires after 150 ms; subsequent updates only arrive when the remote SHM counter increments. |
| Two servers both claim | Restart both and pass |
| ZMQ ports unreachable between machines | Open the port range |