-- vim: ft=terra

-- ipc.t — inter-process control bus.
--
-- Processes coordinate through three SysV IPC primitives plus POSIX
-- realtime signals:
--   * a shared-memory "edict" segment (keyed by m.ipckey) carrying the
--     current broadcast command and a client-serial counter;
--   * a single semaphore guarding/ref-counting access to the edict;
--   * per-process private message queues used for directed commands and
--     for acks flowing back to the issuing "emperor".
--
-- NOTE(review): this module references a project-wide global `lib`
-- (bail/dbg/warn/trn/enum/io/mem/str/crypt) that must already be in scope
-- when this file is loaded — confirm against the loader.

-- NOTA BENE: this is a hack to get around the
-- weaknesses of the terra macro parser. it is
-- highly undesirable and should be replaced if
-- suitable IPC primitives become available
-- (FIX: the <signal.h> argument of the #include had been lost in transit,
-- leaving an invalid bare `#include`; restored — signal.h provides
-- SIGRTMIN/SIGRTMAX, SIGCHLD, sigset_t, and struct sigaction used below.)
local S = terralib.includecstring [[
	#include <signal.h>
	int _min(void) { return SIGRTMIN; }
	int _max(void) { return SIGRTMAX; }
	typedef struct sigaction _sig_spec;
]] --terra cannot access structures when they have the same name as a function

local m = {
	signals = {
		-- rtsigs (abstract offsets, mapped to real signal numbers by m.signum)
		notify_state_change = 0x3;
		notify_cond_update = 0x4;
		-- values sent through rtsig
		state_success = 0x69;
		state_fail_find = 0x93;

		-- system sigs
		sys_child = S.SIGCHLD;
	};
	cmd = lib.enum {
		'none', 'enumerate', 'stop', 'cfgrefresh', 'chnoise'
	};
	proc_active = global(true); -- cleared by the SIGINT/SIGTERM handler below
	ipckey = 0x8e037c6;         -- well-known key for the shared edict segment
}

-- Map one of our abstract signal offsets onto a concrete realtime signal
-- number, bailing out if it falls outside the system's RT-signal range.
terra m.signum(reqsig: int)
	var sig = S._max() - reqsig
	-- we implement this in contradiction to the recommendations of the manpages
	-- because they make no goddamn sense. if SIGRTMIN can vary, using as a basis
	-- for signals is really rather stupid -- there's no guarantee a signal sent
	-- by one process will be interpreted correctly by another, as its SIGRTMIN
	-- might well be different! so we use SIGRTMAX as the base instead, assuming
	-- that it's more likely to remain constant across the whole system.
	if reqsig > S._max() or sig < S._min() then lib.bail('realtime signal error - requested signal number is greater than the available number of realtime signals') end
	return sig
end

-- Inverse of m.signum: recover the abstract offset from a concrete rt signal.
terra m.sigdec(rtsig: int)
	-- exactly like the above, except without bounds checking
	return S._max() - rtsig
end

local getppid = terralib.externfunction('getppid', {} -> int);
local sigsend = terralib.externfunction('sigqueue', {int,int,intptr} -> int);
local sigfd = terralib.includec 'sys/signalfd.h';

-- Queue a state-change notification (payload `sig`) to our parent process.
terra m.notify_parent(sig: int)
	sigsend(getppid(), m.signum(m.signals.notify_state_change), sig)
end

-- A blocking signal listener: masks our signals and exposes them as a
-- signalfd so they can be consumed as ordinary reads.
struct m.listener { prevmask: S.sigset_t fd: int }

terra m.listener.methods.mk()
	var mask: S.sigset_t
	var l: m.listener
	S.sigemptyset(&mask)
	-- important to keep this in sync with any new signals we add
	S.sigaddset(&mask, m.signum(m.signals.notify_state_change))
	S.sigaddset(&mask, m.signum(m.signals.notify_cond_update))
	S.sigaddset(&mask, S.SIGCHLD)
	-- block normal delivery so the signalfd gets them instead; stash the old
	-- mask for restoration in :release()
	S.sigprocmask(S.SIG_BLOCK, &mask, &l.prevmask)
	l.fd = sigfd.signalfd(-1, &mask, 0)
	return l
