Whamcloud - gitweb
LU-1818 quota: en/disable quota enforcement via conf_param
[fs/lustre-release.git] / lustre / ptlrpc / llog_server.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/llog_server.c
37  *
38  * remote api for llog - server side
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOG
44
45 #ifndef __KERNEL__
46 #include <liblustre.h>
47 #endif
48
49 #include <obd_class.h>
50 #include <lustre_log.h>
51 #include <lustre_net.h>
52 #include <lustre_fsfilt.h>
53
54 #if defined(__KERNEL__) && defined(LUSTRE_LOG_SERVER)
55
56 int llog_origin_handle_create(struct ptlrpc_request *req)
57 {
58         struct obd_export    *exp = req->rq_export;
59         struct obd_device    *obd = exp->exp_obd;
60         struct obd_device    *disk_obd;
61         struct llog_handle   *loghandle;
62         struct llogd_body    *body;
63         struct lvfs_run_ctxt  saved;
64         struct llog_logid    *logid = NULL;
65         struct llog_ctxt     *ctxt;
66         char                 *name = NULL;
67         int                   rc, rc2;
68         ENTRY;
69
70         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
71         if (body == NULL)
72                 RETURN(-EFAULT);
73
74         if (body->lgd_logid.lgl_oid > 0)
75                 logid = &body->lgd_logid;
76
77         if (req_capsule_field_present(&req->rq_pill, &RMF_NAME, RCL_CLIENT)) {
78                 name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
79                 if (name == NULL)
80                         RETURN(-EFAULT);
81                 CDEBUG(D_INFO, "%s: opening log %s\n", obd->obd_name, name);
82         }
83
84         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
85         if (ctxt == NULL) {
86                 CDEBUG(D_WARNING, "%s: no ctxt. group=%p idx=%d name=%s\n",
87                        obd->obd_name, &obd->obd_olg, body->lgd_ctxt_idx, name);
88                 RETURN(-ENODEV);
89         }
90         disk_obd = ctxt->loc_exp->exp_obd;
91         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
92
93         rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
94                          name);
95         if (rc)
96                 GOTO(out_pop, rc);
97
98         rc = req_capsule_server_pack(&req->rq_pill);
99         if (rc)
100                 GOTO(out_close, rc = -ENOMEM);
101
102         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
103         body->lgd_logid = loghandle->lgh_id;
104
105         EXIT;
106 out_close:
107         rc2 = llog_close(req->rq_svc_thread->t_env, loghandle);
108         if (!rc)
109                 rc = rc2;
110 out_pop:
111         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
112         llog_ctxt_put(ctxt);
113         return rc;
114 }
115 EXPORT_SYMBOL(llog_origin_handle_create);
116
117 int llog_origin_handle_destroy(struct ptlrpc_request *req)
118 {
119         struct obd_export    *exp = req->rq_export;
120         struct obd_device    *obd = exp->exp_obd;
121         struct obd_device    *disk_obd;
122         struct llog_handle   *loghandle;
123         struct llogd_body    *body;
124         struct lvfs_run_ctxt  saved;
125         struct llog_logid    *logid = NULL;
126         struct llog_ctxt     *ctxt;
127         int                   rc;
128         ENTRY;
129
130         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
131         if (body == NULL)
132                 RETURN(-EFAULT);
133
134         if (body->lgd_logid.lgl_oid > 0)
135                 logid = &body->lgd_logid;
136
137         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
138         if (ctxt == NULL)
139                 RETURN(-ENODEV);
140
141         disk_obd = ctxt->loc_exp->exp_obd;
142         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
143
144         rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle, logid,
145                          NULL);
146         if (rc)
147                 GOTO(out_pop, rc);
148
149         rc = req_capsule_server_pack(&req->rq_pill);
150         if (rc)
151                 GOTO(out_close, rc = -ENOMEM);
152
153         body = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
154         body->lgd_logid = loghandle->lgh_id;
155         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle,
156                               LLOG_F_IS_PLAIN, NULL);
157         if (rc)
158                 GOTO(out_close, rc);
159         rc = llog_destroy(req->rq_svc_thread->t_env, loghandle);
160         if (rc)
161                 GOTO(out_close, rc);
162         llog_free_handle(loghandle);
163         EXIT;
164 out_close:
165         if (rc)
166                 llog_close(req->rq_svc_thread->t_env, loghandle);
167 out_pop:
168         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
169         llog_ctxt_put(ctxt);
170         return rc;
171 }
172 EXPORT_SYMBOL(llog_origin_handle_destroy);
173
174 int llog_origin_handle_next_block(struct ptlrpc_request *req)
175 {
176         struct obd_export   *exp = req->rq_export;
177         struct obd_device   *obd = exp->exp_obd;
178         struct obd_device   *disk_obd;
179         struct llog_handle  *loghandle;
180         struct llogd_body   *body;
181         struct llogd_body   *repbody;
182         struct lvfs_run_ctxt saved;
183         struct llog_ctxt    *ctxt;
184         __u32                flags;
185         __u8                *buf;
186         void                *ptr;
187         int                  rc, rc2;
188         ENTRY;
189
190         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
191         if (body == NULL)
192                 RETURN(-EFAULT);
193
194         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
195         if (!buf)
196                 RETURN(-ENOMEM);
197
198         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
199         if (ctxt == NULL)
200                 GOTO(out_free, rc = -ENODEV);
201         disk_obd = ctxt->loc_exp->exp_obd;
202         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
203
204         rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle,
205                          &body->lgd_logid, NULL);
206         if (rc)
207                 GOTO(out_pop, rc);
208
209         flags = body->lgd_llh_flags;
210         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
211                               NULL);
212         if (rc)
213                 GOTO(out_close, rc);
214
215         memset(buf, 0, LLOG_CHUNK_SIZE);
216         rc = llog_next_block(req->rq_svc_thread->t_env, loghandle,
217                              &body->lgd_saved_index, body->lgd_index,
218                              &body->lgd_cur_offset, buf, LLOG_CHUNK_SIZE);
219         if (rc)
220                 GOTO(out_close, rc);
221
222         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
223                              LLOG_CHUNK_SIZE);
224         rc = req_capsule_server_pack(&req->rq_pill);
225         if (rc)
226                 GOTO(out_close, rc = -ENOMEM);
227
228         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
229         *repbody = *body;
230
231         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
232         memcpy(ptr, buf, LLOG_CHUNK_SIZE);
233         EXIT;
234 out_close:
235         rc2 = llog_close(req->rq_svc_thread->t_env, loghandle);
236         if (!rc)
237                 rc = rc2;
238 out_pop:
239         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
240         llog_ctxt_put(ctxt);
241 out_free:
242         OBD_FREE(buf, LLOG_CHUNK_SIZE);
243         return rc;
244 }
245 EXPORT_SYMBOL(llog_origin_handle_next_block);
246
247 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
248 {
249         struct obd_export    *exp = req->rq_export;
250         struct obd_device    *obd = exp->exp_obd;
251         struct llog_handle   *loghandle;
252         struct llogd_body    *body;
253         struct llogd_body    *repbody;
254         struct obd_device    *disk_obd;
255         struct lvfs_run_ctxt  saved;
256         struct llog_ctxt     *ctxt;
257         __u32                 flags;
258         __u8                 *buf;
259         void                 *ptr;
260         int                   rc, rc2;
261         ENTRY;
262
263         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
264         if (body == NULL)
265                 RETURN(-EFAULT);
266
267         OBD_ALLOC(buf, LLOG_CHUNK_SIZE);
268         if (!buf)
269                 RETURN(-ENOMEM);
270
271         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
272         if (ctxt == NULL)
273                 GOTO(out_free, rc = -ENODEV);
274
275         disk_obd = ctxt->loc_exp->exp_obd;
276         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
277
278         rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle,
279                          &body->lgd_logid, NULL);
280         if (rc)
281                 GOTO(out_pop, rc);
282
283         flags = body->lgd_llh_flags;
284         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
285                               NULL);
286         if (rc)
287                 GOTO(out_close, rc);
288
289         memset(buf, 0, LLOG_CHUNK_SIZE);
290         rc = llog_prev_block(req->rq_svc_thread->t_env, loghandle,
291                              body->lgd_index, buf, LLOG_CHUNK_SIZE);
292         if (rc)
293                 GOTO(out_close, rc);
294
295         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
296                              LLOG_CHUNK_SIZE);
297         rc = req_capsule_server_pack(&req->rq_pill);
298         if (rc)
299                 GOTO(out_close, rc = -ENOMEM);
300
301         repbody = req_capsule_server_get(&req->rq_pill, &RMF_LLOGD_BODY);
302         *repbody = *body;
303
304         ptr = req_capsule_server_get(&req->rq_pill, &RMF_EADATA);
305         memcpy(ptr, buf, LLOG_CHUNK_SIZE);
306         EXIT;
307 out_close:
308         rc2 = llog_close(req->rq_svc_thread->t_env, loghandle);
309         if (!rc)
310                 rc = rc2;
311
312 out_pop:
313         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
314         llog_ctxt_put(ctxt);
315 out_free:
316         OBD_FREE(buf, LLOG_CHUNK_SIZE);
317         return rc;
318 }
319 EXPORT_SYMBOL(llog_origin_handle_prev_block);
320
321 int llog_origin_handle_read_header(struct ptlrpc_request *req)
322 {
323         struct obd_export    *exp = req->rq_export;
324         struct obd_device    *obd = exp->exp_obd;
325         struct obd_device    *disk_obd;
326         struct llog_handle   *loghandle;
327         struct llogd_body    *body;
328         struct llog_log_hdr  *hdr;
329         struct lvfs_run_ctxt  saved;
330         struct llog_ctxt     *ctxt;
331         __u32                 flags;
332         int                   rc, rc2;
333         ENTRY;
334
335         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
336         if (body == NULL)
337                 RETURN(-EFAULT);
338
339         ctxt = llog_get_context(obd, body->lgd_ctxt_idx);
340         if (ctxt == NULL)
341                 RETURN(-ENODEV);
342
343         disk_obd = ctxt->loc_exp->exp_obd;
344         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
345
346         rc = llog_create(req->rq_svc_thread->t_env, ctxt, &loghandle,
347                          &body->lgd_logid, NULL);
348         if (rc)
349                 GOTO(out_pop, rc);
350
351         /*
352          * llog_init_handle() reads the llog header
353          */
354         flags = body->lgd_llh_flags;
355         rc = llog_init_handle(req->rq_svc_thread->t_env, loghandle, flags,
356                               NULL);
357         if (rc)
358                 GOTO(out_close, rc);
359
360         rc = req_capsule_server_pack(&req->rq_pill);
361         if (rc)
362                 GOTO(out_close, rc = -ENOMEM);
363
364         hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
365         *hdr = *loghandle->lgh_hdr;
366         EXIT;
367 out_close:
368         rc2 = llog_close(req->rq_svc_thread->t_env, loghandle);
369         if (!rc)
370                 rc = rc2;
371 out_pop:
372         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
373         llog_ctxt_put(ctxt);
374         return rc;
375 }
376 EXPORT_SYMBOL(llog_origin_handle_read_header);
377
378 int llog_origin_handle_close(struct ptlrpc_request *req)
379 {
380         ENTRY;
381         /* Nothing to do */
382         RETURN(0);
383 }
384 EXPORT_SYMBOL(llog_origin_handle_close);
385
386 int llog_origin_handle_cancel(struct ptlrpc_request *req)
387 {
388         struct obd_device *obd = req->rq_export->exp_obd;
389         int num_cookies, rc = 0, err, i, failed = 0;
390         struct obd_device *disk_obd;
391         struct llog_cookie *logcookies;
392         struct llog_ctxt *ctxt = NULL;
393         struct lvfs_run_ctxt saved;
394         struct llog_handle *cathandle;
395         struct inode *inode;
396         void *handle;
397         ENTRY;
398
399         logcookies = req_capsule_client_get(&req->rq_pill, &RMF_LOGCOOKIES);
400         num_cookies = req_capsule_get_size(&req->rq_pill, &RMF_LOGCOOKIES,
401                                            RCL_CLIENT) / sizeof(*logcookies);
402         if (logcookies == NULL || num_cookies == 0) {
403                 DEBUG_REQ(D_HA, req, "No llog cookies sent");
404                 RETURN(-EFAULT);
405         }
406
407         ctxt = llog_get_context(obd, logcookies->lgc_subsys);
408         if (ctxt == NULL)
409                 RETURN(-ENODEV);
410
411         disk_obd = ctxt->loc_exp->exp_obd;
412         push_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
413         for (i = 0; i < num_cookies; i++, logcookies++) {
414                 cathandle = ctxt->loc_handle;
415                 LASSERT(cathandle != NULL);
416                 inode = cathandle->lgh_file->f_dentry->d_inode;
417
418                 handle = fsfilt_start_log(disk_obd, inode,
419                                           FSFILT_OP_CANCEL_UNLINK, NULL, 1);
420                 if (IS_ERR(handle)) {
421                         CERROR("fsfilt_start_log() failed: %ld\n",
422                                PTR_ERR(handle));
423                         GOTO(pop_ctxt, rc = PTR_ERR(handle));
424                 }
425
426                 rc = llog_cat_cancel_records(req->rq_svc_thread->t_env,
427                                              cathandle, 1, logcookies);
428
429                 /*
430                  * Do not raise -ENOENT errors for resent rpcs. This rec already
431                  * might be killed.
432                  */
433                 if (rc == -ENOENT &&
434                     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) {
435                         /*
436                          * Do not change this message, reply-single.sh test_59b
437                          * expects to find this in log.
438                          */
439                         CDEBUG(D_RPCTRACE, "RESENT cancel req %p - ignored\n",
440                                req);
441                         rc = 0;
442                 } else if (rc == 0) {
443                         CDEBUG(D_RPCTRACE, "Canceled %d llog-records\n",
444                                num_cookies);
445                 }
446
447                 err = fsfilt_commit(disk_obd, inode, handle, 0);
448                 if (err) {
449                         CERROR("Error committing transaction: %d\n", err);
450                         if (!rc)
451                                 rc = err;
452                         failed++;
453                         GOTO(pop_ctxt, rc);
454                 } else if (rc)
455                         failed++;
456         }
457         GOTO(pop_ctxt, rc);
458 pop_ctxt:
459         pop_ctxt(&saved, &disk_obd->obd_lvfs_ctxt, NULL);
460         if (rc)
461                 CERROR("Cancel %d of %d llog-records failed: %d\n",
462                        failed, num_cookies, rc);
463
464         llog_ctxt_put(ctxt);
465         return rc;
466 }
467 EXPORT_SYMBOL(llog_origin_handle_cancel);
468
469 #else /* !__KERNEL__ */
470 int llog_origin_handle_create(struct ptlrpc_request *req)
471 {
472         LBUG();
473         return 0;
474 }
475
476 int llog_origin_handle_destroy(struct ptlrpc_request *req)
477 {
478         LBUG();
479         return 0;
480 }
481
482 int llog_origin_handle_next_block(struct ptlrpc_request *req)
483 {
484         LBUG();
485         return 0;
486 }
487 int llog_origin_handle_prev_block(struct ptlrpc_request *req)
488 {
489         LBUG();
490         return 0;
491 }
492 int llog_origin_handle_read_header(struct ptlrpc_request *req)
493 {
494         LBUG();
495         return 0;
496 }
497 int llog_origin_handle_close(struct ptlrpc_request *req)
498 {
499         LBUG();
500         return 0;
501 }
502 int llog_origin_handle_cancel(struct ptlrpc_request *req)
503 {
504         LBUG();
505         return 0;
506 }
507 #endif