Overview
| Comment: | check in missing file |
|---|---|
| Downloads: | Tarball | ZIP archive | SQL archive |
| Timelines: | family | ancestors | descendants | both | trunk |
| Files: | files | file ages | folders |
| SHA3-256: | 5a4f99fb5583001945effbb48f4b3366 |
| User & Date: | lexi on 2020-12-29 14:35:21 |
| Other Links: | manifest | tags |
Context
|
2020-12-29
| ||
| 15:48 | enable remote control of running instances check-in: f8816b0ab5 user: lexi tags: trunk | |
| 14:35 | check in missing file check-in: 5a4f99fb55 user: lexi tags: trunk | |
| 14:35 | add ipc backbone check-in: 87731d4007 user: lexi tags: trunk | |
Changes
Added ipc.t version [5ea8386d8f].
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
-- vim: ft=terra
-- NOTA BENE: this is a hack to get around the
-- weaknesses of the terra macro parser. it is
-- highly undesirable and should be replaced if
-- suitable IPC primitives become available
--
-- This module provides two IPC mechanisms:
--   * realtime-signal notification between a process and its parent
--     (m.notify_parent / m.listener), and
--   * a System V shared-memory + semaphore + message-queue control bus
--     (m.edict / m.emperor / m.demand / m.ack).

local S = terralib.includecstring [[
	#include <signal.h>
	int _min(void) { return SIGRTMIN; }
	int _max(void) { return SIGRTMAX; }
	typedef struct sigaction _sig_spec;
]]--terra cannot access structures when they have the same name as a function

local m = {
	signals = {
		-- rtsigs
		notify_state_change = 0x3;
		notify_cond_update = 0x4;
		-- values sent through rtsig
		state_success = 0x69;
		state_fail_find = 0x93;
		-- system sigs
		sys_child = S.SIGCHLD;
	};
	cmd = lib.enum {
		'none', 'enumerate', 'stop', 'cfgrefresh', 'chnoise'
	};
	proc_active = global(true);
	ipckey = 0x8e037c6;
}

-- Map a module-relative realtime signal index onto an actual signal number,
-- counting downwards from SIGRTMAX. Bails if the result falls outside the
-- realtime range.
terra m.signum(reqsig: int)
	var sig = S._max() - reqsig
	-- we implement this in contradiction to the recommendations of the manpages
	-- because they make no goddamn sense. if SIGRTMIN can vary, using as a basis
	-- for signals is really rather stupid -- there's no guarantee a signal sent
	-- by one process will be interpreted correctly by another, as its SIGRTMIN
	-- might well be different! so we use SIGRTMAX as the base instead, assuming
	-- that it's more likely to remain constant across the whole system.
	if reqsig > S._max() or sig < S._min() then
		lib.bail('realtime signal error - requested signal number is greater than the available number of realtime signals')
	end
	return sig
end

-- Inverse of m.signum: turn a received realtime signal number back into the
-- module-relative index.
terra m.sigdec(rtsig: int)
	-- exactly like the above, except without bounds checking
	return S._max() - rtsig
end

local getppid = terralib.externfunction('getppid', {} -> int);
local sigsend = terralib.externfunction('sigqueue', {int,int,intptr} -> int);
local sigfd = terralib.includec 'sys/signalfd.h';

-- Queue the notify_state_change realtime signal to the parent process,
-- carrying `sig` as the signal's payload value.
terra m.notify_parent(sig: int)
	sigsend(getppid(), m.signum(m.signals.notify_state_change), sig)
end

-- A signalfd-based receiver plus the signal mask that was in force before
-- the receiver blocked its signals.
struct m.listener {
	prevmask: S.sigset_t
	fd: int
}

-- Block the module's signals (and SIGCHLD) for normal delivery and open a
-- signalfd on them. The previous mask is kept so release() can restore it.
terra m.listener.methods.mk()
	var mask: S.sigset_t
	var l: m.listener
	S.sigemptyset(&mask)
	-- important to keep this in sync with any new signals we add
	S.sigaddset(&mask, m.signum(m.signals.notify_state_change))
	S.sigaddset(&mask, m.signum(m.signals.notify_cond_update))
	S.sigaddset(&mask, S.SIGCHLD)
	S.sigprocmask(S.SIG_BLOCK, &mask, &l.prevmask)
	l.fd = sigfd.signalfd(-1, &mask, 0)
	return l