end

-- Block until a masked signal arrives; returns sender pid, whether it was a
-- system signal (outside the RT range), the decoded signal, and its payload.
terra m.listener:block()
	var si: sigfd.signalfd_siginfo
	lib.io.recv(self.fd, [rawstring](&si), sizeof([si.type]))
	var syssig = si.ssi_signo < S._min() or si.ssi_signo > S._max();
	return {
		from = si.ssi_pid;
		system = syssig;
		sig = lib.trn(syssig, si.ssi_signo, m.sigdec(si.ssi_signo));
		event = si.ssi_int;
		exit = si.ssi_status;
	}
end

-- Tear down the signalfd and restore the pre-listener signal mask.
terra m.listener:release()
	lib.io.close(self.fd)
	S.sigprocmask(S.SIG_SETMASK, &self.prevmask, nil)
end

local ipc = terralib.includec 'sys/ipc.h'
local shm = terralib.includec 'sys/shm.h'
local sem = terralib.includec 'sys/sem.h'
local mq = terralib.includec 'sys/msg.h'

-- Reply sent back over the issuer's message queue.
-- NOTE(review): SysV msgsnd/msgrcv expect a leading `long mtype` field;
-- here the first member (clid) is doing double duty as the type. It works
-- as long as clid is always nonzero, but verify against msgsnd(2).
struct m.ack {
	clid: uint64  -- responding client's serial id (doubles as msg type)
	cliq: int     -- responding client's message-queue id
	success: bool
	union {
		iname: int8[64] -- instance name, filled for 'enumerate' replies
	}
}

-- A command as transmitted over the bus (broadcast via edict or unicast
-- via a message queue).
struct m.demand {
	cmd: m.cmd.t
	empq: ipc.key_t -- ID of the current emperor's msg queue, chosen randomly
	operand: uint64 -- extra cmd parameter, where useful
}

-- Layout of the shared-memory broadcast segment.
struct m.edict {
	semid: int -- controls access to shared memory
	serial: uint64 -- starts at zero, no action taken until set to 1
	nextclient: uint64 -- starting at 1

	demand: m.demand
}

-- Perform a semop of `op` on the edict's (single) semaphore; negative ops
-- acquire/release counts, 0 waits for the count to drain to zero.
terra m.edict:sem(op: int)
	-- FIX: the set is created with semget(…, nsems=1, …), so the only valid
	-- index is 0; the original sem_num of 1 was out of range and made every
	-- semop fail, leaving the edict permanently unlocked/unsynchronized.
	var sb = sem.sembuf {
		sem_num = 0;
		sem_op = op;
		sem_flg = 0;
	}
	return sem.semop(self.semid, &sb, 1)
end

-- Per-process handle on the control bus.
struct m.emperor {
	shmid: int      -- id of the shared edict segment (0 = not attached)
	msqid: int      -- my message queue
	clid: uint64    -- my client serial id
	lastcmd: uint64 -- when serial increases above this, execute the command
	edict: &m.edict
	client: bool
	oldsig_int: S._sig_spec  -- previous SIGINT disposition, for restore
	oldsig_term: S._sig_spec -- previous SIGTERM disposition, for restore
}

-- Number of other processes currently attached to the edict segment
-- (shm_nattch minus ourselves).
terra m.emperor:countpeers(): intptr
	var si: shm.shmid_ds
	shm.shmctl(self.shmid,ipc.IPC_STAT,&si)
	return si.shm_nattch - 1
end

-- forward declaration: the SIGINT/SIGTERM handler needs m.global_emperor,
-- which is defined below
local terra handle_interrupt :: int -> {}

