parsav  ipc.t at [9cc5d48cd8]

File ipc.t artifact ff639e2a51 part of check-in 9cc5d48cd8


-- vim: ft=terra

-- NOTA BENE: this is a hack to get around the
-- weaknesses of the terra macro parser. it is
-- highly undesirable and should be replaced if
-- suitable IPC primitives become available.
--
-- The inline C below pulls in <signal.h> and adds three helpers:
--   _min / _max  return SIGRTMIN / SIGRTMAX through real functions that
--                terra code can call (presumably because these macros are
--                not plain constants the importer can evaluate -- confirm)
--   _sig_spec    a typedef alias for `struct sigaction`
local S = terralib.includecstring [[
	#include <signal.h>
	int _min(void) { return SIGRTMIN; }
	int _max(void) { return SIGRTMAX; }
	typedef struct sigaction _sig_spec;
]]--terra cannot access structures when they have the same name as a function

-- Module table: signal numbering, the command enum and process-wide state
-- used by the IPC routines defined below.
local m = {
	signals = {
		-- rtsigs: indices passed to m.signum, which maps them onto actual
		-- realtime signal numbers counted downward from SIGRTMAX
		notify_state_change = 0x3;
		notify_cond_update = 0x4;
		-- values sent through rtsig (presumably the integer payload handed
		-- to m.notify_parent; no user of these is visible in this file)
		state_success = 0x69;
		state_fail_find = 0x93;

		-- system sigs
		sys_child = S.SIGCHLD;
	};
	-- commands that can be carried in the `cmd` field of an m.demand
	cmd = lib.enum {
		'none', 'enumerate', 'stop', 'cfgrefresh', 'chnoise'
	};
	-- set to false by the SIGINT/SIGTERM handler installed in m.emperor.mk
	proc_active = global(true);
	-- SysV key of the shared memory segment that holds the m.edict
	ipckey = 0x8e037c6;
}

-- Map a module-relative realtime signal index (see m.signals) onto the
-- actual signal number to use, counting downward from SIGRTMAX.
--   reqsig: zero-based offset below SIGRTMAX
--   returns: SIGRTMAX - reqsig
-- Bails out if the result would fall outside [SIGRTMIN, SIGRTMAX].
terra m.signum(reqsig: int)
	-- a negative offset would yield a number above SIGRTMAX, which the
	-- lower-bound test below cannot detect
	if reqsig < 0 then lib.bail('realtime signal error - requested signal number is negative') end
	var sig = S._max() - reqsig
	-- we implement this in contradiction to the recommendations of the manpages
	-- because they make no goddamn sense. if SIGRTMIN can vary, using it as a
	-- basis for signals is really rather stupid -- there's no guarantee a signal
	-- sent by one process will be interpreted correctly by another, as its
	-- SIGRTMIN might well be different! so we use SIGRTMAX as the base instead,
	-- assuming that it's more likely to remain constant across the whole system.
	if reqsig > S._max() or sig < S._min() then lib.bail('realtime signal error - requested signal number is greater than the available number of realtime signals') end
	return sig
end

-- Convert between an actual realtime signal number and its offset below
-- SIGRTMAX. Performs the same subtraction as m.signum but does no range
-- validation at all.
terra m.sigdec(rtsig: int)
	var ceiling = S._max()
	return ceiling - rtsig
end

-- libc entry points declared by hand rather than imported from a header.
local getppid = terralib.externfunction('getppid', {} -> int);
-- sigqueue(3) takes a `union sigval` as its third parameter in C; it is
-- declared here as a bare intptr.
-- NOTE(review): this relies on the union being passed like a pointer-sized
-- integer on the target ABI -- confirm for any new platform.
local sigsend = terralib.externfunction('sigqueue', {int,int,intptr} -> int);
-- signalfd(2) and struct signalfd_siginfo, used by m.listener
local sigfd = terralib.includec 'sys/signalfd.h';

-- Queue the notify_state_change realtime signal to the parent process,
-- attaching `sig` as the integer payload. The result of sigqueue is not
-- inspected.
terra m.notify_parent(sig: int)
	var parent = getppid()
	var rtsig = m.signum(m.signals.notify_state_change)
	sigsend(parent, rtsig, sig)
