Whamcloud - gitweb
Exit early from mds_open() if we get an error.
[fs/lustre-release.git] / lustre / mds / mds_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Andreas Dilger <adilger@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Mike Shaver <shaver@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_MDS
28
29 #include <linux/module.h>
30 #include <linux/lustre_mds.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/init.h>
33 #include <linux/obd_class.h>
34 #include <linux/random.h>
35 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
36 # include <linux/buffer_head.h>
37 # include <linux/workqueue.h>
38 #else
39 # include <linux/locks.h>
40 #endif
41 #include <linux/obd_lov.h>
42 #include <linux/lustre_mds.h>
43 #include <linux/lustre_fsfilt.h>
44 #include <linux/lprocfs_status.h>
45
46 #include "mds_internal.h"
47
48 struct mds_file_data *mds_dentry_open(struct dentry *dentry,
49                                       struct vfsmount *mnt,
50                                       int flags,
51                                       struct ptlrpc_request *req)
52 {
53         struct mds_export_data *med = &req->rq_export->exp_mds_data;
54         struct inode *inode;
55         struct mds_file_data *mfd;
56         int mode, error;
57
58         mfd = mds_mfd_new();
59         if (mfd == NULL) {
60                 CERROR("mds: out of memory\n");
61                 GOTO(cleanup_dentry, error = -ENOMEM);
62         }
63
64         mode = (flags + 1) & O_ACCMODE;
65         inode = dentry->d_inode;
66
67         if (mode & FMODE_WRITE) {
68                 error = get_write_access(inode);
69                 if (error)
70                         goto cleanup_mfd;
71         }
72
73         mfd->mfd_mode = mode;
74         mfd->mfd_dentry = dentry;
75         mfd->mfd_xid = req->rq_xid;
76
77         spin_lock(&med->med_open_lock);
78         list_add(&mfd->mfd_list, &med->med_open_head);
79         spin_unlock(&med->med_open_lock);
80         mds_mfd_put(mfd);
81         return mfd;
82
83 cleanup_mfd:
84         mds_mfd_put(mfd);
85         mds_mfd_destroy(mfd);
86 cleanup_dentry:
87         dput(dentry);
88         mntput(mnt);
89         return ERR_PTR(error);
90 }
91
92 void reconstruct_open(struct mds_update_record *rec, int offset,
93                       struct ptlrpc_request *req,
94                       struct lustre_handle *child_lockh)
95 {
96         struct ptlrpc_request *oldreq = req->rq_export->exp_outstanding_reply;
97         struct mds_export_data *med = &req->rq_export->exp_mds_data;
98         struct mds_client_data *mcd = med->med_mcd;
99         struct mds_obd *mds = mds_req2mds(req);
100         struct mds_file_data *mfd;
101         struct obd_device *obd = req->rq_export->exp_obd;
102         struct dentry *parent, *child;
103         struct ldlm_reply *rep;
104         struct mds_body *body;
105         int rc;
106         struct list_head *t;
107         int put_child = 1;
108         ENTRY;
109
110         LASSERT(offset == 2);                  /* only called via intent */
111         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
112         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
113
114         /* copy rc, transno and disp; steal locks */
115         req->rq_transno = mcd->mcd_last_transno;
116         req->rq_status = mcd->mcd_last_result;
117         intent_set_disposition(rep, mcd->mcd_last_data);
118
119         if (oldreq)
120                 mds_steal_ack_locks(req->rq_export, req);
121
122         /* Only replay if create or open actually happened. */
123         if (!intent_disposition(rep, DISP_OPEN_CREATE | DISP_OPEN_OPEN) ) {
124                 EXIT;
125                 return; /* error looking up parent or child */
126         }
127
128         parent = mds_fid2dentry(mds, rec->ur_fid1, NULL);
129         LASSERT(!IS_ERR(parent));
130
131         child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
132         LASSERT(!IS_ERR(child));
133
134         if (!child->d_inode) {
135                 GOTO(out_dput, 0); /* child not present to open */
136         }
137
138         /* At this point, we know we have a child. We'll send
139          * it back _unless_ it not created and open failed.
140          */
141         if (intent_disposition(rep, DISP_OPEN_OPEN) &&
142             !intent_disposition(rep, DISP_OPEN_CREATE) &&
143             req->rq_status) {
144                 GOTO(out_dput, 0);
145         }
146
147         /* get lock (write for O_CREAT, read otherwise) */
148
149         mds_pack_inode2fid(&body->fid1, child->d_inode);
150         mds_pack_inode2body(body, child->d_inode);
151         if (S_ISREG(child->d_inode->i_mode)) {
152                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
153                                  child->d_inode);
154
155                 if (rc)
156                         LASSERT(rc == req->rq_status);
157
158                 /* If we have LOV EA data, the OST holds size, mtime */
159                 if (!(body->valid & OBD_MD_FLEASIZE))
160                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
161                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
162         } else {
163                 /* XXX need to check this case */
164         }
165
166         /* If we're opening a file without an EA, change to a write
167            lock (unless we already have one). */
168
169         /* If we have -EEXIST as the status, and we were asked to create
170          * exclusively, we can tell we failed because the file already existed.
171          */
172         if (req->rq_status == -EEXIST &&
173             ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) {
174                 GOTO(out_dput, 0);
175         }
176
177         /* If we didn't get as far as trying to open, then some locking thing
178          * probably went wrong, and we'll just bail here.
179          */
180         if (!intent_disposition(rep, DISP_OPEN_OPEN))
181                 GOTO(out_dput, 0);
182
183         /* If we failed, then we must have failed opening, so don't look for
184          * file descriptor or anything, just give the client the bad news.
185          */
186         if (req->rq_status)
187                 GOTO(out_dput, 0);
188
189         mfd = NULL;
190         list_for_each(t, &med->med_open_head) {
191                 mfd = list_entry(t, struct mds_file_data, mfd_list);
192                 if (mfd->mfd_xid == req->rq_xid)
193                         break;
194                 mfd = NULL;
195         }
196
197         if (oldreq) {
198                 /* if we're not recovering, it had better be found */
199                 LASSERT(mfd);
200         } else if (mfd == NULL) {
201                 mntget(mds->mds_vfsmnt);
202                 CERROR("Re-opened file \n");
203                 mfd = mds_dentry_open(child, mds->mds_vfsmnt,
204                                    rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
205                 if (!mfd) {
206                         CERROR("mds: out of memory\n");
207                         GOTO(out_dput, req->rq_status = -ENOMEM);
208                 }
209                 put_child = 0;
210         }
211
212         body->handle.cookie = mfd->mfd_handle.h_cookie;
213
214  out_dput:
215         if (put_child)
216                 l_dput(child);
217         l_dput(parent);
218         EXIT;
219 }
220
221 int mds_pin(struct ptlrpc_request *req)
222 {
223         struct mds_obd *mds = mds_req2mds(req);
224         struct inode *pending_dir = mds->mds_pending_dir->d_inode;
225         struct mds_file_data *mfd = NULL;
226         struct mds_body *body;
227         struct dentry *dchild;
228         struct obd_run_ctxt saved;
229         char fidname[LL_FID_NAMELEN];
230         int fidlen = 0, rc, cleanup_phase = 0, size = sizeof(*body);
231         ENTRY;
232
233         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
234
235         down(&pending_dir->i_sem);
236         fidlen = ll_fid2str(fidname, body->fid1.id, body->fid1.generation);
237         dchild = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
238         if (IS_ERR(dchild)) {
239                 up(&pending_dir->i_sem);
240                 rc = PTR_ERR(dchild);
241                 CERROR("error looking up %s in PENDING: rc = %d\n",
242                        fidname, rc);
243                 RETURN(rc);
244         }
245
246         cleanup_phase = 2;
247
248         if (dchild->d_inode) {
249                 up(&pending_dir->i_sem);
250                 mds_inode_set_orphan(dchild->d_inode);
251                 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
252                 mds_pack_inode2body(body, dchild->d_inode);
253                 GOTO(openit, rc = 0);
254         }
255         dput(dchild);
256         up(&pending_dir->i_sem);
257
258         /* We didn't find it in PENDING so it isn't an orphan.  See
259          * if it's a regular inode. */
260         dchild = mds_fid2dentry(mds, &body->fid1, NULL);
261         if (!IS_ERR(dchild)) {
262                 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
263                 mds_pack_inode2body(body, dchild->d_inode);
264                 GOTO(openit, rc = 0);
265         }
266
267         /* We didn't find this inode on disk, but we're trying to pin it.
268          * This should never happen. */
269         CERROR("ENOENT during mds_pin for fid "LPU64"/%u\n", body->fid1.id,
270                body->fid1.generation);
271         RETURN(-ENOENT);
272
273  openit:
274         /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
275         mfd = mds_dentry_open(dchild, mds->mds_vfsmnt, body->flags, req);
276         if (IS_ERR(mfd)) {
277                 dchild = NULL; /* prevent a double dput in cleanup phase 2 */
278                 GOTO(cleanup, rc = PTR_ERR(mfd));
279         }
280
281         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
282         if (rc) {
283                 CERROR("out of memoryK\n");
284                 GOTO(cleanup, rc);
285         }
286         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
287
288         cleanup_phase = 4; /* mfd allocated */
289         body->handle.cookie = mfd->mfd_handle.h_cookie;
290         CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
291                mfd->mfd_handle.h_cookie);
292         GOTO(cleanup, rc = 0);
293
294  cleanup:
295         push_ctxt(&saved, &mds->mds_ctxt, NULL);
296         rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, NULL,
297                                 req, rc, 0);
298         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
299         /* XXX what do we do here if mds_finish_transno itself failed? */
300         switch (cleanup_phase) {
301         case 4:
302                 if (rc)
303                         mds_mfd_destroy(mfd);
304         case 2:
305                 if (rc || S_ISLNK(dchild->d_inode->i_mode))
306                         l_dput(dchild);
307         }
308         return rc;
309 }
310
311 int mds_open(struct mds_update_record *rec, int offset,
312              struct ptlrpc_request *req, struct lustre_handle *child_lockh)
313 {
314         /* XXX ALLOCATE _something_ - 464 bytes on stack here */
315         static const char acc_table [] = {[O_RDONLY] MAY_READ,
316                                           [O_WRONLY] MAY_WRITE,
317                                           [O_RDWR]   MAY_READ | MAY_WRITE};
318         struct mds_obd *mds = mds_req2mds(req);
319         struct obd_device *obd = req->rq_export->exp_obd;
320         struct ldlm_reply *rep = NULL;
321         struct mds_body *body = NULL;
322         struct dentry *dchild = NULL, *parent = NULL;
323         struct mds_export_data *med;
324         struct mds_file_data *mfd = NULL;
325         struct ldlm_res_id child_res_id = { .name = {0} };
326         struct lustre_handle parent_lockh;
327         int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
328         int cleanup_phase = 0, acc_mode;
329         void *handle = NULL;
330         ENTRY;
331
332         if (offset == 2) { /* intent */
333                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
334                 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
335         } else if (offset == 0) { /* non-intent reint */
336                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
337         } else {
338                 body = NULL;
339                 LBUG();
340         }
341
342         MDS_CHECK_RESENT(req, reconstruct_open(rec, offset, req, child_lockh));
343
344         /* Step 0: If we are passed a fid, then we assume the client already
345          * opened this file and is only replaying the RPC, so we open the
346          * inode by fid (at some large expense in security).
347          */
348         if (rec->ur_fid2->id) {
349                 struct inode *pending_dir = mds->mds_pending_dir->d_inode;
350                 char fidname[LL_FID_NAMELEN];
351                 int fidlen = 0;
352
353                 down(&pending_dir->i_sem);
354                 fidlen = ll_fid2str(fidname, rec->ur_fid2->id,
355                                     rec->ur_fid2->generation);
356                 dchild = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
357                 if (IS_ERR(dchild)) {
358                         up(&pending_dir->i_sem);
359                         rc = PTR_ERR(dchild);
360                         CERROR("error looking up %s in PENDING: rc = %d\n",
361                                fidname, rc);
362                         RETURN(rc);
363                 }
364
365                 if (dchild->d_inode) {
366                         up(&pending_dir->i_sem);
367                         mds_inode_set_orphan(dchild->d_inode);
368                         mds_pack_inode2fid(&body->fid1, dchild->d_inode);
369                         mds_pack_inode2body(body, dchild->d_inode);
370                         cleanup_phase = 2;
371                         GOTO(openit, rc = 0);
372                 }
373                 dput(dchild);
374                 up(&pending_dir->i_sem);
375
376                 /* We didn't find it in PENDING so it isn't an orphan.  See
377                  * if it was a regular inode that was previously created.
378                  */
379                 dchild = mds_fid2dentry(mds, rec->ur_fid2, NULL);
380                 if (!IS_ERR(dchild)) {
381                         mds_pack_inode2fid(&body->fid1, dchild->d_inode);
382                         mds_pack_inode2body(body, dchild->d_inode);
383                         cleanup_phase = 2;
384                         GOTO(openit, rc = 0);
385                 }
386
387                 /* We didn't find the correct inode on disk either, so we
388                  * need to re-create it via a regular replay.  Do that below.
389                  */
390                 LASSERT(rec->ur_flags & O_CREAT);
391         }
392         LASSERT(offset == 2); /* If we got here, we must be called via intent */
393
394         med = &req->rq_export->exp_mds_data;
395         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
396                 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
397                 req->rq_status = -ENOMEM;
398                 RETURN(-ENOMEM);
399         }
400
401         if ((rec->ur_flags & O_ACCMODE) >= sizeof (acc_table))
402                 RETURN(-EINVAL);
403         acc_mode = acc_table[rec->ur_flags & O_ACCMODE];
404         if ((rec->ur_flags & O_TRUNC) != 0)
405                 acc_mode |= MAY_WRITE;
406
407         /* Step 1: Find and lock the parent */
408         parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
409         parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
410                                        &parent_lockh);
411         if (IS_ERR(parent)) {
412                 rc = PTR_ERR(parent);
413                 CERROR("parent lookup error %d\n", rc);
414                 GOTO(cleanup, rc);
415         }
416         LASSERT(parent->d_inode);
417
418         cleanup_phase = 1; /* parent dentry and lock */
419
420         /* Step 2: Lookup the child */
421         dchild = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
422         if (IS_ERR(dchild))
423                 GOTO(cleanup, rc = PTR_ERR(dchild));
424
425         cleanup_phase = 2; /* child dentry */
426
427         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
428         if (dchild->d_inode)
429                 intent_set_disposition(rep, DISP_LOOKUP_POS);
430         else
431                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
432
433         /* Step 3: If the child was negative, and we're supposed to,
434          * create it. */
435         if (!dchild->d_inode) {
436                 unsigned long ino = rec->ur_fid2->id;
437
438                 if (!(rec->ur_flags & O_CREAT)) {
439                         /* It's negative and we weren't supposed to create it */
440                         GOTO(cleanup, rc = -ENOENT);
441                 }
442
443                 intent_set_disposition(rep, DISP_OPEN_CREATE);
444                 handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE,
445                                       NULL);
446                 if (IS_ERR(handle)) {
447                         rc = PTR_ERR(handle);
448                         handle = NULL;
449                         GOTO(cleanup, rc);
450                 }
451                 if (ino)
452                         dchild->d_fsdata = (void *)(unsigned long)ino;
453
454                 rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
455                 if (dchild->d_fsdata == (void *)(unsigned long)ino)
456                         dchild->d_fsdata = NULL;
457
458                 if (rc) {
459                         CDEBUG(D_INODE, "error during create: %d\n", rc);
460                         GOTO(cleanup, rc);
461                 } else {
462                         struct iattr iattr;
463                         struct inode *inode = dchild->d_inode;
464
465                         if (ino) {
466                                 LASSERT(ino == inode->i_ino);
467                                 /* Written as part of setattr */
468                                 inode->i_generation = rec->ur_fid2->generation;
469                                 CDEBUG(D_HA, "recreated ino %lu with gen %x\n",
470                                        inode->i_ino, inode->i_generation);
471                         }
472
473                         created = 1;
474                         LTIME_S(iattr.ia_atime) = rec->ur_time;
475                         LTIME_S(iattr.ia_ctime) = rec->ur_time;
476                         LTIME_S(iattr.ia_mtime) = rec->ur_time;
477
478                         iattr.ia_uid = rec->ur_uid;
479                         if (parent->d_inode->i_mode & S_ISGID) {
480                                 iattr.ia_gid = parent->d_inode->i_gid;
481                         } else
482                                 iattr.ia_gid = rec->ur_gid;
483
484                         iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
485                                 ATTR_MTIME | ATTR_CTIME;
486
487                         rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
488                         if (rc) {
489                                 CERROR("error on setattr: rc = %d\n", rc);
490                                 /* XXX should we abort here in case of error? */
491                         }
492                 }
493
494                 child_mode = LCK_PW;
495                 acc_mode = 0;                  /* Don't check for permissions */
496         }
497
498         LASSERT(!mds_inode_is_orphan(dchild->d_inode));
499
500         /* Step 4: It's positive, so lock the child */
501         child_res_id.name[0] = dchild->d_inode->i_ino;
502         child_res_id.name[1] = dchild->d_inode->i_generation;
503  reacquire:
504         lock_flags = 0;
505         /* For the open(O_CREAT) case, this would technically be a lock
506          * inversion (getting a VFS lock after starting a transaction),
507          * but in that case we cannot possibly block on this lock because
508          * we just created the child and also hold a write lock on the
509          * parent, so nobody could be holding the lock yet.
510          */
511         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
512                               child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
513                               &lock_flags, ldlm_completion_ast,
514                               mds_blocking_ast, NULL, child_lockh);
515         if (rc != ELDLM_OK) {
516                 CERROR("ldlm_cli_enqueue: %d\n", rc);
517                 GOTO(cleanup, rc = -EIO);
518         }
519
520         cleanup_phase = 3; /* child lock */
521
522         mds_pack_inode2fid(&body->fid1, dchild->d_inode);
523         mds_pack_inode2body(body, dchild->d_inode);
524
525         if (S_ISREG(dchild->d_inode->i_mode)) {
526                 /* Check permissions etc */
527                 rc = permission(dchild->d_inode, acc_mode);
528                 if (rc != 0)
529                         GOTO(cleanup, rc);
530
531                 /* Can't write to a read-only file */
532                 if (IS_RDONLY(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0)
533                         GOTO(cleanup, rc = -EPERM);
534
535                 /* An append-only file must be opened in append mode for
536                  * writing */
537                 if (IS_APPEND(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0 &&
538                     ((rec->ur_flags & O_APPEND) == 0 ||
539                      (rec->ur_flags & O_TRUNC) != 0))
540                         GOTO(cleanup, rc = -EPERM);
541
542                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
543                 if (rc)
544                         GOTO(cleanup, rc);
545
546                 /* If we have LOV EA data, the OST holds size, mtime */
547                 if (!(body->valid & OBD_MD_FLEASIZE))
548                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
549                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
550         }
551
552         if (!created && (rec->ur_flags & O_CREAT) &&
553             (rec->ur_flags & O_EXCL)) {
554                 /* File already exists, we didn't just create it, and we
555                  * were passed O_EXCL; err-or. */
556                 GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
557         }
558
559         /* If we're opening a file without an EA for write, the client needs
560          * a write lock. */
561         if (S_ISREG(dchild->d_inode->i_mode) && (rec->ur_flags & O_ACCMODE) &&
562             child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
563                 ldlm_lock_decref(child_lockh, child_mode);
564                 child_mode = LCK_PW;
565                 goto reacquire;
566         }
567
568         /* if we are following a symlink, don't open */
569         if (S_ISLNK(dchild->d_inode->i_mode))
570                 GOTO(cleanup, rc = 0);
571
572         if ((rec->ur_flags & O_DIRECTORY) && !S_ISDIR(dchild->d_inode->i_mode))
573                 GOTO(cleanup, rc = -ENOTDIR);
574
575         /* Step 5: mds_open it */
576         intent_set_disposition(rep, DISP_OPEN_OPEN);
577  openit:
578         /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
579         mfd = mds_dentry_open(dchild, mds->mds_vfsmnt,
580                               rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
581         if (IS_ERR(mfd)) {
582                 dchild = NULL; /* prevent a double dput in cleanup phase 2 */
583                 GOTO(cleanup, rc = PTR_ERR(mfd));
584         }
585
586         cleanup_phase = 4; /* mfd allocated */
587         body->handle.cookie = mfd->mfd_handle.h_cookie;
588         CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
589                mfd->mfd_handle.h_cookie);
590         GOTO(cleanup, rc = 0); /* returns a lock to the client */
591
592  cleanup:
593         rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
594                                 req, rc, rep->lock_policy_res1);
595         /* XXX what do we do here if mds_finish_transno itself failed? */
596         switch (cleanup_phase) {
597         case 4:
598                 if (rc && !S_ISLNK(dchild->d_inode->i_mode))
599                         mds_mfd_destroy(mfd);
600         case 3:
601                 /* This is the same logic as in the IT_OPEN part of
602                  * ldlm_intent_policy: if we found the dentry, or we tried to
603                  * open it (meaning that we created, if it wasn't found), then
604                  * we return the lock to the caller and client. */
605                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
606                     !intent_disposition(rep, DISP_OPEN_OPEN))
607                         ldlm_lock_decref(child_lockh, child_mode);
608         case 2:
609                 if (rc || S_ISLNK(dchild->d_inode->i_mode))
610                         l_dput(dchild);
611         case 1:
612                 if (parent) {
613                         l_dput(parent);
614                         if (rc) {
615                                 ldlm_lock_decref(&parent_lockh, parent_mode);
616                         } else {
617                                 memcpy(&req->rq_ack_locks[0].lock,&parent_lockh,
618                                        sizeof(parent_lockh));
619                                 req->rq_ack_locks[0].mode = parent_mode;
620                         }
621                 }
622         }
623         RETURN(rc);
624 }