parsav  Check-in [5a4f99fb55]

Overview
Comment: check in missing file
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 5a4f99fb5583001945effbb48f4b3366b5c02642ee32e525645c375f8e3afcf6
User & Date: lexi on 2020-12-29 14:35:21
Other Links: manifest | tags
Context
2020-12-29
15:48
enable remote control of running instances check-in: f8816b0ab5 user: lexi tags: trunk
14:35
check in missing file check-in: 5a4f99fb55 user: lexi tags: trunk
14:35
add ipc backbone check-in: 87731d4007 user: lexi tags: trunk
Changes

Added ipc.t version [5ea8386d8f].

























































































































































































































































































































































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
-- vim: ft=terra

-- NOTA BENE: this is a hack to get around the
-- weaknesses of the terra macro parser. it is
-- highly undesirable and should be replaced if
-- suitable IPC primitives become available
--
-- S exposes <signal.h> to terra code. SIGRTMIN and SIGRTMAX are not read as
-- constants but wrapped in the C functions _min() and _max(), presumably
-- because the macro importer cannot evaluate them (NOTE(review): on glibc
-- they expand to function calls -- confirm). _sig_spec is an alias for
-- struct sigaction so the struct can be named from terra; see the trailing
-- note for why the alias is needed.
local S = terralib.includecstring [[
	#include <signal.h>
	int _min(void) { return SIGRTMIN; }
	int _max(void) { return SIGRTMAX; }
	typedef struct sigaction _sig_spec;
]]--terra cannot access structures when they have the same name as a function

-- module table: signal numbering, the IPC command enum and shared constants
local m = {
	signals = {
		-- rtsigs
		-- logical offsets; pass them through m.signum() to obtain the real
		-- realtime signal number
		notify_state_change = 0x3;
		notify_cond_update = 0x4;
		-- values sent through rtsig
		-- (the sigqueue() payload, e.g. the argument of m.notify_parent)
		state_success = 0x69;
		state_fail_find = 0x93;

		-- system sigs
		sys_child = S.SIGCHLD;
	};
	-- commands an emperor can issue to its clients (carried in m.demand.cmd)
	cmd = lib.enum {
		'none', 'enumerate', 'stop', 'cfgrefresh', 'chnoise'
	};
	-- cleared by the SIGINT/SIGTERM handler (handle_interrupt) to request
	-- shutdown
	proc_active = global(true);
	-- SysV IPC key of the shared-memory segment holding the m.edict
	ipckey = 0x8e037c6;
}

-- maps a logical realtime signal offset (one of the rtsig values in
-- m.signals) to an actual signal number by counting down from SIGRTMAX.
-- bails if the offset is negative (which would land above SIGRTMAX) or if
-- the result would fall below SIGRTMIN.
terra m.signum(reqsig: int)
	var sig = S._max() - reqsig
	-- we implement this in contradiction to the recommendations of the manpages
	-- because they make no goddamn sense. if SIGRTMIN can vary, using as a basis
	-- for signals is really rather stupid -- there's no guarantee a signal sent
	-- by one process will be interpreted correctly by another, as its SIGRTMIN
	-- might well be different! so we use SIGRTMAX as the base instead, assuming
	-- that it's more likely to remain constant across the whole system.
	if reqsig < 0 then lib.bail('realtime signal error - requested signal number is negative') end
	if reqsig > S._max() or sig < S._min() then lib.bail('realtime signal error - requested signal number is greater than the available number of realtime signals') end
	return sig
end

-- inverse of m.signum: turns a received realtime signal number back into
-- its logical offset below SIGRTMAX. unlike m.signum it performs no range
-- checking, so callers must already know the signal is a realtime one.
terra m.sigdec(rtsig: int)
	var top = S._max()
	return top - rtsig
end

-- raw libc bindings used below
local getppid = terralib.externfunction('getppid', {} -> int);
-- sigqueue(pid, sig, value): the third C parameter is a union sigval; it is
-- declared here as a pointer-sized integer, which relies on the ABI passing
-- that union like an integer (NOTE(review): true for x86-64 SysV -- confirm
-- on other targets)
local sigsend = terralib.externfunction('sigqueue', {int,int,intptr} -> int);
local sigfd = terralib.includec 'sys/signalfd.h';

-- sends `sig` as the payload of the notify_state_change realtime signal to
-- the parent process. a failed delivery is reported, but is not fatal.
terra m.notify_parent(sig: int)
	if sigsend(getppid(), m.signum(m.signals.notify_state_change), sig) == -1 then
		lib.warn('could not notify parent process of state change')
	end
