MOS Source Code
Loading...
Searching...
No Matches
pipe.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: GPL-3.0-or-later
2// abstract pipe implementation
3// A pipe is a buffer that only has a single reader and a single writer
4
5#include "mos/ipc/pipe.hpp"
6
10#include "mos/tasks/wait.hpp"
11
12#include <climits>
14#include <mos_stdlib.hpp>
15
16#define PIPE_MAGIC MOS_FOURCC('P', 'I', 'P', 'E')
17
18#define advance_buffer(buffer, bytes) ((buffer) = (void *) ((char *) (buffer) + (bytes)))
19
20size_t pipe_write(pipe_t *p, const void *buf, size_t size)
21{
22 if (p->magic != PIPE_MAGIC)
23 {
24 mWarn << "pipe_io_write: invalid magic";
25 return 0;
26 }
27
28 dInfo2<pipe> << "writing " << size << " bytes";
29
30 // write data to buffer
32
33 if (p->other_closed)
34 {
35 dInfo2<pipe> << "pipe closed";
38 return -EPIPE; // pipe closed
39 }
40
41 size_t total_written = 0;
42
43retry_write:;
44 const size_t written = ring_buffer_pos_push_back((u8 *) p->buffers, &p->buffer_pos, (u8 *) buf, size);
45 advance_buffer(buf, written), size -= written, total_written += written;
46
47 if (size > 0)
48 {
49 // buffer is full, wait for the reader to read some data
50 dInfo2<pipe> << "pipe buffer full, waiting...";
52 waitlist_wake(&p->waitlist, INT_MAX); // wake up any readers that are waiting for data
53 MOS_ASSERT(reschedule_for_waitlist(&p->waitlist)); // wait for the reader to read some data
55 {
56 dInfo2<pipe> << "signal pending, returning early";
57 return total_written;
58 }
60
61 // check if the pipe is still valid
62 if (p->other_closed)
63 {
64 dInfo2<pipe> << "pipe closed";
67 return -EPIPE; // pipe closed
68 }
69
70 goto retry_write;
71 }
72
74
75 // wake up any readers that are waiting for data
76 waitlist_wake(&p->waitlist, INT_MAX);
77 return total_written;
78}
79
80size_t pipe_read(pipe_t *p, void *buf, size_t size)
81{
82 if (p->magic != PIPE_MAGIC)
83 {
84 mWarn << "pipe_io_read: invalid magic";
85 return 0;
86 }
87
88 dInfo2<pipe> << "reading " << size << " bytes";
89
90 // read data from buffer
92
93 size_t total_read = 0;
94
95retry_read:;
96 const size_t read = ring_buffer_pos_pop_front((u8 *) p->buffers, &p->buffer_pos, (u8 *) buf, size);
97 advance_buffer(buf, read), size -= read, total_read += read;
98
99 if (size > 0)
100 {
101 // check if the pipe is still valid
103 {
104 dInfo2<pipe> << "pipe closed";
106 waitlist_wake(&p->waitlist, INT_MAX);
107 dInfo2<pipe> << "read " << total_read << " bytes";
108 return total_read; // EOF
109 }
110
111 // buffer is empty, wait for the writer to write some data
112 dInfo2<pipe> << "pipe buffer empty, waiting...";
114 waitlist_wake(&p->waitlist, INT_MAX); // wake up any writers that are waiting for space in the buffer
115 MOS_ASSERT(reschedule_for_waitlist(&p->waitlist)); // wait for the writer to write some data
116 if (signal_has_pending())
117 {
118 dInfo2<pipe> << "signal pending, returning early";
119 return total_read;
120 }
122 goto retry_read;
123 }
124
126
127 // wake up any writers that are waiting for space in the buffer
128 waitlist_wake(&p->waitlist, INT_MAX);
129
130 dInfo2<pipe> << "read " << total_read << " bytes";
131 return total_read;
132}
133
135{
136 if (pipe->magic != PIPE_MAGIC)
137 {
138 mWarn << "pipe_io_close: invalid magic";
139 return false;
140 }
141
142 spinlock_acquire(&pipe->lock);
143 if (!pipe->other_closed)
144 {
145 pipe->other_closed = true;
146 spinlock_release(&pipe->lock);
147
148 // wake up any readers/writers that are waiting for data/space in the buffer
149 waitlist_wake(&pipe->waitlist, INT_MAX);
150 return false;
151 }
152 else
153 {
154 // the other end of the pipe is already closed, so we can just free the pipe
155 spinlock_release(&pipe->lock);
156
157 mm_free_pages(va_phyframe(pipe->buffers), pipe->buffer_pos.capacity / MOS_PAGE_SIZE);
158 delete pipe;
159 return true;
160 }
161
163}
164
166{
167 bufsize = ALIGN_UP_TO_PAGE(bufsize);
168
170 pipe->magic = PIPE_MAGIC;
171 pipe->buffers = (void *) phyframe_va(mm_get_free_pages(bufsize / MOS_PAGE_SIZE));
172 waitlist_init(&pipe->waitlist);
173 ring_buffer_pos_init(&pipe->buffer_pos, bufsize);
174 return pipe;
175}
176
177size_t PipeIOImpl::on_read(void *buf, size_t size)
178{
180 pipeio_t *pipeio = container_of(this, pipeio_t, io_r);
181 return pipe_read(pipeio->pipe, buf, size);
182}
183
184size_t PipeIOImpl::on_write(const void *buf, size_t size)
185{
187 pipeio_t *pipeio = container_of(this, pipeio_t, io_w);
188 return pipe_write(pipeio->pipe, buf, size);
189}
190
192{
193 const char *type = "<unknown>";
194 const pipeio_t *const pipeio = statement_expr(const pipeio_t *, {
195 if (io_flags.test(IO_READABLE))
196 retval = container_of(this, pipeio_t, io_r), type = "reader"; // the reader is closing
197 else if (io_flags.test(IO_WRITABLE))
198 retval = container_of(this, pipeio_t, io_w), type = "writer"; // the writer is closing
199 else
201 });
202
203 if (!pipeio->pipe->other_closed)
204 dInfo2<pipe> << "pipe " << type << " closing";
205 else
206 dInfo2<pipe> << "pipe is already closed by the other end, '" << type << "' closing";
207
208 const bool fully_closed = pipe_close_one_end(pipeio->pipe);
209 if (fully_closed)
210 delete pipeio;
211}
212
#define MOS_ASSERT(cond)
Definition assert.hpp:19
#define MOS_UNREACHABLE()
Definition assert.hpp:10
#define MOS_PAGE_SIZE
Definition autoconf.h:6
bool signal_has_pending(void)
Return true if there's a pending signal.
Definition signal.cpp:307
long signal_send_to_thread(Thread *target, signal_t signal)
Send a signal to a thread.
Definition signal.cpp:88
#define va_phyframe(va)
Definition mm.hpp:85
#define phyframe_va(frame)
Definition mm.hpp:86
#define mm_free_pages(frame, npages)
Definition mm.hpp:94
phyframe_t * mm_get_free_pages(size_t npages)
Definition mm.cpp:47
MOSAPI size_t ring_buffer_pos_pop_front(u8 *buffer, ring_buffer_pos_t *pos, u8 *buf, size_t size)
should_inline bool ring_buffer_pos_is_empty(ring_buffer_pos_t *pos)
MOSAPI void ring_buffer_pos_init(ring_buffer_pos_t *pos, size_t capacity)
MOSAPI size_t ring_buffer_pos_push_back(u8 *buffer, ring_buffer_pos_t *pos, const u8 *data, size_t size)
@ IO_READABLE
Definition io.hpp:30
@ IO_WRITABLE
Definition io.hpp:31
long define_syscall pipe(fd_t *reader, fd_t *writer, u64 flags)
Definition ksyscall.cpp:552
#define statement_expr(type,...)
Definition mos_global.h:102
#define ALIGN_UP_TO_PAGE(addr)
Definition mos_global.h:76
T * create(Args &&...args)
Definition allocator.hpp:12
#define PIPE_MAGIC
Definition pipe.cpp:16
PtrResult< pipe_t > pipe_create(size_t bufsize)
Definition pipe.cpp:165
pipeio_t * pipeio_create(pipe_t *pipe)
Definition pipe.cpp:213
#define advance_buffer(buffer, bytes)
Definition pipe.cpp:18
bool pipe_close_one_end(pipe_t *pipe)
Close one end of the pipe, so that the other end will get EOF.
Definition pipe.cpp:134
size_t pipe_write(pipe_t *p, const void *buf, size_t size)
Definition pipe.cpp:20
size_t pipe_read(pipe_t *p, void *buf, size_t size)
Definition pipe.cpp:80
__nodiscard bool pipe_close_one_end(pipe_t *pipe)
Close one end of the pipe, so that the other end will get EOF.
Definition pipe.cpp:134
size_t pipe_write(pipe_t *pipe, const void *buf, size_t size)
Definition pipe.cpp:20
size_t pipe_read(pipe_t *pipe, void *buf, size_t size)
Definition pipe.cpp:80
#define current_thread
Definition platform.hpp:33
__nodiscard bool reschedule_for_waitlist(waitlist_t *waitlist)
Definition schedule.cpp:171
size_t size
Definition slab.cpp:32
#define spinlock_acquire(lock)
Definition spinlock.hpp:64
#define spinlock_release(lock)
Definition spinlock.hpp:65
const IOFlags io_flags
Definition io.hpp:40
size_t on_write(const void *buf, size_t size) override
Definition pipe.cpp:184
void on_closed() override
Definition pipe.cpp:191
size_t on_read(void *buf, size_t size) override
Definition pipe.cpp:177
bool other_closed
true if the other end of the pipe has been closed
Definition pipe.hpp:16
u32 magic
Definition pipe.hpp:13
ring_buffer_pos_t buffer_pos
Definition pipe.hpp:18
void * buffers
Definition pipe.hpp:17
spinlock_t lock
protects the buffer_pos (and thus the buffer)
Definition pipe.hpp:15
waitlist_t waitlist
for both reader and writer, only one party can wait on the pipe at a time
Definition pipe.hpp:14
pipe_t *const pipe
Definition pipe.hpp:47
constexpr auto dInfo2
Definition syslog.hpp:151
constexpr auto mWarn
Definition syslog.hpp:154
unsigned char u8
Definition types.h:15
#define container_of(ptr, type, member)
Definition types.hpp:33
void waitlist_init(waitlist_t *list)
Definition wait.cpp:15
size_t waitlist_wake(waitlist_t *list, size_t max_wakeups)
Definition wait.cpp:37