Whamcloud - gitweb
b=6379
[fs/lustre-release.git] / lustre / mds / mds_unlink_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/mds_orphan.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 /* code for handling open unlinked files */
28
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/config.h>
32 #include <linux/module.h>
33 #include <linux/version.h>
34
35 #include <libcfs/list.h>
36 #include <linux/obd_class.h>
37 #include <linux/lustre_fsfilt.h>
38 #include <linux/lustre_commit_confd.h>
39 #include <linux/lvfs.h>
40
41 #include "mds_internal.h"
42
43 /*
44  * used when destroying orphanes and from mds_reint_unlink() when MDS wants to
45  * destroy objects on OSS.
46  */
47 int
48 mds_unlink_object(struct mds_obd *mds, struct inode *inode,
49                   struct lov_mds_md *lmm, int lmm_size,
50                   struct llog_cookie *logcookies,
51                   int log_unlink, int async)
52 {
53         struct lov_stripe_md *lsm = NULL;
54         struct obd_trans_info oti = { 0 };
55         struct obdo *oa;
56         int rc;
57         ENTRY;
58
59         if (lmm_size == 0)
60                 RETURN(0);
61
62         rc = obd_unpackmd(mds->mds_dt_exp, &lsm, lmm, lmm_size);
63         if (rc < 0) {
64                 CERROR("Error unpack md %p\n", lmm);
65                 RETURN(rc);
66         } else {
67                 LASSERT(rc >= sizeof(*lsm));
68                 rc = 0;
69         }
70
71         oa = obdo_alloc();
72         if (oa == NULL)
73                 GOTO(out_free_memmd, rc = -ENOMEM);
74         oa->o_id = lsm->lsm_object_id;
75         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
76         oa->o_mode = inode->i_mode & S_IFMT;
77         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
78
79         if (log_unlink && logcookies) {
80                 oa->o_valid |= OBD_MD_FLCOOKIE;
81                 oti.oti_logcookies = logcookies;
82         }
83
84         CDEBUG(D_INODE, "destroy OSS object %d/%d\n",
85                (int)oa->o_id, (int)oa->o_gr);
86         
87         oti.oti_async = async;
88         rc = obd_destroy(mds->mds_dt_exp, oa, lsm, &oti);
89         obdo_free(oa);
90 out_free_memmd:
91         obd_free_memmd(mds->mds_dt_exp, &lsm);
92         RETURN(rc);
93 }
94
95 static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
96                              struct inode *inode, struct inode *pending_dir)
97 {
98         struct mds_obd *mds = &obd->u.mds;
99         struct lov_mds_md *lmm = NULL;
100         struct llog_cookie *logcookies = NULL;
101         int lmm_size, log_unlink = 0;
102         void *handle = NULL;
103         int rc, err;
104         ENTRY;
105
106         LASSERT(mds->mds_dt_obd != NULL);
107         LASSERT(obd->obd_recovering == 0);
108
109         /* We don't need to do any of these other things for orhpan dirs,
110          * especially not mds_get_md (may get a default LOV EA, bug 4554) */
111         if (S_ISDIR(inode->i_mode)) {
112                 rc = vfs_rmdir(pending_dir, dchild);
113                 if (rc)
114                         CERROR("error %d unlinking dir %*s from PENDING\n",
115                                rc, dchild->d_name.len, dchild->d_name.name);
116                 RETURN(rc);
117         }
118
119         lmm_size = mds->mds_max_mdsize;
120         OBD_ALLOC(lmm, lmm_size);
121         if (lmm == NULL)
122                 RETURN(-ENOMEM);
123
124         rc = mds_get_md(obd, inode, lmm, &lmm_size, 1, 0);
125         if (rc < 0)
126                 GOTO(out_free_lmm, rc);
127
128         handle = fsfilt_start_log(obd, pending_dir, FSFILT_OP_UNLINK, NULL,
129                                   le32_to_cpu(lmm->lmm_stripe_count));
130         if (IS_ERR(handle)) {
131                 rc = PTR_ERR(handle);
132                 CERROR("error fsfilt_start: %d\n", rc);
133                 handle = NULL;
134                 GOTO(out_free_lmm, rc);
135         }
136
137         rc = vfs_unlink(pending_dir, dchild);
138         if (rc) {
139                 CERROR("error %d unlinking orphan %.*s from PENDING\n",
140                        rc, dchild->d_name.len, dchild->d_name.name);
141         } else if (lmm_size) {
142                 OBD_ALLOC(logcookies, mds->mds_max_cookiesize);
143                 if (logcookies == NULL)
144                         rc = -ENOMEM;
145                 else if (mds_log_op_unlink(obd, inode, lmm,lmm_size,logcookies,
146                                            mds->mds_max_cookiesize, NULL) > 0)
147                         log_unlink = 1;
148         }
149         err = fsfilt_commit(obd, mds->mds_sb, pending_dir, handle, 0);
150         if (err) {
151                 CERROR("error committing orphan unlink: %d\n", err);
152                 if (!rc)
153                         rc = err;
154         } else if (!rc) {
155                 rc = mds_unlink_object(mds, inode, lmm, lmm_size,
156                                        logcookies, log_unlink, 0);
157         }
158
159         if (logcookies != NULL)
160                 OBD_FREE(logcookies, mds->mds_max_cookiesize);
161 out_free_lmm:
162         OBD_FREE(lmm, mds->mds_max_mdsize);
163         RETURN(rc);
164 }
165
166 int mds_cleanup_orphans(struct obd_device *obd)
167 {
168         struct mds_obd *mds = &obd->u.mds;
169         struct lvfs_run_ctxt saved;
170         struct file *file;
171         struct dentry *dchild, *dentry;
172         struct vfsmount *mnt;
173         struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
174         struct l_linux_dirent *dirent, *n;
175         struct list_head dentry_list;
176         char d_name[LL_ID_NAMELEN];
177         unsigned long inum;
178         __u64 i = 0;
179         int rc = 0, item = 0, namlen;
180         ENTRY;
181
182         LASSERT(obd->obd_recovering == 0);
183         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
184         dentry = dget(mds->mds_pending_dir);
185         if (IS_ERR(dentry))
186                 GOTO(err_pop, rc = PTR_ERR(dentry));
187         mnt = mntget(mds->mds_vfsmnt);
188         if (IS_ERR(mnt))
189                 GOTO(err_mntget, rc = PTR_ERR(mnt));
190
191         file = dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt,
192                            O_RDONLY | O_LARGEFILE);
193         if (IS_ERR(file))
194                 GOTO(err_pop, rc = PTR_ERR(file));
195
196         INIT_LIST_HEAD(&dentry_list);
197         rc = l_readdir(file, &dentry_list);
198         filp_close(file, 0);
199         if (rc < 0)
200                 GOTO(err_out, rc);
201
202         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
203                 i++;
204                 list_del(&dirent->lld_list);
205
206                 namlen = strlen(dirent->lld_name);
207                 LASSERT(sizeof(d_name) >= namlen + 1);
208                 strcpy(d_name, dirent->lld_name);
209                 inum = dirent->lld_ino;
210                 OBD_FREE(dirent, sizeof(*dirent));
211
212                 CDEBUG(D_INODE, "entry "LPU64" of PENDING DIR: %s\n",
213                        i, d_name);
214
215                 if (((namlen == 1) && !strcmp(d_name, ".")) ||
216                     ((namlen == 2) && !strcmp(d_name, "..")) || inum == 0)
217                         continue;
218
219                 down(&pending_dir->i_sem);
220                 dchild = lookup_one_len(d_name, mds->mds_pending_dir, namlen);
221                 if (IS_ERR(dchild)) {
222                         up(&pending_dir->i_sem);
223                         GOTO(err_out, rc = PTR_ERR(dchild));
224                 }
225                 if (!dchild->d_inode) {
226                         CERROR("orphan %s has been removed\n", d_name);
227                         GOTO(next, rc = 0);
228                 }
229
230                 if (is_bad_inode(dchild->d_inode)) {
231                         CERROR("bad orphan inode found %lu/%u\n",
232                                dchild->d_inode->i_ino,
233                                dchild->d_inode->i_generation);
234                         GOTO(next, rc = -ENOENT);
235                 }
236
237                 child_inode = dchild->d_inode;
238                 DOWN_READ_I_ALLOC_SEM(child_inode);
239                 if (mds_orphan_open_count(child_inode)) {
240                         UP_READ_I_ALLOC_SEM(child_inode);
241                         CWARN("orphan %s re-opened during recovery\n", d_name);
242                         GOTO(next, rc = 0);
243                 }
244                 if (!mds_inode_is_orphan(child_inode)) {
245                         UP_READ_I_ALLOC_SEM(child_inode);
246                         CWARN("orphan %s has been removed by CLOSE\n", d_name);
247                         GOTO(next, rc = 0);
248                 }
249                 mds_inode_unset_orphan(child_inode);
250                 UP_READ_I_ALLOC_SEM(child_inode);
251                 rc = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
252                 if (rc == 0) {
253                         item ++;
254                         CWARN("removed orphan %s from MDS and OST\n", d_name);
255                 } else {
256                         CDEBUG(D_INODE, "removed orphan %s from MDS/OST failed,"
257                                " rc = %d\n", d_name, rc);
258                         rc = 0;
259                 }
260 next:
261                 l_dput(dchild);
262                 up(&pending_dir->i_sem);
263         }
264 err_out:
265         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
266                 list_del(&dirent->lld_list);
267                 OBD_FREE(dirent, sizeof(*dirent));
268         }
269 err_pop:
270         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
271         if (rc == 0)
272                 rc = item;
273         RETURN(rc);
274
275 err_mntget:
276         l_dput(mds->mds_pending_dir);
277         goto err_pop;
278 }