Whamcloud - gitweb
52c7e82f1e683e0587049fe170e7d07c7c8a75ed
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/smp_lock.h>
40 #include <linux/quotaops.h>
41
42 #define DEBUG_SUBSYSTEM S_LLITE
43
44 #include <obd_support.h>
45 #include <lustre_lite.h>
46 #include <lustre/lustre_idl.h>
47 #include <lustre_dlm.h>
48 #include <lustre_mdc.h>
49 //#include <lustre_ver.h>
50 //#include <lustre_version.h>
51
52 #include "llite_internal.h"
53
54 cfs_spinlock_t ll_lookup_lock = CFS_SPIN_LOCK_UNLOCKED;
55
56 /* should NOT be called with the dcache lock, see fs/dcache.c */
57 static void ll_release(struct dentry *de)
58 {
59         struct ll_dentry_data *lld;
60         ENTRY;
61         LASSERT(de != NULL);
62         lld = ll_d2d(de);
63         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
64                 EXIT;
65                 return;
66         }
67 #ifndef HAVE_VFS_INTENT_PATCHES
68         if (lld->lld_it) {
69                 ll_intent_release(lld->lld_it);
70                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
71         }
72 #endif
73         LASSERT(lld->lld_cwd_count == 0);
74         LASSERT(lld->lld_mnt_count == 0);
75         OBD_FREE(de->d_fsdata, sizeof(*lld));
76
77         EXIT;
78 }
79
80 /* Compare if two dentries are the same.  Don't match if the existing dentry
81  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
82  *
83  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
84  * an AST before calling d_revalidate_it().  The dentry still exists (marked
85  * INVALID) so d_lookup() matches it, but we have no lock on it (so
86  * lock_match() fails) and we spin around real_lookup(). */
87 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
88 {
89         struct dentry *dchild;
90         ENTRY;
91
92         if (d_name->len != name->len)
93                 RETURN(1);
94
95         if (memcmp(d_name->name, name->name, name->len))
96                 RETURN(1);
97
98         /* XXX: d_name must be in-dentry structure */
99         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
100
101         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
102                name->len, name->name, dchild,
103                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
104                atomic_read(&dchild->d_count));
105
106          /* mountpoint is always valid */
107         if (d_mountpoint(dchild))
108                 RETURN(0);
109
110         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
111                 RETURN(1);
112
113
114         RETURN(0);
115 }
116
117 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
118 {
119         return LDLM_ITER_STOP;
120 }
121
122 /* find any ldlm lock of the inode in mdc and lov
123  * return 0    not find
124  *        1    find one
125  *      < 0    error */
126 static int find_cbdata(struct inode *inode)
127 {
128         struct ll_inode_info *lli = ll_i2info(inode);
129         struct ll_sb_info *sbi = ll_i2sbi(inode);
130         int rc = 0;
131         ENTRY;
132
133         LASSERT(inode);
134         rc = md_find_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
135                             return_if_equal, NULL);
136         if (rc != 0)
137                  RETURN(rc);
138
139         if (lli->lli_smd)
140                 rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
141                                      return_if_equal, NULL);
142
143         RETURN(rc);
144 }
145
146 /* should NOT be called with the dcache lock, see fs/dcache.c */
147 static int ll_ddelete(struct dentry *de)
148 {
149         ENTRY;
150         LASSERT(de);
151
152         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
153                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
154                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
155                d_unhashed(de) ? "" : "hashed,",
156                list_empty(&de->d_subdirs) ? "" : "subdirs");
157
158         /* if not ldlm lock for this inode, set i_nlink to 0 so that
159          * this inode can be recycled later b=20433 */
160         LASSERT(atomic_read(&de->d_count) == 0);
161         if (de->d_inode && !find_cbdata(de->d_inode))
162                 de->d_inode->i_nlink = 0;
163
164         if (de->d_flags & DCACHE_LUSTRE_INVALID)
165                 RETURN(1);
166
167         RETURN(0);
168 }
169
170 int ll_set_dd(struct dentry *de)
171 {
172         ENTRY;
173         LASSERT(de != NULL);
174
175         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
176                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
177                atomic_read(&de->d_count));
178
179         if (de->d_fsdata == NULL) {
180                 struct ll_dentry_data *lld;
181
182                 OBD_ALLOC_PTR(lld);
183                 if (likely(lld != NULL)) {
184                         CFS_INIT_LIST_HEAD(&lld->lld_sa_alias);
185                         lock_dentry(de);
186                         if (likely(de->d_fsdata == NULL))
187                                 de->d_fsdata = lld;
188                         else
189                                 OBD_FREE_PTR(lld);
190                         unlock_dentry(de);
191                 } else {
192                         RETURN(-ENOMEM);
193                 }
194         }
195
196         RETURN(0);
197 }
198
199 void ll_intent_drop_lock(struct lookup_intent *it)
200 {
201         struct lustre_handle *handle;
202
203         if (it->it_op && it->d.lustre.it_lock_mode) {
204                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
205                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
206                        " from it %p\n", handle->cookie, it);
207                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
208
209                 /* bug 494: intent_release may be called multiple times, from
210                  * this thread and we don't want to double-decref this lock */
211                 it->d.lustre.it_lock_mode = 0;
212         }
213 }
214
215 void ll_intent_release(struct lookup_intent *it)
216 {
217         ENTRY;
218
219         CDEBUG(D_INFO, "intent %p released\n", it);
220         ll_intent_drop_lock(it);
221 #ifdef HAVE_VFS_INTENT_PATCHES
222         it->it_magic = 0;
223         it->it_op_release = 0;
224 #endif
225         /* We are still holding extra reference on a request, need to free it */
226         if (it_disposition(it, DISP_ENQ_OPEN_REF))
227                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
228         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
229                 ptlrpc_req_finished(it->d.lustre.it_data);
230         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
231                                                     * to lookup */
232                 ptlrpc_req_finished(it->d.lustre.it_data);
233
234         it->d.lustre.it_disposition = 0;
235         it->d.lustre.it_data = NULL;
236         EXIT;
237 }
238
239 /* Drop dentry if it is not used already, unhash otherwise.
240    Should be called with dcache lock held!
241    Returns: 1 if dentry was dropped, 0 if unhashed. */
242 int ll_drop_dentry(struct dentry *dentry)
243 {
244         lock_dentry(dentry);
245         if (atomic_read(&dentry->d_count) == 0) {
246                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
247                        "inode %p\n", dentry->d_name.len,
248                        dentry->d_name.name, dentry, dentry->d_parent,
249                        dentry->d_inode);
250                 dget_locked(dentry);
251                 __d_drop(dentry);
252                 unlock_dentry(dentry);
253                 spin_unlock(&dcache_lock);
254                 cfs_spin_unlock(&ll_lookup_lock);
255                 dput(dentry);
256                 cfs_spin_lock(&ll_lookup_lock);
257                 spin_lock(&dcache_lock);
258                 return 1;
259         }
260         /* disconected dentry can not be find without lookup, because we
261          * not need his to unhash or mark invalid. */
262         if (dentry->d_flags & DCACHE_DISCONNECTED) {
263                 unlock_dentry(dentry);
264                 RETURN (0);
265         }
266
267         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
268                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
269                        "inode %p refc %d\n", dentry->d_name.len,
270                        dentry->d_name.name, dentry, dentry->d_parent,
271                        dentry->d_inode, atomic_read(&dentry->d_count));
272                 /* actually we don't unhash the dentry, rather just
273                  * mark it inaccessible for to __d_lookup(). otherwise
274                  * sys_getcwd() could return -ENOENT -bzzz */
275                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
276                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
277                         __d_drop(dentry);
278         }
279         unlock_dentry(dentry);
280         return 0;
281 }
282
283 void ll_unhash_aliases(struct inode *inode)
284 {
285         struct list_head *tmp, *head;
286         ENTRY;
287
288         if (inode == NULL) {
289                 CERROR("unexpected NULL inode, tell phil\n");
290                 return;
291         }
292
293         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
294                inode->i_ino, inode->i_generation, inode);
295
296         head = &inode->i_dentry;
297         cfs_spin_lock(&ll_lookup_lock);
298         spin_lock(&dcache_lock);
299 restart:
300         tmp = head;
301         while ((tmp = tmp->next) != head) {
302                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
303
304                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
305                        "inode %p flags %d\n", dentry->d_name.len,
306                        dentry->d_name.name, dentry, dentry->d_parent,
307                        dentry->d_inode, dentry->d_flags);
308
309                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
310                         CERROR("called on root (?) dentry=%p, inode=%p "
311                                "ino=%lu\n", dentry, inode, inode->i_ino);
312                         lustre_dump_dentry(dentry, 1);
313                         libcfs_debug_dumpstack(NULL);
314                 }
315
316                 if (ll_drop_dentry(dentry))
317                           goto restart;
318         }
319         spin_unlock(&dcache_lock);
320         cfs_spin_unlock(&ll_lookup_lock);
321
322         EXIT;
323 }
324
325 int ll_revalidate_it_finish(struct ptlrpc_request *request,
326                             struct lookup_intent *it,
327                             struct dentry *de)
328 {
329         int rc = 0;
330         ENTRY;
331
332         if (!request)
333                 RETURN(0);
334
335         if (it_disposition(it, DISP_LOOKUP_NEG))
336                 RETURN(-ENOENT);
337
338         rc = ll_prep_inode(&de->d_inode, request, NULL);
339
340         RETURN(rc);
341 }
342
343 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
344 {
345         LASSERT(it != NULL);
346         LASSERT(dentry != NULL);
347
348         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
349                 struct inode *inode = dentry->d_inode;
350                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
351
352                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
353                        inode, inode->i_ino, inode->i_generation);
354                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
355                                  inode, NULL);
356         }
357
358         /* drop lookup or getattr locks immediately */
359         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
360                 /* on 2.6 there are situation when several lookups and
361                  * revalidations may be requested during single operation.
362                  * therefore, we don't release intent here -bzzz */
363                 ll_intent_drop_lock(it);
364         }
365 }
366
367 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
368 {
369         struct lookup_intent *it = *itp;
370 #ifdef HAVE_VFS_INTENT_PATCHES
371         if (it) {
372                 LASSERTF(it->it_magic == INTENT_MAGIC,
373                          "%p has bad intent magic: %x\n",
374                          it, it->it_magic);
375         }
376 #endif
377
378         if (!it || it->it_op == IT_GETXATTR)
379                 it = *itp = deft;
380
381 #ifdef HAVE_VFS_INTENT_PATCHES
382         it->it_op_release = ll_intent_release;
383 #endif
384 }
385
386 int ll_revalidate_it(struct dentry *de, int lookup_flags,
387                      struct lookup_intent *it)
388 {
389         struct md_op_data *op_data;
390         struct ptlrpc_request *req = NULL;
391         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
392         struct obd_export *exp;
393         struct inode *parent = de->d_parent->d_inode;
394         int rc, first = 0;
395
396         ENTRY;
397         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
398                LL_IT2STR(it));
399
400         if (de->d_inode == NULL) {
401                 /* We can only use negative dentries if this is stat or lookup,
402                    for opens and stuff we do need to query server. */
403                 /* If there is IT_CREAT in intent op set, then we must throw
404                    away this negative dentry and actually do the request to
405                    kernel to create whatever needs to be created (if possible)*/
406                 if (it && (it->it_op & IT_CREAT))
407                         RETURN(0);
408
409                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
410                         RETURN(0);
411
412                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE);
413                 GOTO(out_sa, rc);
414         }
415
416         /* Never execute intents for mount points.
417          * Attributes will be fixed up in ll_inode_revalidate_it */
418         if (d_mountpoint(de))
419                 GOTO(out_sa, rc = 1);
420
421         /* need to get attributes in case root got changed from other client */
422         if (de == de->d_sb->s_root) {
423                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
424                 if (rc == 0)
425                         rc = 1;
426                 GOTO(out_sa, rc);
427         }
428
429         exp = ll_i2mdexp(de->d_inode);
430
431         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
432         ll_frob_intent(&it, &lookup_it);
433         LASSERT(it);
434
435         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
436                 GOTO(out_sa, rc = 1);
437
438         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
439                                      de->d_name.name, de->d_name.len,
440                                      0, LUSTRE_OPC_ANY, NULL);
441         if (IS_ERR(op_data))
442                 RETURN(PTR_ERR(op_data));
443
444         if ((it->it_op == IT_OPEN) && de->d_inode) {
445                 struct inode *inode = de->d_inode;
446                 struct ll_inode_info *lli = ll_i2info(inode);
447                 struct obd_client_handle **och_p;
448                 __u64 *och_usecount;
449
450                 /*
451                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
452                  * just having LOOKUP lock is enough to justify inode is the
453                  * same. And if inode is the same and we have suitable
454                  * openhandle, then there is no point in doing another OPEN RPC
455                  * just to throw away newly received openhandle.  There are no
456                  * security implications too, if file owner or access mode is
457                  * change, LOOKUP lock is revoked.
458                  */
459
460
461                 if (it->it_flags & FMODE_WRITE) {
462                         och_p = &lli->lli_mds_write_och;
463                         och_usecount = &lli->lli_open_fd_write_count;
464                 } else if (it->it_flags & FMODE_EXEC) {
465                         och_p = &lli->lli_mds_exec_och;
466                         och_usecount = &lli->lli_open_fd_exec_count;
467                 } else {
468                         och_p = &lli->lli_mds_read_och;
469                         och_usecount = &lli->lli_open_fd_read_count;
470                 }
471                 /* Check for the proper lock. */
472                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
473                         goto do_lock;
474                 cfs_down(&lli->lli_och_sem);
475                 if (*och_p) { /* Everything is open already, do nothing */
476                         /*(*och_usecount)++;  Do not let them steal our open
477                           handle from under us */
478                         /* XXX The code above was my original idea, but in case
479                            we have the handle, but we cannot use it due to later
480                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
481                            would decrement counter increased here. So we just
482                            hope the lock won't be invalidated in between. But
483                            if it would be, we'll reopen the open request to
484                            MDS later during file open path */
485                         cfs_up(&lli->lli_och_sem);
486                         ll_finish_md_op_data(op_data);
487                         RETURN(1);
488                 } else {
489                         cfs_up(&lli->lli_och_sem);
490                 }
491         }
492
493         if (it->it_op == IT_GETATTR) {
494                 first = ll_statahead_enter(parent, &de, 0);
495                 if (first == 1) {
496                         ll_statahead_exit(parent, de, 1);
497                         ll_finish_md_op_data(op_data);
498                         GOTO(out, rc = 1);
499                 }
500         }
501
502 do_lock:
503         it->it_create_mode &= ~current->fs->umask;
504         it->it_create_mode |= M_CHECK_STALE;
505         rc = md_intent_lock(exp, op_data, NULL, 0, it,
506                             lookup_flags,
507                             &req, ll_md_blocking_ast, 0);
508         it->it_create_mode &= ~M_CHECK_STALE;
509         ll_finish_md_op_data(op_data);
510         if (it->it_op == IT_GETATTR && !first)
511                 /* If there are too many locks on client-side, then some
512                  * locks taken by statahead maybe dropped automatically
513                  * before the real "revalidate" using them. */
514                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
515         else if (first == -EEXIST)
516                 ll_statahead_mark(parent, de);
517
518         /* If req is NULL, then md_intent_lock only tried to do a lock match;
519          * if all was well, it will return 1 if it found locks, 0 otherwise. */
520         if (req == NULL && rc >= 0) {
521                 if (!rc)
522                         goto do_lookup;
523                 GOTO(out, rc);
524         }
525
526         if (rc < 0) {
527                 if (rc != -ESTALE) {
528                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
529                                "%d\n", rc, it->d.lustre.it_status);
530                 }
531                 GOTO(out, rc = 0);
532         }
533
534 revalidate_finish:
535         rc = ll_revalidate_it_finish(req, it, de);
536         if (rc != 0) {
537                 if (rc != -ESTALE && rc != -ENOENT)
538                         ll_intent_release(it);
539                 GOTO(out, rc = 0);
540         }
541
542         if ((it->it_op & IT_OPEN) && de->d_inode &&
543             !S_ISREG(de->d_inode->i_mode) &&
544             !S_ISDIR(de->d_inode->i_mode)) {
545                 ll_release_openhandle(de, it);
546         }
547         rc = 1;
548
549         /* unfortunately ll_intent_lock may cause a callback and revoke our
550          * dentry */
551         cfs_spin_lock(&ll_lookup_lock);
552         spin_lock(&dcache_lock);
553         lock_dentry(de);
554         __d_drop(de);
555         unlock_dentry(de);
556         d_rehash_cond(de, 0);
557         spin_unlock(&dcache_lock);
558         cfs_spin_unlock(&ll_lookup_lock);
559
560 out:
561         /* We do not free request as it may be reused during following lookup
562          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
563          * be freed in ll_lookup_it or in ll_intent_release. But if
564          * request was not completed, we need to free it. (bug 5154, 9903) */
565         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
566                 ptlrpc_req_finished(req);
567         if (rc == 0) {
568                 ll_unhash_aliases(de->d_inode);
569                 /* done in ll_unhash_aliases()
570                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
571         } else {
572                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
573                        "inode %p refc %d\n", de->d_name.len,
574                        de->d_name.name, de, de->d_parent, de->d_inode,
575                        atomic_read(&de->d_count));
576                 if (first != 1) {
577                         if (de->d_flags & DCACHE_LUSTRE_INVALID) {
578                                 lock_dentry(de);
579                                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
580                                 unlock_dentry(de);
581                         }
582                         ll_lookup_finish_locks(it, de);
583                 }
584         }
585         RETURN(rc);
586
587         /*
588          * This part is here to combat evil-evil race in real_lookup on 2.6
589          * kernels.  The race details are: We enter do_lookup() looking for some
590          * name, there is nothing in dcache for this name yet and d_lookup()
591          * returns NULL.  We proceed to real_lookup(), and while we do this,
592          * another process does open on the same file we looking up (most simple
593          * reproducer), open succeeds and the dentry is added. Now back to
594          * us. In real_lookup() we do d_lookup() again and suddenly find the
595          * dentry, so we call d_revalidate on it, but there is no lock, so
596          * without this code we would return 0, but unpatched real_lookup just
597          * returns -ENOENT in such a case instead of retrying the lookup. Once
598          * this is dealt with in real_lookup(), all of this ugly mess can go and
599          * we can just check locks in ->d_revalidate without doing any RPCs
600          * ever.
601          */
602 do_lookup:
603         if (it != &lookup_it) {
604                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
605                 if (it->it_op == IT_GETATTR)
606                         lookup_it.it_op = IT_GETATTR;
607                 ll_lookup_finish_locks(it, de);
608                 it = &lookup_it;
609         }
610
611         /* Do real lookup here. */
612         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
613                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
614                                                          LUSTRE_OPC_CREATE :
615                                                          LUSTRE_OPC_ANY), NULL);
616         if (IS_ERR(op_data))
617                 RETURN(PTR_ERR(op_data));
618
619         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
620                             ll_md_blocking_ast, 0);
621         if (rc >= 0) {
622                 struct mdt_body *mdt_body;
623                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
624                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
625
626                 if (de->d_inode)
627                         fid = *ll_inode2fid(de->d_inode);
628
629                 /* see if we got same inode, if not - return error */
630                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
631                         ll_finish_md_op_data(op_data);
632                         op_data = NULL;
633                         goto revalidate_finish;
634                 }
635                 ll_intent_release(it);
636         }
637         ll_finish_md_op_data(op_data);
638         GOTO(out, rc = 0);
639
640 out_sa:
641         /*
642          * For rc == 1 case, should not return directly to prevent losing
643          * statahead windows; for rc == 0 case, the "lookup" will be done later.
644          */
645         if (it && it->it_op == IT_GETATTR && rc == 1) {
646                 first = ll_statahead_enter(parent, &de, 0);
647                 if (first >= 0)
648                         ll_statahead_exit(parent, de, 1);
649                 else if (first == -EEXIST)
650                         ll_statahead_mark(parent, de);
651         }
652
653         return rc;
654 }
655
656 #if 0
657 static void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
658 {
659         struct inode *inode= de->d_inode;
660         struct ll_sb_info *sbi = ll_i2sbi(inode);
661         struct ll_dentry_data *ldd = ll_d2d(de);
662         struct obd_client_handle *handle;
663         struct obd_capa *oc;
664         int rc = 0;
665         ENTRY;
666         LASSERT(ldd);
667
668         cfs_lock_kernel();
669         /* Strictly speaking this introduces an additional race: the
670          * increments should wait until the rpc has returned.
671          * However, given that at present the function is void, this
672          * issue is moot. */
673         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
674                 cfs_unlock_kernel();
675                 EXIT;
676                 return;
677         }
678
679         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
680                 cfs_unlock_kernel();
681                 EXIT;
682                 return;
683         }
684         cfs_unlock_kernel();
685
686         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
687         oc = ll_mdscapa_get(inode);
688         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
689         capa_put(oc);
690         if (rc) {
691                 cfs_lock_kernel();
692                 memset(handle, 0, sizeof(*handle));
693                 if (flag == 0)
694                         ldd->lld_cwd_count--;
695                 else
696                         ldd->lld_mnt_count--;
697                 cfs_unlock_kernel();
698         }
699
700         EXIT;
701         return;
702 }
703
704 static void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
705 {
706         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
707         struct ll_dentry_data *ldd = ll_d2d(de);
708         struct obd_client_handle handle;
709         int count, rc = 0;
710         ENTRY;
711         LASSERT(ldd);
712
713         cfs_lock_kernel();
714         /* Strictly speaking this introduces an additional race: the
715          * increments should wait until the rpc has returned.
716          * However, given that at present the function is void, this
717          * issue is moot. */
718         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
719         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
720                 /* the "pin" failed */
721                 cfs_unlock_kernel();
722                 EXIT;
723                 return;
724         }
725
726         if (flag)
727                 count = --ldd->lld_mnt_count;
728         else
729                 count = --ldd->lld_cwd_count;
730         cfs_unlock_kernel();
731
732         if (count != 0) {
733                 EXIT;
734                 return;
735         }
736
737         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
738         EXIT;
739         return;
740 }
741 #endif
742
743 #ifdef HAVE_VFS_INTENT_PATCHES
744 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
745 {
746         int rc;
747         ENTRY;
748
749         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
750                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
751         else
752                 rc = ll_revalidate_it(dentry, 0, NULL);
753
754         RETURN(rc);
755 }
756 #else
757 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
758 {
759         int rc;
760         ENTRY;
761
762         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
763                 struct lookup_intent *it;
764                 it = ll_convert_intent(&nd->intent.open, nd->flags);
765                 if (IS_ERR(it))
766                         RETURN(0);
767                 if (it->it_op == (IT_OPEN|IT_CREAT))
768                         if (nd->intent.open.flags & O_EXCL) {
769                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
770                                 rc = 0;
771                                 goto out_it;
772                         }
773
774                 rc = ll_revalidate_it(dentry, nd->flags, it);
775
776                 if (rc && (nd->flags & LOOKUP_OPEN) &&
777                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
778 #ifdef HAVE_FILE_IN_STRUCT_INTENT
779 // XXX Code duplication with ll_lookup_nd
780                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
781                                 // We cannot call open here as it would
782                                 // deadlock.
783                                 ptlrpc_req_finished(
784                                                (struct ptlrpc_request *)
785                                                   it->d.lustre.it_data);
786                         } else {
787 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
788 /* 2.6.1[456] have a bug in open_namei() that forgets to check
789  * nd->intent.open.file for error, so we need to return it as lookup's result
790  * instead */
791                                 struct file *filp;
792
793                                 nd->intent.open.file->private_data = it;
794                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
795                                 if (IS_ERR(filp)) {
796                                         rc = PTR_ERR(filp);
797                                 }
798 #else
799                                 nd->intent.open.file->private_data = it;
800                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
801 #endif
802                         }
803 #else
804                         ll_release_openhandle(dentry, it);
805 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
806                 }
807                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
808                     it_disposition(it, DISP_OPEN_CREATE)) {
809                         /* We created something but we may only return
810                          * negative dentry here, so save request in dentry,
811                          * if lookup will be called later on, it will
812                          * pick the request, otherwise it would be freed
813                          * with dentry */
814                         ll_d2d(dentry)->lld_it = it;
815                         it = NULL; /* avoid freeing */
816                 }
817
818 out_it:
819                 if (it) {
820                         ll_intent_release(it);
821                         OBD_FREE(it, sizeof(*it));
822                 }
823         } else {
824                 rc = ll_revalidate_it(dentry, 0, NULL);
825         }
826
827         RETURN(rc);
828 }
829 #endif
830
831 void ll_d_iput(struct dentry *de, struct inode *inode)
832 {
833         LASSERT(inode);
834         if (!find_cbdata(inode))
835                 inode->i_nlink = 0;
836         iput(inode);
837 }
838
839 struct dentry_operations ll_d_ops = {
840         .d_revalidate = ll_revalidate_nd,
841         .d_release = ll_release,
842         .d_delete  = ll_ddelete,
843         .d_iput    = ll_d_iput,
844         .d_compare = ll_dcompare,
845 #if 0
846         .d_pin = ll_pin,
847         .d_unpin = ll_unpin,
848 #endif
849 };