end

-- Signal listener state: `prevmask` is the signal mask that was in effect
-- before mk() blocked the watched signals (restored by release()), and `fd`
-- is the descriptor returned by signalfd.
struct m.listener { prevmask:S.sigset_t fd:int }

-- Create a listener: block the signals this module cares about for normal
-- delivery (remembering the previous mask) and open a signalfd through
-- which they can be read instead. Return values of the libc calls are not
-- checked, so `fd` holds whatever signalfd returned.
terra m.listener.methods.mk()
	var listener: m.listener
	var watched: S.sigset_t
	S.sigemptyset(&watched)

	-- important to keep this in sync with any new signals we add
	var state_change = m.signum(m.signals.notify_state_change)
	S.sigaddset(&watched, state_change)
	var cond_update = m.signum(m.signals.notify_cond_update)
	S.sigaddset(&watched, cond_update)
	S.sigaddset(&watched, S.SIGCHLD)

	S.sigprocmask(S.SIG_BLOCK, &watched, &listener.prevmask)
	listener.fd = sigfd.signalfd(-1, &watched, 0)
	return listener
end

-- Read one signalfd_siginfo record from the listener's descriptor and
-- return a summary of it:
--   from   - sender pid
--   system - true when the signal number lies outside [SIGRTMIN, SIGRTMAX]
--   sig    - the raw signal number for system signals, otherwise the
--            SIGRTMAX-relative index produced by m.sigdec
--   event  - integer payload queued with the signal
--   exit   - the ssi_status field of the record
-- The result of lib.io.recv is not inspected here.
terra m.listener:block()
	var info: sigfd.signalfd_siginfo
	lib.io.recv(self.fd, [rawstring](&info), sizeof(sigfd.signalfd_siginfo))
	var signo = info.ssi_signo
	var syssig = signo < S._min() or signo > S._max()
	return {
		from = info.ssi_pid;
		system = syssig;
		sig = lib.trn(syssig, signo, m.sigdec(signo));
		event = info.ssi_int;
		exit = info.ssi_status;
	}
end

-- Tear the listener down: close the signalfd descriptor first, then put
-- back the signal mask that mk() saved.
terra m.listener:release()
	var saved = &self.prevmask
	lib.io.close(self.fd)
	S.sigprocmask(S.SIG_SETMASK, saved, nil)
end

local ipc = terralib.includec 'sys/ipc.h'
local shm = terralib.includec 'sys/shm.h'
local sem = terralib.includec 'sys/sem.h'
local mq = terralib.includec 'sys/msg.h'

-- Reply sent by an instance in response to a demand. Values of this type
-- are handed directly to msgsnd/msgrcv as the message buffer, so the field
-- order is part of the wire format and the leading long-sized word is what
-- the kernel treats as the message type.
struct m.ack {
	clid: uint64 -- client serial id of the replying instance (set in m.demand:ack)
	cliq: int -- message queue id of the replying instance (set in m.demand:ack)
	success: bool -- not written anywhere in this file; presumably set by the responder
	union {
		iname: int8[64] -- instance name, compared against the target name in m.emperor:decree
	}
}

-- A command issued over the control bus, either broadcast through the
-- shared m.edict or sent to a single instance's message queue. Like m.ack
-- it is passed directly to msgsnd/msgrcv, so the layout is part of the
-- wire format.
struct m.demand {
	cmd: m.cmd.t -- which command to execute
	empq: ipc.key_t -- ID of the current emperor's msg queue, chosen randomly
	operand: uint64 -- extra cmd parameter, where useful
}

-- Contents of the shared memory segment identified by m.ipckey. Every
-- attached process sees the same instance, so the layout must not change
-- between cooperating builds.
struct m.edict {
	semid: int -- controls access to shared memory
	serial: uint64 -- starts at zero, no action taken until set to 1
	nextclient: uint64 -- starting at 1

	demand: m.demand -- the most recently broadcast command (see m.emperor:decree)
}