end

-- state for receiving signals synchronously through a signalfd
struct m.listener {
	prevmask: S.sigset_t -- signal mask in effect before mk(), restored by release()
	fd: int -- signalfd descriptor the listened-for signals are read from
}

-- blocks the signals this module listens for and opens a signalfd that
-- receives them instead. the previous signal mask is saved so release() can
-- restore it. bails if the mask cannot be changed or the signalfd cannot be
-- created (in the latter case the old mask is restored first).
terra m.listener.methods.mk()
	var mask: S.sigset_t
	var l: m.listener
	S.sigemptyset(&mask)
	-- important to keep this in sync with any new signals we add
	S.sigaddset(&mask, m.signum(m.signals.notify_state_change))
	S.sigaddset(&mask, m.signum(m.signals.notify_cond_update))
	S.sigaddset(&mask, m.signals.sys_child)
	-- the signals must be blocked, otherwise they are handled according to
	-- their default dispositions instead of being queued for the signalfd
	if S.sigprocmask(S.SIG_BLOCK, &mask, &l.prevmask) == -1 then
		lib.bail('could not block signals for listener')
	end
	l.fd = sigfd.signalfd(-1, &mask, 0)
	if l.fd == -1 then
		S.sigprocmask(S.SIG_SETMASK, &l.prevmask, nil)
		lib.bail('could not open signalfd for listener')
	end
	return l
end

