Whamcloud - gitweb
LU-1302 llog: pass lu_env to the llog callback
[fs/lustre-release.git] / lustre / ptlrpc / llog_server.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/llog_server.c
37  *
38  * remote api for llog - server side
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOG
44
45 #ifndef __KERNEL__
46 #include <liblustre.h>
47 #endif
48
49 #include <obd_class.h>
50 #include <lustre_log.h>
51 #include <lustre_net.h>
52 #include <libcfs/list.h>
53 #include <lustre_fsfilt.h>
54
55 #if defined(__KERNEL__) && defined(LUSTRE_LOG_SERVER)
56
57 int llog_origin_handle_create(struct ptlrpc_request *req)
58 {
59         struct obd_export    *exp = req->rq_export;
60         struct obd_device    *obd = exp->exp_obd;
61         struct obd_device    *disk_obd;
62         struct llog_handle   *loghandle;
63         struct llogd_body    *body;
64         struct lvfs_run_ctxt  saved;
65         struct llog_logid    *logid = NULL;
66         struct llog_ctxt     *ctxt;
67         char                 *name = NULL;
68         int                   rc, rc2;
69         ENTRY;
70
71         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
72         if (body == NULL)
73                 RETURN(-EFAULT);
74
75         if (body->lgd_logid.lgl_oid > 0)
76                 logid = &body->lgd_logid;
77
78         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
79                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
80                 if (name == NULL)
81                         RETURN(-EFAULT);
82                 CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
83         }
84
85         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
86         if (ctxt == NULL) {
87                 CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
88                        obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
89                 RETURN(-ENODEV);
90         }
91         disk_obd = ctxt->loc_exp->exp_obd;
92         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
93
94         rc = llog_create(ctxt, &loghandle, logid, name);
95         if (rc)
96                 GOTO(out_pop, rc);
97
98         rc = req_capsule_server_pack(&req->rq_pill);
99         if (rc)
100                 GOTO(out_close, rc = -ENOMEM);
101
102         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
103         body->lgd_logid = loghandle->lgh_id;
104
105         GOTO(out_close, rc);
106 out_close:
107         rc2 = llog_close(loghandle);
108         if (!rc)
109                 rc = rc2;
110 out_pop:
111         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
112         llog_ctxt_put(ctxt);
113         return rc;
114 }
115 EXPORT_SYMBOL(llog_origin_handle_create);
116
117 int llog_origin_handle_destroy(struct ptlrpc_request *req)
118 {
119         struct obd_export    *exp = req->rq_export;
120         struct obd_device    *obd = exp->exp_obd;
121         struct obd_device    *disk_obd;
122         struct llog_handle   *loghandle;
123         struct llogd_body    *body;
124         struct lvfs_run_ctxt  saved;
125         struct llog_logid    *logid = NULL;
126         struct llog_ctxt     *ctxt;
127         int                   rc;
128         ENTRY;
129
130         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
131         if (body == NULL)
132                 RETURN(-EFAULT);
133
134         if (body->lgd_logid.lgl_oid > 0)
135                 logid = &body->lgd_logid;
136
137         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
138         if (ctxt == NULL)
139                 RETURN(-ENODEV);
140
141         disk_obd = ctxt->loc_exp->exp_obd;
142         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
143
144         rc = llog_create(ctxt, &loghandle, logid, NULL);
145         if (rc)
146                 GOTO(out_pop, rc);
147
148         rc = req_capsule_server_pack(&req->rq_pill);
149         if (rc)
150                 GOTO(out_close, rc = -ENOMEM);
151
152         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
153         body->lgd_logid = loghandle->lgh_id;
154         rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
155         if (rc)
156                 GOTO(out_close, rc);
157         rc = llog_destroy(loghandle);
158         if (rc)
159                 GOTO(out_close, rc);
160         llog_free_handle(loghandle);
161         GOTO(out_close, rc);
162 out_close:
163         if (rc)
164                 llog_close(loghandle);
165 out_pop:
166         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
167         llog_ctxt_put(ctxt);
168         return rc;
169 }
170 EXPORT_SYMBOL(llog_origin_handle_destroy);
171
172 int llog_origin_handle_next_block(struct ptlrpc_request *req)
173 {
174         struct obd_export   *exp = req->rq_export;
175         struct obd_device   *obd = exp->exp_obd;
176         struct obd_device   *disk_obd;
177         struct llog_handle  *loghandle;
178         struct llogd_body   *body;
179         struct llogd_body   *repbody;
180         struct lvfs_run_ctxt saved;
181         struct llog_ctxt    *ctxt;
182         __u32                flags;
183         __u8                *buf;
184         void                *ptr;
185         int                  rc, rc2;
186         ENTRY;
187
188         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
189         if (body == NULL)
190                 RETURN(-EFAULT);
191
192         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
193         if (!buf)
194                 RETURN(-ENOMEM);
195
196         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
197         if (ctxt == NULL)
198                 GOTO(out_free, rc = -ENODEV);
199         disk_obd = ctxt->loc_exp->exp_obd;
200         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
201
202         rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
203         if (rc)
204                 GOTO(out_pop, rc);
205
206         flags = body->lgd_llh_flags;
207         rc = llog_init_handle(loghandle, flags, NULL);
208         if (rc)
209                 GOTO(out_close, rc);
210
211         memset(buf, 0, LLOG_CHUNK_SIZE);
212         rc = llog_next_block(loghandle, &body->lgd_saved_index,
213                              body->lgd_index,
214                              &body->lgd_cur_offset, buf, LLOG_CHUNK_SIZE);
215         if (rc)
216                 GOTO(out_close, rc);
217
218         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
219                              LLOG_CHUNK_SIZE);
220         rc = req_capsule_server_pack(&req->rq_pill);
221         if (rc)
222                 GOTO(out_close, rc = -ENOMEM);
223
224         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
225         *repbody = *body;
226
227         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
228         memcpy(ptr, buf, LLOG_CHUNK_SIZE);
229         GOTO(out_close, rc);
230 out_close:
231         rc2 = llog_close(loghandle);
232         if (!rc)
233                 rc = rc2;
234 out_pop:
235         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
236         llog_ctxt_put(ctxt);
237 out_free:
238         OBD_FREE(buf, LLOG_CHUNK_SIZE);
239         return rc;
240 }
241 EXPORT_SYMBOL(llog_origin_handle_next_block);
242
243 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
244 {
245         struct obd_export    *exp = req->rq_export;
246         struct obd_device    *obd = exp->exp_obd;
247         struct llog_handle   *loghandle;
248         struct llogd_body    *body;
249         struct llogd_body    *repbody;
250         struct obd_device    *disk_obd;
251         struct lvfs_run_ctxt  saved;
252         struct llog_ctxt     *ctxt;
253         __u32                 flags;
254         __u8                 *buf;
255         void                 *ptr;
256         int                   rc, rc2;
257         ENTRY;
258
259         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
260         if (body == NULL)
261                 RETURN(-EFAULT);
262
263         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
264         if (!buf)
265                 RETURN(-ENOMEM);
266
267         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
268         if (ctxt == NULL)
269                 GOTO(out_free, rc = -ENODEV);
270
271         disk_obd = ctxt->loc_exp->exp_obd;
272         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
273
274         rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
275         if (rc)
276                 GOTO(out_pop, rc);
277
278         flags = body->lgd_llh_flags;
279         rc = llog_init_handle(loghandle, flags, NULL);
280         if (rc)
281                 GOTO(out_close, rc);
282
283         memset(buf, 0, LLOG_CHUNK_SIZE);
284         rc = llog_prev_block(loghandle, body->lgd_index,
285                              buf, LLOG_CHUNK_SIZE);
286         if (rc)
287                 GOTO(out_close, rc);
288
289         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
290                              LLOG_CHUNK_SIZE);
291         rc = req_capsule_server_pack(&req->rq_pill);
292         if (rc)
293                 GOTO(out_close, rc = -ENOMEM);
294
295         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
296         *repbody = *body;
297
298         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
299         memcpy(ptr, buf, LLOG_CHUNK_SIZE);
300         GOTO(out_close, rc);
301 out_close:
302         rc2 = llog_close(loghandle);
303         if (!rc)
304                 rc = rc2;
305
306 out_pop:
307         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
308         llog_ctxt_put(ctxt);
309 out_free:
310         OBD_FREE(buf, LLOG_CHUNK_SIZE);
311         return rc;
312 }
313 EXPORT_SYMBOL(llog_origin_handle_prev_block);
314
315 int llog_origin_handle_read_header(struct ptlrpc_request *req)
316 {
317         struct obd_export    *exp = req->rq_export;
318         struct obd_device    *obd = exp->exp_obd;
319         struct obd_device    *disk_obd;
320         struct llog_handle   *loghandle;
321         struct llogd_body    *body;
322         struct llog_log_hdr  *hdr;
323         struct lvfs_run_ctxt  saved;
324         struct llog_ctxt     *ctxt;
325         __u32                 flags;
326         int                   rc, rc2;
327         ENTRY;
328
329         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
330         if (body == NULL)
331                 RETURN(-EFAULT);
332
333         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
334         if (ctxt == NULL)
335                 RETURN(-ENODEV);
336
337         disk_obd = ctxt->loc_exp->exp_obd;
338         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
339
340         rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
341         if (rc)
342                 GOTO(out_pop, rc);
343
344         /*
345          * llog_init_handle() reads the llog header
346          */
347         flags = body->lgd_llh_flags;
348         rc = llog_init_handle(loghandle, flags, NULL);
349         if (rc)
350                 GOTO(out_close, rc);
351
352         rc = req_capsule_server_pack(&req->rq_pill);
353         if (rc)
354                 GOTO(out_close, rc = -ENOMEM);
355
356         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
357         *hdr = *loghandle->lgh_hdr;
358         GOTO(out_close, rc);
359 out_close:
360         rc2 = llog_close(loghandle);
361         if (!rc)
362                 rc = rc2;
363 out_pop:
364         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
365         llog_ctxt_put(ctxt);
366         return rc;
367 }
368 EXPORT_SYMBOL(llog_origin_handle_read_header);
369
370 int llog_origin_handle_close(struct ptlrpc_request *req)
371 {
372         ENTRY;
373         /* Nothing to do */
374         RETURN(0);
375 }
376 EXPORT_SYMBOL(llog_origin_handle_close);
377
378 int llog_origin_handle_cancel(struct ptlrpc_request *req)
379 {
380         struct obd_device *obd = req->rq_export->exp_obd;
381         int num_cookies, rc = 0, err, i, failed = 0;
382         struct obd_device *disk_obd;
383         struct llog_cookie *logcookies;
384         struct llog_ctxt *ctxt = NULL;
385         struct lvfs_run_ctxt saved;
386         struct llog_handle *cathandle;
387         struct inode *inode;
388         void *handle;
389         ENTRY;
390
391         logcookies = req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES);
392         num_cookies = req_capsule_get_size(&req->rq_pill, &RMF_LOGCOOKIES,
393                                            RCL_CLIENT) / sizeof(*logcookies);
394         if (logcookies == NULL || num_cookies == 0) {
395                 DEBUG_REQ(D_HA, req, "No llog cookies sent");
396                 RETURN(-EFAULT);
397         }
398
399         ctxt = llog_get_context(obd, logcookies->lgc_subsys);
400         if (ctxt == NULL)
401                 RETURN(-ENODEV);
402
403         disk_obd = ctxt->loc_exp->exp_obd;
404         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
405         for (i = 0; i < num_cookies; i++, logcookies++) {
406                 cathandle = ctxt->loc_handle;
407                 LASSERT(cathandle != NULL);
408                 inode = cathandle->lgh_file->f_dentry->d_inode;
409
410                 handle = fsfilt_start_log(disk_obd, inode,
411                                           FSFILT_OP_CANCEL_UNLINK, NULL, 1);
412                 if (IS_ERR(handle)) {
413                         CERROR("fsfilt_start_log() failed: %ld\n",
414                                PTR_ERR(handle));
415                         GOTO(pop_ctxt, rc = PTR_ERR(handle));
416                 }
417
418                 rc = llog_cat_cancel_records(cathandle, 1, logcookies);
419
420                 /*
421                  * Do not raise -ENOENT errors for resent rpcs. This rec already
422                  * might be killed.
423                  */
424                 if (rc == -ENOENT &&
425                     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) {
426                         /*
427                          * Do not change this message, reply-single.sh test_59b
428                          * expects to find this in log.
429                          */
430                         CDEBUG(D_RPCTRACE, "RESENT cancel req %p - ignored\n",
431                                req);
432                         rc = 0;
433                 } else if (rc == 0) {
434                         CDEBUG(D_RPCTRACE, "Canceled %d llog-records\n",
435                                num_cookies);
436                 }
437
438                 err = fsfilt_commit(disk_obd, inode, handle, 0);
439                 if (err) {
440                         CERROR("Error committing transaction: %d\n", err);
441                         if (!rc)
442                                 rc = err;
443                         failed++;
444                         GOTO(pop_ctxt, rc);
445                 } else if (rc)
446                         failed++;
447         }
448         GOTO(pop_ctxt, rc);
449 pop_ctxt:
450         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
451         if (rc)
452                 CERROR("Cancel %d of %d llog-records failed: %d\n",
453                        failed, num_cookies, rc);
454
455         llog_ctxt_put(ctxt);
456         return rc;
457 }
458 EXPORT_SYMBOL(llog_origin_handle_cancel);
459
460 #else /* !__KERNEL__ */
461 int llog_origin_handle_create(struct ptlrpc_request *req)
462 {
463         LBUG();
464         return 0;
465 }
466
467 int llog_origin_handle_destroy(struct ptlrpc_request *req)
468 {
469         LBUG();
470         return 0;
471 }
472
473 int llog_origin_handle_next_block(struct ptlrpc_request *req)
474 {
475         LBUG();
476         return 0;
477 }
478 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
479 {
480         LBUG();
481         return 0;
482 }
483 int llog_origin_handle_read_header(struct ptlrpc_request *req)
484 {
485         LBUG();
486         return 0;
487 }
488 int llog_origin_handle_close(struct ptlrpc_request *req)
489 {
490         LBUG();
491         return 0;
492 }
493 int llog_origin_handle_cancel(struct ptlrpc_request *req)
494 {
495         LBUG();
496         return 0;
497 }
498 #endif