// SPDX-License-Identifier: GPL-3.0-or-later

#define pr_fmt(fmt) "ipc: " fmt

#include "mos/ipc/ipc.hpp"

#include "mos/filesystem/dentry.hpp"
#include "mos/filesystem/sysfs/sysfs.hpp"
#include "mos/filesystem/sysfs/sysfs_autoinit.hpp"
#include "mos/filesystem/vfs_types.hpp"
#include "mos/ipc/pipe.hpp"
#include "mos/lib/sync/spinlock.hpp"
#include "mos/platform/platform.hpp"
#include "mos/syslog/printk.hpp"
#include "mos/tasks/schedule.hpp"
#include "mos/tasks/signal.hpp"
#include "mos/tasks/wait.hpp"

#include <mos/allocator.hpp>
#include <mos/filesystem/fs_types.h>
#include <mos/lib/structures/hashmap_common.hpp>
#include <mos/lib/structures/list.hpp>
#include <mos/mos_global.h>
#include <mos_stdlib.hpp>
#include <mos_string.hpp>

#define IPC_SERVER_MAGIC MOS_FOURCC('I', 'P', 'C', 'S')
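
// A rough usage sketch of the API defined in this file (illustrative only: the names,
// buffers and sizes below are placeholders, error handling is omitted, and in practice
// the IPC syscall layer drives these calls):
//
//   // server side
//   auto server = ipc_server_create("my.service", 8);       // announce a named server
//   auto conn = ipc_server_accept(server.get());            // block until a client connects
//   ipc_server_read(conn.get(), request, sizeof(request));  // talk over the per-connection pipes
//   ipc_server_write(conn.get(), reply, reply_size);
//   ipc_server_close_channel(conn.get());                   // drop this connection
//   ipc_server_close(server.get());                         // tear the server down
//
//   // client side
//   auto conn = ipc_connect_to_server("my.service", MOS_PAGE_SIZE); // blocks until accepted
//   ipc_client_write(conn.get(), request, request_size);
//   ipc_client_read(conn.get(), reply, sizeof(reply));
//   ipc_client_close_channel(conn.get());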

struct IPCDescriptor final : mos::NamedType<"IPC.Descriptor">
{
    as_linked_list; ///< attached to either pending or established list
    const mos::string server_name;
    size_t buffer_size_npages;

    waitlist_t client_waitlist; ///< client waits here for the server to accept the connection

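    // Each union below names one pipe from both endpoints' points of view: whatever the
    // client writes is what the server reads, and vice versa. The two members are just
    // aliases for the same pointer, chosen so each side can use its own terminology.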
    union
    {
        pipe_t *client_write_pipe;
        pipe_t *server_read_pipe;
    };

    union
    {
        pipe_t *server_write_pipe;
        pipe_t *client_read_pipe;
    };

    IPCDescriptor(mos::string_view name, size_t buffer_size) : server_name(name), buffer_size_npages(buffer_size / MOS_PAGE_SIZE)
    {
    }
};

struct IPCServer final : public mos::NamedType<"IPCServer">
{
    as_linked_list;
    const mos::string name;
    spinlock_t lock;
    inode_t *sysfs_ino = nullptr; ///< inode for sysfs
    size_t pending_max = 0, pending_n = 0;
    size_t established_n = 0;
    list_head pending; ///< list of IPCDescriptor

    waitlist_t server_waitlist; ///< wake up the server here when a client connects

    void *key() const
    {
        return (void *) name.c_str();
    }

    IPCServer(mos::string_view name, size_t pending_max) : name(name), pending_max(pending_max)
    {
        linked_list_init(list_node(this));
        linked_list_init(&pending);
        waitlist_init(&server_waitlist);
    }

    ~IPCServer()
    {
        spinlock_release(&lock);
    }
};

static list_head ipc_servers;
static hashmap_t name_waitlist; ///< waitlist for an IPC server, key = name, value = waitlist_t *
static spinlock_t ipc_lock;     ///< protects ipc_servers and name_waitlist

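/// Tear down an IPC server: unregister it from the global list, reject every pending
/// connection (by zeroing its buffer size and waking the waiting client), then either
/// free the server right away or, if a thread is still blocked in accept(), let that
/// thread free it once it wakes up and observes pending_max == 0.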
void ipc_server_close(IPCServer *server)
{
    spinlock_acquire(&ipc_lock);
    spinlock_acquire(&server->lock);
    // remove the server from the list
    list_remove(server);

    waitlist_t *waitlist = (waitlist_t *) hashmap_get(&name_waitlist, (ptr_t) server->key());
    if (waitlist)
    {
        // the waitlist should have been closed when the server was created
        // and no one should be waiting on it
        spinlock_acquire(&waitlist->lock);
        MOS_ASSERT(waitlist->closed);
        MOS_ASSERT(list_is_empty(&waitlist->list));
        spinlock_release(&waitlist->lock);
        waitlist_init(waitlist); // reuse the waitlist
    }
    spinlock_release(&ipc_lock);

    // with the server lock held, we reject all pending connections
    list_foreach(IPCDescriptor, ipc, server->pending)
    {
        ipc->buffer_size_npages = 0; // mark the connection as closed
        // wake up the client
        waitlist_close(&ipc->client_waitlist);
        waitlist_wake_all(&ipc->client_waitlist);
    }

    server->pending_max = 0;
    waitlist_close(&server->server_waitlist);            // close the server's waitlist
    int n = waitlist_wake_all(&server->server_waitlist); // wake up the server, if it is waiting

    if (n)
    {
        spinlock_release(&server->lock);
        // let the accept() syscall free the server
    }
    else
    {
        // now we can free the server
        delete server;
    }
}

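// The four helpers below are thin wrappers that select the correct end of the two shared
// pipes for the caller's role; all buffering and blocking is handled by the pipe layer.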
size_t ipc_client_read(IPCDescriptor *ipc, void *buf, size_t size)
{
    return pipe_read(ipc->client_read_pipe, buf, size);
}

size_t ipc_client_write(IPCDescriptor *ipc, const void *buf, size_t size)
{
    return pipe_write(ipc->client_write_pipe, buf, size);
}

size_t ipc_server_read(IPCDescriptor *ipc, void *buf, size_t size)
{
    return pipe_read(ipc->server_read_pipe, buf, size);
}

size_t ipc_server_write(IPCDescriptor *ipc, const void *buf, size_t size)
{
    return pipe_write(ipc->server_write_pipe, buf, size);
}

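// Closing a channel closes this side's end of both pipes. pipe_close_one_end() reports
// whether the pipe is now fully closed, so whichever side closes last also frees the
// IPCDescriptor.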
void ipc_client_close_channel(IPCDescriptor *ipc)
{
    bool r_fullyclosed = pipe_close_one_end(ipc->client_read_pipe);
    bool w_fullyclosed = pipe_close_one_end(ipc->client_write_pipe);
    MOS_ASSERT(r_fullyclosed == w_fullyclosed); // both ends should have the same return value

    if (r_fullyclosed || w_fullyclosed)
    {
        // now we can free the ipc
        delete ipc;
        return;
    }
}

void ipc_server_close_channel(IPCDescriptor *ipc)
{
    bool r_fullyclosed = pipe_close_one_end(ipc->server_read_pipe);
    bool w_fullyclosed = pipe_close_one_end(ipc->server_write_pipe);
    MOS_ASSERT(r_fullyclosed == w_fullyclosed); // both ends should have the same return value

    if (r_fullyclosed || w_fullyclosed)
    {
        // now we can free the ipc
        delete ipc;
        return;
    }
}

void ipc_init(void)
{
    hashmap_init(&name_waitlist, 128, hashmap_hash_string, hashmap_compare_string);
}

static inode_t *ipc_sysfs_create_ino(IPCServer *ipc_server);

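/// Create and announce a new IPC server, or fail with -EEXIST if the name is taken.
/// If clients are already blocked in ipc_connect_to_server() waiting for this name,
/// their waitlist is closed and they are woken up so they can retry the lookup.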
PtrResult<IPCServer> ipc_server_create(mos::string_view name, size_t max_pending)
{
    pr_dinfo(ipc, "creating ipc server '%s' with max_pending=%zu", name.data(), max_pending);
    const auto guard = ipc_lock.lock();
    list_foreach(IPCServer, server, ipc_servers)
    {
        if (server->name == name)
        {
            pr_dwarn(ipc, "ipc server '%s' already exists", name.data());
            return -EEXIST;
        }
    }

    // we don't need to acquire the lock here because the server is not yet announced
    const auto server = mos::create<IPCServer>(name, max_pending);

    // now announce the server
    list_node_append(&ipc_servers, list_node(server));
    ipc_sysfs_create_ino(server);

    // check and see if there is a waitlist for this name
    // if so, wake up all waiters
    waitlist_t *waitlist = (waitlist_t *) hashmap_get(&name_waitlist, (ptr_t) server->key());
    if (waitlist)
    {
        pr_dinfo2(ipc, "found waitlist for ipc server '%s'", name.data());
        // wake up all waiters
        waitlist_close(waitlist);
        const size_t n = waitlist_wake_all(waitlist);
        if (n)
            pr_dinfo2(ipc, "woken up %zu waiters for ipc server '%s'", n, name.data());
    }

    return server;
}

PtrResult<IPCServer> ipc_get_server(mos::string_view name)
{
    const auto guard = ipc_lock.lock();
    list_foreach(IPCServer, server, ipc_servers)
    {
        if (server->name == name)
            return server;
    }

    return -ENOENT;
}

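/// Accept the next pending connection on a server, blocking (interruptibly) until a
/// client connects. The two data pipes are created here before the client is woken.
/// If the server was closed while we were waiting, this call frees the IPCServer and
/// returns -ECONNABORTED.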
PtrResult<IPCDescriptor> ipc_server_accept(IPCServer *ipc_server)
{
    pr_dinfo(ipc, "accepting connection on ipc server '%s'...", ipc_server->name.c_str());

retry_accept:
    spinlock_acquire(&ipc_server->lock);

    // check if the server is closed
    if (ipc_server->pending_max == 0)
    {
        // now we can free the server
        pr_dinfo2(ipc, "ipc server '%s' is closed, aborting accept()", ipc_server->name.c_str());
        delete ipc_server;
        return -ECONNABORTED;
    }

    if (ipc_server->pending_n == 0)
    {
        // no pending connections, wait for a client to connect
        pr_dinfo2(ipc, "no pending connections, waiting for a client to connect...");
        MOS_ASSERT(waitlist_append(&ipc_server->server_waitlist));
        spinlock_release(&ipc_server->lock);
        blocked_reschedule();

        if (signal_has_pending())
        {
            pr_dinfo2(ipc, "woken up by a signal, aborting accept()");
            waitlist_remove_me(&ipc_server->server_waitlist);
            return -EINTR;
        }

        goto retry_accept; // we were woken up (a client connected, or the server was closed), try again
    }

    // get the first pending connection
    MOS_ASSERT(!list_is_empty(&ipc_server->pending));
    IPCDescriptor *ipc = list_node_next_entry(&ipc_server->pending, IPCDescriptor);
    list_remove(ipc);
    ipc_server->pending_n--;
    spinlock_release(&ipc_server->lock);

    MOS_ASSERT(ipc->buffer_size_npages > 0);
    pr_dinfo(ipc, "accepted a connection on ipc server '%s' with buffer_size_npages=%zu", ipc_server->name.c_str(), ipc->buffer_size_npages);

    // setup the pipes
    auto readPipe = pipe_create(ipc->buffer_size_npages);
    if (readPipe.isErr())
    {
        pr_dwarn(ipc, "failed to create read pipe");
        // TODO: cleanup
        return readPipe.getErr();
    }
    ipc->server_read_pipe = readPipe.get();

    auto writePipe = pipe_create(ipc->buffer_size_npages);
    if (writePipe.isErr())
    {
        pr_dwarn(ipc, "failed to create write pipe");
        // TODO: cleanup
        return writePipe.getErr();
    }
    ipc->server_write_pipe = writePipe.get();

    // wake up the client
    waitlist_wake_all(&ipc->client_waitlist);

    return ipc;
}

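/// Connect to a named IPC server. If the server does not exist yet, block on a per-name
/// waitlist until ipc_server_create() announces it (or a signal arrives). The connection
/// is then queued on the server's pending list and the caller sleeps until the server
/// accepts it; waking up with a zeroed buffer size means the server went away before
/// accepting, and -ECONNREFUSED is returned.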
PtrResult<IPCDescriptor> ipc_connect_to_server(mos::string_view name, size_t buffer_size)
{
    if (buffer_size == 0)
        return -EINVAL; // buffer size must be > 0

    pr_dinfo(ipc, "connecting to ipc server '%s' with buffer_size=%zu", name.data(), buffer_size);
    buffer_size = ALIGN_UP_TO_PAGE(buffer_size);

    // create the connection descriptor up front (no locking needed), so that retrying
    // after a wait below does not allocate and leak a new descriptor on every pass
    const auto descriptor = mos::create<IPCDescriptor>(name, buffer_size);

check_server:
    // check if the server exists
    spinlock_acquire(&ipc_lock);
    IPCServer *ipc_server = NULL;
    list_foreach(IPCServer, server, ipc_servers)
    {
        if (server->name == name)
        {
            ipc_server = server;
            // we are holding ipc_lock, so the server cannot unregister itself while we
            // are walking the server list, and thus cannot be freed under us
            spinlock_acquire(&ipc_server->lock);
            pr_dinfo2(ipc, "found ipc server '%s'", ipc_server->name.c_str());
            break;
        }
    }

    if (!ipc_server)
    {
        // no server found, wait for it to be created
        waitlist_t *waitlist = (waitlist_t *) hashmap_get(&name_waitlist, (ptr_t) name.data());
        if (!waitlist)
        {
            waitlist = mos::create<waitlist_t>();
            waitlist_init(waitlist);
            // the key must be in kernel memory
            const auto old = (waitlist_t *) hashmap_put(&name_waitlist, (ptr_t) descriptor->server_name.c_str(), waitlist);
            if (old)
            {
                // someone else has created the waitlist, but now we have replaced it
                // so we have to append the old waitlist to the new one
                list_foreach(Thread, thread, old->list)
                {
                    MOS_ASSERT(waitlist_append(waitlist));
                }
            }
            pr_dinfo2(ipc, "created waitlist for ipc server '%s'", name.data());
        }

        pr_dinfo2(ipc, "no ipc server '%s' found, waiting for it to be created...", name.data());
        MOS_ASSERT(waitlist_append(waitlist));
        spinlock_release(&ipc_lock);
        blocked_reschedule();

        if (signal_has_pending())
        {
            pr_dinfo2(ipc, "woken up by a signal, aborting connect()");
            waitlist_remove_me(waitlist); // we may still be on the waitlist, mirror the accept() path
            delete descriptor;
            return -EINTR;
        }

        // now check if the server exists again
        goto check_server;
    }
    spinlock_release(&ipc_lock);

    // add the connection to the pending list
    if (ipc_server->pending_n >= ipc_server->pending_max)
    {
        pr_dwarn(ipc, "ipc server '%s' has reached its max pending connections, rejecting connection", ipc_server->name.c_str());
        spinlock_release(&ipc_server->lock);
        delete descriptor;
        return -ECONNREFUSED;
    }

    list_node_append(&ipc_server->pending, list_node(descriptor)); // add to pending list
    ipc_server->pending_n++;

    // now wait for the server to accept the connection
    MOS_ASSERT(waitlist_append(&descriptor->client_waitlist));
    waitlist_wake(&ipc_server->server_waitlist, 1);
    spinlock_release(&ipc_server->lock); // now the server can do whatever it wants

    blocked_reschedule();
    // the server has woken us up and has accepted the connection, or it is closed
    pr_dinfo2(ipc, "ipc server '%s' woke us up", ipc_server->name.c_str());

    // check if the server has closed
    if (descriptor->buffer_size_npages == 0)
    {
        // the server is closed, don't touch the ipc_server pointer anymore (it may already be freed)
        pr_dwarn(ipc, "ipc server '%s' has closed", name.data());
        ipc_server = NULL;
        delete descriptor;
        return -ECONNREFUSED;
    }

    // now we have a connection, both the read and write pipes are ready, the io object is also ready
    // we just need to return the io object
    pr_dinfo2(ipc, "ipc server '%s' has accepted the connection", ipc_server->name.c_str());
    return descriptor;
}

// ! sysfs support

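// The IPC state is exposed through sysfs: "servers" is a read-only summary of registered
// servers, "name_waitlist" dumps the clients waiting for servers that do not exist yet,
// and "ipcs" is a directory in which every server appears as a character-device node;
// creating a regular file there creates a new server with a pending queue depth of 1.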
static bool ipc_sysfs_servers(sysfs_file_t *f)
{
    sysfs_printf(f, "%-20s\t%s\n", "Server Name", "Max Pending Connections");
    list_foreach(IPCServer, ipc, ipc_servers)
    {
        sysfs_printf(f, "%-20s\t%zu\n", ipc->name.c_str(), ipc->pending_max);
    }

    return true;
}

static inode_t *ipc_sysfs_create_ino(IPCServer *ipc_server)
{
    ipc_server->sysfs_ino = sysfs_create_inode(FILE_TYPE_CHAR_DEVICE, ipc_server);
    ipc_server->sysfs_ino->perm = PERM_OWNER & (PERM_READ | PERM_WRITE);
    ipc_server->sysfs_ino->file_ops = &ipc_sysfs_file_ops;
    return ipc_server->sysfs_ino;
}

static void ipc_sysfs_list_ipcs(sysfs_item_t *item, dentry_t *d, vfs_listdir_state_t *state, dentry_iterator_op add_record)
{
    MOS_UNUSED(item);
    MOS_UNUSED(d);

    list_foreach(IPCServer, ipc_server, ipc_servers)
    {
        MOS_ASSERT(ipc_server->sysfs_ino);
        add_record(state, ipc_server->sysfs_ino->ino, ipc_server->name, ipc_server->sysfs_ino->type);
    }
}

static bool ipc_sysfs_lookup_ipc(inode_t *parent_dir, dentry_t *dentry)
{
    MOS_UNUSED(parent_dir);

    const auto name = dentry->name;
    IPCServer *ipc_server = NULL;
    list_foreach(IPCServer, ipc, ipc_servers)
    {
        if (ipc->name == name)
        {
            ipc_server = ipc;
            break;
        }
    }

    if (ipc_server == NULL)
        return false;

    dentry_attach(dentry, ipc_server->sysfs_ino);
    return dentry->inode != NULL;
}

static bool ipc_sysfs_create_server(inode_t *dir, dentry_t *dentry, file_type_t type, file_perm_t perm)
{
    MOS_UNUSED(dir);
    MOS_UNUSED(perm);

    if (type != FILE_TYPE_REGULAR)
        return false;

    auto ipc_server = ipc_server_create(dentry->name, 1);
    if (ipc_server.isErr())
        return false;

    dentry_attach(dentry, ipc_server->sysfs_ino);
    return true;
}

static bool ipc_dump_name_waitlist(uintn key, void *value, void *data)
{
    waitlist_t *waitlist = (waitlist_t *) value;
    sysfs_file_t *f = (sysfs_file_t *) data;

    const auto guard = waitlist->lock.lock();
    sysfs_printf(f, "%s\t%s:\n", (const char *) key, waitlist->closed ? "closed" : "open");
    list_foreach(Thread, thread, waitlist->list)
    {
        sysfs_printf(f, "\t%s\n", thread->name.c_str());
    }

    return true;
}

static bool ipc_sysfs_dump_name_waitlist(sysfs_file_t *f)
{
    sysfs_printf(f, "%-20s\t%s\n", "IPC Name", "Status");
    const auto guard = ipc_lock.lock();
    hashmap_foreach(&name_waitlist, ipc_dump_name_waitlist, f);
    return true;
}

static sysfs_item_t ipc_sysfs_items[] = {
    SYSFS_RO_ITEM("servers", ipc_sysfs_servers),
    SYSFS_DYN_DIR("ipcs", ipc_sysfs_list_ipcs, ipc_sysfs_lookup_ipc, ipc_sysfs_create_server),
    SYSFS_RO_ITEM("name_waitlist", ipc_sysfs_dump_name_waitlist),
};

SYSFS_AUTOREGISTER(ipc, ipc_sysfs_items);