Whamcloud - gitweb
mds_osc_destroy_orphan did an obd_destroy to the OSTs without setting
[fs/lustre-release.git] / lustre / mds / mds_unlink_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/mds_unlink_open.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 /* code for handling open unlinked files */
28
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/config.h>
32 #include <linux/module.h>
33 #include <linux/version.h>
34
35 #include <portals/list.h>
36 #include <linux/obd_class.h>
37 #include <linux/lustre_fsfilt.h>
38 #include <linux/lustre_commit_confd.h>
39 #include <linux/lvfs.h>
40
41 #include "mds_internal.h"
42
43
44 /* If we are unlinking an open file/dir (i.e. creating an orphan) then
45  * we instead link the inode into the PENDING directory until it is
46  * finally released.  We can't simply call mds_reint_rename() or some
47  * part thereof, because we don't have the inode to check for link
48  * count/open status until after it is locked.
49  *
50  * For lock ordering, we always get the PENDING, then pending_child lock
51  * last to avoid deadlocks.
52  */
53
54 int mds_open_unlink_rename(struct mds_update_record *rec,
55                            struct obd_device *obd, struct dentry *dparent,
56                            struct dentry *dchild, void **handle)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         struct inode *pending_dir = mds->mds_pending_dir->d_inode;
60         struct dentry *pending_child;
61         char fidname[LL_FID_NAMELEN];
62         int fidlen = 0, rc;
63         unsigned mode;
64         ENTRY;
65
66         LASSERT(!mds_inode_is_orphan(dchild->d_inode));
67
68         down(&pending_dir->i_sem);
69         fidlen = ll_fid2str(fidname, dchild->d_inode->i_ino,
70                             dchild->d_inode->i_generation);
71
72         CDEBUG(D_HA, "pending destroy of %dx open file %s = %s\n",
73                mds_open_orphan_count(dchild->d_inode), rec->ur_name, fidname);
74
75         pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
76         if (IS_ERR(pending_child))
77                 GOTO(out_lock, rc = PTR_ERR(pending_child));
78
79         if (pending_child->d_inode != NULL) {
80                 CERROR("re-destroying orphan file %s?\n", rec->ur_name);
81                 LASSERT(pending_child->d_inode == dchild->d_inode);
82                 GOTO(out_dput, rc = 0);
83         }
84
85         /* link() is semanticaly-wrong for S_IFDIR, so we set S_IFREG
86          * for linking and return real mode back then -bzzz */
87         mode = dchild->d_inode->i_mode;
88         dchild->d_inode->i_mode = S_IFREG;
89         rc = vfs_link(dchild, pending_dir, pending_child);
90         if (rc)
91                 CERROR("error linking orphan %s to PENDING: rc = %d\n",
92                        rec->ur_name, rc);
93         else
94                 mds_inode_set_orphan(dchild->d_inode);
95
96         /* return mode and correct i_nlink if inode is directory */
97         LASSERT(dchild->d_inode->i_nlink == 1);
98         dchild->d_inode->i_mode = mode;
99         if ((mode & S_IFMT) == S_IFDIR) {
100                 dchild->d_inode->i_nlink++;
101                 pending_dir->i_nlink++;
102         }
103         mark_inode_dirty(dchild->d_inode);
104
105 out_dput:
106         dput(pending_child);
107 out_lock:
108         up(&pending_dir->i_sem);
109         RETURN(rc);
110 }
111
112 static int mds_osc_destroy_orphan(struct mds_obd *mds,
113                                   struct inode *inode,
114                                   struct lov_mds_md *lmm,
115                                   int lmm_size,
116                                   struct llog_cookie *logcookies,
117                                   int log_unlink)
118 {
119         struct lov_stripe_md *lsm = NULL;
120         struct obd_trans_info oti = { 0 };
121         struct obdo *oa;
122         int rc;
123         ENTRY;
124
125         if (lmm_size == 0)
126                 RETURN(0);
127
128         rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
129         if (rc < 0) {
130                 CERROR("Error unpack md %p\n", lmm);
131                 RETURN(rc);
132         } else {
133                 LASSERT(rc >= sizeof(*lsm));
134                 rc = 0;
135         }
136
137         oa = obdo_alloc();
138         if (oa == NULL)
139                 GOTO(out_free_memmd, rc = -ENOMEM);
140         oa->o_id = lsm->lsm_object_id;
141         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
142         oa->o_mode = inode->i_mode & S_IFMT;
143         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
144
145         if (log_unlink && logcookies) {
146                 oa->o_valid |= OBD_MD_FLCOOKIE;
147                 oti.oti_logcookies = logcookies;
148         }
149
150         rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
151         obdo_free(oa);
152         if (rc)
153                 CDEBUG(D_INODE, "destroy orphan objid 0x"LPX64" on ost error "
154                        "%d\n", lsm->lsm_object_id, rc);
155 out_free_memmd:
156         obd_free_memmd(mds->mds_osc_exp, &lsm);
157         RETURN(rc);
158 }
159
160 static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
161                              struct inode *inode, struct inode *pending_dir)
162 {
163         struct mds_obd *mds = &obd->u.mds;
164         struct lov_mds_md *lmm = NULL;
165         struct llog_cookie *logcookies = NULL;
166         int lmm_size = 0, log_unlink = 0;
167         void *handle = NULL;
168         int rc, err;
169         ENTRY;
170
171         LASSERT(mds->mds_osc_obd != NULL);
172
173         OBD_ALLOC(lmm, mds->mds_max_mdsize);
174         if (lmm == NULL)
175                 RETURN(-ENOMEM);
176
177         down(&inode->i_sem);
178         rc = fsfilt_get_md(obd, inode, lmm, mds->mds_max_mdsize);
179         up(&inode->i_sem);
180
181         if (rc < 0) {
182                 CERROR("Error %d reading eadata for ino %lu\n",
183                        rc, inode->i_ino);
184                 GOTO(out_free_lmm, rc);
185         } else if (rc > 0) {
186                 lmm_size = rc;
187                 rc = mds_convert_lov_ea(obd, inode, lmm, lmm_size);
188                 if (rc > 0)
189                         lmm_size = rc;
190                 rc = 0;
191         }
192
193         handle = fsfilt_start_log(obd, pending_dir, FSFILT_OP_UNLINK, NULL,
194                                   le32_to_cpu(lmm->lmm_stripe_count));
195         if (IS_ERR(handle)) {
196                 rc = PTR_ERR(handle);
197                 CERROR("error fsfilt_start: %d\n", rc);
198                 handle = NULL;
199                 GOTO(out_free_lmm, rc);
200         }
201
202         down(&inode->i_sem);
203         rc = fsfilt_get_md(obd, inode, lmm, mds->mds_max_mdsize);
204         up(&inode->i_sem);
205
206         if (rc < 0) {
207                 CERROR("Error %d reading eadata for ino %lu\n",
208                        rc, inode->i_ino);
209                 GOTO(out_free_lmm, rc);
210         } else if (rc > 0) {
211                 lmm_size = rc;
212                 rc = 0;
213         }
214
215         if (S_ISDIR(inode->i_mode))
216                 rc = vfs_rmdir(pending_dir, dchild);
217         else
218                 rc = vfs_unlink(pending_dir, dchild);
219
220         if (rc)
221                 CERROR("error %d unlinking orphan %*s from PENDING directory\n",
222                        rc, dchild->d_name.len, dchild->d_name.name);
223
224         if (!rc && lmm_size) {
225                 OBD_ALLOC(logcookies, mds->mds_max_cookiesize);
226                 if (logcookies == NULL)
227                         rc = -ENOMEM;
228                 else if (mds_log_op_unlink(obd, inode, lmm,lmm_size,logcookies,
229                                            mds->mds_max_cookiesize) > 0)
230                         log_unlink = 1;
231         }
232         err = fsfilt_commit(obd, mds->mds_sb, pending_dir, handle, 0);
233         if (err) {
234                 CERROR("error committing orphan unlink: %d\n", err);
235                 if (!rc)
236                         rc = err;
237         }
238         if (!rc) {
239                 rc = mds_osc_destroy_orphan(mds, inode, lmm, lmm_size,
240                                             logcookies, log_unlink);
241         }
242
243         if (logcookies != NULL)
244                 OBD_FREE(logcookies, mds->mds_max_cookiesize);
245 out_free_lmm:
246         OBD_FREE(lmm, mds->mds_max_mdsize);
247         RETURN(rc);
248 }
249
250 int mds_cleanup_orphans(struct obd_device *obd)
251 {
252         struct mds_obd *mds = &obd->u.mds;
253         struct lvfs_run_ctxt saved;
254         struct file *file;
255         struct dentry *dchild, *dentry;
256         struct vfsmount *mnt;
257         struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
258         struct l_linux_dirent *dirent, *n;
259         struct list_head dentry_list;
260         char d_name[LL_FID_NAMELEN];
261         __u64 i = 0;
262         int rc = 0, item = 0, namlen;
263         ENTRY;
264
265         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
266         dentry = dget(mds->mds_pending_dir);
267         if (IS_ERR(dentry))
268                 GOTO(err_pop, rc = PTR_ERR(dentry));
269         mnt = mntget(mds->mds_vfsmnt);
270         if (IS_ERR(mnt))
271                 GOTO(err_mntget, rc = PTR_ERR(mnt));
272
273         file = dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt,
274                            O_RDONLY | O_LARGEFILE);
275         if (IS_ERR(file))
276                 GOTO(err_pop, rc = PTR_ERR(file));
277
278         INIT_LIST_HEAD(&dentry_list);
279         rc = l_readdir(file, &dentry_list);
280         filp_close(file, 0);
281         if (rc < 0)
282                 GOTO(err_out, rc);
283
284         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
285                 i ++;
286                 list_del(&dirent->lld_list);
287
288                 namlen = strlen(dirent->lld_name);
289                 LASSERT(sizeof(d_name) >= namlen + 1);
290                 strcpy(d_name, dirent->lld_name);
291                 OBD_FREE(dirent, sizeof(*dirent));
292
293                 CDEBUG(D_INODE, "entry "LPU64" of PENDING DIR: %s\n",
294                        i, d_name);
295
296                 if (((namlen == 1) && !strcmp(d_name, ".")) ||
297                     ((namlen == 2) && !strcmp(d_name, ".."))) {
298                         continue;
299                 }
300
301                 down(&pending_dir->i_sem);
302                 dchild = lookup_one_len(d_name, mds->mds_pending_dir, namlen);
303                 if (IS_ERR(dchild)) {
304                         up(&pending_dir->i_sem);
305                         GOTO(err_out, rc = PTR_ERR(dchild));
306                 }
307                 if (!dchild->d_inode) {
308                         CERROR("orphan %s has been removed\n", d_name);
309                         GOTO(next, rc = 0);
310                 }
311
312                 child_inode = dchild->d_inode;
313                 if (mds_inode_is_orphan(child_inode) &&
314                     mds_open_orphan_count(child_inode)) {
315                         CWARN("orphan %s was re-opened during recovery\n", d_name);
316                         GOTO(next, rc = 0);
317                 }
318
319                 rc = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
320                 if (rc == 0) {
321                         item ++;
322                         CWARN("removed orphan %s from MDS and OST\n", d_name);
323                 } else {
324                         CDEBUG(D_INODE, "removed orphan %s from MDS/OST failed,"
325                                " rc = %d\n", d_name, rc);
326                         rc = 0;
327                 }
328 next:
329                 l_dput(dchild);
330                 up(&pending_dir->i_sem);
331         }
332 err_out:
333         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
334                 list_del(&dirent->lld_list);
335                 OBD_FREE(dirent, sizeof(*dirent));
336         }
337 err_pop:
338         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
339         if (rc == 0)
340                 rc = item;
341         RETURN(rc);
342
343 err_mntget:
344         l_dput(mds->mds_pending_dir);
345         goto err_pop;
346 }