1// SPDX-License-Identifier: GPL-3.0-or-later
2
3#include "blockdev_manager.hpp"
4
5#include "proto/blockdev.pb.h"
6#include "proto/blockdev.services.h"
7
8#include <atomic>
9#include <cstdlib>
10#include <iostream>
11#include <librpc/rpc.h>
12#include <librpc/rpc_server++.hpp>
13#include <librpc/rpc_server.h>
14#include <map>
15#include <memory>
16#include <mos/types.h>
17#include <mutex>
18#include <pb.h>
19#include <pb_decode.h>
20#include <pb_encode.h>
21#include <string>
22
23std::map<std::string, BlockInfo> devices; // blockdev id -> blockdev info
24static std::atomic_ulong next_blockdev_id = 2; // 1 is reserved for the root directory
25
26static std::shared_ptr<BlockdevLayerStub> get_layer_server(const std::string &name)
27{
28 static std::map<std::string, std::shared_ptr<BlockdevLayerStub>> layer_servers; // server name -> server
29 static std::mutex server_lock;
30 std::lock_guard<std::mutex> lock(server_lock);
31
32 if (!layer_servers.contains(x: name))
33 layer_servers[name] = std::make_shared<BlockdevLayerStub>(args: name);
34
35 return layer_servers[name];
36}
37
38static std::shared_ptr<BlockdevDeviceStub> get_device_server(const std::string &name)
39{
40 static std::map<std::string, std::shared_ptr<BlockdevDeviceStub>> device_servers; // server name -> server
41 static std::mutex server_lock;
42 std::lock_guard<std::mutex> lock(server_lock);
43
44 if (!device_servers.contains(x: name))
45 device_servers[name] = std::make_shared<BlockdevDeviceStub>(args: name);
46
47 return device_servers[name];
48}
49
50struct ClientFDTable
51{
52 fd_t next_fd = 0;
53 std::map<int, std::string> fd_to_device;
54};
55
56void BlockManager::on_connect(rpc_context_t *ctx)
57{
58 // allocate a new FD table for this client
59 set_data(context: ctx, data: new ClientFDTable());
60}
61
62void BlockManager::on_disconnect(rpc_context_t *ctx)
63{
64 auto table = get_data<ClientFDTable>(context: ctx);
65 delete table;
66 MOS_UNUSED(ctx);
67}
68
69rpc_result_code_t BlockManager::register_layer_server(rpc_context_t *, register_layer_server::request *req, register_layer_server::response *resp)
70{
71 for (size_t i = 0; i < req->partitions_count; i++)
72 {
73 const auto part = req->partitions[i];
74 const BlockInfo info = {
75 .ino = next_blockdev_id++,
76 .name = part.name,
77 .n_blocks = part.size / 512,
78 .block_size = 512,
79 .type = BlockInfo::BLOCKDEV_LAYER,
80 .info = BlockLayerInfo{ .server_name = req->server_name, .partid = part.partid },
81 };
82
83 devices.emplace(args: part.name, args: info);
84 }
85
86 resp->result.success = true;
87 resp->result.error = NULL;
88 return RPC_RESULT_OK;
89}
90
91rpc_result_code_t BlockManager::register_device(rpc_context_t *, register_device::request *req, register_device::response *resp)
92{
93 if (devices.contains(x: req->device_info.name))
94 {
95 std::cout << "Device " << req->device_info.name << " already registered" << std::endl;
96 resp->result.success = false;
97 resp->result.error = strdup(string: "Device already registered");
98 return RPC_RESULT_OK;
99 }
100
101 const BlockInfo info = {
102 .ino = next_blockdev_id++,
103 .name = req->device_info.name,
104 .n_blocks = req->device_info.n_blocks,
105 .block_size = req->device_info.block_size,
106 .type = BlockInfo::BLOCKDEV_DEVICE,
107 .info = BlockDeviceInfo{ .server_name = req->server_name },
108 };
109
110 devices.emplace(args&: req->device_info.name, args: info);
111
112 std::cout << "block device '" << req->device_info.name << "' with " << req->device_info.n_blocks << " blocks of size " << req->device_info.block_size << " bytes"
113 << std::endl;
114
115 resp->id = info.ino;
116
117 resp->result.success = true;
118 resp->result.error = NULL;
119 return RPC_RESULT_OK;
120}
121
122rpc_result_code_t BlockManager::open_device(rpc_context_t *ctx, open_device::request *req, open_device::response *resp)
123{
124 const auto name = req->device_name;
125 if (!devices.contains(x: name))
126 {
127 std::cout << "Device " << name << " not found" << std::endl;
128 resp->result.success = false;
129 resp->result.error = strdup(string: "Device not found");
130 return RPC_RESULT_OK;
131 }
132
133 auto fdtable = get_data<ClientFDTable>(context: ctx);
134 const auto fd = fdtable->next_fd++;
135 fdtable->fd_to_device[fd] = name;
136 resp->device.devid = fd;
137 resp->result.success = true;
138 resp->result.error = NULL;
139 return RPC_RESULT_OK;
140}
141
142rpc_result_code_t BlockManager::read_block(rpc_context_t *ctx, read_block::request *req, read_block::response *resp)
143{
144 auto fdtable = get_data<ClientFDTable>(context: ctx);
145 if (!fdtable->fd_to_device.contains(x: req->device.devid))
146 {
147 std::cout << "Invalid device handle " << req->device.devid << std::endl;
148 resp->result.success = false;
149 resp->result.error = strdup(string: "Invalid device handle");
150 return RPC_RESULT_OK;
151 }
152
153 auto &device = devices[fdtable->fd_to_device[req->device.devid]];
154 if (req->n_boffset >= device.n_blocks)
155 {
156 std::cout << "Block offset " << req->n_boffset << " out of range" << std::endl;
157 resp->result.success = false;
158 resp->result.error = strdup(string: "Block offset out of range");
159 return RPC_RESULT_OK;
160 }
161
162 switch (device.type)
163 {
164 case BlockInfo::BLOCKDEV_LAYER:
165 {
166 const auto info = std::get<BlockLayerInfo>(v&: device.info);
167 const auto server = get_layer_server(name: info.server_name);
168
169 mosrpc_blockdev_read_partition_block_request part_req = {
170 .device = { .devid = (u32) -1 },
171 .partition = { .partid = info.partid },
172 .n_boffset = req->n_boffset,
173 .n_blocks = req->n_blocks,
174 };
175
176 return server->read_partition_block(request: &part_req, response: resp);
177 }
178
179 case BlockInfo::BLOCKDEV_DEVICE:
180 {
181 const auto server = get_device_server(name: std::get<BlockDeviceInfo>(v&: device.info).server_name);
182 return server->read_block(request: req, response: resp);
183 }
184
185 default: __builtin_unreachable();
186 };
187}
188
189rpc_result_code_t BlockManager::write_block(rpc_context_t *ctx, write_block::request *req, write_block::response *resp)
190{
191 auto fdtable = get_data<ClientFDTable>(context: ctx);
192
193 if (!fdtable->fd_to_device.contains(x: req->device.devid))
194 {
195 std::cout << "Invalid device handle " << req->device.devid << std::endl;
196 resp->result.success = false;
197 resp->result.error = strdup(string: "Invalid device handle");
198 return RPC_RESULT_OK;
199 }
200
201 auto &device = devices[fdtable->fd_to_device[req->device.devid]];
202 if (req->n_boffset >= device.n_blocks)
203 {
204 std::cout << "Block offset " << req->n_boffset << " out of range" << std::endl;
205 resp->result.success = false;
206 resp->result.error = strdup(string: "Block offset out of range");
207 return RPC_RESULT_OK;
208 }
209
210 switch (device.type)
211 {
212 case BlockInfo::BLOCKDEV_LAYER:
213 {
214 const auto info = std::get<BlockLayerInfo>(v&: device.info);
215 const auto server = get_layer_server(name: info.server_name);
216
217 std::cout << "Writing to layer " << info.partid << " block " << req->n_boffset << std::endl;
218
219 mosrpc_blockdev_write_partition_block_request part_req = {
220 .device = { .devid = (u32) -1 },
221 .partition = { .partid = info.partid },
222 .data = req->data,
223 .n_boffset = req->n_boffset,
224 .n_blocks = req->n_blocks,
225 };
226
227 return server->write_partition_block(request: &part_req, response: resp);
228 }
229
230 case BlockInfo::BLOCKDEV_DEVICE:
231 {
232 const auto servername = std::get<BlockDeviceInfo>(v&: device.info).server_name;
233 const auto server = get_device_server(name: servername);
234 return server->write_block(request: req, response: resp);
235 }
236
237 default: __builtin_unreachable();
238 };
239
240 return RPC_RESULT_OK;
241}
242