Whamcloud - gitweb
LU-17000 utils: handle_yaml_no_op() has wrong signature
[fs/lustre-release.git] / lustre / ptlrpc / llog_client.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2012, 2015, Intel Corporation.
8  */
9
10 /*
11  * This file is part of Lustre, http://www.lustre.org/
12  *
13  * remote api for llog - client side
14  *
15  * Author: Andreas Dilger <adilger@clusterfs.com>
16  */
17
18 #define DEBUG_SUBSYSTEM S_LOG
19
20 #include <linux/list.h>
21 #include <libcfs/libcfs.h>
22
23 #include <obd_class.h>
24 #include <lustre_log.h>
25 #include <lustre_net.h>
26
27 #include "ptlrpc_internal.h"
28
29 static struct obd_import *llog_client_entry(struct llog_ctxt *ctxt)
30 {
31         struct obd_import *imp = ERR_PTR(-EINVAL);
32
33         mutex_lock(&ctxt->loc_mutex);
34         if (ctxt->loc_imp)
35                 imp = class_import_get(ctxt->loc_imp);
36         else
37                 CWARN("%s: cannot finish recovery, ctxt #%d import not set, will retry rc = %ld\n",
38                       ctxt->loc_obd ? ctxt->loc_obd->obd_name : "llog",
39                       ctxt->loc_idx, PTR_ERR(imp));
40
41         mutex_unlock(&ctxt->loc_mutex);
42         return imp;
43 }
44
45 static void llog_client_exit(struct llog_ctxt *ctxt, struct obd_import *imp)
46 {
47         mutex_lock(&ctxt->loc_mutex);
48         if (ctxt->loc_imp != imp)
49                 CWARN("%s: import has changed from %p to %p\n",
50                       ctxt->loc_obd ? ctxt->loc_obd->obd_name : "llog",
51                       ctxt->loc_imp, imp);
52         class_import_put(imp);
53         mutex_unlock(&ctxt->loc_mutex);
54 }
55
56 /*
57  * This is a callback from the llog_* functions.
58  * Assumes caller has already pushed us into the kernel context.
59  */
60 static int llog_client_open(const struct lu_env *env,
61                             struct llog_handle *lgh, struct llog_logid *logid,
62                             char *name, enum llog_open_param open_param)
63 {
64         struct obd_import *imp;
65         struct llogd_body *body;
66         struct llog_ctxt *ctxt = lgh->lgh_ctxt;
67         struct ptlrpc_request *req = NULL;
68         int rc;
69
70         ENTRY;
71
72         imp = llog_client_entry(ctxt);
73         if (IS_ERR(imp))
74                 return PTR_ERR(imp);
75
76         /* client cannot create llog */
77         LASSERTF(open_param != LLOG_OPEN_NEW, "%#x\n", open_param);
78         LASSERT(lgh);
79
80         req = ptlrpc_request_alloc(imp, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
81         if (!req)
82                 GOTO(out, rc = -ENOMEM);
83
84         if (name)
85                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
86                                      strlen(name) + 1);
87
88         rc = ptlrpc_request_pack(req, LUSTRE_LOG_VERSION,
89                                  LLOG_ORIGIN_HANDLE_CREATE);
90         if (rc) {
91                 ptlrpc_request_free(req);
92                 req = NULL;
93                 GOTO(out, rc);
94         }
95         ptlrpc_request_set_replen(req);
96
97         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
98         if (logid)
99                 body->lgd_logid = *logid;
100         body->lgd_ctxt_idx = ctxt->loc_idx - 1;
101
102         if (name) {
103                 char *tmp;
104
105                 tmp = req_capsule_client_sized_get(&req->rq_pill, &RMF_NAME,
106                                                    strlen(name) + 1);
107                 LASSERT(tmp);
108                 strcpy(tmp, name);
109
110                 do_pack_body(req);
111         }
112
113         rc = ptlrpc_queue_wait(req);
114         if (rc)
115                 GOTO(out, rc);
116
117         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
118         if (!body)
119                 GOTO(out, rc = -EFAULT);
120
121         lgh->lgh_id = body->lgd_logid;
122         lgh->lgh_ctxt = ctxt;
123         EXIT;
124 out:
125         llog_client_exit(ctxt, imp);
126         ptlrpc_req_put(req);
127         return rc;
128 }
129
130 static int llog_client_next_block(const struct lu_env *env,
131                                   struct llog_handle *loghandle,
132                                   int *cur_idx, int next_idx,
133                                   __u64 *cur_offset, void *buf, int len)
134 {
135         struct obd_import *imp;
136         struct ptlrpc_request *req = NULL;
137         struct llogd_body *body;
138         void *ptr;
139         int rc;
140
141         ENTRY;
142
143         imp = llog_client_entry(loghandle->lgh_ctxt);
144         if (IS_ERR(imp))
145                 return PTR_ERR(imp);
146
147         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK,
148                                         LUSTRE_LOG_VERSION,
149                                         LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
150         if (IS_ERR(req))
151                 GOTO(err_exit, rc = PTR_ERR(req));
152
153         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
154         body->lgd_logid = loghandle->lgh_id;
155         body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
156         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
157         body->lgd_index = next_idx;
158         body->lgd_saved_index = *cur_idx;
159         body->lgd_len = len;
160         body->lgd_cur_offset = *cur_offset;
161
162         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, len);
163         ptlrpc_request_set_replen(req);
164         rc = ptlrpc_queue_wait(req);
165         /*
166          * -EBADR has a special meaning here. If llog_osd_next_block()
167          * reaches the end of the log without finding the desired
168          * record then it updates *cur_offset and *cur_idx and returns
169          * -EBADR. In llog_process_thread() we use this to detect
170          * EOF. But we must be careful to distinguish between -EBADR
171          * coming from llog_osd_next_block() and -EBADR coming from
172          * ptlrpc or below.
173          */
174         if (rc == -EBADR) {
175                 if (!req->rq_repmsg ||
176                     lustre_msg_get_status(req->rq_repmsg) != -EBADR)
177                         GOTO(out, rc);
178         } else if (rc < 0) {
179                 GOTO(out, rc);
180         }
181
182         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
183         if (!body)
184                 GOTO(out, rc = -EFAULT);
185
186         *cur_idx = body->lgd_saved_index;
187         *cur_offset = body->lgd_cur_offset;
188
189         if (rc < 0)
190                 GOTO(out, rc);
191
192         /* The log records are swabbed as they are processed */
193         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
194         if (!ptr)
195                 GOTO(out, rc = -EFAULT);
196
197         memcpy(buf, ptr, len);
198         EXIT;
199 out:
200         ptlrpc_req_put(req);
201 err_exit:
202         llog_client_exit(loghandle->lgh_ctxt, imp);
203         return rc;
204 }
205
206 static int llog_client_prev_block(const struct lu_env *env,
207                                   struct llog_handle *loghandle,
208                                   int prev_idx, void *buf, int len)
209 {
210         struct obd_import *imp;
211         struct ptlrpc_request *req = NULL;
212         struct llogd_body *body;
213         void *ptr;
214         int rc;
215
216         ENTRY;
217
218         imp = llog_client_entry(loghandle->lgh_ctxt);
219         if (IS_ERR(imp))
220                 return PTR_ERR(imp);
221
222         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK,
223                                         LUSTRE_LOG_VERSION,
224                                         LLOG_ORIGIN_HANDLE_PREV_BLOCK);
225         if (IS_ERR(req))
226                 GOTO(err_exit, rc = PTR_ERR(req));
227
228         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
229         body->lgd_logid = loghandle->lgh_id;
230         body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
231         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
232         body->lgd_index = prev_idx;
233         body->lgd_len = len;
234
235         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, len);
236         ptlrpc_request_set_replen(req);
237
238         rc = ptlrpc_queue_wait(req);
239         if (rc)
240                 GOTO(out, rc);
241
242         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
243         if (!body)
244                 GOTO(out, rc = -EFAULT);
245
246         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
247         if (!ptr)
248                 GOTO(out, rc = -EFAULT);
249
250         memcpy(buf, ptr, len);
251         EXIT;
252 out:
253         ptlrpc_req_put(req);
254 err_exit:
255         llog_client_exit(loghandle->lgh_ctxt, imp);
256         return rc;
257 }
258
259 static int llog_client_read_header(const struct lu_env *env,
260                                    struct llog_handle *handle)
261 {
262         struct obd_import *imp;
263         struct ptlrpc_request *req = NULL;
264         struct llogd_body *body;
265         struct llog_log_hdr *hdr;
266         struct llog_rec_hdr *llh_hdr;
267         int rc;
268
269         ENTRY;
270
271         imp = llog_client_entry(handle->lgh_ctxt);
272         if (IS_ERR(imp))
273                 return PTR_ERR(imp);
274
275         req = ptlrpc_request_alloc_pack(imp,
276                                         &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER,
277                                         LUSTRE_LOG_VERSION,
278                                         LLOG_ORIGIN_HANDLE_READ_HEADER);
279         if (IS_ERR(req))
280                 GOTO(err_exit, rc = PTR_ERR(req));
281
282         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
283         body->lgd_logid = handle->lgh_id;
284         body->lgd_ctxt_idx = handle->lgh_ctxt->loc_idx - 1;
285         body->lgd_llh_flags = handle->lgh_hdr->llh_flags;
286
287         ptlrpc_request_set_replen(req);
288         rc = ptlrpc_queue_wait(req);
289         if (rc)
290                 GOTO(out, rc);
291
292         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
293         if (!hdr)
294                 GOTO(out, rc = -EFAULT);
295
296         if (handle->lgh_hdr_size < hdr->llh_hdr.lrh_len)
297                 GOTO(out, rc = -EFAULT);
298
299         memcpy(handle->lgh_hdr, hdr, hdr->llh_hdr.lrh_len);
300         handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
301
302         /* sanity checks */
303         llh_hdr = &handle->lgh_hdr->llh_hdr;
304         if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
305                 CERROR("bad log header magic: %#x (expecting %#x)\n",
306                        llh_hdr->lrh_type, LLOG_HDR_MAGIC);
307                 rc = -EIO;
308         } else if (llh_hdr->lrh_len !=
309                    LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len ||
310                    (llh_hdr->lrh_len & (llh_hdr->lrh_len - 1)) != 0 ||
311                    llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
312                    llh_hdr->lrh_len > handle->lgh_hdr_size) {
313                 CERROR("incorrectly sized log header: %#x, expecting %#x (power of two > 8192)\n",
314                        llh_hdr->lrh_len,
315                        LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len);
316                 CERROR("you may need to re-run lconf --write_conf.\n");
317                 rc = -EIO;
318         }
319         EXIT;
320 out:
321         ptlrpc_req_put(req);
322 err_exit:
323         llog_client_exit(handle->lgh_ctxt, imp);
324         return rc;
325 }
326
327 static int llog_client_close(const struct lu_env *env,
328                              struct llog_handle *handle)
329 {
330         /*
331          * this doesn't call LLOG_ORIGIN_HANDLE_CLOSE because
332          * the servers all close the file at the end of every
333          * other LLOG_ RPC.
334          */
335         return 0;
336 }
337
338 const struct llog_operations llog_client_ops = {
339         .lop_next_block         = llog_client_next_block,
340         .lop_prev_block         = llog_client_prev_block,
341         .lop_read_header        = llog_client_read_header,
342         .lop_open               = llog_client_open,
343         .lop_close              = llog_client_close,
344 };
345 EXPORT_SYMBOL(llog_client_ops);