MOS Source Code
Loading...
Searching...
No Matches
rpc_client.cpp
Go to the documentation of this file.
1// SPDX-License-Identifier: GPL-3.0-or-later
2
3#include "librpc/rpc_client.h"
4
5#include "librpc/internal.h"
6#include "librpc/rpc.h"
7
8#include <atomic>
9#include <libipc/ipc.h>
10#include <mos/types.h>
11#include <pb_decode.h>
12#include <pb_encode.h>
13#include <stdarg.h>
14
15#if defined(__MOS_KERNEL__)
17#include <mos_stdio.hpp>
18#include <mos_stdlib.hpp>
19#include <mos_string.hpp>
20#else
21#include <stdio.h>
22#include <stdlib.h>
23#include <string.h>
24#endif
25
26#ifdef __MOS_KERNEL__
27#include "mos/assert.hpp"
28#include "mos/ipc/ipc_io.hpp"
29
31#include <mos/syscall/decl.h>
32#define syscall_ipc_connect(n, s) ipc_connect(n, s).get()
33#define syscall_io_close(fd) io_unref(fd)
34#else
36#endif
37
38#if !defined(__MOS_KERNEL__)
39// fixup for hosted libc
40#include <pthread.h>
41typedef pthread_mutex_t mutex_t;
42#define mutex_acquire(mutex) pthread_mutex_lock(mutex)
43#define mutex_release(mutex) pthread_mutex_unlock(mutex)
44#define mos_warn(...) fprintf(stderr, __VA_ARGS__)
45#define MOS_LIB_UNREACHABLE() __builtin_unreachable()
46#endif
47
48#define RPC_CLIENT_SMH_SIZE MOS_PAGE_SIZE
49
50typedef struct rpc_server_stub
51{
52 const char *server_name;
54 mutex_t mutex; // only one call at a time
55 std::atomic_size_t callid;
57
65
66rpc_server_stub_t *rpc_client_create(const char *server_name)
67{
68 rpc_server_stub_t *client = (rpc_server_stub_t *) calloc(1, sizeof(rpc_server_stub_t));
69 client->server_name = server_name;
70 client->fd = syscall_ipc_connect(server_name, RPC_CLIENT_SMH_SIZE);
71
72 if (IS_ERR_VALUE(client->fd))
73 {
74 free(client);
75 return NULL;
76 }
77
78 return client;
79}
80
82{
    // Destroys a server stub: takes the stub's mutex (the same mutex that is
    // held for the duration of a call in rpc_call_exec), closes the IPC
    // descriptor and frees the stub.
    // The mutex is intentionally not released: it lives inside the memory
    // that is freed on the last line.
83 mutex_acquire(&server->mutex);
84 syscall_io_close(server->fd);
85 free(server);
86}
87
89{
    // Builds a new, argument-less call object bound to `server`.
    // Both the call and its request are zero-filled by calloc, so the
    // argument count starts at 0 and the call's mutex starts zeroed.
    // NOTE(review): neither calloc result is checked for NULL.
90 rpc_call_t *call = (rpc_call_t *) calloc(1, sizeof(rpc_call_t));
91 call->request = (rpc_request_t *) calloc(1, sizeof(rpc_request_t));
93 call->request->function_id = function_id;
    // size tracks the total request length in bytes: header now, plus every
    // argument appended later by rpc_call_arg
94 call->size = sizeof(rpc_request_t);
95 call->server = server;
96
97 return call;
98}
99
101{
    // Destroys a call: takes the call's mutex, then frees the request buffer
    // and the call object itself. The server stub the call points at is not
    // touched. The mutex is not released because it is part of the freed call.
102 mutex_acquire(&call->mutex);
103 free(call->request);
104 free(call);
105}
106
107void rpc_call_arg(rpc_call_t *call, rpc_argtype_t argtype, const void *data, size_t size)
108{
109 mutex_acquire(&call->mutex);
110 call->request = (rpc_request_t *) realloc(call->request, call->size + sizeof(rpc_arg_t) + size);
111 call->request->args_count += 1;
112
113 rpc_arg_t *arg = (rpc_arg_t *) &call->request->args_array[call->size - sizeof(rpc_request_t)];
114 arg->size = size;
115 arg->argtype = argtype;
116 arg->magic = RPC_ARG_MAGIC;
117 memcpy(arg->data, data, size);
118
119 call->size += sizeof(rpc_arg_t) + size;
120 mutex_release(&call->mutex);
121}
122
// Generates a typed convenience wrapper rpc_call_arg_<type>(call, arg) that
// forwards the value to rpc_call_arg with the type tag RPC_ARGTYPE_<TYPE> and
// sizeof(arg) as the payload size. The value is taken by copy and its address
// is passed on, so rpc_call_arg copies the bytes before the wrapper returns.
123#define RPC_CALL_ARG_IMPL(type, TYPE) \
124 void rpc_call_arg_##type(rpc_call_t *call, type arg) \
125 { \
126 rpc_call_arg(call, RPC_ARGTYPE_##TYPE, &arg, sizeof(arg)); \
127 }
128
135
136void rpc_call_arg_string(rpc_call_t *call, const char *arg)
137{
138 rpc_call_arg(call, RPC_ARGTYPE_STRING, arg, strlen(arg) + 1); // also send the null terminator
139}
140
141rpc_result_code_t rpc_call_exec(rpc_call_t *call, void **result_data, size_t *data_size)
142{
143 if (result_data && data_size)
144 {
145 *data_size = 0;
146 *result_data = NULL;
147 }
148
149 mutex_acquire(&call->mutex);
150 mutex_acquire(&call->server->mutex);
151 call->request->call_id = ++call->server->callid;
152
153 bool written = ipc_write_as_msg(call->server->fd, (char *) call->request, call->size);
154 if (!written)
155 {
156 mutex_release(&call->server->mutex);
157 mutex_release(&call->mutex);
159 }
160
161 ipc_msg_t *msg = ipc_read_msg(call->server->fd);
162 if (!msg)
163 {
164 mutex_release(&call->server->mutex);
165 mutex_release(&call->mutex);
167 }
168
169 if (msg->size < sizeof(rpc_response_t))
170 {
171 mutex_release(&call->server->mutex);
172 mutex_release(&call->mutex);
174 }
175
176 rpc_response_t *response = (rpc_response_t *) msg->data;
177 if (response->magic != RPC_RESPONSE_MAGIC)
178 {
179 mutex_release(&call->server->mutex);
180 mutex_release(&call->mutex);
182 }
183
184 if (response->call_id != call->request->call_id)
185 {
186 mutex_release(&call->server->mutex);
187 mutex_release(&call->mutex);
189 }
190
191 if (response->result_code != RPC_RESULT_OK)
192 {
193 mutex_release(&call->server->mutex);
194 mutex_release(&call->mutex);
195 return response->result_code;
196 }
197
198 if (msg->size < sizeof(rpc_response_t))
199 {
200 mutex_release(&call->server->mutex);
201 mutex_release(&call->mutex);
203 }
204
205 if (result_data && data_size && response->data_size)
206 {
207 *data_size = response->data_size;
208 *result_data = malloc(response->data_size);
209 memcpy(*result_data, response->data, response->data_size);
210 }
211
212 rpc_result_code_t result = response->result_code;
213 ipc_msg_destroy(msg);
214 mutex_release(&call->server->mutex);
215 mutex_release(&call->mutex);
216 return result;
217}
218
219rpc_result_code_t rpc_simple_call(rpc_server_stub_t *stub, u32 funcid, rpc_result_t *result, const char *argspec, ...)
220{
221 va_list args;
222 va_start(args, argspec);
223 rpc_result_code_t code = rpc_simple_callv(stub, funcid, result, argspec, args);
224 va_end(args);
225 return code;
226}
227
228rpc_result_code_t rpc_simple_callv(rpc_server_stub_t *stub, u32 funcid, rpc_result_t *result, const char *argspec, va_list args)
229{
230 if (unlikely(!argspec))
231 {
232 mos_warn("argspec is NULL");
234 }
235
236 rpc_call_t *call = rpc_call_create(stub, funcid);
237
238 if (*argspec == 'v')
239 {
240 if (*++argspec != '\0')
241 {
242 mos_warn("argspec is not empty after 'v' (void) (argspec='%s')", argspec);
243 rpc_call_destroy(call);
245 }
246 goto exec;
247 }
248
249 for (const char *c = argspec; *c != '\0'; c++)
250 {
251 switch (*c)
252 {
253 case 'c':
254 {
255 u8 arg = va_arg(args, int);
256 rpc_call_arg(call, RPC_ARGTYPE_INT8, &arg, sizeof(arg));
257 break;
258 }
259 case 'i':
260 {
261 u32 arg = va_arg(args, int);
262 rpc_call_arg(call, RPC_ARGTYPE_INT32, &arg, sizeof(arg));
263 break;
264 }
265 case 'l':
266 {
267 u64 arg = va_arg(args, long long);
268 rpc_call_arg(call, RPC_ARGTYPE_INT64, &arg, sizeof(arg));
269 break;
270 }
271 case 'f':
272 {
273 MOS_LIB_UNREACHABLE(); // TODO: implement
274 // double arg = va_arg(args, double);
275 // rpc_call_arg(call, &arg, sizeof(arg));
276 break;
277 }
278 case 's':
279 {
280 const char *arg = va_arg(args, const char *);
281 rpc_call_arg(call, RPC_ARGTYPE_STRING, arg, strlen(arg) + 1); // also send the null terminator
282 break;
283 }
284 default: mos_warn("rpc_call: invalid argspec '%c'", *c); return RPC_RESULT_CLIENT_INVALID_ARGSPEC;
285 }
286 }
287
288exec:
289 rpc_call_exec(call, &result->data, &result->size);
290 rpc_call_destroy(call);
291
292 return RPC_RESULT_OK;
293}
294
295rpc_result_code_t rpc_do_pb_call(rpc_server_stub_t *stub, u32 funcid, const pb_msgdesc_t *reqm, const void *req, const pb_msgdesc_t *respm, void *resp)
296{
297 size_t bufsize;
298 pb_get_encoded_size(&bufsize, reqm, req);
299 pb_byte_t buf[bufsize];
300 pb_ostream_t wstream = pb_ostream_from_buffer(buf, bufsize);
301 if (!pb_encode(&wstream, reqm, req))
303
304 rpc_call_t *call = rpc_call_create(stub, funcid);
305 rpc_call_arg(call, RPC_ARGTYPE_BUFFER, buf, wstream.bytes_written);
306
307 void *result = NULL;
308 size_t result_size = 0;
309 rpc_result_code_t result_code = rpc_call_exec(call, &result, &result_size);
310 rpc_call_destroy(call);
311
312 if (!respm || !resp)
313 return result_code; // no response expected
314
315 if (result_code != RPC_RESULT_OK)
316 return result_code;
317
318 pb_istream_t stream = pb_istream_from_buffer((const pb_byte_t *) result, result_size);
319 if (!pb_decode(&stream, respm, resp))
321
322 free(result);
323 return RPC_RESULT_OK;
324}
char args[3][16]
Definition avr_io.c:16
#define RPC_ARG_MAGIC
Definition internal.h:10
#define RPC_RESPONSE_MAGIC
Definition internal.h:9
#define RPC_REQUEST_MAGIC
Definition internal.h:8
MOSAPI void ipc_msg_destroy(ipc_msg_t *buffer)
Destroy an IPC message.
Definition libipc.cpp:34
MOSAPI ipc_msg_t * ipc_read_msg(ipcfd_t fd)
Read an IPC message.
Definition libipc.cpp:39
fd_t ipcfd_t
Definition ipc.h:11
MOSAPI bool ipc_write_as_msg(ipcfd_t fd, const char *data, size_t size)
Definition libipc.cpp:87
#define IS_ERR_VALUE(x)
Definition mos_global.h:137
#define unlikely(x)
Definition mos_global.h:40
futex_word_t mutex_t
Definition mutex.hpp:8
uint_least8_t pb_byte_t
Definition pb.h:247
bool pb_decode(pb_istream_t *stream, const pb_msgdesc_t *fields, void *dest_struct)
Definition pb_decode.c:1210
pb_istream_t pb_istream_from_buffer(const pb_byte_t *buf, size_t msglen)
Definition pb_decode.c:142
bool pb_get_encoded_size(size_t *size, const pb_msgdesc_t *fields, const void *src_struct)
Definition pb_encode.c:557
pb_ostream_t pb_ostream_from_buffer(pb_byte_t *buf, size_t bufsize)
Definition pb_encode.c:63
bool pb_encode(pb_ostream_t *stream, const pb_msgdesc_t *fields, const void *src_struct)
Definition pb_encode.c:512
static void * memcpy(void *s1, const void *s2, size_t n)
Definition pb_syshdr.h:90
#define NULL
Definition pb_syshdr.h:46
static size_t strlen(const char *s)
Definition pb_syshdr.h:80
rpc_argtype_t
Definition rpc.h:9
@ RPC_ARGTYPE_INT8
Definition rpc.h:12
@ RPC_ARGTYPE_BUFFER
Definition rpc.h:21
@ RPC_ARGTYPE_STRING
Definition rpc.h:20
@ RPC_ARGTYPE_INT64
Definition rpc.h:15
@ RPC_ARGTYPE_INT32
Definition rpc.h:14
rpc_result_code_t
Definition rpc.h:25
@ RPC_RESULT_CLIENT_INVALID_ARGSPEC
Definition rpc.h:31
@ RPC_RESULT_CALLID_MISMATCH
Definition rpc.h:34
@ RPC_RESULT_OK
Definition rpc.h:26
@ RPC_RESULT_CLIENT_WRITE_FAILED
Definition rpc.h:32
@ RPC_RESULT_CLIENT_READ_FAILED
Definition rpc.h:33
rpc_result_code_t rpc_do_pb_call(rpc_server_stub_t *stub, u32 funcid, const pb_msgdesc_t *reqm, const void *req, const pb_msgdesc_t *respm, void *resp)
Call a function on the server using protobuf (nanopb)
#define RPC_CALL_ARG_IMPL(type, TYPE)
#define RPC_CLIENT_SMH_SIZE
#define mos_warn(...)
void rpc_call_arg(rpc_call_t *call, rpc_argtype_t argtype, const void *data, size_t size)
Add an argument to a call.
#define MOS_LIB_UNREACHABLE()
rpc_result_code_t rpc_call_exec(rpc_call_t *call, void **result_data, size_t *data_size)
Execute a call.
#define mutex_acquire(mutex)
rpc_call_t * rpc_call_create(rpc_server_stub_t *server, u32 function_id)
Manually create a new RPC call.
void rpc_call_destroy(rpc_call_t *call)
Destroy a call.
pthread_mutex_t mutex_t
void rpc_client_destroy(rpc_server_stub_t *server)
Destroy a server stub.
rpc_result_code_t rpc_simple_callv(rpc_server_stub_t *stub, u32 funcid, rpc_result_t *result, const char *argspec, va_list args)
Call a function on the server.
rpc_server_stub_t * rpc_client_create(const char *server_name)
Create a new RPC client stub for the given server.
#define mutex_release(mutex)
rpc_result_code_t rpc_simple_call(rpc_server_stub_t *stub, u32 funcid, rpc_result_t *result, const char *argspec,...)
Call a function on the server.
MOSAPI void rpc_call_arg_string(rpc_call_t *call, const char *arg)
size_t size
Definition slab.cpp:34
An IPC message.
Definition ipc.h:19
size_t size
Definition ipc.h:20
char data[]
Definition ipc.h:21
u32 magic
Definition internal.h:14
rpc_argtype_t argtype
Definition internal.h:15
char data[]
Definition internal.h:17
u32 size
Definition internal.h:16
size_t size
rpc_server_stub_t * server
rpc_request_t * request
mutex_t mutex
char args_array[]
Definition internal.h:27
u32 function_id
Definition internal.h:25
char data[]
Definition internal.h:37
rpc_result_code_t result_code
Definition internal.h:35
size_t data_size
Definition internal.h:36
void * data
Definition rpc_client.h:15
size_t size
Definition rpc_client.h:16
const char * server_name
std::atomic_size_t callid
signed int s32
Definition types.h:11
unsigned int u32
Definition types.h:17
signed char s8
Definition types.h:9
signed long long int s64
Definition types.h:13
unsigned long long u64
Definition types.h:19
unsigned char u8
Definition types.h:15
should_inline fd_t syscall_ipc_connect(const char *name, size_t buffer_size)
Definition usermode.h:121
should_inline bool syscall_io_close(fd_t fd)
Definition usermode.h:44