Whamcloud - gitweb
242b830dc722e0fa709030895fa4fa864529f878
[fs/lustre-release.git] / lustre / ptlrpc / llog_server.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/llog_server.c
37  *
38  * remote api for llog - server side
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOG
44
45 #ifndef __KERNEL__
46 #include <liblustre.h>
47 #endif
48
49 #include <obd_class.h>
50 #include <lustre_log.h>
51 #include <lustre_net.h>
52 #include <libcfs/list.h>
53 #include <lustre_fsfilt.h>
54
55 #if defined(__KERNEL__) && defined(LUSTRE_LOG_SERVER)
56
57 int llog_origin_handle_create(struct ptlrpc_request *req)
58 {
59         struct obd_export    *exp = req->rq_export;
60         struct obd_device    *obd = exp->exp_obd;
61         struct obd_device    *disk_obd;
62         struct llog_handle   *loghandle;
63         struct llogd_body    *body;
64         struct lvfs_run_ctxt  saved;
65         struct llog_logid    *logid = NULL;
66         struct llog_ctxt     *ctxt;
67         char                 *name = NULL;
68         int                   rc, rc2;
69         ENTRY;
70
71         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
72         if (body == NULL)
73                 RETURN(-EFAULT);
74
75         if (body->lgd_logid.lgl_oid > 0)
76                 logid = &body->lgd_logid;
77
78         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
79                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
80                 if (name == NULL)
81                         RETURN(-EFAULT);
82                 CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
83         }
84
85         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
86         if (ctxt == NULL) {
87                 CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
88                        obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
89                 RETURN(-ENODEV);
90         }
91         disk_obd = ctxt->loc_exp->exp_obd;
92         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
93
94         rc = llog_create(ctxt, &loghandle, logid, name);
95         if (rc)
96                 GOTO(out_pop, rc);
97
98         rc = req_capsule_server_pack(&req->rq_pill);
99         if (rc)
100                 GOTO(out_close, rc = -ENOMEM);
101
102         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
103         body->lgd_logid = loghandle->lgh_id;
104
105         GOTO(out_close, rc);
106 out_close:
107         rc2 = llog_close(loghandle);
108         if (!rc)
109                 rc = rc2;
110 out_pop:
111         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
112         llog_ctxt_put(ctxt);
113         return rc;
114 }
115
116 int llog_origin_handle_destroy(struct ptlrpc_request *req)
117 {
118         struct obd_export    *exp = req->rq_export;
119         struct obd_device    *obd = exp->exp_obd;
120         struct obd_device    *disk_obd;
121         struct llog_handle   *loghandle;
122         struct llogd_body    *body;
123         struct lvfs_run_ctxt  saved;
124         struct llog_logid    *logid = NULL;
125         struct llog_ctxt     *ctxt;
126         int                   rc;
127         ENTRY;
128
129         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
130         if (body == NULL)
131                 RETURN(-EFAULT);
132
133         if (body->lgd_logid.lgl_oid > 0)
134                 logid = &body->lgd_logid;
135
136         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
137         if (ctxt == NULL)
138                 RETURN(-ENODEV);
139
140         disk_obd = ctxt->loc_exp->exp_obd;
141         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
142
143         rc = llog_create(ctxt, &loghandle, logid, NULL);
144         if (rc)
145                 GOTO(out_pop, rc);
146
147         rc = req_capsule_server_pack(&req->rq_pill);
148         if (rc)
149                 GOTO(out_close, rc = -ENOMEM);
150
151         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
152         body->lgd_logid = loghandle->lgh_id;
153         rc = llog_init_handle(loghandle, LLOG_F_IS_PLAIN, NULL);
154         if (rc)
155                 GOTO(out_close, rc);
156         rc = llog_destroy(loghandle);
157         if (rc)
158                 GOTO(out_close, rc);
159         llog_free_handle(loghandle);
160         GOTO(out_close, rc);
161 out_close:
162         if (rc)
163                 llog_close(loghandle);
164 out_pop:
165         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
166         llog_ctxt_put(ctxt);
167         return rc;
168 }
169
170 int llog_origin_handle_next_block(struct ptlrpc_request *req)
171 {
172         struct obd_export   *exp = req->rq_export;
173         struct obd_device   *obd = exp->exp_obd;
174         struct obd_device   *disk_obd;
175         struct llog_handle  *loghandle;
176         struct llogd_body   *body;
177         struct llogd_body   *repbody;
178         struct lvfs_run_ctxt saved;
179         struct llog_ctxt    *ctxt;
180         __u32                flags;
181         __u8                *buf;
182         void                *ptr;
183         int                  rc, rc2;
184         ENTRY;
185
186         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
187         if (body == NULL)
188                 RETURN(-EFAULT);
189
190         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
191         if (!buf)
192                 RETURN(-ENOMEM);
193
194         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
195         if (ctxt == NULL)
196                 GOTO(out_free, rc = -ENODEV);
197         disk_obd = ctxt->loc_exp->exp_obd;
198         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
199
200         rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
201         if (rc)
202                 GOTO(out_pop, rc);
203
204         flags = body->lgd_llh_flags;
205         rc = llog_init_handle(loghandle, flags, NULL);
206         if (rc)
207                 GOTO(out_close, rc);
208
209         memset(buf, 0, LLOG_CHUNK_SIZE);
210         rc = llog_next_block(loghandle, &body->lgd_saved_index,
211                              body->lgd_index,
212                              &body->lgd_cur_offset, buf, LLOG_CHUNK_SIZE);
213         if (rc)
214                 GOTO(out_close, rc);
215
216         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
217                              LLOG_CHUNK_SIZE);
218         rc = req_capsule_server_pack(&req->rq_pill);
219         if (rc)
220                 GOTO(out_close, rc = -ENOMEM);
221
222         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
223         *repbody = *body;
224
225         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
226         memcpy(ptr, buf, LLOG_CHUNK_SIZE);
227         GOTO(out_close, rc);
228 out_close:
229         rc2 = llog_close(loghandle);
230         if (!rc)
231                 rc = rc2;
232 out_pop:
233         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
234         llog_ctxt_put(ctxt);
235 out_free:
236         OBD_FREE(buf, LLOG_CHUNK_SIZE);
237         return rc;
238 }
239
240 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
241 {
242         struct obd_export    *exp = req->rq_export;
243         struct obd_device    *obd = exp->exp_obd;
244         struct llog_handle   *loghandle;
245         struct llogd_body    *body;
246         struct llogd_body    *repbody;
247         struct obd_device    *disk_obd;
248         struct lvfs_run_ctxt  saved;
249         struct llog_ctxt     *ctxt;
250         __u32                 flags;
251         __u8                 *buf;
252         void                 *ptr;
253         int                   rc, rc2;
254         ENTRY;
255
256         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
257         if (body == NULL)
258                 RETURN(-EFAULT);
259
260         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
261         if (!buf)
262                 RETURN(-ENOMEM);
263
264         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
265         if (ctxt == NULL)
266                 GOTO(out_free, rc = -ENODEV);
267
268         disk_obd = ctxt->loc_exp->exp_obd;
269         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
270
271         rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
272         if (rc)
273                 GOTO(out_pop, rc);
274
275         flags = body->lgd_llh_flags;
276         rc = llog_init_handle(loghandle, flags, NULL);
277         if (rc)
278                 GOTO(out_close, rc);
279
280         memset(buf, 0, LLOG_CHUNK_SIZE);
281         rc = llog_prev_block(loghandle, body->lgd_index,
282                              buf, LLOG_CHUNK_SIZE);
283         if (rc)
284                 GOTO(out_close, rc);
285
286         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
287                              LLOG_CHUNK_SIZE);
288         rc = req_capsule_server_pack(&req->rq_pill);
289         if (rc)
290                 GOTO(out_close, rc = -ENOMEM);
291
292         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
293         *repbody = *body;
294
295         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
296         memcpy(ptr, buf, LLOG_CHUNK_SIZE);
297         GOTO(out_close, rc);
298 out_close:
299         rc2 = llog_close(loghandle);
300         if (!rc)
301                 rc = rc2;
302
303 out_pop:
304         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
305         llog_ctxt_put(ctxt);
306 out_free:
307         OBD_FREE(buf, LLOG_CHUNK_SIZE);
308         return rc;
309 }
310
311 int llog_origin_handle_read_header(struct ptlrpc_request *req)
312 {
313         struct obd_export    *exp = req->rq_export;
314         struct obd_device    *obd = exp->exp_obd;
315         struct obd_device    *disk_obd;
316         struct llog_handle   *loghandle;
317         struct llogd_body    *body;
318         struct llog_log_hdr  *hdr;
319         struct lvfs_run_ctxt  saved;
320         struct llog_ctxt     *ctxt;
321         __u32                 flags;
322         int                   rc, rc2;
323         ENTRY;
324
325         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
326         if (body == NULL)
327                 RETURN(-EFAULT);
328
329         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
330         if (ctxt == NULL)
331                 RETURN(-ENODEV);
332
333         disk_obd = ctxt->loc_exp->exp_obd;
334         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
335
336         rc = llog_create(ctxt, &loghandle, &body->lgd_logid, NULL);
337         if (rc)
338                 GOTO(out_pop, rc);
339
340         /*
341          * llog_init_handle() reads the llog header
342          */
343         flags = body->lgd_llh_flags;
344         rc = llog_init_handle(loghandle, flags, NULL);
345         if (rc)
346                 GOTO(out_close, rc);
347
348         rc = req_capsule_server_pack(&req->rq_pill);
349         if (rc)
350                 GOTO(out_close, rc = -ENOMEM);
351
352         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
353         *hdr = *loghandle->lgh_hdr;
354         GOTO(out_close, rc);
355 out_close:
356         rc2 = llog_close(loghandle);
357         if (!rc)
358                 rc = rc2;
359 out_pop:
360         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
361         llog_ctxt_put(ctxt);
362         return rc;
363 }
364
365 int llog_origin_handle_close(struct ptlrpc_request *req)
366 {
367         ENTRY;
368         /* Nothing to do */
369         RETURN(0);
370 }
371
372 int llog_origin_handle_cancel(struct ptlrpc_request *req)
373 {
374         struct obd_device *obd = req->rq_export->exp_obd;
375         int num_cookies, rc = 0, err, i, failed = 0;
376         struct obd_device *disk_obd;
377         struct llog_cookie *logcookies;
378         struct llog_ctxt *ctxt = NULL;
379         struct lvfs_run_ctxt saved;
380         struct llog_handle *cathandle;
381         struct inode *inode;
382         void *handle;
383         ENTRY;
384
385         logcookies = req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES);
386         num_cookies = req_capsule_get_size(&req->rq_pill, &RMF_LOGCOOKIES,
387                                            RCL_CLIENT) / sizeof(*logcookies);
388         if (logcookies == NULL || num_cookies == 0) {
389                 DEBUG_REQ(D_HA, req, "No llog cookies sent");
390                 RETURN(-EFAULT);
391         }
392
393         ctxt = llog_get_context(obd, logcookies->lgc_subsys);
394         if (ctxt == NULL)
395                 RETURN(-ENODEV);
396
397         disk_obd = ctxt->loc_exp->exp_obd;
398         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
399         for (i = 0; i < num_cookies; i++, logcookies++) {
400                 cathandle = ctxt->loc_handle;
401                 LASSERT(cathandle != NULL);
402                 inode = cathandle->lgh_file->f_dentry->d_inode;
403
404                 handle = fsfilt_start_log(disk_obd, inode,
405                                           FSFILT_OP_CANCEL_UNLINK, NULL, 1);
406                 if (IS_ERR(handle)) {
407                         CERROR("fsfilt_start_log() failed: %ld\n",
408                                PTR_ERR(handle));
409                         GOTO(pop_ctxt, rc = PTR_ERR(handle));
410                 }
411
412                 rc = llog_cat_cancel_records(cathandle, 1, logcookies);
413
414                 /*
415                  * Do not raise -ENOENT errors for resent rpcs. This rec already
416                  * might be killed.
417                  */
418                 if (rc == -ENOENT &&
419                     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) {
420                         /*
421                          * Do not change this message, reply-single.sh test_59b
422                          * expects to find this in log.
423                          */
424                         CDEBUG(D_RPCTRACE, "RESENT cancel req %p - ignored\n",
425                                req);
426                         rc = 0;
427                 } else if (rc == 0) {
428                         CDEBUG(D_RPCTRACE, "Canceled %d llog-records\n",
429                                num_cookies);
430                 }
431
432                 err = fsfilt_commit(disk_obd, inode, handle, 0);
433                 if (err) {
434                         CERROR("Error committing transaction: %d\n", err);
435                         if (!rc)
436                                 rc = err;
437                         failed++;
438                         GOTO(pop_ctxt, rc);
439                 } else if (rc)
440                         failed++;
441         }
442         GOTO(pop_ctxt, rc);
443 pop_ctxt:
444         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
445         if (rc)
446                 CERROR("Cancel %d of %d llog-records failed: %d\n",
447                        failed, num_cookies, rc);
448
449         llog_ctxt_put(ctxt);
450         return rc;
451 }
452 EXPORT_SYMBOL(llog_origin_handle_cancel);
453
454 #else /* !__KERNEL__ */
455 int llog_origin_handle_create(struct ptlrpc_request *req)
456 {
457         LBUG();
458         return 0;
459 }
460
461 int llog_origin_handle_destroy(struct ptlrpc_request *req)
462 {
463         LBUG();
464         return 0;
465 }
466
467 int llog_origin_handle_next_block(struct ptlrpc_request *req)
468 {
469         LBUG();
470         return 0;
471 }
472 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
473 {
474         LBUG();
475         return 0;
476 }
477 int llog_origin_handle_read_header(struct ptlrpc_request *req)
478 {
479         LBUG();
480         return 0;
481 }
482 int llog_origin_handle_close(struct ptlrpc_request *req)
483 {
484         LBUG();
485         return 0;
486 }
487 int llog_origin_handle_cancel(struct ptlrpc_request *req)
488 {
489         LBUG();
490         return 0;
491 }
492 #endif