// SPDX-License-Identifier: GPL-3.0-or-later
// abstract pipe implementation
// A pipe is a buffer that only has a single reader and a single writer
4
5#include "mos/ipc/pipe.hpp"
6
7#include "mos/platform/platform.hpp"
8#include "mos/tasks/schedule.hpp"
9#include "mos/tasks/signal.hpp"
10#include "mos/tasks/wait.hpp"
11
12#include <climits>
13#include <mos/lib/sync/spinlock.hpp>
14#include <mos_stdlib.hpp>
15
// Tag stored in pipe_t::magic by pipe_create() and verified by pipe_read(), pipe_write() and
// pipe_close_one_end() before they touch the pipe.
#define PIPE_MAGIC MOS_FOURCC('P', 'I', 'P', 'E')
17
// Advances the pointer lvalue `buffer` by `bytes` bytes. The arithmetic is done on a char * so the
// step is byte-sized, and the result is stored back as a void *.
// Caution: `buffer` is evaluated twice, so it must be a plain lvalue without side effects.
#define advance_buffer(buffer, bytes) ((buffer) = (void *) ((char *) (buffer) + (bytes)))
19
20size_t pipe_write(pipe_t *p, const void *buf, size_t size)
21{
22 if (p->magic != PIPE_MAGIC)
23 {
24 mWarn << "pipe_io_write: invalid magic";
25 return 0;
26 }
27
28 dInfo2<pipe> << "writing " << size << " bytes";
29
30 // write data to buffer
31 spinlock_acquire(&p->lock);
32
33 if (p->other_closed)
34 {
35 dInfo2<pipe> << "pipe closed";
36 signal_send_to_thread(current_thread, SIGPIPE);
37 spinlock_release(&p->lock);
38 return -EPIPE; // pipe closed
39 }
40
41 size_t total_written = 0;
42
43retry_write:;
44 const size_t written = ring_buffer_pos_push_back(buffer: (u8 *) p->buffers, pos: &p->buffer_pos, data: (u8 *) buf, size);
45 advance_buffer(buf, written), size -= written, total_written += written;
46
47 if (size > 0)
48 {
49 // buffer is full, wait for the reader to read some data
50 dInfo2<pipe> << "pipe buffer full, waiting...";
51 spinlock_release(&p->lock);
52 waitlist_wake(list: &p->waitlist, INT_MAX); // wake up any readers that are waiting for data
53 MOS_ASSERT(reschedule_for_waitlist(&p->waitlist)); // wait for the reader to read some data
54 if (signal_has_pending())
55 {
56 dInfo2<pipe> << "signal pending, returning early";
57 return total_written;
58 }
59 spinlock_acquire(&p->lock);
60
61 // check if the pipe is still valid
62 if (p->other_closed)
63 {
64 dInfo2<pipe> << "pipe closed";
65 signal_send_to_thread(current_thread, SIGPIPE);
66 spinlock_release(&p->lock);
67 return -EPIPE; // pipe closed
68 }
69
70 goto retry_write;
71 }
72
73 spinlock_release(&p->lock);
74
75 // wake up any readers that are waiting for data
76 waitlist_wake(list: &p->waitlist, INT_MAX);
77 return total_written;
78}
79
80size_t pipe_read(pipe_t *p, void *buf, size_t size)
81{
82 if (p->magic != PIPE_MAGIC)
83 {
84 mWarn << "pipe_io_read: invalid magic";
85 return 0;
86 }
87
88 dInfo2<pipe> << "reading " << size << " bytes";
89
90 // read data from buffer
91 spinlock_acquire(&p->lock);
92
93 size_t total_read = 0;
94
95retry_read:;
96 const size_t read = ring_buffer_pos_pop_front(buffer: (u8 *) p->buffers, pos: &p->buffer_pos, buf: (u8 *) buf, size);
97 advance_buffer(buf, read), size -= read, total_read += read;
98
99 if (size > 0)
100 {
101 // check if the pipe is still valid
102 if (p->other_closed && ring_buffer_pos_is_empty(pos: &p->buffer_pos))
103 {
104 dInfo2<pipe> << "pipe closed";
105 spinlock_release(&p->lock);
106 waitlist_wake(list: &p->waitlist, INT_MAX);
107 dInfo2<pipe> << "read " << total_read << " bytes";
108 return total_read; // EOF
109 }
110
111 // buffer is empty, wait for the writer to write some data
112 dInfo2<pipe> << "pipe buffer empty, waiting...";
113 spinlock_release(&p->lock);
114 waitlist_wake(list: &p->waitlist, INT_MAX); // wake up any writers that are waiting for space in the buffer
115 MOS_ASSERT(reschedule_for_waitlist(&p->waitlist)); // wait for the writer to write some data
116 if (signal_has_pending())
117 {
118 dInfo2<pipe> << "signal pending, returning early";
119 return total_read;
120 }
121 spinlock_acquire(&p->lock);
122 goto retry_read;
123 }
124
125 spinlock_release(&p->lock);
126
127 // wake up any writers that are waiting for space in the buffer
128 waitlist_wake(list: &p->waitlist, INT_MAX);
129
130 dInfo2<pipe> << "read " << total_read << " bytes";
131 return total_read;
132}
133
134bool pipe_close_one_end(pipe_t *pipe)
135{
136 if (pipe->magic != PIPE_MAGIC)
137 {
138 mWarn << "pipe_io_close: invalid magic";
139 return false;
140 }
141
142 spinlock_acquire(&pipe->lock);
143 if (!pipe->other_closed)
144 {
145 pipe->other_closed = true;
146 spinlock_release(&pipe->lock);
147
148 // wake up any readers/writers that are waiting for data/space in the buffer
149 waitlist_wake(list: &pipe->waitlist, INT_MAX);
150 return false;
151 }
152 else
153 {
154 // the other end of the pipe is already closed, so we can just free the pipe
155 spinlock_release(&pipe->lock);
156
157 mm_free_pages(va_phyframe(pipe->buffers), pipe->buffer_pos.capacity / MOS_PAGE_SIZE);
158 delete pipe;
159 return true;
160 }
161
162 MOS_UNREACHABLE();
163}
164
165PtrResult<pipe_t> pipe_create(size_t bufsize)
166{
167 bufsize = ALIGN_UP_TO_PAGE(bufsize);
168
169 pipe_t *pipe = mos::create<pipe_t>();
170 pipe->magic = PIPE_MAGIC;
171 pipe->buffers = (void *) phyframe_va(mm_get_free_pages(bufsize / MOS_PAGE_SIZE));
172 waitlist_init(list: &pipe->waitlist);
173 ring_buffer_pos_init(pos: &pipe->buffer_pos, capacity: bufsize);
174 return pipe;
175}
176
177size_t PipeIOImpl::on_read(void *buf, size_t size)
178{
179 MOS_ASSERT(io_flags.test(IO_READABLE));
180 pipeio_t *pipeio = container_of(this, pipeio_t, io_r);
181 return pipe_read(p: pipeio->pipe, buf, size);
182}
183
184size_t PipeIOImpl::on_write(const void *buf, size_t size)
185{
186 MOS_ASSERT(io_flags.test(IO_WRITABLE));
187 pipeio_t *pipeio = container_of(this, pipeio_t, io_w);
188 return pipe_write(p: pipeio->pipe, buf, size);
189}
190
191void PipeIOImpl::on_closed()
192{
193 const char *type = "<unknown>";
194 const pipeio_t *const pipeio = statement_expr(const pipeio_t *, {
195 if (io_flags.test(IO_READABLE))
196 retval = container_of(this, pipeio_t, io_r), type = "reader"; // the reader is closing
197 else if (io_flags.test(IO_WRITABLE))
198 retval = container_of(this, pipeio_t, io_w), type = "writer"; // the writer is closing
199 else
200 MOS_UNREACHABLE();
201 });
202
203 if (!pipeio->pipe->other_closed)
204 dInfo2<pipe> << "pipe " << type << " closing";
205 else
206 dInfo2<pipe> << "pipe is already closed by the other end, '" << type << "' closing";
207
208 const bool fully_closed = pipe_close_one_end(pipe: pipeio->pipe);
209 if (fully_closed)
210 delete pipeio;
211}
212
213pipeio_t *pipeio_create(pipe_t *pipe)
214{
215 return mos::create<pipeio_t>(args&: pipe);
216}
217