// SPDX-License-Identifier: GPL-3.0-or-later
// Abstract pipe implementation.
// A pipe is a buffer with exactly one reader and one writer.
4
5#include <limits.h>
6#define pr_fmt(fmt) "pipe: " fmt
7
8#include "mos/ipc/pipe.h"
9#include "mos/mm/slab_autoinit.h"
10#include "mos/platform/platform.h"
11#include "mos/syslog/printk.h"
12#include "mos/tasks/schedule.h"
13#include "mos/tasks/signal.h"
14#include "mos/tasks/wait.h"
15
16#include <mos/lib/sync/spinlock.h>
17#include <mos_stdlib.h>
18
19#define PIPE_MAGIC MOS_FOURCC('P', 'I', 'P', 'E')
20
21static slab_t *pipe_slab = NULL;
22SLAB_AUTOINIT("pipe", pipe_slab, pipe_t);
23
24static slab_t *pipeio_slab = NULL;
25SLAB_AUTOINIT("pipeio", pipeio_slab, pipeio_t);
26
27#define advance_buffer(buffer, bytes) ((buffer) = (void *) ((char *) (buffer) + (bytes)))
28
29size_t pipe_write(pipe_t *pipe, const void *buf, size_t size)
30{
31 if (pipe->magic != PIPE_MAGIC)
32 {
33 pr_warn("pipe_io_write: invalid magic");
34 return 0;
35 }
36
37 pr_dinfo2(pipe, "writing %zu bytes", size);
38
39 // write data to buffer
40 spinlock_acquire(&pipe->lock);
41
42 if (pipe->other_closed)
43 {
44 pr_dinfo2(pipe, "pipe closed");
45 signal_send_to_thread(current_thread, SIGPIPE);
46 spinlock_release(&pipe->lock);
47 return -EPIPE; // pipe closed
48 }
49
50 size_t total_written = 0;
51
52retry_write:;
53 const size_t written = ring_buffer_pos_push_back(buffer: pipe->buffers, pos: &pipe->buffer_pos, data: buf, size);
54 advance_buffer(buf, written), size -= written, total_written += written;
55
56 if (size > 0)
57 {
58 // buffer is full, wait for the reader to read some data
59 pr_dinfo2(pipe, "pipe buffer full, waiting...");
60 spinlock_release(&pipe->lock);
61 waitlist_wake(list: &pipe->waitlist, INT_MAX); // wake up any readers that are waiting for data
62 MOS_ASSERT(reschedule_for_waitlist(&pipe->waitlist)); // wait for the reader to read some data
63 if (signal_has_pending())
64 {
65 pr_dinfo2(pipe, "signal pending, returning early");
66 return total_written;
67 }
68 spinlock_acquire(&pipe->lock);
69
70 // check if the pipe is still valid
71 if (pipe->other_closed)
72 {
73 pr_dinfo2(pipe, "pipe closed");
74 signal_send_to_thread(current_thread, SIGPIPE);
75 spinlock_release(&pipe->lock);
76 return -EPIPE; // pipe closed
77 }
78
79 goto retry_write;
80 }
81
82 spinlock_release(&pipe->lock);
83
84 // wake up any readers that are waiting for data
85 waitlist_wake(list: &pipe->waitlist, INT_MAX);
86 return total_written;
87}
88
89size_t pipe_read(pipe_t *pipe, void *buf, size_t size)
90{
91 if (pipe->magic != PIPE_MAGIC)
92 {
93 pr_warn("pipe_io_read: invalid magic");
94 return 0;
95 }
96
97 pr_dinfo2(pipe, "reading %zu bytes", size);
98
99 // read data from buffer
100 spinlock_acquire(&pipe->lock);
101
102 size_t total_read = 0;
103
104retry_read:;
105 const size_t read = ring_buffer_pos_pop_front(buffer: pipe->buffers, pos: &pipe->buffer_pos, buf, size);
106 advance_buffer(buf, read), size -= read, total_read += read;
107
108 if (size > 0)
109 {
110 // check if the pipe is still valid
111 if (pipe->other_closed && ring_buffer_pos_is_empty(pos: &pipe->buffer_pos))
112 {
113 pr_dinfo2(pipe, "pipe closed");
114 spinlock_release(&pipe->lock);
115 waitlist_wake(list: &pipe->waitlist, INT_MAX);
116 pr_dinfo2(pipe, "read %zu bytes", total_read);
117 return total_read; // EOF
118 }
119
120 // buffer is empty, wait for the writer to write some data
121 pr_dinfo2(pipe, "pipe buffer empty, waiting...");
122 spinlock_release(&pipe->lock);
123 waitlist_wake(list: &pipe->waitlist, INT_MAX); // wake up any writers that are waiting for space in the buffer
124 MOS_ASSERT(reschedule_for_waitlist(&pipe->waitlist)); // wait for the writer to write some data
125 if (signal_has_pending())
126 {
127 pr_dinfo2(pipe, "signal pending, returning early");
128 return total_read;
129 }
130 spinlock_acquire(&pipe->lock);
131 goto retry_read;
132 }
133
134 spinlock_release(&pipe->lock);
135
136 // wake up any writers that are waiting for space in the buffer
137 waitlist_wake(list: &pipe->waitlist, INT_MAX);
138
139 pr_dinfo2(pipe, "read %zu bytes", total_read);
140 return total_read;
141}
142
143bool pipe_close_one_end(pipe_t *pipe)
144{
145 if (pipe->magic != PIPE_MAGIC)
146 {
147 pr_warn("pipe_io_close: invalid magic");
148 return false;
149 }
150
151 spinlock_acquire(&pipe->lock);
152 if (!pipe->other_closed)
153 {
154 pipe->other_closed = true;
155 spinlock_release(&pipe->lock);
156
157 // wake up any readers/writers that are waiting for data/space in the buffer
158 waitlist_wake(list: &pipe->waitlist, INT_MAX);
159 return false;
160 }
161 else
162 {
163 // the other end of the pipe is already closed, so we can just free the pipe
164 spinlock_release(&pipe->lock);
165
166 mm_free_pages(va_phyframe(pipe->buffers), pipe->buffer_pos.capacity / MOS_PAGE_SIZE);
167 kfree(ptr: pipe);
168 return true;
169 }
170
171 MOS_UNREACHABLE();
172}
173
174pipe_t *pipe_create(size_t bufsize)
175{
176 bufsize = ALIGN_UP_TO_PAGE(bufsize);
177
178 pipe_t *pipe = kmalloc(pipe_slab);
179 pipe->magic = PIPE_MAGIC;
180 pipe->buffers = (void *) phyframe_va(mm_get_free_pages(bufsize / MOS_PAGE_SIZE));
181 waitlist_init(list: &pipe->waitlist);
182 ring_buffer_pos_init(pos: &pipe->buffer_pos, capacity: bufsize);
183 return pipe;
184}
185
186static size_t pipeio_io_read(io_t *io, void *buf, size_t size)
187{
188 MOS_ASSERT(io->flags & IO_READABLE);
189 pipeio_t *pipeio = container_of(io, pipeio_t, io_r);
190 return pipe_read(pipe: pipeio->pipe, buf, size);
191}
192
193static size_t pipeio_io_write(io_t *io, const void *buf, size_t size)
194{
195 MOS_ASSERT(io->flags & IO_WRITABLE);
196 pipeio_t *pipeio = container_of(io, pipeio_t, io_w);
197 return pipe_write(pipe: pipeio->pipe, buf, size);
198}
199
200static void pipeio_io_close(io_t *io)
201{
202 const char *type = "<unknown>";
203 const pipeio_t *const pipeio = statement_expr(const pipeio_t *, {
204 if (io->flags & IO_READABLE)
205 retval = container_of(io, pipeio_t, io_r), type = "reader"; // the reader is closing
206 else if (io->flags & IO_WRITABLE)
207 retval = container_of(io, pipeio_t, io_w), type = "writer"; // the writer is closing
208 else
209 MOS_UNREACHABLE();
210 });
211
212 if (!pipeio->pipe->other_closed)
213 pr_dinfo2(pipe, "pipe %s closing", type);
214 else
215 pr_dinfo2(pipe, "pipe is already closed by the other end, '%s' closing", type);
216
217 const bool fully_closed = pipe_close_one_end(pipe: pipeio->pipe);
218 if (fully_closed)
219 kfree(ptr: pipeio);
220}
221
222static const io_op_t pipe_io_ops = {
223 .write = pipeio_io_write,
224 .read = pipeio_io_read,
225 .close = pipeio_io_close,
226};
227
228pipeio_t *pipeio_create(pipe_t *pipe)
229{
230 pipeio_t *pipeio = kmalloc(pipeio_slab);
231 pipeio->pipe = pipe;
232 io_init(io: &pipeio->io_r, type: IO_PIPE, flags: IO_READABLE, ops: &pipe_io_ops);
233 io_init(io: &pipeio->io_w, type: IO_PIPE, flags: IO_WRITABLE, ops: &pipe_io_ops);
234 return pipeio;
235}
236