1// SPDX-License-Identifier: GPL-3.0-or-later
2
3#include "mos/ipc/ipc.hpp"
4
5#include "mos/filesystem/dentry.hpp"
6#include "mos/filesystem/inode.hpp"
7#include "mos/filesystem/sysfs/sysfs.hpp"
8#include "mos/filesystem/sysfs/sysfs_autoinit.hpp"
9#include "mos/filesystem/vfs_types.hpp"
10#include "mos/filesystem/vfs_utils.hpp"
11#include "mos/ipc/pipe.hpp"
12#include "mos/lib/sync/spinlock.hpp"
13#include "mos/platform/platform.hpp"
14#include "mos/tasks/schedule.hpp"
15#include "mos/tasks/signal.hpp"
16#include "mos/tasks/wait.hpp"
17
18#include <mos/allocator.hpp>
19#include <mos/filesystem/fs_types.h>
20#include <mos/lib/structures/hashmap_common.hpp>
21#include <mos/lib/structures/list.hpp>
22#include <mos/mos_global.h>
23#include <mos_stdlib.hpp>
24#include <mos_string.hpp>
25
// Four-character tag 'I','P','C','S' packed with MOS_FOURCC.
// NOTE(review): nothing in the visible part of this file references it.
#define IPC_SERVER_MAGIC MOS_FOURCC('I', 'P', 'C', 'S')
27
28struct IpcDescriptor final : mos::NamedType<"IPC.Descriptor">
29{
30 as_linked_list; ///< attached to either pending or established list
31 const mos::string server_name;
32 size_t buffer_size_npages;
33
34 waitlist_t client_waitlist; ///< client waits here for the server to accept the connection
35
36 union
37 {
38 pipe_t *client_write_pipe;
39 pipe_t *server_read_pipe;
40 };
41
42 union
43 {
44 pipe_t *server_write_pipe;
45 pipe_t *client_read_pipe;
46 };
47
48 IpcDescriptor(mos::string_view name, size_t buffer_size) : server_name(name), buffer_size_npages(buffer_size / MOS_PAGE_SIZE)
49 {
50 }
51};
52
53struct IPCServer final : public mos::NamedType<"IPCServer">
54{
55 as_linked_list;
56 const mos::string name;
57 spinlock_t lock;
58 inode_t *sysfs_ino; ///< inode for sysfs
59 size_t pending_max, pending_n;
60 size_t established_n;
61 list_head pending; ///< list of IPCDescriptor
62
63 waitlist_t server_waitlist; ///< wake up the server here when a client connects
64
65 void *key() const
66 {
67 return (void *) name.c_str();
68 }
69
70 IPCServer(mos::string_view name, size_t pending_max) : name(name), pending_max(pending_max)
71 {
72 linked_list_init(list_node(this));
73 linked_list_init(head_node: &pending);
74 waitlist_init(list: &server_waitlist);
75 }
76
77 ~IPCServer()
78 {
79 spinlock_release(&lock);
80 }
81};
82
83static list_head ipc_servers;
84static hashmap_t name_waitlist; ///< waitlist for an IPC server, key = name, value = waitlist_t *
85static spinlock_t ipc_lock; ///< protects ipc_servers and name_waitlist
86
87void ipc_server_close(IPCServer *server)
88{
89 spinlock_acquire(&ipc_lock);
90 spinlock_acquire(&server->lock);
91 // remove the server from the list
92 list_remove(server);
93
94 waitlist_t *waitlist = (waitlist_t *) hashmap_get(map: &name_waitlist, key: (ptr_t) server->key());
95 if (waitlist)
96 {
97 // the waitlist should have been closed when the server was created
98 // and no one should be waiting on it
99 spinlock_acquire(&waitlist->lock);
100 MOS_ASSERT(waitlist->closed);
101 MOS_ASSERT(list_is_empty(&waitlist->list));
102 spinlock_release(&waitlist->lock);
103 waitlist_init(list: waitlist); // reuse the waitlist
104 }
105 spinlock_release(&ipc_lock);
106
107 // with the server lock held, we reject all pending connections
108 list_foreach(IpcDescriptor, ipc, server->pending)
109 {
110 ipc->buffer_size_npages = 0; // mark the connection as closed
111 // wake up the client
112 waitlist_close(list: &ipc->client_waitlist);
113 waitlist_wake_all(&ipc->client_waitlist);
114 }
115
116 server->pending_max = 0;
117 waitlist_close(list: &server->server_waitlist); // close the server's waitlist
118 int n = waitlist_wake_all(&server->server_waitlist); // wake up the server, if it is waiting
119
120 if (n)
121 {
122 spinlock_release(&server->lock);
123 // let the accept() syscall free the server
124 }
125 else
126 {
127 dentry_t *ipc_get_sysfs_dir(void);
128 // now we can free the server
129 const auto dparent = ipc_get_sysfs_dir();
130
131 const auto dentry = dentry_get_from_parent(sb: server->sysfs_ino->superblock, parent: dparent, name: server->name);
132 if (dentry->inode == nullptr)
133 dentry_attach(d: dentry, inode: server->sysfs_ino); // fixup, as lookup may not have been called
134 inode_unlink(dir: server->sysfs_ino, dentry);
135 dentry_unref(dentry); // it won't release dentry because dentry->inode is still valid
136 dentry_detach(dentry);
137 dentry_try_release(dentry);
138 server->sysfs_ino = NULL;
139 delete server;
140 }
141}
142
143size_t ipc_client_read(IpcDescriptor *ipc, void *buf, size_t size)
144{
145 return pipe_read(pipe: ipc->client_read_pipe, buf, size);
146}
147
148size_t ipc_client_write(IpcDescriptor *ipc, const void *buf, size_t size)
149{
150 return pipe_write(pipe: ipc->client_write_pipe, buf, size);
151}
152
153size_t ipc_server_read(IpcDescriptor *ipc, void *buf, size_t size)
154{
155 return pipe_read(pipe: ipc->server_read_pipe, buf, size);
156}
157
158size_t ipc_server_write(IpcDescriptor *ipc, const void *buf, size_t size)
159{
160 return pipe_write(pipe: ipc->server_write_pipe, buf, size);
161}
162
163void ipc_client_close_channel(IpcDescriptor *ipc)
164{
165 bool r_fullyclosed = pipe_close_one_end(pipe: ipc->client_read_pipe);
166 bool w_fullyclosed = pipe_close_one_end(pipe: ipc->client_write_pipe);
167 MOS_ASSERT(r_fullyclosed == w_fullyclosed); // both ends should have the same return value
168
169 if (r_fullyclosed || w_fullyclosed)
170 {
171 // now we can free the ipc
172 delete ipc;
173 return;
174 }
175}
176
177void ipc_server_close_channel(IpcDescriptor *ipc)
178{
179 bool r_fullyclosed = pipe_close_one_end(pipe: ipc->server_read_pipe);
180 bool w_fullyclosed = pipe_close_one_end(pipe: ipc->server_write_pipe);
181 MOS_ASSERT(r_fullyclosed == w_fullyclosed); // both ends should have the same return value
182
183 if (r_fullyclosed || w_fullyclosed)
184 {
185 // now we can free the ipc
186 delete ipc;
187 return;
188 }
189}
190
191void ipc_init(void)
192{
193 hashmap_init(map: &name_waitlist, capacity: 128, hash_func: hashmap_hash_string, compare_func: hashmap_compare_string);
194}
195
196static inode_t *ipc_sysfs_create_ino(IPCServer *ipc_server);
197
198PtrResult<IPCServer> ipc_server_create(mos::string_view name, size_t max_pending)
199{
200 dInfo<ipc> << "creating ipc server '" << name << "' with max_pending=" << max_pending;
201 const auto guard = ipc_lock.lock();
202 list_foreach(IPCServer, server, ipc_servers)
203 {
204 if (server->name == name)
205 {
206 dWarn<ipc> << "ipc server '" << name << "' already exists";
207 return -EEXIST;
208 }
209 }
210
211 // we don't need to acquire the lock here because the server is not yet announced
212 const auto server = mos::create<IPCServer>(args&: name, args&: max_pending);
213
214 // now announce the server
215 list_node_append(head: &ipc_servers, list_node(server));
216 ipc_sysfs_create_ino(ipc_server: server);
217
218 // check and see if there is a waitlist for this name
219 // if so, wake up all waiters
220 waitlist_t *waitlist = (waitlist_t *) hashmap_get(map: &name_waitlist, key: (ptr_t) server->key());
221 if (waitlist)
222 {
223 dInfo<ipc> << "found waitlist for ipc server '" << name << "'";
224 // wake up all waiters
225 waitlist_close(list: waitlist);
226 const size_t n = waitlist_wake_all(waitlist);
227 if (n)
228 dInfo<ipc> << "woken up " << n << " waiters for ipc server '" << name << "'";
229 }
230
231 return server;
232}
233
234PtrResult<IPCServer> ipc_get_server(mos::string_view name)
235{
236 const auto guard = ipc_lock.lock();
237 list_foreach(IPCServer, server, ipc_servers)
238 {
239 if (server->name == name)
240 return server;
241 }
242
243 return -ENOENT;
244}
245
246PtrResult<IpcDescriptor> ipc_server_accept(IPCServer *ipc_server)
247{
248 dInfo<ipc> << "accepting connection on ipc server '" << ipc_server->name << "'...";
249
250retry_accept:
251 spinlock_acquire(&ipc_server->lock);
252
253 // check if the server is closed
254 if (ipc_server->pending_max == 0)
255 {
256 // now we can free the server
257 dInfo<ipc> << "ipc server '" << ipc_server->name << "' is closed, aborting accept()";
258 delete ipc_server;
259 return -ECONNABORTED;
260 }
261
262 if (ipc_server->pending_n == 0)
263 {
264 // no pending connections, wait for a client to connect
265 dInfo<ipc> << "no pending connections, waiting for a client to connect...";
266 MOS_ASSERT(waitlist_append(&ipc_server->server_waitlist));
267 spinlock_release(&ipc_server->lock);
268 blocked_reschedule();
269
270 if (signal_has_pending())
271 {
272 dInfo<ipc> << "woken up by a signal, aborting accept()";
273 waitlist_remove_me(waitlist: &ipc_server->server_waitlist);
274 return -EINTR;
275 }
276
277 goto retry_accept; // the server has woken us up, try again
278 }
279
280 // get the first pending connection
281 MOS_ASSERT(!list_is_empty(&ipc_server->pending));
282 IpcDescriptor *desc = list_node_next_entry(&ipc_server->pending, IpcDescriptor);
283 list_remove(desc);
284 ipc_server->pending_n--;
285 spinlock_release(&ipc_server->lock);
286
287 MOS_ASSERT(desc->buffer_size_npages > 0);
288 dInfo<ipc> << "accepted a connection on ipc server '" << ipc_server->name << "' with buffer_size_npages=" << desc->buffer_size_npages;
289
290 // setup the pipes
291 auto readPipe = pipe_create(bufsize: desc->buffer_size_npages);
292 if (readPipe.isErr())
293 {
294 dWarn<ipc> << "failed to create read pipe";
295 // TODO: cleanup
296 return readPipe.getErr();
297 }
298 desc->server_read_pipe = readPipe.get();
299
300 auto writePipe = pipe_create(bufsize: desc->buffer_size_npages);
301 if (writePipe.isErr())
302 {
303 dWarn<ipc> << "failed to create write pipe";
304 // TODO: cleanup
305 return writePipe.getErr();
306 }
307 desc->server_write_pipe = writePipe.get();
308
309 // wake up the client
310 waitlist_wake_all(&desc->client_waitlist);
311
312 return desc;
313}
314
315PtrResult<IpcDescriptor> ipc_connect_to_server(mos::string_view name, size_t buffer_size)
316{
317 if (buffer_size == 0)
318 return -EINVAL; // buffer size must be > 0
319
320 dInfo<ipc> << "connecting to ipc server '" << name << "' with buffer_size=" << buffer_size;
321 buffer_size = ALIGN_UP_TO_PAGE(buffer_size);
322
323check_server:
324 // check if the server exists
325 spinlock_acquire(&ipc_lock);
326 IPCServer *ipc_server = NULL;
327 list_foreach(IPCServer, server, ipc_servers)
328 {
329 if (server->name == name)
330 {
331 ipc_server = server;
332 // we are holding the ipc_servers_lock, so that the server won't deannounce itself
333 // while we are checking the server list, thus the server won't be freed
334 spinlock_acquire(&ipc_server->lock);
335 dInfo<ipc> << "found ipc server '" << ipc_server->name << "'";
336 break;
337 }
338 }
339
340 // now we have a server, we can create the connection
341 const auto descriptor = mos::create<IpcDescriptor>(args&: name, args&: buffer_size);
342
343 if (!ipc_server)
344 {
345 // no server found, wait for it to be created
346 waitlist_t *waitlist = (waitlist_t *) hashmap_get(map: &name_waitlist, key: (ptr_t) name.data());
347 if (!waitlist)
348 {
349 waitlist = mos::create<waitlist_t>();
350 waitlist_init(list: waitlist);
351 // the key must be in kernel memory
352 const auto old = (waitlist_t *) hashmap_put(map: &name_waitlist, key: (ptr_t) descriptor->server_name.c_str(), value: waitlist);
353 if (old)
354 {
355 // someone else has created the waitlist, but now we have replaced it
356 // so we have to append the old waitlist to the new one
357 list_foreach(Thread, thread, old->list)
358 {
359 MOS_ASSERT(waitlist_append(waitlist));
360 }
361 }
362 dInfo<ipc> << "created waitlist for ipc server '" << name << "'";
363 }
364
365 dInfo<ipc> << "no ipc server '" << name << "' found, waiting for it to be created...";
366 MOS_ASSERT(waitlist_append(waitlist));
367 spinlock_release(&ipc_lock);
368 blocked_reschedule();
369
370 if (signal_has_pending())
371 {
372 dInfo<ipc> << "woken up by a signal, aborting connect()";
373 delete descriptor;
374 return -EINTR;
375 }
376
377 // now check if the server exists again
378 goto check_server;
379 }
380 spinlock_release(&ipc_lock);
381
382 // add the connection to the pending list
383 if (ipc_server->pending_n >= ipc_server->pending_max)
384 {
385 dWarn<ipc> << "ipc server '" << ipc_server->name << "' has reached its max pending connections, rejecting connection";
386 spinlock_release(&ipc_server->lock);
387 delete descriptor;
388 return -ECONNREFUSED;
389 }
390
391 list_node_append(head: &ipc_server->pending, list_node(descriptor)); // add to pending list
392 ipc_server->pending_n++;
393
394 // now wait for the server to accept the connection
395 MOS_ASSERT(waitlist_append(&descriptor->client_waitlist));
396 waitlist_wake(list: &ipc_server->server_waitlist, max_wakeups: 1);
397 spinlock_release(&ipc_server->lock); // now the server can do whatever it wants
398
399 blocked_reschedule();
400 // the server has woken us up and has accepted the connection, or it is closed
401 dInfo<ipc> << "ipc server '" << ipc_server->name << "' woke us up";
402
403 // check if the server has closed
404 if (descriptor->buffer_size_npages == 0)
405 {
406 // the server is closed, don't touch ipc_server pointer anymore
407 dWarn<ipc> << "ipc server '" << ipc_server->name << "' has closed";
408 ipc_server = NULL;
409 delete descriptor;
410 return -ECONNREFUSED;
411 }
412
413 // now we have a connection, both the read and write pipes are ready, the io object is also ready
414 // we just need to return the io object
415 dInfo<ipc> << "ipc server '" << ipc_server->name << "' has accepted the connection";
416 return descriptor;
417}
418
419// ! sysfs support
420
421static bool ipc_sysfs_servers(sysfs_file_t *f)
422{
423 sysfs_printf(file: f, fmt: "%-40s\t%s\n", "Server Name", "Max Pending Connections");
424 list_foreach(IPCServer, ipc, ipc_servers)
425 {
426 sysfs_printf(file: f, fmt: "%-40s\t%zu\n", ipc->name.c_str(), ipc->pending_max);
427 }
428
429 return true;
430}
431
432static inode_t *ipc_sysfs_create_ino(IPCServer *ipc_server)
433{
434 ipc_server->sysfs_ino = sysfs_create_inode(type: FILE_TYPE_CHAR_DEVICE, data: ipc_server);
435 ipc_server->sysfs_ino->perm = PERM_OWNER & (PERM_READ | PERM_WRITE);
436 ipc_server->sysfs_ino->file_ops = &ipc_sysfs_file_ops;
437 return ipc_server->sysfs_ino;
438}
439
440static void ipc_sysfs_list_ipcs(sysfs_item_t *item, dentry_t *d, vfs_listdir_state_t *state, dentry_iterator_op add_record)
441{
442 MOS_UNUSED(item);
443 MOS_UNUSED(d);
444
445 list_foreach(IPCServer, ipc_server, ipc_servers)
446 {
447 MOS_ASSERT(ipc_server->sysfs_ino);
448 add_record(state, ipc_server->sysfs_ino->ino, ipc_server->name, ipc_server->sysfs_ino->type);
449 }
450}
451
452static bool ipc_sysfs_lookup_ipc(inode_t *parent_dir, dentry_t *dentry)
453{
454 MOS_UNUSED(parent_dir);
455
456 const auto name = dentry->name;
457 IPCServer *ipc_server = NULL;
458 list_foreach(IPCServer, ipc, ipc_servers)
459 {
460 if (ipc->name == name)
461 {
462 ipc_server = ipc;
463 break;
464 }
465 }
466
467 if (ipc_server == NULL)
468 return false;
469
470 dentry_attach(d: dentry, inode: ipc_server->sysfs_ino);
471 return dentry->inode != NULL;
472}
473
474static bool ipc_sysfs_create_server(inode_t *dir, dentry_t *dentry, file_type_t type, file_perm_t perm)
475{
476 MOS_UNUSED(dir);
477 MOS_UNUSED(perm);
478
479 if (type != FILE_TYPE_REGULAR)
480 return false;
481
482 auto ipc_server = ipc_server_create(name: dentry->name, max_pending: 1);
483 if (ipc_server.isErr())
484 return false;
485
486 dentry_attach(d: dentry, inode: ipc_server->sysfs_ino);
487 return true;
488}
489
490static bool ipc_dump_name_waitlist(uintn key, void *value, void *data)
491{
492 MOS_UNUSED(key);
493 waitlist_t *waitlist = (waitlist_t *) value;
494 sysfs_file_t *f = (sysfs_file_t *) data;
495
496 const auto guard = waitlist->lock.lock();
497 sysfs_printf(file: f, fmt: "%s\t%s:\n", (const char *) key, waitlist->closed ? "closed" : "open");
498 list_foreach(Thread, thread, waitlist->list)
499 {
500 sysfs_printf(file: f, fmt: "\t%s\n", thread->name.c_str());
501 }
502
503 return true;
504}
505
506static bool ipc_sysfs_dump_name_waitlist(sysfs_file_t *f)
507{
508 sysfs_printf(file: f, fmt: "%-20s\t%s\n", "IPC Name", "Status");
509 const auto guard = ipc_lock.lock();
510 hashmap_foreach(map: &name_waitlist, func: ipc_dump_name_waitlist, data: f);
511 return true;
512}
513
514static sysfs_item_t ipc_sysfs_items[] = {
515 SYSFS_RO_ITEM("servers", ipc_sysfs_servers),
516 SYSFS_DYN_DIR("ipcs", ipc_sysfs_list_ipcs, ipc_sysfs_lookup_ipc, ipc_sysfs_create_server),
517 SYSFS_RO_ITEM("name_waitlist", ipc_sysfs_dump_name_waitlist),
518};
519
520SYSFS_AUTOREGISTER(ipc, ipc_sysfs_items);
521
522dentry_t *ipc_get_sysfs_dir()
523{
524 return __sysfs_ipc._dentry;
525}
526