Whamcloud - gitweb
LU-721 llite: Parallel writes to same file results in zero file
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 #include <linux/fs.h>
41 #include <linux/sched.h>
42 #include <linux/smp_lock.h>
43 #include <linux/quotaops.h>
44
45 #define DEBUG_SUBSYSTEM S_LLITE
46
47 #include <obd_support.h>
48 #include <lustre_lite.h>
49 #include <lustre/lustre_idl.h>
50 #include <lustre_dlm.h>
51 #include <lustre_mdc.h>
52 //#include <lustre_ver.h>
53 //#include <lustre_version.h>
54
55 #include "llite_internal.h"
56
57 cfs_spinlock_t ll_lookup_lock = CFS_SPIN_LOCK_UNLOCKED;
58
59 /* should NOT be called with the dcache lock, see fs/dcache.c */
60 static void ll_release(struct dentry *de)
61 {
62         struct ll_dentry_data *lld;
63         ENTRY;
64         LASSERT(de != NULL);
65         lld = ll_d2d(de);
66         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
67                 EXIT;
68                 return;
69         }
70         if (lld->lld_it) {
71                 ll_intent_release(lld->lld_it);
72                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
73         }
74         LASSERT(lld->lld_cwd_count == 0);
75         LASSERT(lld->lld_mnt_count == 0);
76         OBD_FREE(de->d_fsdata, sizeof(*lld));
77
78         EXIT;
79 }
80
81 /* Compare if two dentries are the same.  Don't match if the existing dentry
82  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
83  *
84  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
85  * an AST before calling d_revalidate_it().  The dentry still exists (marked
86  * INVALID) so d_lookup() matches it, but we have no lock on it (so
87  * lock_match() fails) and we spin around real_lookup(). */
88 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
89 {
90         struct dentry *dchild;
91         ENTRY;
92
93         if (d_name->len != name->len)
94                 RETURN(1);
95
96         if (memcmp(d_name->name, name->name, name->len))
97                 RETURN(1);
98
99         /* XXX: d_name must be in-dentry structure */
100         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
101
102         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
103                name->len, name->name, dchild,
104                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
105                atomic_read(&dchild->d_count));
106
107          /* mountpoint is always valid */
108         if (d_mountpoint(dchild))
109                 RETURN(0);
110
111         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
112                 RETURN(1);
113
114         RETURN(0);
115 }
116
117 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
118 {
119         if ((lock->l_flags &
120              (LDLM_FL_CANCELING | LDLM_FL_DISCARD_DATA)) ==
121             (LDLM_FL_CANCELING | LDLM_FL_DISCARD_DATA))
122                 return LDLM_ITER_CONTINUE;
123         return LDLM_ITER_STOP;
124 }
125
126 /* find any ldlm lock of the inode in mdc and lov
127  * return 0    not find
128  *        1    find one
129  *      < 0    error */
130 static int find_cbdata(struct inode *inode)
131 {
132         struct ll_inode_info *lli = ll_i2info(inode);
133         struct ll_sb_info *sbi = ll_i2sbi(inode);
134         int rc = 0;
135         ENTRY;
136
137         LASSERT(inode);
138         rc = md_find_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
139                             return_if_equal, NULL);
140         if (rc != 0)
141                  RETURN(rc);
142
143         if (lli->lli_smd)
144                 rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
145                                      return_if_equal, NULL);
146
147         RETURN(rc);
148 }
149
150 /**
151  * Called when last reference to a dentry is dropped and dcache wants to know
152  * whether or not it should cache it:
153  * - return 1 to delete the dentry immediately
154  * - return 0 to cache the dentry
155  * Should NOT be called with the dcache lock, see fs/dcache.c
156  */
157 static int ll_ddelete(struct dentry *de)
158 {
159         ENTRY;
160         LASSERT(de);
161
162         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
163                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
164                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
165                d_unhashed(de) ? "" : "hashed,",
166                list_empty(&de->d_subdirs) ? "" : "subdirs");
167
168         /* if not ldlm lock for this inode, set i_nlink to 0 so that
169          * this inode can be recycled later b=20433 */
170         LASSERT(atomic_read(&de->d_count) == 0);
171         if (de->d_inode && !find_cbdata(de->d_inode))
172                 de->d_inode->i_nlink = 0;
173
174         if (de->d_flags & DCACHE_LUSTRE_INVALID)
175                 RETURN(1);
176
177         RETURN(0);
178 }
179
180 static int ll_set_dd(struct dentry *de)
181 {
182         ENTRY;
183         LASSERT(de != NULL);
184
185         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
186                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
187                atomic_read(&de->d_count));
188
189         if (de->d_fsdata == NULL) {
190                 struct ll_dentry_data *lld;
191
192                 OBD_ALLOC_PTR(lld);
193                 if (likely(lld != NULL)) {
194                         lock_dentry(de);
195                         if (likely(de->d_fsdata == NULL))
196                                 de->d_fsdata = lld;
197                         else
198                                 OBD_FREE_PTR(lld);
199                         unlock_dentry(de);
200                 } else {
201                         RETURN(-ENOMEM);
202                 }
203         }
204
205         RETURN(0);
206 }
207
208 int ll_dops_init(struct dentry *de, int block, int init_sa)
209 {
210         struct ll_dentry_data *lld = ll_d2d(de);
211         int rc = 0;
212
213         if (lld == NULL && block != 0) {
214                 rc = ll_set_dd(de);
215                 if (rc)
216                         return rc;
217
218                 lld = ll_d2d(de);
219         }
220
221         if (lld != NULL && init_sa != 0)
222                 lld->lld_sa_generation = 0;
223
224         de->d_op = &ll_d_ops;
225         return rc;
226 }
227
228 void ll_intent_drop_lock(struct lookup_intent *it)
229 {
230         struct lustre_handle *handle;
231
232         if (it->it_op && it->d.lustre.it_lock_mode) {
233                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
234                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
235                        " from it %p\n", handle->cookie, it);
236                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
237
238                 /* bug 494: intent_release may be called multiple times, from
239                  * this thread and we don't want to double-decref this lock */
240                 it->d.lustre.it_lock_mode = 0;
241         }
242 }
243
244 void ll_intent_release(struct lookup_intent *it)
245 {
246         ENTRY;
247
248         CDEBUG(D_INFO, "intent %p released\n", it);
249         ll_intent_drop_lock(it);
250         /* We are still holding extra reference on a request, need to free it */
251         if (it_disposition(it, DISP_ENQ_OPEN_REF))
252                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
253         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
254                 ptlrpc_req_finished(it->d.lustre.it_data);
255         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
256                                                     * to lookup */
257                 ptlrpc_req_finished(it->d.lustre.it_data);
258
259         it->d.lustre.it_disposition = 0;
260         it->d.lustre.it_data = NULL;
261         EXIT;
262 }
263
264 /* Drop dentry if it is not used already, unhash otherwise.
265    Should be called with dcache lock held!
266    Returns: 1 if dentry was dropped, 0 if unhashed. */
267 int ll_drop_dentry(struct dentry *dentry)
268 {
269         lock_dentry(dentry);
270         if (atomic_read(&dentry->d_count) == 0) {
271                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
272                        "inode %p\n", dentry->d_name.len,
273                        dentry->d_name.name, dentry, dentry->d_parent,
274                        dentry->d_inode);
275                 dget_locked(dentry);
276                 __d_drop(dentry);
277                 unlock_dentry(dentry);
278                 spin_unlock(&dcache_lock);
279                 cfs_spin_unlock(&ll_lookup_lock);
280                 dput(dentry);
281                 cfs_spin_lock(&ll_lookup_lock);
282                 spin_lock(&dcache_lock);
283                 return 1;
284         }
285         /* disconected dentry can not be find without lookup, because we
286          * not need his to unhash or mark invalid. */
287         if (dentry->d_flags & DCACHE_DISCONNECTED) {
288                 unlock_dentry(dentry);
289                 RETURN (0);
290         }
291
292         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
293                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
294                        "inode %p refc %d\n", dentry->d_name.len,
295                        dentry->d_name.name, dentry, dentry->d_parent,
296                        dentry->d_inode, atomic_read(&dentry->d_count));
297                 /* actually we don't unhash the dentry, rather just
298                  * mark it inaccessible for to __d_lookup(). otherwise
299                  * sys_getcwd() could return -ENOENT -bzzz */
300                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
301                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
302                         __d_drop(dentry);
303         }
304         unlock_dentry(dentry);
305         return 0;
306 }
307
308 void ll_unhash_aliases(struct inode *inode)
309 {
310         struct list_head *tmp, *head;
311         ENTRY;
312
313         if (inode == NULL) {
314                 CERROR("unexpected NULL inode, tell phil\n");
315                 return;
316         }
317
318         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
319                inode->i_ino, inode->i_generation, inode);
320
321         head = &inode->i_dentry;
322         cfs_spin_lock(&ll_lookup_lock);
323         spin_lock(&dcache_lock);
324 restart:
325         tmp = head;
326         while ((tmp = tmp->next) != head) {
327                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
328
329                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
330                        "inode %p flags %d\n", dentry->d_name.len,
331                        dentry->d_name.name, dentry, dentry->d_parent,
332                        dentry->d_inode, dentry->d_flags);
333
334                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
335                         CERROR("called on root (?) dentry=%p, inode=%p "
336                                "ino=%lu\n", dentry, inode, inode->i_ino);
337                         lustre_dump_dentry(dentry, 1);
338                         libcfs_debug_dumpstack(NULL);
339                 }
340
341                 if (ll_drop_dentry(dentry))
342                           goto restart;
343         }
344         spin_unlock(&dcache_lock);
345         cfs_spin_unlock(&ll_lookup_lock);
346
347         EXIT;
348 }
349
350 int ll_revalidate_it_finish(struct ptlrpc_request *request,
351                             struct lookup_intent *it,
352                             struct dentry *de)
353 {
354         int rc = 0;
355         ENTRY;
356
357         if (!request)
358                 RETURN(0);
359
360         if (it_disposition(it, DISP_LOOKUP_NEG))
361                 RETURN(-ENOENT);
362
363         rc = ll_prep_inode(&de->d_inode, request, NULL);
364
365         RETURN(rc);
366 }
367
368 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
369 {
370         LASSERT(it != NULL);
371         LASSERT(dentry != NULL);
372
373         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
374                 struct inode *inode = dentry->d_inode;
375                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
376
377                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
378                        inode, inode->i_ino, inode->i_generation);
379                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
380                                  inode, NULL);
381         }
382
383         /* drop lookup or getattr locks immediately */
384         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
385                 /* on 2.6 there are situation when several lookups and
386                  * revalidations may be requested during single operation.
387                  * therefore, we don't release intent here -bzzz */
388                 ll_intent_drop_lock(it);
389         }
390 }
391
392 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
393 {
394         struct lookup_intent *it = *itp;
395
396         if (!it || it->it_op == IT_GETXATTR)
397                 it = *itp = deft;
398
399 }
400
401 int ll_revalidate_it(struct dentry *de, int lookup_flags,
402                      struct lookup_intent *it)
403 {
404         struct md_op_data *op_data;
405         struct ptlrpc_request *req = NULL;
406         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
407         struct obd_export *exp;
408         struct inode *parent = de->d_parent->d_inode;
409         int rc, first = 0;
410
411         ENTRY;
412         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
413                LL_IT2STR(it));
414
415         if (de->d_inode == NULL) {
416                 /* We can only use negative dentries if this is stat or lookup,
417                    for opens and stuff we do need to query server. */
418                 /* If there is IT_CREAT in intent op set, then we must throw
419                    away this negative dentry and actually do the request to
420                    kernel to create whatever needs to be created (if possible)*/
421                 if (it && (it->it_op & IT_CREAT))
422                         RETURN(0);
423
424                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
425                         RETURN(0);
426
427                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE, LCK_MINMODE);
428                 GOTO(out_sa, rc);
429         }
430
431         /* Never execute intents for mount points.
432          * Attributes will be fixed up in ll_inode_revalidate_it */
433         if (d_mountpoint(de))
434                 GOTO(out_sa, rc = 1);
435
436         /* need to get attributes in case root got changed from other client */
437         if (de == de->d_sb->s_root) {
438                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
439                 if (rc == 0)
440                         rc = 1;
441                 GOTO(out_sa, rc);
442         }
443
444         exp = ll_i2mdexp(de->d_inode);
445
446         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
447         ll_frob_intent(&it, &lookup_it);
448         LASSERT(it);
449
450         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
451                 GOTO(out_sa, rc = 1);
452
453         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
454                                      de->d_name.name, de->d_name.len,
455                                      0, LUSTRE_OPC_ANY, NULL);
456         if (IS_ERR(op_data))
457                 RETURN(PTR_ERR(op_data));
458
459         if ((it->it_op == IT_OPEN) && de->d_inode) {
460                 struct inode *inode = de->d_inode;
461                 struct ll_inode_info *lli = ll_i2info(inode);
462                 struct obd_client_handle **och_p;
463                 __u64 *och_usecount;
464
465                 /*
466                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
467                  * just having LOOKUP lock is enough to justify inode is the
468                  * same. And if inode is the same and we have suitable
469                  * openhandle, then there is no point in doing another OPEN RPC
470                  * just to throw away newly received openhandle.  There are no
471                  * security implications too, if file owner or access mode is
472                  * change, LOOKUP lock is revoked.
473                  */
474
475
476                 if (it->it_flags & FMODE_WRITE) {
477                         och_p = &lli->lli_mds_write_och;
478                         och_usecount = &lli->lli_open_fd_write_count;
479                 } else if (it->it_flags & FMODE_EXEC) {
480                         och_p = &lli->lli_mds_exec_och;
481                         och_usecount = &lli->lli_open_fd_exec_count;
482                 } else {
483                         och_p = &lli->lli_mds_read_och;
484                         och_usecount = &lli->lli_open_fd_read_count;
485                 }
486                 /* Check for the proper lock. */
487                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP, LCK_MINMODE))
488                         goto do_lock;
489                 cfs_down(&lli->lli_och_sem);
490                 if (*och_p) { /* Everything is open already, do nothing */
491                         /*(*och_usecount)++;  Do not let them steal our open
492                           handle from under us */
493                         /* XXX The code above was my original idea, but in case
494                            we have the handle, but we cannot use it due to later
495                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
496                            would decrement counter increased here. So we just
497                            hope the lock won't be invalidated in between. But
498                            if it would be, we'll reopen the open request to
499                            MDS later during file open path */
500                         cfs_up(&lli->lli_och_sem);
501                         ll_finish_md_op_data(op_data);
502                         RETURN(1);
503                 } else {
504                         cfs_up(&lli->lli_och_sem);
505                 }
506         }
507
508         if (it->it_op == IT_GETATTR)
509                 first = ll_statahead_enter(parent, &de, 0);
510
511 do_lock:
512         it->it_create_mode &= ~cfs_curproc_umask();
513         it->it_create_mode |= M_CHECK_STALE;
514         rc = md_intent_lock(exp, op_data, NULL, 0, it,
515                             lookup_flags,
516                             &req, ll_md_blocking_ast, 0);
517         it->it_create_mode &= ~M_CHECK_STALE;
518         ll_finish_md_op_data(op_data);
519         if (it->it_op == IT_GETATTR && !first)
520                 /* If there are too many locks on client-side, then some
521                  * locks taken by statahead maybe dropped automatically
522                  * before the real "revalidate" using them. */
523                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
524         else if (first == -EEXIST)
525                 ll_statahead_mark(parent, de);
526
527         /* If req is NULL, then md_intent_lock only tried to do a lock match;
528          * if all was well, it will return 1 if it found locks, 0 otherwise. */
529         if (req == NULL && rc >= 0) {
530                 if (!rc)
531                         goto do_lookup;
532                 GOTO(out, rc);
533         }
534
535         if (rc < 0) {
536                 if (rc != -ESTALE) {
537                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
538                                "%d\n", rc, it->d.lustre.it_status);
539                 }
540                 GOTO(out, rc = 0);
541         }
542
543 revalidate_finish:
544         rc = ll_revalidate_it_finish(req, it, de);
545         if (rc != 0) {
546                 if (rc != -ESTALE && rc != -ENOENT)
547                         ll_intent_release(it);
548                 GOTO(out, rc = 0);
549         }
550
551         if ((it->it_op & IT_OPEN) && de->d_inode &&
552             !S_ISREG(de->d_inode->i_mode) &&
553             !S_ISDIR(de->d_inode->i_mode)) {
554                 ll_release_openhandle(de, it);
555         }
556         rc = 1;
557
558         /* unfortunately ll_intent_lock may cause a callback and revoke our
559          * dentry */
560         cfs_spin_lock(&ll_lookup_lock);
561         spin_lock(&dcache_lock);
562         lock_dentry(de);
563         __d_drop(de);
564         unlock_dentry(de);
565         d_rehash_cond(de, 0);
566         spin_unlock(&dcache_lock);
567         cfs_spin_unlock(&ll_lookup_lock);
568
569 out:
570         /* We do not free request as it may be reused during following lookup
571          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
572          * be freed in ll_lookup_it or in ll_intent_release. But if
573          * request was not completed, we need to free it. (bug 5154, 9903) */
574         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
575                 ptlrpc_req_finished(req);
576         if (rc == 0) {
577                 ll_unhash_aliases(de->d_inode);
578                 /* done in ll_unhash_aliases()
579                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
580         } else {
581                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
582                        "inode %p refc %d\n", de->d_name.len,
583                        de->d_name.name, de, de->d_parent, de->d_inode,
584                        atomic_read(&de->d_count));
585                 if (de->d_flags & DCACHE_LUSTRE_INVALID) {
586                         lock_dentry(de);
587                         de->d_flags &= ~DCACHE_LUSTRE_INVALID;
588                         unlock_dentry(de);
589                 }
590                 ll_lookup_finish_locks(it, de);
591         }
592         RETURN(rc);
593
594         /*
595          * This part is here to combat evil-evil race in real_lookup on 2.6
596          * kernels.  The race details are: We enter do_lookup() looking for some
597          * name, there is nothing in dcache for this name yet and d_lookup()
598          * returns NULL.  We proceed to real_lookup(), and while we do this,
599          * another process does open on the same file we looking up (most simple
600          * reproducer), open succeeds and the dentry is added. Now back to
601          * us. In real_lookup() we do d_lookup() again and suddenly find the
602          * dentry, so we call d_revalidate on it, but there is no lock, so
603          * without this code we would return 0, but unpatched real_lookup just
604          * returns -ENOENT in such a case instead of retrying the lookup. Once
605          * this is dealt with in real_lookup(), all of this ugly mess can go and
606          * we can just check locks in ->d_revalidate without doing any RPCs
607          * ever.
608          */
609 do_lookup:
610         if (it != &lookup_it) {
611                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
612                 if (it->it_op == IT_GETATTR)
613                         lookup_it.it_op = IT_GETATTR;
614                 ll_lookup_finish_locks(it, de);
615                 it = &lookup_it;
616         }
617
618         /* Do real lookup here. */
619         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
620                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
621                                                          LUSTRE_OPC_CREATE :
622                                                          LUSTRE_OPC_ANY), NULL);
623         if (IS_ERR(op_data))
624                 RETURN(PTR_ERR(op_data));
625
626         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
627                             ll_md_blocking_ast, 0);
628         if (rc >= 0) {
629                 struct mdt_body *mdt_body;
630                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
631                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
632
633                 if (de->d_inode)
634                         fid = *ll_inode2fid(de->d_inode);
635
636                 /* see if we got same inode, if not - return error */
637                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
638                         ll_finish_md_op_data(op_data);
639                         op_data = NULL;
640                         goto revalidate_finish;
641                 }
642                 ll_intent_release(it);
643         }
644         ll_finish_md_op_data(op_data);
645         GOTO(out, rc = 0);
646
647 out_sa:
648         /*
649          * For rc == 1 case, should not return directly to prevent losing
650          * statahead windows; for rc == 0 case, the "lookup" will be done later.
651          */
652         if (it && it->it_op == IT_GETATTR && rc == 1) {
653                 first = ll_statahead_enter(parent, &de, 0);
654                 if (first >= 0)
655                         ll_statahead_exit(parent, de, 1);
656                 else if (first == -EEXIST)
657                         ll_statahead_mark(parent, de);
658         }
659
660         return rc;
661 }
662
663 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
664 {
665         int rc;
666         ENTRY;
667
668         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
669                 struct lookup_intent *it;
670
671                 it = ll_convert_intent(&nd->intent.open, nd->flags);
672                 if (IS_ERR(it))
673                         RETURN(0);
674
675                 if (it->it_op == (IT_OPEN|IT_CREAT) &&
676                     nd->intent.open.flags & O_EXCL) {
677                         CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
678                         rc = 0;
679                         goto out_it;
680                 }
681
682                 rc = ll_revalidate_it(dentry, nd->flags, it);
683
684                 if (rc && (nd->flags & LOOKUP_OPEN) &&
685                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
686 #ifdef HAVE_FILE_IN_STRUCT_INTENT
687 // XXX Code duplication with ll_lookup_nd
688                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
689                                 // We cannot call open here as it would
690                                 // deadlock.
691                                 ptlrpc_req_finished(
692                                                (struct ptlrpc_request *)
693                                                   it->d.lustre.it_data);
694                         } else {
695 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
696 /* 2.6.1[456] have a bug in open_namei() that forgets to check
697  * nd->intent.open.file for error, so we need to return it as lookup's result
698  * instead */
699                                 struct file *filp;
700
701                                 nd->intent.open.file->private_data = it;
702                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
703                                 if (IS_ERR(filp)) {
704                                         rc = PTR_ERR(filp);
705                                 }
706 #else
707                                 nd->intent.open.file->private_data = it;
708                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
709 #endif
710                         }
711 #else
712                         ll_release_openhandle(dentry, it);
713 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
714                 }
715                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
716                     it_disposition(it, DISP_OPEN_CREATE)) {
717                         /* We created something but we may only return
718                          * negative dentry here, so save request in dentry,
719                          * if lookup will be called later on, it will
720                          * pick the request, otherwise it would be freed
721                          * with dentry */
722                         ll_d2d(dentry)->lld_it = it;
723                         it = NULL; /* avoid freeing */
724                 }
725
726 out_it:
727                 if (it) {
728                         ll_intent_release(it);
729                         OBD_FREE(it, sizeof(*it));
730                 }
731         } else {
732                 rc = ll_revalidate_it(dentry, 0, NULL);
733         }
734
735         RETURN(rc);
736 }
737
738 void ll_d_iput(struct dentry *de, struct inode *inode)
739 {
740         LASSERT(inode);
741         if (!find_cbdata(inode))
742                 inode->i_nlink = 0;
743         iput(inode);
744 }
745
746 struct dentry_operations ll_d_ops = {
747         .d_revalidate = ll_revalidate_nd,
748         .d_release = ll_release,
749         .d_delete  = ll_ddelete,
750         .d_iput    = ll_d_iput,
751         .d_compare = ll_dcompare,
752 };