Overview
| Comment: | check in missing file |
|---|---|
| Downloads: | Tarball | ZIP archive | SQL archive |
| Timelines: | family | ancestors | descendants | both | trunk |
| Files: | files | file ages | folders |
| SHA3-256: | 5a4f99fb5583001945effbb48f4b3366 |
| User & Date: | lexi on 2020-12-29 14:35:21 |
| Other Links: | manifest | tags |
Context
|
2020-12-29
| ||
| 15:48 | enable remote control of running instances check-in: f8816b0ab5 user: lexi tags: trunk | |
| 14:35 | check in missing file check-in: 5a4f99fb55 user: lexi tags: trunk | |
| 14:35 | add ipc backbone check-in: 87731d4007 user: lexi tags: trunk | |
Changes
Added ipc.t version [5ea8386d8f].
-- vim: ft=terra

-- inter-process control for running instances: realtime-signal notifications
-- delivered through a signalfd, plus a SysV shared-memory "edict" guarded by a
-- semaphore and per-process SysV message queues for commands and replies.

-- NOTA BENE: this is a hack to get around the
-- weaknesses of the terra macro parser. it is
-- highly undesirable and should be replaced if
-- suitable IPC primitives become available
local S = terralib.includecstring [[
	#include <signal.h>
	int _min(void) { return SIGRTMIN; }
	int _max(void) { return SIGRTMAX; }
	typedef struct sigaction _sig_spec;
]]--terra cannot access structures when they have the same name as a function

local m = {
	signals = {
		-- rtsigs
		notify_state_change = 0x3;
		notify_cond_update = 0x4;
		-- values sent through rtsig
		state_success = 0x69;
		state_fail_find = 0x93;

		-- system sigs
		sys_child = S.SIGCHLD;
	};
	cmd = lib.enum {
		'none', 'enumerate', 'stop', 'cfgrefresh', 'chnoise'
	};
	proc_active = global(true); -- cleared by handle_interrupt on SIGINT/SIGTERM
	ipckey = 0x8e037c6; -- key of the shared memory segment holding the edict
}

-- translates a logical realtime signal index (see m.signals) into an actual
-- signal number, counting downwards from SIGRTMAX. bails if the index is
-- negative or the result falls outside the realtime signal range.
terra m.signum(reqsig: int)
	var sig = S._max() - reqsig
	-- we implement this in contradiction to the recommendations of the manpages
	-- because they make no goddamn sense. if SIGRTMIN can vary, using it as a
	-- basis for signals is really rather stupid -- there's no guarantee a signal
	-- sent by one process will be interpreted correctly by another, as its
	-- SIGRTMIN might well be different! so we use SIGRTMAX as the base instead,
	-- assuming that it's more likely to remain constant across the whole system.

	-- a negative index would land above SIGRTMAX, which the check below
	-- does not catch
	if reqsig < 0 then lib.bail('realtime signal error - requested signal number is negative') end
	if reqsig > S._max() or sig < S._min() then lib.bail('realtime signal error - requested signal number is greater than the available number of realtime signals') end
	return sig
end

terra m.sigdec(rtsig: int)
-- exactly like the above, except without bounds checking
	return S._max() - rtsig
end

local getppid = terralib.externfunction('getppid', {} -> int);
local sigsend = terralib.externfunction('sigqueue', {int,int,intptr} -> int);
local sigfd = terralib.includec 'sys/signalfd.h';

-- queues the notify_state_change realtime signal to the parent process,
-- carrying `sig` as the signal's payload value. the result of sigqueue is
-- not checked.
terra m.notify_parent(sig: int)
	sigsend(getppid(), m.signum(m.signals.notify_state_change), sig)
end

-- a signalfd-based listener; prevmask is the signal mask that was in effect
-- before the listener blocked its signals, restored again by :release()
struct m.listener { prevmask:S.sigset_t fd:int }

