Whamcloud - gitweb
LU-13004 ptlrpc: Allow BULK_BUF_KIOV to accept a kvec
[fs/lustre-release.git] / lustre / ptlrpc / llog_server.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ptlrpc/llog_server.c
33  *
34  * remote api for llog - server side
35  *
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_LOG
40
41 #include <obd_class.h>
42 #include <lu_target.h>
43 #include <lustre_log.h>
44 #include <lustre_net.h>
45
46 static int llog_origin_close(const struct lu_env *env, struct llog_handle *lgh)
47 {
48         if (lgh->lgh_hdr != NULL && lgh->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
49                 return llog_cat_close(env, lgh);
50         else
51                 return llog_close(env, lgh);
52 }
53
54 /* Only open is supported, no new llog can be created remotely */
55 int llog_origin_handle_open(struct ptlrpc_request *req)
56 {
57         struct obd_export       *exp = req->rq_export;
58         struct obd_device       *obd = exp->exp_obd;
59         struct llog_handle      *loghandle;
60         struct llogd_body       *body;
61         struct llog_logid       *logid = NULL;
62         struct llog_ctxt        *ctxt;
63         char                    *name = NULL;
64         int                      rc;
65
66         ENTRY;
67
68         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
69         if (body == NULL)
70                 RETURN(err_serious(-EFAULT));
71
72         rc = req_capsule_server_pack(&req->rq_pill);
73         if (rc)
74                 RETURN(err_serious(-ENOMEM));
75
76         if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
77                 logid = &body->lgd_logid;
78
79         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
80                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
81                 if (name == NULL)
82                         RETURN(-EFAULT);
83                 CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
84         }
85
86         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
87                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d name=%s\n",
88                        obd->obd_name, body->lgd_ctxt_idx, name);
89                 RETURN(-EPROTO);
90         }
91
92         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
93         if (ctxt == NULL) {
94                 CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
95                        obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
96                 RETURN(-ENODEV);
97         }
98
99         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
100                        name, LLOG_OPEN_EXISTS);
101         if (rc)
102                 GOTO(out_ctxt, rc);
103
104         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
105         body->lgd_logid = loghandle->lgh_id;
106
107         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
108         EXIT;
109 out_ctxt:
110         llog_ctxt_put(ctxt);
111         return rc;
112 }
113
114 int llog_origin_handle_next_block(struct ptlrpc_request *req)
115 {
116         struct llog_handle      *loghandle;
117         struct llogd_body       *body;
118         struct llogd_body       *repbody;
119         struct llog_ctxt        *ctxt;
120         __u32                    flags;
121         void                    *ptr;
122         int                      rc;
123
124         ENTRY;
125
126         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
127         if (body == NULL)
128                 RETURN(err_serious(-EFAULT));
129
130         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
131                              LLOG_MIN_CHUNK_SIZE);
132         rc = req_capsule_server_pack(&req->rq_pill);
133         if (rc)
134                 RETURN(err_serious(-ENOMEM));
135
136         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
137                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
138                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
139                 RETURN(-EPROTO);
140         }
141
142         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
143         if (ctxt == NULL)
144                 RETURN(-ENODEV);
145
146         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
147                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
148         if (rc)
149                 GOTO(out_ctxt, rc);
150
151         flags = body->lgd_llh_flags;
152         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
153                               NULL);
154         if (rc)
155                 GOTO(out_close, rc);
156
157         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
158         *repbody = *body;
159
160         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
161         rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
162                              &repbody->lgd_saved_index, repbody->lgd_index,
163                              &repbody->lgd_cur_offset, ptr,
164                              LLOG_MIN_CHUNK_SIZE);
165         if (rc)
166                 GOTO(out_close, rc);
167         EXIT;
168 out_close:
169         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
170 out_ctxt:
171         llog_ctxt_put(ctxt);
172         return rc;
173 }
174
175 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
176 {
177         struct llog_handle      *loghandle;
178         struct llogd_body       *body;
179         struct llogd_body       *repbody;
180         struct llog_ctxt        *ctxt;
181         __u32                    flags;
182         void                    *ptr;
183         int                      rc;
184
185         ENTRY;
186
187         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
188         if (body == NULL)
189                 RETURN(err_serious(-EFAULT));
190
191         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
192                              LLOG_MIN_CHUNK_SIZE);
193         rc = req_capsule_server_pack(&req->rq_pill);
194         if (rc)
195                 RETURN(err_serious(-ENOMEM));
196
197         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
198                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
199                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
200                 RETURN(-EPROTO);
201         }
202
203         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
204         if (ctxt == NULL)
205                 RETURN(-ENODEV);
206
207         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
208                          &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
209         if (rc)
210                 GOTO(out_ctxt, rc);
211
212         flags = body->lgd_llh_flags;
213         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
214                               NULL);
215         if (rc)
216                 GOTO(out_close, rc);
217
218         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
219         *repbody = *body;
220
221         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
222         rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
223                              body->lgd_index, ptr, LLOG_MIN_CHUNK_SIZE);
224         if (rc)
225                 GOTO(out_close, rc);
226
227         EXIT;
228 out_close:
229         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
230 out_ctxt:
231         llog_ctxt_put(ctxt);
232         return rc;
233 }
234
235 int llog_origin_handle_read_header(struct ptlrpc_request *req)
236 {
237         struct llog_handle      *loghandle;
238         struct llogd_body       *body;
239         struct llog_log_hdr     *hdr;
240         struct llog_ctxt        *ctxt;
241         __u32                    flags;
242         int                      rc;
243
244         ENTRY;
245
246         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
247         if (body == NULL)
248                 RETURN(err_serious(-EFAULT));
249
250         rc = req_capsule_server_pack(&req->rq_pill);
251         if (rc)
252                 RETURN(err_serious(-ENOMEM));
253
254         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
255                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
256                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
257                 RETURN(-EPROTO);
258         }
259
260         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
261         if (ctxt == NULL)
262                 RETURN(-ENODEV);
263
264         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
265                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
266         if (rc)
267                 GOTO(out_ctxt, rc);
268
269         /*
270          * llog_init_handle() reads the llog header
271          */
272         flags = body->lgd_llh_flags;
273         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
274                               NULL);
275         if (rc)
276                 GOTO(out_close, rc);
277         flags = loghandle->lgh_hdr->llh_flags;
278
279         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
280         *hdr = *loghandle->lgh_hdr;
281         EXIT;
282 out_close:
283         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
284 out_ctxt:
285         llog_ctxt_put(ctxt);
286         return rc;
287 }