Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/sched.h>
24 #include <linux/smp_lock.h>
25 #include <linux/quotaops.h>
26
27 #define DEBUG_SUBSYSTEM S_LLITE
28
29 #include <obd_support.h>
30 #include <lustre_lite.h>
31 #include <lustre/lustre_idl.h>
32 #include <lustre_dlm.h>
33 #include <linux/lustre_version.h>
34
35 #include "llite_internal.h"
36
37 /* should NOT be called with the dcache lock, see fs/dcache.c */
38 static void ll_release(struct dentry *de)
39 {
40         struct ll_dentry_data *lld;
41         ENTRY;
42         LASSERT(de != NULL);
43         lld = ll_d2d(de);
44         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
45                 EXIT;
46                 return;
47         }
48 #ifndef HAVE_VFS_INTENT_PATCHES
49         if (lld->lld_it) {
50                 ll_intent_release(lld->lld_it);
51                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
52         }
53 #endif
54         LASSERT(lld->lld_cwd_count == 0);
55         LASSERT(lld->lld_mnt_count == 0);
56         OBD_FREE(de->d_fsdata, sizeof(*lld));
57
58         EXIT;
59 }
60
61 #ifdef DCACHE_LUSTRE_INVALID
62 /* Compare if two dentries are the same.  Don't match if the existing dentry
63  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
64  *
65  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
66  * an AST before calling d_revalidate_it().  The dentry still exists (marked
67  * INVALID) so d_lookup() matches it, but we have no lock on it (so
68  * lock_match() fails) and we spin around real_lookup(). */
69 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
70 {
71         struct dentry *dchild;
72         ENTRY;
73
74         if (d_name->len != name->len)
75                 RETURN(1);
76
77         if (memcmp(d_name->name, name->name, name->len))
78                 RETURN(1);
79
80         /* XXX: d_name must be in-dentry structure */
81         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
82         if (dchild->d_flags & DCACHE_LUSTRE_INVALID) {
83                 CDEBUG(D_DENTRY,"INVALID dentry %p not matched, was bug 3784\n",
84                        dchild);
85                 RETURN(1);
86         }
87
88         RETURN(0);
89 }
90 #endif
91
92 /* should NOT be called with the dcache lock, see fs/dcache.c */
93 static int ll_ddelete(struct dentry *de)
94 {
95         ENTRY;
96         LASSERT(de);
97 #ifndef DCACHE_LUSTRE_INVALID
98 #define DCACHE_LUSTRE_INVALID 0
99 #endif
100
101         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
102                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
103                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
104                d_unhashed(de) ? "" : "hashed,",
105                list_empty(&de->d_subdirs) ? "" : "subdirs");
106 #if DCACHE_LUSTRE_INVALID == 0
107 #undef DCACHE_LUSTRE_INVALID
108 #endif
109
110         RETURN(0);
111 }
112
113 void ll_set_dd(struct dentry *de)
114 {
115         ENTRY;
116         LASSERT(de != NULL);
117
118         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
119                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
120                atomic_read(&de->d_count));
121         lock_kernel();
122         if (de->d_fsdata == NULL) {
123                 OBD_ALLOC(de->d_fsdata, sizeof(struct ll_dentry_data));
124         }
125         unlock_kernel();
126
127         EXIT;
128 }
129
130 void ll_intent_drop_lock(struct lookup_intent *it)
131 {
132         struct lustre_handle *handle;
133
134         if (it->it_op && it->d.lustre.it_lock_mode) {
135                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
136                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
137                        " from it %p\n", handle->cookie, it);
138                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
139
140                 /* bug 494: intent_release may be called multiple times, from
141                  * this thread and we don't want to double-decref this lock */
142                 it->d.lustre.it_lock_mode = 0;
143         }
144 }
145
146 void ll_intent_release(struct lookup_intent *it)
147 {
148         ENTRY;
149
150         ll_intent_drop_lock(it);
151 #ifdef HAVE_VFS_INTENT_PATCHES
152         it->it_magic = 0;
153         it->it_op_release = 0;
154 #endif
155         /* We are still holding extra reference on a request, need to free it */
156         if (it_disposition(it, DISP_ENQ_OPEN_REF)) /* open req for llfile_open*/
157                 ptlrpc_req_finished(it->d.lustre.it_data);
158         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
159                 ptlrpc_req_finished(it->d.lustre.it_data);
160         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
161                                                     * to lookup */
162                 ptlrpc_req_finished(it->d.lustre.it_data);
163
164         it->d.lustre.it_disposition = 0;
165         it->d.lustre.it_data = NULL;
166         EXIT;
167 }
168
169 /* Drop dentry if it is not used already, unhash otherwise.
170    Should be called with dcache lock held!
171    Returns: 1 if dentry was dropped, 0 if unhashed. */
172 int ll_drop_dentry(struct dentry *dentry)
173 {
174         lock_dentry(dentry);
175         if (atomic_read(&dentry->d_count) == 0) {
176                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
177                        "inode %p\n", dentry->d_name.len,
178                        dentry->d_name.name, dentry, dentry->d_parent,
179                        dentry->d_inode);
180                 dget_locked(dentry);
181                 __d_drop(dentry);
182                 unlock_dentry(dentry);
183                 spin_unlock(&dcache_lock);
184                 dput(dentry);
185                 spin_lock(&dcache_lock);
186                 return 1;
187         }
188         /* disconected dentry can not be find without lookup, because we 
189          * not need his to unhash or mark invalid. */
190         if (dentry->d_flags & DCACHE_DISCONNECTED) {
191                 unlock_dentry(dentry);
192                 RETURN (0);
193         }
194
195 #ifdef DCACHE_LUSTRE_INVALID
196         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
197 #else
198         if (!d_unhashed(dentry)) {
199 #endif
200                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
201                        "inode %p refc %d\n", dentry->d_name.len,
202                        dentry->d_name.name, dentry, dentry->d_parent,
203                        dentry->d_inode, atomic_read(&dentry->d_count));
204                 /* actually we don't unhash the dentry, rather just
205                  * mark it inaccessible for to __d_lookup(). otherwise
206                  * sys_getcwd() could return -ENOENT -bzzz */
207 #ifdef DCACHE_LUSTRE_INVALID
208                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
209 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
210                 __d_drop(dentry);
211                 if (dentry->d_inode) {
212                         /* Put positive dentries to orphan list */
213                         list_add(&dentry->d_hash,
214                                  &ll_i2sbi(dentry->d_inode)->ll_orphan_dentry_list);
215                 }
216 #endif
217 #else
218                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
219                         __d_drop(dentry);
220 #endif
221
222         }
223         unlock_dentry(dentry);
224         return 0;
225 }
226
227 void ll_unhash_aliases(struct inode *inode)
228 {
229         struct list_head *tmp, *head;
230         ENTRY;
231
232         if (inode == NULL) {
233                 CERROR("unexpected NULL inode, tell phil\n");
234                 return;
235         }
236
237         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
238                inode->i_ino, inode->i_generation, inode);
239
240         head = &inode->i_dentry;
241         spin_lock(&dcache_lock);
242 restart:
243         tmp = head;
244         while ((tmp = tmp->next) != head) {
245                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
246
247                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
248                         CERROR("called on root (?) dentry=%p, inode=%p "
249                                "ino=%lu\n", dentry, inode, inode->i_ino);
250                         lustre_dump_dentry(dentry, 1);
251                         libcfs_debug_dumpstack(NULL);
252                 } else if (d_mountpoint(dentry)) {
253                         /* For mountpoints we skip removal of the dentry
254                            which happens solely because we have a lock on it
255                            obtained when this dentry was not a mountpoint yet */
256                         CDEBUG(D_DENTRY, "Skippind mountpoint dentry removal "
257                                          "%.*s (%p) parent %p\n",
258                                           dentry->d_name.len,
259                                           dentry->d_name.name,
260                                           dentry, dentry->d_parent);
261
262                         continue;
263                 }
264
265                 if (ll_drop_dentry(dentry))
266                           goto restart;
267         }
268         spin_unlock(&dcache_lock);
269         EXIT;
270 }
271
272 int revalidate_it_finish(struct ptlrpc_request *request, int offset,
273                          struct lookup_intent *it, struct dentry *de)
274 {
275         int rc = 0;
276         ENTRY;
277
278         if (!request)
279                 RETURN(0);
280
281         if (it_disposition(it, DISP_LOOKUP_NEG))
282                 RETURN(-ENOENT);
283
284         rc = ll_prep_inode(ll_i2sbi(de->d_inode)->ll_osc_exp, &de->d_inode,
285                            request, offset, NULL);
286
287         RETURN(rc);
288 }
289
290 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
291 {
292         LASSERT(it != NULL);
293         LASSERT(dentry != NULL);
294
295         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
296                 struct inode *inode = dentry->d_inode;
297                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
298                        inode, inode->i_ino, inode->i_generation);
299                 mdc_set_lock_data(&it->d.lustre.it_lock_handle, inode);
300         }
301
302         /* drop lookup or getattr locks immediately */
303         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
304 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
305                 /* on 2.6 there are situation when several lookups and
306                  * revalidations may be requested during single operation.
307                  * therefore, we don't release intent here -bzzz */
308                 ll_intent_drop_lock(it);
309 #else
310                 ll_intent_release(it);
311 #endif
312         }
313 }
314
315 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
316 {
317         struct lookup_intent *it = *itp;
318 #if defined(HAVE_VFS_INTENT_PATCHES)&&(LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
319         if (it) {
320                 LASSERTF(it->it_magic == INTENT_MAGIC, "bad intent magic: %x\n",
321                          it->it_magic);
322         }
323 #endif
324
325         if (!it || it->it_op == IT_GETXATTR)
326                 it = *itp = deft;
327
328 #ifdef HAVE_VFS_INTENT_PATCHES
329         it->it_op_release = ll_intent_release;
330 #endif
331 }
332
333 int ll_revalidate_it(struct dentry *de, int lookup_flags,
334                      struct lookup_intent *it)
335 {
336         struct mdc_op_data op_data;
337         struct ptlrpc_request *req = NULL;
338         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
339         struct obd_export *exp;
340         int first = 0, rc;
341
342         ENTRY;
343         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
344                LL_IT2STR(it));
345
346         if (de->d_inode == NULL) {
347                 /* We can only use negative dentries if this is stat or lookup,
348                    for opens and stuff we do need to query server. */
349                 /* If there is IT_CREAT in intent op set, then we must throw
350                    away this negative dentry and actually do the request to
351                    kernel to create whatever needs to be created (if possible)*/
352                 if (it && (it->it_op & IT_CREAT))
353                         RETURN(0);
354
355 #ifdef DCACHE_LUSTRE_INVALID
356                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
357                         RETURN(0);
358 #endif
359
360                 rc = ll_have_md_lock(de->d_parent->d_inode, 
361                                      MDS_INODELOCK_UPDATE);
362         
363                 RETURN(rc);
364         }
365
366         exp = ll_i2mdcexp(de->d_inode);
367
368         /* Never execute intents for mount points.
369          * Attributes will be fixed up in ll_inode_revalidate_it */
370         if (d_mountpoint(de))
371                 RETURN(1);
372
373         /* Root of the lustre tree. Always valid.
374          * Attributes will be fixed up in ll_inode_revalidate_it */
375         if (de == de->d_sb->s_root)
376                 RETURN(1);
377
378         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
379         ll_frob_intent(&it, &lookup_it);
380         LASSERT(it);
381
382         ll_prepare_mdc_op_data(&op_data, de->d_parent->d_inode, de->d_inode,
383                                de->d_name.name, de->d_name.len, 0, NULL);
384
385         if ((it->it_op == IT_OPEN) && de->d_inode) {
386                 struct inode *inode = de->d_inode;
387                 struct ll_inode_info *lli = ll_i2info(inode);
388                 struct obd_client_handle **och_p;
389                 __u64 *och_usecount;
390                 /* We used to check for MDS_INODELOCK_OPEN here, but in fact
391                  * just having LOOKUP lock is enough to justify inode is the
392                  * same. And if inode is the same and we have suitable
393                  * openhandle, then there is no point in doing another OPEN RPC
394                  * just to throw away newly received openhandle.
395                  * There are no security implications too, if file owner or
396                  * access mode is change, LOOKUP lock is revoked */
397
398                 if (it->it_flags & FMODE_WRITE) {
399                         och_p = &lli->lli_mds_write_och;
400                         och_usecount = &lli->lli_open_fd_write_count;
401                 } else if (it->it_flags & FMODE_EXEC) {
402                         och_p = &lli->lli_mds_exec_och;
403                         och_usecount = &lli->lli_open_fd_exec_count;
404                  } else {
405                         och_p = &lli->lli_mds_read_och;
406                         och_usecount = &lli->lli_open_fd_read_count;
407                 }
408                 /* Check for the proper lock. */
409                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
410                         goto do_lock;
411                 down(&lli->lli_och_sem);
412                 if (*och_p) { /* Everything is open already, do nothing */
413                         /*(*och_usecount)++;  Do not let them steal our open
414                                               handle from under us */
415                         /* XXX The code above was my original idea, but in case
416                            we have the handle, but we cannot use it due to later
417                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
418                            would decrement counter increased here. So we just
419                            hope the lock won't be invalidated in between. But
420                            if it would be, we'll reopen the open request to
421                            MDS later during file open path */
422                         up(&lli->lli_och_sem);
423                         RETURN(1);
424                 } else {
425                         up(&lli->lli_och_sem);
426                 }
427         }
428
429         if (it->it_op == IT_GETATTR)
430                 first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
431
432 do_lock:
433         it->it_create_mode &= ~current->fs->umask;
434
435         rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags,
436                              &req, ll_mdc_blocking_ast, 0);
437         if (it->it_op == IT_GETATTR && !first)
438                 ll_statahead_exit(de, rc);
439         /* If req is NULL, then mdc_intent_lock only tried to do a lock match;
440          * if all was well, it will return 1 if it found locks, 0 otherwise. */
441         if (req == NULL && rc >= 0) {
442                 if (!rc)
443                         goto do_lookup;
444                 GOTO(out, rc);
445         }
446
447         if (rc < 0) {
448                 if (rc != -ESTALE) {
449                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
450                                "%d\n", rc, it->d.lustre.it_status);
451                 }
452                 GOTO(out, rc = 0);
453         }
454
455 revalidate_finish:
456         rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, de);
457         if (rc != 0) {
458                 ll_intent_release(it);
459                 GOTO(out, rc = 0);
460         }
461         if ((it->it_op & IT_OPEN) && de->d_inode && 
462             !S_ISREG(de->d_inode->i_mode) && 
463             !S_ISDIR(de->d_inode->i_mode)) {
464                 ll_release_openhandle(de, it);
465         }
466         rc = 1;
467
468         /* unfortunately ll_intent_lock may cause a callback and revoke our
469          * dentry */
470         spin_lock(&dcache_lock);
471         lock_dentry(de);
472         __d_drop(de);
473         unlock_dentry(de);
474         d_rehash_cond(de, 0);
475         spin_unlock(&dcache_lock);
476
477  out:
478         /* We do not free request as it may be reused during following lookup
479          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
480          * be freed in ll_lookup_it or in ll_intent_release. But if
481          * request was not completed, we need to free it. (bug 5154, 9903) */
482         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
483                 ptlrpc_req_finished(req);
484         if (rc == 0) {
485 #ifdef DCACHE_LUSTRE_INVALID
486                 ll_unhash_aliases(de->d_inode);
487                 /* done in ll_unhash_aliases()
488                 dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
489 #else
490                 /* We do not want d_invalidate to kill all child dentries too */
491                 d_drop(de);
492 #endif
493         } else {
494                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
495                                "inode %p refc %d\n", de->d_name.len,
496                                de->d_name.name, de, de->d_parent, de->d_inode,
497                                atomic_read(&de->d_count));
498                 ll_lookup_finish_locks(it, de);
499 #ifdef DCACHE_LUSTRE_INVALID
500                 lock_dentry(de);
501                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
502                 unlock_dentry(de);
503 #endif
504         }
505         RETURN(rc);
506 /* This part is here to combat evil-evil race in real_lookup on 2.6 kernels.
507  * The race details are: We enter do_lookup() looking for some name,
508  * there is nothing in dcache for this name yet and d_lookup() returns NULL.
509  * We proceed to real_lookup(), and while we do this, another process does
510  * open on the same file we looking up (most simple reproducer), open succeeds
511  * and the dentry is added. Now back to us. In real_lookup() we do d_lookup()
512  * again and suddenly find the dentry, so we call d_revalidate on it, but there
513  * is no lock, so without this code we would return 0, but unpatched
514  * real_lookup just returns -ENOENT in such a case instead of retrying the
515  * lookup. Once this is dealt with in real_lookup(), all of this ugly mess
516  * can go and we can just check locks in ->d_revalidate without doing any
517  * RPCs ever. */
518 do_lookup:
519         if (it != &lookup_it) {
520                 ll_lookup_finish_locks(it, de);
521                 it = &lookup_it;
522         }
523         /*do real lookup here */
524         ll_prepare_mdc_op_data(&op_data, de->d_parent->d_inode, NULL,
525                                de->d_name.name, de->d_name.len, 0, NULL);
526         rc = mdc_intent_lock(exp, &op_data, NULL, 0,  it, 0, &req,
527                              ll_mdc_blocking_ast, 0);
528         if (rc >= 0) {
529                 struct mds_body *mds_body = lustre_msg_buf(req->rq_repmsg,
530                                                            DLM_REPLY_REC_OFF,
531                                                            sizeof(*mds_body));
532                 struct ll_fid fid = { 0 };
533
534                 if (de->d_inode)
535                          ll_inode2fid(&fid, de->d_inode);
536
537                 /* see if we got same inode, if not - return error */
538                 if(!memcmp(&fid, &mds_body->fid1, sizeof(struct ll_fid)))
539                         goto revalidate_finish;
540                 ll_intent_release(it);
541         }
542         GOTO(out, rc = 0);
543 }
544
545 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
546 {
547         struct inode *inode= de->d_inode;
548         struct ll_sb_info *sbi = ll_i2sbi(inode);
549         struct ll_dentry_data *ldd = ll_d2d(de);
550         struct obd_client_handle *handle;
551         int rc = 0;
552         ENTRY;
553         LASSERT(ldd);
554
555         lock_kernel();
556         /* Strictly speaking this introduces an additional race: the
557          * increments should wait until the rpc has returned.
558          * However, given that at present the function is void, this
559          * issue is moot. */
560         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
561                 unlock_kernel();
562                 EXIT;
563                 return;
564         }
565
566         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
567                 unlock_kernel();
568                 EXIT;
569                 return;
570         }
571         unlock_kernel();
572
573         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
574         rc = obd_pin(sbi->ll_mdc_exp, inode->i_ino, inode->i_generation,
575                      inode->i_mode & S_IFMT, handle, flag);
576
577         if (rc) {
578                 lock_kernel();
579                 memset(handle, 0, sizeof(*handle));
580                 if (flag == 0)
581                         ldd->lld_cwd_count--;
582                 else
583                         ldd->lld_mnt_count--;
584                 unlock_kernel();
585         }
586
587         EXIT;
588         return;
589 }
590
591 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
592 {
593         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
594         struct ll_dentry_data *ldd = ll_d2d(de);
595         struct obd_client_handle handle;
596         int count, rc = 0;
597         ENTRY;
598         LASSERT(ldd);
599
600         lock_kernel();
601         /* Strictly speaking this introduces an additional race: the
602          * increments should wait until the rpc has returned.
603          * However, given that at present the function is void, this
604          * issue is moot. */
605         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
606         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
607                 /* the "pin" failed */
608                 unlock_kernel();
609                 EXIT;
610                 return;
611         }
612
613         if (flag)
614                 count = --ldd->lld_mnt_count;
615         else
616                 count = --ldd->lld_cwd_count;
617         unlock_kernel();
618
619         if (count != 0) {
620                 EXIT;
621                 return;
622         }
623
624         rc = obd_unpin(sbi->ll_mdc_exp, &handle, flag);
625         EXIT;
626         return;
627 }
628
629 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
630 #ifdef HAVE_VFS_INTENT_PATCHES
631 static int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
632 {
633         int rc;
634         ENTRY;
635
636         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
637                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
638         else
639                 rc = ll_revalidate_it(dentry, 0, NULL);
640
641         RETURN(rc);
642 }
643 #else
644 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
645 {
646         int rc;
647         ENTRY;
648
649         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
650                 struct lookup_intent *it;
651                 it = ll_convert_intent(&nd->intent.open, nd->flags);
652                 if (IS_ERR(it))
653                         RETURN(0);
654                 if (it->it_op == (IT_OPEN|IT_CREAT))
655                         if (nd->intent.open.flags & O_EXCL) {
656                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
657                                 rc = 0;
658                                 goto out_it;
659                         }
660
661                 rc = ll_revalidate_it(dentry, nd->flags, it);
662
663                 if (rc && (nd->flags & LOOKUP_OPEN) &&
664                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
665 #ifdef HAVE_FILE_IN_STRUCT_INTENT
666 // XXX Code duplication with ll_lookup_nd
667                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
668                                 // We cannot call open here as it would
669                                 // deadlock.
670                                 ptlrpc_req_finished(
671                                                (struct ptlrpc_request *)
672                                                   it->d.lustre.it_data);
673                         } else {
674                                 struct file *filp;
675
676                                 nd->intent.open.file->private_data = it;
677                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
678 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
679 /* 2.6.1[456] have a bug in open_namei() that forgets to check
680  * nd->intent.open.file for error, so we need to return it as lookup's result
681  * instead */
682                                 if (IS_ERR(filp))
683                                         rc = 0;
684 #endif
685                         }
686 #else
687                         ll_release_openhandle(dentry, it);
688 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
689                 }
690                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
691                     it_disposition(it, DISP_OPEN_CREATE)) {
692                         /* We created something but we may only return
693                          * negative dentry here, so save request in dentry,
694                          * if lookup will be called later on, it will
695                          * pick the request, otherwise it would be freed
696                          * with dentry */
697                         ll_d2d(dentry)->lld_it = it;
698                         it = NULL; /* avoid freeing */
699                 }
700                         
701 out_it:
702                 if (it) {
703                         ll_intent_release(it);
704                         OBD_FREE(it, sizeof(*it));
705                 }
706         } else {
707                 rc = ll_revalidate_it(dentry, 0, NULL);
708         }
709
710         RETURN(rc);
711 }
712 #endif
713 #endif
714
715 struct dentry_operations ll_d_ops = {
716 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
717         .d_revalidate = ll_revalidate_nd,
718 #else
719         .d_revalidate_it = ll_revalidate_it,
720 #endif
721         .d_release = ll_release,
722         .d_delete = ll_ddelete,
723 #ifdef DCACHE_LUSTRE_INVALID
724         .d_compare = ll_dcompare,
725 #endif
726 #if 0
727         .d_pin = ll_pin,
728         .d_unpin = ll_unpin,
729 #endif
730 };