-- Apply a single semaphore operation to the edict's semaphore set and
-- return the result of semop. `op` is passed through as sem_op: positive
-- adds, negative subtracts, zero waits for the value to reach zero.
-- NOTE(review): the set is created with exactly one semaphore in
-- m.emperor.mk (semget(..., 1, ...)), so index 1 looks out of range and
-- semop would be expected to fail with EFBIG. Every caller in this file
-- ignores the return value -- confirm before relying on this lock.
terra m.edict:sem(op: int)
	var sb: sem.sembuf
	sb.sem_num = 1
	sb.sem_op = op
	sb.sem_flg = 0
	return sem.semop(self.semid, &sb, 1)
end

-- Per-process handle on the control bus: the attached shared edict, this
-- process's own message queue, and the signal dispositions that were
-- replaced when the handle was created.
struct m.emperor {
	shmid: int -- shared memory segment id; 0 is treated as "not set up / released"
	msqid: int -- my message queue
	clid: uint64 -- my client serial id
	lastcmd: uint64 -- when serial increases above this, execute the command
	edict: &m.edict -- attached shared memory segment
	client: bool -- true for instances that receive commands; decree() refuses to run when set
	oldsig_int: S._sig_spec -- SIGINT disposition in effect before mk()
	oldsig_term: S._sig_spec -- SIGTERM disposition in effect before mk()
}


-- Number of attachments to the shared segment other than one, i.e. the
-- kernel's shm_nattch minus one (presumably discounting this process's own
-- attachment). The result of shmctl is not checked.
terra m.emperor:countpeers(): intptr
	var stat: shm.shmid_ds
	shm.shmctl(self.shmid, ipc.IPC_STAT, &stat)
	var attached = stat.shm_nattch
	return attached - 1
end


-- forward declaration: m.emperor.mk installs this as the SIGINT/SIGTERM
-- handler, but its body needs m.emperor:release and is defined further down
local terra handle_interrupt :: int -> {}

-- Join the control bus and return a handle to it.
--   client: true for an instance that receives commands; such a handle is
--           assigned a client serial id from the shared edict
-- Steps: look up (or create) the shared memory segment keyed by m.ipckey,
-- attach it, create the semaphore set if the segment is new, allocate a
-- client id when requested, create a private message queue, and install
-- handle_interrupt for SIGINT and SIGTERM (saving the old dispositions).
-- Bails out if the segment can be neither found nor created, or if it
-- cannot be attached.
-- NOTE(review): the segment is sized with sizeof(m.emperor) although it is
-- only ever accessed as an m.edict; kept as-is so the size matches what
-- other builds request.
-- NOTE(review): confirm that terra reads `0600` as octal; if it is parsed
-- as decimal the permission bits differ from the intended rw-------.
terra m.emperor.methods.mk(client: bool)
	var em = m.emperor { lastcmd = 0, client = client }
	var created = false
	em.shmid = shm.shmget(m.ipckey, sizeof(m.emperor), 0);
	if em.shmid == -1 then -- needs to be created
		em.shmid = shm.shmget(m.ipckey, sizeof(m.emperor), ipc.IPC_CREAT or ipc.IPC_EXCL or 0600);
		if em.shmid == -1 then lib.bail('could not get shared memory segment; may have clashed with simultaneously starting process, if any') end
		created = true
	end
	em.edict = [&m.edict](shm.shmat(em.shmid, nil, 0))
	-- shmat reports failure as (void*)-1; this must be checked before the
	-- edict is touched, otherwise the write below goes through that pointer
	if [intptr](em.edict) == [intptr](-1) then lib.bail('could not attach shared memory segment') end
	if created then
		-- only the creator of the segment sets up the semaphore set
		em.edict.semid = sem.semget([ipc.key_t](0), 1, 0600)
	end
	if client then
		em.edict:sem(0) -- wait our turn
		em.edict:sem(1) -- lock edict
		if em.edict.nextclient == 0 then em.edict.nextclient = 1 end
		em.clid = em.edict.nextclient 
		em.edict.nextclient = em.clid + 1
		em.edict:sem(-1) -- release edict
	end
