Whamcloud - gitweb
LU-365 Update copyright for files modified by Whamcloud
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 #include <linux/fs.h>
41 #include <linux/sched.h>
42 #include <linux/smp_lock.h>
43 #include <linux/quotaops.h>
44
45 #define DEBUG_SUBSYSTEM S_LLITE
46
47 #include <obd_support.h>
48 #include <lustre_lite.h>
49 #include <lustre/lustre_idl.h>
50 #include <lustre_dlm.h>
51 #include <lustre_mdc.h>
52 //#include <lustre_ver.h>
53 //#include <lustre_version.h>
54
55 #include "llite_internal.h"
56
57 cfs_spinlock_t ll_lookup_lock = CFS_SPIN_LOCK_UNLOCKED;
58
59 /* should NOT be called with the dcache lock, see fs/dcache.c */
60 static void ll_release(struct dentry *de)
61 {
62         struct ll_dentry_data *lld;
63         ENTRY;
64         LASSERT(de != NULL);
65         lld = ll_d2d(de);
66         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
67                 EXIT;
68                 return;
69         }
70         if (lld->lld_it) {
71                 ll_intent_release(lld->lld_it);
72                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
73         }
74         LASSERT(lld->lld_cwd_count == 0);
75         LASSERT(lld->lld_mnt_count == 0);
76         OBD_FREE(de->d_fsdata, sizeof(*lld));
77
78         EXIT;
79 }
80
81 /* Compare if two dentries are the same.  Don't match if the existing dentry
82  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
83  *
84  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
85  * an AST before calling d_revalidate_it().  The dentry still exists (marked
86  * INVALID) so d_lookup() matches it, but we have no lock on it (so
87  * lock_match() fails) and we spin around real_lookup(). */
88 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
89 {
90         struct dentry *dchild;
91         ENTRY;
92
93         if (d_name->len != name->len)
94                 RETURN(1);
95
96         if (memcmp(d_name->name, name->name, name->len))
97                 RETURN(1);
98
99         /* XXX: d_name must be in-dentry structure */
100         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
101
102         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
103                name->len, name->name, dchild,
104                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
105                atomic_read(&dchild->d_count));
106
107          /* mountpoint is always valid */
108         if (d_mountpoint(dchild))
109                 RETURN(0);
110
111         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
112                 RETURN(1);
113
114         RETURN(0);
115 }
116
117 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
118 {
119         if (lock->l_flags & LDLM_FL_CANCELING)
120                 return LDLM_ITER_CONTINUE;
121         return LDLM_ITER_STOP;
122 }
123
124 /* find any ldlm lock of the inode in mdc and lov
125  * return 0    not find
126  *        1    find one
127  *      < 0    error */
128 static int find_cbdata(struct inode *inode)
129 {
130         struct ll_inode_info *lli = ll_i2info(inode);
131         struct ll_sb_info *sbi = ll_i2sbi(inode);
132         int rc = 0;
133         ENTRY;
134
135         LASSERT(inode);
136         rc = md_find_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
137                             return_if_equal, NULL);
138         if (rc != 0)
139                  RETURN(rc);
140
141         if (lli->lli_smd)
142                 rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
143                                      return_if_equal, NULL);
144
145         RETURN(rc);
146 }
147
148 /**
149  * Called when last reference to a dentry is dropped and dcache wants to know
150  * whether or not it should cache it:
151  * - return 1 to delete the dentry immediately
152  * - return 0 to cache the dentry
153  * Should NOT be called with the dcache lock, see fs/dcache.c
154  */
155 static int ll_ddelete(struct dentry *de)
156 {
157         ENTRY;
158         LASSERT(de);
159
160         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
161                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
162                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
163                d_unhashed(de) ? "" : "hashed,",
164                list_empty(&de->d_subdirs) ? "" : "subdirs");
165
166         /* if not ldlm lock for this inode, set i_nlink to 0 so that
167          * this inode can be recycled later b=20433 */
168         LASSERT(atomic_read(&de->d_count) == 0);
169         if (de->d_inode && !find_cbdata(de->d_inode))
170                 de->d_inode->i_nlink = 0;
171
172         if (de->d_flags & DCACHE_LUSTRE_INVALID)
173                 RETURN(1);
174
175         RETURN(0);
176 }
177
178 static int ll_set_dd(struct dentry *de)
179 {
180         ENTRY;
181         LASSERT(de != NULL);
182
183         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
184                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
185                atomic_read(&de->d_count));
186
187         if (de->d_fsdata == NULL) {
188                 struct ll_dentry_data *lld;
189
190                 OBD_ALLOC_PTR(lld);
191                 if (likely(lld != NULL)) {
192                         lock_dentry(de);
193                         if (likely(de->d_fsdata == NULL))
194                                 de->d_fsdata = lld;
195                         else
196                                 OBD_FREE_PTR(lld);
197                         unlock_dentry(de);
198                 } else {
199                         RETURN(-ENOMEM);
200                 }
201         }
202
203         RETURN(0);
204 }
205
206 int ll_dops_init(struct dentry *de, int block, int init_sa)
207 {
208         struct ll_dentry_data *lld = ll_d2d(de);
209         int rc = 0;
210
211         if (lld == NULL && block != 0) {
212                 rc = ll_set_dd(de);
213                 if (rc)
214                         return rc;
215
216                 lld = ll_d2d(de);
217         }
218
219         if (lld != NULL && init_sa != 0)
220                 lld->lld_sa_generation = 0;
221
222         de->d_op = &ll_d_ops;
223         return rc;
224 }
225
226 void ll_intent_drop_lock(struct lookup_intent *it)
227 {
228         struct lustre_handle *handle;
229
230         if (it->it_op && it->d.lustre.it_lock_mode) {
231                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
232                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
233                        " from it %p\n", handle->cookie, it);
234                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
235
236                 /* bug 494: intent_release may be called multiple times, from
237                  * this thread and we don't want to double-decref this lock */
238                 it->d.lustre.it_lock_mode = 0;
239         }
240 }
241
242 void ll_intent_release(struct lookup_intent *it)
243 {
244         ENTRY;
245
246         CDEBUG(D_INFO, "intent %p released\n", it);
247         ll_intent_drop_lock(it);
248         /* We are still holding extra reference on a request, need to free it */
249         if (it_disposition(it, DISP_ENQ_OPEN_REF))
250                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
251         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
252                 ptlrpc_req_finished(it->d.lustre.it_data);
253         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
254                                                     * to lookup */
255                 ptlrpc_req_finished(it->d.lustre.it_data);
256
257         it->d.lustre.it_disposition = 0;
258         it->d.lustre.it_data = NULL;
259         EXIT;
260 }
261
262 /* Drop dentry if it is not used already, unhash otherwise.
263    Should be called with dcache lock held!
264    Returns: 1 if dentry was dropped, 0 if unhashed. */
265 int ll_drop_dentry(struct dentry *dentry)
266 {
267         lock_dentry(dentry);
268         if (atomic_read(&dentry->d_count) == 0) {
269                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
270                        "inode %p\n", dentry->d_name.len,
271                        dentry->d_name.name, dentry, dentry->d_parent,
272                        dentry->d_inode);
273                 dget_locked(dentry);
274                 __d_drop(dentry);
275                 unlock_dentry(dentry);
276                 spin_unlock(&dcache_lock);
277                 cfs_spin_unlock(&ll_lookup_lock);
278                 dput(dentry);
279                 cfs_spin_lock(&ll_lookup_lock);
280                 spin_lock(&dcache_lock);
281                 return 1;
282         }
283         /* disconected dentry can not be find without lookup, because we
284          * not need his to unhash or mark invalid. */
285         if (dentry->d_flags & DCACHE_DISCONNECTED) {
286                 unlock_dentry(dentry);
287                 RETURN (0);
288         }
289
290         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
291                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
292                        "inode %p refc %d\n", dentry->d_name.len,
293                        dentry->d_name.name, dentry, dentry->d_parent,
294                        dentry->d_inode, atomic_read(&dentry->d_count));
295                 /* actually we don't unhash the dentry, rather just
296                  * mark it inaccessible for to __d_lookup(). otherwise
297                  * sys_getcwd() could return -ENOENT -bzzz */
298                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
299                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
300                         __d_drop(dentry);
301         }
302         unlock_dentry(dentry);
303         return 0;
304 }
305
306 void ll_unhash_aliases(struct inode *inode)
307 {
308         struct list_head *tmp, *head;
309         ENTRY;
310
311         if (inode == NULL) {
312                 CERROR("unexpected NULL inode, tell phil\n");
313                 return;
314         }
315
316         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
317                inode->i_ino, inode->i_generation, inode);
318
319         head = &inode->i_dentry;
320         cfs_spin_lock(&ll_lookup_lock);
321         spin_lock(&dcache_lock);
322 restart:
323         tmp = head;
324         while ((tmp = tmp->next) != head) {
325                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
326
327                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
328                        "inode %p flags %d\n", dentry->d_name.len,
329                        dentry->d_name.name, dentry, dentry->d_parent,
330                        dentry->d_inode, dentry->d_flags);
331
332                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
333                         CERROR("called on root (?) dentry=%p, inode=%p "
334                                "ino=%lu\n", dentry, inode, inode->i_ino);
335                         lustre_dump_dentry(dentry, 1);
336                         libcfs_debug_dumpstack(NULL);
337                 }
338
339                 if (ll_drop_dentry(dentry))
340                           goto restart;
341         }
342         spin_unlock(&dcache_lock);
343         cfs_spin_unlock(&ll_lookup_lock);
344
345         EXIT;
346 }
347
348 int ll_revalidate_it_finish(struct ptlrpc_request *request,
349                             struct lookup_intent *it,
350                             struct dentry *de)
351 {
352         int rc = 0;
353         ENTRY;
354
355         if (!request)
356                 RETURN(0);
357
358         if (it_disposition(it, DISP_LOOKUP_NEG))
359                 RETURN(-ENOENT);
360
361         rc = ll_prep_inode(&de->d_inode, request, NULL);
362
363         RETURN(rc);
364 }
365
366 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
367 {
368         LASSERT(it != NULL);
369         LASSERT(dentry != NULL);
370
371         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
372                 struct inode *inode = dentry->d_inode;
373                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
374
375                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
376                        inode, inode->i_ino, inode->i_generation);
377                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
378                                  inode, NULL);
379         }
380
381         /* drop lookup or getattr locks immediately */
382         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
383                 /* on 2.6 there are situation when several lookups and
384                  * revalidations may be requested during single operation.
385                  * therefore, we don't release intent here -bzzz */
386                 ll_intent_drop_lock(it);
387         }
388 }
389
390 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
391 {
392         struct lookup_intent *it = *itp;
393
394         if (!it || it->it_op == IT_GETXATTR)
395                 it = *itp = deft;
396
397 }
398
399 int ll_revalidate_it(struct dentry *de, int lookup_flags,
400                      struct lookup_intent *it)
401 {
402         struct md_op_data *op_data;
403         struct ptlrpc_request *req = NULL;
404         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
405         struct obd_export *exp;
406         struct inode *parent = de->d_parent->d_inode;
407         int rc, first = 0;
408
409         ENTRY;
410         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
411                LL_IT2STR(it));
412
413         if (de->d_inode == NULL) {
414                 /* We can only use negative dentries if this is stat or lookup,
415                    for opens and stuff we do need to query server. */
416                 /* If there is IT_CREAT in intent op set, then we must throw
417                    away this negative dentry and actually do the request to
418                    kernel to create whatever needs to be created (if possible)*/
419                 if (it && (it->it_op & IT_CREAT))
420                         RETURN(0);
421
422                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
423                         RETURN(0);
424
425                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE, LCK_MINMODE);
426                 GOTO(out_sa, rc);
427         }
428
429         /* Never execute intents for mount points.
430          * Attributes will be fixed up in ll_inode_revalidate_it */
431         if (d_mountpoint(de))
432                 GOTO(out_sa, rc = 1);
433
434         /* need to get attributes in case root got changed from other client */
435         if (de == de->d_sb->s_root) {
436                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
437                 if (rc == 0)
438                         rc = 1;
439                 GOTO(out_sa, rc);
440         }
441
442         exp = ll_i2mdexp(de->d_inode);
443
444         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
445         ll_frob_intent(&it, &lookup_it);
446         LASSERT(it);
447
448         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
449                 GOTO(out_sa, rc = 1);
450
451         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
452                                      de->d_name.name, de->d_name.len,
453                                      0, LUSTRE_OPC_ANY, NULL);
454         if (IS_ERR(op_data))
455                 RETURN(PTR_ERR(op_data));
456
457         if ((it->it_op == IT_OPEN) && de->d_inode) {
458                 struct inode *inode = de->d_inode;
459                 struct ll_inode_info *lli = ll_i2info(inode);
460                 struct obd_client_handle **och_p;
461                 __u64 *och_usecount;
462
463                 /*
464                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
465                  * just having LOOKUP lock is enough to justify inode is the
466                  * same. And if inode is the same and we have suitable
467                  * openhandle, then there is no point in doing another OPEN RPC
468                  * just to throw away newly received openhandle.  There are no
469                  * security implications too, if file owner or access mode is
470                  * change, LOOKUP lock is revoked.
471                  */
472
473
474                 if (it->it_flags & FMODE_WRITE) {
475                         och_p = &lli->lli_mds_write_och;
476                         och_usecount = &lli->lli_open_fd_write_count;
477                 } else if (it->it_flags & FMODE_EXEC) {
478                         och_p = &lli->lli_mds_exec_och;
479                         och_usecount = &lli->lli_open_fd_exec_count;
480                 } else {
481                         och_p = &lli->lli_mds_read_och;
482                         och_usecount = &lli->lli_open_fd_read_count;
483                 }
484                 /* Check for the proper lock. */
485                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP, LCK_MINMODE))
486                         goto do_lock;
487                 cfs_down(&lli->lli_och_sem);
488                 if (*och_p) { /* Everything is open already, do nothing */
489                         /*(*och_usecount)++;  Do not let them steal our open
490                           handle from under us */
491                         /* XXX The code above was my original idea, but in case
492                            we have the handle, but we cannot use it due to later
493                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
494                            would decrement counter increased here. So we just
495                            hope the lock won't be invalidated in between. But
496                            if it would be, we'll reopen the open request to
497                            MDS later during file open path */
498                         cfs_up(&lli->lli_och_sem);
499                         ll_finish_md_op_data(op_data);
500                         RETURN(1);
501                 } else {
502                         cfs_up(&lli->lli_och_sem);
503                 }
504         }
505
506         if (it->it_op == IT_GETATTR)
507                 first = ll_statahead_enter(parent, &de, 0);
508
509 do_lock:
510         it->it_create_mode &= ~cfs_curproc_umask();
511         it->it_create_mode |= M_CHECK_STALE;
512         rc = md_intent_lock(exp, op_data, NULL, 0, it,
513                             lookup_flags,
514                             &req, ll_md_blocking_ast, 0);
515         it->it_create_mode &= ~M_CHECK_STALE;
516         ll_finish_md_op_data(op_data);
517         if (it->it_op == IT_GETATTR && !first)
518                 /* If there are too many locks on client-side, then some
519                  * locks taken by statahead maybe dropped automatically
520                  * before the real "revalidate" using them. */
521                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
522         else if (first == -EEXIST)
523                 ll_statahead_mark(parent, de);
524
525         /* If req is NULL, then md_intent_lock only tried to do a lock match;
526          * if all was well, it will return 1 if it found locks, 0 otherwise. */
527         if (req == NULL && rc >= 0) {
528                 if (!rc)
529                         goto do_lookup;
530                 GOTO(out, rc);
531         }
532
533         if (rc < 0) {
534                 if (rc != -ESTALE) {
535                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
536                                "%d\n", rc, it->d.lustre.it_status);
537                 }
538                 GOTO(out, rc = 0);
539         }
540
541 revalidate_finish:
542         rc = ll_revalidate_it_finish(req, it, de);
543         if (rc != 0) {
544                 if (rc != -ESTALE && rc != -ENOENT)
545                         ll_intent_release(it);
546                 GOTO(out, rc = 0);
547         }
548
549         if ((it->it_op & IT_OPEN) && de->d_inode &&
550             !S_ISREG(de->d_inode->i_mode) &&
551             !S_ISDIR(de->d_inode->i_mode)) {
552                 ll_release_openhandle(de, it);
553         }
554         rc = 1;
555
556         /* unfortunately ll_intent_lock may cause a callback and revoke our
557          * dentry */
558         cfs_spin_lock(&ll_lookup_lock);
559         spin_lock(&dcache_lock);
560         lock_dentry(de);
561         __d_drop(de);
562         unlock_dentry(de);
563         d_rehash_cond(de, 0);
564         spin_unlock(&dcache_lock);
565         cfs_spin_unlock(&ll_lookup_lock);
566
567 out:
568         /* We do not free request as it may be reused during following lookup
569          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
570          * be freed in ll_lookup_it or in ll_intent_release. But if
571          * request was not completed, we need to free it. (bug 5154, 9903) */
572         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
573                 ptlrpc_req_finished(req);
574         if (rc == 0) {
575                 ll_unhash_aliases(de->d_inode);
576                 /* done in ll_unhash_aliases()
577                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
578         } else {
579                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
580                        "inode %p refc %d\n", de->d_name.len,
581                        de->d_name.name, de, de->d_parent, de->d_inode,
582                        atomic_read(&de->d_count));
583                 if (de->d_flags & DCACHE_LUSTRE_INVALID) {
584                         lock_dentry(de);
585                         de->d_flags &= ~DCACHE_LUSTRE_INVALID;
586                         unlock_dentry(de);
587                 }
588                 ll_lookup_finish_locks(it, de);
589         }
590         RETURN(rc);
591
592         /*
593          * This part is here to combat evil-evil race in real_lookup on 2.6
594          * kernels.  The race details are: We enter do_lookup() looking for some
595          * name, there is nothing in dcache for this name yet and d_lookup()
596          * returns NULL.  We proceed to real_lookup(), and while we do this,
597          * another process does open on the same file we looking up (most simple
598          * reproducer), open succeeds and the dentry is added. Now back to
599          * us. In real_lookup() we do d_lookup() again and suddenly find the
600          * dentry, so we call d_revalidate on it, but there is no lock, so
601          * without this code we would return 0, but unpatched real_lookup just
602          * returns -ENOENT in such a case instead of retrying the lookup. Once
603          * this is dealt with in real_lookup(), all of this ugly mess can go and
604          * we can just check locks in ->d_revalidate without doing any RPCs
605          * ever.
606          */
607 do_lookup:
608         if (it != &lookup_it) {
609                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
610                 if (it->it_op == IT_GETATTR)
611                         lookup_it.it_op = IT_GETATTR;
612                 ll_lookup_finish_locks(it, de);
613                 it = &lookup_it;
614         }
615
616         /* Do real lookup here. */
617         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
618                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
619                                                          LUSTRE_OPC_CREATE :
620                                                          LUSTRE_OPC_ANY), NULL);
621         if (IS_ERR(op_data))
622                 RETURN(PTR_ERR(op_data));
623
624         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
625                             ll_md_blocking_ast, 0);
626         if (rc >= 0) {
627                 struct mdt_body *mdt_body;
628                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
629                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
630
631                 if (de->d_inode)
632                         fid = *ll_inode2fid(de->d_inode);
633
634                 /* see if we got same inode, if not - return error */
635                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
636                         ll_finish_md_op_data(op_data);
637                         op_data = NULL;
638                         goto revalidate_finish;
639                 }
640                 ll_intent_release(it);
641         }
642         ll_finish_md_op_data(op_data);
643         GOTO(out, rc = 0);
644
645 out_sa:
646         /*
647          * For rc == 1 case, should not return directly to prevent losing
648          * statahead windows; for rc == 0 case, the "lookup" will be done later.
649          */
650         if (it && it->it_op == IT_GETATTR && rc == 1) {
651                 first = ll_statahead_enter(parent, &de, 0);
652                 if (first >= 0)
653                         ll_statahead_exit(parent, de, 1);
654                 else if (first == -EEXIST)
655                         ll_statahead_mark(parent, de);
656         }
657
658         return rc;
659 }
660
661 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
662 {
663         int rc;
664         ENTRY;
665
666         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
667                 struct lookup_intent *it;
668
669                 it = ll_convert_intent(&nd->intent.open, nd->flags);
670                 if (IS_ERR(it))
671                         RETURN(0);
672
673                 if (it->it_op == (IT_OPEN|IT_CREAT) &&
674                     nd->intent.open.flags & O_EXCL) {
675                         CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
676                         rc = 0;
677                         goto out_it;
678                 }
679
680                 rc = ll_revalidate_it(dentry, nd->flags, it);
681
682                 if (rc && (nd->flags & LOOKUP_OPEN) &&
683                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
684 #ifdef HAVE_FILE_IN_STRUCT_INTENT
685 // XXX Code duplication with ll_lookup_nd
686                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
687                                 // We cannot call open here as it would
688                                 // deadlock.
689                                 ptlrpc_req_finished(
690                                                (struct ptlrpc_request *)
691                                                   it->d.lustre.it_data);
692                         } else {
693 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
694 /* 2.6.1[456] have a bug in open_namei() that forgets to check
695  * nd->intent.open.file for error, so we need to return it as lookup's result
696  * instead */
697                                 struct file *filp;
698
699                                 nd->intent.open.file->private_data = it;
700                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
701                                 if (IS_ERR(filp)) {
702                                         rc = PTR_ERR(filp);
703                                 }
704 #else
705                                 nd->intent.open.file->private_data = it;
706                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
707 #endif
708                         }
709 #else
710                         ll_release_openhandle(dentry, it);
711 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
712                 }
713                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
714                     it_disposition(it, DISP_OPEN_CREATE)) {
715                         /* We created something but we may only return
716                          * negative dentry here, so save request in dentry,
717                          * if lookup will be called later on, it will
718                          * pick the request, otherwise it would be freed
719                          * with dentry */
720                         ll_d2d(dentry)->lld_it = it;
721                         it = NULL; /* avoid freeing */
722                 }
723
724 out_it:
725                 if (it) {
726                         ll_intent_release(it);
727                         OBD_FREE(it, sizeof(*it));
728                 }
729         } else {
730                 rc = ll_revalidate_it(dentry, 0, NULL);
731         }
732
733         RETURN(rc);
734 }
735
736 void ll_d_iput(struct dentry *de, struct inode *inode)
737 {
738         LASSERT(inode);
739         if (!find_cbdata(inode))
740                 inode->i_nlink = 0;
741         iput(inode);
742 }
743
744 struct dentry_operations ll_d_ops = {
745         .d_revalidate = ll_revalidate_nd,
746         .d_release = ll_release,
747         .d_delete  = ll_ddelete,
748         .d_iput    = ll_d_iput,
749         .d_compare = ll_dcompare,
750 };