Whamcloud - gitweb
ORNL-26 prevent call md_set_lock_data() repeatly
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 #include <linux/fs.h>
41 #include <linux/sched.h>
42 #include <linux/smp_lock.h>
43 #include <linux/quotaops.h>
44
45 #define DEBUG_SUBSYSTEM S_LLITE
46
47 #include <obd_support.h>
48 #include <lustre_lite.h>
49 #include <lustre/lustre_idl.h>
50 #include <lustre_dlm.h>
51 //#include <lustre_ver.h>
52 //#include <lustre_version.h>
53
54 #include "llite_internal.h"
55
56 cfs_spinlock_t ll_lookup_lock = CFS_SPIN_LOCK_UNLOCKED;
57
58 /* should NOT be called with the dcache lock, see fs/dcache.c */
59 static void ll_release(struct dentry *de)
60 {
61         struct ll_dentry_data *lld;
62         ENTRY;
63         LASSERT(de != NULL);
64         lld = ll_d2d(de);
65         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
66                 EXIT;
67                 return;
68         }
69         if (lld->lld_it) {
70                 ll_intent_release(lld->lld_it);
71                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
72         }
73         LASSERT(lld->lld_cwd_count == 0);
74         LASSERT(lld->lld_mnt_count == 0);
75         OBD_FREE(de->d_fsdata, sizeof(*lld));
76
77         EXIT;
78 }
79
80 /* Compare if two dentries are the same.  Don't match if the existing dentry
81  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
82  *
83  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
84  * an AST before calling d_revalidate_it().  The dentry still exists (marked
85  * INVALID) so d_lookup() matches it, but we have no lock on it (so
86  * lock_match() fails) and we spin around real_lookup(). */
87 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
88 {
89         struct dentry *dchild;
90         ENTRY;
91
92         if (d_name->len != name->len)
93                 RETURN(1);
94
95         if (memcmp(d_name->name, name->name, name->len))
96                 RETURN(1);
97
98         /* XXX: d_name must be in-dentry structure */
99         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
100
101         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
102                name->len, name->name, dchild,
103                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
104                atomic_read(&dchild->d_count));
105
106          /* mountpoint is always valid */
107         if (d_mountpoint(dchild))
108                 RETURN(0);
109
110         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
111                 RETURN(1);
112
113         RETURN(0);
114 }
115
116 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
117 {
118         if ((lock->l_flags &
119              (LDLM_FL_CANCELING | LDLM_FL_DISCARD_DATA)) ==
120             (LDLM_FL_CANCELING | LDLM_FL_DISCARD_DATA))
121                 return LDLM_ITER_CONTINUE;
122         return LDLM_ITER_STOP;
123 }
124
125 /* find any ldlm lock of the inode in mdc and lov
126  * return 0    not find
127  *        1    find one
128  *      < 0    error */
129 static int find_cbdata(struct inode *inode)
130 {
131         struct ll_inode_info *lli = ll_i2info(inode);
132         struct ll_sb_info *sbi = ll_i2sbi(inode);
133         int rc = 0;
134         ENTRY;
135
136         LASSERT(inode);
137         rc = md_find_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
138                             return_if_equal, NULL);
139         if (rc != 0)
140                  RETURN(rc);
141
142         if (lli->lli_smd)
143                 rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
144                                      return_if_equal, NULL);
145
146         RETURN(rc);
147 }
148
149 /**
150  * Called when last reference to a dentry is dropped and dcache wants to know
151  * whether or not it should cache it:
152  * - return 1 to delete the dentry immediately
153  * - return 0 to cache the dentry
154  * Should NOT be called with the dcache lock, see fs/dcache.c
155  */
156 static int ll_ddelete(struct dentry *de)
157 {
158         ENTRY;
159         LASSERT(de);
160
161         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
162                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
163                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
164                d_unhashed(de) ? "" : "hashed,",
165                list_empty(&de->d_subdirs) ? "" : "subdirs");
166
167         /* if not ldlm lock for this inode, set i_nlink to 0 so that
168          * this inode can be recycled later b=20433 */
169         LASSERT(atomic_read(&de->d_count) == 0);
170         if (de->d_inode && !find_cbdata(de->d_inode))
171                 de->d_inode->i_nlink = 0;
172
173         if (de->d_flags & DCACHE_LUSTRE_INVALID)
174                 RETURN(1);
175
176         RETURN(0);
177 }
178
179 static int ll_set_dd(struct dentry *de)
180 {
181         ENTRY;
182         LASSERT(de != NULL);
183
184         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
185                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
186                atomic_read(&de->d_count));
187
188         if (de->d_fsdata == NULL) {
189                 struct ll_dentry_data *lld;
190
191                 OBD_ALLOC_PTR(lld);
192                 if (likely(lld != NULL)) {
193                         lock_dentry(de);
194                         if (likely(de->d_fsdata == NULL))
195                                 de->d_fsdata = lld;
196                         else
197                                 OBD_FREE_PTR(lld);
198                         unlock_dentry(de);
199                 } else {
200                         RETURN(-ENOMEM);
201                 }
202         }
203
204         RETURN(0);
205 }
206
207 int ll_dops_init(struct dentry *de, int block, int init_sa)
208 {
209         struct ll_dentry_data *lld = ll_d2d(de);
210         int rc = 0;
211
212         if (lld == NULL && block != 0) {
213                 rc = ll_set_dd(de);
214                 if (rc)
215                         return rc;
216
217                 lld = ll_d2d(de);
218         }
219
220         if (lld != NULL && init_sa != 0)
221                 lld->lld_sa_generation = 0;
222
223         de->d_op = &ll_d_ops;
224         return rc;
225 }
226
227 void ll_intent_drop_lock(struct lookup_intent *it)
228 {
229         struct lustre_handle *handle;
230
231         if (it->it_op && it->d.lustre.it_lock_mode) {
232                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
233                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
234                        " from it %p\n", handle->cookie, it);
235                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
236
237                 /* bug 494: intent_release may be called multiple times, from
238                  * this thread and we don't want to double-decref this lock */
239                 it->d.lustre.it_lock_mode = 0;
240         }
241 }
242
243 void ll_intent_release(struct lookup_intent *it)
244 {
245         ENTRY;
246
247         CDEBUG(D_INFO, "intent %p released\n", it);
248         ll_intent_drop_lock(it);
249         /* We are still holding extra reference on a request, need to free it */
250         if (it_disposition(it, DISP_ENQ_OPEN_REF))
251                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
252         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
253                 ptlrpc_req_finished(it->d.lustre.it_data);
254         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
255                                                     * to lookup */
256                 ptlrpc_req_finished(it->d.lustre.it_data);
257
258         it->d.lustre.it_disposition = 0;
259         it->d.lustre.it_data = NULL;
260         EXIT;
261 }
262
263 /* Drop dentry if it is not used already, unhash otherwise.
264    Should be called with dcache lock held!
265    Returns: 1 if dentry was dropped, 0 if unhashed. */
266 int ll_drop_dentry(struct dentry *dentry)
267 {
268         lock_dentry(dentry);
269         if (atomic_read(&dentry->d_count) == 0) {
270                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
271                        "inode %p\n", dentry->d_name.len,
272                        dentry->d_name.name, dentry, dentry->d_parent,
273                        dentry->d_inode);
274                 dget_locked(dentry);
275                 __d_drop(dentry);
276                 unlock_dentry(dentry);
277                 spin_unlock(&dcache_lock);
278                 cfs_spin_unlock(&ll_lookup_lock);
279                 dput(dentry);
280                 cfs_spin_lock(&ll_lookup_lock);
281                 spin_lock(&dcache_lock);
282                 return 1;
283         }
284         /* disconected dentry can not be find without lookup, because we
285          * not need his to unhash or mark invalid. */
286         if (dentry->d_flags & DCACHE_DISCONNECTED) {
287                 unlock_dentry(dentry);
288                 RETURN (0);
289         }
290
291         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
292                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
293                        "inode %p refc %d\n", dentry->d_name.len,
294                        dentry->d_name.name, dentry, dentry->d_parent,
295                        dentry->d_inode, atomic_read(&dentry->d_count));
296                 /* actually we don't unhash the dentry, rather just
297                  * mark it inaccessible for to __d_lookup(). otherwise
298                  * sys_getcwd() could return -ENOENT -bzzz */
299                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
300                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
301                         __d_drop(dentry);
302         }
303         unlock_dentry(dentry);
304         return 0;
305 }
306
307 void ll_unhash_aliases(struct inode *inode)
308 {
309         struct list_head *tmp, *head;
310         ENTRY;
311
312         if (inode == NULL) {
313                 CERROR("unexpected NULL inode, tell phil\n");
314                 return;
315         }
316
317         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
318                inode->i_ino, inode->i_generation, inode);
319
320         head = &inode->i_dentry;
321         cfs_spin_lock(&ll_lookup_lock);
322         spin_lock(&dcache_lock);
323 restart:
324         tmp = head;
325         while ((tmp = tmp->next) != head) {
326                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
327
328                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
329                        "inode %p flags %d\n", dentry->d_name.len,
330                        dentry->d_name.name, dentry, dentry->d_parent,
331                        dentry->d_inode, dentry->d_flags);
332
333                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
334                         CERROR("called on root (?) dentry=%p, inode=%p "
335                                "ino=%lu\n", dentry, inode, inode->i_ino);
336                         lustre_dump_dentry(dentry, 1);
337                         libcfs_debug_dumpstack(NULL);
338                 }
339
340                 if (ll_drop_dentry(dentry))
341                           goto restart;
342         }
343         spin_unlock(&dcache_lock);
344         cfs_spin_unlock(&ll_lookup_lock);
345
346         EXIT;
347 }
348
349 int ll_revalidate_it_finish(struct ptlrpc_request *request,
350                             struct lookup_intent *it,
351                             struct dentry *de)
352 {
353         int rc = 0;
354         ENTRY;
355
356         if (!request)
357                 RETURN(0);
358
359         if (it_disposition(it, DISP_LOOKUP_NEG))
360                 RETURN(-ENOENT);
361
362         rc = ll_prep_inode(&de->d_inode, request, NULL);
363
364         RETURN(rc);
365 }
366
367 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
368 {
369         LASSERT(it != NULL);
370         LASSERT(dentry != NULL);
371
372         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
373                 struct inode *inode = dentry->d_inode;
374                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
375
376                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
377                        inode, inode->i_ino, inode->i_generation);
378                 ll_set_lock_data(sbi->ll_md_exp, inode, it, NULL);
379         }
380
381         /* drop lookup or getattr locks immediately */
382         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
383                 /* on 2.6 there are situation when several lookups and
384                  * revalidations may be requested during single operation.
385                  * therefore, we don't release intent here -bzzz */
386                 ll_intent_drop_lock(it);
387         }
388 }
389
390 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
391 {
392         struct lookup_intent *it = *itp;
393
394         if (!it || it->it_op == IT_GETXATTR)
395                 it = *itp = deft;
396
397 }
398
399 int ll_revalidate_it(struct dentry *de, int lookup_flags,
400                      struct lookup_intent *it)
401 {
402         struct md_op_data *op_data;
403         struct ptlrpc_request *req = NULL;
404         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
405         struct obd_export *exp;
406         struct inode *parent = de->d_parent->d_inode;
407         int rc, first = 0;
408
409         ENTRY;
410         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
411                LL_IT2STR(it));
412
413         if (de->d_inode == NULL) {
414                 __u64 ibits;
415
416                 /* We can only use negative dentries if this is stat or lookup,
417                    for opens and stuff we do need to query server. */
418                 /* If there is IT_CREAT in intent op set, then we must throw
419                    away this negative dentry and actually do the request to
420                    kernel to create whatever needs to be created (if possible)*/
421                 if (it && (it->it_op & IT_CREAT))
422                         RETURN(0);
423
424                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
425                         RETURN(0);
426
427                 ibits = MDS_INODELOCK_UPDATE;
428                 rc = ll_have_md_lock(parent, &ibits, LCK_MINMODE);
429                 GOTO(out_sa, rc);
430         }
431
432         /* Never execute intents for mount points.
433          * Attributes will be fixed up in ll_inode_revalidate_it */
434         if (d_mountpoint(de))
435                 GOTO(out_sa, rc = 1);
436
437         /* need to get attributes in case root got changed from other client */
438         if (de == de->d_sb->s_root) {
439                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
440                 if (rc == 0)
441                         rc = 1;
442                 GOTO(out_sa, rc);
443         }
444
445         exp = ll_i2mdexp(de->d_inode);
446
447         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
448         ll_frob_intent(&it, &lookup_it);
449         LASSERT(it);
450
451         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
452                 GOTO(out_sa, rc = 1);
453
454         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
455                                      de->d_name.name, de->d_name.len,
456                                      0, LUSTRE_OPC_ANY, NULL);
457         if (IS_ERR(op_data))
458                 RETURN(PTR_ERR(op_data));
459
460         if ((it->it_op == IT_OPEN) && de->d_inode) {
461                 struct inode *inode = de->d_inode;
462                 struct ll_inode_info *lli = ll_i2info(inode);
463                 struct obd_client_handle **och_p;
464                 __u64 *och_usecount;
465                 __u64 ibits;
466
467                 /*
468                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
469                  * just having LOOKUP lock is enough to justify inode is the
470                  * same. And if inode is the same and we have suitable
471                  * openhandle, then there is no point in doing another OPEN RPC
472                  * just to throw away newly received openhandle.  There are no
473                  * security implications too, if file owner or access mode is
474                  * change, LOOKUP lock is revoked.
475                  */
476
477
478                 if (it->it_flags & FMODE_WRITE) {
479                         och_p = &lli->lli_mds_write_och;
480                         och_usecount = &lli->lli_open_fd_write_count;
481                 } else if (it->it_flags & FMODE_EXEC) {
482                         och_p = &lli->lli_mds_exec_och;
483                         och_usecount = &lli->lli_open_fd_exec_count;
484                 } else {
485                         och_p = &lli->lli_mds_read_och;
486                         och_usecount = &lli->lli_open_fd_read_count;
487                 }
488                 /* Check for the proper lock. */
489                 ibits = MDS_INODELOCK_LOOKUP;
490                 if (!ll_have_md_lock(inode, &ibits, LCK_MINMODE))
491                         goto do_lock;
492                 cfs_down(&lli->lli_och_sem);
493                 if (*och_p) { /* Everything is open already, do nothing */
494                         /*(*och_usecount)++;  Do not let them steal our open
495                           handle from under us */
496                         /* XXX The code above was my original idea, but in case
497                            we have the handle, but we cannot use it due to later
498                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
499                            would decrement counter increased here. So we just
500                            hope the lock won't be invalidated in between. But
501                            if it would be, we'll reopen the open request to
502                            MDS later during file open path */
503                         cfs_up(&lli->lli_och_sem);
504                         ll_finish_md_op_data(op_data);
505                         RETURN(1);
506                 } else {
507                         cfs_up(&lli->lli_och_sem);
508                 }
509         }
510
511         if (it->it_op == IT_GETATTR)
512                 first = ll_statahead_enter(parent, &de, 0);
513
514 do_lock:
515         it->it_create_mode &= ~cfs_curproc_umask();
516         it->it_create_mode |= M_CHECK_STALE;
517         rc = md_intent_lock(exp, op_data, NULL, 0, it,
518                             lookup_flags,
519                             &req, ll_md_blocking_ast, 0);
520         it->it_create_mode &= ~M_CHECK_STALE;
521         ll_finish_md_op_data(op_data);
522         if (it->it_op == IT_GETATTR && !first)
523                 /* If there are too many locks on client-side, then some
524                  * locks taken by statahead maybe dropped automatically
525                  * before the real "revalidate" using them. */
526                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
527         else if (first == -EEXIST)
528                 ll_statahead_mark(parent, de);
529
530         /* If req is NULL, then md_intent_lock only tried to do a lock match;
531          * if all was well, it will return 1 if it found locks, 0 otherwise. */
532         if (req == NULL && rc >= 0) {
533                 if (!rc)
534                         goto do_lookup;
535                 GOTO(out, rc);
536         }
537
538         if (rc < 0) {
539                 if (rc != -ESTALE) {
540                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
541                                "%d\n", rc, it->d.lustre.it_status);
542                 }
543                 GOTO(out, rc = 0);
544         }
545
546 revalidate_finish:
547         rc = ll_revalidate_it_finish(req, it, de);
548         if (rc != 0) {
549                 if (rc != -ESTALE && rc != -ENOENT)
550                         ll_intent_release(it);
551                 GOTO(out, rc = 0);
552         }
553
554         if ((it->it_op & IT_OPEN) && de->d_inode &&
555             !S_ISREG(de->d_inode->i_mode) &&
556             !S_ISDIR(de->d_inode->i_mode)) {
557                 ll_release_openhandle(de, it);
558         }
559         rc = 1;
560
561         /* unfortunately ll_intent_lock may cause a callback and revoke our
562          * dentry */
563         cfs_spin_lock(&ll_lookup_lock);
564         spin_lock(&dcache_lock);
565         lock_dentry(de);
566         __d_drop(de);
567         unlock_dentry(de);
568         d_rehash_cond(de, 0);
569         spin_unlock(&dcache_lock);
570         cfs_spin_unlock(&ll_lookup_lock);
571
572 out:
573         /* We do not free request as it may be reused during following lookup
574          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
575          * be freed in ll_lookup_it or in ll_intent_release. But if
576          * request was not completed, we need to free it. (bug 5154, 9903) */
577         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
578                 ptlrpc_req_finished(req);
579         if (rc == 0) {
580                 ll_unhash_aliases(de->d_inode);
581                 /* done in ll_unhash_aliases()
582                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
583         } else {
584                 __u64 bits = 0;
585
586                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
587                        "inode %p refc %d\n", de->d_name.len,
588                        de->d_name.name, de, de->d_parent, de->d_inode,
589                        atomic_read(&de->d_count));
590                 ll_set_lock_data(exp, de->d_inode, it, &bits);
591                 if (de->d_flags & DCACHE_LUSTRE_INVALID &&
592                     bits & MDS_INODELOCK_LOOKUP) {
593                         lock_dentry(de);
594                         de->d_flags &= ~DCACHE_LUSTRE_INVALID;
595                         unlock_dentry(de);
596                 }
597                 ll_lookup_finish_locks(it, de);
598         }
599         RETURN(rc);
600
601         /*
602          * This part is here to combat evil-evil race in real_lookup on 2.6
603          * kernels.  The race details are: We enter do_lookup() looking for some
604          * name, there is nothing in dcache for this name yet and d_lookup()
605          * returns NULL.  We proceed to real_lookup(), and while we do this,
606          * another process does open on the same file we looking up (most simple
607          * reproducer), open succeeds and the dentry is added. Now back to
608          * us. In real_lookup() we do d_lookup() again and suddenly find the
609          * dentry, so we call d_revalidate on it, but there is no lock, so
610          * without this code we would return 0, but unpatched real_lookup just
611          * returns -ENOENT in such a case instead of retrying the lookup. Once
612          * this is dealt with in real_lookup(), all of this ugly mess can go and
613          * we can just check locks in ->d_revalidate without doing any RPCs
614          * ever.
615          */
616 do_lookup:
617         if (it != &lookup_it) {
618                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
619                 if (it->it_op == IT_GETATTR)
620                         lookup_it.it_op = IT_GETATTR;
621                 ll_lookup_finish_locks(it, de);
622                 it = &lookup_it;
623         }
624
625         /* Do real lookup here. */
626         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
627                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
628                                                          LUSTRE_OPC_CREATE :
629                                                          LUSTRE_OPC_ANY), NULL);
630         if (IS_ERR(op_data))
631                 RETURN(PTR_ERR(op_data));
632
633         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
634                             ll_md_blocking_ast, 0);
635         if (rc >= 0) {
636                 struct mdt_body *mdt_body;
637                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
638                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
639
640                 if (de->d_inode)
641                         fid = *ll_inode2fid(de->d_inode);
642
643                 /* see if we got same inode, if not - return error */
644                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
645                         ll_finish_md_op_data(op_data);
646                         op_data = NULL;
647                         goto revalidate_finish;
648                 }
649                 ll_intent_release(it);
650         }
651         ll_finish_md_op_data(op_data);
652         GOTO(out, rc = 0);
653
654 out_sa:
655         /*
656          * For rc == 1 case, should not return directly to prevent losing
657          * statahead windows; for rc == 0 case, the "lookup" will be done later.
658          */
659         if (it && it->it_op == IT_GETATTR && rc == 1) {
660                 first = ll_statahead_enter(parent, &de, 0);
661                 if (first >= 0)
662                         ll_statahead_exit(parent, de, 1);
663                 else if (first == -EEXIST)
664                         ll_statahead_mark(parent, de);
665         }
666
667         return rc;
668 }
669
670 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
671 {
672         int rc;
673         ENTRY;
674
675         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
676                 struct lookup_intent *it;
677
678                 it = ll_convert_intent(&nd->intent.open, nd->flags);
679                 if (IS_ERR(it))
680                         RETURN(0);
681
682                 if (it->it_op == (IT_OPEN|IT_CREAT) &&
683                     nd->intent.open.flags & O_EXCL) {
684                         CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
685                         rc = 0;
686                         goto out_it;
687                 }
688
689                 rc = ll_revalidate_it(dentry, nd->flags, it);
690
691                 if (rc && (nd->flags & LOOKUP_OPEN) &&
692                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
693 #ifdef HAVE_FILE_IN_STRUCT_INTENT
694 // XXX Code duplication with ll_lookup_nd
695                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
696                                 // We cannot call open here as it would
697                                 // deadlock.
698                                 ptlrpc_req_finished(
699                                                (struct ptlrpc_request *)
700                                                   it->d.lustre.it_data);
701                         } else {
702 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
703 /* 2.6.1[456] have a bug in open_namei() that forgets to check
704  * nd->intent.open.file for error, so we need to return it as lookup's result
705  * instead */
706                                 struct file *filp;
707
708                                 nd->intent.open.file->private_data = it;
709                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
710                                 if (IS_ERR(filp)) {
711                                         rc = PTR_ERR(filp);
712                                 }
713 #else
714                                 nd->intent.open.file->private_data = it;
715                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
716 #endif
717                         }
718 #else
719                         ll_release_openhandle(dentry, it);
720 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
721                 }
722                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
723                     it_disposition(it, DISP_OPEN_CREATE)) {
724                         /* We created something but we may only return
725                          * negative dentry here, so save request in dentry,
726                          * if lookup will be called later on, it will
727                          * pick the request, otherwise it would be freed
728                          * with dentry */
729                         ll_d2d(dentry)->lld_it = it;
730                         it = NULL; /* avoid freeing */
731                 }
732
733 out_it:
734                 if (it) {
735                         ll_intent_release(it);
736                         OBD_FREE(it, sizeof(*it));
737                 }
738         } else {
739                 rc = ll_revalidate_it(dentry, 0, NULL);
740         }
741
742         RETURN(rc);
743 }
744
745 void ll_d_iput(struct dentry *de, struct inode *inode)
746 {
747         LASSERT(inode);
748         if (!find_cbdata(inode))
749                 inode->i_nlink = 0;
750         iput(inode);
751 }
752
753 struct dentry_operations ll_d_ops = {
754         .d_revalidate = ll_revalidate_nd,
755         .d_release = ll_release,
756         .d_delete  = ll_ddelete,
757         .d_iput    = ll_d_iput,
758         .d_compare = ll_dcompare,
759 };