Whamcloud - gitweb
b=21174 allow quotacheck over OSTs with sparse indices
[fs/lustre-release.git] / lustre / mds / mds_unlink_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_unlink_open.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Andreas Dilger <adilger@clusterfs.com>
40  * Author: Phil Schwan <phil@clusterfs.com>
41  */
42
43 /* code for handling open unlinked files */
44
45 #define DEBUG_SUBSYSTEM S_MDS
46
47 #ifndef AUTOCONF_INCLUDED
48 #include <linux/config.h>
49 #endif
50 #include <linux/module.h>
51 #include <linux/version.h>
52
53 #include <libcfs/list.h>
54 #include <obd_class.h>
55 #include <lustre_fsfilt.h>
56 #include <lustre_mds.h>
57 #include <lvfs.h>
58
59 #include "mds_internal.h"
60
61 int mds_osc_destroy_orphan(struct obd_device *obd,
62                            umode_t mode,
63                            struct lov_mds_md *lmm,
64                            int lmm_size,
65                            struct llog_cookie *logcookies,
66                            int log_unlink)
67 {
68         struct mds_obd *mds = &obd->u.mds;
69         struct lov_stripe_md *lsm = NULL;
70         struct obd_trans_info oti = { 0 };
71         struct obdo *oa;
72         int rc;
73         ENTRY;
74
75         if (lmm_size == 0)
76                 RETURN(0);
77
78         rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size);
79         if (rc < 0) {
80                 CERROR("Error unpack md %p\n", lmm);
81                 RETURN(rc);
82         } else {
83                 LASSERT(rc >= sizeof(*lsm));
84                 rc = 0;
85         }
86
87         rc = obd_checkmd(mds->mds_lov_exp, obd->obd_self_export, lsm);
88         if (rc)
89                 GOTO(out_free_memmd, rc);
90
91         OBDO_ALLOC(oa);
92         if (oa == NULL)
93                 GOTO(out_free_memmd, rc = -ENOMEM);
94         oa->o_id = lsm->lsm_object_id;
95         oa->o_gr = 0;
96         oa->o_mode = mode & S_IFMT;
97         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
98
99         if (log_unlink && logcookies) {
100                 oa->o_valid |= OBD_MD_FLCOOKIE;
101                 oti.oti_logcookies = logcookies;
102         }
103         rc = obd_destroy(mds->mds_lov_exp, oa, lsm, &oti, obd->obd_self_export);
104         OBDO_FREE(oa);
105         if (rc)
106                 CDEBUG(D_INODE, "destroy orphan objid 0x"LPX64" on ost error "
107                        "%d\n", lsm->lsm_object_id, rc);
108 out_free_memmd:
109         obd_free_memmd(mds->mds_lov_exp, &lsm);
110         RETURN(rc);
111 }
112
113 static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
114                              struct inode *inode, struct inode *pending_dir)
115 {
116         struct mds_obd *mds = &obd->u.mds;
117         struct lov_mds_md *lmm = NULL;
118         struct llog_cookie *logcookies = NULL;
119         int lmm_size, log_unlink = 0, cookie_size = 0;
120         void *handle = NULL;
121         umode_t mode;
122         int rc, err;
123         ENTRY;
124
125         LASSERT(mds->mds_lov_obd != NULL);
126
127         /* We don't need to do any of these other things for orhpan dirs,
128          * especially not mds_get_md (may get a default LOV EA, bug 4554) */
129         mode = inode->i_mode;
130         if (S_ISDIR(mode)) {
131                 rc = ll_vfs_rmdir(pending_dir, dchild, mds->mds_vfsmnt);
132                 if (rc)
133                         CERROR("error %d unlinking dir %*s from PENDING\n",
134                                rc, dchild->d_name.len, dchild->d_name.name);
135                 RETURN(rc);
136         }
137
138         lmm_size = mds->mds_max_mdsize;
139         OBD_ALLOC(lmm, lmm_size);
140         if (lmm == NULL)
141                 RETURN(-ENOMEM);
142
143         rc = mds_get_md(obd, inode, lmm, &lmm_size, 1, 0, 0);
144         if (rc < 0)
145                 GOTO(out_free_lmm, rc);
146
147         handle = fsfilt_start_log(obd, pending_dir, FSFILT_OP_UNLINK, NULL,
148                                   le32_to_cpu(lmm->lmm_stripe_count));
149         if (IS_ERR(handle)) {
150                 rc = PTR_ERR(handle);
151                 CERROR("error fsfilt_start: %d\n", rc);
152                 handle = NULL;
153                 GOTO(out_free_lmm, rc);
154         }
155
156         rc = ll_vfs_unlink(pending_dir, dchild, mds->mds_vfsmnt);
157         if (rc) {
158                 CERROR("error %d unlinking orphan %.*s from PENDING\n",
159                        rc, dchild->d_name.len, dchild->d_name.name);
160         } else if (lmm_size) {
161                 cookie_size = mds_get_cookie_size(obd, lmm); 
162                 OBD_ALLOC(logcookies, cookie_size);
163                 if (logcookies == NULL)
164                         rc = -ENOMEM;
165                 else if (mds_log_op_unlink(obd, lmm,lmm_size,logcookies,
166                                            cookie_size) > 0)
167                         log_unlink = 1;
168         }
169
170         err = fsfilt_commit(obd, pending_dir, handle, 0);
171         if (err) {
172                 CERROR("error committing orphan unlink: %d\n", err);
173                 if (!rc)
174                         rc = err;
175         } else if (!rc) {
176                 rc = mds_osc_destroy_orphan(obd, mode, lmm, lmm_size,
177                                             logcookies, log_unlink);
178         }
179
180         if (logcookies != NULL)
181                 OBD_FREE(logcookies, cookie_size);
182 out_free_lmm:
183         OBD_FREE(lmm, mds->mds_max_mdsize);
184         RETURN(rc);
185 }
186
187 static __u64 mds_orphans_max_version(struct obd_device *obd)
188 {
189         struct obd_export *exp;
190         __u32 epoch = lr_epoch(obd->u.mds.mds_last_transno);
191         spin_lock(&obd->obd_dev_lock);
192         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain) {
193                 struct lu_export_data *led = &exp->exp_target_data;
194                 epoch = min(epoch, le32_to_cpu(led->led_lcd->lcd_first_epoch));
195         }
196         spin_unlock(&obd->obd_dev_lock);
197         return (__u64)epoch << LR_EPOCH_BITS;
198 }
199
200 /* Delete inodes which were previously open-unlinked but were not reopened
201  * during MDS recovery for whatever reason (e.g. client also failed, recovery
202  * aborted, etc). */
203 int mds_cleanup_pending(struct obd_device *obd)
204 {
205         struct mds_obd *mds = &obd->u.mds;
206         struct lvfs_run_ctxt saved;
207         struct file *file;
208         struct dentry *dchild, *dentry;
209         struct vfsmount *mnt;
210         struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
211         struct l_linux_dirent *dirent, *n;
212         struct list_head dentry_list;
213         char d_name[LL_FID_NAMELEN];
214         unsigned long inum;
215         __u64 max_version;
216         int i = 0, rc = 0, item = 0, namlen;
217         ENTRY;
218
219         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
220         /* dentry and mnt ref dropped in dentry_open() on error, or
221          * in filp_close() if dentry_open() succeeds */
222         dentry = dget(mds->mds_pending_dir);
223         if (IS_ERR(dentry))
224                 GOTO(err_pop, rc = PTR_ERR(dentry));
225         mnt = mntget(mds->mds_vfsmnt);
226         if (IS_ERR(mnt))
227                 GOTO(err_mntget, rc = PTR_ERR(mnt));
228
229         file = ll_dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt,
230                            O_RDONLY | O_LARGEFILE, current_cred());
231         if (IS_ERR(file))
232                 GOTO(err_pop, rc = PTR_ERR(file));
233
234         CFS_INIT_LIST_HEAD(&dentry_list);
235         rc = l_readdir(file, &dentry_list);
236         filp_close(file, 0);
237         if (rc < 0)
238                 GOTO(err_out, rc);
239
240         /** Get maximum version for orphans to delete. All other orphans may be
241          *  needed for delayed clients */
242         max_version = mds_orphans_max_version(obd);
243
244         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
245                 __u64 version;
246
247                 i++;
248                 list_del(&dirent->lld_list);
249
250                 namlen = strlen(dirent->lld_name);
251                 LASSERT(sizeof(d_name) >= namlen + 1);
252                 strcpy(d_name, dirent->lld_name);
253                 inum = dirent->lld_ino;
254                 OBD_FREE_PTR(dirent);
255
256                 CDEBUG(D_INODE, "entry %d of PENDING DIR: %s\n", i, d_name);
257
258                 if (((namlen == 1) && !strcmp(d_name, ".")) ||
259                     ((namlen == 2) && !strcmp(d_name, "..")) || inum == 0)
260                         continue;
261
262                 LOCK_INODE_MUTEX(pending_dir);
263                 dchild = lookup_one_len(d_name, mds->mds_pending_dir, namlen);
264                 if (IS_ERR(dchild)) {
265                         UNLOCK_INODE_MUTEX(pending_dir);
266                         GOTO(err_out, rc = PTR_ERR(dchild));
267                 }
268                 if (!dchild->d_inode) {
269                         CWARN("%s: orphan %s has already been removed\n",
270                               obd->obd_name, d_name);
271                         GOTO(next, rc = 0);
272                 }
273
274                 if (is_bad_inode(dchild->d_inode)) {
275                         CERROR("%s: bad orphan inode found %lu/%u\n",
276                                obd->obd_name, dchild->d_inode->i_ino,
277                                dchild->d_inode->i_generation);
278                         GOTO(next, rc = -ENOENT);
279                 }
280
281                 child_inode = dchild->d_inode;
282                 MDS_DOWN_READ_ORPHAN_SEM(child_inode);
283                 if (mds_inode_is_orphan(child_inode) &&
284                     mds_orphan_open_count(child_inode)) {
285                         MDS_UP_READ_ORPHAN_SEM(child_inode);
286                         CWARN("%s: orphan %s re-opened during recovery\n",
287                               obd->obd_name, d_name);
288                         GOTO(next, rc = 0);
289                 }
290                 /** Keep orphans for possible use by delayed exports. Remove
291                  * orphans with version lower than minimal one of all exports */
292                 version = fsfilt_get_version(obd, child_inode);
293                 if ((__s64)version != -EOPNOTSUPP &&
294                     version >= max_version) {
295                         MDS_UP_READ_ORPHAN_SEM(child_inode);
296                         CDEBUG(D_INFO,
297                                "%s: orphan %s is needed for delayed exports\n",
298                                obd->obd_name, d_name);
299                         GOTO(next, rc = 0);
300                 }
301                 MDS_UP_READ_ORPHAN_SEM(child_inode);
302
303                 rc = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
304                 CDEBUG(D_INODE, "%s: removed orphan %s: rc %d\n",
305                        obd->obd_name, d_name, rc);
306                 if (rc == 0)
307                         item++;
308                 else
309                         rc = 0;
310 next:
311                 l_dput(dchild);
312                 UNLOCK_INODE_MUTEX(pending_dir);
313         }
314         rc = 0;
315 err_out:
316         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
317                 list_del(&dirent->lld_list);
318                 OBD_FREE(dirent, sizeof(*dirent));
319         }
320 err_pop:
321         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
322         if (item > 0)
323                 CWARN("%s: removed %d pending open-unlinked files\n",
324                       obd->obd_name, item);
325         RETURN(rc);
326
327 err_mntget:
328         l_dput(mds->mds_pending_dir);
329         goto err_pop;
330 }
331
332 /**
333  * Determine there is no orphan with the same inode number. That may happens
334  * since unlink replay don't delete inode but keep orphan for delayed clients.
335  * Therefore replays like 'create, unlink, create' will fail due to inode can't
336  * be reused.
337  */
338 int mds_check_stale_orphan(struct obd_device *obd, struct ll_fid *fid)
339 {
340         struct mds_obd *mds = &obd->u.mds;
341         char fidname[32];
342         struct dentry *result;
343         struct inode *inode, *pending_dir = mds->mds_pending_dir->d_inode;
344         int fidlen = 0, rc = 0;
345
346         /* no need in checks*/
347         if (fid->id == 0 || obd->obd_recovering == 0)
348                 RETURN(0);
349
350         /** open by fid like mds_fid2dentry does */
351         snprintf(fidname, sizeof(fidname), "0x%lx", (unsigned long)(fid->id));
352         fidlen = strlen(fidname);
353         result = mds_lookup(obd, fidname, mds->mds_fid_de, fidlen);
354         if (IS_ERR(result))
355                 RETURN(0);
356         inode = result->d_inode;
357         if (!inode)
358                 GOTO(out, rc = 0);
359
360         LOCK_INODE_MUTEX(pending_dir);
361         MDS_DOWN_READ_ORPHAN_SEM(inode);
362         if (mds_inode_is_orphan(inode)) {
363                 struct dentry *orphan;
364
365                 /* bz18927: The exactly same inode can be marked as orphan
366                  * if there was open|creat replay and this is second one */
367                 if (inode->i_generation == fid->generation)
368                         GOTO(unlock_child, rc);
369
370                 if (mds_orphan_open_count(inode) > 0) {
371                         CERROR("Orphan "LPU64"/%u is in use!\n",
372                                fid->id, fid->generation);
373                         GOTO(unlock_child, rc = -EFAULT);
374                 }
375
376                 /** Found orphan in pending dir and delete it */
377                 fidlen = ll_fid2str(fidname, fid->id, inode->i_generation);
378                 orphan = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
379                 if (IS_ERR(orphan)) {
380                         rc = PTR_ERR(orphan);
381                         CERROR("error looking up %s in PENDING: rc = %d\n",
382                                 fidname, rc);
383                         GOTO(unlock_child, rc);
384                 }
385                 if (orphan->d_inode != inode) {
386                         l_dput(orphan);
387                         CWARN("%s: Found wrong orphan %s %p/%p\n",
388                               obd->obd_name, fidname, orphan->d_inode, inode);
389                         GOTO(unlock_child, rc = -EFAULT);
390                 }
391                 MDS_UP_READ_ORPHAN_SEM(inode);
392
393                 rc = mds_unlink_orphan(obd, orphan, inode, pending_dir);
394                 CDEBUG(D_INODE, "%s: removed orphan %s: rc %d\n",
395                        obd->obd_name, fidname, rc);
396                 l_dput(orphan);
397                 GOTO(unlock, rc);
398         }
399 unlock_child:
400         MDS_UP_READ_ORPHAN_SEM(inode);
401 unlock:
402         UNLOCK_INODE_MUTEX(pending_dir);
403 out:
404         l_dput(result);
405         RETURN(0);
406 }