-- vim: ft=terra
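-- NOTA BENE: the C shim below (which wraps SIGRTMIN/SIGRTMAX in
-- functions and aliases struct sigaction) is a hack to get around
-- the weaknesses of terra's handling of C macros and headers. it is
-- highly undesirable and should be replaced if suitable IPC
-- primitives become available.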
local S = terralib.includecstring [[
#include <signal.h>
int _min(void) { return SIGRTMIN; }
int _max(void) { return SIGRTMAX; }
typedef struct sigaction _sig_spec;
]]--terra cannot access structures when they have the same name as a function
-- module table: signal numbering, the command enum and shared IPC settings
local m = {}

m.signals = {
	-- realtime signals, given as offsets counted down from SIGRTMAX
	-- (translate with m.signum / m.sigdec)
	notify_state_change = 0x3;
	notify_cond_update  = 0x4;
	-- payload values sent along with a realtime signal
	state_success   = 0x69;
	state_fail_find = 0x93;
	-- ordinary system signals
	sys_child = S.SIGCHLD;
}

-- commands an emperor can issue (carried in m.demand.cmd)
m.cmd = lib.enum { 'none', 'enumerate', 'stop', 'cfgrefresh', 'chnoise' }

-- set to false by the SIGINT/SIGTERM handler
m.proc_active = global(true)

-- System V IPC key of the shared control segment
m.ipckey = 0x8e037c6
-- Translate a logical realtime-signal offset (e.g. m.signals.notify_state_change)
-- into a concrete signal number by counting down from SIGRTMAX.
-- Bails unless the result lies within [SIGRTMIN, SIGRTMAX].
terra m.signum(reqsig: int)
	-- we implement this in contradiction to the recommendations of the manpages
	-- because they make no goddamn sense. if SIGRTMIN can vary, using it as a basis
	-- for signals is really rather stupid -- there's no guarantee a signal sent
	-- by one process will be interpreted correctly by another, as its SIGRTMIN
	-- might well be different! so we use SIGRTMAX as the base instead, assuming
	-- that it's more likely to remain constant across the whole system.
	if reqsig < 0 then
		-- a negative offset would land above SIGRTMAX, outside the realtime range
		lib.bail('realtime signal error - requested signal number is negative')
	end
	var sig = S._max() - reqsig
	if reqsig > S._max() or sig < S._min() then lib.bail('realtime signal error - requested signal number is greater than the available number of realtime signals') end
	return sig
end
-- Inverse of m.signum: recover the logical offset from a received realtime
-- signal number. Unlike m.signum, no bounds checking is performed.
terra m.sigdec(rtsig: int)
	var base = S._max()
	return base - rtsig
end
-- raw libc bindings used by the signal-based notification code below
local sigfd = terralib.includec 'sys/signalfd.h';
local getppid = terralib.externfunction('getppid', {} -> int);
-- sigqueue's third parameter is a `union sigval`; it is bound here as a
-- pointer-sized integer. NOTE(review): relies on the platform ABI passing that
-- union like an integer -- confirm on any new target.
local sigsend = terralib.externfunction('sigqueue', {int,int,intptr} -> int);
-- Queue the notify_state_change realtime signal to this process's parent,
-- carrying `sig` as its integer payload. sigqueue's result is not checked.
terra m.notify_parent(sig: int)
	var parent = getppid()
	var rtsig = m.signum(m.signals.notify_state_change)
	sigsend(parent, rtsig, sig)
end
-- receives this module's signals through a signalfd (see m.listener.mk)
struct m.listener {
	prevmask: S.sigset_t; -- signal mask that was in effect before mk() blocked ours
	fd: int;              -- signalfd descriptor
}
-- Create a listener: block the notify_state_change and notify_cond_update
-- realtime signals plus SIGCHLD (saving the previous mask in prevmask) and
-- open a signalfd that receives them. Bails if either step fails, since a
-- listener without a working fd cannot receive anything.
terra m.listener.methods.mk()
	var mask: S.sigset_t
	var l: m.listener
	S.sigemptyset(&mask)
	-- important to keep this in sync with any new signals we add
	S.sigaddset(&mask, m.signum(m.signals.notify_state_change))
	S.sigaddset(&mask, m.signum(m.signals.notify_cond_update))
	S.sigaddset(&mask, m.signals.sys_child)
	-- signalfd(2) expects the signals to be blocked so they stay pending for
	-- the fd instead of being delivered through their normal dispositions
	if S.sigprocmask(S.SIG_BLOCK, &mask, &l.prevmask) ~= 0 then
		lib.bail('could not block IPC signals for listener')
	end
	l.fd = sigfd.signalfd(-1, &mask, 0)
	if l.fd == -1 then
		S.sigprocmask(S.SIG_SETMASK, &l.prevmask, nil)
		lib.bail('could not open signalfd for IPC listener')
	end
	return l
end
-- Read one signal from the listener's signalfd and decode it. Returns an
-- anonymous struct:
--   from   - ssi_pid of the sender
--   system - true when the signal lies outside [SIGRTMIN, SIGRTMAX]
--   sig    - the raw number for system signals, else the offset from m.sigdec
--   event  - ssi_int, the integer payload sent with the signal
--   exit   - ssi_status (per signalfd(2), exit status or signal for SIGCHLD)
-- NOTE(review): the result of lib.io.recv is not checked, so a failed read
-- leaves the siginfo uninitialised; signalfd descriptors are normally read
-- with read(2) -- confirm lib.io.recv handles non-socket fds.
terra m.listener:block()
	var info: sigfd.signalfd_siginfo
	lib.io.recv(self.fd, [rawstring](&info), sizeof(sigfd.signalfd_siginfo))
	var signo = info.ssi_signo
	var syssig = not (signo >= S._min() and signo <= S._max())
	return {
		from = info.ssi_pid;
		system = syssig;
		sig = lib.trn(syssig, signo, m.sigdec(signo));
		event = info.ssi_int;
		exit = info.ssi_status;
	}
end
-- Tear down a listener: close its signalfd, then reinstate the mask that
-- mk() saved in prevmask.
-- NOTE(review): realtime signals still pending when the mask is restored
-- will be delivered with their default disposition -- confirm callers
-- drain the fd first if that matters.
terra m.listener:release()
	var fd = self.fd
	lib.io.close(fd)
	var saved = &self.prevmask
	S.sigprocmask(S.SIG_SETMASK, saved, nil)
end
local ipc = terralib.includec 'sys/ipc.h'
local shm = terralib.includec 'sys/shm.h'
local sem = terralib.includec 'sys/sem.h'
local mq = terralib.includec 'sys/msg.h'
-- Reply posted by a client to the issuing emperor's message queue
-- (see m.demand:ack). NOTE(review): this struct is handed straight to
-- msgsnd/msgrcv, which treat the first `long` of the buffer as the message
-- type, so `clid` doubles as the mtype and must stay first and non-zero.
struct m.ack {
	clid: uint64;    -- client serial id of the responder
	cliq: int;       -- responder's own message queue id
	success: bool;
	union {
		iname: int8[64]; -- instance name, matched against decree's tgtname
	}
}
-- A command issued by an emperor, either broadcast through the shared edict
-- or sent directly to one client's message queue.
struct m.demand {
	cmd: m.cmd.t;      -- which command to run
	empq: ipc.key_t;   -- message queue id of the issuing emperor; acks go here
	operand: uint64;   -- extra cmd parameter, where useful
}
-- Broadcast area stored in the shared memory segment.
struct m.edict {
	semid: int;          -- semaphore set controlling access to this structure
	serial: uint64;      -- starts at zero, no action taken until set to 1
	nextclient: uint64;  -- next client id to hand out, starting at 1
	demand: m.demand;    -- most recently broadcast command
}
-- Apply `op` to the edict's semaphore with semop(2) semantics: a positive op
-- adds to the value, a negative op subtracts, and 0 waits until the value is 0.
-- Returns semop's result (-1 on failure).
terra m.edict:sem(op: int)
	-- a release (op < 0) only ever gives back a lock this process was granted,
	-- so it never needs to wait; IPC_NOWAIT makes a mismatched release fail
	-- instead of blocking the caller forever
	var flg = 0
	if op < 0 then flg = ipc.IPC_NOWAIT end
	var sb = sem.sembuf {
		-- the set is created with a single semaphore (semget(..., 1, ...) in
		-- m.emperor mk), so the only valid index is 0; any other index makes
		-- semop fail with EFBIG and silently disables the locking
		sem_num = 0;
		sem_op = op;
		sem_flg = flg;
	}
	return sem.semop(self.semid, &sb, 1)
end
-- Per-process handle on the shared control bus.
struct m.emperor {
	shmid: int;               -- shared segment id; 0 is treated as "not attached"
	msqid: int;               -- my message queue
	clid: uint64;             -- my client serial id (assigned to clients in mk)
	lastcmd: uint64;          -- when edict serial increases above this, execute the command
	edict: &m.edict;          -- the attached shared segment
	client: bool;             -- clients poll; non-clients issue decrees
	oldsig_int: S._sig_spec;  -- SIGINT disposition replaced in mk
	oldsig_term: S._sig_spec; -- SIGTERM disposition replaced in mk
}
-- Number of other processes attached to the shared segment (the attach count
-- minus ourselves). Returns 0 when the count cannot be read, rather than
-- deriving it from an uninitialised shmid_ds.
terra m.emperor:countpeers(): intptr
	var si: shm.shmid_ds
	if shm.shmctl(self.shmid, ipc.IPC_STAT, &si) == -1 then
		return 0
	end
	return si.shm_nattch - 1
end
local terra handle_interrupt :: int -> {}
-- Attach this process to the shared control bus and return a new emperor.
-- If the segment does not exist yet, it is created together with its
-- semaphore set. Clients additionally draw a client id from the edict.
-- Every emperor gets its own message queue and installs handle_interrupt for
-- SIGINT/SIGTERM, saving the old dispositions so release() can restore them.
-- Bails if any IPC resource cannot be obtained.
terra m.emperor.methods.mk(client: bool)
	var em = m.emperor { lastcmd = 0, client = client, clid = 0 }
	-- the segment holds an m.edict, so that is what it is sized for
	var created = false
	em.shmid = shm.shmget(m.ipckey, sizeof(m.edict), 0)
	if em.shmid == -1 then -- needs to be created
		em.shmid = shm.shmget(m.ipckey, sizeof(m.edict), ipc.IPC_CREAT or ipc.IPC_EXCL or 0600)
		if em.shmid == -1 then lib.bail('could not get shared memory segment; may have clashed with simultaneously starting process, if any') end
		created = true
	end
	em.edict = [&m.edict](shm.shmat(em.shmid, nil, 0))
	-- check before touching the segment: shmat reports failure as (void*)-1
	if [intptr](em.edict) == [intptr](-1) then lib.bail('could not attach shared memory segment') end
	if created then
		-- NOTE(review): a process attaching between our shmget and this store
		-- sees semid == 0 -- confirm that startup race is acceptable
		em.edict.semid = sem.semget([ipc.key_t](0), 1, 0600)
		if em.edict.semid == -1 then lib.bail('could not create IPC semaphore') end
	end
	if client then
		em.edict:sem(0) -- wait our turn
		em.edict:sem(1) -- lock edict
		if em.edict.nextclient == 0 then em.edict.nextclient = 1 end
		em.clid = em.edict.nextclient
		em.edict.nextclient = em.clid + 1
		-- start from the current serial so poll() neither replays a broadcast
		-- issued before we joined nor releases a read lock we were never given
		em.lastcmd = em.edict.serial
		em.edict:sem(-1) -- release edict
	end
	-- key 0 (IPC_PRIVATE) creates a new queue instead of looking up a shared one
	em.msqid = mq.msgget([ipc.key_t](0), 0600)
	if em.msqid == -1 then lib.bail('could not create IPC message queue') end
	var empty: S.sigset_t
	S.sigemptyset(&empty) -- no extra signals blocked while the handler runs
	var ss = S._sig_spec { sa_mask = empty, sa_flags = 0 }
	ss.__sigaction_handler.sa_handler = handle_interrupt; -- may not be portable
	S.sigaction(S.SIGINT, &ss, &em.oldsig_int)
	S.sigaction(S.SIGTERM, &ss, &em.oldsig_term)
	return em
end
-- Fetch the next command for this emperor into *cmd.
-- A new broadcast in the shared edict (serial above lastcmd) takes priority;
-- taking it releases one of the read locks the decree placed. Otherwise a
-- directly-sent demand is taken from our message queue without blocking.
-- When there is nothing to do, or the emperor is not attached (shmid == 0),
-- cmd.cmd is set to m.cmd.none.
terra m.emperor:poll(cmd: &m.demand)
	if self.shmid == 0 then
		-- not attached: report "nothing to do" instead of leaving *cmd stale
		cmd.cmd = m.cmd.none
		return
	end
	if self.edict.serial > self.lastcmd then
		self.lastcmd = self.edict.serial
		@cmd = self.edict.demand
		self.edict:sem(-1) -- release my lock
		return
	end
	-- System V messages begin with a `long` mtype (here the first 8 bytes of
	-- m.demand; assumes LP64) that msgrcv's size argument does not count;
	-- passing the full struct size lets the kernel write 8 bytes past *cmd.
	-- MSG_NOERROR truncates messages from senders still sending the full size.
	var msgsz = sizeof(m.demand) - sizeof(int64)
	if mq.msgrcv(self.msqid, cmd, msgsz, 0, ipc.IPC_NOWAIT or mq.MSG_NOERROR) ~= -1 then
		return
	end
	cmd.cmd = m.cmd.none
end
-- Heap-allocate one m.ack slot per current peer (presumably the result
-- buffer for a broadcast decree, which collects one ack per peer).
terra m.emperor:mallack()
	var n = self:countpeers()
	return lib.mem.heapa(m.ack, n)
end
terra m.emperor.methods.decree :: {
	&m.emperor, uint64, rawstring, m.cmd.t, uint64, &m.ack
} -> bool
-- Issue a command over the control bus; only non-client emperors may do so.
--
-- With tgtclid == 0 and tgtname == nil the command is broadcast through the
-- shared edict and one m.ack per peer is written to result[0 .. npeers-1]
-- (the caller must supply that many slots). Otherwise every instance is
-- enumerated, and the one whose clid equals tgtclid or whose iname equals
-- tgtname is selected. The command goes to that instance's queue, and its ack
-- is written to *result.
--
-- Returns false (after a warning) when no peers are connected or no matching
-- instance responded; bails on message queue errors.
terra m.emperor:decree(
	tgtclid: uint64, tgtname: rawstring,
	cmd: m.cmd.t, operand: uint64,
	result: &m.ack
): bool
	if self.client then lib.bail('client attempted to issue IPC decree') end
	var dem = m.demand {
		cmd = cmd;
		operand = operand;
		empq = self.msqid; -- register to receive replies
	}
	-- System V messages begin with a `long` mtype (the first 8 bytes of our
	-- structs; assumes LP64) that msgsnd/msgrcv size arguments exclude, so
	-- subtract it or the kernel reads/writes 8 bytes past our buffers.
	-- MSG_NOERROR truncates replies from senders still sending the full size.
	var acksz = sizeof(m.ack) - sizeof(int64)
	var demsz = sizeof(m.demand) - sizeof(int64)
	var npeers = self:countpeers()
	if npeers == 0 then lib.warn('no processes connected to control bus') return false end
	if tgtclid == 0 and tgtname == nil then
		lib.dbg('sending to all instances, waiting for edict to become writable')
		self.edict:sem(0) -- wait for all locks on edict to resolve
		lib.dbg('locking edict')
		self.edict:sem(npeers) -- place a read lock for each peer
		self.edict.demand = dem
		lib.dbg('sending edict')
		self.edict.serial = self.edict.serial + 1 -- send command
		var acks: intptr = 0
		while acks < npeers do
			lib.dbg('awaiting responses from instances')
			if mq.msgrcv(self.msqid, result + acks, acksz, 0, mq.MSG_NOERROR) == -1 then
				lib.bail('error occurred while waiting for responses from instances')
			end
			acks = acks + 1
			-- TODO: handle rare case that a process dies here without releasing
			-- its semaphore read lock, desyncing and paralyzing IPC
		end
		lib.dbg('all instances have reported in')
	else
		-- find the target's message queue by enumerating every instance
		var acks = lib.mem.heapa(m.ack, npeers)
		-- NOTE(review): the nested broadcast recounts peers; if one attached
		-- since npeers was read, it would write past the end of acks
		self:decree(0, nil, m.cmd.enumerate, 0, &acks(0))
		var tgt: int
		var found = false
		for i=0,npeers do
			if acks(i).clid == tgtclid or (
				tgtname ~= nil and lib.str.cmp(tgtname, acks(i).iname) == 0
			) then
				found = true
				tgt = acks(i).cliq
			end
		end
		acks:free()
		if not found then
			lib.warn('no such instance is currently online and responding to IPC calls')
			return false
		else
			lib.dbg('located instance, sending command')
			if mq.msgsnd(tgt, &dem, demsz, 0) == -1 then
				lib.bail('could not send command to target process')
			end
			while true do
				var ack: m.ack
				if mq.msgrcv(self.msqid, &ack, acksz, 0, mq.MSG_NOERROR) == -1 then
					lib.bail('error while awaiting response')
				end
				if ack.cliq == tgt then
					lib.dbg('got response, writing out and returning')
					@result = ack
					break
				else lib.warn('got spurious response, ignoring') end
			end
		end
	end
	return true
end
-- Answer the emperor that issued this demand: stamp *a with our client id
-- and queue id, then post it to the emperor's queue (self.empq). A failed
-- send is reported with a warning instead of being silently dropped.
terra m.demand:ack(emp: &m.emperor, a: &m.ack)
	a.clid = emp.clid
	a.cliq = emp.msqid
	-- msgsnd's size excludes the leading `long` mtype word (the clid field;
	-- assumes LP64), so subtract it to avoid reading 8 bytes past *a
	if mq.msgsnd(self.empq, a, sizeof(m.ack) - sizeof(int64), 0) == -1 then
		lib.warn('could not deliver IPC acknowledgement')
	end
end
-- Process-wide emperor used by the SIGINT/SIGTERM handler. shmid = 0 marks it
-- as unattached, which makes poll() and release() return early.
do
	local unattached = `m.emperor { shmid = 0 }
	m.global_emperor = global(m.emperor, unattached, 'process-emperor') -- :/
end
-- Detach from the control bus:
--   * if we are the last attached process, remove the semaphore set and
--     mark the shared segment for removal;
--   * detach the segment and remove our own message queue;
--   * restore the SIGINT/SIGTERM dispositions that mk() saved in this emperor;
--   * mark the emperor unattached (shmid = 0).
-- No-op when already unattached.
terra m.emperor:release()
	if self.shmid == 0 then return end
	if self:countpeers() == 0 then
		-- we're the last user, go ahead and destroy it
		sem.semctl(self.edict.semid, 0, ipc.IPC_RMID)
		shm.shmctl(self.shmid, ipc.IPC_RMID, nil)
	end
	shm.shmdt([&opaque](self.edict))
	mq.msgctl(self.msqid, ipc.IPC_RMID, nil)
	-- restore the handlers *this* emperor saved, not those of m.global_emperor
	S.sigaction(S.SIGINT, &self.oldsig_int, nil)
	S.sigaction(S.SIGTERM, &self.oldsig_term, nil)
	self.shmid = 0
end
-- SIGINT/SIGTERM handler installed by m.emperor mk:
--   * release the process-wide emperor's IPC resources;
--   * log the interrupt;
--   * set m.proc_active to false.
-- NOTE(review): release() and lib.warn are unlikely to be async-signal-safe.
terra handle_interrupt(sig: int): {}
	var emp = &m.global_emperor
	emp:release()
	lib.warn('caught interrupt, shutting down')
	m.proc_active = false
end
-- export the module table
return m