1// SPDX-License-Identifier: GPL-3.0-or-later
2
3#define pr_fmt(fmt) "ipc: " fmt
4
5#include "mos/ipc/ipc.hpp"
6
7#include "mos/filesystem/dentry.hpp"
8#include "mos/filesystem/sysfs/sysfs.hpp"
9#include "mos/filesystem/sysfs/sysfs_autoinit.hpp"
10#include "mos/filesystem/vfs_types.hpp"
11#include "mos/ipc/pipe.hpp"
12#include "mos/lib/sync/spinlock.hpp"
13#include "mos/platform/platform.hpp"
14#include "mos/syslog/printk.hpp"
15#include "mos/tasks/schedule.hpp"
16#include "mos/tasks/signal.hpp"
17#include "mos/tasks/wait.hpp"
18
19#include <mos/allocator.hpp>
20#include <mos/filesystem/fs_types.h>
21#include <mos/lib/structures/hashmap_common.hpp>
22#include <mos/lib/structures/list.hpp>
23#include <mos/mos_global.h>
24#include <mos_stdlib.hpp>
25#include <mos_string.hpp>
26
27#define IPC_SERVER_MAGIC MOS_FOURCC('I', 'P', 'C', 'S')
28
29struct IPCDescriptor final : mos::NamedType<"IPC.Descriptor">
30{
31 as_linked_list; ///< attached to either pending or established list
32 const mos::string server_name;
33 size_t buffer_size_npages;
34
35 waitlist_t client_waitlist; ///< client waits here for the server to accept the connection
36
37 union
38 {
39 pipe_t *client_write_pipe;
40 pipe_t *server_read_pipe;
41 };
42
43 union
44 {
45 pipe_t *server_write_pipe;
46 pipe_t *client_read_pipe;
47 };
48
49 IPCDescriptor(mos::string_view name, size_t buffer_size) : server_name(name), buffer_size_npages(buffer_size / MOS_PAGE_SIZE)
50 {
51 }
52};
53
54struct IPCServer final : public mos::NamedType<"IPCServer">
55{
56 as_linked_list;
57 const mos::string name;
58 spinlock_t lock;
59 inode_t *sysfs_ino; ///< inode for sysfs
60 size_t pending_max, pending_n;
61 size_t established_n;
62 list_head pending; ///< list of IPCDescriptor
63
64 waitlist_t server_waitlist; ///< wake up the server here when a client connects
65
66 void *key() const
67 {
68 return (void *) name.c_str();
69 }
70
71 IPCServer(mos::string_view name, size_t pending_max) : name(name), pending_max(pending_max)
72 {
73 linked_list_init(list_node(this));
74 linked_list_init(head_node: &pending);
75 waitlist_init(list: &server_waitlist);
76 }
77
78 ~IPCServer()
79 {
80 spinlock_release(&lock);
81 }
82};
83
84static list_head ipc_servers;
85static hashmap_t name_waitlist; ///< waitlist for an IPC server, key = name, value = waitlist_t *
86static spinlock_t ipc_lock; ///< protects ipc_servers and name_waitlist
87
88void ipc_server_close(IPCServer *server)
89{
90 spinlock_acquire(&ipc_lock);
91 spinlock_acquire(&server->lock);
92 // remove the server from the list
93 list_remove(server);
94
95 waitlist_t *waitlist = (waitlist_t *) hashmap_get(map: &name_waitlist, key: (ptr_t) server->key());
96 if (waitlist)
97 {
98 // the waitlist should have been closed when the server was created
99 // and no one should be waiting on it
100 spinlock_acquire(&waitlist->lock);
101 MOS_ASSERT(waitlist->closed);
102 MOS_ASSERT(list_is_empty(&waitlist->list));
103 spinlock_release(&waitlist->lock);
104 waitlist_init(list: waitlist); // reuse the waitlist
105 }
106 spinlock_release(&ipc_lock);
107
108 // with the server lock held, we reject all pending connections
109 list_foreach(IPCDescriptor, ipc, server->pending)
110 {
111 ipc->buffer_size_npages = 0; // mark the connection as closed
112 // wake up the client
113 waitlist_close(list: &ipc->client_waitlist);
114 waitlist_wake_all(&ipc->client_waitlist);
115 }
116
117 server->pending_max = 0;
118 waitlist_close(list: &server->server_waitlist); // close the server's waitlist
119 int n = waitlist_wake_all(&server->server_waitlist); // wake up the server, if it is waiting
120
121 if (n)
122 {
123 spinlock_release(&server->lock);
124 // let the accept() syscall free the server
125 }
126 else
127 {
128 // now we can free the server
129 delete server;
130 }
131}
132
133size_t ipc_client_read(IPCDescriptor *ipc, void *buf, size_t size)
134{
135 return pipe_read(pipe: ipc->client_read_pipe, buf, size);
136}
137
138size_t ipc_client_write(IPCDescriptor *ipc, const void *buf, size_t size)
139{
140 return pipe_write(pipe: ipc->client_write_pipe, buf, size);
141}
142
143size_t ipc_server_read(IPCDescriptor *ipc, void *buf, size_t size)
144{
145 return pipe_read(pipe: ipc->server_read_pipe, buf, size);
146}
147
148size_t ipc_server_write(IPCDescriptor *ipc, const void *buf, size_t size)
149{
150 return pipe_write(pipe: ipc->server_write_pipe, buf, size);
151}
152
153void ipc_client_close_channel(IPCDescriptor *ipc)
154{
155 bool r_fullyclosed = pipe_close_one_end(pipe: ipc->client_read_pipe);
156 bool w_fullyclosed = pipe_close_one_end(pipe: ipc->client_write_pipe);
157 MOS_ASSERT(r_fullyclosed == w_fullyclosed); // both ends should have the same return value
158
159 if (r_fullyclosed || w_fullyclosed)
160 {
161 // now we can free the ipc
162 delete ipc;
163 return;
164 }
165}
166
167void ipc_server_close_channel(IPCDescriptor *ipc)
168{
169 bool r_fullyclosed = pipe_close_one_end(pipe: ipc->server_read_pipe);
170 bool w_fullyclosed = pipe_close_one_end(pipe: ipc->server_write_pipe);
171 MOS_ASSERT(r_fullyclosed == w_fullyclosed); // both ends should have the same return value
172
173 if (r_fullyclosed || w_fullyclosed)
174 {
175 // now we can free the ipc
176 delete ipc;
177 return;
178 }
179}
180
181void ipc_init(void)
182{
183 hashmap_init(map: &name_waitlist, capacity: 128, hash_func: hashmap_hash_string, compare_func: hashmap_compare_string);
184}
185
186static inode_t *ipc_sysfs_create_ino(IPCServer *ipc_server);
187
188PtrResult<IPCServer> ipc_server_create(mos::string_view name, size_t max_pending)
189{
190 pr_dinfo(ipc, "creating ipc server '%s' with max_pending=%zu", name.data(), max_pending);
191 const auto guard = ipc_lock.lock();
192 list_foreach(IPCServer, server, ipc_servers)
193 {
194 if (server->name == name)
195 {
196 pr_dwarn(ipc, "ipc server '%s' already exists", name.data());
197 return -EEXIST;
198 }
199 }
200
201 // we don't need to acquire the lock here because the server is not yet announced
202 const auto server = mos::create<IPCServer>(args&: name, args&: max_pending);
203
204 // now announce the server
205 list_node_append(head: &ipc_servers, list_node(server));
206 ipc_sysfs_create_ino(ipc_server: server);
207
208 // check and see if there is a waitlist for this name
209 // if so, wake up all waiters
210 waitlist_t *waitlist = (waitlist_t *) hashmap_get(map: &name_waitlist, key: (ptr_t) server->key());
211 if (waitlist)
212 {
213 pr_dinfo2(ipc, "found waitlist for ipc server '%s'", name.data());
214 // wake up all waiters
215 waitlist_close(list: waitlist);
216 const size_t n = waitlist_wake_all(waitlist);
217 if (n)
218 pr_dinfo2(ipc, "woken up %zu waiters for ipc server '%s'", n, name.data());
219 }
220
221 return server;
222}
223
224PtrResult<IPCServer> ipc_get_server(mos::string_view name)
225{
226 const auto guard = ipc_lock.lock();
227 list_foreach(IPCServer, server, ipc_servers)
228 {
229 if (server->name == name)
230 return server;
231 }
232
233 return -ENOENT;
234}
235
236PtrResult<IPCDescriptor> ipc_server_accept(IPCServer *ipc_server)
237{
238 pr_dinfo(ipc, "accepting connection on ipc server '%s'...", ipc_server->name.c_str());
239
240retry_accept:
241 spinlock_acquire(&ipc_server->lock);
242
243 // check if the server is closed
244 if (ipc_server->pending_max == 0)
245 {
246 // now we can free the server
247 pr_dinfo2(ipc, "ipc server '%s' is closed, aborting accept()", ipc_server->name.c_str());
248 delete ipc_server;
249 return -ECONNABORTED;
250 }
251
252 if (ipc_server->pending_n == 0)
253 {
254 // no pending connections, wait for a client to connect
255 pr_dinfo2(ipc, "no pending connections, waiting for a client to connect...");
256 MOS_ASSERT(waitlist_append(&ipc_server->server_waitlist));
257 spinlock_release(&ipc_server->lock);
258 blocked_reschedule();
259
260 if (signal_has_pending())
261 {
262 pr_dinfo2(ipc, "woken up by a signal, aborting accept()");
263 waitlist_remove_me(waitlist: &ipc_server->server_waitlist);
264 return -EINTR;
265 }
266
267 goto retry_accept; // the server has woken us up, try again
268 }
269
270 // get the first pending connection
271 MOS_ASSERT(!list_is_empty(&ipc_server->pending));
272 IPCDescriptor *ipc = list_node_next_entry(&ipc_server->pending, IPCDescriptor);
273 list_remove(ipc);
274 ipc_server->pending_n--;
275 spinlock_release(&ipc_server->lock);
276
277 MOS_ASSERT(ipc->buffer_size_npages > 0);
278 pr_dinfo(ipc, "accepted a connection on ipc server '%s' with buffer_size_npages=%zu", ipc_server->name.c_str(), ipc->buffer_size_npages);
279
280 // setup the pipes
281 auto readPipe = pipe_create(bufsize: ipc->buffer_size_npages);
282 if (readPipe.isErr())
283 {
284 pr_dwarn(ipc, "failed to create read pipe");
285 // TODO: cleanup
286 return readPipe.getErr();
287 }
288 ipc->server_read_pipe = readPipe.get();
289
290 auto writePipe = pipe_create(bufsize: ipc->buffer_size_npages);
291 if (writePipe.isErr())
292 {
293 pr_dwarn(ipc, "failed to create write pipe");
294 // TODO: cleanup
295 return writePipe.getErr();
296 }
297 ipc->server_write_pipe = writePipe.get();
298
299 // wake up the client
300 waitlist_wake_all(&ipc->client_waitlist);
301
302 return ipc;
303}
304
305PtrResult<IPCDescriptor> ipc_connect_to_server(mos::string_view name, size_t buffer_size)
306{
307 if (buffer_size == 0)
308 return -EINVAL; // buffer size must be > 0
309
310 pr_dinfo(ipc, "connecting to ipc server '%s' with buffer_size=%zu", name.data(), buffer_size);
311 buffer_size = ALIGN_UP_TO_PAGE(buffer_size);
312
313check_server:
314 // check if the server exists
315 spinlock_acquire(&ipc_lock);
316 IPCServer *ipc_server = NULL;
317 list_foreach(IPCServer, server, ipc_servers)
318 {
319 if (server->name == name)
320 {
321 ipc_server = server;
322 // we are holding the ipc_servers_lock, so that the server won't deannounce itself
323 // while we are checking the server list, thus the server won't be freed
324 spinlock_acquire(&ipc_server->lock);
325 pr_dinfo2(ipc, "found ipc server '%s'", ipc_server->name.c_str());
326 break;
327 }
328 }
329
330 // now we have a server, we can create the connection
331 const auto descriptor = mos::create<IPCDescriptor>(args&: name, args&: buffer_size);
332
333 if (!ipc_server)
334 {
335 // no server found, wait for it to be created
336 waitlist_t *waitlist = (waitlist_t *) hashmap_get(map: &name_waitlist, key: (ptr_t) name.data());
337 if (!waitlist)
338 {
339 waitlist = mos::create<waitlist_t>();
340 waitlist_init(list: waitlist);
341 // the key must be in kernel memory
342 const auto old = (waitlist_t *) hashmap_put(map: &name_waitlist, key: (ptr_t) descriptor->server_name.c_str(), value: waitlist);
343 if (old)
344 {
345 // someone else has created the waitlist, but now we have replaced it
346 // so we have to append the old waitlist to the new one
347 list_foreach(Thread, thread, old->list)
348 {
349 MOS_ASSERT(waitlist_append(waitlist));
350 }
351 }
352 pr_dinfo2(ipc, "created waitlist for ipc server '%s'", name.data());
353 }
354
355 pr_dinfo2(ipc, "no ipc server '%s' found, waiting for it to be created...", name.data());
356 MOS_ASSERT(waitlist_append(waitlist));
357 spinlock_release(&ipc_lock);
358 blocked_reschedule();
359
360 if (signal_has_pending())
361 {
362 pr_dinfo2(ipc, "woken up by a signal, aborting connect()");
363 delete descriptor;
364 return -EINTR;
365 }
366
367 // now check if the server exists again
368 goto check_server;
369 }
370 spinlock_release(&ipc_lock);
371
372 // add the connection to the pending list
373 if (ipc_server->pending_n >= ipc_server->pending_max)
374 {
375 pr_dwarn(ipc, "ipc server '%s' has reached its max pending connections, rejecting connection", ipc_server->name.c_str());
376 spinlock_release(&ipc_server->lock);
377 delete descriptor;
378 return -ECONNREFUSED;
379 }
380
381 list_node_append(head: &ipc_server->pending, list_node(descriptor)); // add to pending list
382 ipc_server->pending_n++;
383
384 // now wait for the server to accept the connection
385 MOS_ASSERT(waitlist_append(&descriptor->client_waitlist));
386 waitlist_wake(list: &ipc_server->server_waitlist, max_wakeups: 1);
387 spinlock_release(&ipc_server->lock); // now the server can do whatever it wants
388
389 blocked_reschedule();
390 // the server has woken us up and has accepted the connection, or it is closed
391 pr_dinfo2(ipc, "ipc server '%s' woke us up", ipc_server->name.c_str());
392
393 // check if the server has closed
394 if (descriptor->buffer_size_npages == 0)
395 {
396 // the server is closed, don't touch ipc_server pointer anymore
397 pr_dwarn(ipc, "ipc server '%s' has closed", ipc_server->name.c_str());
398 ipc_server = NULL;
399 delete descriptor;
400 return -ECONNREFUSED;
401 }
402
403 // now we have a connection, both the read and write pipes are ready, the io object is also ready
404 // we just need to return the io object
405 pr_dinfo2(ipc, "ipc server '%s' has accepted the connection", ipc_server->name.c_str());
406 return descriptor;
407}
408
409// ! sysfs support
410
411static bool ipc_sysfs_servers(sysfs_file_t *f)
412{
413 sysfs_printf(file: f, fmt: "%-20s\t%s\n", "Server Name", "Max Pending Connections");
414 list_foreach(IPCServer, ipc, ipc_servers)
415 {
416 sysfs_printf(file: f, fmt: "%-20s\t%zu\n", ipc->name.c_str(), ipc->pending_max);
417 }
418
419 return true;
420}
421
422static inode_t *ipc_sysfs_create_ino(IPCServer *ipc_server)
423{
424 ipc_server->sysfs_ino = sysfs_create_inode(type: FILE_TYPE_CHAR_DEVICE, data: ipc_server);
425 ipc_server->sysfs_ino->perm = PERM_OWNER & (PERM_READ | PERM_WRITE);
426 ipc_server->sysfs_ino->file_ops = &ipc_sysfs_file_ops;
427 return ipc_server->sysfs_ino;
428}
429
430static void ipc_sysfs_list_ipcs(sysfs_item_t *item, dentry_t *d, vfs_listdir_state_t *state, dentry_iterator_op add_record)
431{
432 MOS_UNUSED(item);
433 MOS_UNUSED(d);
434
435 list_foreach(IPCServer, ipc_server, ipc_servers)
436 {
437 MOS_ASSERT(ipc_server->sysfs_ino);
438 add_record(state, ipc_server->sysfs_ino->ino, ipc_server->name, ipc_server->sysfs_ino->type);
439 }
440}
441
442static bool ipc_sysfs_lookup_ipc(inode_t *parent_dir, dentry_t *dentry)
443{
444 MOS_UNUSED(parent_dir);
445
446 const auto name = dentry->name;
447 IPCServer *ipc_server = NULL;
448 list_foreach(IPCServer, ipc, ipc_servers)
449 {
450 if (ipc->name == name)
451 {
452 ipc_server = ipc;
453 break;
454 }
455 }
456
457 if (ipc_server == NULL)
458 return false;
459
460 dentry_attach(d: dentry, inode: ipc_server->sysfs_ino);
461 return dentry->inode != NULL;
462}
463
464static bool ipc_sysfs_create_server(inode_t *dir, dentry_t *dentry, file_type_t type, file_perm_t perm)
465{
466 MOS_UNUSED(dir);
467 MOS_UNUSED(perm);
468
469 if (type != FILE_TYPE_REGULAR)
470 return false;
471
472 auto ipc_server = ipc_server_create(name: dentry->name, max_pending: 1);
473 if (ipc_server.isErr())
474 return false;
475
476 dentry_attach(d: dentry, inode: ipc_server->sysfs_ino);
477 return true;
478}
479
480static bool ipc_dump_name_waitlist(uintn key, void *value, void *data)
481{
482 MOS_UNUSED(key);
483 waitlist_t *waitlist = (waitlist_t *) value;
484 sysfs_file_t *f = (sysfs_file_t *) data;
485
486 const auto guard = waitlist->lock.lock();
487 sysfs_printf(file: f, fmt: "%s\t%s:\n", (const char *) key, waitlist->closed ? "closed" : "open");
488 list_foreach(Thread, thread, waitlist->list)
489 {
490 sysfs_printf(file: f, fmt: "\t%s\n", thread->name.c_str());
491 }
492
493 return true;
494}
495
496static bool ipc_sysfs_dump_name_waitlist(sysfs_file_t *f)
497{
498 sysfs_printf(file: f, fmt: "%-20s\t%s\n", "IPC Name", "Status");
499 const auto guard = ipc_lock.lock();
500 hashmap_foreach(map: &name_waitlist, func: ipc_dump_name_waitlist, data: f);
501 return true;
502}
503
504static sysfs_item_t ipc_sysfs_items[] = {
505 SYSFS_RO_ITEM("servers", ipc_sysfs_servers),
506 SYSFS_DYN_DIR("ipcs", ipc_sysfs_list_ipcs, ipc_sysfs_lookup_ipc, ipc_sysfs_create_server),
507 SYSFS_RO_ITEM("name_waitlist", ipc_sysfs_dump_name_waitlist),
508};
509
510SYSFS_AUTOREGISTER(ipc, ipc_sysfs_items);
511