Whamcloud - gitweb
ac25e7e69913ec7fe50c08e7f779e7d3609d095b
[fs/lustre-release.git] / lustre / ptlrpc / llog_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ptlrpc/llog_client.c
33  *
34  * remote api for llog - client side
35  *
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_LOG
40
41 #include <linux/list.h>
42 #include <libcfs/libcfs.h>
43
44 #include <obd_class.h>
45 #include <lustre_log.h>
46 #include <lustre_net.h>
47
48 #include "ptlrpc_internal.h"
49
50 #define LLOG_CLIENT_ENTRY(ctxt, imp) do {                             \
51         mutex_lock(&ctxt->loc_mutex);                                 \
52         if (ctxt->loc_imp) {                                          \
53                 imp = class_import_get(ctxt->loc_imp);                \
54         } else {                                                      \
55                 CERROR("ctxt->loc_imp == NULL for context idx %d."    \
56                        "Unable to complete MDS/OSS recovery,"         \
57                        "but I'll try again next time.  Not fatal.\n", \
58                        ctxt->loc_idx);                                \
59                 imp = NULL;                                           \
60                 mutex_unlock(&ctxt->loc_mutex);                       \
61                 return -EINVAL;                                       \
62         }                                                             \
63         mutex_unlock(&ctxt->loc_mutex);                               \
64 } while (0)
65
66 #define LLOG_CLIENT_EXIT(ctxt, imp) do {                              \
67         mutex_lock(&ctxt->loc_mutex);                                 \
68         if (ctxt->loc_imp != imp)                                     \
69                 CWARN("loc_imp has changed from %p to %p\n",          \
70                       ctxt->loc_imp, imp);                            \
71         class_import_put(imp);                                        \
72         mutex_unlock(&ctxt->loc_mutex);                               \
73 } while (0)
74
75 /*
76  * This is a callback from the llog_* functions.
77  * Assumes caller has already pushed us into the kernel context.
78  */
79 static int llog_client_open(const struct lu_env *env,
80                             struct llog_handle *lgh, struct llog_logid *logid,
81                             char *name, enum llog_open_param open_param)
82 {
83         struct obd_import *imp;
84         struct llogd_body *body;
85         struct llog_ctxt *ctxt = lgh->lgh_ctxt;
86         struct ptlrpc_request *req = NULL;
87         int rc;
88
89         ENTRY;
90
91         LLOG_CLIENT_ENTRY(ctxt, imp);
92
93         /* client cannot create llog */
94         LASSERTF(open_param != LLOG_OPEN_NEW, "%#x\n", open_param);
95         LASSERT(lgh);
96
97         req = ptlrpc_request_alloc(imp, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
98         if (!req)
99                 GOTO(out, rc = -ENOMEM);
100
101         if (name)
102                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
103                                      strlen(name) + 1);
104
105         rc = ptlrpc_request_pack(req, LUSTRE_LOG_VERSION,
106                                  LLOG_ORIGIN_HANDLE_CREATE);
107         if (rc) {
108                 ptlrpc_request_free(req);
109                 req = NULL;
110                 GOTO(out, rc);
111         }
112         ptlrpc_request_set_replen(req);
113
114         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
115         if (logid)
116                 body->lgd_logid = *logid;
117         body->lgd_ctxt_idx = ctxt->loc_idx - 1;
118
119         if (name) {
120                 char *tmp;
121
122                 tmp = req_capsule_client_sized_get(&req->rq_pill, &RMF_NAME,
123                                                    strlen(name) + 1);
124                 LASSERT(tmp);
125                 strcpy(tmp, name);
126
127                 do_pack_body(req);
128         }
129
130         rc = ptlrpc_queue_wait(req);
131         if (rc)
132                 GOTO(out, rc);
133
134         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
135         if (!body)
136                 GOTO(out, rc = -EFAULT);
137
138         lgh->lgh_id = body->lgd_logid;
139         lgh->lgh_ctxt = ctxt;
140         EXIT;
141 out:
142         LLOG_CLIENT_EXIT(ctxt, imp);
143         ptlrpc_req_finished(req);
144         return rc;
145 }
146
147 static int llog_client_next_block(const struct lu_env *env,
148                                   struct llog_handle *loghandle,
149                                   int *cur_idx, int next_idx,
150                                   __u64 *cur_offset, void *buf, int len)
151 {
152         struct obd_import *imp;
153         struct ptlrpc_request *req = NULL;
154         struct llogd_body *body;
155         void *ptr;
156         int rc;
157
158         ENTRY;
159
160         LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp);
161         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK,
162                                         LUSTRE_LOG_VERSION,
163                                         LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
164         if (!req)
165                 GOTO(err_exit, rc = -ENOMEM);
166
167         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
168         body->lgd_logid = loghandle->lgh_id;
169         body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
170         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
171         body->lgd_index = next_idx;
172         body->lgd_saved_index = *cur_idx;
173         body->lgd_len = len;
174         body->lgd_cur_offset = *cur_offset;
175
176         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, len);
177         ptlrpc_request_set_replen(req);
178         rc = ptlrpc_queue_wait(req);
179         /*
180          * -EIO has a special meaning here. If llog_osd_next_block()
181          * reaches the end of the log without finding the desired
182          * record then it updates *cur_offset and *cur_idx and returns
183          * -EIO. In llog_process_thread() we use this to detect
184          * EOF. But we must be careful to distinguish between -EIO
185          * coming from llog_osd_next_block() and -EIO coming from
186          * ptlrpc or below.
187          */
188         if (rc == -EIO) {
189                 if (!req->rq_repmsg ||
190                     lustre_msg_get_status(req->rq_repmsg) != -EIO)
191                         GOTO(out, rc);
192         } else if (rc < 0) {
193                 GOTO(out, rc);
194         }
195
196         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
197         if (!body)
198                 GOTO(out, rc = -EFAULT);
199
200         *cur_idx = body->lgd_saved_index;
201         *cur_offset = body->lgd_cur_offset;
202
203         if (rc < 0)
204                 GOTO(out, rc);
205
206         /* The log records are swabbed as they are processed */
207         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
208         if (!ptr)
209                 GOTO(out, rc = -EFAULT);
210
211         memcpy(buf, ptr, len);
212         EXIT;
213 out:
214         ptlrpc_req_finished(req);
215 err_exit:
216         LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp);
217         return rc;
218 }
219
220 static int llog_client_prev_block(const struct lu_env *env,
221                                   struct llog_handle *loghandle,
222                                   int prev_idx, void *buf, int len)
223 {
224         struct obd_import *imp;
225         struct ptlrpc_request *req = NULL;
226         struct llogd_body *body;
227         void *ptr;
228         int rc;
229
230         ENTRY;
231
232         LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp);
233         req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK,
234                                         LUSTRE_LOG_VERSION,
235                                         LLOG_ORIGIN_HANDLE_PREV_BLOCK);
236         if (!req)
237                 GOTO(err_exit, rc = -ENOMEM);
238
239         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
240         body->lgd_logid = loghandle->lgh_id;
241         body->lgd_ctxt_idx = loghandle->lgh_ctxt->loc_idx - 1;
242         body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
243         body->lgd_index = prev_idx;
244         body->lgd_len = len;
245
246         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, len);
247         ptlrpc_request_set_replen(req);
248
249         rc = ptlrpc_queue_wait(req);
250         if (rc)
251                 GOTO(out, rc);
252
253         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
254         if (!body)
255                 GOTO(out, rc = -EFAULT);
256
257         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
258         if (!ptr)
259                 GOTO(out, rc = -EFAULT);
260
261         memcpy(buf, ptr, len);
262         EXIT;
263 out:
264         ptlrpc_req_finished(req);
265 err_exit:
266         LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp);
267         return rc;
268 }
269
270 static int llog_client_read_header(const struct lu_env *env,
271                                    struct llog_handle *handle)
272 {
273         struct obd_import *imp;
274         struct ptlrpc_request *req = NULL;
275         struct llogd_body *body;
276         struct llog_log_hdr *hdr;
277         struct llog_rec_hdr *llh_hdr;
278         int rc;
279
280         ENTRY;
281
282         LLOG_CLIENT_ENTRY(handle->lgh_ctxt, imp);
283         req = ptlrpc_request_alloc_pack(imp,
284                                         &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER,
285                                         LUSTRE_LOG_VERSION,
286                                         LLOG_ORIGIN_HANDLE_READ_HEADER);
287         if (!req)
288                 GOTO(err_exit, rc = -ENOMEM);
289
290         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
291         body->lgd_logid = handle->lgh_id;
292         body->lgd_ctxt_idx = handle->lgh_ctxt->loc_idx - 1;
293         body->lgd_llh_flags = handle->lgh_hdr->llh_flags;
294
295         ptlrpc_request_set_replen(req);
296         rc = ptlrpc_queue_wait(req);
297         if (rc)
298                 GOTO(out, rc);
299
300         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
301         if (!hdr)
302                 GOTO(out, rc = -EFAULT);
303
304         if (handle->lgh_hdr_size < hdr->llh_hdr.lrh_len)
305                 GOTO(out, rc = -EFAULT);
306
307         memcpy(handle->lgh_hdr, hdr, hdr->llh_hdr.lrh_len);
308         handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
309
310         /* sanity checks */
311         llh_hdr = &handle->lgh_hdr->llh_hdr;
312         if (llh_hdr->lrh_type != LLOG_HDR_MAGIC) {
313                 CERROR("bad log header magic: %#x (expecting %#x)\n",
314                        llh_hdr->lrh_type, LLOG_HDR_MAGIC);
315                 rc = -EIO;
316         } else if (llh_hdr->lrh_len !=
317                    LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len ||
318                    (llh_hdr->lrh_len & (llh_hdr->lrh_len - 1)) != 0 ||
319                    llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
320                    llh_hdr->lrh_len > handle->lgh_hdr_size) {
321                 CERROR("incorrectly sized log header: %#x, expecting %#x (power of two > 8192)\n",
322                        llh_hdr->lrh_len,
323                        LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len);
324                 CERROR("you may need to re-run lconf --write_conf.\n");
325                 rc = -EIO;
326         }
327         EXIT;
328 out:
329         ptlrpc_req_finished(req);
330 err_exit:
331         LLOG_CLIENT_EXIT(handle->lgh_ctxt, imp);
332         return rc;
333 }
334
335 static int llog_client_close(const struct lu_env *env,
336                              struct llog_handle *handle)
337 {
338         /*
339          * this doesn't call LLOG_ORIGIN_HANDLE_CLOSE because
340          * the servers all close the file at the end of every
341          * other LLOG_ RPC.
342          */
343         return 0;
344 }
345
346 const struct llog_operations llog_client_ops = {
347         .lop_next_block         = llog_client_next_block,
348         .lop_prev_block         = llog_client_prev_block,
349         .lop_read_header        = llog_client_read_header,
350         .lop_open               = llog_client_open,
351         .lop_close              = llog_client_close,
352 };
353 EXPORT_SYMBOL(llog_client_ops);