-- Attach to (or create) the control bus. Clients additionally claim a
-- serial id under the edict lock. Installs SIGINT/SIGTERM handlers so the
-- bus is torn down on interrupt.
terra m.emperor.methods.mk(client: bool)
	var em = m.emperor { lastcmd = 0, client = client }
	-- NOTE(review): the segment only ever holds an m.edict, so
	-- sizeof(m.emperor) looks like it should be sizeof(m.edict); it merely
	-- over-allocates and both call sites agree, but confirm intent.
	em.shmid = shm.shmget(m.ipckey, sizeof(m.emperor), 0);
	if em.shmid == -1 then -- needs to be created
		-- (`or` on integers is Terra's bitwise or, so this builds the flag mask)
		em.shmid = shm.shmget(m.ipckey, sizeof(m.emperor), ipc.IPC_CREAT or ipc.IPC_EXCL or 0600);
		if em.shmid == -1 then lib.bail('could not get shared memory segment; may have clashed with simultaneously starting process, if any') end
		em.edict = [&m.edict](shm.shmat(em.shmid, nil, 0))
		-- key 0 is IPC_PRIVATE: a fresh set; peers find it via edict.semid
		em.edict.semid = sem.semget([ipc.key_t](0), 1, 0600)
	else
		em.edict = [&m.edict](shm.shmat(em.shmid, nil, 0))
	end
	if [intptr](em.edict) == [intptr](-1) then lib.bail('could not attach shared memory segment') end
	if client then
		em.edict:sem(0) -- wait our turn
		em.edict:sem(1) -- lock edict
		if em.edict.nextclient == 0 then em.edict.nextclient = 1 end
		em.clid = em.edict.nextclient
		em.edict.nextclient = em.clid + 1
		em.edict:sem(-1) -- release edict
	end
--	while true do
--		var id = lib.crypt.random(uint64) % [ipc.key_t:max()]
--		em.msqid = mq.msgget([ipc.key_t](id),ipc.IPC_CREAT or ipc.IPC_EXCL)
--		if em.msqid ~= -1 then break end
--	end
	em.msqid = mq.msgget([ipc.key_t](0), 0600) -- IPC_PRIVATE: new private queue

	var empty: S.sigset_t
	S.sigemptyset(&empty) -- 🙄
	var ss = S._sig_spec { sa_mask = empty, sa_flags = 0 }
	ss.__sigaction_handler.sa_handler = handle_interrupt; -- may not be portable
	S.sigaction(S.SIGINT, &ss, &em.oldsig_int)
	S.sigaction(S.SIGTERM, &ss, &em.oldsig_term)
	return em
end

-- Non-blocking poll for a pending command: broadcast edicts take priority,
-- then our private queue; writes m.cmd.none into cmd if nothing is pending.
terra m.emperor:poll(cmd: &m.demand)
	if self.shmid == 0 then return end
	if self.edict.serial > self.lastcmd then
		self.lastcmd = self.edict.serial
		@cmd = self.edict.demand
		self.edict:sem(-1) -- release my lock
		return
	end

	-- NOTE(review): msgsz here includes the struct's first field, which SysV
	-- treats as the mtype; works only because sender and receiver agree.
	if mq.msgrcv(self.msqid, cmd, sizeof(m.demand), 0, ipc.IPC_NOWAIT) ~= -1 then
		return
	end

	cmd.cmd = m.cmd.none
end

-- Allocate an ack buffer big enough for one reply per connected peer.
terra m.emperor:mallack() return lib.mem.heapa(m.ack, self:countpeers()) end

-- forward declaration so decree can recurse (it enumerates peers by
-- issuing a broadcast decree from within itself)
terra m.emperor.methods.decree :: {
	&m.emperor, uint64, rawstring, m.cmd.t, uint64, &m.ack
} -> {}

-- Issue a command on the bus. With tgtclid == 0 and tgtname == nil the
-- command is broadcast via the edict and `result` must hold countpeers()
-- acks; otherwise the target is located by id or name (via a nested
-- 'enumerate' broadcast) and the command is sent over its private queue,
-- with the single reply written to @result.
terra m.emperor:decree(
	tgtclid: uint64, tgtname: rawstring,
	cmd: m.cmd.t, operand: uint64,
	result: &m.ack
): {}
	if self.client then lib.bail('client attempted to issue IPC decree') end
	var dem = m.demand {
		cmd = cmd;
		operand = operand;
		empq = self.msqid; -- register to receive replies
	}
	var npeers = self:countpeers()
	if npeers == 0 then lib.bail('no processes connected to control bus') end
	if tgtclid == 0 and tgtname == nil then
		lib.dbg('sending to all instances, waiting for edict to become writable')
		self.edict:sem(0) -- wait for all locks on edict to resolve
		lib.dbg('locking edict')
		self.edict:sem(npeers) -- place a read lock for each peer
		self.edict.demand = dem
		lib.dbg('sending edict')
		self.edict.serial = self.edict.serial + 1 -- send command

		var acks: intptr = 0
		while acks < npeers do
			lib.dbg('awaiting responses from instances')
			if mq.msgrcv(self.msqid, result + acks, sizeof(m.ack), 0, 0) == -1 then
				lib.bail('error occurred while waiting for responses from instances')
			end
			acks = acks + 1
			-- TODO: handle rare case that a process dies here without releasing
			-- its semaphore read lock, desyncing and paralyzing IPC
		end
		lib.dbg('all instances have reported in')
	else
		-- TODO: get msgqueue and send edict
		var acks = lib.mem.heapa(m.ack, npeers)
		self:decree(0, nil, m.cmd.enumerate, 0, &acks(0))
		var tgt: int
		var found = false
		for i=0,npeers do
			if acks(i).clid == tgtclid or (
				tgtname ~= nil and lib.str.cmp(tgtname, acks(i).iname) == 0
			) then
				found = true
				tgt = acks(i).cliq
			end
		end
		acks:free()
		if not found then
			lib.warn('no such instance is currently online and responding to IPC calls')
		else
			lib.dbg('located instance, sending command')
			if mq.msgsnd(tgt, &dem, sizeof(m.demand), 0) == -1 then
				lib.bail('could not send command to target process')
			end
			while true do
				var ack: m.ack
				if mq.msgrcv(self.msqid, &ack, sizeof(m.ack), 0, 0) == -1 then
					lib.bail('error while awaiting response')
				end
				if ack.cliq == tgt then
					lib.dbg('got response, writing out and returning')
					@result = ack
					break
				else lib.warn('got spurious response, ignoring') end
			end
		end
	end
end

-- Fill in our identity and send the ack back to the demand's issuer.
terra m.demand:ack(emp: &m.emperor, a: &m.ack)
	a.clid = emp.clid
	a.cliq = emp.msqid
	mq.msgsnd(self.empq, a, sizeof(m.ack), 0)
end

m.global_emperor = global(m.emperor, `m.emperor {shmid = 0}, 'process-emperor') -- :/

-- Detach from the bus; the last process out destroys the semaphore and
-- shared segment. Restores the previous SIGINT/SIGTERM dispositions.
-- NOTE(review): restores from m.global_emperor rather than self — identical
-- when invoked on the global handle, but confirm for other instances.
terra m.emperor:release()
	if self.shmid == 0 then return end
	if self:countpeers() == 0 then
		-- we're the last user, go ahead and destroy it
		sem.semctl(self.edict.semid,0,ipc.IPC_RMID)
		shm.shmctl(self.shmid,ipc.IPC_RMID,nil)
	end
	shm.shmdt([&opaque](self.edict))
	mq.msgctl(self.msqid,ipc.IPC_RMID,nil)
	S.sigaction(S.SIGINT, &m.global_emperor.oldsig_int, nil)
	S.sigaction(S.SIGTERM, &m.global_emperor.oldsig_term, nil)
	self.shmid = 0 -- mark released so poll()/release() become no-ops
end

-- SIGINT/SIGTERM handler: tear down the bus and flag the main loop to exit.
-- NOTE(review): lib.warn and the IPC teardown are unlikely to be
-- async-signal-safe; acceptable only because we're shutting down anyway.
terra handle_interrupt(sig: int): {}
	m.global_emperor:release()
	lib.warn('caught interrupt, shutting down')
	m.proc_active = false
end

return m