Whamcloud - gitweb
b=19151
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/smp_lock.h>
40 #include <linux/quotaops.h>
41
42 #define DEBUG_SUBSYSTEM S_LLITE
43
44 #include <obd_support.h>
45 #include <lustre_lite.h>
46 #include <lustre/lustre_idl.h>
47 #include <lustre_dlm.h>
48 #include <lustre_mdc.h>
49 //#include <lustre_ver.h>
50 //#include <lustre_version.h>
51
52 #include "llite_internal.h"
53
54 spinlock_t ll_lookup_lock = SPIN_LOCK_UNLOCKED;
55
56 /* should NOT be called with the dcache lock, see fs/dcache.c */
57 static void ll_release(struct dentry *de)
58 {
59         struct ll_dentry_data *lld;
60         ENTRY;
61         LASSERT(de != NULL);
62         lld = ll_d2d(de);
63         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
64                 EXIT;
65                 return;
66         }
67 #ifndef HAVE_VFS_INTENT_PATCHES
68         if (lld->lld_it) {
69                 ll_intent_release(lld->lld_it);
70                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
71         }
72 #endif
73         LASSERT(lld->lld_cwd_count == 0);
74         LASSERT(lld->lld_mnt_count == 0);
75         OBD_FREE(de->d_fsdata, sizeof(*lld));
76
77         EXIT;
78 }
79
80 #ifdef DCACHE_LUSTRE_INVALID
81 /* Compare if two dentries are the same.  Don't match if the existing dentry
82  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
83  *
84  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
85  * an AST before calling d_revalidate_it().  The dentry still exists (marked
86  * INVALID) so d_lookup() matches it, but we have no lock on it (so
87  * lock_match() fails) and we spin around real_lookup(). */
88 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
89 {
90         struct dentry *dchild;
91         ENTRY;
92
93         if (d_name->len != name->len)
94                 RETURN(1);
95
96         if (memcmp(d_name->name, name->name, name->len))
97                 RETURN(1);
98
99         /* XXX: d_name must be in-dentry structure */
100         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
101         if (dchild->d_flags & DCACHE_LUSTRE_INVALID) {
102                 CDEBUG(D_DENTRY,"INVALID dentry %p not matched, was bug 3784\n",
103                        dchild);
104                 RETURN(1);
105         }
106
107         RETURN(0);
108 }
109 #endif
110
111 /* should NOT be called with the dcache lock, see fs/dcache.c */
112 static int ll_ddelete(struct dentry *de)
113 {
114         ENTRY;
115         LASSERT(de);
116 #ifndef DCACHE_LUSTRE_INVALID
117 #define DCACHE_LUSTRE_INVALID 0
118 #endif
119
120         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
121                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
122                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
123                d_unhashed(de) ? "" : "hashed,",
124                list_empty(&de->d_subdirs) ? "" : "subdirs");
125 #if DCACHE_LUSTRE_INVALID == 0
126 #undef DCACHE_LUSTRE_INVALID
127 #endif
128
129         RETURN(0);
130 }
131
132 void ll_set_dd(struct dentry *de)
133 {
134         ENTRY;
135         LASSERT(de != NULL);
136
137         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
138                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
139                atomic_read(&de->d_count));
140
141         if (de->d_fsdata == NULL) {
142                 struct ll_dentry_data *lld;
143
144                 OBD_ALLOC_PTR(lld);
145                 if (likely(lld != NULL)) {
146                         lock_dentry(de);
147                         if (likely(de->d_fsdata == NULL))
148                                 de->d_fsdata = lld;
149                         else
150                                 OBD_FREE_PTR(lld);
151                         unlock_dentry(de);
152                 }
153         }
154
155         EXIT;
156 }
157
158 void ll_intent_drop_lock(struct lookup_intent *it)
159 {
160         struct lustre_handle *handle;
161
162         if (it->it_op && it->d.lustre.it_lock_mode) {
163                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
164                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
165                        " from it %p\n", handle->cookie, it);
166                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
167
168                 /* bug 494: intent_release may be called multiple times, from
169                  * this thread and we don't want to double-decref this lock */
170                 it->d.lustre.it_lock_mode = 0;
171         }
172 }
173
174 void ll_intent_release(struct lookup_intent *it)
175 {
176         ENTRY;
177
178         CDEBUG(D_INFO, "intent %p released\n", it);
179         ll_intent_drop_lock(it);
180 #ifdef HAVE_VFS_INTENT_PATCHES
181         it->it_magic = 0;
182         it->it_op_release = 0;
183 #endif
184         /* We are still holding extra reference on a request, need to free it */
185         if (it_disposition(it, DISP_ENQ_OPEN_REF))
186                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
187         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
188                 ptlrpc_req_finished(it->d.lustre.it_data);
189         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
190                                                     * to lookup */
191                 ptlrpc_req_finished(it->d.lustre.it_data);
192
193         it->d.lustre.it_disposition = 0;
194         it->d.lustre.it_data = NULL;
195         EXIT;
196 }
197
198 /* Drop dentry if it is not used already, unhash otherwise.
199    Should be called with dcache lock held!
200    Returns: 1 if dentry was dropped, 0 if unhashed. */
201 int ll_drop_dentry(struct dentry *dentry)
202 {
203         lock_dentry(dentry);
204         if (atomic_read(&dentry->d_count) == 0) {
205                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
206                        "inode %p\n", dentry->d_name.len,
207                        dentry->d_name.name, dentry, dentry->d_parent,
208                        dentry->d_inode);
209                 dget_locked(dentry);
210                 __d_drop(dentry);
211                 unlock_dentry(dentry);
212                 spin_unlock(&dcache_lock);
213                 spin_unlock(&ll_lookup_lock);
214                 dput(dentry);
215                 spin_lock(&ll_lookup_lock);
216                 spin_lock(&dcache_lock);
217                 return 1;
218         }
219         /* disconected dentry can not be find without lookup, because we
220          * not need his to unhash or mark invalid. */
221         if (dentry->d_flags & DCACHE_DISCONNECTED) {
222                 unlock_dentry(dentry);
223                 RETURN (0);
224         }
225
226 #ifdef DCACHE_LUSTRE_INVALID
227         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
228 #else
229         if (!d_unhashed(dentry)) {
230 #endif
231                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
232                        "inode %p refc %d\n", dentry->d_name.len,
233                        dentry->d_name.name, dentry, dentry->d_parent,
234                        dentry->d_inode, atomic_read(&dentry->d_count));
235                 /* actually we don't unhash the dentry, rather just
236                  * mark it inaccessible for to __d_lookup(). otherwise
237                  * sys_getcwd() could return -ENOENT -bzzz */
238 #ifdef DCACHE_LUSTRE_INVALID
239                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
240 #endif
241                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
242                         __d_drop(dentry);
243
244         }
245         unlock_dentry(dentry);
246         return 0;
247 }
248
249 void ll_unhash_aliases(struct inode *inode)
250 {
251         struct list_head *tmp, *head;
252         ENTRY;
253
254         if (inode == NULL) {
255                 CERROR("unexpected NULL inode, tell phil\n");
256                 return;
257         }
258
259         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
260                inode->i_ino, inode->i_generation, inode);
261
262         head = &inode->i_dentry;
263         spin_lock(&ll_lookup_lock);
264         spin_lock(&dcache_lock);
265 restart:
266         tmp = head;
267         while ((tmp = tmp->next) != head) {
268                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
269
270                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
271                        "inode %p flags %d\n", dentry->d_name.len,
272                        dentry->d_name.name, dentry, dentry->d_parent,
273                        dentry->d_inode, dentry->d_flags);
274
275                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
276                         CERROR("called on root (?) dentry=%p, inode=%p "
277                                "ino=%lu\n", dentry, inode, inode->i_ino);
278                         lustre_dump_dentry(dentry, 1);
279                         libcfs_debug_dumpstack(NULL);
280                 } else if (d_mountpoint(dentry)) {
281                         /* For mountpoints we skip removal of the dentry
282                            which happens solely because we have a lock on it
283                            obtained when this dentry was not a mountpoint yet */
284                         CDEBUG(D_DENTRY, "Skippind mountpoint dentry removal "
285                                          "%.*s (%p) parent %p\n",
286                                           dentry->d_name.len,
287                                           dentry->d_name.name,
288                                           dentry, dentry->d_parent);
289
290                         continue;
291                 }
292
293                 if (ll_drop_dentry(dentry))
294                           goto restart;
295         }
296         spin_unlock(&dcache_lock);
297         spin_unlock(&ll_lookup_lock);
298
299         EXIT;
300 }
301
302 int ll_revalidate_it_finish(struct ptlrpc_request *request,
303                             struct lookup_intent *it,
304                             struct dentry *de)
305 {
306         int rc = 0;
307         ENTRY;
308
309         if (!request)
310                 RETURN(0);
311
312         if (it_disposition(it, DISP_LOOKUP_NEG))
313                 RETURN(-ENOENT);
314
315         rc = ll_prep_inode(&de->d_inode, request, NULL);
316
317         RETURN(rc);
318 }
319
320 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
321 {
322         LASSERT(it != NULL);
323         LASSERT(dentry != NULL);
324
325         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
326                 struct inode *inode = dentry->d_inode;
327                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
328
329                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
330                        inode, inode->i_ino, inode->i_generation);
331                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
332                                  inode);
333         }
334
335         /* drop lookup or getattr locks immediately */
336         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
337                 /* on 2.6 there are situation when several lookups and
338                  * revalidations may be requested during single operation.
339                  * therefore, we don't release intent here -bzzz */
340                 ll_intent_drop_lock(it);
341         }
342 }
343
344 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
345 {
346         struct lookup_intent *it = *itp;
347 #ifdef HAVE_VFS_INTENT_PATCHES
348         if (it) {
349                 LASSERTF(it->it_magic == INTENT_MAGIC,
350                          "%p has bad intent magic: %x\n",
351                          it, it->it_magic);
352         }
353 #endif
354
355         if (!it || it->it_op == IT_GETXATTR)
356                 it = *itp = deft;
357
358 #ifdef HAVE_VFS_INTENT_PATCHES
359         it->it_op_release = ll_intent_release;
360 #endif
361 }
362
363 int ll_revalidate_it(struct dentry *de, int lookup_flags,
364                      struct lookup_intent *it)
365 {
366         struct md_op_data *op_data;
367         struct ptlrpc_request *req = NULL;
368         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
369         struct obd_export *exp;
370         struct inode *parent;
371         int rc, first = 0;
372
373         ENTRY;
374         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
375                LL_IT2STR(it));
376
377         if (de->d_inode == NULL) {
378                 /* We can only use negative dentries if this is stat or lookup,
379                    for opens and stuff we do need to query server. */
380                 /* If there is IT_CREAT in intent op set, then we must throw
381                    away this negative dentry and actually do the request to
382                    kernel to create whatever needs to be created (if possible)*/
383                 if (it && (it->it_op & IT_CREAT))
384                         RETURN(0);
385
386 #ifdef DCACHE_LUSTRE_INVALID
387                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
388                         RETURN(0);
389 #endif
390
391                 rc = ll_have_md_lock(de->d_parent->d_inode,
392                                      MDS_INODELOCK_UPDATE);
393                 GOTO(out_sa, rc);
394         }
395
396         /* Never execute intents for mount points.
397          * Attributes will be fixed up in ll_inode_revalidate_it */
398         if (d_mountpoint(de))
399                 GOTO(out_sa, rc = 1);
400
401         /* need to get attributes in case root got changed from other client */
402         if (de == de->d_sb->s_root) {
403                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
404                 if (rc == 0)
405                         rc = 1;
406                 GOTO(out_sa, rc);
407         }
408
409         exp = ll_i2mdexp(de->d_inode);
410
411         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
412         ll_frob_intent(&it, &lookup_it);
413         LASSERT(it);
414         parent = de->d_parent->d_inode;
415
416         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
417                                      de->d_name.name, de->d_name.len,
418                                      0, LUSTRE_OPC_ANY, NULL);
419         if (IS_ERR(op_data))
420                 RETURN(PTR_ERR(op_data));
421
422         if ((it->it_op == IT_OPEN) && de->d_inode) {
423                 struct inode *inode = de->d_inode;
424                 struct ll_inode_info *lli = ll_i2info(inode);
425                 struct obd_client_handle **och_p;
426                 __u64 *och_usecount;
427
428                 /*
429                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
430                  * just having LOOKUP lock is enough to justify inode is the
431                  * same. And if inode is the same and we have suitable
432                  * openhandle, then there is no point in doing another OPEN RPC
433                  * just to throw away newly received openhandle.  There are no
434                  * security implications too, if file owner or access mode is
435                  * change, LOOKUP lock is revoked.
436                  */
437
438
439                 if (it->it_flags & FMODE_WRITE) {
440                         och_p = &lli->lli_mds_write_och;
441                         och_usecount = &lli->lli_open_fd_write_count;
442                 } else if (it->it_flags & FMODE_EXEC) {
443                         och_p = &lli->lli_mds_exec_och;
444                         och_usecount = &lli->lli_open_fd_exec_count;
445                 } else {
446                         och_p = &lli->lli_mds_read_och;
447                         och_usecount = &lli->lli_open_fd_read_count;
448                 }
449                 /* Check for the proper lock. */
450                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
451                         goto do_lock;
452                 down(&lli->lli_och_sem);
453                 if (*och_p) { /* Everything is open already, do nothing */
454                         /*(*och_usecount)++;  Do not let them steal our open
455                           handle from under us */
456                         /* XXX The code above was my original idea, but in case
457                            we have the handle, but we cannot use it due to later
458                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
459                            would decrement counter increased here. So we just
460                            hope the lock won't be invalidated in between. But
461                            if it would be, we'll reopen the open request to
462                            MDS later during file open path */
463                         up(&lli->lli_och_sem);
464                         ll_finish_md_op_data(op_data);
465                         RETURN(1);
466                 } else {
467                         up(&lli->lli_och_sem);
468                 }
469         }
470
471         if (it->it_op == IT_GETATTR)
472                 first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
473
474 do_lock:
475         it->it_create_mode &= ~current->fs->umask;
476         it->it_create_mode |= M_CHECK_STALE;
477         rc = md_intent_lock(exp, op_data, NULL, 0, it,
478                             lookup_flags,
479                             &req, ll_md_blocking_ast, 0);
480         it->it_create_mode &= ~M_CHECK_STALE;
481         ll_finish_md_op_data(op_data);
482         if (it->it_op == IT_GETATTR && !first)
483                 /* If there are too many locks on client-side, then some
484                  * locks taken by statahead maybe dropped automatically
485                  * before the real "revalidate" using them. */
486                 ll_statahead_exit(de, req == NULL ? rc : 0);
487         else if (first == -EEXIST)
488                 ll_statahead_mark(de);
489
490         /* If req is NULL, then md_intent_lock only tried to do a lock match;
491          * if all was well, it will return 1 if it found locks, 0 otherwise. */
492         if (req == NULL && rc >= 0) {
493                 if (!rc)
494                         goto do_lookup;
495                 GOTO(out, rc);
496         }
497
498         if (rc < 0) {
499                 if (rc != -ESTALE) {
500                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
501                                "%d\n", rc, it->d.lustre.it_status);
502                 } else {
503 #ifndef HAVE_VFS_INTENT_PATCHES
504                         if (it_disposition(it, DISP_OPEN_OPEN) &&
505                             !it_open_error(DISP_OPEN_OPEN, it))
506                                 /* server have valid open - close file first*/
507                                 ll_release_openhandle(de, it);
508 #endif
509                 }
510                 GOTO(out, rc = 0);
511         }
512
513 revalidate_finish:
514         rc = ll_revalidate_it_finish(req, it, de);
515         if (rc != 0) {
516                 if (rc != -ESTALE && rc != -ENOENT)
517                         ll_intent_release(it);
518                 GOTO(out, rc = 0);
519         }
520
521         if ((it->it_op & IT_OPEN) && de->d_inode &&
522             !S_ISREG(de->d_inode->i_mode) &&
523             !S_ISDIR(de->d_inode->i_mode)) {
524                 ll_release_openhandle(de, it);
525         }
526         rc = 1;
527
528         /* unfortunately ll_intent_lock may cause a callback and revoke our
529          * dentry */
530         spin_lock(&ll_lookup_lock);
531         spin_lock(&dcache_lock);
532         lock_dentry(de);
533         __d_drop(de);
534         unlock_dentry(de);
535         d_rehash_cond(de, 0);
536         spin_unlock(&dcache_lock);
537         spin_unlock(&ll_lookup_lock);
538
539 out:
540         /* We do not free request as it may be reused during following lookup
541          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
542          * be freed in ll_lookup_it or in ll_intent_release. But if
543          * request was not completed, we need to free it. (bug 5154, 9903) */
544         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
545                 ptlrpc_req_finished(req);
546         if (rc == 0) {
547 #ifdef DCACHE_LUSTRE_INVALID
548                 ll_unhash_aliases(de->d_inode);
549                 /* done in ll_unhash_aliases()
550                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
551 #else
552                 /* We do not want d_invalidate to kill all child dentries too */
553                 d_drop(de);
554 #endif
555         } else {
556                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
557                        "inode %p refc %d\n", de->d_name.len,
558                        de->d_name.name, de, de->d_parent, de->d_inode,
559                        atomic_read(&de->d_count));
560                 ll_lookup_finish_locks(it, de);
561 #ifdef DCACHE_LUSTRE_INVALID
562                 lock_dentry(de);
563                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
564                 unlock_dentry(de);
565 #endif
566         }
567         RETURN(rc);
568
569         /*
570          * This part is here to combat evil-evil race in real_lookup on 2.6
571          * kernels.  The race details are: We enter do_lookup() looking for some
572          * name, there is nothing in dcache for this name yet and d_lookup()
573          * returns NULL.  We proceed to real_lookup(), and while we do this,
574          * another process does open on the same file we looking up (most simple
575          * reproducer), open succeeds and the dentry is added. Now back to
576          * us. In real_lookup() we do d_lookup() again and suddenly find the
577          * dentry, so we call d_revalidate on it, but there is no lock, so
578          * without this code we would return 0, but unpatched real_lookup just
579          * returns -ENOENT in such a case instead of retrying the lookup. Once
580          * this is dealt with in real_lookup(), all of this ugly mess can go and
581          * we can just check locks in ->d_revalidate without doing any RPCs
582          * ever.
583          */
584 do_lookup:
585         if (it != &lookup_it) {
586                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
587                 if (it->it_op == IT_GETATTR)
588                         lookup_it.it_op = IT_GETATTR;
589                 ll_lookup_finish_locks(it, de);
590                 it = &lookup_it;
591         }
592
593         /* Do real lookup here. */
594         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
595                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
596                                                          LUSTRE_OPC_CREATE :
597                                                          LUSTRE_OPC_ANY), NULL);
598         if (IS_ERR(op_data))
599                 RETURN(PTR_ERR(op_data));
600
601         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
602                             ll_md_blocking_ast, 0);
603         if (rc >= 0) {
604                 struct mdt_body *mdt_body;
605                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
606                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
607
608                 if (de->d_inode)
609                         fid = *ll_inode2fid(de->d_inode);
610
611                 /* see if we got same inode, if not - return error */
612                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
613                         ll_finish_md_op_data(op_data);
614                         op_data = NULL;
615                         goto revalidate_finish;
616                 }
617                 ll_intent_release(it);
618         }
619         ll_finish_md_op_data(op_data);
620         GOTO(out, rc = 0);
621
622 out_sa:
623         /*
624          * For rc == 1 case, should not return directly to prevent losing
625          * statahead windows; for rc == 0 case, the "lookup" will be done later.
626          */
627         if (it && it->it_op == IT_GETATTR && rc == 1) {
628                 first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
629                 if (!first)
630                         ll_statahead_exit(de, 1);
631                 else if (first == -EEXIST)
632                         ll_statahead_mark(de);
633         }
634
635         return rc;
636 }
637
638 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
639 {
640         struct inode *inode= de->d_inode;
641         struct ll_sb_info *sbi = ll_i2sbi(inode);
642         struct ll_dentry_data *ldd = ll_d2d(de);
643         struct obd_client_handle *handle;
644         struct obd_capa *oc;
645         int rc = 0;
646         ENTRY;
647         LASSERT(ldd);
648
649         lock_kernel();
650         /* Strictly speaking this introduces an additional race: the
651          * increments should wait until the rpc has returned.
652          * However, given that at present the function is void, this
653          * issue is moot. */
654         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
655                 unlock_kernel();
656                 EXIT;
657                 return;
658         }
659
660         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
661                 unlock_kernel();
662                 EXIT;
663                 return;
664         }
665         unlock_kernel();
666
667         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
668         oc = ll_mdscapa_get(inode);
669         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
670         capa_put(oc);
671         if (rc) {
672                 lock_kernel();
673                 memset(handle, 0, sizeof(*handle));
674                 if (flag == 0)
675                         ldd->lld_cwd_count--;
676                 else
677                         ldd->lld_mnt_count--;
678                 unlock_kernel();
679         }
680
681         EXIT;
682         return;
683 }
684
685 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
686 {
687         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
688         struct ll_dentry_data *ldd = ll_d2d(de);
689         struct obd_client_handle handle;
690         int count, rc = 0;
691         ENTRY;
692         LASSERT(ldd);
693
694         lock_kernel();
695         /* Strictly speaking this introduces an additional race: the
696          * increments should wait until the rpc has returned.
697          * However, given that at present the function is void, this
698          * issue is moot. */
699         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
700         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
701                 /* the "pin" failed */
702                 unlock_kernel();
703                 EXIT;
704                 return;
705         }
706
707         if (flag)
708                 count = --ldd->lld_mnt_count;
709         else
710                 count = --ldd->lld_cwd_count;
711         unlock_kernel();
712
713         if (count != 0) {
714                 EXIT;
715                 return;
716         }
717
718         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
719         EXIT;
720         return;
721 }
722
723 #ifdef HAVE_VFS_INTENT_PATCHES
724 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
725 {
726         int rc;
727         ENTRY;
728
729         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
730                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
731         else
732                 rc = ll_revalidate_it(dentry, 0, NULL);
733
734         RETURN(rc);
735 }
736 #else
737 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
738 {
739         int rc;
740         ENTRY;
741
742         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
743                 struct lookup_intent *it;
744                 it = ll_convert_intent(&nd->intent.open, nd->flags);
745                 if (IS_ERR(it))
746                         RETURN(0);
747                 if (it->it_op == (IT_OPEN|IT_CREAT))
748                         if (nd->intent.open.flags & O_EXCL) {
749                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
750                                 rc = 0;
751                                 goto out_it;
752                         }
753
754                 rc = ll_revalidate_it(dentry, nd->flags, it);
755
756                 if (rc && (nd->flags & LOOKUP_OPEN) &&
757                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
758 #ifdef HAVE_FILE_IN_STRUCT_INTENT
759 // XXX Code duplication with ll_lookup_nd
760                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
761                                 // We cannot call open here as it would
762                                 // deadlock.
763                                 ptlrpc_req_finished(
764                                                (struct ptlrpc_request *)
765                                                   it->d.lustre.it_data);
766                         } else {
767 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
768 /* 2.6.1[456] have a bug in open_namei() that forgets to check
769  * nd->intent.open.file for error, so we need to return it as lookup's result
770  * instead */
771                                 struct file *filp;
772
773                                 nd->intent.open.file->private_data = it;
774                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
775                                 if (IS_ERR(filp)) {
776                                         rc = PTR_ERR(filp);
777                                 }
778 #else
779                                 nd->intent.open.file->private_data = it;
780                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
781 #endif
782                         }
783 #else
784                         ll_release_openhandle(dentry, it);
785 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
786                 }
787                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
788                     it_disposition(it, DISP_OPEN_CREATE)) {
789                         /* We created something but we may only return
790                          * negative dentry here, so save request in dentry,
791                          * if lookup will be called later on, it will
792                          * pick the request, otherwise it would be freed
793                          * with dentry */
794                         ll_d2d(dentry)->lld_it = it;
795                         it = NULL; /* avoid freeing */
796                 }
797
798 out_it:
799                 if (it) {
800                         ll_intent_release(it);
801                         OBD_FREE(it, sizeof(*it));
802                 }
803         } else {
804                 rc = ll_revalidate_it(dentry, 0, NULL);
805         }
806
807         RETURN(rc);
808 }
809 #endif
810
811 struct dentry_operations ll_d_ops = {
812         .d_revalidate = ll_revalidate_nd,
813         .d_release = ll_release,
814         .d_delete = ll_ddelete,
815 #ifdef DCACHE_LUSTRE_INVALID
816         .d_compare = ll_dcompare,
817 #endif
818 #if 0
819         .d_pin = ll_pin,
820         .d_unpin = ll_unpin,
821 #endif
822 };