Whamcloud - gitweb
LU-555 ll_have_md_lock() optimization to accelerate multiple bits locks
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 #include <linux/fs.h>
41 #include <linux/sched.h>
42 #include <linux/smp_lock.h>
43 #include <linux/quotaops.h>
44
45 #define DEBUG_SUBSYSTEM S_LLITE
46
47 #include <obd_support.h>
48 #include <lustre_lite.h>
49 #include <lustre/lustre_idl.h>
50 #include <lustre_dlm.h>
51 #include <lustre_mdc.h>
52 //#include <lustre_ver.h>
53 //#include <lustre_version.h>
54
55 #include "llite_internal.h"
56
57 cfs_spinlock_t ll_lookup_lock = CFS_SPIN_LOCK_UNLOCKED;
58
59 /* should NOT be called with the dcache lock, see fs/dcache.c */
60 static void ll_release(struct dentry *de)
61 {
62         struct ll_dentry_data *lld;
63         ENTRY;
64         LASSERT(de != NULL);
65         lld = ll_d2d(de);
66         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
67                 EXIT;
68                 return;
69         }
70         if (lld->lld_it) {
71                 ll_intent_release(lld->lld_it);
72                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
73         }
74         LASSERT(lld->lld_cwd_count == 0);
75         LASSERT(lld->lld_mnt_count == 0);
76         OBD_FREE(de->d_fsdata, sizeof(*lld));
77
78         EXIT;
79 }
80
81 /* Compare if two dentries are the same.  Don't match if the existing dentry
82  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
83  *
84  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
85  * an AST before calling d_revalidate_it().  The dentry still exists (marked
86  * INVALID) so d_lookup() matches it, but we have no lock on it (so
87  * lock_match() fails) and we spin around real_lookup(). */
88 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
89 {
90         struct dentry *dchild;
91         ENTRY;
92
93         if (d_name->len != name->len)
94                 RETURN(1);
95
96         if (memcmp(d_name->name, name->name, name->len))
97                 RETURN(1);
98
99         /* XXX: d_name must be in-dentry structure */
100         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
101
102         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
103                name->len, name->name, dchild,
104                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
105                atomic_read(&dchild->d_count));
106
107          /* mountpoint is always valid */
108         if (d_mountpoint(dchild))
109                 RETURN(0);
110
111         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
112                 RETURN(1);
113
114         RETURN(0);
115 }
116
117 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
118 {
119         if ((lock->l_flags &
120              (LDLM_FL_CANCELING | LDLM_FL_DISCARD_DATA)) ==
121             (LDLM_FL_CANCELING | LDLM_FL_DISCARD_DATA))
122                 return LDLM_ITER_CONTINUE;
123         return LDLM_ITER_STOP;
124 }
125
126 /* find any ldlm lock of the inode in mdc and lov
127  * return 0    not find
128  *        1    find one
129  *      < 0    error */
130 static int find_cbdata(struct inode *inode)
131 {
132         struct ll_inode_info *lli = ll_i2info(inode);
133         struct ll_sb_info *sbi = ll_i2sbi(inode);
134         int rc = 0;
135         ENTRY;
136
137         LASSERT(inode);
138         rc = md_find_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
139                             return_if_equal, NULL);
140         if (rc != 0)
141                  RETURN(rc);
142
143         if (lli->lli_smd)
144                 rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
145                                      return_if_equal, NULL);
146
147         RETURN(rc);
148 }
149
150 /**
151  * Called when last reference to a dentry is dropped and dcache wants to know
152  * whether or not it should cache it:
153  * - return 1 to delete the dentry immediately
154  * - return 0 to cache the dentry
155  * Should NOT be called with the dcache lock, see fs/dcache.c
156  */
157 static int ll_ddelete(struct dentry *de)
158 {
159         ENTRY;
160         LASSERT(de);
161
162         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
163                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
164                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
165                d_unhashed(de) ? "" : "hashed,",
166                list_empty(&de->d_subdirs) ? "" : "subdirs");
167
168         /* if not ldlm lock for this inode, set i_nlink to 0 so that
169          * this inode can be recycled later b=20433 */
170         LASSERT(atomic_read(&de->d_count) == 0);
171         if (de->d_inode && !find_cbdata(de->d_inode))
172                 de->d_inode->i_nlink = 0;
173
174         if (de->d_flags & DCACHE_LUSTRE_INVALID)
175                 RETURN(1);
176
177         RETURN(0);
178 }
179
180 static int ll_set_dd(struct dentry *de)
181 {
182         ENTRY;
183         LASSERT(de != NULL);
184
185         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
186                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
187                atomic_read(&de->d_count));
188
189         if (de->d_fsdata == NULL) {
190                 struct ll_dentry_data *lld;
191
192                 OBD_ALLOC_PTR(lld);
193                 if (likely(lld != NULL)) {
194                         lock_dentry(de);
195                         if (likely(de->d_fsdata == NULL))
196                                 de->d_fsdata = lld;
197                         else
198                                 OBD_FREE_PTR(lld);
199                         unlock_dentry(de);
200                 } else {
201                         RETURN(-ENOMEM);
202                 }
203         }
204
205         RETURN(0);
206 }
207
208 int ll_dops_init(struct dentry *de, int block, int init_sa)
209 {
210         struct ll_dentry_data *lld = ll_d2d(de);
211         int rc = 0;
212
213         if (lld == NULL && block != 0) {
214                 rc = ll_set_dd(de);
215                 if (rc)
216                         return rc;
217
218                 lld = ll_d2d(de);
219         }
220
221         if (lld != NULL && init_sa != 0)
222                 lld->lld_sa_generation = 0;
223
224         de->d_op = &ll_d_ops;
225         return rc;
226 }
227
228 void ll_intent_drop_lock(struct lookup_intent *it)
229 {
230         struct lustre_handle *handle;
231
232         if (it->it_op && it->d.lustre.it_lock_mode) {
233                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
234                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
235                        " from it %p\n", handle->cookie, it);
236                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
237
238                 /* bug 494: intent_release may be called multiple times, from
239                  * this thread and we don't want to double-decref this lock */
240                 it->d.lustre.it_lock_mode = 0;
241         }
242 }
243
244 void ll_intent_release(struct lookup_intent *it)
245 {
246         ENTRY;
247
248         CDEBUG(D_INFO, "intent %p released\n", it);
249         ll_intent_drop_lock(it);
250         /* We are still holding extra reference on a request, need to free it */
251         if (it_disposition(it, DISP_ENQ_OPEN_REF))
252                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
253         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
254                 ptlrpc_req_finished(it->d.lustre.it_data);
255         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
256                                                     * to lookup */
257                 ptlrpc_req_finished(it->d.lustre.it_data);
258
259         it->d.lustre.it_disposition = 0;
260         it->d.lustre.it_data = NULL;
261         EXIT;
262 }
263
264 /* Drop dentry if it is not used already, unhash otherwise.
265    Should be called with dcache lock held!
266    Returns: 1 if dentry was dropped, 0 if unhashed. */
267 int ll_drop_dentry(struct dentry *dentry)
268 {
269         lock_dentry(dentry);
270         if (atomic_read(&dentry->d_count) == 0) {
271                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
272                        "inode %p\n", dentry->d_name.len,
273                        dentry->d_name.name, dentry, dentry->d_parent,
274                        dentry->d_inode);
275                 dget_locked(dentry);
276                 __d_drop(dentry);
277                 unlock_dentry(dentry);
278                 spin_unlock(&dcache_lock);
279                 cfs_spin_unlock(&ll_lookup_lock);
280                 dput(dentry);
281                 cfs_spin_lock(&ll_lookup_lock);
282                 spin_lock(&dcache_lock);
283                 return 1;
284         }
285         /* disconected dentry can not be find without lookup, because we
286          * not need his to unhash or mark invalid. */
287         if (dentry->d_flags & DCACHE_DISCONNECTED) {
288                 unlock_dentry(dentry);
289                 RETURN (0);
290         }
291
292         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
293                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
294                        "inode %p refc %d\n", dentry->d_name.len,
295                        dentry->d_name.name, dentry, dentry->d_parent,
296                        dentry->d_inode, atomic_read(&dentry->d_count));
297                 /* actually we don't unhash the dentry, rather just
298                  * mark it inaccessible for to __d_lookup(). otherwise
299                  * sys_getcwd() could return -ENOENT -bzzz */
300                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
301                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
302                         __d_drop(dentry);
303         }
304         unlock_dentry(dentry);
305         return 0;
306 }
307
308 void ll_unhash_aliases(struct inode *inode)
309 {
310         struct list_head *tmp, *head;
311         ENTRY;
312
313         if (inode == NULL) {
314                 CERROR("unexpected NULL inode, tell phil\n");
315                 return;
316         }
317
318         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
319                inode->i_ino, inode->i_generation, inode);
320
321         head = &inode->i_dentry;
322         cfs_spin_lock(&ll_lookup_lock);
323         spin_lock(&dcache_lock);
324 restart:
325         tmp = head;
326         while ((tmp = tmp->next) != head) {
327                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
328
329                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
330                        "inode %p flags %d\n", dentry->d_name.len,
331                        dentry->d_name.name, dentry, dentry->d_parent,
332                        dentry->d_inode, dentry->d_flags);
333
334                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
335                         CERROR("called on root (?) dentry=%p, inode=%p "
336                                "ino=%lu\n", dentry, inode, inode->i_ino);
337                         lustre_dump_dentry(dentry, 1);
338                         libcfs_debug_dumpstack(NULL);
339                 }
340
341                 if (ll_drop_dentry(dentry))
342                           goto restart;
343         }
344         spin_unlock(&dcache_lock);
345         cfs_spin_unlock(&ll_lookup_lock);
346
347         EXIT;
348 }
349
350 int ll_revalidate_it_finish(struct ptlrpc_request *request,
351                             struct lookup_intent *it,
352                             struct dentry *de)
353 {
354         int rc = 0;
355         ENTRY;
356
357         if (!request)
358                 RETURN(0);
359
360         if (it_disposition(it, DISP_LOOKUP_NEG))
361                 RETURN(-ENOENT);
362
363         rc = ll_prep_inode(&de->d_inode, request, NULL);
364
365         RETURN(rc);
366 }
367
368 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
369 {
370         LASSERT(it != NULL);
371         LASSERT(dentry != NULL);
372
373         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
374                 struct inode *inode = dentry->d_inode;
375                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
376
377                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
378                        inode, inode->i_ino, inode->i_generation);
379                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
380                                  inode, NULL);
381         }
382
383         /* drop lookup or getattr locks immediately */
384         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
385                 /* on 2.6 there are situation when several lookups and
386                  * revalidations may be requested during single operation.
387                  * therefore, we don't release intent here -bzzz */
388                 ll_intent_drop_lock(it);
389         }
390 }
391
392 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
393 {
394         struct lookup_intent *it = *itp;
395
396         if (!it || it->it_op == IT_GETXATTR)
397                 it = *itp = deft;
398
399 }
400
401 int ll_revalidate_it(struct dentry *de, int lookup_flags,
402                      struct lookup_intent *it)
403 {
404         struct md_op_data *op_data;
405         struct ptlrpc_request *req = NULL;
406         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
407         struct obd_export *exp;
408         struct inode *parent = de->d_parent->d_inode;
409         int rc, first = 0;
410
411         ENTRY;
412         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
413                LL_IT2STR(it));
414
415         if (de->d_inode == NULL) {
416                 __u64 ibits;
417
418                 /* We can only use negative dentries if this is stat or lookup,
419                    for opens and stuff we do need to query server. */
420                 /* If there is IT_CREAT in intent op set, then we must throw
421                    away this negative dentry and actually do the request to
422                    kernel to create whatever needs to be created (if possible)*/
423                 if (it && (it->it_op & IT_CREAT))
424                         RETURN(0);
425
426                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
427                         RETURN(0);
428
429                 ibits = MDS_INODELOCK_UPDATE;
430                 rc = ll_have_md_lock(parent, &ibits, LCK_MINMODE);
431                 GOTO(out_sa, rc);
432         }
433
434         /* Never execute intents for mount points.
435          * Attributes will be fixed up in ll_inode_revalidate_it */
436         if (d_mountpoint(de))
437                 GOTO(out_sa, rc = 1);
438
439         /* need to get attributes in case root got changed from other client */
440         if (de == de->d_sb->s_root) {
441                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
442                 if (rc == 0)
443                         rc = 1;
444                 GOTO(out_sa, rc);
445         }
446
447         exp = ll_i2mdexp(de->d_inode);
448
449         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
450         ll_frob_intent(&it, &lookup_it);
451         LASSERT(it);
452
453         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
454                 GOTO(out_sa, rc = 1);
455
456         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
457                                      de->d_name.name, de->d_name.len,
458                                      0, LUSTRE_OPC_ANY, NULL);
459         if (IS_ERR(op_data))
460                 RETURN(PTR_ERR(op_data));
461
462         if ((it->it_op == IT_OPEN) && de->d_inode) {
463                 struct inode *inode = de->d_inode;
464                 struct ll_inode_info *lli = ll_i2info(inode);
465                 struct obd_client_handle **och_p;
466                 __u64 *och_usecount;
467                 __u64 ibits;
468
469                 /*
470                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
471                  * just having LOOKUP lock is enough to justify inode is the
472                  * same. And if inode is the same and we have suitable
473                  * openhandle, then there is no point in doing another OPEN RPC
474                  * just to throw away newly received openhandle.  There are no
475                  * security implications too, if file owner or access mode is
476                  * change, LOOKUP lock is revoked.
477                  */
478
479
480                 if (it->it_flags & FMODE_WRITE) {
481                         och_p = &lli->lli_mds_write_och;
482                         och_usecount = &lli->lli_open_fd_write_count;
483                 } else if (it->it_flags & FMODE_EXEC) {
484                         och_p = &lli->lli_mds_exec_och;
485                         och_usecount = &lli->lli_open_fd_exec_count;
486                 } else {
487                         och_p = &lli->lli_mds_read_och;
488                         och_usecount = &lli->lli_open_fd_read_count;
489                 }
490                 /* Check for the proper lock. */
491                 ibits = MDS_INODELOCK_LOOKUP;
492                 if (!ll_have_md_lock(inode, &ibits, LCK_MINMODE))
493                         goto do_lock;
494                 cfs_down(&lli->lli_och_sem);
495                 if (*och_p) { /* Everything is open already, do nothing */
496                         /*(*och_usecount)++;  Do not let them steal our open
497                           handle from under us */
498                         /* XXX The code above was my original idea, but in case
499                            we have the handle, but we cannot use it due to later
500                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
501                            would decrement counter increased here. So we just
502                            hope the lock won't be invalidated in between. But
503                            if it would be, we'll reopen the open request to
504                            MDS later during file open path */
505                         cfs_up(&lli->lli_och_sem);
506                         ll_finish_md_op_data(op_data);
507                         RETURN(1);
508                 } else {
509                         cfs_up(&lli->lli_och_sem);
510                 }
511         }
512
513         if (it->it_op == IT_GETATTR)
514                 first = ll_statahead_enter(parent, &de, 0);
515
516 do_lock:
517         it->it_create_mode &= ~cfs_curproc_umask();
518         it->it_create_mode |= M_CHECK_STALE;
519         rc = md_intent_lock(exp, op_data, NULL, 0, it,
520                             lookup_flags,
521                             &req, ll_md_blocking_ast, 0);
522         it->it_create_mode &= ~M_CHECK_STALE;
523         ll_finish_md_op_data(op_data);
524         if (it->it_op == IT_GETATTR && !first)
525                 /* If there are too many locks on client-side, then some
526                  * locks taken by statahead maybe dropped automatically
527                  * before the real "revalidate" using them. */
528                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
529         else if (first == -EEXIST)
530                 ll_statahead_mark(parent, de);
531
532         /* If req is NULL, then md_intent_lock only tried to do a lock match;
533          * if all was well, it will return 1 if it found locks, 0 otherwise. */
534         if (req == NULL && rc >= 0) {
535                 if (!rc)
536                         goto do_lookup;
537                 GOTO(out, rc);
538         }
539
540         if (rc < 0) {
541                 if (rc != -ESTALE) {
542                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
543                                "%d\n", rc, it->d.lustre.it_status);
544                 }
545                 GOTO(out, rc = 0);
546         }
547
548 revalidate_finish:
549         rc = ll_revalidate_it_finish(req, it, de);
550         if (rc != 0) {
551                 if (rc != -ESTALE && rc != -ENOENT)
552                         ll_intent_release(it);
553                 GOTO(out, rc = 0);
554         }
555
556         if ((it->it_op & IT_OPEN) && de->d_inode &&
557             !S_ISREG(de->d_inode->i_mode) &&
558             !S_ISDIR(de->d_inode->i_mode)) {
559                 ll_release_openhandle(de, it);
560         }
561         rc = 1;
562
563         /* unfortunately ll_intent_lock may cause a callback and revoke our
564          * dentry */
565         cfs_spin_lock(&ll_lookup_lock);
566         spin_lock(&dcache_lock);
567         lock_dentry(de);
568         __d_drop(de);
569         unlock_dentry(de);
570         d_rehash_cond(de, 0);
571         spin_unlock(&dcache_lock);
572         cfs_spin_unlock(&ll_lookup_lock);
573
574 out:
575         /* We do not free request as it may be reused during following lookup
576          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
577          * be freed in ll_lookup_it or in ll_intent_release. But if
578          * request was not completed, we need to free it. (bug 5154, 9903) */
579         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
580                 ptlrpc_req_finished(req);
581         if (rc == 0) {
582                 ll_unhash_aliases(de->d_inode);
583                 /* done in ll_unhash_aliases()
584                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
585         } else {
586                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
587                        "inode %p refc %d\n", de->d_name.len,
588                        de->d_name.name, de, de->d_parent, de->d_inode,
589                        atomic_read(&de->d_count));
590                 if (de->d_flags & DCACHE_LUSTRE_INVALID) {
591                         lock_dentry(de);
592                         de->d_flags &= ~DCACHE_LUSTRE_INVALID;
593                         unlock_dentry(de);
594                 }
595                 ll_lookup_finish_locks(it, de);
596         }
597         RETURN(rc);
598
599         /*
600          * This part is here to combat evil-evil race in real_lookup on 2.6
601          * kernels.  The race details are: We enter do_lookup() looking for some
602          * name, there is nothing in dcache for this name yet and d_lookup()
603          * returns NULL.  We proceed to real_lookup(), and while we do this,
604          * another process does open on the same file we looking up (most simple
605          * reproducer), open succeeds and the dentry is added. Now back to
606          * us. In real_lookup() we do d_lookup() again and suddenly find the
607          * dentry, so we call d_revalidate on it, but there is no lock, so
608          * without this code we would return 0, but unpatched real_lookup just
609          * returns -ENOENT in such a case instead of retrying the lookup. Once
610          * this is dealt with in real_lookup(), all of this ugly mess can go and
611          * we can just check locks in ->d_revalidate without doing any RPCs
612          * ever.
613          */
614 do_lookup:
615         if (it != &lookup_it) {
616                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
617                 if (it->it_op == IT_GETATTR)
618                         lookup_it.it_op = IT_GETATTR;
619                 ll_lookup_finish_locks(it, de);
620                 it = &lookup_it;
621         }
622
623         /* Do real lookup here. */
624         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
625                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
626                                                          LUSTRE_OPC_CREATE :
627                                                          LUSTRE_OPC_ANY), NULL);
628         if (IS_ERR(op_data))
629                 RETURN(PTR_ERR(op_data));
630
631         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
632                             ll_md_blocking_ast, 0);
633         if (rc >= 0) {
634                 struct mdt_body *mdt_body;
635                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
636                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
637
638                 if (de->d_inode)
639                         fid = *ll_inode2fid(de->d_inode);
640
641                 /* see if we got same inode, if not - return error */
642                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
643                         ll_finish_md_op_data(op_data);
644                         op_data = NULL;
645                         goto revalidate_finish;
646                 }
647                 ll_intent_release(it);
648         }
649         ll_finish_md_op_data(op_data);
650         GOTO(out, rc = 0);
651
652 out_sa:
653         /*
654          * For rc == 1 case, should not return directly to prevent losing
655          * statahead windows; for rc == 0 case, the "lookup" will be done later.
656          */
657         if (it && it->it_op == IT_GETATTR && rc == 1) {
658                 first = ll_statahead_enter(parent, &de, 0);
659                 if (first >= 0)
660                         ll_statahead_exit(parent, de, 1);
661                 else if (first == -EEXIST)
662                         ll_statahead_mark(parent, de);
663         }
664
665         return rc;
666 }
667
668 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
669 {
670         int rc;
671         ENTRY;
672
673         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
674                 struct lookup_intent *it;
675
676                 it = ll_convert_intent(&nd->intent.open, nd->flags);
677                 if (IS_ERR(it))
678                         RETURN(0);
679
680                 if (it->it_op == (IT_OPEN|IT_CREAT) &&
681                     nd->intent.open.flags & O_EXCL) {
682                         CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
683                         rc = 0;
684                         goto out_it;
685                 }
686
687                 rc = ll_revalidate_it(dentry, nd->flags, it);
688
689                 if (rc && (nd->flags & LOOKUP_OPEN) &&
690                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
691 #ifdef HAVE_FILE_IN_STRUCT_INTENT
692 // XXX Code duplication with ll_lookup_nd
693                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
694                                 // We cannot call open here as it would
695                                 // deadlock.
696                                 ptlrpc_req_finished(
697                                                (struct ptlrpc_request *)
698                                                   it->d.lustre.it_data);
699                         } else {
700 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
701 /* 2.6.1[456] have a bug in open_namei() that forgets to check
702  * nd->intent.open.file for error, so we need to return it as lookup's result
703  * instead */
704                                 struct file *filp;
705
706                                 nd->intent.open.file->private_data = it;
707                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
708                                 if (IS_ERR(filp)) {
709                                         rc = PTR_ERR(filp);
710                                 }
711 #else
712                                 nd->intent.open.file->private_data = it;
713                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
714 #endif
715                         }
716 #else
717                         ll_release_openhandle(dentry, it);
718 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
719                 }
720                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
721                     it_disposition(it, DISP_OPEN_CREATE)) {
722                         /* We created something but we may only return
723                          * negative dentry here, so save request in dentry,
724                          * if lookup will be called later on, it will
725                          * pick the request, otherwise it would be freed
726                          * with dentry */
727                         ll_d2d(dentry)->lld_it = it;
728                         it = NULL; /* avoid freeing */
729                 }
730
731 out_it:
732                 if (it) {
733                         ll_intent_release(it);
734                         OBD_FREE(it, sizeof(*it));
735                 }
736         } else {
737                 rc = ll_revalidate_it(dentry, 0, NULL);
738         }
739
740         RETURN(rc);
741 }
742
743 void ll_d_iput(struct dentry *de, struct inode *inode)
744 {
745         LASSERT(inode);
746         if (!find_cbdata(inode))
747                 inode->i_nlink = 0;
748         iput(inode);
749 }
750
751 struct dentry_operations ll_d_ops = {
752         .d_revalidate = ll_revalidate_nd,
753         .d_release = ll_release,
754         .d_delete  = ll_ddelete,
755         .d_iput    = ll_d_iput,
756         .d_compare = ll_dcompare,
757 };