// SPDX-License-Identifier: GPL-3.0-or-later
// abstract pipe implementation
// A pipe is a buffer that only has a single reader and a single writer

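// A minimal usage sketch (illustrative only, not part of the implementation;
// callers normally go through the io_t endpoints created by pipeio_create() below):
//
//     pipe_t *pipe = pipe_create(MOS_PAGE_SIZE); // buffer size is rounded up to a page
//     pipeio_t *pio = pipeio_create(pipe);       // wraps the pipe in io_r / io_w endpoints
//     pipe_write(pipe, "hi", 2);                 // blocks while the buffer is full
//     char c[2];
//     pipe_read(pipe, c, sizeof(c));             // blocks while the buffer is empty
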
#include <limits.h>
#define pr_fmt(fmt) "pipe: " fmt

#include "mos/ipc/pipe.h"
#include "mos/mm/slab_autoinit.h"
#include "mos/platform/platform.h"
#include "mos/syslog/printk.h"
#include "mos/tasks/schedule.h"
#include "mos/tasks/signal.h"
#include "mos/tasks/wait.h"

#include <mos/lib/sync/spinlock.h>
#include <mos_stdlib.h>

#define PIPE_MAGIC MOS_FOURCC('P', 'I', 'P', 'E')

static slab_t *pipe_slab = NULL;
SLAB_AUTOINIT("pipe", pipe_slab, pipe_t);

static slab_t *pipeio_slab = NULL;
SLAB_AUTOINIT("pipeio", pipeio_slab, pipeio_t);

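// Advance an untyped buffer pointer by the given number of bytes.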
#define advance_buffer(buffer, bytes) ((buffer) = (void *) ((char *) (buffer) + (bytes)))

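// Write `size` bytes into the pipe, blocking (and waking readers) whenever the
// ring buffer fills up. Returns the number of bytes written, which may be short
// if a signal becomes pending, or -EPIPE (with SIGPIPE sent to the caller) if
// the read end has been closed.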
size_t pipe_write(pipe_t *pipe, const void *buf, size_t size)
{
    if (pipe->magic != PIPE_MAGIC)
    {
        pr_warn("pipe_write: invalid magic");
        return 0;
    }

    pr_dinfo2(pipe, "writing %zu bytes", size);

    // write data to buffer
    spinlock_acquire(&pipe->lock);

    if (pipe->other_closed)
    {
        pr_dinfo2(pipe, "pipe closed");
        signal_send_to_thread(current_thread, SIGPIPE);
        spinlock_release(&pipe->lock);
        return -EPIPE; // pipe closed
    }

    size_t total_written = 0;

retry_write:;
    const size_t written = ring_buffer_pos_push_back(pipe->buffers, &pipe->buffer_pos, buf, size);
    advance_buffer(buf, written), size -= written, total_written += written;

    if (size > 0)
    {
        // buffer is full, wait for the reader to read some data
        pr_dinfo2(pipe, "pipe buffer full, waiting...");
        spinlock_release(&pipe->lock);
        waitlist_wake(&pipe->waitlist, INT_MAX);              // wake up any readers that are waiting for data
        MOS_ASSERT(reschedule_for_waitlist(&pipe->waitlist)); // wait for the reader to read some data
        if (signal_has_pending())
        {
            pr_dinfo2(pipe, "signal pending, returning early");
            return total_written;
        }
        spinlock_acquire(&pipe->lock);

        // check if the pipe is still valid
        if (pipe->other_closed)
        {
            pr_dinfo2(pipe, "pipe closed");
            signal_send_to_thread(current_thread, SIGPIPE);
            spinlock_release(&pipe->lock);
            return -EPIPE; // pipe closed
        }

        goto retry_write;
    }

    spinlock_release(&pipe->lock);

    // wake up any readers that are waiting for data
    waitlist_wake(&pipe->waitlist, INT_MAX);
    return total_written;
}

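// Read up to `size` bytes from the pipe, blocking (and waking writers) whenever
// the ring buffer is empty. Returns the number of bytes read; the count may be
// short if a signal becomes pending, and becomes whatever remains (possibly 0,
// i.e. EOF) once the write end has been closed and the buffer drains.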
size_t pipe_read(pipe_t *pipe, void *buf, size_t size)
{
    if (pipe->magic != PIPE_MAGIC)
    {
        pr_warn("pipe_read: invalid magic");
        return 0;
    }

    pr_dinfo2(pipe, "reading %zu bytes", size);

    // read data from buffer
    spinlock_acquire(&pipe->lock);

    size_t total_read = 0;

retry_read:;
    const size_t read = ring_buffer_pos_pop_front(pipe->buffers, &pipe->buffer_pos, buf, size);
    advance_buffer(buf, read), size -= read, total_read += read;

    if (size > 0)
    {
        // check if the pipe is still valid
        if (pipe->other_closed && ring_buffer_pos_is_empty(&pipe->buffer_pos))
        {
            pr_dinfo2(pipe, "pipe closed");
            spinlock_release(&pipe->lock);
            waitlist_wake(&pipe->waitlist, INT_MAX);
            pr_dinfo2(pipe, "read %zu bytes", total_read);
            return total_read; // EOF
        }

        // buffer is empty, wait for the writer to write some data
        pr_dinfo2(pipe, "pipe buffer empty, waiting...");
        spinlock_release(&pipe->lock);
        waitlist_wake(&pipe->waitlist, INT_MAX);              // wake up any writers that are waiting for space in the buffer
        MOS_ASSERT(reschedule_for_waitlist(&pipe->waitlist)); // wait for the writer to write some data
        if (signal_has_pending())
        {
            pr_dinfo2(pipe, "signal pending, returning early");
            return total_read;
        }
        spinlock_acquire(&pipe->lock);
        goto retry_read;
    }

    spinlock_release(&pipe->lock);

    // wake up any writers that are waiting for space in the buffer
    waitlist_wake(&pipe->waitlist, INT_MAX);

    pr_dinfo2(pipe, "read %zu bytes", total_read);
    return total_read;
}

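// Close one end of the pipe. The first call marks the other end as closed and
// wakes any waiters; the second call (once both ends are gone) frees the buffer
// pages and the pipe itself. Returns true only when the pipe has been fully destroyed.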
bool pipe_close_one_end(pipe_t *pipe)
{
    if (pipe->magic != PIPE_MAGIC)
    {
        pr_warn("pipe_close_one_end: invalid magic");
        return false;
    }

    spinlock_acquire(&pipe->lock);
    if (!pipe->other_closed)
    {
        pipe->other_closed = true;
        spinlock_release(&pipe->lock);

        // wake up any readers/writers that are waiting for data/space in the buffer
        waitlist_wake(&pipe->waitlist, INT_MAX);
        return false;
    }
    else
    {
        // the other end of the pipe is already closed, so we can just free the pipe
        spinlock_release(&pipe->lock);

        mm_free_pages(va_phyframe(pipe->buffers), pipe->buffer_pos.capacity / MOS_PAGE_SIZE);
        kfree(pipe);
        return true;
    }

    MOS_UNREACHABLE();
}

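// Allocate a new pipe with a ring buffer of at least `bufsize` bytes
// (rounded up to a whole number of pages).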
pipe_t *pipe_create(size_t bufsize)
{
    bufsize = ALIGN_UP_TO_PAGE(bufsize);

    pipe_t *pipe = kmalloc(pipe_slab);
    pipe->magic = PIPE_MAGIC;
    pipe->buffers = (void *) phyframe_va(mm_get_free_pages(bufsize / MOS_PAGE_SIZE));
    waitlist_init(&pipe->waitlist);
    ring_buffer_pos_init(&pipe->buffer_pos, bufsize);
    return pipe;
}

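// io_t adapters: route generic io reads, writes, and closes to the underlying pipe.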
static size_t pipeio_io_read(io_t *io, void *buf, size_t size)
{
    MOS_ASSERT(io->flags & IO_READABLE);
    pipeio_t *pipeio = container_of(io, pipeio_t, io_r);
    return pipe_read(pipeio->pipe, buf, size);
}

static size_t pipeio_io_write(io_t *io, const void *buf, size_t size)
{
    MOS_ASSERT(io->flags & IO_WRITABLE);
    pipeio_t *pipeio = container_of(io, pipeio_t, io_w);
    return pipe_write(pipeio->pipe, buf, size);
}

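// Work out which end (reader or writer) is being closed from the io flags,
// close that end of the pipe, and free the pipeio wrapper once both ends are closed.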
static void pipeio_io_close(io_t *io)
{
    const char *type = "<unknown>";
    const pipeio_t *const pipeio = statement_expr(const pipeio_t *, {
        if (io->flags & IO_READABLE)
            retval = container_of(io, pipeio_t, io_r), type = "reader"; // the reader is closing
        else if (io->flags & IO_WRITABLE)
            retval = container_of(io, pipeio_t, io_w), type = "writer"; // the writer is closing
        else
            MOS_UNREACHABLE();
    });

    if (!pipeio->pipe->other_closed)
        pr_dinfo2(pipe, "pipe %s closing", type);
    else
        pr_dinfo2(pipe, "pipe is already closed by the other end, '%s' closing", type);

    const bool fully_closed = pipe_close_one_end(pipeio->pipe);
    if (fully_closed)
        kfree(pipeio);
}

static const io_op_t pipe_io_ops = {
    .write = pipeio_io_write,
    .read = pipeio_io_read,
    .close = pipeio_io_close,
};

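// Create a pair of io_t endpoints (one readable, one writable) backed by the given pipe.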
pipeio_t *pipeio_create(pipe_t *pipe)
{
    pipeio_t *pipeio = kmalloc(pipeio_slab);
    pipeio->pipe = pipe;
    io_init(&pipeio->io_r, IO_PIPE, IO_READABLE, &pipe_io_ops);
    io_init(&pipeio->io_w, IO_PIPE, IO_WRITABLE, &pipe_io_ops);
    return pipeio;
}