MOS Source Code
Loading...
Searching...
No Matches
pipe.c
Go to the documentation of this file.
1// SPDX-License-Identifier: GPL-3.0-or-later
2// abstract pipe implementation
3// A pipe is a buffer that only has a single reader and a single writer
4
5#include <limits.h>
6#define pr_fmt(fmt) "pipe: " fmt
7
8#include "mos/ipc/pipe.h"
11#include "mos/syslog/printk.h"
12#include "mos/tasks/schedule.h"
13#include "mos/tasks/signal.h"
14#include "mos/tasks/wait.h"
15
17#include <mos_stdlib.h>
18
19#define PIPE_MAGIC MOS_FOURCC('P', 'I', 'P', 'E')
20
23
26
// Move the pointer lvalue `buffer` forward by `bytes` bytes, in place.
// `buffer` is evaluated twice, so it must not have side effects.
#define advance_buffer(buffer, bytes) ((buffer) = (void *) ((char *) (buffer) + (bytes)))
28
29size_t pipe_write(pipe_t *pipe, const void *buf, size_t size)
30{
31 if (pipe->magic != PIPE_MAGIC)
32 {
33 pr_warn("pipe_io_write: invalid magic");
34 return 0;
35 }
36
37 pr_dinfo2(pipe, "writing %zu bytes", size);
38
39 // write data to buffer
40 spinlock_acquire(&pipe->lock);
41
42 if (pipe->other_closed)
43 {
44 pr_dinfo2(pipe, "pipe closed");
46 spinlock_release(&pipe->lock);
47 return -EPIPE; // pipe closed
48 }
49
50 size_t total_written = 0;
51
52retry_write:;
53 const size_t written = ring_buffer_pos_push_back(pipe->buffers, &pipe->buffer_pos, buf, size);
54 advance_buffer(buf, written), size -= written, total_written += written;
55
56 if (size > 0)
57 {
58 // buffer is full, wait for the reader to read some data
59 pr_dinfo2(pipe, "pipe buffer full, waiting...");
60 spinlock_release(&pipe->lock);
61 waitlist_wake(&pipe->waitlist, INT_MAX); // wake up any readers that are waiting for data
62 MOS_ASSERT(reschedule_for_waitlist(&pipe->waitlist)); // wait for the reader to read some data
64 {
65 pr_dinfo2(pipe, "signal pending, returning early");
66 return total_written;
67 }
68 spinlock_acquire(&pipe->lock);
69
70 // check if the pipe is still valid
71 if (pipe->other_closed)
72 {
73 pr_dinfo2(pipe, "pipe closed");
75 spinlock_release(&pipe->lock);
76 return -EPIPE; // pipe closed
77 }
78
79 goto retry_write;
80 }
81
82 spinlock_release(&pipe->lock);
83
84 // wake up any readers that are waiting for data
85 waitlist_wake(&pipe->waitlist, INT_MAX);
86 return total_written;
87}
88
89size_t pipe_read(pipe_t *pipe, void *buf, size_t size)
90{
91 if (pipe->magic != PIPE_MAGIC)
92 {
93 pr_warn("pipe_io_read: invalid magic");
94 return 0;
95 }
96
97 pr_dinfo2(pipe, "reading %zu bytes", size);
98
99 // read data from buffer
100 spinlock_acquire(&pipe->lock);
101
102 size_t total_read = 0;
103
104retry_read:;
105 const size_t read = ring_buffer_pos_pop_front(pipe->buffers, &pipe->buffer_pos, buf, size);
106 advance_buffer(buf, read), size -= read, total_read += read;
107
108 if (size > 0)
109 {
110 // check if the pipe is still valid
111 if (pipe->other_closed && ring_buffer_pos_is_empty(&pipe->buffer_pos))
112 {
113 pr_dinfo2(pipe, "pipe closed");
114 spinlock_release(&pipe->lock);
115 waitlist_wake(&pipe->waitlist, INT_MAX);
116 pr_dinfo2(pipe, "read %zu bytes", total_read);
117 return total_read; // EOF
118 }
119
120 // buffer is empty, wait for the writer to write some data
121 pr_dinfo2(pipe, "pipe buffer empty, waiting...");
122 spinlock_release(&pipe->lock);
123 waitlist_wake(&pipe->waitlist, INT_MAX); // wake up any writers that are waiting for space in the buffer
124 MOS_ASSERT(reschedule_for_waitlist(&pipe->waitlist)); // wait for the writer to write some data
125 if (signal_has_pending())
126 {
127 pr_dinfo2(pipe, "signal pending, returning early");
128 return total_read;
129 }
130 spinlock_acquire(&pipe->lock);
131 goto retry_read;
132 }
133
134 spinlock_release(&pipe->lock);
135
136 // wake up any writers that are waiting for space in the buffer
137 waitlist_wake(&pipe->waitlist, INT_MAX);
138
139 pr_dinfo2(pipe, "read %zu bytes", total_read);
140 return total_read;
141}
142
144{
145 if (pipe->magic != PIPE_MAGIC)
146 {
147 pr_warn("pipe_io_close: invalid magic");
148 return false;
149 }
150
151 spinlock_acquire(&pipe->lock);
152 if (!pipe->other_closed)
153 {
154 pipe->other_closed = true;
155 spinlock_release(&pipe->lock);
156
157 // wake up any readers/writers that are waiting for data/space in the buffer
158 waitlist_wake(&pipe->waitlist, INT_MAX);
159 return false;
160 }
161 else
162 {
163 // the other end of the pipe is already closed, so we can just free the pipe
164 spinlock_release(&pipe->lock);
165
166 mm_free_pages(va_phyframe(pipe->buffers), pipe->buffer_pos.capacity / MOS_PAGE_SIZE);
167 kfree(pipe);
168 return true;
169 }
170
172}
173
174pipe_t *pipe_create(size_t bufsize)
175{
176 bufsize = ALIGN_UP_TO_PAGE(bufsize);
177
178 pipe_t *pipe = kmalloc(pipe_slab);
179 pipe->magic = PIPE_MAGIC;
180 pipe->buffers = (void *) phyframe_va(mm_get_free_pages(bufsize / MOS_PAGE_SIZE));
181 waitlist_init(&pipe->waitlist);
182 ring_buffer_pos_init(&pipe->buffer_pos, bufsize);
183 return pipe;
184}
185
186static size_t pipeio_io_read(io_t *io, void *buf, size_t size)
187{
189 pipeio_t *pipeio = container_of(io, pipeio_t, io_r);
190 return pipe_read(pipeio->pipe, buf, size);
191}
192
193static size_t pipeio_io_write(io_t *io, const void *buf, size_t size)
194{
196 pipeio_t *pipeio = container_of(io, pipeio_t, io_w);
197 return pipe_write(pipeio->pipe, buf, size);
198}
199
200static void pipeio_io_close(io_t *io)
201{
202 const char *type = "<unknown>";
203 const pipeio_t *const pipeio = statement_expr(const pipeio_t *, {
204 if (io->flags & IO_READABLE)
205 retval = container_of(io, pipeio_t, io_r), type = "reader"; // the reader is closing
206 else if (io->flags & IO_WRITABLE)
207 retval = container_of(io, pipeio_t, io_w), type = "writer"; // the writer is closing
208 else
210 });
211
212 if (!pipeio->pipe->other_closed)
213 pr_dinfo2(pipe, "pipe %s closing", type);
214 else
215 pr_dinfo2(pipe, "pipe is already closed by the other end, '%s' closing", type);
216
217 const bool fully_closed = pipe_close_one_end(pipeio->pipe);
218 if (fully_closed)
219 kfree(pipeio);
220}
221
222static const io_op_t pipe_io_ops = {
223 .write = pipeio_io_write,
224 .read = pipeio_io_read,
225 .close = pipeio_io_close,
226};
227
229{
230 pipeio_t *pipeio = kmalloc(pipeio_slab);
231 pipeio->pipe = pipe;
234 return pipeio;
235}
#define MOS_ASSERT(cond)
Definition assert.h:14
#define MOS_UNREACHABLE()
Definition assert.h:11
#define MOS_PAGE_SIZE
Definition autoconf.h:6
long signal_send_to_thread(thread_t *target, signal_t signal)
Send a signal to a thread.
Definition signal.c:92
bool signal_has_pending(void)
Return true if there's a pending signal.
Definition signal.c:301
#define va_phyframe(va)
Definition mm.h:78
#define phyframe_va(frame)
Definition mm.h:79
#define mm_free_pages(frame, npages)
Definition mm.h:89
phyframe_t * mm_get_free_pages(size_t npages)
Definition mm.c:55
MOSAPI size_t ring_buffer_pos_pop_front(u8 *buffer, ring_buffer_pos_t *pos, u8 *buf, size_t size)
should_inline bool ring_buffer_pos_is_empty(ring_buffer_pos_t *pos)
Definition ring_buffer.h:60
MOSAPI void ring_buffer_pos_init(ring_buffer_pos_t *pos, size_t capacity)
Definition ring_buffer.c:44
MOSAPI size_t ring_buffer_pos_push_back(u8 *buffer, ring_buffer_pos_t *pos, const u8 *data, size_t size)
Definition ring_buffer.c:80
void io_init(io_t *io, io_type_t type, io_flags_t flags, const io_op_t *ops)
Definition io.c:44
@ IO_READABLE
Definition io.h:27
@ IO_WRITABLE
Definition io.h:28
@ IO_PIPE
Definition io.h:20
long define_syscall pipe(fd_t *reader, fd_t *writer, fd_flags_t flags)
Definition ksyscall.c:528
#define statement_expr(type,...)
Definition mos_global.h:92
#define ALIGN_UP_TO_PAGE(addr)
Definition mos_global.h:75
#define container_of(ptr, type, member)
Definition mos_global.h:50
#define NULL
Definition pb_syshdr.h:46
static slab_t * pipeio_slab
Definition pipe.c:24
#define PIPE_MAGIC
Definition pipe.c:19
pipeio_t * pipeio_create(pipe_t *pipe)
Definition pipe.c:228
size_t pipe_write(pipe_t *pipe, const void *buf, size_t size)
Definition pipe.c:29
#define advance_buffer(buffer, bytes)
Definition pipe.c:27
static void pipeio_io_close(io_t *io)
Definition pipe.c:200
bool pipe_close_one_end(pipe_t *pipe)
Close one end of the pipe, so that the other end will get EOF.
Definition pipe.c:143
size_t pipe_read(pipe_t *pipe, void *buf, size_t size)
Definition pipe.c:89
static slab_t * pipe_slab
Definition pipe.c:21
static size_t pipeio_io_write(io_t *io, const void *buf, size_t size)
Definition pipe.c:193
static size_t pipeio_io_read(io_t *io, void *buf, size_t size)
Definition pipe.c:186
static const io_op_t pipe_io_ops
Definition pipe.c:222
pipe_t * pipe_create(size_t bufsize)
Definition pipe.c:174
#define current_thread
Definition platform.h:30
#define pr_warn(fmt,...)
Definition printk.h:38
#define pr_dinfo2(feat, fmt,...)
Definition printk.h:27
__nodiscard bool reschedule_for_waitlist(waitlist_t *waitlist)
Definition schedule.c:174
size_t size
Definition slab.c:30
#define SLAB_AUTOINIT(name, var, type)
#define spinlock_acquire(lock)
Definition spinlock.h:61
#define spinlock_release(lock)
Definition spinlock.h:62
Definition io.h:35
Definition io.h:46
io_flags_t flags
Definition io.h:49
Definition pipe.h:11
bool other_closed
true if the other end of the pipe has been closed
Definition pipe.h:15
pipe_t * pipe
Definition pipe.h:36
io_t io_w
Definition pipe.h:35
io_t io_r
Definition pipe.h:35
Definition slab.h:45
size_t waitlist_wake(waitlist_t *list, size_t max_wakeups)
Definition wait.c:45
__BEGIN_DECLS void waitlist_init(waitlist_t *list)
Definition wait.c:23