Whamcloud - gitweb
LU-8726 osd-ldiskfs: bypass read for benchmarking
[fs/lustre-release.git] / lustre / ptlrpc / llog_server.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ptlrpc/llog_server.c
33  *
34  * remote api for llog - server side
35  *
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_LOG
40
41 #include <obd_class.h>
42 #include <lu_target.h>
43 #include <lustre_log.h>
44 #include <lustre_net.h>
45
46 static int llog_origin_close(const struct lu_env *env, struct llog_handle *lgh)
47 {
48         if (lgh->lgh_hdr != NULL && lgh->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
49                 return llog_cat_close(env, lgh);
50         else
51                 return llog_close(env, lgh);
52 }
53
54 /* Only open is supported, no new llog can be created remotely */
55 int llog_origin_handle_open(struct ptlrpc_request *req)
56 {
57         struct obd_export       *exp = req->rq_export;
58         struct obd_device       *obd = exp->exp_obd;
59         struct llog_handle      *loghandle;
60         struct llogd_body       *body;
61         struct llog_logid       *logid = NULL;
62         struct llog_ctxt        *ctxt;
63         char                    *name = NULL;
64         int                      rc;
65
66         ENTRY;
67
68         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
69         if (body == NULL)
70                 RETURN(err_serious(-EFAULT));
71
72         rc = req_capsule_server_pack(&req->rq_pill);
73         if (rc)
74                 RETURN(err_serious(-ENOMEM));
75
76         if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
77                 logid = &body->lgd_logid;
78
79         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
80                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
81                 if (name == NULL)
82                         RETURN(-EFAULT);
83                 CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
84         }
85
86         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
87                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d name=%s\n",
88                        obd->obd_name, body->lgd_ctxt_idx, name);
89                 RETURN(-EPROTO);
90         }
91
92         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
93         if (ctxt == NULL) {
94                 CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
95                        obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
96                 RETURN(-ENODEV);
97         }
98
99         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
100                        name, LLOG_OPEN_EXISTS);
101         if (rc)
102                 GOTO(out_ctxt, rc);
103
104         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
105         body->lgd_logid = loghandle->lgh_id;
106
107         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
108         EXIT;
109 out_ctxt:
110         llog_ctxt_put(ctxt);
111         return rc;
112 }
113
114 int llog_origin_handle_destroy(struct ptlrpc_request *req)
115 {
116         struct llogd_body       *body;
117         struct llog_logid       *logid = NULL;
118         struct llog_ctxt        *ctxt;
119         int                      rc;
120
121         ENTRY;
122
123         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
124         if (body == NULL)
125                 RETURN(err_serious(-EFAULT));
126
127         rc = req_capsule_server_pack(&req->rq_pill);
128         if (rc < 0)
129                 RETURN(err_serious(-ENOMEM));
130
131         if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
132                 logid = &body->lgd_logid;
133
134         if (!(body->lgd_llh_flags & LLOG_F_IS_PLAIN))
135                 CERROR("%s: wrong llog flags %x\n",
136                        req->rq_export->exp_obd->obd_name, body->lgd_llh_flags);
137
138         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
139                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
140                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
141                 RETURN(-EPROTO);
142         }
143
144         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
145         if (ctxt == NULL)
146                 RETURN(-ENODEV);
147
148         rc = llog_erase(req->rq_svc_thread->t_env, ctxt, logid, NULL);
149         llog_ctxt_put(ctxt);
150         RETURN(rc);
151 }
152
153 int llog_origin_handle_next_block(struct ptlrpc_request *req)
154 {
155         struct llog_handle      *loghandle;
156         struct llogd_body       *body;
157         struct llogd_body       *repbody;
158         struct llog_ctxt        *ctxt;
159         __u32                    flags;
160         void                    *ptr;
161         int                      rc;
162
163         ENTRY;
164
165         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
166         if (body == NULL)
167                 RETURN(err_serious(-EFAULT));
168
169         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
170                              LLOG_MIN_CHUNK_SIZE);
171         rc = req_capsule_server_pack(&req->rq_pill);
172         if (rc)
173                 RETURN(err_serious(-ENOMEM));
174
175         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
176                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
177                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
178                 RETURN(-EPROTO);
179         }
180
181         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
182         if (ctxt == NULL)
183                 RETURN(-ENODEV);
184
185         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
186                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
187         if (rc)
188                 GOTO(out_ctxt, rc);
189
190         flags = body->lgd_llh_flags;
191         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
192                               NULL);
193         if (rc)
194                 GOTO(out_close, rc);
195
196         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
197         *repbody = *body;
198
199         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
200         rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
201                              &repbody->lgd_saved_index, repbody->lgd_index,
202                              &repbody->lgd_cur_offset, ptr,
203                              LLOG_MIN_CHUNK_SIZE);
204         if (rc)
205                 GOTO(out_close, rc);
206         EXIT;
207 out_close:
208         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
209 out_ctxt:
210         llog_ctxt_put(ctxt);
211         return rc;
212 }
213
214 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
215 {
216         struct llog_handle      *loghandle;
217         struct llogd_body       *body;
218         struct llogd_body       *repbody;
219         struct llog_ctxt        *ctxt;
220         __u32                    flags;
221         void                    *ptr;
222         int                      rc;
223
224         ENTRY;
225
226         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
227         if (body == NULL)
228                 RETURN(err_serious(-EFAULT));
229
230         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
231                              LLOG_MIN_CHUNK_SIZE);
232         rc = req_capsule_server_pack(&req->rq_pill);
233         if (rc)
234                 RETURN(err_serious(-ENOMEM));
235
236         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
237                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
238                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
239                 RETURN(-EPROTO);
240         }
241
242         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
243         if (ctxt == NULL)
244                 RETURN(-ENODEV);
245
246         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
247                          &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
248         if (rc)
249                 GOTO(out_ctxt, rc);
250
251         flags = body->lgd_llh_flags;
252         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
253                               NULL);
254         if (rc)
255                 GOTO(out_close, rc);
256
257         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
258         *repbody = *body;
259
260         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
261         rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
262                              body->lgd_index, ptr, LLOG_MIN_CHUNK_SIZE);
263         if (rc)
264                 GOTO(out_close, rc);
265
266         EXIT;
267 out_close:
268         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
269 out_ctxt:
270         llog_ctxt_put(ctxt);
271         return rc;
272 }
273
274 int llog_origin_handle_read_header(struct ptlrpc_request *req)
275 {
276         struct llog_handle      *loghandle;
277         struct llogd_body       *body;
278         struct llog_log_hdr     *hdr;
279         struct llog_ctxt        *ctxt;
280         __u32                    flags;
281         int                      rc;
282
283         ENTRY;
284
285         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
286         if (body == NULL)
287                 RETURN(err_serious(-EFAULT));
288
289         rc = req_capsule_server_pack(&req->rq_pill);
290         if (rc)
291                 RETURN(err_serious(-ENOMEM));
292
293         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
294                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
295                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
296                 RETURN(-EPROTO);
297         }
298
299         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
300         if (ctxt == NULL)
301                 RETURN(-ENODEV);
302
303         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
304                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
305         if (rc)
306                 GOTO(out_ctxt, rc);
307
308         /*
309          * llog_init_handle() reads the llog header
310          */
311         flags = body->lgd_llh_flags;
312         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
313                               NULL);
314         if (rc)
315                 GOTO(out_close, rc);
316         flags = loghandle->lgh_hdr->llh_flags;
317
318         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
319         *hdr = *loghandle->lgh_hdr;
320         EXIT;
321 out_close:
322         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
323 out_ctxt:
324         llog_ctxt_put(ctxt);
325         return rc;
326 }
327
328 int llog_origin_handle_close(struct ptlrpc_request *req)
329 {
330         int      rc;
331
332         ENTRY;
333
334         rc = req_capsule_server_pack(&req->rq_pill);
335         if (rc)
336                 RETURN(err_serious(-ENOMEM));
337         RETURN(0);
338 }