end

-- Read one signalfd_siginfo record from the listener and summarize it.
-- `system` is true when the signal number lies outside the realtime range;
-- in that case `sig` is the raw signal number, otherwise it is decoded with
-- m.sigdec (assuming lib.trn is a ternary select -- TODO confirm).
terra m.listener:block()
	var si: sigfd.signalfd_siginfo
	lib.io.recv(self.fd, [rawstring](&si), sizeof([si.type]))
	var syssig = si.ssi_signo < S._min() or si.ssi_signo > S._max();
	return {
		from = si.ssi_pid;
		system = syssig;
		sig = lib.trn(syssig, si.ssi_signo, m.sigdec(si.ssi_signo));
		event = si.ssi_int;
		exit = si.ssi_status;
	}
end

-- Close the signalfd and restore the signal mask saved by mk().
terra m.listener:release()
	lib.io.close(self.fd)
	S.sigprocmask(S.SIG_SETMASK, &self.prevmask, nil)
end

local ipc = terralib.includec 'sys/ipc.h'
local shm = terralib.includec 'sys/shm.h'
local sem = terralib.includec 'sys/sem.h'
local mq = terralib.includec 'sys/msg.h'

-- Owner read/write permission bits (octal 600) for the IPC objects created
-- below. Written in hex because Lua numerals have no octal form, so a
-- literal spelled `0600` risks being read as decimal 600.
local owner_rw = 0x180

-- NOTE on message sizes: msgsnd/msgrcv treat the buffer as a leading `long`
-- message-type word followed by `msgsz` bytes of payload. The structs below
-- are passed whole as that buffer, so the payload size handed to the kernel
-- must be the struct size minus that leading word, written as
-- `sizeof(T) - sizeof(intptr)` (`long` and pointers have the same width on
-- Linux). Passing the full struct size makes the kernel read or write one
-- word past the end of the struct.

-- Reply sent from an instance to the emperor that issued a demand.
-- The first word (clid) doubles as the message-type word.
struct m.ack {
	clid: uint64
	cliq: int
	success: bool
	union {
		iname: int8[64]
	}
}

-- A command issued over the control bus.
-- NOTE(review): this struct has no dedicated message-type field, so when it
-- is sent with msgsnd its first word (cmd, and depending on layout empq) is
-- what the kernel sees as the message type, which must be positive.
struct m.demand {
	cmd: m.cmd.t
	empq: ipc.key_t -- ID of the current emperor's msg queue, chosen randomly
	operand: uint64 -- extra cmd parameter, where useful
}

-- Contents of the shared memory segment.
struct m.edict {
	semid: int -- controls access to shared memory
	serial: uint64 -- starts at zero, no action taken until set to 1
	nextclient: uint64 -- starting at 1
	demand: m.demand
}

-- Apply one operation to the edict's semaphore and return semop's result.
-- Per semop: a positive `op` adds to the semaphore, a negative `op`
-- subtracts (blocking while the value is too small), and zero waits for the
-- value to reach zero.
terra m.edict:sem(op: int)
	var sb = sem.sembuf {
		-- the set is created with exactly one semaphore, so the only valid
		-- index is 0; index 1 makes every semop call fail
		sem_num = 0;
		sem_op = op;
		sem_flg = 0;
	}
	return sem.semop(self.semid, &sb, 1)
end

-- Per-process handle on the control bus.
struct m.emperor {
	shmid: int
	msqid: int -- my message queue
	clid: uint64 -- my client serial id
	lastcmd: uint64 -- when serial increases above this, execute the command
	edict: &m.edict
	client: bool
	oldsig_int: S._sig_spec
	oldsig_term: S._sig_spec
}

-- Number of other processes attached to the shared segment (attach count
-- minus this process).
terra m.emperor:countpeers(): intptr
	var si: shm.shmid_ds
	shm.shmctl(self.shmid,ipc.IPC_STAT,&si)
	return si.shm_nattch - 1
end

local terra handle_interrupt :: int -> {}

