1// SPDX-License-Identifier: GPL-3.0-or-later
2
3#define pr_fmt(fmt) "ipc: " fmt
4
5#include "mos/ipc/ipc.h"
6
7#include "mos/filesystem/dentry.h"
8#include "mos/filesystem/sysfs/sysfs.h"
9#include "mos/filesystem/sysfs/sysfs_autoinit.h"
10#include "mos/filesystem/vfs_types.h"
11#include "mos/ipc/pipe.h"
12#include "mos/mm/slab.h"
13#include "mos/mm/slab_autoinit.h"
14#include "mos/platform/platform.h"
15#include "mos/syslog/printk.h"
16#include "mos/tasks/schedule.h"
17#include "mos/tasks/signal.h"
18#include "mos/tasks/wait.h"
19
20#include <mos/filesystem/fs_types.h>
21#include <mos/lib/structures/hashmap_common.h>
22#include <mos/lib/structures/list.h>
23#include <mos/mos_global.h>
24#include <mos_stdlib.h>
25#include <mos_string.h>
26
27#define IPC_SERVER_MAGIC MOS_FOURCC('I', 'P', 'C', 'S')
28
29typedef struct _ipc
30{
31 as_linked_list; ///< attached to either pending or established list
32 size_t buffer_size_npages;
33 const char *server_name;
34
35 waitlist_t client_waitlist; ///< client waits here for the server to accept the connection
36
37 union
38 {
39 pipe_t *client_write_pipe;
40 pipe_t *server_read_pipe;
41 };
42
43 union
44 {
45 pipe_t *server_write_pipe;
46 pipe_t *client_read_pipe;
47 };
48} ipc_t;
49
50typedef struct _ipc_server
51{
52 as_linked_list;
53 const char *name;
54 spinlock_t lock;
55 inode_t *sysfs_ino; ///< inode for sysfs
56 size_t pending_max, pending_n;
57 size_t established_n;
58 list_head pending; ///< list of ipc_t
59
60 waitlist_t server_waitlist; ///< wake up the server here when a client connects
61} ipc_server_t;
62
63static slab_t *ipc_server_slab = NULL;
64SLAB_AUTOINIT("ipc_server", ipc_server_slab, ipc_server_t);
65
66static slab_t *ipc_slab = NULL;
67SLAB_AUTOINIT("ipc", ipc_slab, ipc_t);
68
69static list_head ipc_servers = LIST_HEAD_INIT(ipc_servers);
70static hashmap_t name_waitlist; ///< waitlist for an IPC server, key = name, value = waitlist_t *
71static spinlock_t ipc_lock = SPINLOCK_INIT; ///< protects ipc_servers and name_waitlist
72
73void ipc_server_close(ipc_server_t *server)
74{
75 spinlock_acquire(&ipc_lock);
76 spinlock_acquire(&server->lock);
77 // remove the server from the list
78 list_remove(server);
79
80 waitlist_t *waitlist = hashmap_get(map: &name_waitlist, key: (ptr_t) server->name);
81 if (waitlist)
82 {
83 // the waitlist should have been closed when the server was created
84 // and no one should be waiting on it
85 spinlock_acquire(&waitlist->lock);
86 MOS_ASSERT(waitlist->closed);
87 MOS_ASSERT(list_is_empty(&waitlist->list));
88 spinlock_release(&waitlist->lock);
89 waitlist_init(list: waitlist); // reuse the waitlist
90 }
91 spinlock_release(&ipc_lock);
92
93 // with the server lock held, we reject all pending connections
94 list_foreach(ipc_t, ipc, server->pending)
95 {
96 ipc->buffer_size_npages = 0; // mark the connection as closed
97 // wake up the client
98 waitlist_close(list: &ipc->client_waitlist);
99 waitlist_wake_all(&ipc->client_waitlist);
100 }
101
102 server->pending_max = 0;
103 waitlist_close(list: &server->server_waitlist); // close the server's waitlist
104 int n = waitlist_wake_all(&server->server_waitlist); // wake up the server, if it is waiting
105
106 if (n)
107 {
108 spinlock_release(&server->lock);
109 // let the accept() syscall free the server
110 }
111 else
112 {
113 // now we can free the server
114 kfree(ptr: server->name);
115 spinlock_release(&server->lock);
116 kfree(ptr: server);
117 }
118}
119
120size_t ipc_client_read(ipc_t *ipc, void *buf, size_t size)
121{
122 return pipe_read(pipe: ipc->client_read_pipe, buf, size);
123}
124
125size_t ipc_client_write(ipc_t *ipc, const void *buf, size_t size)
126{
127 return pipe_write(pipe: ipc->client_write_pipe, buf, size);
128}
129
130size_t ipc_server_read(ipc_t *ipc, void *buf, size_t size)
131{
132 return pipe_read(pipe: ipc->server_read_pipe, buf, size);
133}
134
135size_t ipc_server_write(ipc_t *ipc, const void *buf, size_t size)
136{
137 return pipe_write(pipe: ipc->server_write_pipe, buf, size);
138}
139
140void ipc_client_close_channel(ipc_t *ipc)
141{
142 bool r_fullyclosed = pipe_close_one_end(pipe: ipc->client_read_pipe);
143 bool w_fullyclosed = pipe_close_one_end(pipe: ipc->client_write_pipe);
144 MOS_ASSERT(r_fullyclosed == w_fullyclosed); // both ends should have the same return value
145
146 if (r_fullyclosed || w_fullyclosed)
147 {
148 // now we can free the ipc
149 kfree(ptr: ipc->server_name);
150 kfree(ptr: ipc);
151 return;
152 }
153}
154
155void ipc_server_close_channel(ipc_t *ipc)
156{
157 bool r_fullyclosed = pipe_close_one_end(pipe: ipc->server_read_pipe);
158 bool w_fullyclosed = pipe_close_one_end(pipe: ipc->server_write_pipe);
159 MOS_ASSERT(r_fullyclosed == w_fullyclosed); // both ends should have the same return value
160
161 if (r_fullyclosed || w_fullyclosed)
162 {
163 // now we can free the ipc
164 kfree(ptr: ipc->server_name);
165 kfree(ptr: ipc);
166 return;
167 }
168}
169
170void ipc_init(void)
171{
172 hashmap_init(map: &name_waitlist, capacity: 128, hash_func: hashmap_hash_string, compare_func: hashmap_compare_string);
173}
174
175static inode_t *ipc_sysfs_create_ino(ipc_server_t *ipc_server);
176
177ipc_server_t *ipc_server_create(const char *name, size_t max_pending)
178{
179 pr_dinfo(ipc, "creating ipc server '%s' with max_pending=%zu", name, max_pending);
180 spinlock_acquire(&ipc_lock);
181 list_foreach(ipc_server_t, server, ipc_servers)
182 {
183 if (strcmp(str1: server->name, str2: name) == 0)
184 {
185 spinlock_release(&ipc_lock);
186 pr_dwarn(ipc, "ipc server '%s' already exists", name);
187 return ERR_PTR(error: -EEXIST);
188 }
189 }
190
191 ipc_server_t *server = kmalloc(ipc_server_slab);
192 // we don't need to acquire the lock here because the server is not yet announced
193 linked_list_init(list_node(server));
194 linked_list_init(head_node: &server->pending);
195 waitlist_init(list: &server->server_waitlist);
196 server->name = strdup(src: name);
197 server->pending_max = max_pending;
198
199 // now announce the server
200 list_node_append(head: &ipc_servers, list_node(server));
201 ipc_sysfs_create_ino(ipc_server: server);
202
203 // check and see if there is a waitlist for this name
204 // if so, wake up all waiters
205 waitlist_t *waitlist = hashmap_get(map: &name_waitlist, key: (ptr_t) name);
206 if (waitlist)
207 {
208 pr_dinfo2(ipc, "found waitlist for ipc server '%s'", name);
209 // wake up all waiters
210 waitlist_close(list: waitlist);
211 const size_t n = waitlist_wake_all(waitlist);
212 if (n)
213 pr_dinfo2(ipc, "woken up %zu waiters for ipc server '%s'", n, name);
214 }
215 spinlock_release(&ipc_lock);
216
217 return server;
218}
219
220ipc_server_t *ipc_get_server(const char *name)
221{
222 spinlock_acquire(&ipc_lock);
223 list_foreach(ipc_server_t, server, ipc_servers)
224 {
225 if (strcmp(str1: server->name, str2: name) == 0)
226 {
227 spinlock_release(&ipc_lock);
228 return server;
229 }
230 }
231
232 spinlock_release(&ipc_lock);
233 return NULL;
234}
235
236ipc_t *ipc_server_accept(ipc_server_t *ipc_server)
237{
238 pr_dinfo(ipc, "accepting connection on ipc server '%s'...", ipc_server->name);
239
240retry_accept:
241 spinlock_acquire(&ipc_server->lock);
242
243 // check if the server is closed
244 if (ipc_server->pending_max == 0)
245 {
246 // now we can free the server
247 pr_dinfo2(ipc, "ipc server '%s' is closed, aborting accept()", ipc_server->name);
248 kfree(ptr: ipc_server->name);
249 kfree(ptr: ipc_server);
250 return ERR_PTR(error: -ECONNABORTED);
251 }
252
253 if (ipc_server->pending_n == 0)
254 {
255 // no pending connections, wait for a client to connect
256 pr_dinfo2(ipc, "no pending connections, waiting for a client to connect...");
257 MOS_ASSERT(waitlist_append(&ipc_server->server_waitlist));
258 spinlock_release(&ipc_server->lock);
259 blocked_reschedule();
260
261 if (signal_has_pending())
262 {
263 pr_dinfo2(ipc, "woken up by a signal, aborting accept()");
264 waitlist_remove_me(waitlist: &ipc_server->server_waitlist);
265 return ERR_PTR(error: -EINTR);
266 }
267
268 goto retry_accept; // the server has woken us up, try again
269 }
270
271 // get the first pending connection
272 MOS_ASSERT(!list_is_empty(&ipc_server->pending));
273 ipc_t *ipc = list_node_next_entry(&ipc_server->pending, ipc_t);
274 list_remove(ipc);
275 ipc_server->pending_n--;
276 spinlock_release(&ipc_server->lock);
277
278 MOS_ASSERT(ipc->buffer_size_npages > 0);
279 pr_dinfo(ipc, "accepted a connection on ipc server '%s' with buffer_size_npages=%zu", ipc_server->name, ipc->buffer_size_npages);
280
281 // setup the pipes
282 ipc->server_read_pipe = pipe_create(bufsize: ipc->buffer_size_npages);
283 ipc->server_write_pipe = pipe_create(bufsize: ipc->buffer_size_npages);
284
285 // wake up the client
286 waitlist_wake_all(&ipc->client_waitlist);
287
288 return ipc;
289}
290
291ipc_t *ipc_connect_to_server(const char *name, size_t buffer_size)
292{
293 if (buffer_size == 0)
294 return ERR_PTR(error: -EINVAL); // buffer size must be > 0
295
296 pr_dinfo(ipc, "connecting to ipc server '%s' with buffer_size=%zu", name, buffer_size);
297
298check_server:
299 // check if the server exists
300 spinlock_acquire(&ipc_lock);
301 ipc_server_t *ipc_server = NULL;
302 list_foreach(ipc_server_t, server, ipc_servers)
303 {
304 if (strcmp(str1: server->name, str2: name) == 0)
305 {
306 ipc_server = server;
307 // we are holding the ipc_servers_lock, so that the server won't deannounce itself
308 // while we are checking the server list, thus the server won't be freed
309 spinlock_acquire(&ipc_server->lock);
310 pr_dinfo2(ipc, "found ipc server '%s'", ipc_server->name);
311 break;
312 }
313 }
314
315 // now we have a server, we can create the connection
316 ipc_t *const ipc = kmalloc(ipc_slab);
317 linked_list_init(list_node(ipc));
318 waitlist_init(list: &ipc->client_waitlist);
319 buffer_size = ALIGN_UP_TO_PAGE(buffer_size);
320 ipc->buffer_size_npages = buffer_size / MOS_PAGE_SIZE;
321 ipc->server_name = strdup(src: name);
322
323 if (!ipc_server)
324 {
325 // no server found, wait for it to be created
326 waitlist_t *waitlist = hashmap_get(map: &name_waitlist, key: (ptr_t) name);
327 if (!waitlist)
328 {
329 waitlist = kmalloc(waitlist_slab);
330 waitlist_init(list: waitlist);
331 waitlist_t *const old = hashmap_put(map: &name_waitlist, key: (ptr_t) ipc->server_name, value: waitlist); // the key must be in kernel memory
332 if (old)
333 {
334 // someone else has created the waitlist, but now we have replaced it
335 // so we have to append the old waitlist to the new one
336 list_foreach(thread_t, thread, old->list)
337 {
338 MOS_ASSERT(waitlist_append(waitlist));
339 }
340 }
341 pr_dinfo2(ipc, "created waitlist for ipc server '%s'", name);
342 }
343
344 pr_dinfo2(ipc, "no ipc server '%s' found, waiting for it to be created...", name);
345 MOS_ASSERT(waitlist_append(waitlist));
346 spinlock_release(&ipc_lock);
347 blocked_reschedule();
348
349 if (signal_has_pending())
350 {
351 pr_dinfo2(ipc, "woken up by a signal, aborting connect()");
352 kfree(ptr: ipc);
353 return ERR_PTR(error: -EINTR);
354 }
355
356 // now check if the server exists again
357 goto check_server;
358 }
359 spinlock_release(&ipc_lock);
360
361 // add the connection to the pending list
362 if (ipc_server->pending_n >= ipc_server->pending_max)
363 {
364 pr_dwarn(ipc, "ipc server '%s' has reached its max pending connections, rejecting connection", ipc_server->name);
365 spinlock_release(&ipc_server->lock);
366 kfree(ptr: ipc);
367 return ERR_PTR(error: -ECONNREFUSED);
368 }
369
370 list_node_append(head: &ipc_server->pending, list_node(ipc)); // add to pending list
371 ipc_server->pending_n++;
372
373 // now wait for the server to accept the connection
374 MOS_ASSERT(waitlist_append(&ipc->client_waitlist));
375 waitlist_wake(list: &ipc_server->server_waitlist, max_wakeups: 1);
376 spinlock_release(&ipc_server->lock); // now the server can do whatever it wants
377
378 blocked_reschedule();
379 // the server has woken us up and has accepted the connection, or it is closed
380 pr_dinfo2(ipc, "ipc server '%s' woke us up", ipc_server->name);
381
382 // check if the server has closed
383 if (ipc->buffer_size_npages == 0)
384 {
385 // the server is closed, don't touch ipc_server pointer anymore
386 pr_dwarn(ipc, "ipc server '%s' has closed", ipc_server->name);
387 ipc_server = NULL;
388 kfree(ptr: ipc);
389 return ERR_PTR(error: -ECONNREFUSED);
390 }
391
392 // now we have a connection, both the read and write pipes are ready, the io object is also ready
393 // we just need to return the io object
394 pr_dinfo2(ipc, "ipc server '%s' has accepted the connection", ipc_server->name);
395 return ipc;
396}
397
398// ! sysfs support
399
400static bool ipc_sysfs_servers(sysfs_file_t *f)
401{
402 sysfs_printf(file: f, fmt: "%-20s\t%s\n", "Server Name", "Max Pending Connections");
403 list_foreach(ipc_server_t, ipc, ipc_servers)
404 {
405 sysfs_printf(file: f, fmt: "%-20s\t%zu\n", ipc->name, ipc->pending_max);
406 }
407
408 return true;
409}
410
411static inode_t *ipc_sysfs_create_ino(ipc_server_t *ipc_server)
412{
413 extern const file_ops_t ipc_sysfs_file_ops;
414 ipc_server->sysfs_ino = sysfs_create_inode(type: FILE_TYPE_CHAR_DEVICE, data: ipc_server);
415 ipc_server->sysfs_ino->perm = PERM_OWNER & (PERM_READ | PERM_WRITE);
416 ipc_server->sysfs_ino->file_ops = &ipc_sysfs_file_ops;
417 return ipc_server->sysfs_ino;
418}
419
420static void ipc_sysfs_list_ipcs(sysfs_item_t *item, dentry_t *d, vfs_listdir_state_t *state, dentry_iterator_op add_record)
421{
422 MOS_UNUSED(item);
423 MOS_UNUSED(d);
424
425 list_foreach(ipc_server_t, ipc_server, ipc_servers)
426 {
427 MOS_ASSERT(ipc_server->sysfs_ino);
428 add_record(state, ipc_server->sysfs_ino->ino, ipc_server->name, strlen(str: ipc_server->name), ipc_server->sysfs_ino->type);
429 }
430}
431
432static bool ipc_sysfs_lookup_ipc(inode_t *parent_dir, dentry_t *dentry)
433{
434 MOS_UNUSED(parent_dir);
435
436 const char *name = dentry->name;
437 ipc_server_t *ipc_server = NULL;
438 list_foreach(ipc_server_t, ipc, ipc_servers)
439 {
440 if (strcmp(str1: ipc->name, str2: name) == 0)
441 {
442 ipc_server = ipc;
443 break;
444 }
445 }
446
447 if (ipc_server == NULL)
448 return false;
449
450 dentry_attach(d: dentry, inode: ipc_server->sysfs_ino);
451 return dentry->inode != NULL;
452}
453
454static bool ipc_sysfs_create_server(inode_t *dir, dentry_t *dentry, file_type_t type, file_perm_t perm)
455{
456 MOS_UNUSED(dir);
457 MOS_UNUSED(perm);
458
459 if (type != FILE_TYPE_REGULAR)
460 return false;
461
462 ipc_server_t *ipc_server = ipc_server_create(name: dentry->name, max_pending: 1);
463 if (IS_ERR(ptr: ipc_server))
464 return false;
465
466 dentry_attach(d: dentry, inode: ipc_server->sysfs_ino);
467 return true;
468}
469
470static bool ipc_dump_name_waitlist(uintn key, void *value, void *data)
471{
472 MOS_UNUSED(key);
473 waitlist_t *waitlist = value;
474 sysfs_file_t *f = data;
475
476 spinlock_acquire(&waitlist->lock);
477 sysfs_printf(file: f, fmt: "%s\t%s:\n", (const char *) key, waitlist->closed ? "closed" : "open");
478 list_foreach(thread_t, thread, waitlist->list)
479 {
480 sysfs_printf(file: f, fmt: "\t%s\n", thread->name);
481 }
482 spinlock_release(&waitlist->lock);
483
484 return true;
485}
486
487static bool ipc_sysfs_dump_name_waitlist(sysfs_file_t *f)
488{
489 sysfs_printf(file: f, fmt: "%-20s\t%s\n", "IPC Name", "Status");
490 spinlock_acquire(&ipc_lock);
491 hashmap_foreach(map: &name_waitlist, func: ipc_dump_name_waitlist, data: f);
492 spinlock_release(&ipc_lock);
493 return true;
494}
495
496static sysfs_item_t ipc_sysfs_items[] = {
497 SYSFS_RO_ITEM("servers", ipc_sysfs_servers),
498 SYSFS_DYN_DIR("ipcs", ipc_sysfs_list_ipcs, ipc_sysfs_lookup_ipc, ipc_sysfs_create_server),
499 SYSFS_RO_ITEM("name_waitlist", ipc_sysfs_dump_name_waitlist),
500};
501
502SYSFS_AUTOREGISTER(ipc, ipc_sysfs_items);
503