-- reads one signalfd_siginfo record from the listener's signalfd (NOTE(review):
-- blocks if lib.io.recv is a plain read, since mk() opens the fd without
-- SFD_NONBLOCK; the read's result is not checked) and summarises it:
--   from   - sending pid
--   system - true when the signal lies outside [SIGRTMIN, SIGRTMAX]
--   sig    - the raw signal number for system signals, otherwise the
--            logical offset decoded with m.sigdec
--   event  - the integer payload sent along with the signal
--   exit   - ssi_status (e.g. a child's exit status for SIGCHLD)
terra m.listener:block()
	var info: sigfd.signalfd_siginfo
	lib.io.recv(self.fd, [rawstring](&info), sizeof(sigfd.signalfd_siginfo))
	var outside_rt = not (info.ssi_signo >= S._min() and info.ssi_signo <= S._max())
	return {
		from = info.ssi_pid;
		system = outside_rt;
		sig = lib.trn(outside_rt, info.ssi_signo, m.sigdec(info.ssi_signo));
		event = info.ssi_int;
		exit = info.ssi_status;
	}
end

-- closes the signalfd, then reinstates the signal mask that was in effect
-- before mk() was called.
terra m.listener:release()
	var fd = self.fd
	lib.io.close(fd)
	var saved = &self.prevmask
	S.sigprocmask(S.SIG_SETMASK, saved, nil)
end

local ipc = terralib.includec 'sys/ipc.h'
local shm = terralib.includec 'sys/shm.h'
local sem = terralib.includec 'sys/sem.h'
local mq = terralib.includec 'sys/msg.h'

-- reply sent by an instance to the emperor's message queue after handling a
-- demand (see m.demand:ack).
-- NOTE(review): msgsnd/msgrcv treat the first `long` of the buffer as the
-- SysV message type, so clid doubles as the mtype and must be > 0 when sent.
struct m.ack {
	clid: uint64 -- client serial id of the responder (its m.emperor.clid)
	cliq: int -- responder's own message queue id (its m.emperor.msqid)
	success: bool -- filled in by the caller of m.demand:ack, not in this file
	union {
		iname: int8[64] -- instance name, matched against decree's tgtname
	}
}

-- a command from the emperor, delivered either through the shared m.edict
-- (broadcast) or through an instance's message queue (targeted); see
-- m.emperor:decree.
-- NOTE(review): when sent with msgsnd, the first `long` of the struct is
-- taken as the SysV message type and must therefore be positive.
struct m.demand {
	cmd: m.cmd.t
	empq: ipc.key_t -- msg queue id (a msqid, not a key) of the issuing emperor; acks are sent here
	operand: uint64 -- extra cmd parameter, where useful
}

-- the command board stored in the SysV shared-memory segment keyed by
-- m.ipckey; every emperor and client attaches it (see m.emperor.mk)
struct m.edict {
	semid: int -- controls access to shared memory
	serial: uint64 -- bumped by every broadcast decree; starts at zero, so no action is taken until set to 1
	nextclient: uint64 -- next client id to hand out, starting at 1 (0 in a fresh segment is treated as 1)

	demand: m.demand -- the current broadcast command
}

-- applies one semop() with sem_op = `op` to the edict's semaphore:
-- 0 waits for the count to reach zero, a positive value adds to it (taking
-- locks), a negative value subtracts (releasing locks; it blocks while the
-- count is too low). returns semop's result, -1 on error.
terra m.edict:sem(op: int)
	var sb = sem.sembuf {
		-- the set is created with a single semaphore (nsems = 1 in
		-- m.emperor.mk), so the only valid index is 0
		sem_num = 0;
		sem_op = op;
		sem_flg = 0;
	}
	return sem.semop(self.semid, &sb, 1)
end

-- per-process handle on the IPC control bus. the issuing emperor has
-- client = false and sends decrees; instances have client = true and poll.
struct m.emperor {
	shmid: int -- id of the shared segment holding the edict; 0 = not connected
	msqid: int -- my message queue
	clid: uint64 -- my client serial id
	lastcmd: uint64 -- when serial increases above this, execute the command
	edict: &m.edict -- the shared segment, attached into our address space
	client: bool -- false only for the process allowed to issue decrees
	oldsig_int: S._sig_spec -- SIGINT disposition replaced by mk()
	oldsig_term: S._sig_spec -- SIGTERM disposition replaced by mk()
}


-- number of other processes attached to the shared segment (the attach
-- count minus ourselves). returns 0 if the segment cannot be queried,
-- instead of reading an uninitialised shmid_ds.
terra m.emperor:countpeers(): intptr
	var si: shm.shmid_ds
	if shm.shmctl(self.shmid, ipc.IPC_STAT, &si) == -1 then return 0 end
	return si.shm_nattch - 1
end


-- forward declaration; the handler is defined after m.emperor:release,
-- which it calls
local terra handle_interrupt :: int -> {}

-- connects to the IPC control bus, creating the shared segment (and its
-- semaphore) if this is the first process. clients additionally receive a
-- client id and skip edicts issued before they joined. every emperor gets a
-- private message queue and installs handle_interrupt for SIGINT/SIGTERM.
-- bails on any unrecoverable IPC failure.
terra m.emperor.methods.mk(client: bool)
	var em = m.emperor { lastcmd = 0, client = client, clid = 0 }
	-- the segment only ever holds an m.edict, so size it as one.
	-- NOTE(review): older builds sized it as an m.emperor; shmget on such a
	-- larger existing segment still succeeds, but not the other way round.
	-- NOTE(review): in Lua a leading zero does not make a literal octal;
	-- confirm that terra reads 0600 as octal, otherwise use 384 (= 8#600).
	var created = false
	em.shmid = shm.shmget(m.ipckey, sizeof(m.edict), 0);
	if em.shmid == -1 then -- needs to be created
		em.shmid = shm.shmget(m.ipckey, sizeof(m.edict), ipc.IPC_CREAT or ipc.IPC_EXCL or 0600);
		if em.shmid == -1 then lib.bail('could not get shared memory segment; may have clashed with simultaneously starting process, if any') end
		created = true
	end
	-- attach, and verify the attach worked, before touching the edict
	em.edict = [&m.edict](shm.shmat(em.shmid, nil, 0))
	if [intptr](em.edict) == [intptr](-1) then lib.bail('could not attach shared memory segment') end
	if created then
		-- private set with a single semaphore; its id is published through
		-- the edict for every process that attaches later
		em.edict.semid = sem.semget([ipc.key_t](0), 1, 0600)
		if em.edict.semid == -1 then
			shm.shmctl(em.shmid, ipc.IPC_RMID, nil)
			lib.bail('could not create semaphore for control bus')
		end
	end
	if client then
		-- NOTE(review): waiting for zero and taking the lock are two separate
		-- semop calls, so another process can interleave between them
		em.edict:sem(0) -- wait our turn
		em.edict:sem(1) -- lock edict
		if em.edict.nextclient == 0 then em.edict.nextclient = 1 end
		em.clid = em.edict.nextclient
		em.edict.nextclient = em.clid + 1
		-- ignore edicts issued before we joined: poll() treats any serial
		-- above lastcmd as new and releases a read lock that was never placed
		-- for us, which would block forever on the semaphore
		em.lastcmd = em.edict.serial
		em.edict:sem(-1) -- release edict
	end
--	while true do
--		var id = lib.crypt.random(uint64) % [ipc.key_t:max()]
--		em.msqid = mq.msgget([ipc.key_t](id),ipc.IPC_CREAT or ipc.IPC_EXCL)
--		if em.msqid ~= -1 then break end
--	end
	-- private queue (IPC_PRIVATE key): targeted demands arrive here for
	-- clients, acks for the emperor; its id travels in demand.empq/ack.cliq
	em.msqid = mq.msgget([ipc.key_t](0), 0600)
	if em.msqid == -1 then lib.bail('could not create message queue') end
	var empty: S.sigset_t
	S.sigemptyset(&empty) -- 🙄
	var ss = S._sig_spec { sa_mask = empty, sa_flags = 0 }
	ss.__sigaction_handler.sa_handler = handle_interrupt; -- may not be portable
	S.sigaction(S.SIGINT, &ss, &em.oldsig_int)
	S.sigaction(S.SIGTERM, &ss, &em.oldsig_term)
	return em
end

-- checks for a pending command without blocking and stores it in `cmd`.
-- a broadcast edict (serial newer than lastcmd) takes priority over a
-- targeted demand waiting in our message queue. if neither is pending,
-- cmd.cmd is set to m.cmd.none. does nothing when not connected.
terra m.emperor:poll(cmd: &m.demand)
	if self.shmid == 0 then return end
	if self.edict.serial > self.lastcmd then
		self.lastcmd = self.edict.serial
		@cmd = self.edict.demand
		self.edict:sem(-1) -- release the read lock the emperor placed for us
		return
	end

	-- a SysV message starts with a `long` type that msgrcv writes in addition
	-- to msgsz payload bytes. the leading 8 bytes of m.demand hold that type
	-- (assumes an LP64 target), so only the remainder counts as msgsz.
	-- MSG_NOERROR truncates oversized messages instead of failing, so at most
	-- sizeof(m.demand) bytes are ever written to cmd.
	if mq.msgrcv(self.msqid, cmd, sizeof(m.demand) - sizeof(int64), 0,
			ipc.IPC_NOWAIT or mq.MSG_NOERROR) ~= -1 then
		return
	end

	cmd.cmd = m.cmd.none
end

-- allocates a heap array with one m.ack slot per currently connected peer
-- (presumably to receive the replies to a broadcast decree)
terra m.emperor:mallack()
	var slots = self:countpeers()
	return lib.mem.heapa(m.ack, slots)
end
-- forward declaration: decree calls itself (a targeted send first broadcasts
-- an enumerate), so its type must be known before the body is defined
terra m.emperor.methods.decree :: {
	&m.emperor, uint64, rawstring, m.cmd.t, uint64, &m.ack
} -> {}
-- issues a command over the IPC bus; only a non-client emperor may do so.
--   tgtclid/tgtname: with tgtclid == 0 and tgtname == nil the command is
--     broadcast to every connected instance; otherwise it goes only to the
--     instance whose client id equals tgtclid or whose name equals tgtname.
--   cmd/operand: the command and its extra parameter.
--   result: for a broadcast, room for one m.ack per peer (cf. mallack);
--     for a targeted send, a single m.ack.
-- bails if no peers are connected or an IPC call fails; warns and leaves
-- result untouched if no instance matches the target.
terra m.emperor:decree(
	tgtclid: uint64, tgtname: rawstring,
	cmd: m.cmd.t, operand: uint64,
	result: &m.ack
): {}
	if self.client then lib.bail('client attempted to issue IPC decree') end
	var dem = m.demand {
		cmd = cmd;
		operand = operand;
		empq = self.msqid; -- register to receive replies
	}
	-- a SysV message starts with a `long` type field that msgsz does not
	-- count; the leading 8 bytes of m.demand / m.ack double as that field
	-- (assumes an LP64 target), so only the remainder is passed as the size.
	-- MSG_NOERROR makes msgrcv truncate oversized messages instead of
	-- failing, so a reply can never be written past the end of its m.ack.
	var demsz = sizeof(m.demand) - sizeof(int64)
	var acksz = sizeof(m.ack) - sizeof(int64)
	var npeers = self:countpeers()
	if npeers == 0 then lib.bail('no processes connected to control bus') end
	if tgtclid == 0 and tgtname == nil then
		-- broadcast: publish the demand through the shared edict
		lib.dbg('sending to all instances, waiting for edict to become writable')
		self.edict:sem(0) -- wait for all locks on edict to resolve
		lib.dbg('locking edict')
		self.edict:sem(npeers) -- place a read lock for each peer
		self.edict.demand = dem
		lib.dbg('sending edict')
		self.edict.serial = self.edict.serial + 1 -- send command

		var acks: intptr = 0
		while acks < npeers do
			lib.dbg('awaiting responses from instances')
			if mq.msgrcv(self.msqid, result + acks, acksz, 0, mq.MSG_NOERROR) == -1 then
				lib.bail('error occurred while waiting for responses from instances')
			end
			acks = acks + 1
			-- TODO: handle rare case that a process dies here without releasing
			-- its semaphore read lock, desyncing and paralyzing IPC
		end
		lib.dbg('all instances have reported in')
	else
		-- targeted: enumerate every instance to learn the target's message
		-- queue, then send the demand straight to that queue.
		-- NOTE(review): the nested decree recounts its peers; if an instance
		-- joins in between, more acks arrive than were allocated here.
		var acks = lib.mem.heapa(m.ack, npeers)
		self:decree(0, nil, m.cmd.enumerate, 0, &acks(0))
		var tgt: int
		var found = false
		for i=0,npeers do
			if acks(i).clid == tgtclid or (
				tgtname ~= nil and lib.str.cmp(tgtname, acks(i).iname) == 0
			) then
				found = true
				tgt = acks(i).cliq
			end
		end
		acks:free()
		if not found then
			lib.warn('no such instance is currently online and responding to IPC calls')
		else
			lib.dbg('located instance, sending command')
			if mq.msgsnd(tgt, &dem, demsz, 0) == -1 then
				lib.bail('could not send command to target process')
			end
			while true do
				var ack: m.ack
				if mq.msgrcv(self.msqid, &ack, acksz, 0, mq.MSG_NOERROR) == -1 then
					lib.bail('error while awaiting response')
				end
				if ack.cliq == tgt then
					lib.dbg('got response, writing out and returning')
					@result = ack
					break
				else lib.warn('got spurious response, ignoring') end
			end
		end
	end
end

-- replies to the emperor that issued this demand: stamps `a` with our client
-- id and message queue id, then sends it to the queue named in empq.
-- a failed send is reported, but is not fatal.
terra m.demand:ack(emp: &m.emperor, a: &m.ack)
	a.clid = emp.clid
	a.cliq = emp.msqid
	-- the leading 8 bytes of m.ack (clid, >= 1 for clients) are consumed as
	-- the SysV message type and are not counted in msgsz (assumes LP64)
	if mq.msgsnd(self.empq, a, sizeof(m.ack) - sizeof(int64), 0) == -1 then
		lib.warn('could not acknowledge demand')
	end
end

-- process-wide emperor released by handle_interrupt. shmid = 0 marks it as
-- not (yet) connected, which poll() and release() treat as a no-op.
m.global_emperor = global(m.emperor, `m.emperor {shmid = 0}, 'process-emperor') -- :/

-- disconnects from the IPC bus. if no other process is attached, the shared
-- segment and its semaphore are removed. the segment is then detached, our
-- message queue removed, and the SIGINT/SIGTERM dispositions saved by mk()
-- restored. afterwards the emperor is marked as disconnected, so a second
-- call does nothing.
terra m.emperor:release()
	if self.shmid == 0 then return end
	if self:countpeers() == 0 then
		-- we're the last user, go ahead and destroy it
		sem.semctl(self.edict.semid,0,ipc.IPC_RMID)
		shm.shmctl(self.shmid,ipc.IPC_RMID,nil)
	end
	shm.shmdt([&opaque](self.edict))
	mq.msgctl(self.msqid,ipc.IPC_RMID,nil)
	-- restore the handlers this emperor saved in mk(), rather than those of
	-- m.global_emperor, which may be a different, never-connected instance
	S.sigaction(S.SIGINT, &self.oldsig_int, nil)
	S.sigaction(S.SIGTERM, &self.oldsig_term, nil)
	self.shmid = 0
end

-- SIGINT/SIGTERM handler installed by m.emperor.mk: disconnects the global
-- emperor from the IPC bus, warns, and clears m.proc_active to ask the
-- main loop to shut down. `sig` is unused.
-- NOTE(review): POSIX does not list the SysV IPC calls made by release() as
-- async-signal-safe, and lib.warn's safety is unknown here; consider doing
-- only the flag update inside the handler.
terra handle_interrupt(sig: int): {}
	m.global_emperor:release()
	lib.warn('caught interrupt, shutting down')
	m.proc_active = false
end

return m