MOS Source Code
Loading...
Searching...
No Matches
pipe.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: GPL-3.0-or-later
2// abstract pipe implementation
3// A pipe is a buffer that only has a single reader and a single writer
4
5#include <limits.h>
6#define pr_fmt(fmt) "pipe: " fmt
7
8#include "mos/ipc/pipe.hpp"
10#include "mos/syslog/printk.hpp"
12#include "mos/tasks/signal.hpp"
13#include "mos/tasks/wait.hpp"
14
16#include <mos_stdlib.hpp>
17
18#define PIPE_MAGIC MOS_FOURCC('P', 'I', 'P', 'E')
19
20#define advance_buffer(buffer, bytes) ((buffer) = (void *) ((char *) (buffer) + (bytes)))
21
22size_t pipe_write(pipe_t *pipe, const void *buf, size_t size)
23{
24 if (pipe->magic != PIPE_MAGIC)
25 {
26 pr_warn("pipe_io_write: invalid magic");
27 return 0;
28 }
29
30 pr_dinfo2(pipe, "writing %zu bytes", size);
31
32 // write data to buffer
33 spinlock_acquire(&pipe->lock);
34
35 if (pipe->other_closed)
36 {
37 pr_dinfo2(pipe, "pipe closed");
39 spinlock_release(&pipe->lock);
40 return -EPIPE; // pipe closed
41 }
42
43 size_t total_written = 0;
44
45retry_write:;
46 const size_t written = ring_buffer_pos_push_back((u8 *) pipe->buffers, &pipe->buffer_pos, (u8 *) buf, size);
47 advance_buffer(buf, written), size -= written, total_written += written;
48
49 if (size > 0)
50 {
51 // buffer is full, wait for the reader to read some data
52 pr_dinfo2(pipe, "pipe buffer full, waiting...");
53 spinlock_release(&pipe->lock);
54 waitlist_wake(&pipe->waitlist, INT_MAX); // wake up any readers that are waiting for data
55 MOS_ASSERT(reschedule_for_waitlist(&pipe->waitlist)); // wait for the reader to read some data
57 {
58 pr_dinfo2(pipe, "signal pending, returning early");
59 return total_written;
60 }
61 spinlock_acquire(&pipe->lock);
62
63 // check if the pipe is still valid
64 if (pipe->other_closed)
65 {
66 pr_dinfo2(pipe, "pipe closed");
68 spinlock_release(&pipe->lock);
69 return -EPIPE; // pipe closed
70 }
71
72 goto retry_write;
73 }
74
75 spinlock_release(&pipe->lock);
76
77 // wake up any readers that are waiting for data
78 waitlist_wake(&pipe->waitlist, INT_MAX);
79 return total_written;
80}
81
82size_t pipe_read(pipe_t *pipe, void *buf, size_t size)
83{
84 if (pipe->magic != PIPE_MAGIC)
85 {
86 pr_warn("pipe_io_read: invalid magic");
87 return 0;
88 }
89
90 pr_dinfo2(pipe, "reading %zu bytes", size);
91
92 // read data from buffer
93 spinlock_acquire(&pipe->lock);
94
95 size_t total_read = 0;
96
97retry_read:;
98 const size_t read = ring_buffer_pos_pop_front((u8 *) pipe->buffers, &pipe->buffer_pos, (u8 *) buf, size);
99 advance_buffer(buf, read), size -= read, total_read += read;
100
101 if (size > 0)
102 {
103 // check if the pipe is still valid
104 if (pipe->other_closed && ring_buffer_pos_is_empty(&pipe->buffer_pos))
105 {
106 pr_dinfo2(pipe, "pipe closed");
107 spinlock_release(&pipe->lock);
108 waitlist_wake(&pipe->waitlist, INT_MAX);
109 pr_dinfo2(pipe, "read %zu bytes", total_read);
110 return total_read; // EOF
111 }
112
113 // buffer is empty, wait for the writer to write some data
114 pr_dinfo2(pipe, "pipe buffer empty, waiting...");
115 spinlock_release(&pipe->lock);
116 waitlist_wake(&pipe->waitlist, INT_MAX); // wake up any writers that are waiting for space in the buffer
117 MOS_ASSERT(reschedule_for_waitlist(&pipe->waitlist)); // wait for the writer to write some data
118 if (signal_has_pending())
119 {
120 pr_dinfo2(pipe, "signal pending, returning early");
121 return total_read;
122 }
123 spinlock_acquire(&pipe->lock);
124 goto retry_read;
125 }
126
127 spinlock_release(&pipe->lock);
128
129 // wake up any writers that are waiting for space in the buffer
130 waitlist_wake(&pipe->waitlist, INT_MAX);
131
132 pr_dinfo2(pipe, "read %zu bytes", total_read);
133 return total_read;
134}
135
137{
138 if (pipe->magic != PIPE_MAGIC)
139 {
140 pr_warn("pipe_io_close: invalid magic");
141 return false;
142 }
143
144 spinlock_acquire(&pipe->lock);
145 if (!pipe->other_closed)
146 {
147 pipe->other_closed = true;
148 spinlock_release(&pipe->lock);
149
150 // wake up any readers/writers that are waiting for data/space in the buffer
151 waitlist_wake(&pipe->waitlist, INT_MAX);
152 return false;
153 }
154 else
155 {
156 // the other end of the pipe is already closed, so we can just free the pipe
157 spinlock_release(&pipe->lock);
158
159 mm_free_pages(va_phyframe(pipe->buffers), pipe->buffer_pos.capacity / MOS_PAGE_SIZE);
160 delete pipe;
161 return true;
162 }
163
165}
166
168{
169 bufsize = ALIGN_UP_TO_PAGE(bufsize);
170
172 pipe->magic = PIPE_MAGIC;
173 pipe->buffers = (void *) phyframe_va(mm_get_free_pages(bufsize / MOS_PAGE_SIZE));
174 waitlist_init(&pipe->waitlist);
175 ring_buffer_pos_init(&pipe->buffer_pos, bufsize);
176 return pipe;
177}
178
179static size_t pipeio_io_read(io_t *io, void *buf, size_t size)
180{
182 pipeio_t *pipeio = container_of(io, pipeio_t, io_r);
183 return pipe_read(pipeio->pipe, buf, size);
184}
185
186static size_t pipeio_io_write(io_t *io, const void *buf, size_t size)
187{
189 pipeio_t *pipeio = container_of(io, pipeio_t, io_w);
190 return pipe_write(pipeio->pipe, buf, size);
191}
192
193static void pipeio_io_close(io_t *io)
194{
195 const char *type = "<unknown>";
196 const pipeio_t *const pipeio = statement_expr(const pipeio_t *, {
197 if (io->flags & IO_READABLE)
198 retval = container_of(io, pipeio_t, io_r), type = "reader"; // the reader is closing
199 else if (io->flags & IO_WRITABLE)
200 retval = container_of(io, pipeio_t, io_w), type = "writer"; // the writer is closing
201 else
203 });
204
205 if (!pipeio->pipe->other_closed)
206 pr_dinfo2(pipe, "pipe %s closing", type);
207 else
208 pr_dinfo2(pipe, "pipe is already closed by the other end, '%s' closing", type);
209
210 const bool fully_closed = pipe_close_one_end(pipeio->pipe);
211 if (fully_closed)
212 delete pipeio;
213}
214
215static const io_op_t pipe_io_ops = {
216 .read = pipeio_io_read,
217 .write = pipeio_io_write,
218 .close = pipeio_io_close,
219};
220
222{
224 pipeio->pipe = pipe;
227 return pipeio;
228}
#define MOS_ASSERT(cond)
Definition assert.hpp:14
#define MOS_UNREACHABLE()
Definition assert.hpp:11
#define MOS_PAGE_SIZE
Definition autoconf.h:6
bool signal_has_pending(void)
Return true if there's a pending signal.
Definition signal.cpp:296
long signal_send_to_thread(Thread *target, signal_t signal)
Send a signal to a thread.
Definition signal.cpp:87
#define va_phyframe(va)
Definition mm.hpp:79
#define phyframe_va(frame)
Definition mm.hpp:80
#define mm_free_pages(frame, npages)
Definition mm.hpp:88
phyframe_t * mm_get_free_pages(size_t npages)
Definition mm.cpp:48
MOSAPI size_t ring_buffer_pos_pop_front(u8 *buffer, ring_buffer_pos_t *pos, u8 *buf, size_t size)
should_inline bool ring_buffer_pos_is_empty(ring_buffer_pos_t *pos)
MOSAPI void ring_buffer_pos_init(ring_buffer_pos_t *pos, size_t capacity)
MOSAPI size_t ring_buffer_pos_push_back(u8 *buffer, ring_buffer_pos_t *pos, const u8 *data, size_t size)
void io_init(io_t *io, io_type_t type, io_flags_t flags, const io_op_t *ops)
Definition io.cpp:45
@ IO_READABLE
Definition io.hpp:27
@ IO_WRITABLE
Definition io.hpp:28
@ IO_PIPE
Definition io.hpp:20
long define_syscall pipe(fd_t *reader, fd_t *writer, fd_flags_t flags)
Definition ksyscall.cpp:534
#define statement_expr(type,...)
Definition mos_global.h:102
#define ALIGN_UP_TO_PAGE(addr)
Definition mos_global.h:76
T * create(Args &&...args)
Definition allocator.hpp:10
#define PIPE_MAGIC
Definition pipe.cpp:18
PtrResult< pipe_t > pipe_create(size_t bufsize)
Definition pipe.cpp:167
pipeio_t * pipeio_create(pipe_t *pipe)
Definition pipe.cpp:221
size_t pipe_write(pipe_t *pipe, const void *buf, size_t size)
Definition pipe.cpp:22
#define advance_buffer(buffer, bytes)
Definition pipe.cpp:20
static void pipeio_io_close(io_t *io)
Definition pipe.cpp:193
bool pipe_close_one_end(pipe_t *pipe)
Close one end of the pipe, so that the other end will get EOF.
Definition pipe.cpp:136
size_t pipe_read(pipe_t *pipe, void *buf, size_t size)
Definition pipe.cpp:82
static size_t pipeio_io_write(io_t *io, const void *buf, size_t size)
Definition pipe.cpp:186
static size_t pipeio_io_read(io_t *io, void *buf, size_t size)
Definition pipe.cpp:179
static const io_op_t pipe_io_ops
Definition pipe.cpp:215
#define current_thread
Definition platform.hpp:32
#define pr_warn(fmt,...)
Definition printk.hpp:38
#define pr_dinfo2(feat, fmt,...)
Definition printk.hpp:27
__nodiscard bool reschedule_for_waitlist(waitlist_t *waitlist)
Definition schedule.cpp:172
size_t size
Definition slab.cpp:34
#define spinlock_acquire(lock)
Definition spinlock.hpp:64
#define spinlock_release(lock)
Definition spinlock.hpp:65
Definition io.hpp:37
Definition io.hpp:48
io_flags_t flags
Definition io.hpp:51
bool other_closed
true if the other end of the pipe has been closed
Definition pipe.hpp:16
pipe_t * pipe
Definition pipe.hpp:37
io_t io_w
Definition pipe.hpp:36
io_t io_r
Definition pipe.hpp:36
unsigned char u8
Definition types.h:15
#define container_of(ptr, type, member)
Definition types.hpp:31
void waitlist_init(waitlist_t *list)
Definition wait.cpp:15
size_t waitlist_wake(waitlist_t *list, size_t max_wakeups)
Definition wait.cpp:37