--	while true do
--		var id = lib.crypt.random(uint64) % [ipc.key_t:max()]
--		em.msqid = mq.msgget([ipc.key_t](id),ipc.IPC_CREAT or ipc.IPC_EXCL)
--		if em.msqid ~= -1 then break end
--	end
	-- key 0 requests a fresh private queue; the result is not checked
	em.msqid = mq.msgget([ipc.key_t](0), 0600)
	var empty: S.sigset_t
	S.sigemptyset(&empty) -- 🙄
	var ss = S._sig_spec { sa_mask = empty, sa_flags = 0 }
	ss.__sigaction_handler.sa_handler = handle_interrupt; -- may not be portable
	S.sigaction(S.SIGINT, &ss, &em.oldsig_int)
	S.sigaction(S.SIGTERM, &ss, &em.oldsig_term)
	return em
end

-- Check for a pending command without blocking and store it in *cmd.
--   cmd: out-parameter; cmd.cmd is m.cmd.none when nothing is pending
-- A broadcast (edict serial newer than the last one handled) takes
-- priority over this process's own message queue.
terra m.emperor:poll(cmd: &m.demand)
	if self.shmid == 0 then
		-- not set up or already released: report "nothing to do" instead
		-- of leaving whatever the caller's buffer happened to contain
		cmd.cmd = m.cmd.none
		return
	end
	if self.edict.serial > self.lastcmd then
		self.lastcmd = self.edict.serial
		@cmd = self.edict.demand
		self.edict:sem(-1) -- release my lock
		return
	end

	-- msgsz is the maximum payload size, not counting the leading
	-- long-sized message type word; a sender whose payload exceeds
	-- sizeof(m.demand) minus that word would overrun *cmd
	if mq.msgrcv(self.msqid, cmd, sizeof(m.demand), 0, ipc.IPC_NOWAIT) ~= -1 then
		return
	end

	cmd.cmd = m.cmd.none
end

-- Allocate a buffer of m.ack with one slot per peer currently counted by
-- countpeers(), e.g. to collect the replies of a broadcast decree.
terra m.emperor:mallack()
	var peers = self:countpeers()
	return lib.mem.heapa(m.ack, peers)
end
-- forward declaration so that the body can call itself recursively
terra m.emperor.methods.decree :: {
	&m.emperor, uint64, rawstring, m.cmd.t, uint64, &m.ack
} -> bool
-- Issue a command over the control bus and wait for the replies.
--   tgtclid, tgtname: target selection. With tgtclid == 0 and tgtname ==
--                     nil the command is broadcast to every peer;
--                     otherwise it goes to the instance whose client id
--                     or name matches.
--   cmd, operand:     the command and its optional parameter
--   result:           out-buffer for replies. A broadcast writes one m.ack
--                     per peer (see mallack); a targeted command writes
--                     exactly one.
--   returns:          false if no peers are attached or the target cannot
--                     be found, true otherwise
-- Bails out when called on a client handle or when a message queue
-- operation fails.
terra m.emperor:decree(
	tgtclid: uint64, tgtname: rawstring,
	cmd: m.cmd.t, operand: uint64,
	result: &m.ack
): bool
	if self.client then lib.bail('client attempted to issue IPC decree') end
	var dem = m.demand {
		cmd = cmd;
		operand = operand;
		empq = self.msqid; -- register to receive replies
	}
	var npeers = self:countpeers()
	if npeers == 0 then lib.warn('no processes connected to control bus') return false end
	if tgtclid == 0 and tgtname == nil then
		lib.dbg('sending to all instances, waiting for edict to become writable')
		self.edict:sem(0) -- wait for all locks on edict to resolve
		lib.dbg('locking edict')
		self.edict:sem(npeers) -- place a read lock for each peer
		self.edict.demand = dem
		lib.dbg('sending edict')
		self.edict.serial = self.edict.serial + 1 -- send command

		var acks: intptr = 0
		while acks < npeers do
			lib.dbg('awaiting responses from instances')
			-- msgsz is the maximum payload size (excluding the leading
			-- message type word), so sizeof(m.ack) is an upper bound here
			if mq.msgrcv(self.msqid, result + acks, sizeof(m.ack), 0, 0) == -1 then
				lib.bail('error occurred while waiting for responses from instances')
			end
			acks = acks + 1
			-- TODO: handle rare case that a process dies here without releasing
			-- its semaphore read lock, desyncing and paralyzing IPC
		end
		lib.dbg('all instances have reported in')
	else
		-- TODO: get msgqueue and send edict
		-- enumerate every instance first to learn the target's queue id
		var acks = lib.mem.heapa(m.ack, npeers)
		self:decree(0, nil, m.cmd.enumerate, 0, &acks(0))
		var tgt: int
		var found = false
		for i=0,npeers do
			if acks(i).clid == tgtclid or (
				tgtname ~= nil and lib.str.cmp(tgtname, acks(i).iname) == 0
			) then
				found = true
				tgt = acks(i).cliq
			end
		end
		acks:free()
		if not found then
			lib.warn('no such instance is currently online and responding to IPC calls')
			return false
		else
			lib.dbg('located instance, sending command')
			-- msgsnd's size argument counts only the payload that follows
			-- the leading long-sized message type word. passing the whole
			-- struct size would send that many bytes *after* the first
			-- word, reading past `dem` and overrunning a receiver whose
			-- buffer is exactly one m.demand. (intptr stands in for C
			-- `long`, which has the same width on LP64 targets.)
			if mq.msgsnd(tgt, &dem, sizeof(m.demand) - sizeof(intptr), 0) == -1 then
				lib.bail('could not send command to target process')
			end
			while true do
				var ack: m.ack
				if mq.msgrcv(self.msqid, &ack, sizeof(m.ack), 0, 0) == -1 then
					lib.bail('error while awaiting response')
				end
				if ack.cliq == tgt then
					lib.dbg('got response, writing out and returning')
					@result = ack
					break
				else lib.warn('got spurious response, ignoring') end
			end
		end
	end
	return true