-- Attach to the control bus, creating the shared segment and its semaphore
-- if they do not exist yet, create a private message queue, and install
-- SIGINT/SIGTERM handlers. When `client` is true, also claim a client id.
terra m.emperor.methods.mk(client: bool)
	var em = m.emperor { lastcmd = 0, client = client }
	var created = false
	-- NOTE(review): the segment holds an m.edict but is sized with
	-- sizeof(m.emperor); kept as-is so that processes agree on the size.
	em.shmid = shm.shmget(m.ipckey, sizeof(m.emperor), 0);
	if em.shmid == -1 then -- needs to be created
		em.shmid = shm.shmget(m.ipckey, sizeof(m.emperor), ipc.IPC_CREAT or ipc.IPC_EXCL or owner_rw);
		if em.shmid == -1 then
			lib.bail('could not get shared memory segment; may have clashed with simultaneously starting process, if any')
		end
		created = true
	end

	em.edict = [&m.edict](shm.shmat(em.shmid, nil, 0))
	-- check the attach result before touching the segment: shmat reports
	-- failure as (void*)-1, which must never be written through
	if [intptr](em.edict) == [intptr](-1) then
		lib.bail('could not attach shared memory segment')
	end

	if created then
		em.edict.semid = sem.semget([ipc.key_t](0), 1, owner_rw)
		if em.edict.semid == -1 then
			lib.bail('could not create edict semaphore')
		end
	end

	if client then
		-- NOTE(review): if an emperor broadcasts between our attach above and
		-- this wait, it may have counted us as a peer while we are still
		-- waiting here; that race is not resolved by this code.
		em.edict:sem(0) -- wait our turn
		em.edict:sem(1) -- lock edict
		if em.edict.nextclient == 0 then em.edict.nextclient = 1 end
		em.clid = em.edict.nextclient
		em.edict.nextclient = em.clid + 1
		-- only react to commands issued after we joined; otherwise the first
		-- poll() would replay whatever demand was broadcast last
		em.lastcmd = em.edict.serial
		em.edict:sem(-1) -- release edict
	end

	-- while true do
	-- 	var id = lib.crypt.random(uint64) % [ipc.key_t:max()]
	-- 	em.msqid = mq.msgget([ipc.key_t](id),ipc.IPC_CREAT or ipc.IPC_EXCL)
	-- 	if em.msqid ~= -1 then break end
	-- end
	em.msqid = mq.msgget([ipc.key_t](0), owner_rw)
	if em.msqid == -1 then
		lib.bail('could not create message queue')
	end

	var empty: S.sigset_t
	S.sigemptyset(&empty) -- 🙄
	var ss = S._sig_spec { sa_mask = empty, sa_flags = 0 }
	ss.__sigaction_handler.sa_handler = handle_interrupt; -- may not be portable
	S.sigaction(S.SIGINT, &ss, &em.oldsig_int)
	S.sigaction(S.SIGTERM, &ss, &em.oldsig_term)
	return em
end

-- Fetch the next pending command into `cmd` without blocking on the queue.
-- A new broadcast edict takes priority; otherwise a directed message from
-- this process's queue is used; otherwise cmd.cmd is set to m.cmd.none.
-- Does nothing when the emperor has been released (shmid == 0).
terra m.emperor:poll(cmd: &m.demand)
	if self.shmid == 0 then return end
	if self.edict.serial > self.lastcmd then
		self.lastcmd = self.edict.serial
		@cmd = self.edict.demand
		self.edict:sem(-1) -- release my lock
		return
	end
	-- payload size excludes the leading message-type word
	if mq.msgrcv(self.msqid, cmd, sizeof(m.demand) - sizeof(intptr), 0, ipc.IPC_NOWAIT) ~= -1 then
		return
	end
	cmd.cmd = m.cmd.none
end

-- Allocate an array of acks, one per currently attached peer.
terra m.emperor:mallack()
	return lib.mem.heapa(m.ack, self:countpeers())
end

terra m.emperor.methods.decree :: {
	&m.emperor, uint64, rawstring, m.cmd.t, uint64, &m.ack
} -> {}

