| 1 | // SPDX-License-Identifier: GPL-3.0-or-later |
| 2 | // abstract pipe implementation |
| 3 | // A pipe is a buffer that only has a single reader and a single writer |
| 4 | |
| 5 | #include "mos/ipc/pipe.hpp" |
| 6 | |
| 7 | #include "mos/platform/platform.hpp" |
| 8 | #include "mos/tasks/schedule.hpp" |
| 9 | #include "mos/tasks/signal.hpp" |
| 10 | #include "mos/tasks/wait.hpp" |
| 11 | |
| 12 | #include <climits> |
| 13 | #include <mos/lib/sync/spinlock.hpp> |
| 14 | #include <mos_stdlib.hpp> |
| 15 | |
| 16 | #define PIPE_MAGIC MOS_FOURCC('P', 'I', 'P', 'E') |
| 17 | |
| 18 | #define advance_buffer(buffer, bytes) ((buffer) = (void *) ((char *) (buffer) + (bytes))) |
| 19 | |
| 20 | size_t pipe_write(pipe_t *p, const void *buf, size_t size) |
| 21 | { |
| 22 | if (p->magic != PIPE_MAGIC) |
| 23 | { |
| 24 | mWarn << "pipe_io_write: invalid magic" ; |
| 25 | return 0; |
| 26 | } |
| 27 | |
| 28 | dInfo2<pipe> << "writing " << size << " bytes" ; |
| 29 | |
| 30 | // write data to buffer |
| 31 | spinlock_acquire(&p->lock); |
| 32 | |
| 33 | if (p->other_closed) |
| 34 | { |
| 35 | dInfo2<pipe> << "pipe closed" ; |
| 36 | signal_send_to_thread(current_thread, SIGPIPE); |
| 37 | spinlock_release(&p->lock); |
| 38 | return -EPIPE; // pipe closed |
| 39 | } |
| 40 | |
| 41 | size_t total_written = 0; |
| 42 | |
| 43 | retry_write:; |
| 44 | const size_t written = ring_buffer_pos_push_back(buffer: (u8 *) p->buffers, pos: &p->buffer_pos, data: (u8 *) buf, size); |
| 45 | advance_buffer(buf, written), size -= written, total_written += written; |
| 46 | |
| 47 | if (size > 0) |
| 48 | { |
| 49 | // buffer is full, wait for the reader to read some data |
| 50 | dInfo2<pipe> << "pipe buffer full, waiting..." ; |
| 51 | spinlock_release(&p->lock); |
| 52 | waitlist_wake(list: &p->waitlist, INT_MAX); // wake up any readers that are waiting for data |
| 53 | MOS_ASSERT(reschedule_for_waitlist(&p->waitlist)); // wait for the reader to read some data |
| 54 | if (signal_has_pending()) |
| 55 | { |
| 56 | dInfo2<pipe> << "signal pending, returning early" ; |
| 57 | return total_written; |
| 58 | } |
| 59 | spinlock_acquire(&p->lock); |
| 60 | |
| 61 | // check if the pipe is still valid |
| 62 | if (p->other_closed) |
| 63 | { |
| 64 | dInfo2<pipe> << "pipe closed" ; |
| 65 | signal_send_to_thread(current_thread, SIGPIPE); |
| 66 | spinlock_release(&p->lock); |
| 67 | return -EPIPE; // pipe closed |
| 68 | } |
| 69 | |
| 70 | goto retry_write; |
| 71 | } |
| 72 | |
| 73 | spinlock_release(&p->lock); |
| 74 | |
| 75 | // wake up any readers that are waiting for data |
| 76 | waitlist_wake(list: &p->waitlist, INT_MAX); |
| 77 | return total_written; |
| 78 | } |
| 79 | |
| 80 | size_t pipe_read(pipe_t *p, void *buf, size_t size) |
| 81 | { |
| 82 | if (p->magic != PIPE_MAGIC) |
| 83 | { |
| 84 | mWarn << "pipe_io_read: invalid magic" ; |
| 85 | return 0; |
| 86 | } |
| 87 | |
| 88 | dInfo2<pipe> << "reading " << size << " bytes" ; |
| 89 | |
| 90 | // read data from buffer |
| 91 | spinlock_acquire(&p->lock); |
| 92 | |
| 93 | size_t total_read = 0; |
| 94 | |
| 95 | retry_read:; |
| 96 | const size_t read = ring_buffer_pos_pop_front(buffer: (u8 *) p->buffers, pos: &p->buffer_pos, buf: (u8 *) buf, size); |
| 97 | advance_buffer(buf, read), size -= read, total_read += read; |
| 98 | |
| 99 | if (size > 0) |
| 100 | { |
| 101 | // check if the pipe is still valid |
| 102 | if (p->other_closed && ring_buffer_pos_is_empty(pos: &p->buffer_pos)) |
| 103 | { |
| 104 | dInfo2<pipe> << "pipe closed" ; |
| 105 | spinlock_release(&p->lock); |
| 106 | waitlist_wake(list: &p->waitlist, INT_MAX); |
| 107 | dInfo2<pipe> << "read " << total_read << " bytes" ; |
| 108 | return total_read; // EOF |
| 109 | } |
| 110 | |
| 111 | // buffer is empty, wait for the writer to write some data |
| 112 | dInfo2<pipe> << "pipe buffer empty, waiting..." ; |
| 113 | spinlock_release(&p->lock); |
| 114 | waitlist_wake(list: &p->waitlist, INT_MAX); // wake up any writers that are waiting for space in the buffer |
| 115 | MOS_ASSERT(reschedule_for_waitlist(&p->waitlist)); // wait for the writer to write some data |
| 116 | if (signal_has_pending()) |
| 117 | { |
| 118 | dInfo2<pipe> << "signal pending, returning early" ; |
| 119 | return total_read; |
| 120 | } |
| 121 | spinlock_acquire(&p->lock); |
| 122 | goto retry_read; |
| 123 | } |
| 124 | |
| 125 | spinlock_release(&p->lock); |
| 126 | |
| 127 | // wake up any writers that are waiting for space in the buffer |
| 128 | waitlist_wake(list: &p->waitlist, INT_MAX); |
| 129 | |
| 130 | dInfo2<pipe> << "read " << total_read << " bytes" ; |
| 131 | return total_read; |
| 132 | } |
| 133 | |
| 134 | bool pipe_close_one_end(pipe_t *pipe) |
| 135 | { |
| 136 | if (pipe->magic != PIPE_MAGIC) |
| 137 | { |
| 138 | mWarn << "pipe_io_close: invalid magic" ; |
| 139 | return false; |
| 140 | } |
| 141 | |
| 142 | spinlock_acquire(&pipe->lock); |
| 143 | if (!pipe->other_closed) |
| 144 | { |
| 145 | pipe->other_closed = true; |
| 146 | spinlock_release(&pipe->lock); |
| 147 | |
| 148 | // wake up any readers/writers that are waiting for data/space in the buffer |
| 149 | waitlist_wake(list: &pipe->waitlist, INT_MAX); |
| 150 | return false; |
| 151 | } |
| 152 | else |
| 153 | { |
| 154 | // the other end of the pipe is already closed, so we can just free the pipe |
| 155 | spinlock_release(&pipe->lock); |
| 156 | |
| 157 | mm_free_pages(va_phyframe(pipe->buffers), pipe->buffer_pos.capacity / MOS_PAGE_SIZE); |
| 158 | delete pipe; |
| 159 | return true; |
| 160 | } |
| 161 | |
| 162 | MOS_UNREACHABLE(); |
| 163 | } |
| 164 | |
| 165 | PtrResult<pipe_t> pipe_create(size_t bufsize) |
| 166 | { |
| 167 | bufsize = ALIGN_UP_TO_PAGE(bufsize); |
| 168 | |
| 169 | pipe_t *pipe = mos::create<pipe_t>(); |
| 170 | pipe->magic = PIPE_MAGIC; |
| 171 | pipe->buffers = (void *) phyframe_va(mm_get_free_pages(bufsize / MOS_PAGE_SIZE)); |
| 172 | waitlist_init(list: &pipe->waitlist); |
| 173 | ring_buffer_pos_init(pos: &pipe->buffer_pos, capacity: bufsize); |
| 174 | return pipe; |
| 175 | } |
| 176 | |
| 177 | size_t PipeIOImpl::on_read(void *buf, size_t size) |
| 178 | { |
| 179 | MOS_ASSERT(io_flags.test(IO_READABLE)); |
| 180 | pipeio_t *pipeio = container_of(this, pipeio_t, io_r); |
| 181 | return pipe_read(p: pipeio->pipe, buf, size); |
| 182 | } |
| 183 | |
| 184 | size_t PipeIOImpl::on_write(const void *buf, size_t size) |
| 185 | { |
| 186 | MOS_ASSERT(io_flags.test(IO_WRITABLE)); |
| 187 | pipeio_t *pipeio = container_of(this, pipeio_t, io_w); |
| 188 | return pipe_write(p: pipeio->pipe, buf, size); |
| 189 | } |
| 190 | |
| 191 | void PipeIOImpl::on_closed() |
| 192 | { |
| 193 | const char *type = "<unknown>" ; |
| 194 | const pipeio_t *const pipeio = statement_expr(const pipeio_t *, { |
| 195 | if (io_flags.test(IO_READABLE)) |
| 196 | retval = container_of(this, pipeio_t, io_r), type = "reader" ; // the reader is closing |
| 197 | else if (io_flags.test(IO_WRITABLE)) |
| 198 | retval = container_of(this, pipeio_t, io_w), type = "writer" ; // the writer is closing |
| 199 | else |
| 200 | MOS_UNREACHABLE(); |
| 201 | }); |
| 202 | |
| 203 | if (!pipeio->pipe->other_closed) |
| 204 | dInfo2<pipe> << "pipe " << type << " closing" ; |
| 205 | else |
| 206 | dInfo2<pipe> << "pipe is already closed by the other end, '" << type << "' closing" ; |
| 207 | |
| 208 | const bool fully_closed = pipe_close_one_end(pipe: pipeio->pipe); |
| 209 | if (fully_closed) |
| 210 | delete pipeio; |
| 211 | } |
| 212 | |
| 213 | pipeio_t *pipeio_create(pipe_t *pipe) |
| 214 | { |
| 215 | return mos::create<pipeio_t>(args&: pipe); |
| 216 | } |
| 217 | |