end

-- Reply to this demand: stamp the ack with the responder's client id and
-- message queue id, then send it to the queue named in the demand.
--   emp: the responding process's emperor handle
--   a:   the reply to send; its clid and cliq fields are overwritten
-- The result of msgsnd is not inspected.
terra m.demand:ack(emp: &m.emperor, a: &m.ack)
	a.clid = emp.clid
	a.cliq = emp.msqid
	-- msgsnd's size argument counts only the payload that follows the
	-- leading long-sized message type word. passing sizeof(m.ack) would
	-- send that many bytes after the first word, reading past *a and
	-- overrunning any receiver whose buffer is exactly one m.ack.
	-- (intptr stands in for C `long`, same width on LP64 targets.)
	mq.msgsnd(self.empq, a, sizeof(m.ack) - sizeof(intptr), 0)
end

-- Process-wide emperor handle that the signal handler releases on
-- SIGINT/SIGTERM. It starts out with shmid = 0, which poll() and release()
-- treat as "nothing to do".
m.global_emperor = global(m.emperor, `m.emperor {shmid = 0}, 'process-emperor') -- :/

-- Leave the control bus and undo what m.emperor.mk set up: destroy the
-- shared segment and semaphore set if no other process is attached, detach
-- the segment, remove this handle's message queue and restore the SIGINT /
-- SIGTERM dispositions that were saved in this handle. Does nothing when
-- the handle is not set up or was already released (shmid == 0).
terra m.emperor:release()
	if self.shmid == 0 then return end
	if self:countpeers() == 0 then
		-- we're the last user, go ahead and destroy it
		sem.semctl(self.edict.semid,0,ipc.IPC_RMID)
		shm.shmctl(self.shmid,ipc.IPC_RMID,nil)
	end
	shm.shmdt([&opaque](self.edict))
	mq.msgctl(self.msqid,ipc.IPC_RMID,nil)
	-- restore the dispositions saved by mk() in *this* handle; reading them
	-- from m.global_emperor is only right when self is the global handle
	-- (or a copy of it)
	S.sigaction(S.SIGINT, &self.oldsig_int, nil)
	S.sigaction(S.SIGTERM, &self.oldsig_term, nil)
	self.shmid = 0
end

-- SIGINT/SIGTERM handler installed by m.emperor.mk: releases the
-- process-wide emperor, logs a warning, and clears m.proc_active. The
-- signal number itself is not used.
terra handle_interrupt(sig: int): {}
	var emperor = &m.global_emperor
	emperor:release()
	lib.warn('caught interrupt, shutting down')
	m.proc_active = false
end

-- export the module table
return m