Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lustre / obdfilter / filter_log.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/fs/obdfilter/filter_log.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #define DEBUG_SUBSYSTEM S_FILTER
28
29 #include <linux/config.h>
30 #include <linux/module.h>
31 #include <linux/version.h>
32
33 #include <portals/list.h>
34 #include <linux/obd_class.h>
35 #include <linux/lustre_fsfilt.h>
36 #include <linux/lustre_commit_confd.h>
37
38 #include "filter_internal.h"
39
40 static struct llog_handle *filter_log_create(struct obd_device *obd);
41
42 /* This is a callback from the llog_* functions.
43  * Assumes caller has already pushed us into the kernel context. */
44 static int filter_log_close(struct llog_handle *cathandle,
45                             struct llog_handle *loghandle)
46 {
47         struct llog_object_hdr *llh = loghandle->lgh_hdr;
48         struct file *file = loghandle->lgh_file;
49         struct dentry *dparent = NULL, *dchild = NULL;
50         struct lustre_handle parent_lockh;
51         struct llog_logid *lgl = &loghandle->lgh_cookie.lgc_lgl;
52         int rc;
53         ENTRY;
54
55         /* If we are going to delete this log, grab a ref before we close
56          * it so we don't have to immediately do another lookup. */
57         if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
58                 CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
59                        lgl->lgl_oid, lgl->lgl_ogen);
60                 dparent = filter_parent_lock(loghandle->lgh_obd, S_IFREG,
61                                              lgl->lgl_oid,LCK_PW,&parent_lockh);
62                 if (IS_ERR(dparent)) {
63                         rc = PTR_ERR(dparent);
64                         CERROR("error locking parent, orphan log %*s: rc %d\n",
65                                file->f_dentry->d_name.len,
66                                file->f_dentry->d_name.name, rc);
67                         RETURN(rc);
68                 } else {
69                         dchild = dget(file->f_dentry);
70                         llog_delete_log(cathandle, loghandle);
71                 }
72         } else {
73                 CDEBUG(D_INODE, "closing log file "LPX64":%x\n",
74                        lgl->lgl_oid, lgl->lgl_ogen);
75         }
76
77         rc = filp_close(file, 0);
78
79         llog_free_handle(loghandle); /* also removes loghandle from list */
80
81         if (dchild != NULL) {
82                 int err = vfs_unlink(dparent->d_inode, dchild);
83                 if (err) {
84                         CERROR("error unlinking empty log %*s: rc %d\n",
85                                dchild->d_name.len, dchild->d_name.name, err);
86                         if (!rc)
87                                 rc = err;
88                 }
89                 f_dput(dchild);
90                 ldlm_lock_decref(&parent_lockh, LCK_PW);
91         }
92         RETURN(rc);
93 }
94
95 /* This is a callback from the llog_* functions.
96  * Assumes caller has already pushed us into the kernel context. */
97 static struct llog_handle *filter_log_open(struct obd_device *obd,
98                                            struct llog_cookie *logcookie)
99 {
100         struct llog_logid *lgl = &logcookie->lgc_lgl;
101         struct llog_handle *loghandle;
102         struct dentry *dchild;
103         int rc;
104         ENTRY;
105
106         loghandle = llog_alloc_handle();
107         if (!loghandle)
108                 RETURN(ERR_PTR(-ENOMEM));
109
110         dchild = filter_fid2dentry(obd, NULL, S_IFREG, lgl->lgl_oid);
111         if (IS_ERR(dchild))
112                 GOTO(out_handle, rc = PTR_ERR(dchild));
113
114         if (dchild->d_inode == NULL) {
115                 CERROR("logcookie references non-existent object %*s\n",
116                        dchild->d_name.len, dchild->d_name.name);
117                 GOTO(out_dentry, rc = -ENOENT);
118         }
119
120         if (dchild->d_inode->i_generation != lgl->lgl_ogen) {
121                 CERROR("logcookie for %*s had different generation %x != %x\n",
122                        dchild->d_name.len, dchild->d_name.name,
123                        dchild->d_inode->i_generation, lgl->lgl_ogen);
124                 GOTO(out_dentry, rc = -ESTALE);
125         }
126
127         /* dentry_open does a dput(dchild) and mntput(mnt) on error */
128         mntget(obd->u.filter.fo_vfsmnt);
129         loghandle->lgh_file = dentry_open(dchild, obd->u.filter.fo_vfsmnt,
130                                           O_RDWR);
131         if (IS_ERR(loghandle->lgh_file)) {
132                 rc = PTR_ERR(loghandle->lgh_file);
133                 CERROR("error opening logfile %*s: rc %d\n",
134                        dchild->d_name.len, dchild->d_name.name, rc);
135                 GOTO(out_dentry, rc);
136         }
137         memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie));
138         loghandle->lgh_log_create = filter_log_create;
139         loghandle->lgh_log_open = filter_log_open;
140         loghandle->lgh_log_close = filter_log_close;
141         loghandle->lgh_obd = obd;
142         RETURN(loghandle);
143
144 out_dentry:
145         f_dput(dchild);
146 out_handle:
147         llog_free_handle(loghandle);
148         RETURN(ERR_PTR(rc));
149 }
150
151 /* This is a callback from the llog_* functions.
152  * Assumes caller has already pushed us into the kernel context. */
153 static struct llog_handle *filter_log_create(struct obd_device *obd)
154 {
155         struct filter_obd *filter = &obd->u.filter;
156         struct lustre_handle parent_lockh;
157         struct dentry *dparent, *dchild;
158         struct llog_handle *loghandle;
159         struct file *file;
160         int err, rc;
161         obd_id id;
162         ENTRY;
163
164         loghandle = llog_alloc_handle();
165         if (!loghandle)
166                 RETURN(ERR_PTR(-ENOMEM));
167
168  retry:
169         id = filter_next_id(filter);
170
171         dparent = filter_parent_lock(obd, S_IFREG, id, LCK_PW, &parent_lockh);
172         if (IS_ERR(dparent))
173                 GOTO(out_ctxt, rc = PTR_ERR(dparent));
174
175         dchild = filter_fid2dentry(obd, dparent, S_IFREG, id);
176         if (IS_ERR(dchild))
177                 GOTO(out_lock, rc = PTR_ERR(dchild));
178
179         if (dchild->d_inode != NULL) {
180                 /* This would only happen if lastobjid was bad on disk */
181                 CERROR("Serious error: objid %*s already exists; is this "
182                        "filesystem corrupt?  I will try to work around it.\n",
183                        dchild->d_name.len, dchild->d_name.name);
184                 f_dput(dchild);
185                 ldlm_lock_decref(&parent_lockh, LCK_PW);
186                 goto retry;
187         }
188
189         rc = vfs_create(dparent->d_inode, dchild, S_IFREG);
190         if (rc) {
191                 CERROR("log create failed rc = %d\n", rc);
192                 GOTO(out_child, rc);
193         }
194
195         rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
196                                        filter->fo_fsd);
197         if (rc) {
198                 CERROR("can't write lastobjid but log created: rc %d\n",rc);
199                 GOTO(out_destroy, rc);
200         }
201
202         /* dentry_open does a dput(dchild) and mntput(mnt) on error */
203         mntget(filter->fo_vfsmnt);
204         file = dentry_open(dchild, filter->fo_vfsmnt, O_RDWR | O_LARGEFILE);
205         if (IS_ERR(file)) {
206                 rc = PTR_ERR(file);
207                 CERROR("error opening log file "LPX64": rc %d\n", id, rc);
208                 GOTO(out_destroy, rc);
209         }
210         ldlm_lock_decref(&parent_lockh, LCK_PW);
211
212         loghandle->lgh_file = file;
213         loghandle->lgh_cookie.lgc_lgl.lgl_oid = id;
214         loghandle->lgh_cookie.lgc_lgl.lgl_ogen = dchild->d_inode->i_generation;
215         loghandle->lgh_log_create = filter_log_create;
216         loghandle->lgh_log_open = filter_log_open;
217         loghandle->lgh_log_close = filter_log_close;
218         loghandle->lgh_obd = obd;
219
220         RETURN(loghandle);
221
222 out_destroy:
223         err = vfs_unlink(dparent->d_inode, dchild);
224         if (err)
225                 CERROR("error unlinking %*s on error: rc %d\n",
226                        dchild->d_name.len, dchild->d_name.name, err);
227 out_child:
228         f_dput(dchild);
229 out_lock:
230         ldlm_lock_decref(&parent_lockh, LCK_PW);
231 out_ctxt:
232         llog_free_handle(loghandle);
233         RETURN(ERR_PTR(rc));
234 }
235
236 /* This is called from filter_setup() and should be single threaded */
237 struct llog_handle *filter_get_catalog(struct obd_device *obd)
238 {
239         struct filter_obd *filter = &obd->u.filter;
240         struct filter_server_data *fsd = filter->fo_fsd;
241         struct obd_run_ctxt saved;
242         struct llog_handle *cathandle = NULL;
243         int rc;
244         ENTRY;
245
246         push_ctxt(&saved, &filter->fo_ctxt, NULL);
247         if (fsd->fsd_catalog_oid) {
248                 struct llog_cookie catcookie;
249
250                 catcookie.lgc_lgl.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid);
251                 catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen);
252                 cathandle = filter_log_open(obd, &catcookie);
253                 if (IS_ERR(cathandle)) {
254                         CERROR("error opening catalog "LPX64":%x: rc %d\n",
255                                catcookie.lgc_lgl.lgl_oid,
256                                catcookie.lgc_lgl.lgl_ogen,
257                                (int)PTR_ERR(cathandle));
258                         fsd->fsd_catalog_oid = 0;
259                         fsd->fsd_catalog_ogen = 0;
260                 }
261         }
262
263         if (!fsd->fsd_catalog_oid) {
264                 struct llog_logid *lgl;
265
266                 cathandle = filter_log_create(obd);
267                 if (IS_ERR(cathandle)) {
268                         CERROR("error creating new catalog: rc %d\n",
269                                (int)PTR_ERR(cathandle));
270                         GOTO(out, cathandle);
271                 }
272                 lgl = &cathandle->lgh_cookie.lgc_lgl;
273                 fsd->fsd_catalog_oid = cpu_to_le64(lgl->lgl_oid);
274                 fsd->fsd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen);
275                 rc = filter_update_server_data(obd, filter->fo_rcvd_filp, fsd);
276                 if (rc) {
277                         CERROR("error writing new catalog to disk: rc %d\n",rc);
278                         GOTO(out_handle, rc);
279                 }
280         }
281
282         rc = llog_init_catalog(cathandle, &obd->u.filter.fo_mdc_uuid);
283         if (rc)
284                 GOTO(out_handle, rc);
285 out:
286         pop_ctxt(&saved, &filter->fo_ctxt, NULL);
287         RETURN(cathandle);
288
289 out_handle:
290         filter_log_close(cathandle, cathandle);
291         cathandle = ERR_PTR(rc);
292         goto out;
293 }
294
295 void filter_put_catalog(struct llog_handle *cathandle)
296 {
297         struct llog_handle *loghandle, *n;
298         int rc;
299         ENTRY;
300
301         list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list)
302                 filter_log_close(cathandle, loghandle);
303
304         rc = filp_close(cathandle->lgh_file, 0);
305         if (rc)
306                 CERROR("error closing catalog: rc %d\n", rc);
307
308         llog_free_handle(cathandle);
309         EXIT;
310 }
311
312 int filter_log_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
313                       int num_cookies, struct llog_cookie *logcookies,
314                       int flags)
315 {
316         struct obd_device *obd = class_conn2obd(conn);
317         struct obd_run_ctxt saved;
318         int rc;
319         ENTRY;
320
321         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
322         rc = llog_cancel_records(obd->u.filter.fo_catalog, num_cookies,
323                                  logcookies);
324         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
325
326         RETURN(rc);
327 }
328
329 int filter_log_op_create(struct llog_handle *cathandle, struct ll_fid *mds_fid,
330                          obd_id oid, obd_count ogen,
331                          struct llog_cookie *logcookie)
332 {
333         struct llog_create_rec *lcr;
334         int rc;
335         ENTRY;
336
337         OBD_ALLOC(lcr, sizeof(*lcr));
338         if (lcr == NULL)
339                 RETURN(-ENOMEM);
340         lcr->lcr_hdr.lth_len = lcr->lcr_end_len = sizeof(*lcr);
341         lcr->lcr_hdr.lth_type = OST_CREATE_REC;
342         lcr->lcr_fid.id = mds_fid->id;
343         lcr->lcr_fid.generation = mds_fid->generation;
344         lcr->lcr_fid.f_type = mds_fid->f_type;
345         lcr->lcr_oid = oid;
346         lcr->lcr_ogen = ogen;
347
348         rc = llog_add_record(cathandle, &lcr->lcr_hdr, logcookie);
349         OBD_FREE(lcr, sizeof(*lcr));
350
351         if (rc > 0) {
352                 LASSERT(rc == sizeof(*logcookie));
353                 rc = 0;
354         }
355         RETURN(rc);
356 }
357
358 int filter_log_op_orphan(struct llog_handle *cathandle, obd_id oid,
359                          obd_count ogen, struct llog_cookie *logcookie)
360 {
361         struct llog_orphan_rec *lor;
362         int rc;
363         ENTRY;
364
365         OBD_ALLOC(lor, sizeof(*lor));
366         if (lor == NULL)
367                 RETURN(-ENOMEM);
368         lor->lor_hdr.lth_len = lor->lor_end_len = sizeof(*lor);
369         lor->lor_hdr.lth_type = OST_ORPHAN_REC;
370         lor->lor_oid = oid;
371         lor->lor_ogen = ogen;
372
373         rc = llog_add_record(cathandle, &lor->lor_hdr, logcookie);
374
375         if (rc > 0) {
376                 LASSERT(rc == sizeof(*logcookie));
377                 rc = 0;
378         }
379         RETURN(rc);
380 }