Whamcloud - gitweb
b=22766 cascading_rw: take lmm_stripe_count returned by ioctl(LL_IOC_LOV_SETSTRIPE)
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/smp_lock.h>
40 #include <linux/quotaops.h>
41
42 #define DEBUG_SUBSYSTEM S_LLITE
43
44 #include <obd_support.h>
45 #include <lustre_lite.h>
46 #include <lustre/lustre_idl.h>
47 #include <lustre_dlm.h>
48 #include <linux/lustre_version.h>
49
50 #include "llite_internal.h"
51
52 spinlock_t ll_lookup_lock = SPIN_LOCK_UNLOCKED;
53
54 /* should NOT be called with the dcache lock, see fs/dcache.c */
55 static void ll_release(struct dentry *de)
56 {
57         struct ll_dentry_data *lld;
58         ENTRY;
59         LASSERT(de != NULL);
60         lld = ll_d2d(de);
61         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
62                 EXIT;
63                 return;
64         }
65 #ifndef HAVE_VFS_INTENT_PATCHES
66         if (lld->lld_it) {
67                 ll_intent_release(lld->lld_it);
68                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
69         }
70 #endif
71         LASSERT(lld->lld_cwd_count == 0);
72         LASSERT(lld->lld_mnt_count == 0);
73         OBD_FREE(de->d_fsdata, sizeof(*lld));
74
75         EXIT;
76 }
77
78 /* Compare if two dentries are the same.  Don't match if the existing dentry
79  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
80  *
81  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
82  * an AST before calling d_revalidate_it().  The dentry still exists (marked
83  * INVALID) so d_lookup() matches it, but we have no lock on it (so
84  * lock_match() fails) and we spin around real_lookup(). */
85 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
86 {
87         struct dentry *dchild;
88         ENTRY;
89
90         /* XXX: d_name must be in-dentry structure */
91         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
92
93         if (d_name->len != name->len)
94                 RETURN(1);
95
96         if (memcmp(d_name->name, name->name, name->len))
97                 RETURN(1);
98
99         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
100                name->len, name->name, dchild,
101                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
102                atomic_read(&dchild->d_count));
103
104          /* mountpoint is always valid */
105         if (d_mountpoint(dchild))
106                 RETURN(0);
107
108         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
109                 RETURN(1);
110
111         RETURN(0);
112 }
113
114 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
115 {
116         if (lock->l_flags & LDLM_FL_CANCELING)
117                 return LDLM_ITER_CONTINUE;
118         return LDLM_ITER_STOP;
119 }
120
121 /* find any ldlm lock of the inode in mdc and lov
122  * return 0    not find
123  *        1    find one
124  *      < 0    error */
125 int find_cbdata(struct inode *inode)
126 {
127         struct ll_fid fid;
128         struct ll_inode_info *lli = ll_i2info(inode);
129         struct ll_sb_info *sbi = ll_i2sbi(inode);
130         int rc = 0;
131         ENTRY;
132
133         LASSERT(inode);
134         ll_inode2fid(&fid, inode);
135         rc = mdc_find_cbdata(sbi->ll_mdc_exp, &fid, return_if_equal, NULL);
136         if (rc != 0)
137                 RETURN(rc);
138
139         if (lli->lli_smd)
140                 rc = obd_find_cbdata(sbi->ll_osc_exp, lli->lli_smd,
141                                      return_if_equal, NULL);
142
143         RETURN(rc);
144 }
145
146 /* should NOT be called with the dcache lock, see fs/dcache.c */
147 static int ll_ddelete(struct dentry *de)
148 {
149         ENTRY;
150         LASSERT(de);
151
152         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
153                (de->d_flags & DCACHE_LUSTRE_INVALID ? "hiden" : "keeping"),
154                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
155                d_unhashed(de) ? "" : "hashed,",
156                list_empty(&de->d_subdirs) ? "" : "subdirs");
157
158         /* if not ldlm lock for this inode, set i_nlink to 0 so that
159          * this inode can be recycled later b=20433 */
160         LASSERT(atomic_read(&de->d_count) == 0);
161         if (de->d_inode && !find_cbdata(de->d_inode))
162                 de->d_inode->i_nlink = 0;
163
164         if (de->d_flags & DCACHE_LUSTRE_INVALID)
165                 RETURN(1);
166
167         RETURN(0);
168 }
169
170 void ll_set_dd(struct dentry *de)
171 {
172         ENTRY;
173         LASSERT(de != NULL);
174
175         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
176                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
177                atomic_read(&de->d_count));
178
179         if (de->d_fsdata == NULL) {
180                 struct ll_dentry_data *lld;
181
182                 OBD_ALLOC_PTR(lld);
183                 if (likely(lld != NULL)) {
184                         lock_dentry(de);
185                         if (likely(de->d_fsdata == NULL))
186                                 de->d_fsdata = lld;
187                         else
188                                 OBD_FREE_PTR(lld);
189                         unlock_dentry(de);
190                 }
191         }
192
193         EXIT;
194 }
195
196 void ll_intent_drop_lock(struct lookup_intent *it)
197 {
198         struct lustre_handle *handle;
199
200         if (it->it_op && it->d.lustre.it_lock_mode) {
201                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
202                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
203                        " from it %p\n", handle->cookie, it);
204                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
205
206                 /* bug 494: intent_release may be called multiple times, from
207                  * this thread and we don't want to double-decref this lock */
208                 it->d.lustre.it_lock_mode = 0;
209         }
210 }
211
212 void ll_intent_release(struct lookup_intent *it)
213 {
214         ENTRY;
215
216         ll_intent_drop_lock(it);
217 #ifdef HAVE_VFS_INTENT_PATCHES
218         it->it_magic = 0;
219         it->it_op_release = 0;
220 #endif
221         /* We are still holding extra reference on a request, need to free it */
222         if (it_disposition(it, DISP_ENQ_OPEN_REF)) /* open req for llfile_open*/
223                 ptlrpc_req_finished(it->d.lustre.it_data);
224         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
225                 ptlrpc_req_finished(it->d.lustre.it_data);
226         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
227                                                     * to lookup */
228                 ptlrpc_req_finished(it->d.lustre.it_data);
229
230         it->d.lustre.it_disposition = 0;
231         it->d.lustre.it_data = NULL;
232         EXIT;
233 }
234
235 /* Drop dentry if it is not used already, unhash otherwise.
236    Should be called with dcache lock held!
237    Returns: 1 if dentry was dropped, 0 if unhashed. */
238 int ll_drop_dentry(struct dentry *dentry)
239 {
240         lock_dentry(dentry);
241         if (atomic_read(&dentry->d_count) == 0) {
242                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
243                        "inode %p\n", dentry->d_name.len,
244                        dentry->d_name.name, dentry, dentry->d_parent,
245                        dentry->d_inode);
246                 dget_locked(dentry);
247                 __d_drop(dentry);
248                 unlock_dentry(dentry);
249                 spin_unlock(&dcache_lock);
250                 spin_unlock(&ll_lookup_lock);
251                 dput(dentry);
252                 spin_lock(&ll_lookup_lock);
253                 spin_lock(&dcache_lock);
254                 return 1;
255         }
256         /* disconected dentry can not be find without lookup, because we
257          * not need his to unhash or mark invalid. */
258         if (dentry->d_flags & DCACHE_DISCONNECTED) {
259                 unlock_dentry(dentry);
260                 RETURN (0);
261         }
262
263 #ifdef DCACHE_LUSTRE_INVALID
264         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
265 #else
266         if (!d_unhashed(dentry)) {
267 #endif
268                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
269                        "inode %p refc %d\n", dentry->d_name.len,
270                        dentry->d_name.name, dentry, dentry->d_parent,
271                        dentry->d_inode, atomic_read(&dentry->d_count));
272                 /* actually we don't unhash the dentry, rather just
273                  * mark it inaccessible for to __d_lookup(). otherwise
274                  * sys_getcwd() could return -ENOENT -bzzz */
275 #ifdef DCACHE_LUSTRE_INVALID
276                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
277 #else
278                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
279                         __d_drop(dentry);
280 #endif
281
282         }
283         unlock_dentry(dentry);
284         return 0;
285 }
286
287 void ll_unhash_aliases(struct inode *inode)
288 {
289         struct list_head *tmp, *head;
290         ENTRY;
291
292         if (inode == NULL) {
293                 CERROR("unexpected NULL inode, tell phil\n");
294                 return;
295         }
296
297         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
298                inode->i_ino, inode->i_generation, inode);
299
300         head = &inode->i_dentry;
301         spin_lock(&ll_lookup_lock);
302         spin_lock(&dcache_lock);
303 restart:
304         tmp = head;
305         while ((tmp = tmp->next) != head) {
306                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
307
308                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
309                         CERROR("called on root (?) dentry=%p, inode=%p "
310                                "ino=%lu\n", dentry, inode, inode->i_ino);
311                         lustre_dump_dentry(dentry, 1);
312                         libcfs_debug_dumpstack(NULL);
313                 }
314
315                 if (ll_drop_dentry(dentry))
316                           goto restart;
317         }
318         spin_unlock(&dcache_lock);
319         spin_unlock(&ll_lookup_lock);
320
321         EXIT;
322 }
323
324 int revalidate_it_finish(struct ptlrpc_request *request, int offset,
325                          struct lookup_intent *it, struct dentry *de)
326 {
327         int rc = 0;
328         ENTRY;
329
330         if (!request)
331                 RETURN(0);
332
333         if (it_disposition(it, DISP_LOOKUP_NEG))
334                 RETURN(-ENOENT);
335
336         rc = ll_prep_inode(ll_i2sbi(de->d_inode)->ll_osc_exp, &de->d_inode,
337                            request, offset, NULL);
338
339         RETURN(rc);
340 }
341
342 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
343 {
344         LASSERT(it != NULL);
345         LASSERT(dentry != NULL);
346
347         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
348                 struct inode *inode = dentry->d_inode;
349                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
350                        inode, inode->i_ino, inode->i_generation);
351                 mdc_set_lock_data(&it->d.lustre.it_lock_handle, inode, NULL);
352         }
353
354         /* drop lookup or getattr locks immediately */
355         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
356                 /* on 2.6 there are situation when several lookups and
357                  * revalidations may be requested during single operation.
358                  * therefore, we don't release intent here -bzzz */
359                 ll_intent_drop_lock(it);
360         }
361 }
362
363 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
364 {
365         struct lookup_intent *it = *itp;
366 #ifdef HAVE_VFS_INTENT_PATCHES
367         if (it) {
368                 LASSERTF(it->it_magic == INTENT_MAGIC, "bad intent magic: %x\n",
369                          it->it_magic);
370         }
371 #endif
372
373         if (!it || it->it_op == IT_GETXATTR)
374                 it = *itp = deft;
375
376 #ifdef HAVE_VFS_INTENT_PATCHES
377         it->it_op_release = ll_intent_release;
378 #endif
379 }
380
381 int ll_revalidate_it(struct dentry *de, int lookup_flags,
382                      struct lookup_intent *it)
383 {
384         struct mdc_op_data op_data = { { 0 } };
385         struct ptlrpc_request *req = NULL;
386         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
387         struct obd_export *exp;
388         struct inode *parent = de->d_parent->d_inode;
389         int first = -1, rc;
390
391         ENTRY;
392         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
393                LL_IT2STR(it));
394
395         if (de->d_inode == NULL) {
396                 /* We can only use negative dentries if this is stat or lookup,
397                    for opens and stuff we do need to query server. */
398                 /* If there is IT_CREAT in intent op set, then we must throw
399                    away this negative dentry and actually do the request to
400                    kernel to create whatever needs to be created (if possible)*/
401                 if (it && (it->it_op & IT_CREAT))
402                         RETURN(0);
403
404 #ifdef DCACHE_LUSTRE_INVALID
405                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
406                         RETURN(0);
407 #endif
408
409                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE, LCK_MINMODE);
410                 GOTO(out_sa, rc);
411         }
412
413         exp = ll_i2mdcexp(de->d_inode);
414
415         /* Never execute intents for mount points.
416          * Attributes will be fixed up in ll_inode_revalidate_it */
417         if (d_mountpoint(de))
418                 GOTO(out_sa, rc = 1);
419
420         /*need to get attributes in case it got changed from other client*/
421         if (de == de->d_sb->s_root)  {
422                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
423                 if (rc == 0)
424                         rc = 1;
425                 GOTO(out_sa, rc);
426         }
427
428         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
429         ll_frob_intent(&it, &lookup_it);
430         LASSERT(it);
431
432         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
433                 GOTO(out_sa, rc = 1);
434
435         ll_prepare_mdc_op_data(&op_data, parent, de->d_inode,
436                                de->d_name.name, de->d_name.len, 0, NULL);
437
438         if ((it->it_op == IT_OPEN) && de->d_inode) {
439                 struct inode *inode = de->d_inode;
440                 struct ll_inode_info *lli = ll_i2info(inode);
441                 struct obd_client_handle **och_p;
442                 __u64 *och_usecount;
443                 /* We used to check for MDS_INODELOCK_OPEN here, but in fact
444                  * just having LOOKUP lock is enough to justify inode is the
445                  * same. And if inode is the same and we have suitable
446                  * openhandle, then there is no point in doing another OPEN RPC
447                  * just to throw away newly received openhandle.
448                  * There are no security implications too, if file owner or
449                  * access mode is change, LOOKUP lock is revoked */
450
451                 if (it->it_flags & FMODE_WRITE) {
452                         och_p = &lli->lli_mds_write_och;
453                         och_usecount = &lli->lli_open_fd_write_count;
454                 } else if (it->it_flags & FMODE_EXEC) {
455                         och_p = &lli->lli_mds_exec_och;
456                         och_usecount = &lli->lli_open_fd_exec_count;
457                 } else {
458                         och_p = &lli->lli_mds_read_och;
459                         och_usecount = &lli->lli_open_fd_read_count;
460                 }
461                 /* Check for the proper lock. */
462                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP, LCK_MINMODE))
463                         goto do_lock;
464                 down(&lli->lli_och_sem);
465                 if (*och_p) { /* Everything is open already, do nothing */
466                         /*(*och_usecount)++;  Do not let them steal our open
467                                               handle from under us */
468                         /* XXX The code above was my original idea, but in case
469                            we have the handle, but we cannot use it due to later
470                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
471                            would decrement counter increased here. So we just
472                            hope the lock won't be invalidated in between. But
473                            if it would be, we'll reopen the open request to
474                            MDS later during file open path */
475                         up(&lli->lli_och_sem);
476                         RETURN(1);
477                 } else {
478                         up(&lli->lli_och_sem);
479                 }
480         }
481
482         if (it->it_op == IT_GETATTR)
483                 first = ll_statahead_enter(parent, &de, 0);
484
485 do_lock:
486         it->it_create_mode &= ~current->fs->umask;
487         it->it_create_mode |= M_CHECK_STALE;
488         rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags,
489                              &req, ll_mdc_blocking_ast, 0);
490         it->it_create_mode &= ~M_CHECK_STALE;
491         if (it->it_op == IT_GETATTR && !first)
492                 /* If there are too many locks on client-side, then some
493                  * locks taken by statahead maybe dropped automatically
494                  * before the real "revalidate" using them. */
495                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
496         else if (first == -EEXIST)
497                 ll_statahead_mark(parent, de);
498
499         /* If req is NULL, then mdc_intent_lock only tried to do a lock match;
500          * if all was well, it will return 1 if it found locks, 0 otherwise. */
501         if (req == NULL && rc >= 0) {
502                 if (!rc)
503                         goto do_lookup;
504                 GOTO(out, rc);
505         }
506
507         if (rc < 0) {
508                 if (rc != -ESTALE) {
509                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
510                                "%d\n", rc, it->d.lustre.it_status);
511                 }
512                 GOTO(out, rc = 0);
513         }
514
515 revalidate_finish:
516         rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, de);
517         if (rc != 0) {
518                 /* we are going release the intent, so clear DISP_ENQ_COMPLETE
519                  * to prevent a double free of the request */
520                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
521                 ll_intent_release(it);
522                 GOTO(out, rc = 0);
523         }
524         if ((it->it_op & IT_OPEN) && de->d_inode &&
525             !S_ISREG(de->d_inode->i_mode) &&
526             !S_ISDIR(de->d_inode->i_mode)) {
527                 ll_release_openhandle(de, it);
528         }
529         rc = 1;
530
531         /* unfortunately ll_intent_lock may cause a callback and revoke our
532          * dentry */
533         spin_lock(&ll_lookup_lock);
534         spin_lock(&dcache_lock);
535         lock_dentry(de);
536         __d_drop(de);
537         unlock_dentry(de);
538         d_rehash_cond(de, 0);
539         spin_unlock(&dcache_lock);
540         spin_unlock(&ll_lookup_lock);
541
542  out:
543         /* We do not free request as it may be reused during following lookup
544          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
545          * be freed in ll_lookup_it or in ll_intent_release. But if
546          * request was not completed, we need to free it. (bug 5154, 9903) */
547         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
548                 ptlrpc_req_finished(req);
549         if (rc == 0) {
550 #ifdef DCACHE_LUSTRE_INVALID
551                 ll_unhash_aliases(de->d_inode);
552                 /* done in ll_unhash_aliases()
553                 dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
554 #else
555                 /* We do not want d_invalidate to kill all child dentries too */
556                 d_drop(de);
557 #endif
558         } else {
559                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
560                                "inode %p refc %d\n", de->d_name.len,
561                                de->d_name.name, de, de->d_parent, de->d_inode,
562                                atomic_read(&de->d_count));
563                 ll_lookup_finish_locks(it, de);
564 #ifdef DCACHE_LUSTRE_INVALID
565                 lock_dentry(de);
566                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
567                 unlock_dentry(de);
568 #endif
569         }
570         RETURN(rc);
571 /* This part is here to combat evil-evil race in real_lookup on 2.6 kernels.
572  * The race details are: We enter do_lookup() looking for some name,
573  * there is nothing in dcache for this name yet and d_lookup() returns NULL.
574  * We proceed to real_lookup(), and while we do this, another process does
575  * open on the same file we looking up (most simple reproducer), open succeeds
576  * and the dentry is added. Now back to us. In real_lookup() we do d_lookup()
577  * again and suddenly find the dentry, so we call d_revalidate on it, but there
578  * is no lock, so without this code we would return 0, but unpatched
579  * real_lookup just returns -ENOENT in such a case instead of retrying the
580  * lookup. Once this is dealt with in real_lookup(), all of this ugly mess
581  * can go and we can just check locks in ->d_revalidate without doing any
582  * RPCs ever. */
583 do_lookup:
584         if (it != &lookup_it) {
585                 ll_lookup_finish_locks(it, de);
586                 it = &lookup_it;
587         }
588         /*do real lookup here */
589         ll_prepare_mdc_op_data(&op_data, parent, NULL,
590                                de->d_name.name, de->d_name.len, 0, NULL);
591         rc = mdc_intent_lock(exp, &op_data, NULL, 0,  it, 0, &req,
592                              ll_mdc_blocking_ast, 0);
593         if (rc >= 0) {
594                 struct mds_body *mds_body = lustre_msg_buf(req->rq_repmsg,
595                                                            DLM_REPLY_REC_OFF,
596                                                            sizeof(*mds_body));
597                 struct ll_fid fid = { 0 };
598
599                 if (de->d_inode)
600                          ll_inode2fid(&fid, de->d_inode);
601
602                 /* see if we got same inode, if not - return error */
603                 if(!memcmp(&fid, &mds_body->fid1, sizeof(struct ll_fid)))
604                         goto revalidate_finish;
605                 /* we are going release the intent, so clear DISP_ENQ_COMPLETE
606                  * to prevent a double free of the request */
607                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
608                 ll_intent_release(it);
609         }
610         GOTO(out, rc = 0);
611
612 out_sa:
613         /*
614          * For rc == 1 case, should not return directly to prevent losing
615          * statahead windows; for rc == 0 case, the "lookup" will be done later.
616          */
617         if (it && it->it_op == IT_GETATTR && rc == 1) {
618                 first = ll_statahead_enter(parent, &de, 0);
619                 if (!first)
620                         ll_statahead_exit(parent, de, 1);
621                 else if (first == -EEXIST)
622                         ll_statahead_mark(parent, de);
623         }
624
625         return rc;
626 }
627
628 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
629 {
630         struct inode *inode= de->d_inode;
631         struct ll_sb_info *sbi = ll_i2sbi(inode);
632         struct ll_dentry_data *ldd = ll_d2d(de);
633         struct obd_client_handle *handle;
634         int rc = 0;
635         ENTRY;
636         LASSERT(ldd);
637
638         lock_kernel();
639         /* Strictly speaking this introduces an additional race: the
640          * increments should wait until the rpc has returned.
641          * However, given that at present the function is void, this
642          * issue is moot. */
643         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
644                 unlock_kernel();
645                 EXIT;
646                 return;
647         }
648
649         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
650                 unlock_kernel();
651                 EXIT;
652                 return;
653         }
654         unlock_kernel();
655
656         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
657         rc = obd_pin(sbi->ll_mdc_exp, ll_inode_ll_fid(inode),
658                      handle, flag);
659
660         if (rc) {
661                 lock_kernel();
662                 memset(handle, 0, sizeof(*handle));
663                 if (flag == 0)
664                         ldd->lld_cwd_count--;
665                 else
666                         ldd->lld_mnt_count--;
667                 unlock_kernel();
668         }
669
670         EXIT;
671         return;
672 }
673
674 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
675 {
676         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
677         struct ll_dentry_data *ldd = ll_d2d(de);
678         struct obd_client_handle handle;
679         int count, rc = 0;
680         ENTRY;
681         LASSERT(ldd);
682
683         lock_kernel();
684         /* Strictly speaking this introduces an additional race: the
685          * increments should wait until the rpc has returned.
686          * However, given that at present the function is void, this
687          * issue is moot. */
688         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
689         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
690                 /* the "pin" failed */
691                 unlock_kernel();
692                 EXIT;
693                 return;
694         }
695
696         if (flag)
697                 count = --ldd->lld_mnt_count;
698         else
699                 count = --ldd->lld_cwd_count;
700         unlock_kernel();
701
702         if (count != 0) {
703                 EXIT;
704                 return;
705         }
706
707         rc = obd_unpin(sbi->ll_mdc_exp, &handle, flag);
708         EXIT;
709         return;
710 }
711
712 #ifdef HAVE_VFS_INTENT_PATCHES
713 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
714 {
715         int rc;
716         ENTRY;
717
718         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
719                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
720         else
721                 rc = ll_revalidate_it(dentry, 0, NULL);
722
723         RETURN(rc);
724 }
725 #else
726 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
727 {
728         int rc;
729         ENTRY;
730
731         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
732                 struct lookup_intent *it;
733                 it = ll_convert_intent(&nd->intent.open, nd->flags);
734                 if (IS_ERR(it))
735                         RETURN(0);
736                 if (it->it_op == (IT_OPEN|IT_CREAT))
737                         if (nd->intent.open.flags & O_EXCL) {
738                                 CDEBUG(D_VFSTRACE,
739                                        "create O_EXCL, returning 0\n");
740                                 rc = 0;
741                                 goto out_it;
742                         }
743
744                 rc = ll_revalidate_it(dentry, nd->flags, it);
745
746                 if (rc && (nd->flags & LOOKUP_OPEN) &&
747                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
748 #ifdef HAVE_FILE_IN_STRUCT_INTENT
749 // XXX Code duplication with ll_lookup_nd
750                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
751                                 // We cannot call open here as it would
752                                 // deadlock.
753                                 ptlrpc_req_finished(
754                                                (struct ptlrpc_request *)
755                                                   it->d.lustre.it_data);
756                         } else {
757 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
758 /* 2.6.1[456] have a bug in open_namei() that forgets to check
759  * nd->intent.open.file for error, so we need to return it as lookup's result
760  * instead */
761                                 struct file *filp;
762
763                                 nd->intent.open.file->private_data = it;
764                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
765                                 if (IS_ERR(filp)) {
766                                         rc = PTR_ERR(filp);
767                                 }
768 #else
769                                 nd->intent.open.file->private_data = it;
770                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
771 #endif
772                         }
773 #else
774                         ll_release_openhandle(dentry, it);
775 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
776                 }
777                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
778                     it_disposition(it, DISP_OPEN_CREATE)) {
779                         /* We created something but we may only return
780                          * negative dentry here, so save request in dentry,
781                          * if lookup will be called later on, it will
782                          * pick the request, otherwise it would be freed
783                          * with dentry */
784                         ll_d2d(dentry)->lld_it = it;
785                         it = NULL; /* avoid freeing */
786                 }
787
788 out_it:
789                 if (it) {
790                         ll_intent_release(it);
791                         OBD_FREE(it, sizeof(*it));
792                 }
793         } else {
794                 rc = ll_revalidate_it(dentry, 0, NULL);
795         }
796
797         RETURN(rc);
798 }
799 #endif
800
801 void ll_d_iput(struct dentry *de, struct inode *inode)
802 {
803         LASSERT(inode);
804         if (inode && !find_cbdata(inode))
805                 inode->i_nlink = 0;
806         iput(inode);
807 }
808
809 struct dentry_operations ll_d_ops = {
810         .d_revalidate = ll_revalidate_nd,
811         .d_release = ll_release,
812         .d_delete = ll_ddelete,
813         .d_iput   = ll_d_iput,
814 #ifdef DCACHE_LUSTRE_INVALID
815         .d_compare = ll_dcompare,
816 #endif
817 #if 0
818         .d_pin = ll_pin,
819         .d_unpin = ll_unpin,
820 #endif
821 };