-- blocks the signals this module cares about and opens a signalfd for them
terra m.listener.methods.mk()
	var mask: S.sigset_t
	var l: m.listener
	S.sigemptyset(&mask)
	-- important to keep this in sync with any new signals we add
	S.sigaddset(&mask, m.signum(m.signals.notify_state_change))
	S.sigaddset(&mask, m.signum(m.signals.notify_cond_update))
	S.sigaddset(&mask, S.SIGCHLD)
	S.sigprocmask(S.SIG_BLOCK, &mask, &l.prevmask)
	l.fd = sigfd.signalfd(-1, &mask, 0)
	return l
end

-- reads one signalfd_siginfo record from the listener's descriptor and
-- returns a summary of it. `system` is true when the signal number lies
-- outside the realtime range; in that case `sig` is the raw signal number,
-- otherwise it is the logical index as decoded by m.sigdec.
terra m.listener:block()
	var si: sigfd.signalfd_siginfo
	lib.io.recv(self.fd, [rawstring](&si), sizeof([si.type]))
	var syssig = si.ssi_signo < S._min() or si.ssi_signo > S._max();
	return {
		from = si.ssi_pid;
		system = syssig;
		sig = lib.trn(syssig, si.ssi_signo, m.sigdec(si.ssi_signo));
		event = si.ssi_int;
		exit = si.ssi_status;
	}
end

-- closes the signalfd and restores the signal mask saved by mk()
terra m.listener:release()
	lib.io.close(self.fd)
	S.sigprocmask(S.SIG_SETMASK, &self.prevmask, nil)
end

local ipc = terralib.includec 'sys/ipc.h'
local shm = terralib.includec 'sys/shm.h'
local sem = terralib.includec 'sys/sem.h'
local mq = terralib.includec 'sys/msg.h'

-- owner read/write (rw-------, i.e. octal 600) for every IPC object created
-- here. spelled in hex so that the value does not depend on whether a
-- leading-zero literal is read as octal or as decimal.
local ipc_mode = 0x180