-- Issue a command over the control bus and collect the response(s).
--   tgtclid, tgtname: target instance by client id and/or name; when
--                     tgtclid is 0 and tgtname is nil the command is
--                     broadcast to every peer
--   cmd, operand:     the command and its extra parameter
--   result:           for a broadcast, an array with room for one ack per
--                     peer; for a directed command, a single ack
-- Bails when called on a client handle or when no peers are attached.
terra m.emperor:decree(
	tgtclid: uint64, tgtname: rawstring,
	cmd: m.cmd.t, operand: uint64,
	result: &m.ack
): {}
	if self.client then lib.bail('client attempted to issue IPC decree') end
	var dem = m.demand {
		cmd = cmd;
		operand = operand;
		empq = self.msqid; -- register to receive replies
	}
	var npeers = self:countpeers()
	if npeers == 0 then lib.bail('no processes connected to control bus') end
	if tgtclid == 0 and tgtname == nil then
		lib.dbg('sending to all instances, waiting for edict to become writable')
		self.edict:sem(0) -- wait for all locks on edict to resolve
		lib.dbg('locking edict')
		self.edict:sem(npeers) -- place a read lock for each peer
		self.edict.demand = dem
		lib.dbg('sending edict')
		self.edict.serial = self.edict.serial + 1 -- send command
		var acks: intptr = 0
		while acks < npeers do
			lib.dbg('awaiting responses from instances')
			-- payload size excludes the message-type word, so a reply can
			-- never spill past the end of its slot in `result`
			if mq.msgrcv(self.msqid, result + acks, sizeof(m.ack) - sizeof(intptr), 0, 0) == -1 then
				lib.bail('error occurred while waiting for responses from instances')
			end
			acks = acks + 1
			-- TODO: handle rare case that a process dies here without releasing
			-- its semaphore read lock, desyncing and paralyzing IPC
		end
		lib.dbg('all instances have reported in')
	else
		-- TODO: get msgqueue and send edict
		-- enumerate every instance first to find the target's message queue
		var acks = lib.mem.heapa(m.ack, npeers)
		self:decree(0, nil, m.cmd.enumerate, 0, &acks(0))
		var tgt: int
		var found = false
		for i=0,npeers do
			if acks(i).clid == tgtclid or (
				tgtname ~= nil and lib.str.cmp(tgtname, acks(i).iname) == 0
			) then
				found = true
				tgt = acks(i).cliq
			end
		end
		acks:free()
		if not found then
			lib.warn('no such instance is currently online and responding to IPC calls')
		else
			lib.dbg('located instance, sending command')
			if mq.msgsnd(tgt, &dem, sizeof(m.demand) - sizeof(intptr), 0) == -1 then
				lib.bail('could not send command to target process')
			end
			while true do
				var ack: m.ack
				if mq.msgrcv(self.msqid, &ack, sizeof(m.ack) - sizeof(intptr), 0, 0) == -1 then
					lib.bail('error while awaiting response')
				end
				if ack.cliq == tgt then
					lib.dbg('got response, writing out and returning')
					@result = ack
					break
				else
					lib.warn('got spurious response, ignoring')
				end
			end
		end
	end
end

-- Send the reply `a` for this demand to the issuing emperor's queue,
-- stamping it with the responder's client id and queue id.
terra m.demand:ack(emp: &m.emperor, a: &m.ack)
	a.clid = emp.clid
	a.cliq = emp.msqid
	-- payload size excludes the leading message-type word (a.clid)
	mq.msgsnd(self.empq, a, sizeof(m.ack) - sizeof(intptr), 0)
end

m.global_emperor = global(m.emperor, `m.emperor {shmid = 0}, 'process-emperor') -- :/

-- Detach from the control bus: remove the shared objects if no other peer
-- is attached, detach the segment, remove this process's message queue and
-- restore the saved SIGINT/SIGTERM handlers. Safe to call more than once;
-- a released emperor has shmid == 0.
terra m.emperor:release()
	if self.shmid == 0 then return end
	if self:countpeers() == 0 then
		-- we're the last user, go ahead and destroy it
		sem.semctl(self.edict.semid,0,ipc.IPC_RMID)
		shm.shmctl(self.shmid,ipc.IPC_RMID,nil)
	end
	shm.shmdt([&opaque](self.edict))
	mq.msgctl(self.msqid,ipc.IPC_RMID,nil)
	S.sigaction(S.SIGINT, &m.global_emperor.oldsig_int, nil)
	S.sigaction(S.SIGTERM, &m.global_emperor.oldsig_term, nil)
	self.shmid = 0
end

-- SIGINT/SIGTERM handler installed by m.emperor.methods.mk: release the
-- global emperor and flag the process as no longer active.
terra handle_interrupt(sig: int): {}
	m.global_emperor:release()
	lib.warn('caught interrupt, shutting down')
	m.proc_active = false
end

return m