Whamcloud - gitweb
LU-1866 lfsck: enhance otable-based iteration
[fs/lustre-release.git] / lustre / ptlrpc / llog_server.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/llog_server.c
37  *
38  * remote api for llog - server side
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOG
44
45 #ifndef __KERNEL__
46 #include <liblustre.h>
47 #endif
48
49 #include <obd_class.h>
50 #include <lustre_log.h>
51 #include <lustre_net.h>
52 #include <lustre_fsfilt.h>
53
54 #if defined(__KERNEL__) && defined(LUSTRE_LOG_SERVER)
55 static int llog_origin_close(const struct lu_env *env, struct llog_handle *lgh)
56 {
57         if (lgh->lgh_hdr != NULL && lgh->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
58                 return llog_cat_close(env, lgh);
59         else
60                 return llog_close(env, lgh);
61 }
62
63 /* Only open is supported, no new llog can be created remotely */
64 int llog_origin_handle_open(struct ptlrpc_request *req)
65 {
66         struct obd_export       *exp = req->rq_export;
67         struct obd_device       *obd = exp->exp_obd;
68         struct obd_device       *disk_obd;
69         struct lvfs_run_ctxt     saved;
70         struct llog_handle      *loghandle;
71         struct llogd_body       *body;
72         struct llog_logid       *logid = NULL;
73         struct llog_ctxt        *ctxt;
74         char                    *name = NULL;
75         int                      rc;
76
77         ENTRY;
78
79         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
80         if (body == NULL)
81                 RETURN(-EFAULT);
82
83         if (body->lgd_logid.lgl_oid > 0)
84                 logid = &body->lgd_logid;
85
86         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
87                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
88                 if (name == NULL)
89                         RETURN(-EFAULT);
90                 CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
91         }
92
93         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
94         if (ctxt == NULL) {
95                 CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
96                        obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
97                 RETURN(-ENODEV);
98         }
99         disk_obd = ctxt->loc_exp->exp_obd;
100         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
101
102         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
103                        name, LLOG_OPEN_EXISTS);
104         if (rc)
105                 GOTO(out_pop, rc);
106
107         rc = req_capsule_server_pack(&req->rq_pill);
108         if (rc)
109                 GOTO(out_close, rc = -ENOMEM);
110
111         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
112         body->lgd_logid = loghandle->lgh_id;
113
114         EXIT;
115 out_close:
116         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
117 out_pop:
118         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
119         llog_ctxt_put(ctxt);
120         return rc;
121 }
122 EXPORT_SYMBOL(llog_origin_handle_open);
123
124 int llog_origin_handle_destroy(struct ptlrpc_request *req)
125 {
126         struct obd_device       *disk_obd;
127         struct lvfs_run_ctxt     saved;
128         struct llogd_body       *body;
129         struct llog_logid       *logid = NULL;
130         struct llog_ctxt        *ctxt;
131         int                      rc;
132
133         ENTRY;
134
135         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
136         if (body == NULL)
137                 RETURN(-EFAULT);
138
139         if (body->lgd_logid.lgl_oid > 0)
140                 logid = &body->lgd_logid;
141
142         if (!(body->lgd_llh_flags & LLOG_F_IS_PLAIN))
143                 CERROR("%s: wrong llog flags %x\n",
144                        req->rq_export->exp_obd->obd_name, body->lgd_llh_flags);
145
146         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
147         if (ctxt == NULL)
148                 RETURN(-ENODEV);
149
150         disk_obd = ctxt->loc_exp->exp_obd;
151         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
152
153         rc = req_capsule_server_pack(&req->rq_pill);
154         /* erase only if no error and logid is valid */
155         if (rc == 0)
156                 rc = llog_erase(req->rq_svc_thread->t_env, ctxt, logid, NULL);
157         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
158         llog_ctxt_put(ctxt);
159         RETURN(rc);
160 }
161 EXPORT_SYMBOL(llog_origin_handle_destroy);
162
163 int llog_origin_handle_next_block(struct ptlrpc_request *req)
164 {
165         struct obd_device   *disk_obd;
166         struct llog_handle  *loghandle;
167         struct llogd_body   *body;
168         struct llogd_body   *repbody;
169         struct lvfs_run_ctxt saved;
170         struct llog_ctxt    *ctxt;
171         __u32                flags;
172         void                *ptr;
173         int                  rc;
174
175         ENTRY;
176
177         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
178         if (body == NULL)
179                 RETURN(-EFAULT);
180
181         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
182         if (ctxt == NULL)
183                 RETURN(-ENODEV);
184
185         disk_obd = ctxt->loc_exp->exp_obd;
186         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
187
188         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
189                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
190         if (rc)
191                 GOTO(out_pop, rc);
192
193         flags = body->lgd_llh_flags;
194         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
195                               NULL);
196         if (rc)
197                 GOTO(out_close, rc);
198
199         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
200                              LLOG_CHUNK_SIZE);
201         rc = req_capsule_server_pack(&req->rq_pill);
202         if (rc)
203                 GOTO(out_close, rc = -ENOMEM);
204
205         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
206         *repbody = *body;
207
208         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
209         rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
210                              &repbody->lgd_saved_index, repbody->lgd_index,
211                              &repbody->lgd_cur_offset, ptr, LLOG_CHUNK_SIZE);
212         if (rc)
213                 GOTO(out_close, rc);
214         EXIT;
215 out_close:
216         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
217 out_pop:
218         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
219         llog_ctxt_put(ctxt);
220         return rc;
221 }
222 EXPORT_SYMBOL(llog_origin_handle_next_block);
223
224 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
225 {
226         struct llog_handle   *loghandle;
227         struct llogd_body    *body;
228         struct llogd_body    *repbody;
229         struct obd_device    *disk_obd;
230         struct lvfs_run_ctxt  saved;
231         struct llog_ctxt     *ctxt;
232         __u32                 flags;
233         void                 *ptr;
234         int                   rc;
235
236         ENTRY;
237
238         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
239         if (body == NULL)
240                 RETURN(-EFAULT);
241
242         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
243         if (ctxt == NULL)
244                 RETURN(-ENODEV);
245
246         disk_obd = ctxt->loc_exp->exp_obd;
247         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
248
249         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
250                          &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
251         if (rc)
252                 GOTO(out_pop, rc);
253
254         flags = body->lgd_llh_flags;
255         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
256                               NULL);
257         if (rc)
258                 GOTO(out_close, rc);
259
260         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
261                              LLOG_CHUNK_SIZE);
262         rc = req_capsule_server_pack(&req->rq_pill);
263         if (rc)
264                 GOTO(out_close, rc = -ENOMEM);
265
266         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
267         *repbody = *body;
268
269         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
270         rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
271                              body->lgd_index, ptr, LLOG_CHUNK_SIZE);
272         if (rc)
273                 GOTO(out_close, rc);
274
275         EXIT;
276 out_close:
277         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
278 out_pop:
279         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
280         llog_ctxt_put(ctxt);
281         return rc;
282 }
283 EXPORT_SYMBOL(llog_origin_handle_prev_block);
284
285 int llog_origin_handle_read_header(struct ptlrpc_request *req)
286 {
287         struct obd_device    *disk_obd;
288         struct llog_handle   *loghandle;
289         struct llogd_body    *body;
290         struct llog_log_hdr  *hdr;
291         struct lvfs_run_ctxt  saved;
292         struct llog_ctxt     *ctxt;
293         __u32                 flags;
294         int                   rc;
295
296         ENTRY;
297
298         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
299         if (body == NULL)
300                 RETURN(-EFAULT);
301
302         ctxt = llog_get_context(req->rq_export->exp_obd, body->lgd_ctxt_idx);
303         if (ctxt == NULL)
304                 RETURN(-ENODEV);
305
306         disk_obd = ctxt->loc_exp->exp_obd;
307         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
308
309         rc = llog_open(req->rq_svc_thread->t_env, ctxt, &loghandle,
310                        &body->lgd_logid, NULL, LLOG_OPEN_EXISTS);
311         if (rc)
312                 GOTO(out_pop, rc);
313
314         /*
315          * llog_init_handle() reads the llog header
316          */
317         flags = body->lgd_llh_flags;
318         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
319                               NULL);
320         if (rc)
321                 GOTO(out_close, rc);
322         flags = loghandle->lgh_hdr->llh_flags;
323
324         rc = req_capsule_server_pack(&req->rq_pill);
325         if (rc)
326                 GOTO(out_close, rc = -ENOMEM);
327
328         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
329         *hdr = *loghandle->lgh_hdr;
330         EXIT;
331 out_close:
332         llog_origin_close(req->rq_svc_thread->t_env, loghandle);
333 out_pop:
334         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
335         llog_ctxt_put(ctxt);
336         return rc;
337 }
338 EXPORT_SYMBOL(llog_origin_handle_read_header);
339
340 int llog_origin_handle_close(struct ptlrpc_request *req)
341 {
342         ENTRY;
343         /* Nothing to do */
344         RETURN(0);
345 }
346 EXPORT_SYMBOL(llog_origin_handle_close);
347
348 int llog_origin_handle_cancel(struct ptlrpc_request *req)
349 {
350         int num_cookies, rc = 0, err, i, failed = 0;
351         struct obd_device *disk_obd;
352         struct llog_cookie *logcookies;
353         struct llog_ctxt *ctxt = NULL;
354         struct lvfs_run_ctxt saved;
355         struct llog_handle *cathandle;
356         struct inode *inode;
357         void *handle;
358         ENTRY;
359
360         logcookies = req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES);
361         num_cookies = req_capsule_get_size(&req->rq_pill, &RMF_LOGCOOKIES,
362                                            RCL_CLIENT) / sizeof(*logcookies);
363         if (logcookies == NULL || num_cookies == 0) {
364                 DEBUG_REQ(D_HA, req, "No llog cookies sent");
365                 RETURN(-EFAULT);
366         }
367
368         ctxt = llog_get_context(req->rq_export->exp_obd,
369                                 logcookies->lgc_subsys);
370         if (ctxt == NULL)
371                 RETURN(-ENODEV);
372
373         disk_obd = ctxt->loc_exp->exp_obd;
374         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
375         for (i = 0; i < num_cookies; i++, logcookies++) {
376                 cathandle = ctxt->loc_handle;
377                 LASSERT(cathandle != NULL);
378                 inode = cathandle->lgh_file->f_dentry->d_inode;
379
380                 handle = fsfilt_start_log(disk_obd, inode,
381                                           FSFILT_OP_CANCEL_UNLINK, NULL, 1);
382                 if (IS_ERR(handle)) {
383                         CERROR("fsfilt_start_log() failed: %ld\n",
384                                PTR_ERR(handle));
385                         GOTO(pop_ctxt, rc = PTR_ERR(handle));
386                 }
387
388                 rc = llog_cat_cancel_records(req->rq_svc_thread->t_env,
389                                              cathandle, 1, logcookies);
390
391                 /*
392                  * Do not raise -ENOENT errors for resent rpcs. This rec already
393                  * might be killed.
394                  */
395                 if (rc == -ENOENT &&
396                     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) {
397                         /*
398                          * Do not change this message, reply-single.sh test_59b
399                          * expects to find this in log.
400                          */
401                         CDEBUG(D_RPCTRACE, "RESENT cancel req %p - ignored\n",
402                                req);
403                         rc = 0;
404                 } else if (rc == 0) {
405                         CDEBUG(D_RPCTRACE, "Canceled %d llog-records\n",
406                                num_cookies);
407                 }
408
409                 err = fsfilt_commit(disk_obd, inode, handle, 0);
410                 if (err) {
411                         CERROR("Error committing transaction: %d\n", err);
412                         if (!rc)
413                                 rc = err;
414                         failed++;
415                         GOTO(pop_ctxt, rc);
416                 } else if (rc)
417                         failed++;
418         }
419         GOTO(pop_ctxt, rc);
420 pop_ctxt:
421         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
422         if (rc)
423                 CERROR("Cancel %d of %d llog-records failed: %d\n",
424                        failed, num_cookies, rc);
425
426         llog_ctxt_put(ctxt);
427         return rc;
428 }
429 EXPORT_SYMBOL(llog_origin_handle_cancel);
430
431 #else /* !__KERNEL__ */
432 int llog_origin_handle_open(struct ptlrpc_request *req)
433 {
434         LBUG();
435         return 0;
436 }
437
438 int llog_origin_handle_destroy(struct ptlrpc_request *req)
439 {
440         LBUG();
441         return 0;
442 }
443
444 int llog_origin_handle_next_block(struct ptlrpc_request *req)
445 {
446         LBUG();
447         return 0;
448 }
449 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
450 {
451         LBUG();
452         return 0;
453 }
454 int llog_origin_handle_read_header(struct ptlrpc_request *req)
455 {
456         LBUG();
457         return 0;
458 }
459 int llog_origin_handle_close(struct ptlrpc_request *req)
460 {
461         LBUG();
462         return 0;
463 }
464 int llog_origin_handle_cancel(struct ptlrpc_request *req)
465 {
466         LBUG();
467         return 0;
468 }
469 #endif