-- msgsnd/msgrcv treat the first `long` of the buffer as the message type and
-- take a size that counts only the bytes *after* it. the structs below are
-- passed as message buffers directly, so their leading bytes double as the
-- message type and the size handed to the kernel must exclude them; passing
-- the full struct size makes the kernel read/write one long past the buffer.
-- intptr is used as a stand-in for C `long` (same width on LP64 and ILP32).
-- NOTE(review): the message type must be > 0 for msgsnd to succeed; for m.ack
-- that is clid, for m.demand it is whatever cmd/empq happen to encode. a
-- dedicated mtype field would be more robust.
local function payload(T) return `sizeof(T) - sizeof(intptr) end

-- reply sent by an instance to the emperor's message queue
struct m.ack {
	clid: uint64 -- client serial id of the sender (also acts as message type)
	cliq: int -- message queue id of the sender
	success: bool
	union {
		iname: int8[64]
	}
}

struct m.demand {
	cmd: m.cmd.t
	empq: ipc.key_t -- ID of the current emperor's msg queue, chosen randomly
	operand: uint64 -- extra cmd parameter, where useful
}

-- contents of the shared memory segment
struct m.edict {
	semid: int -- controls access to shared memory
	serial: uint64 -- starts at zero, no action taken until set to 1
	nextclient: uint64 -- starting at 1

	demand: m.demand
}

-- performs a single blocking semop on the edict's semaphore: op = 0 waits for
-- the value to reach zero, positive values add locks, negative values release
-- them. returns the result of semop.
terra m.edict:sem(op: int)
	var sb = sem.sembuf {
		-- the set is created with exactly one semaphore, so the only valid
		-- index is 0; any other index makes semop fail outright
		sem_num = 0;
		sem_op = op;
		sem_flg = 0;
	}
	return sem.semop(self.semid, &sb, 1)
end

struct m.emperor {
	shmid: int
	msqid: int -- my message queue
	clid: uint64 -- my client serial id
	lastcmd: uint64 -- when serial increases above this, execute the command
	edict: &m.edict
	client: bool
	oldsig_int: S._sig_spec
	oldsig_term: S._sig_spec
}


-- number of processes attached to the shared segment, not counting ourselves.
-- NOTE(review): the result of shmctl is not checked, so `si` is left
-- uninitialised if the call fails.
terra m.emperor:countpeers(): intptr
	var si: shm.shmid_ds
	shm.shmctl(self.shmid,ipc.IPC_STAT,&si)
	return si.shm_nattch - 1
end


local terra handle_interrupt :: int -> {}

-- attaches to the shared edict (creating the segment and its semaphore if it
-- does not exist yet), registers a client id when `client` is set, creates a
-- private message queue and installs handle_interrupt for SIGINT and SIGTERM,
-- saving the previous handlers so that :release() can restore them.
terra m.emperor.methods.mk(client: bool)
	var em = m.emperor { lastcmd = 0, clid = 0, client = client }
	var created = false
	-- the segment only ever holds an m.edict, so that is the size to ask for
	em.shmid = shm.shmget(m.ipckey, sizeof(m.edict), 0);
	if em.shmid == -1 then -- needs to be created
		em.shmid = shm.shmget(m.ipckey, sizeof(m.edict), ipc.IPC_CREAT or ipc.IPC_EXCL or ipc_mode);
		if em.shmid == -1 then lib.bail('could not get shared memory segment; may have clashed with simultaneously starting process, if any') end
		created = true
	end
	em.edict = [&m.edict](shm.shmat(em.shmid, nil, 0))
	-- must be verified before the edict is touched: shmat reports failure as
	-- (void*)-1, and writing through that would crash
	if [intptr](em.edict) == [intptr](-1) then lib.bail('could not attach shared memory segment') end
	if created then
		em.edict.semid = sem.semget([ipc.key_t](0), 1, ipc_mode)
	end
	if client then
		em.edict:sem(0) -- wait our turn
		em.edict:sem(1) -- lock edict
		if em.edict.nextclient == 0 then em.edict.nextclient = 1 end
		em.clid = em.edict.nextclient
		em.edict.nextclient = em.clid + 1
		-- only react to edicts issued after we joined; an older one was never
		-- addressed to us and no read lock was placed on our behalf
		em.lastcmd = em.edict.serial
		em.edict:sem(-1) -- release edict
	end
--	while true do
--		var id = lib.crypt.random(uint64) % [ipc.key_t:max()]
--		em.msqid = mq.msgget([ipc.key_t](id),ipc.IPC_CREAT or ipc.IPC_EXCL)
--		if em.msqid ~= -1 then break end
--	end
	em.msqid = mq.msgget([ipc.key_t](0), ipc_mode)
	var empty: S.sigset_t
	S.sigemptyset(&empty) -- 🙄
	var ss = S._sig_spec { sa_mask = empty, sa_flags = 0 }
	ss.__sigaction_handler.sa_handler = handle_interrupt; -- may not be portable
	S.sigaction(S.SIGINT, &ss, &em.oldsig_int)
	S.sigaction(S.SIGTERM, &ss, &em.oldsig_term)
	return em
end

-- non-blocking check for a pending command. a new broadcast edict takes
-- priority; otherwise the private message queue is tried; if neither has
-- anything, cmd.cmd is set to m.cmd.none. does nothing when the emperor has
-- been released (shmid == 0).
terra m.emperor:poll(cmd: &m.demand) if self.shmid == 0 then return end
	if self.edict.serial > self.lastcmd then
		self.lastcmd = self.edict.serial
		@cmd = self.edict.demand
		self.edict:sem(-1) -- release my lock
		return
	end

	if mq.msgrcv(self.msqid, cmd, [payload(m.demand)], 0, ipc.IPC_NOWAIT) ~= -1 then
		return
	end

	cmd.cmd = m.cmd.none
end

-- allocates one m.ack per currently attached peer
terra m.emperor:mallack() return lib.mem.heapa(m.ack, self:countpeers()) end
terra m.emperor.methods.decree :: {
	&m.emperor, uint64, rawstring, m.cmd.t, uint64, &m.ack
} -> {}
-- issues a command. with tgtclid == 0 and tgtname == nil the command is
-- broadcast through the shared edict and `result` must have room for one ack
-- per peer (see mallack); otherwise the instances are enumerated first, the
-- one matching the id or name is messaged directly, and its single ack is
-- written to `result`.
terra m.emperor:decree(
	tgtclid: uint64, tgtname: rawstring,
	cmd: m.cmd.t, operand: uint64,
	result: &m.ack
): {}
	if self.client then lib.bail('client attempted to issue IPC decree') end
	var dem = m.demand {
		cmd = cmd;
		operand = operand;
		empq = self.msqid; -- register to receive replies
	}
	var npeers = self:countpeers()
	if npeers == 0 then lib.bail('no processes connected to control bus') end
	if tgtclid == 0 and tgtname == nil then
		lib.dbg('sending to all instances, waiting for edict to become writable')
		self.edict:sem(0) -- wait for all locks on edict to resolve
		lib.dbg('locking edict')
		self.edict:sem(npeers) -- place a read lock for each peer
		self.edict.demand = dem
		lib.dbg('sending edict')
		self.edict.serial = self.edict.serial + 1 -- send command

		var acks: intptr = 0
		while acks < npeers do
			lib.dbg('awaiting responses from instances')
			if mq.msgrcv(self.msqid, result + acks, [payload(m.ack)], 0, 0) == -1 then
				lib.bail('error occurred while waiting for responses from instances')
			end
			acks = acks + 1
			-- TODO: handle rare case that a process dies here without releasing
			-- its semaphore read lock, desyncing and paralyzing IPC
		end
		lib.dbg('all instances have reported in')
	else
		-- TODO: get msgqueue and send edict
		var acks = lib.mem.heapa(m.ack, npeers)
		self:decree(0, nil, m.cmd.enumerate, 0, &acks(0))
		var tgt: int
		var found = false
		for i=0,npeers do
			if acks(i).clid == tgtclid or (
				tgtname ~= nil and lib.str.cmp(tgtname, acks(i).iname) == 0
			) then
				found = true
				tgt = acks(i).cliq
			end
		end
		acks:free()
		if not found then
			lib.warn('no such instance is currently online and responding to IPC calls')
		else
			lib.dbg('located instance, sending command')
			if mq.msgsnd(tgt, &dem, [payload(m.demand)], 0) == -1 then
				lib.bail('could not send command to target process')
			end
			while true do
				var ack: m.ack
				if mq.msgrcv(self.msqid, &ack, [payload(m.ack)], 0, 0) == -1 then
					lib.bail('error while awaiting response')
				end
				if ack.cliq == tgt then
					lib.dbg('got response, writing out and returning')
					@result = ack
					break
				else lib.warn('got spurious response, ignoring') end
			end
		end
	end
end

-- fills in the sender's identity and sends the ack to the queue named in the
-- demand. the result of msgsnd is not checked.
terra m.demand:ack(emp: &m.emperor, a: &m.ack)
	a.clid = emp.clid
	a.cliq = emp.msqid
	mq.msgsnd(self.empq, a, [payload(m.ack)], 0)
end

m.global_emperor = global(m.emperor, `m.emperor {shmid = 0}, 'process-emperor') -- :/

