parsav  Check-in [5a4f99fb55]

Overview
Comment: check in missing file
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 5a4f99fb5583001945effbb48f4b3366b5c02642ee32e525645c375f8e3afcf6
User & Date: lexi on 2020-12-29 14:35:21
Other Links: manifest | tags
Context
2020-12-29
15:48
enable remote control of running instances check-in: f8816b0ab5 user: lexi tags: trunk
14:35
check in missing file check-in: 5a4f99fb55 user: lexi tags: trunk
14:35
add ipc backbone check-in: 87731d4007 user: lexi tags: trunk
Changes

Added ipc.t version [5ea8386d8f].

            1  +-- vim: ft=terra
            2  +
            3  +-- NOTA BENE: hack around the terra macro parser, which cannot import
            4  +-- SIGRTMIN/SIGRTMAX (they are not plain integer constants; glibc, e.g.,
            5  +-- computes them at runtime). undesirable; replace it if suitable IPC
            6  +-- primitives become available. _sig_spec aliases struct sigaction because
            7  +-- terra cannot access a structure that has the same name as a function.
            8  +local S = terralib.includecstring([[
            9  +	#include <signal.h>
           10  +	int _min(void) { return SIGRTMIN; }
           11  +	int _max(void) { return SIGRTMAX; }
           12  +	typedef struct sigaction _sig_spec;
           13  +]])
           13  +
           14  +local m = {
           15  +	signals = {
           16  +		-- realtime signals, counted down from SIGRTMAX by m.signum
           17  +		notify_state_change = 0x3,
           18  +		notify_cond_update  = 0x4,
           19  +		-- payload values sent along with a realtime signal
           20  +		state_success   = 0x69,
           21  +		state_fail_find = 0x93,
           22  +
           23  +		-- ordinary system signals
           24  +		sys_child = S.SIGCHLD,
           25  +	},
           26  +	cmd = lib.enum { 'none', 'enumerate', 'stop', 'cfgrefresh', 'chnoise' },
           27  +	proc_active = global(true), -- cleared by handle_interrupt
           28  +	ipckey = 0x8e037c6, -- shmget key of the shared control segment
           29  +}
           32  +
           33  +-- map a logical realtime signal number (see m.signals) onto an actual
           34  +-- signal number, counting down from SIGRTMAX. bails unless the result
           35  +-- lies within [SIGRTMIN, SIGRTMAX].
           36  +terra m.signum(reqsig: int): int
           37  +	-- we implement this in contradiction to the recommendations of the manpages
           38  +	-- because they make no goddamn sense. if SIGRTMIN can vary, using it as a basis
           39  +	-- for signals is really rather stupid -- there's no guarantee a signal sent
           40  +	-- by one process will be interpreted correctly by another, as its SIGRTMIN
           41  +	-- might well be different! so we use SIGRTMAX as the base instead, assuming
           42  +	-- that it's more likely to remain constant across the whole system.
           43  +	var top = S._max()
           44  +	var sig = top - reqsig
           45  +	-- a negative request would land above SIGRTMAX; reject it explicitly
           46  +	if reqsig < 0 then lib.bail('realtime signal error - requested signal number is negative') end
           47  +	-- (this also covers reqsig > SIGRTMAX, since SIGRTMIN is positive)
           48  +	if sig < S._min() then lib.bail('realtime signal error - requested signal number is greater than the available number of realtime signals') end
           49  +	return sig
           50  +end
           44  +
           45  +terra m.sigdec(rtsig: int)
           46  +	-- inverse of m.signum: turn an actual realtime signal back into the
           47  +	-- logical number from m.signals. unlike m.signum, no bounds checking.
           48  +	var top = S._max()
           49  +	return top - rtsig
           50  +end
           49  +
           50  +-- libc functions and headers used by the signal helpers below
           51  +local getppid = terralib.externfunction('getppid', {} -> int);
           52  +-- NOTE(review): sigqueue(3) takes a `union sigval` by value; declaring it
           53  +-- as intptr assumes the union is passed like a pointer-sized integer on
           54  +-- the target ABI -- confirm when porting.
           55  +local sigsend = terralib.externfunction('sigqueue', {int,int,intptr} -> int);
           56  +local sigfd = terralib.includec 'sys/signalfd.h';
           53  +
           54  +-- send the notify_state_change realtime signal to the parent process,
           55  +-- carrying `sig` as its payload value. sigqueue's result is ignored.
           56  +terra m.notify_parent(sig: int)
           57  +	var parent = getppid()
           58  +	var rtsig = m.signum(m.signals.notify_state_change)
           59  +	sigsend(parent, rtsig, sig)
           60  +end
           57  +
           58  +struct m.listener {
           59  +	prevmask: S.sigset_t -- signal mask that was in effect before mk()
           60  +	fd: int -- signalfd receiving the IPC signals
           61  +}
           59  +
           60  +-- block the IPC realtime signals and SIGCHLD and open a signalfd that
           61  +-- receives them. the prior mask is saved in prevmask for release().
           62  +-- bails (after restoring the mask) if the signalfd cannot be created.
           63  +terra m.listener.methods.mk()
           64  +	var mask: S.sigset_t
           65  +	var l: m.listener
           66  +	S.sigemptyset(&mask)
           67  +	-- important to keep this in sync with any new signals we add
           68  +	S.sigaddset(&mask, m.signum(m.signals.notify_state_change))
           69  +	S.sigaddset(&mask, m.signum(m.signals.notify_cond_update))
           70  +	S.sigaddset(&mask, m.signals.sys_child)
           71  +	-- the signals must be blocked so they queue for the signalfd instead
           72  +	-- of being delivered normally
           73  +	S.sigprocmask(S.SIG_BLOCK, &mask, &l.prevmask)
           74  +	l.fd = sigfd.signalfd(-1, &mask, 0)
           75  +	if l.fd == -1 then
           76  +		S.sigprocmask(S.SIG_SETMASK, &l.prevmask, nil)
           77  +		lib.bail('could not open signalfd for IPC listener')
           78  +	end
           79  +	return l
           80  +end
           72  +
           73  +-- wait for one signal on the listener's signalfd and decode it into:
           74  +--   from   - pid of the sender (ssi_pid)
           75  +--   system - true if the signal is outside [SIGRTMIN, SIGRTMAX]
           76  +--   sig    - the raw number for system signals, else the logical number
           77  +--            recovered with m.sigdec
           78  +--   event  - integer payload sent with the signal (ssi_int)
           79  +--   exit   - ssi_status (exit status/signal when the signal is SIGCHLD)
           80  +-- NOTE(review): the result of lib.io.recv is unchecked, so a failed or
           81  +-- interrupted read (e.g. EINTR from the SIGINT/SIGTERM handler that
           82  +-- m.emperor.mk installs with sa_flags = 0) returns uninitialized data.
           83  +-- also confirm lib.io.recv uses read(2): recv(2) on a signalfd fails.
           84  +terra m.listener:block()
           85  +	var si: sigfd.signalfd_siginfo
           86  +	lib.io.recv(self.fd, [rawstring](&si), sizeof([si.type]))
           87  +	var syssig = si.ssi_signo < S._min() or si.ssi_signo > S._max();
           88  +	return {
           89  +		from = si.ssi_pid;
           90  +		system = syssig;
           91  +		sig = lib.trn(syssig, si.ssi_signo, m.sigdec(si.ssi_signo));
           92  +		event = si.ssi_int;
           93  +		exit = si.ssi_status;
           94  +	}
           95  +end
           85  +
           86  +-- close the signalfd and reinstate the signal mask saved by mk().
           87  +-- NOTE(review): IPC realtime signals still pending here are delivered
           88  +-- once unblocked; with no handler installed for them that is their
           89  +-- default disposition -- confirm that is acceptable.
           90  +terra m.listener:release()
           91  +	var fd, saved = self.fd, &self.prevmask
           92  +	lib.io.close(fd)
           93  +	S.sigprocmask(S.SIG_SETMASK, saved, nil)
           94  +end
           90  +
           91  +-- System V IPC: common keys/flags, shared memory, semaphores, msg queues
           92  +local ipc = terralib.includec('sys/ipc.h')
           93  +local shm = terralib.includec('sys/shm.h')
           94  +local sem = terralib.includec('sys/sem.h')
           95  +local mq  = terralib.includec('sys/msg.h')
           95  +
           96  +-- reply an instance sends back to the emperor's message queue.
           97  +-- keep clid first: msgsnd/msgrcv treat the leading `long` of a message
           98  +-- as its mtype, which must be > 0 on send; client ids start at 1.
           99  +struct m.ack {
          100  +	clid: uint64 -- client serial id of the replying instance
          101  +	cliq: int -- message queue id of the replying instance
          102  +	success: bool
          103  +	union {
          104  +		iname: int8[64] -- presumably the instance name; matched against decree's tgtname
          105  +	}
          106  +}
          104  +
          105  +-- command from the emperor: broadcast via the shared edict, or sent
          106  +-- directly to one instance's message queue.
          107  +-- NOTE(review): when sent with msgsnd, the first `long` of this struct
          108  +-- (cmd plus whatever follows it) serves as the SysV mtype, which must
          109  +-- be > 0 -- confirm this always holds.
          110  +struct m.demand {
          111  +	cmd: m.cmd.t
          112  +	empq: ipc.key_t -- msg queue ID of the current emperor (a msqid, despite the type); acks go here
          113  +	operand: uint64 -- extra cmd parameter, where useful
          114  +}
          110  +
          111  +-- control block living in the shared memory segment keyed by m.ipckey
          112  +struct m.edict {
          113  +	semid: int -- controls access to shared memory
          114  +	serial: uint64 -- starts at zero, no action taken until set to 1
          115  +	nextclient: uint64 -- starting at 1
          116  +
          117  +	demand: m.demand -- last broadcast command; current once serial is bumped
          118  +}
          118  +
          119  +-- apply `op` to the edict's semaphore with semop(2) semantics: 0 waits
          120  +-- for the value to reach zero, a positive op adds to it, a negative op
          121  +-- subtracts (blocking while that would go below zero). returns semop's result.
          122  +terra m.edict:sem(op: int)
          123  +	var sb = sem.sembuf {
          124  +		-- the set is created with one semaphore (see mk below), so 0 is the
          125  +		-- only valid index; index 1 made every semop fail with EFBIG
          126  +		sem_num = 0;
          127  +		sem_op = op;
          128  +		sem_flg = 0;
          129  +	}
          130  +	return sem.semop(self.semid, &sb, 1)
          131  +end
          132  +
          133  +struct m.emperor {
          134  +	shmid: int
          135  +	msqid: int -- my message queue
          136  +	clid: uint64 -- my client serial id
          137  +	lastcmd: uint64 -- when serial increases above this, execute the command
          138  +	edict: &m.edict
          139  +	client: bool
          140  +	oldsig_int: S._sig_spec
          141  +	oldsig_term: S._sig_spec
          142  +}
          143  +
          144  +
          145  +-- number of other processes attached to the control segment (excluding
          146  +-- this one); 0 if the segment cannot be queried
          147  +terra m.emperor:countpeers(): intptr
          148  +	var si: shm.shmid_ds
          149  +	-- si stays uninitialized if IPC_STAT fails, so don't read it then
          150  +	if shm.shmctl(self.shmid,ipc.IPC_STAT,&si) == -1 then return 0 end
          151  +	-- shm_nattch is unsigned; don't let a detached id wrap around to -1
          152  +	if si.shm_nattch == 0 then return 0 end
          153  +	return si.shm_nattch - 1
          154  +end
          155  +
          156  +
          157  +local terra handle_interrupt :: int -> {}
          158  +
          159  +-- attach to (creating if needed) the control segment keyed by m.ipckey,
          160  +-- create this process's private reply queue and install handle_interrupt
          161  +-- for SIGINT/SIGTERM, saving the previous handlers for release(). when
          162  +-- `client` is true, also register as an instance and take a client id.
          163  +-- bails if any of the IPC objects cannot be obtained.
          164  +-- NOTE(review): a client counts as a peer (shm_nattch) from shmat onwards
          165  +-- but only registers later; a broadcast decree issued in that window
          166  +-- reserves a read lock for it that it never releases -- confirm/handle.
          167  +terra m.emperor.methods.mk(client: bool)
          168  +	var em = m.emperor { lastcmd = 0, client = client }
          169  +	-- NOTE(review): the segment only holds an m.edict; sizing it by
          170  +	-- m.emperor over-allocates, but changing the size would stop processes
          171  +	-- from different builds sharing one segment, so it is left as-is.
          172  +	em.shmid = shm.shmget(m.ipckey, sizeof(m.emperor), 0);
          173  +	var created = false
          174  +	if em.shmid == -1 then -- needs to be created
          175  +		em.shmid = shm.shmget(m.ipckey, sizeof(m.emperor), ipc.IPC_CREAT or ipc.IPC_EXCL or 0600);
          176  +		if em.shmid == -1 then lib.bail('could not get shared memory segment; may have clashed with simultaneously starting process, if any') end
          177  +		created = true
          178  +	end
          179  +	em.edict = [&m.edict](shm.shmat(em.shmid, nil, 0))
          180  +	-- shmat reports failure as (void*)-1: check before writing through it
          181  +	if [intptr](em.edict) == [intptr](-1) then lib.bail('could not attach shared memory segment') end
          182  +	if created then
          183  +		-- a new segment is zero-filled; only the semaphore needs creating.
          184  +		-- key 0 is IPC_PRIVATE; peers find the set through edict.semid
          185  +		em.edict.semid = sem.semget([ipc.key_t](0), 1, 0600)
          186  +		if em.edict.semid == -1 then lib.bail('could not create IPC semaphore') end
          187  +	end
          188  +	if client then
          189  +		em.edict:sem(0) -- wait our turn
          190  +		em.edict:sem(1) -- lock edict
          191  +		if em.edict.nextclient == 0 then em.edict.nextclient = 1 end
          192  +		em.clid = em.edict.nextclient
          193  +		em.edict.nextclient = em.clid + 1
          194  +		-- edicts issued before we joined are not ours: acting on one would
          195  +		-- rerun a stale command and release a read lock we never held
          196  +		em.lastcmd = em.edict.serial
          197  +		em.edict:sem(-1) -- release edict
          198  +	end
          199  +--	while true do
          200  +--		var id = lib.crypt.random(uint64) % [ipc.key_t:max()]
          201  +--		em.msqid = mq.msgget([ipc.key_t](id),ipc.IPC_CREAT or ipc.IPC_EXCL)
          202  +--		if em.msqid ~= -1 then break end
          203  +--	end
          204  +	-- key 0 is IPC_PRIVATE: a fresh queue whose id peers learn from demands/acks
          205  +	em.msqid = mq.msgget([ipc.key_t](0), 0600)
          206  +	if em.msqid == -1 then lib.bail('could not create IPC message queue') end
          207  +	var empty: S.sigset_t
          208  +	S.sigemptyset(&empty) -- 🙄
          209  +	var ss = S._sig_spec { sa_mask = empty, sa_flags = 0 }
          210  +	ss.__sigaction_handler.sa_handler = handle_interrupt; -- may not be portable
          211  +	S.sigaction(S.SIGINT, &ss, &em.oldsig_int)
          212  +	S.sigaction(S.SIGTERM, &ss, &em.oldsig_term)
          213  +	return em
          214  +end
          183  +
          184  +-- fetch the next pending command into *cmd. a broadcast edict (serial
          185  +-- bumped past lastcmd) wins and releases this process's read lock;
          186  +-- otherwise a targeted demand is taken from our queue without blocking.
          187  +-- sets cmd.cmd to m.cmd.none if nothing is pending. does nothing at all
          188  +-- (leaving *cmd untouched) once this emperor is released (shmid == 0).
          189  +terra m.emperor:poll(cmd: &m.demand)
          190  +	if self.shmid == 0 then return end
          191  +	if self.edict.serial > self.lastcmd then
          192  +		self.lastcmd = self.edict.serial
          193  +		@cmd = self.edict.demand
          194  +		self.edict:sem(-1) -- release my lock
          195  +		return
          196  +	end
          197  +
          198  +	-- msgsz excludes the leading `long` mtype (pointer-sized on Linux):
          199  +	-- passing the whole struct size let the kernel write past *cmd.
          200  +	-- MSG_NOERROR drops the surplus sent by peers still using that size.
          201  +	var msgsz = sizeof(m.demand) - sizeof(intptr)
          202  +	if mq.msgrcv(self.msqid, cmd, msgsz, 0, ipc.IPC_NOWAIT or mq.MSG_NOERROR) ~= -1 then
          203  +		return
          204  +	end
          205  +
          206  +	cmd.cmd = m.cmd.none
          207  +end
          198  +
          199  +-- allocate one ack slot per connected peer (room for a broadcast decree)
          200  +terra m.emperor:mallack()
          201  +	var n = self:countpeers()
          202  +	return lib.mem.heapa(m.ack, n)
          203  +end
          200  +-- forward declaration: decree calls itself to enumerate peers
          201  +terra m.emperor.methods.decree :: {
          202  +	&m.emperor, uint64, rawstring, m.cmd.t, uint64, &m.ack
          203  +} -> {}
          204  +-- issue `cmd` (with `operand`) as the emperor. with tgtclid == 0 and
          205  +-- tgtname == nil it is broadcast through the edict and one ack per peer
          206  +-- is written to result[0 .. countpeers()-1] (see mallack). otherwise the
          207  +-- peers are enumerated, the one whose clid is tgtclid or whose iname is
          208  +-- tgtname is sent the command on its own queue, and its ack is written
          209  +-- to *result. bails on IPC failure, if there are no peers, or if called
          210  +-- by a client; warns if no matching instance is found.
          211  +terra m.emperor:decree(
          212  +	tgtclid: uint64, tgtname: rawstring,
          213  +	cmd: m.cmd.t, operand: uint64,
          214  +	result: &m.ack
          215  +): {}
          216  +	if self.client then lib.bail('client attempted to issue IPC decree') end
          217  +	-- SysV msgsz excludes the leading `long` mtype (pointer-sized on Linux)
          218  +	-- that both structs overlay with their first field; full struct sizes
          219  +	-- over-read on send and overflowed the buffer on receive.
          220  +	-- MSG_NOERROR drops the surplus sent by peers still using full sizes.
          221  +	var demsz = sizeof(m.demand) - sizeof(intptr)
          222  +	var acksz = sizeof(m.ack) - sizeof(intptr)
          223  +	var dem = m.demand {
          224  +		cmd = cmd;
          225  +		operand = operand;
          226  +		empq = self.msqid; -- register to receive replies
          227  +	}
          228  +	var npeers = self:countpeers()
          229  +	if npeers == 0 then lib.bail('no processes connected to control bus') end
          230  +	if tgtclid == 0 and tgtname == nil then
          231  +		lib.dbg('sending to all instances, waiting for edict to become writable')
          232  +		self.edict:sem(0) -- wait for all locks on edict to resolve
          233  +		lib.dbg('locking edict')
          234  +		self.edict:sem(npeers) -- place a read lock for each peer
          235  +		self.edict.demand = dem
          236  +		lib.dbg('sending edict')
          237  +		self.edict.serial = self.edict.serial + 1 -- send command
          238  +
          239  +		var acks: intptr = 0
          240  +		while acks < npeers do
          241  +			lib.dbg('awaiting responses from instances')
          242  +			if mq.msgrcv(self.msqid, result + acks, acksz, 0, mq.MSG_NOERROR) == -1 then
          243  +				lib.bail('error occurred while waiting for responses from instances')
          244  +			end
          245  +			acks = acks + 1
          246  +			-- TODO: handle rare case that a process dies here without releasing
          247  +			-- its semaphore read lock, desyncing and paralyzing IPC
          248  +		end
          249  +		lib.dbg('all instances have reported in')
          250  +	else
          251  +		-- enumerate the peers to learn the target's message queue, then
          252  +		-- send the demand to that queue directly.
          253  +		-- NOTE(review): the recursive call recounts peers; if one attaches
          254  +		-- in between, it writes more acks than `acks` has room for
          255  +		var acks = lib.mem.heapa(m.ack, npeers)
          256  +		self:decree(0, nil, m.cmd.enumerate, 0, &acks(0))
          257  +		var tgt: int
          258  +		var found = false
          259  +		for i=0,npeers do
          260  +			if acks(i).clid == tgtclid or (
          261  +				tgtname ~= nil and lib.str.cmp(tgtname, acks(i).iname) == 0
          262  +			) then
          263  +				found = true
          264  +				tgt = acks(i).cliq
          265  +			end
          266  +		end
          267  +		acks:free()
          268  +		if not found then
          269  +			lib.warn('no such instance is currently online and responding to IPC calls')
          270  +		else
          271  +			lib.dbg('located instance, sending command')
          272  +			if mq.msgsnd(tgt, &dem, demsz, 0) == -1 then
          273  +				lib.bail('could not send command to target process')
          274  +			end
          275  +			while true do
          276  +				var ack: m.ack
          277  +				if mq.msgrcv(self.msqid, &ack, acksz, 0, mq.MSG_NOERROR) == -1 then
          278  +					lib.bail('error while awaiting response')
          279  +				end
          280  +				if ack.cliq == tgt then
          281  +					lib.dbg('got response, writing out and returning')
          282  +					@result = ack
          283  +					break
          284  +				else lib.warn('got spurious response, ignoring') end
          285  +			end
          286  +		end
          287  +	end
          288  +end
          272  +
          273  +-- reply to this demand: stamp `a` with our client id and queue id and
          274  +-- send it to the emperor queue recorded in the demand. a failed send
          275  +-- is ignored.
          276  +terra m.demand:ack(emp: &m.emperor, a: &m.ack)
          277  +	a.clid = emp.clid -- doubles as the SysV mtype, so it must be > 0
          278  +	a.cliq = emp.msqid
          279  +	-- msgsz excludes the leading `long` mtype (pointer-sized on Linux);
          280  +	-- the full struct size made msgsnd read past the end of *a
          281  +	mq.msgsnd(self.empq, a, sizeof(m.ack) - sizeof(intptr), 0)
          282  +end
          278  +
          279  +-- process-wide emperor torn down by handle_interrupt. shmid = 0 marks it as
          280  +-- not (or no longer) attached, which turns poll() and release() into no-ops.
          281  +m.global_emperor = global(m.emperor, `m.emperor {shmid = 0}, 'process-emperor') -- :/
          280  +
          281  +-- tear down this emperor: if no other process is attached, remove the
          282  +-- control segment and its semaphore; then detach, delete our message
          283  +-- queue, restore the SIGINT/SIGTERM handlers saved by mk() and mark it
          284  +-- released. no-op if already released (shmid == 0).
          285  +terra m.emperor:release()
          286  +	if self.shmid == 0 then return end
          287  +	if self:countpeers() == 0 then
          288  +		-- we're the last user, go ahead and destroy it
          289  +		sem.semctl(self.edict.semid,0,ipc.IPC_RMID)
          290  +		shm.shmctl(self.shmid,ipc.IPC_RMID,nil)
          291  +	end
          292  +	shm.shmdt([&opaque](self.edict))
          293  +	mq.msgctl(self.msqid,ipc.IPC_RMID,nil)
          294  +	-- restore the handlers *this* emperor saved; reading them from
          295  +	-- m.global_emperor was wrong for any other emperor
          296  +	S.sigaction(S.SIGINT, &self.oldsig_int, nil)
          297  +	S.sigaction(S.SIGTERM, &self.oldsig_term, nil)
          298  +	self.shmid = 0
          299  +end
          293  +
          294  +-- SIGINT/SIGTERM handler installed by m.emperor.mk: releases the global
          295  +-- emperor's IPC resources and clears m.proc_active (presumably polled by
          296  +-- the main loop to shut down -- confirm).
          297  +-- NOTE(review): this runs in signal context; confirm release() and
          298  +-- lib.warn are safe to call there.
          299  +terra handle_interrupt(sig: int): {}
          300  +	m.global_emperor:release()
          301  +	lib.warn('caught interrupt, shutting down')
          302  +	m.proc_active = false
          303  +end
          304  +
          305  +return m