Whamcloud - gitweb
b=20433 we should recycle dentries and inodes if only cancelling locks existing
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/smp_lock.h>
40 #include <linux/quotaops.h>
41
42 #define DEBUG_SUBSYSTEM S_LLITE
43
44 #include <obd_support.h>
45 #include <lustre_lite.h>
46 #include <lustre/lustre_idl.h>
47 #include <lustre_dlm.h>
48 #include <lustre_mdc.h>
49 //#include <lustre_ver.h>
50 //#include <lustre_version.h>
51
52 #include "llite_internal.h"
53
54 cfs_spinlock_t ll_lookup_lock = CFS_SPIN_LOCK_UNLOCKED;
55
56 /* should NOT be called with the dcache lock, see fs/dcache.c */
57 static void ll_release(struct dentry *de)
58 {
59         struct ll_dentry_data *lld;
60         ENTRY;
61         LASSERT(de != NULL);
62         lld = ll_d2d(de);
63         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
64                 EXIT;
65                 return;
66         }
67 #ifndef HAVE_VFS_INTENT_PATCHES
68         if (lld->lld_it) {
69                 ll_intent_release(lld->lld_it);
70                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
71         }
72 #endif
73         LASSERT(lld->lld_cwd_count == 0);
74         LASSERT(lld->lld_mnt_count == 0);
75         OBD_FREE(de->d_fsdata, sizeof(*lld));
76
77         EXIT;
78 }
79
80 /* Compare if two dentries are the same.  Don't match if the existing dentry
81  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
82  *
83  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
84  * an AST before calling d_revalidate_it().  The dentry still exists (marked
85  * INVALID) so d_lookup() matches it, but we have no lock on it (so
86  * lock_match() fails) and we spin around real_lookup(). */
87 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
88 {
89         struct dentry *dchild;
90         ENTRY;
91
92         if (d_name->len != name->len)
93                 RETURN(1);
94
95         if (memcmp(d_name->name, name->name, name->len))
96                 RETURN(1);
97
98         /* XXX: d_name must be in-dentry structure */
99         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
100
101         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
102                name->len, name->name, dchild,
103                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
104                atomic_read(&dchild->d_count));
105
106          /* mountpoint is always valid */
107         if (d_mountpoint(dchild))
108                 RETURN(0);
109
110         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
111                 RETURN(1);
112
113
114         RETURN(0);
115 }
116
117 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
118 {
119         if (lock->l_flags & LDLM_FL_CANCELING)
120                 return LDLM_ITER_CONTINUE;
121         return LDLM_ITER_STOP;
122 }
123
124 /* find any ldlm lock of the inode in mdc and lov
125  * return 0    not find
126  *        1    find one
127  *      < 0    error */
128 static int find_cbdata(struct inode *inode)
129 {
130         struct ll_inode_info *lli = ll_i2info(inode);
131         struct ll_sb_info *sbi = ll_i2sbi(inode);
132         int rc = 0;
133         ENTRY;
134
135         LASSERT(inode);
136         rc = md_find_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
137                             return_if_equal, NULL);
138         if (rc != 0)
139                  RETURN(rc);
140
141         if (lli->lli_smd)
142                 rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
143                                      return_if_equal, NULL);
144
145         RETURN(rc);
146 }
147
148 /* should NOT be called with the dcache lock, see fs/dcache.c */
149 static int ll_ddelete(struct dentry *de)
150 {
151         ENTRY;
152         LASSERT(de);
153
154         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
155                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
156                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
157                d_unhashed(de) ? "" : "hashed,",
158                list_empty(&de->d_subdirs) ? "" : "subdirs");
159
160         /* if not ldlm lock for this inode, set i_nlink to 0 so that
161          * this inode can be recycled later b=20433 */
162         LASSERT(atomic_read(&de->d_count) == 0);
163         if (de->d_inode && !find_cbdata(de->d_inode))
164                 de->d_inode->i_nlink = 0;
165
166         if (de->d_flags & DCACHE_LUSTRE_INVALID)
167                 RETURN(1);
168
169         RETURN(0);
170 }
171
172 int ll_set_dd(struct dentry *de)
173 {
174         ENTRY;
175         LASSERT(de != NULL);
176
177         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
178                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
179                atomic_read(&de->d_count));
180
181         if (de->d_fsdata == NULL) {
182                 struct ll_dentry_data *lld;
183
184                 OBD_ALLOC_PTR(lld);
185                 if (likely(lld != NULL)) {
186                         CFS_INIT_LIST_HEAD(&lld->lld_sa_alias);
187                         lock_dentry(de);
188                         if (likely(de->d_fsdata == NULL))
189                                 de->d_fsdata = lld;
190                         else
191                                 OBD_FREE_PTR(lld);
192                         unlock_dentry(de);
193                 } else {
194                         RETURN(-ENOMEM);
195                 }
196         }
197
198         RETURN(0);
199 }
200
201 void ll_intent_drop_lock(struct lookup_intent *it)
202 {
203         struct lustre_handle *handle;
204
205         if (it->it_op && it->d.lustre.it_lock_mode) {
206                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
207                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
208                        " from it %p\n", handle->cookie, it);
209                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
210
211                 /* bug 494: intent_release may be called multiple times, from
212                  * this thread and we don't want to double-decref this lock */
213                 it->d.lustre.it_lock_mode = 0;
214         }
215 }
216
217 void ll_intent_release(struct lookup_intent *it)
218 {
219         ENTRY;
220
221         CDEBUG(D_INFO, "intent %p released\n", it);
222         ll_intent_drop_lock(it);
223 #ifdef HAVE_VFS_INTENT_PATCHES
224         it->it_magic = 0;
225         it->it_op_release = 0;
226 #endif
227         /* We are still holding extra reference on a request, need to free it */
228         if (it_disposition(it, DISP_ENQ_OPEN_REF))
229                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
230         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
231                 ptlrpc_req_finished(it->d.lustre.it_data);
232         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
233                                                     * to lookup */
234                 ptlrpc_req_finished(it->d.lustre.it_data);
235
236         it->d.lustre.it_disposition = 0;
237         it->d.lustre.it_data = NULL;
238         EXIT;
239 }
240
241 /* Drop dentry if it is not used already, unhash otherwise.
242    Should be called with dcache lock held!
243    Returns: 1 if dentry was dropped, 0 if unhashed. */
244 int ll_drop_dentry(struct dentry *dentry)
245 {
246         lock_dentry(dentry);
247         if (atomic_read(&dentry->d_count) == 0) {
248                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
249                        "inode %p\n", dentry->d_name.len,
250                        dentry->d_name.name, dentry, dentry->d_parent,
251                        dentry->d_inode);
252                 dget_locked(dentry);
253                 __d_drop(dentry);
254                 unlock_dentry(dentry);
255                 spin_unlock(&dcache_lock);
256                 cfs_spin_unlock(&ll_lookup_lock);
257                 dput(dentry);
258                 cfs_spin_lock(&ll_lookup_lock);
259                 spin_lock(&dcache_lock);
260                 return 1;
261         }
262         /* disconected dentry can not be find without lookup, because we
263          * not need his to unhash or mark invalid. */
264         if (dentry->d_flags & DCACHE_DISCONNECTED) {
265                 unlock_dentry(dentry);
266                 RETURN (0);
267         }
268
269         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
270                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
271                        "inode %p refc %d\n", dentry->d_name.len,
272                        dentry->d_name.name, dentry, dentry->d_parent,
273                        dentry->d_inode, atomic_read(&dentry->d_count));
274                 /* actually we don't unhash the dentry, rather just
275                  * mark it inaccessible for to __d_lookup(). otherwise
276                  * sys_getcwd() could return -ENOENT -bzzz */
277                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
278                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
279                         __d_drop(dentry);
280         }
281         unlock_dentry(dentry);
282         return 0;
283 }
284
285 void ll_unhash_aliases(struct inode *inode)
286 {
287         struct list_head *tmp, *head;
288         ENTRY;
289
290         if (inode == NULL) {
291                 CERROR("unexpected NULL inode, tell phil\n");
292                 return;
293         }
294
295         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
296                inode->i_ino, inode->i_generation, inode);
297
298         head = &inode->i_dentry;
299         cfs_spin_lock(&ll_lookup_lock);
300         spin_lock(&dcache_lock);
301 restart:
302         tmp = head;
303         while ((tmp = tmp->next) != head) {
304                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
305
306                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
307                        "inode %p flags %d\n", dentry->d_name.len,
308                        dentry->d_name.name, dentry, dentry->d_parent,
309                        dentry->d_inode, dentry->d_flags);
310
311                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
312                         CERROR("called on root (?) dentry=%p, inode=%p "
313                                "ino=%lu\n", dentry, inode, inode->i_ino);
314                         lustre_dump_dentry(dentry, 1);
315                         libcfs_debug_dumpstack(NULL);
316                 }
317
318                 if (ll_drop_dentry(dentry))
319                           goto restart;
320         }
321         spin_unlock(&dcache_lock);
322         cfs_spin_unlock(&ll_lookup_lock);
323
324         EXIT;
325 }
326
327 int ll_revalidate_it_finish(struct ptlrpc_request *request,
328                             struct lookup_intent *it,
329                             struct dentry *de)
330 {
331         int rc = 0;
332         ENTRY;
333
334         if (!request)
335                 RETURN(0);
336
337         if (it_disposition(it, DISP_LOOKUP_NEG))
338                 RETURN(-ENOENT);
339
340         rc = ll_prep_inode(&de->d_inode, request, NULL);
341
342         RETURN(rc);
343 }
344
345 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
346 {
347         LASSERT(it != NULL);
348         LASSERT(dentry != NULL);
349
350         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
351                 struct inode *inode = dentry->d_inode;
352                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
353
354                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
355                        inode, inode->i_ino, inode->i_generation);
356                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
357                                  inode, NULL);
358         }
359
360         /* drop lookup or getattr locks immediately */
361         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
362                 /* on 2.6 there are situation when several lookups and
363                  * revalidations may be requested during single operation.
364                  * therefore, we don't release intent here -bzzz */
365                 ll_intent_drop_lock(it);
366         }
367 }
368
369 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
370 {
371         struct lookup_intent *it = *itp;
372 #ifdef HAVE_VFS_INTENT_PATCHES
373         if (it) {
374                 LASSERTF(it->it_magic == INTENT_MAGIC,
375                          "%p has bad intent magic: %x\n",
376                          it, it->it_magic);
377         }
378 #endif
379
380         if (!it || it->it_op == IT_GETXATTR)
381                 it = *itp = deft;
382
383 #ifdef HAVE_VFS_INTENT_PATCHES
384         it->it_op_release = ll_intent_release;
385 #endif
386 }
387
388 int ll_revalidate_it(struct dentry *de, int lookup_flags,
389                      struct lookup_intent *it)
390 {
391         struct md_op_data *op_data;
392         struct ptlrpc_request *req = NULL;
393         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
394         struct obd_export *exp;
395         struct inode *parent = de->d_parent->d_inode;
396         int rc, first = 0;
397
398         ENTRY;
399         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
400                LL_IT2STR(it));
401
402         if (de->d_inode == NULL) {
403                 /* We can only use negative dentries if this is stat or lookup,
404                    for opens and stuff we do need to query server. */
405                 /* If there is IT_CREAT in intent op set, then we must throw
406                    away this negative dentry and actually do the request to
407                    kernel to create whatever needs to be created (if possible)*/
408                 if (it && (it->it_op & IT_CREAT))
409                         RETURN(0);
410
411                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
412                         RETURN(0);
413
414                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE);
415                 GOTO(out_sa, rc);
416         }
417
418         /* Never execute intents for mount points.
419          * Attributes will be fixed up in ll_inode_revalidate_it */
420         if (d_mountpoint(de))
421                 GOTO(out_sa, rc = 1);
422
423         /* need to get attributes in case root got changed from other client */
424         if (de == de->d_sb->s_root) {
425                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
426                 if (rc == 0)
427                         rc = 1;
428                 GOTO(out_sa, rc);
429         }
430
431         exp = ll_i2mdexp(de->d_inode);
432
433         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
434         ll_frob_intent(&it, &lookup_it);
435         LASSERT(it);
436
437         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
438                 GOTO(out_sa, rc = 1);
439
440         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
441                                      de->d_name.name, de->d_name.len,
442                                      0, LUSTRE_OPC_ANY, NULL);
443         if (IS_ERR(op_data))
444                 RETURN(PTR_ERR(op_data));
445
446         if ((it->it_op == IT_OPEN) && de->d_inode) {
447                 struct inode *inode = de->d_inode;
448                 struct ll_inode_info *lli = ll_i2info(inode);
449                 struct obd_client_handle **och_p;
450                 __u64 *och_usecount;
451
452                 /*
453                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
454                  * just having LOOKUP lock is enough to justify inode is the
455                  * same. And if inode is the same and we have suitable
456                  * openhandle, then there is no point in doing another OPEN RPC
457                  * just to throw away newly received openhandle.  There are no
458                  * security implications too, if file owner or access mode is
459                  * change, LOOKUP lock is revoked.
460                  */
461
462
463                 if (it->it_flags & FMODE_WRITE) {
464                         och_p = &lli->lli_mds_write_och;
465                         och_usecount = &lli->lli_open_fd_write_count;
466                 } else if (it->it_flags & FMODE_EXEC) {
467                         och_p = &lli->lli_mds_exec_och;
468                         och_usecount = &lli->lli_open_fd_exec_count;
469                 } else {
470                         och_p = &lli->lli_mds_read_och;
471                         och_usecount = &lli->lli_open_fd_read_count;
472                 }
473                 /* Check for the proper lock. */
474                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
475                         goto do_lock;
476                 cfs_down(&lli->lli_och_sem);
477                 if (*och_p) { /* Everything is open already, do nothing */
478                         /*(*och_usecount)++;  Do not let them steal our open
479                           handle from under us */
480                         /* XXX The code above was my original idea, but in case
481                            we have the handle, but we cannot use it due to later
482                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
483                            would decrement counter increased here. So we just
484                            hope the lock won't be invalidated in between. But
485                            if it would be, we'll reopen the open request to
486                            MDS later during file open path */
487                         cfs_up(&lli->lli_och_sem);
488                         ll_finish_md_op_data(op_data);
489                         RETURN(1);
490                 } else {
491                         cfs_up(&lli->lli_och_sem);
492                 }
493         }
494
495         if (it->it_op == IT_GETATTR) {
496                 first = ll_statahead_enter(parent, &de, 0);
497                 if (first == 1) {
498                         ll_statahead_exit(parent, de, 1);
499                         ll_finish_md_op_data(op_data);
500                         GOTO(out, rc = 1);
501                 }
502         }
503
504 do_lock:
505         it->it_create_mode &= ~current->fs->umask;
506         it->it_create_mode |= M_CHECK_STALE;
507         rc = md_intent_lock(exp, op_data, NULL, 0, it,
508                             lookup_flags,
509                             &req, ll_md_blocking_ast, 0);
510         it->it_create_mode &= ~M_CHECK_STALE;
511         ll_finish_md_op_data(op_data);
512         if (it->it_op == IT_GETATTR && !first)
513                 /* If there are too many locks on client-side, then some
514                  * locks taken by statahead maybe dropped automatically
515                  * before the real "revalidate" using them. */
516                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
517         else if (first == -EEXIST)
518                 ll_statahead_mark(parent, de);
519
520         /* If req is NULL, then md_intent_lock only tried to do a lock match;
521          * if all was well, it will return 1 if it found locks, 0 otherwise. */
522         if (req == NULL && rc >= 0) {
523                 if (!rc)
524                         goto do_lookup;
525                 GOTO(out, rc);
526         }
527
528         if (rc < 0) {
529                 if (rc != -ESTALE) {
530                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
531                                "%d\n", rc, it->d.lustre.it_status);
532                 }
533                 GOTO(out, rc = 0);
534         }
535
536 revalidate_finish:
537         rc = ll_revalidate_it_finish(req, it, de);
538         if (rc != 0) {
539                 if (rc != -ESTALE && rc != -ENOENT)
540                         ll_intent_release(it);
541                 GOTO(out, rc = 0);
542         }
543
544         if ((it->it_op & IT_OPEN) && de->d_inode &&
545             !S_ISREG(de->d_inode->i_mode) &&
546             !S_ISDIR(de->d_inode->i_mode)) {
547                 ll_release_openhandle(de, it);
548         }
549         rc = 1;
550
551         /* unfortunately ll_intent_lock may cause a callback and revoke our
552          * dentry */
553         cfs_spin_lock(&ll_lookup_lock);
554         spin_lock(&dcache_lock);
555         lock_dentry(de);
556         __d_drop(de);
557         unlock_dentry(de);
558         d_rehash_cond(de, 0);
559         spin_unlock(&dcache_lock);
560         cfs_spin_unlock(&ll_lookup_lock);
561
562 out:
563         /* We do not free request as it may be reused during following lookup
564          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
565          * be freed in ll_lookup_it or in ll_intent_release. But if
566          * request was not completed, we need to free it. (bug 5154, 9903) */
567         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
568                 ptlrpc_req_finished(req);
569         if (rc == 0) {
570                 ll_unhash_aliases(de->d_inode);
571                 /* done in ll_unhash_aliases()
572                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
573         } else {
574                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
575                        "inode %p refc %d\n", de->d_name.len,
576                        de->d_name.name, de, de->d_parent, de->d_inode,
577                        atomic_read(&de->d_count));
578                 if (first != 1) {
579                         if (de->d_flags & DCACHE_LUSTRE_INVALID) {
580                                 lock_dentry(de);
581                                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
582                                 unlock_dentry(de);
583                         }
584                         ll_lookup_finish_locks(it, de);
585                 }
586         }
587         RETURN(rc);
588
589         /*
590          * This part is here to combat evil-evil race in real_lookup on 2.6
591          * kernels.  The race details are: We enter do_lookup() looking for some
592          * name, there is nothing in dcache for this name yet and d_lookup()
593          * returns NULL.  We proceed to real_lookup(), and while we do this,
594          * another process does open on the same file we looking up (most simple
595          * reproducer), open succeeds and the dentry is added. Now back to
596          * us. In real_lookup() we do d_lookup() again and suddenly find the
597          * dentry, so we call d_revalidate on it, but there is no lock, so
598          * without this code we would return 0, but unpatched real_lookup just
599          * returns -ENOENT in such a case instead of retrying the lookup. Once
600          * this is dealt with in real_lookup(), all of this ugly mess can go and
601          * we can just check locks in ->d_revalidate without doing any RPCs
602          * ever.
603          */
604 do_lookup:
605         if (it != &lookup_it) {
606                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
607                 if (it->it_op == IT_GETATTR)
608                         lookup_it.it_op = IT_GETATTR;
609                 ll_lookup_finish_locks(it, de);
610                 it = &lookup_it;
611         }
612
613         /* Do real lookup here. */
614         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
615                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
616                                                          LUSTRE_OPC_CREATE :
617                                                          LUSTRE_OPC_ANY), NULL);
618         if (IS_ERR(op_data))
619                 RETURN(PTR_ERR(op_data));
620
621         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
622                             ll_md_blocking_ast, 0);
623         if (rc >= 0) {
624                 struct mdt_body *mdt_body;
625                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
626                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
627
628                 if (de->d_inode)
629                         fid = *ll_inode2fid(de->d_inode);
630
631                 /* see if we got same inode, if not - return error */
632                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
633                         ll_finish_md_op_data(op_data);
634                         op_data = NULL;
635                         goto revalidate_finish;
636                 }
637                 ll_intent_release(it);
638         }
639         ll_finish_md_op_data(op_data);
640         GOTO(out, rc = 0);
641
642 out_sa:
643         /*
644          * For rc == 1 case, should not return directly to prevent losing
645          * statahead windows; for rc == 0 case, the "lookup" will be done later.
646          */
647         if (it && it->it_op == IT_GETATTR && rc == 1) {
648                 first = ll_statahead_enter(parent, &de, 0);
649                 if (first >= 0)
650                         ll_statahead_exit(parent, de, 1);
651                 else if (first == -EEXIST)
652                         ll_statahead_mark(parent, de);
653         }
654
655         return rc;
656 }
657
658 #if 0
659 static void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
660 {
661         struct inode *inode= de->d_inode;
662         struct ll_sb_info *sbi = ll_i2sbi(inode);
663         struct ll_dentry_data *ldd = ll_d2d(de);
664         struct obd_client_handle *handle;
665         struct obd_capa *oc;
666         int rc = 0;
667         ENTRY;
668         LASSERT(ldd);
669
670         cfs_lock_kernel();
671         /* Strictly speaking this introduces an additional race: the
672          * increments should wait until the rpc has returned.
673          * However, given that at present the function is void, this
674          * issue is moot. */
675         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
676                 cfs_unlock_kernel();
677                 EXIT;
678                 return;
679         }
680
681         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
682                 cfs_unlock_kernel();
683                 EXIT;
684                 return;
685         }
686         cfs_unlock_kernel();
687
688         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
689         oc = ll_mdscapa_get(inode);
690         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
691         capa_put(oc);
692         if (rc) {
693                 cfs_lock_kernel();
694                 memset(handle, 0, sizeof(*handle));
695                 if (flag == 0)
696                         ldd->lld_cwd_count--;
697                 else
698                         ldd->lld_mnt_count--;
699                 cfs_unlock_kernel();
700         }
701
702         EXIT;
703         return;
704 }
705
706 static void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
707 {
708         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
709         struct ll_dentry_data *ldd = ll_d2d(de);
710         struct obd_client_handle handle;
711         int count, rc = 0;
712         ENTRY;
713         LASSERT(ldd);
714
715         cfs_lock_kernel();
716         /* Strictly speaking this introduces an additional race: the
717          * increments should wait until the rpc has returned.
718          * However, given that at present the function is void, this
719          * issue is moot. */
720         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
721         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
722                 /* the "pin" failed */
723                 cfs_unlock_kernel();
724                 EXIT;
725                 return;
726         }
727
728         if (flag)
729                 count = --ldd->lld_mnt_count;
730         else
731                 count = --ldd->lld_cwd_count;
732         cfs_unlock_kernel();
733
734         if (count != 0) {
735                 EXIT;
736                 return;
737         }
738
739         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
740         EXIT;
741         return;
742 }
743 #endif
744
745 #ifdef HAVE_VFS_INTENT_PATCHES
746 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
747 {
748         int rc;
749         ENTRY;
750
751         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
752                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
753         else
754                 rc = ll_revalidate_it(dentry, 0, NULL);
755
756         RETURN(rc);
757 }
758 #else
759 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
760 {
761         int rc;
762         ENTRY;
763
764         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
765                 struct lookup_intent *it;
766                 it = ll_convert_intent(&nd->intent.open, nd->flags);
767                 if (IS_ERR(it))
768                         RETURN(0);
769                 if (it->it_op == (IT_OPEN|IT_CREAT))
770                         if (nd->intent.open.flags & O_EXCL) {
771                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
772                                 rc = 0;
773                                 goto out_it;
774                         }
775
776                 rc = ll_revalidate_it(dentry, nd->flags, it);
777
778                 if (rc && (nd->flags & LOOKUP_OPEN) &&
779                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
780 #ifdef HAVE_FILE_IN_STRUCT_INTENT
781 // XXX Code duplication with ll_lookup_nd
782                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
783                                 // We cannot call open here as it would
784                                 // deadlock.
785                                 ptlrpc_req_finished(
786                                                (struct ptlrpc_request *)
787                                                   it->d.lustre.it_data);
788                         } else {
789 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
790 /* 2.6.1[456] have a bug in open_namei() that forgets to check
791  * nd->intent.open.file for error, so we need to return it as lookup's result
792  * instead */
793                                 struct file *filp;
794
795                                 nd->intent.open.file->private_data = it;
796                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
797                                 if (IS_ERR(filp)) {
798                                         rc = PTR_ERR(filp);
799                                 }
800 #else
801                                 nd->intent.open.file->private_data = it;
802                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
803 #endif
804                         }
805 #else
806                         ll_release_openhandle(dentry, it);
807 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
808                 }
809                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
810                     it_disposition(it, DISP_OPEN_CREATE)) {
811                         /* We created something but we may only return
812                          * negative dentry here, so save request in dentry,
813                          * if lookup will be called later on, it will
814                          * pick the request, otherwise it would be freed
815                          * with dentry */
816                         ll_d2d(dentry)->lld_it = it;
817                         it = NULL; /* avoid freeing */
818                 }
819
820 out_it:
821                 if (it) {
822                         ll_intent_release(it);
823                         OBD_FREE(it, sizeof(*it));
824                 }
825         } else {
826                 rc = ll_revalidate_it(dentry, 0, NULL);
827         }
828
829         RETURN(rc);
830 }
831 #endif
832
833 void ll_d_iput(struct dentry *de, struct inode *inode)
834 {
835         LASSERT(inode);
836         if (!find_cbdata(inode))
837                 inode->i_nlink = 0;
838         iput(inode);
839 }
840
841 struct dentry_operations ll_d_ops = {
842         .d_revalidate = ll_revalidate_nd,
843         .d_release = ll_release,
844         .d_delete  = ll_ddelete,
845         .d_iput    = ll_d_iput,
846         .d_compare = ll_dcompare,
847 #if 0
848         .d_pin = ll_pin,
849         .d_unpin = ll_unpin,
850 #endif
851 };