Whamcloud - gitweb
LU-14291 ptlrpc: format UPDATE messages in server-only code
[fs/lustre-release.git] / lustre / ptlrpc / llog_server.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ptlrpc/llog_server.c
33  *
34  * remote api for llog - server side
35  *
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_LOG
40
41 #include <obd_class.h>
42 #include <lu_target.h>
43 #include <lustre_log.h>
44 #include <lustre_net.h>
45
46 static int llog_origin_close(const struct lu_env *env, struct llog_handle *lgh)
47 {
48         if (lgh->lgh_hdr != NULL && lgh->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
49                 return llog_cat_close(env, lgh);
50         else
51                 return llog_close(env, lgh);
52 }
53
54 /* Only open is supported, no new llog can be created remotely */
55 int llog_origin_handle_open(struct ptlrpc_request *req)
56 {
57         struct obd_export       *exp = req->rq_export;
58         struct obd_device       *obd = exp->exp_obd;
59         struct llog_handle      *loghandle;
60         struct llogd_body       *body;
61         struct llog_logid       *logid = NULL;
62         struct llog_ctxt        *ctxt;
63         char                    *name = NULL;
64         int                      rc;
65
66         ENTRY;
67
68         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
69         if (body == NULL)
70                 RETURN(err_serious(-EFAULT));
71
72         rc = req_capsule_server_pack(&req->rq_pill);
73         if (rc)
74                 RETURN(err_serious(-ENOMEM));
75
76         if (ostid_id(&body->lgd_logid.lgl_oi) > 0)
77                 logid = &body->lgd_logid;
78
79         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
80                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
81                 if (name == NULL)
82                         RETURN(-EFAULT);
83                 CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
84         }
85
86         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
87                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d name=%s\n",
88                        obd->obd_name, body->lgd_ctxt_idx, name);
89                 RETURN(-EPROTO);
90         }
91
92         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
93         if (ctxt == NULL) {
94                 CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
95                        obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
96                 RETURN(-ENODEV);
97         }
98
99         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
100                        name, LLOG_OPEN_EXISTS);
101         if (rc)
102                 GOTO(out_ctxt, rc);
103
104         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
105         body->lgd_logid = loghandle->lgh_id;
106
107         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
108         EXIT;
109 out_ctxt:
110         llog_ctxt_put(ctxt);
111         return rc;
112 }
113
114 int llog_origin_handle_next_block(struct ptlrpc_request *req)
115 {
116         struct llog_handle      *loghandle;
117         struct llogd_body       *body;
118         struct llogd_body       *repbody;
119         struct llog_ctxt        *ctxt;
120         __u32                    flags;
121         void                    *ptr;
122         int                      rc;
123
124         ENTRY;
125
126         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
127         if (body == NULL)
128                 RETURN(err_serious(-EFAULT));
129
130         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
131                              LLOG_MIN_CHUNK_SIZE);
132         rc = req_capsule_server_pack(&req->rq_pill);
133         if (rc)
134                 RETURN(err_serious(-ENOMEM));
135
136         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
137                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
138                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
139                 RETURN(-EPROTO);
140         }
141
142         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
143         if (ctxt == NULL)
144                 RETURN(-ENODEV);
145         if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_LLOG_UMOUNT_RACE))
146                 cfs_fail_val = 1;
147
148         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
149                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
150         if (rc)
151                 GOTO(out_ctxt, rc);
152
153         flags = body->lgd_llh_flags;
154         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
155                               NULL);
156         if (rc)
157                 GOTO(out_close, rc);
158
159         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
160         *repbody = *body;
161
162         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
163         rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
164                              &repbody->lgd_saved_index, repbody->lgd_index,
165                              &repbody->lgd_cur_offset, ptr,
166                              LLOG_MIN_CHUNK_SIZE);
167         if (rc)
168                 GOTO(out_close, rc);
169         EXIT;
170 out_close:
171         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
172 out_ctxt:
173         llog_ctxt_put(ctxt);
174         return rc;
175 }
176
177 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
178 {
179         struct llog_handle      *loghandle;
180         struct llogd_body       *body;
181         struct llogd_body       *repbody;
182         struct llog_ctxt        *ctxt;
183         __u32                    flags;
184         void                    *ptr;
185         int                      rc;
186
187         ENTRY;
188
189         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
190         if (body == NULL)
191                 RETURN(err_serious(-EFAULT));
192
193         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
194                              LLOG_MIN_CHUNK_SIZE);
195         rc = req_capsule_server_pack(&req->rq_pill);
196         if (rc)
197                 RETURN(err_serious(-ENOMEM));
198
199         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
200                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
201                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
202                 RETURN(-EPROTO);
203         }
204
205         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
206         if (ctxt == NULL)
207                 RETURN(-ENODEV);
208
209         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
210                          &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
211         if (rc)
212                 GOTO(out_ctxt, rc);
213
214         flags = body->lgd_llh_flags;
215         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
216                               NULL);
217         if (rc)
218                 GOTO(out_close, rc);
219
220         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
221         *repbody = *body;
222
223         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
224         rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
225                              body->lgd_index, ptr, LLOG_MIN_CHUNK_SIZE);
226         if (rc)
227                 GOTO(out_close, rc);
228
229         EXIT;
230 out_close:
231         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
232 out_ctxt:
233         llog_ctxt_put(ctxt);
234         return rc;
235 }
236
237 int llog_origin_handle_read_header(struct ptlrpc_request *req)
238 {
239         struct llog_handle      *loghandle;
240         struct llogd_body       *body;
241         struct llog_log_hdr     *hdr;
242         struct llog_ctxt        *ctxt;
243         __u32                    flags;
244         int                      rc;
245
246         ENTRY;
247
248         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
249         if (body == NULL)
250                 RETURN(err_serious(-EFAULT));
251
252         rc = req_capsule_server_pack(&req->rq_pill);
253         if (rc)
254                 RETURN(err_serious(-ENOMEM));
255
256         if (body->lgd_ctxt_idx >= LLOG_MAX_CTXTS) {
257                 CDEBUG(D_WARNING, "%s: bad ctxt ID: idx=%d\n",
258                        req->rq_export->exp_obd->obd_name, body->lgd_ctxt_idx);
259                 RETURN(-EPROTO);
260         }
261
262         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
263         if (ctxt == NULL)
264                 RETURN(-ENODEV);
265
266         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
267                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
268         if (rc)
269                 GOTO(out_ctxt, rc);
270
271         /*
272          * llog_init_handle() reads the llog header
273          */
274         flags = body->lgd_llh_flags;
275         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
276                               NULL);
277         if (rc)
278                 GOTO(out_close, rc);
279         flags = loghandle->lgh_hdr->llh_flags;
280
281         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
282         *hdr = *loghandle->lgh_hdr;
283         EXIT;
284 out_close:
285         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
286 out_ctxt:
287         llog_ctxt_put(ctxt);
288         return rc;
289 }