Whamcloud - gitweb
LU-6070 libcfs: provide separate buffers for libcfs_*2str()
[fs/lustre-release.git] / lustre / ptlrpc / llog_server.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/llog_server.c
37  *
38  * remote api for llog - server side
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOG
44
45 #include <obd_class.h>
46 #include <lu_target.h>
47 #include <lustre_log.h>
48 #include <lustre_net.h>
49
50 static int llog_origin_close(const struct lu_env *env, struct llog_handle *lgh)
51 {
52         if (lgh->lgh_hdr != NULL && lgh->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
53                 return llog_cat_close(env, lgh);
54         else
55                 return llog_close(env, lgh);
56 }
57
58 /* Only open is supported, no new llog can be created remotely */
59 int llog_origin_handle_open(struct ptlrpc_request *req)
60 {
61         struct obd_export       *exp = req->rq_export;
62         struct obd_device       *obd = exp->exp_obd;
63         struct llog_handle      *loghandle;
64         struct llogd_body       *body;
65         struct llog_logid       *logid = NULL;
66         struct llog_ctxt        *ctxt;
67         char                    *name = NULL;
68         int                      rc;
69
70         ENTRY;
71
72         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
73         if (body == NULL)
74                 RETURN(err_serious(-EFAULT));
75
76         rc = req_capsule_server_pack(&req->rq_pill);
77         if (rc)
78                 RETURN(err_serious(-ENOMEM));
79
80         if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
81                 logid = &body->lgd_logid;
82
83         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
84                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
85                 if (name == NULL)
86                         RETURN(-EFAULT);
87                 CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
88         }
89
90         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
91                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d name=%s\n",
92                        obd->obd_name, body->lgd_ctxt_idx, name);
93                 RETURN(-EPROTO);
94         }
95
96         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
97         if (ctxt == NULL) {
98                 CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
99                        obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
100                 RETURN(-ENODEV);
101         }
102
103         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
104                        name, LLOG_OPEN_EXISTS);
105         if (rc)
106                 GOTO(out_ctxt, rc);
107
108         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
109         body->lgd_logid = loghandle->lgh_id;
110
111         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
112         EXIT;
113 out_ctxt:
114         llog_ctxt_put(ctxt);
115         return rc;
116 }
117
118 int llog_origin_handle_destroy(struct ptlrpc_request *req)
119 {
120         struct llogd_body       *body;
121         struct llog_logid       *logid = NULL;
122         struct llog_ctxt        *ctxt;
123         int                      rc;
124
125         ENTRY;
126
127         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
128         if (body == NULL)
129                 RETURN(err_serious(-EFAULT));
130
131         rc = req_capsule_server_pack(&req->rq_pill);
132         if (rc < 0)
133                 RETURN(err_serious(-ENOMEM));
134
135         if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
136                 logid = &body->lgd_logid;
137
138         if (!(body->lgd_llh_flags & LLOG_F_IS_PLAIN))
139                 CERROR("%s: wrong llog flags %x\n",
140                        req->rq_export->exp_obd->obd_name, body->lgd_llh_flags);
141
142         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
143                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
144                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
145                 RETURN(-EPROTO);
146         }
147
148         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
149         if (ctxt == NULL)
150                 RETURN(-ENODEV);
151
152         rc = llog_erase(req->rq_svc_thread->t_env, ctxt, logid, NULL);
153         llog_ctxt_put(ctxt);
154         RETURN(rc);
155 }
156
157 int llog_origin_handle_next_block(struct ptlrpc_request *req)
158 {
159         struct llog_handle      *loghandle;
160         struct llogd_body       *body;
161         struct llogd_body       *repbody;
162         struct llog_ctxt        *ctxt;
163         __u32                    flags;
164         void                    *ptr;
165         int                      rc;
166
167         ENTRY;
168
169         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
170         if (body == NULL)
171                 RETURN(err_serious(-EFAULT));
172
173         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
174                              LLOG_CHUNK_SIZE);
175         rc = req_capsule_server_pack(&req->rq_pill);
176         if (rc)
177                 RETURN(err_serious(-ENOMEM));
178
179         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
180                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
181                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
182                 RETURN(-EPROTO);
183         }
184
185         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
186         if (ctxt == NULL)
187                 RETURN(-ENODEV);
188
189         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
190                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
191         if (rc)
192                 GOTO(out_ctxt, rc);
193
194         flags = body->lgd_llh_flags;
195         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
196                               NULL);
197         if (rc)
198                 GOTO(out_close, rc);
199
200         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
201         *repbody = *body;
202
203         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
204         rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
205                              &repbody->lgd_saved_index, repbody->lgd_index,
206                              &repbody->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
207         if (rc)
208                 GOTO(out_close, rc);
209         EXIT;
210 out_close:
211         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
212 out_ctxt:
213         llog_ctxt_put(ctxt);
214         return rc;
215 }
216
217 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
218 {
219         struct llog_handle      *loghandle;
220         struct llogd_body       *body;
221         struct llogd_body       *repbody;
222         struct llog_ctxt        *ctxt;
223         __u32                    flags;
224         void                    *ptr;
225         int                      rc;
226
227         ENTRY;
228
229         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
230         if (body == NULL)
231                 RETURN(err_serious(-EFAULT));
232
233         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
234                              LLOG_CHUNK_SIZE);
235         rc = req_capsule_server_pack(&req->rq_pill);
236         if (rc)
237                 RETURN(err_serious(-ENOMEM));
238
239         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
240                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
241                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
242                 RETURN(-EPROTO);
243         }
244
245         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
246         if (ctxt == NULL)
247                 RETURN(-ENODEV);
248
249         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
250                          &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
251         if (rc)
252                 GOTO(out_ctxt, rc);
253
254         flags = body->lgd_llh_flags;
255         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
256                               NULL);
257         if (rc)
258                 GOTO(out_close, rc);
259
260         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
261         *repbody = *body;
262
263         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
264         rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
265                              body->lgd_index, ptr, LLOG_CHUNK_SIZE);
266         if (rc)
267                 GOTO(out_close, rc);
268
269         EXIT;
270 out_close:
271         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
272 out_ctxt:
273         llog_ctxt_put(ctxt);
274         return rc;
275 }
276
277 int llog_origin_handle_read_header(struct ptlrpc_request *req)
278 {
279         struct llog_handle      *loghandle;
280         struct llogd_body       *body;
281         struct llog_log_hdr     *hdr;
282         struct llog_ctxt        *ctxt;
283         __u32                    flags;
284         int                      rc;
285
286         ENTRY;
287
288         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
289         if (body == NULL)
290                 RETURN(err_serious(-EFAULT));
291
292         rc = req_capsule_server_pack(&req->rq_pill);
293         if (rc)
294                 RETURN(err_serious(-ENOMEM));
295
296         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
297                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
298                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
299                 RETURN(-EPROTO);
300         }
301
302         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
303         if (ctxt == NULL)
304                 RETURN(-ENODEV);
305
306         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
307                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
308         if (rc)
309                 GOTO(out_ctxt, rc);
310
311         /*
312          * llog_init_handle() reads the llog header
313          */
314         flags = body->lgd_llh_flags;
315         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
316                               NULL);
317         if (rc)
318                 GOTO(out_close, rc);
319         flags = loghandle->lgh_hdr->llh_flags;
320
321         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
322         *hdr = *loghandle->lgh_hdr;
323         EXIT;
324 out_close:
325         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
326 out_ctxt:
327         llog_ctxt_put(ctxt);
328         return rc;
329 }
330
331 int llog_origin_handle_close(struct ptlrpc_request *req)
332 {
333         int      rc;
334
335         ENTRY;
336
337         rc = req_capsule_server_pack(&req->rq_pill);
338         if (rc)
339                 RETURN(err_serious(-ENOMEM));
340         RETURN(0);
341 }