-- detaches from the edict, removes our message queue and restores the signal
-- handlers saved by mk(). the last attached process also removes the
-- semaphore and the shared segment. a second call is a no-op.
terra m.emperor:release() if self.shmid == 0 then return end
	if self:countpeers() == 0 then
		-- we're the last user, go ahead and destroy it
		sem.semctl(self.edict.semid,0,ipc.IPC_RMID)
		shm.shmctl(self.shmid,ipc.IPC_RMID,nil)
	end
	shm.shmdt([&opaque](self.edict))
	mq.msgctl(self.msqid,ipc.IPC_RMID,nil)
	-- restore the handlers saved in *this* emperor rather than in the global
	-- one; identical when self is the global, correct when it is not
	S.sigaction(S.SIGINT, &self.oldsig_int, nil)
	S.sigaction(S.SIGTERM, &self.oldsig_term, nil)
	self.shmid = 0
end

-- SIGINT/SIGTERM handler installed by mk(): releases the global emperor and
-- asks the main loop to stop by clearing m.proc_active.
-- NOTE(review): this runs in signal context; confirm lib.warn and the IPC
-- teardown are safe to call from a signal handler.
terra handle_interrupt(sig: int): {}
	m.global_emperor:release()
	lib.warn('caught interrupt, shutting down')
	m.proc_active = false
end

return m