Whamcloud - gitweb
84a4090987ebc87f204655cf4a6c2f08efdfd80b
[fs/lustre-release.git] / lustre / mds / mds_unlink_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/mds_unlink_open.c
5  *
6  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
7  *   Author: Peter Braam <braam@clusterfs.com>
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *   Author: Phil Schwan <phil@clusterfs.com>
10  *
11  *   This file is part of Lustre, http://www.lustre.org.
12  *
13  *   Lustre is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Lustre is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Lustre; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 /* code for handling open unlinked files */
28
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/config.h>
32 #include <linux/module.h>
33 #include <linux/version.h>
34
35 #include <portals/list.h>
36 #include <linux/obd_class.h>
37 #include <linux/lustre_fsfilt.h>
38 #include <linux/lustre_commit_confd.h>
39 #include <linux/lvfs.h>
40
41 #include "mds_internal.h"
42
43
44 /* If we are unlinking an open file/dir (i.e. creating an orphan) then
45  * we instead link the inode into the PENDING directory until it is
46  * finally released.  We can't simply call mds_reint_rename() or some
47  * part thereof, because we don't have the inode to check for link
48  * count/open status until after it is locked.
49  *
50  * For lock ordering, we always get the PENDING, then pending_child lock
51  * last to avoid deadlocks.
52  */
53
54 int mds_open_unlink_rename(struct mds_update_record *rec,
55                            struct obd_device *obd, struct dentry *dparent,
56                            struct dentry *dchild, void **handle)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         struct inode *pending_dir = mds->mds_pending_dir->d_inode;
60         struct dentry *pending_child;
61         char fidname[LL_FID_NAMELEN];
62         int fidlen = 0, rc;
63         ENTRY;
64
65         LASSERT(!mds_inode_is_orphan(dchild->d_inode));
66
67         down(&pending_dir->i_sem);
68         fidlen = ll_fid2str(fidname, dchild->d_inode->i_ino,
69                             dchild->d_inode->i_generation);
70
71         CWARN("pending destroy of %dx open file %s = %s\n",
72               mds_open_orphan_count(dchild->d_inode),
73               rec->ur_name, fidname);
74
75         pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
76         if (IS_ERR(pending_child))
77                 GOTO(out_lock, rc = PTR_ERR(pending_child));
78
79         if (pending_child->d_inode != NULL) {
80                 CERROR("re-destroying orphan file %s?\n", rec->ur_name);
81                 LASSERT(pending_child->d_inode == dchild->d_inode);
82                 GOTO(out_dput, rc = 0);
83         }
84
85         *handle = fsfilt_start(obd, pending_dir, FSFILT_OP_RENAME, NULL);
86         if (IS_ERR(*handle))
87                 GOTO(out_dput, rc = PTR_ERR(*handle));
88
89         lock_kernel();
90         rc = vfs_rename(dparent->d_inode, dchild, pending_dir, pending_child);
91         unlock_kernel();
92         if (rc)
93                 CERROR("error renaming orphan %lu/%s to PENDING: rc = %d\n",
94                        dparent->d_inode->i_ino, rec->ur_name, rc);
95         else
96                 mds_inode_set_orphan(dchild->d_inode);
97 out_dput:
98         dput(pending_child);
99 out_lock:
100         up(&pending_dir->i_sem);
101         RETURN(rc);
102 }
103
104 static int mds_osc_destroy_orphan(struct mds_obd *mds, 
105                                   struct ptlrpc_request *request)
106 {
107         struct mds_body *body;
108         struct lov_mds_md *lmm = NULL;
109         struct lov_stripe_md *lsm = NULL;
110         struct obd_trans_info oti = { 0 };
111         struct obdo *oa;
112         int rc;
113         ENTRY;
114
115         body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
116         if (!(body->valid & OBD_MD_FLEASIZE))
117                 RETURN(0);
118         if (body->eadatasize == 0) {
119                 CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
120                 RETURN(rc = -EPROTO); 
121         }
122
123         lmm = lustre_msg_buf(request->rq_repmsg, 1, body->eadatasize);
124         LASSERT(lmm != NULL);
125
126         rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, body->eadatasize);
127         if (rc < 0) {
128                 CERROR("Error unpack md %p\n", lmm);
129                 RETURN(rc);
130         } else {
131                 LASSERT(rc >= sizeof(*lsm));
132                 rc = 0;
133         }
134
135         oa = obdo_alloc();
136         if (oa == NULL)
137                 GOTO(out_free_memmd, rc = -ENOMEM);
138         oa->o_id = lsm->lsm_object_id;
139         oa->o_mode = body->mode & S_IFMT;
140         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
141
142         if (body->valid & OBD_MD_FLCOOKIE) {
143                 oa->o_valid |= OBD_MD_FLCOOKIE;
144                 oti.oti_logcookies = 
145                         lustre_msg_buf(request->rq_repmsg, 2,
146                                        sizeof(struct llog_cookie) *
147                                        lsm->lsm_stripe_count);
148                 if (oti.oti_logcookies == NULL)
149                         oa->o_valid &= ~OBD_MD_FLCOOKIE;
150                         body->valid &= ~OBD_MD_FLCOOKIE;
151         }
152
153         rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
154         obdo_free(oa);
155         if (rc) 
156                 CERROR("destroy orphan objid 0x"LPX64" on ost error "
157                        "%d\n", lsm->lsm_object_id, rc);
158 out_free_memmd:
159         obd_free_memmd(mds->mds_osc_exp, &lsm);
160         RETURN(rc);
161 }
162
163 static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
164                              struct inode *inode, struct inode *pending_dir)
165 {
166         struct mds_obd *mds = &obd->u.mds;
167         struct mds_body *body;
168         void *handle = NULL;
169         struct ptlrpc_request *req;
170         int lengths[3] = {sizeof(struct mds_body),
171                           mds->mds_max_mdsize,
172                           mds->mds_max_cookiesize};
173         int rc;
174         ENTRY;
175
176         LASSERT(mds->mds_osc_obd != NULL);
177         OBD_ALLOC(req, sizeof(*req));
178         if (!req) {
179                 CERROR("request allocation out of memory\n");
180                 GOTO(err_alloc_req, rc = -ENOMEM);
181         }
182         rc = lustre_pack_reply(req, 3, lengths, NULL);
183         if (rc) {
184                 CERROR("cannot pack request %d\n", rc);
185                 GOTO(out_free_req, rc);
186         }
187         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
188         LASSERT(body != NULL);
189
190         mds_pack_inode2body(body, inode);
191         mds_pack_md(obd, req->rq_repmsg, 1, body, inode, 1);
192
193         handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK_LOG, NULL);
194         if (IS_ERR(handle)) {
195                 rc = PTR_ERR(handle);
196                 CERROR("error fsfilt_start: %d\n", rc);
197                 handle = NULL;
198                 GOTO(out_free_msg, rc);
199         }
200
201         if (S_ISDIR(inode->i_mode)) {
202                 rc = vfs_rmdir(pending_dir, dchild);
203         } else {
204                 rc = vfs_unlink(pending_dir, dchild);
205         }
206         if (rc) 
207                 CERROR("error %d unlinking orphan %*s from PENDING directory\n",
208                        rc, dchild->d_name.len, dchild->d_name.name);
209
210         if ((body->valid & OBD_MD_FLEASIZE)) {
211                 if (mds_log_op_unlink(obd, inode, req->rq_repmsg, 1) > 0)
212                         body->valid |= OBD_MD_FLCOOKIE;
213         }
214
215         if (handle) {
216                 int err = fsfilt_commit(obd, pending_dir, handle, 0);
217                 if (err) {
218                         CERROR("error committing orphan unlink: %d\n", err);
219                         rc = err;
220                         GOTO(out_free_msg, rc);
221                 }
222         }
223         rc = mds_osc_destroy_orphan(mds, req);
224 out_free_msg:
225         OBD_FREE(req->rq_repmsg, req->rq_replen);
226         req->rq_repmsg = NULL;
227 out_free_req:
228         OBD_FREE(req, sizeof(*req));
229 err_alloc_req:
230         RETURN(rc);
231 }
232
233 int mds_cleanup_orphans(struct obd_device *obd)
234 {
235         struct mds_obd *mds = &obd->u.mds;
236         struct obd_run_ctxt saved;
237         struct file *file;
238         struct dentry *dchild, *dentry;
239         struct vfsmount *mnt;
240         struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
241         struct l_linux_dirent *dirent, *n;
242         struct list_head dentry_list;
243         char d_name[LL_FID_NAMELEN];
244         __u64 i = 0;
245         int rc = 0, item = 0, namlen;
246         ENTRY;
247
248         push_ctxt(&saved, &obd->obd_ctxt, NULL);
249         dentry = dget(mds->mds_pending_dir);
250         if (IS_ERR(dentry))
251                 GOTO(err_pop, rc = PTR_ERR(dentry));
252         mnt = mntget(mds->mds_vfsmnt);
253         if (IS_ERR(mnt))
254                 GOTO(err_mntget, rc = PTR_ERR(mnt));
255
256         file = dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt,
257                            O_RDONLY | O_LARGEFILE);
258         if (IS_ERR(file))
259                 GOTO(err_pop, rc = PTR_ERR(file));
260
261         INIT_LIST_HEAD(&dentry_list);
262         rc = l_readdir(file, &dentry_list);
263         filp_close(file, 0);
264         if (rc < 0)
265                 GOTO(err_out, rc);
266
267         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
268                 i ++;
269                 list_del(&dirent->lld_list);
270
271                 namlen = strlen(dirent->lld_name);
272                 LASSERT(sizeof(d_name) >= namlen + 1);
273                 strcpy(d_name, dirent->lld_name);
274                 OBD_FREE(dirent, sizeof(*dirent));
275
276                 CDEBUG(D_INODE, "entry "LPU64" of PENDING DIR: %s\n",
277                        i, d_name);
278                 
279                 if (((namlen == 1) && !strcmp(d_name, ".")) ||
280                     ((namlen == 2) && !strcmp(d_name, ".."))) {
281                         continue;
282                 }
283
284                 down(&pending_dir->i_sem);
285                 dchild = lookup_one_len(d_name, mds->mds_pending_dir, namlen);
286                 if (IS_ERR(dchild)) {
287                         up(&pending_dir->i_sem);
288                         GOTO(err_out, rc = PTR_ERR(dchild));
289                 }
290                 if (!dchild->d_inode) {
291                         CERROR("orphan %s has been removed\n", d_name);
292                         GOTO(next, rc = 0);
293                 }
294
295                 child_inode = dchild->d_inode;
296                 if (mds_inode_is_orphan(child_inode) &&
297                     mds_open_orphan_count(child_inode)) {
298                         CWARN("orphan %s was re-opened during recovery\n", d_name);
299                         GOTO(next, rc = 0);
300                 }
301
302                 rc = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
303                 if (rc == 0) {
304                         item ++;
305                         CWARN("removed orphan %s from MDS and OST\n", d_name);
306                 } else {
307                         CERROR("removed orphan %s from MDS and OST failed,"
308                                " rc = %d\n", d_name, rc);
309                         rc = 0;
310                 }
311 next:
312                 l_dput(dchild);
313                 up(&pending_dir->i_sem);
314         }
315 err_out:
316         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
317                 list_del(&dirent->lld_list);
318                 OBD_FREE(dirent, sizeof(*dirent));
319         }
320 err_pop:
321         pop_ctxt(&saved, &obd->obd_ctxt, NULL);
322         if (rc == 0)
323                 rc = item;
324         RETURN(rc);
325
326 err_mntget:
327         l_dput(mds->mds_pending_dir);
328         goto err_pop;
329 }