Whamcloud - gitweb
b=17167 libcfs: ensure all libcfs exported symbols to have cfs_ prefix
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/smp_lock.h>
40 #include <linux/quotaops.h>
41
42 #define DEBUG_SUBSYSTEM S_LLITE
43
44 #include <obd_support.h>
45 #include <lustre_lite.h>
46 #include <lustre/lustre_idl.h>
47 #include <lustre_dlm.h>
48 #include <lustre_mdc.h>
49 //#include <lustre_ver.h>
50 //#include <lustre_version.h>
51
52 #include "llite_internal.h"
53
54
55 /* should NOT be called with the dcache lock, see fs/dcache.c */
56 static void ll_release(struct dentry *de)
57 {
58         struct ll_dentry_data *lld;
59         ENTRY;
60         LASSERT(de != NULL);
61         lld = ll_d2d(de);
62         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
63                 EXIT;
64                 return;
65         }
66 #ifndef HAVE_VFS_INTENT_PATCHES
67         if (lld->lld_it) {
68                 ll_intent_release(lld->lld_it);
69                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
70         }
71 #endif
72         LASSERT(lld->lld_cwd_count == 0);
73         LASSERT(lld->lld_mnt_count == 0);
74         OBD_FREE(de->d_fsdata, sizeof(*lld));
75
76         EXIT;
77 }
78
79 /* Compare if two dentries are the same.  Don't match if the existing dentry
80  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
81  *
82  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
83  * an AST before calling d_revalidate_it().  The dentry still exists (marked
84  * INVALID) so d_lookup() matches it, but we have no lock on it (so
85  * lock_match() fails) and we spin around real_lookup(). */
86 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
87 {
88         struct dentry *dchild;
89         ENTRY;
90
91         if (d_name->len != name->len)
92                 RETURN(1);
93
94         if (memcmp(d_name->name, name->name, name->len))
95                 RETURN(1);
96
97         /* XXX: d_name must be in-dentry structure */
98         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
99
100         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
101                name->len, name->name, dchild,
102                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
103                atomic_read(&dchild->d_count));
104
105          /* mountpoint is always valid */
106         if (d_mountpoint(dchild))
107                 RETURN(0);
108
109         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
110                 RETURN(1);
111
112
113         RETURN(0);
114 }
115
116 /* should NOT be called with the dcache lock, see fs/dcache.c */
117 static int ll_ddelete(struct dentry *de)
118 {
119         ENTRY;
120         LASSERT(de);
121
122         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
123                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
124                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
125                d_unhashed(de) ? "" : "hashed,",
126                list_empty(&de->d_subdirs) ? "" : "subdirs");
127
128         RETURN(0);
129 }
130
131 void ll_set_dd(struct dentry *de)
132 {
133         ENTRY;
134         LASSERT(de != NULL);
135
136         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
137                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
138                atomic_read(&de->d_count));
139
140         if (de->d_fsdata == NULL) {
141                 struct ll_dentry_data *lld;
142
143                 OBD_ALLOC_PTR(lld);
144                 if (likely(lld != NULL)) {
145                         lock_dentry(de);
146                         if (likely(de->d_fsdata == NULL))
147                                 de->d_fsdata = lld;
148                         else
149                                 OBD_FREE_PTR(lld);
150                         unlock_dentry(de);
151                 }
152         }
153
154         EXIT;
155 }
156
157 void ll_intent_drop_lock(struct lookup_intent *it)
158 {
159         struct lustre_handle *handle;
160
161         if (it->it_op && it->d.lustre.it_lock_mode) {
162                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
163                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
164                        " from it %p\n", handle->cookie, it);
165                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
166
167                 /* bug 494: intent_release may be called multiple times, from
168                  * this thread and we don't want to double-decref this lock */
169                 it->d.lustre.it_lock_mode = 0;
170         }
171 }
172
173 void ll_intent_release(struct lookup_intent *it)
174 {
175         ENTRY;
176
177         CDEBUG(D_INFO, "intent %p released\n", it);
178         ll_intent_drop_lock(it);
179 #ifdef HAVE_VFS_INTENT_PATCHES
180         it->it_magic = 0;
181         it->it_op_release = 0;
182 #endif
183         /* We are still holding extra reference on a request, need to free it */
184         if (it_disposition(it, DISP_ENQ_OPEN_REF))
185                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
186         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
187                 ptlrpc_req_finished(it->d.lustre.it_data);
188         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
189                                                     * to lookup */
190                 ptlrpc_req_finished(it->d.lustre.it_data);
191
192         it->d.lustre.it_disposition = 0;
193         it->d.lustre.it_data = NULL;
194         EXIT;
195 }
196
197 /* Drop dentry if it is not used already, unhash otherwise.
198    Should be called with dcache lock held!
199    Returns: 1 if dentry was dropped, 0 if unhashed. */
200 int ll_drop_dentry(struct dentry *dentry)
201 {
202         lock_dentry(dentry);
203         if (atomic_read(&dentry->d_count) == 0) {
204                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
205                        "inode %p\n", dentry->d_name.len,
206                        dentry->d_name.name, dentry, dentry->d_parent,
207                        dentry->d_inode);
208                 dget_locked(dentry);
209                 __d_drop(dentry);
210                 unlock_dentry(dentry);
211                 spin_unlock(&dcache_lock);
212                 dput(dentry);
213                 spin_lock(&dcache_lock);
214                 return 1;
215         }
216         /* disconected dentry can not be find without lookup, because we
217          * not need his to unhash or mark invalid. */
218         if (dentry->d_flags & DCACHE_DISCONNECTED) {
219                 unlock_dentry(dentry);
220                 RETURN (0);
221         }
222
223         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
224                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
225                        "inode %p refc %d\n", dentry->d_name.len,
226                        dentry->d_name.name, dentry, dentry->d_parent,
227                        dentry->d_inode, atomic_read(&dentry->d_count));
228                 /* actually we don't unhash the dentry, rather just
229                  * mark it inaccessible for to __d_lookup(). otherwise
230                  * sys_getcwd() could return -ENOENT -bzzz */
231                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
232                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
233                         __d_drop(dentry);
234         }
235         unlock_dentry(dentry);
236         return 0;
237 }
238
239 void ll_unhash_aliases(struct inode *inode)
240 {
241         struct list_head *tmp, *head;
242         ENTRY;
243
244         if (inode == NULL) {
245                 CERROR("unexpected NULL inode, tell phil\n");
246                 return;
247         }
248
249         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
250                inode->i_ino, inode->i_generation, inode);
251
252         head = &inode->i_dentry;
253         spin_lock(&dcache_lock);
254 restart:
255         tmp = head;
256         while ((tmp = tmp->next) != head) {
257                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
258
259                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
260                        "inode %p flags %d\n", dentry->d_name.len,
261                        dentry->d_name.name, dentry, dentry->d_parent,
262                        dentry->d_inode, dentry->d_flags);
263
264                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
265                         CERROR("called on root (?) dentry=%p, inode=%p "
266                                "ino=%lu\n", dentry, inode, inode->i_ino);
267                         lustre_dump_dentry(dentry, 1);
268                         libcfs_debug_dumpstack(NULL);
269                 }
270
271                 if (ll_drop_dentry(dentry))
272                           goto restart;
273         }
274         spin_unlock(&dcache_lock);
275
276         EXIT;
277 }
278
279 int ll_revalidate_it_finish(struct ptlrpc_request *request,
280                             struct lookup_intent *it,
281                             struct dentry *de)
282 {
283         int rc = 0;
284         ENTRY;
285
286         if (!request)
287                 RETURN(0);
288
289         if (it_disposition(it, DISP_LOOKUP_NEG))
290                 RETURN(-ENOENT);
291
292         rc = ll_prep_inode(&de->d_inode, request, NULL);
293
294         RETURN(rc);
295 }
296
297 void ll_finish_locks(struct lookup_intent *it, struct dentry *dentry)
298 {
299         LASSERT(it != NULL);
300         LASSERT(dentry != NULL);
301
302         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
303                 struct inode *inode = dentry->d_inode;
304                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
305
306                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
307                        inode, inode->i_ino, inode->i_generation);
308                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
309                                  inode, NULL);
310         }
311
312         /* drop lookup or getattr locks immediately */
313         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
314                 /* on 2.6 there are situation when several lookups and
315                  * revalidations may be requested during single operation.
316                  * therefore, we don't release intent here -bzzz */
317                 ll_intent_drop_lock(it);
318         }
319 }
320
321 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
322 {
323         struct lookup_intent *it = *itp;
324 #ifdef HAVE_VFS_INTENT_PATCHES
325         if (it) {
326                 LASSERTF(it->it_magic == INTENT_MAGIC,
327                          "%p has bad intent magic: %x\n",
328                          it, it->it_magic);
329         }
330 #endif
331
332         if (!it || it->it_op == IT_GETXATTR)
333                 it = *itp = deft;
334
335 #ifdef HAVE_VFS_INTENT_PATCHES
336         it->it_op_release = ll_intent_release;
337 #endif
338 }
339
340 int ll_revalidate_it(struct dentry *de, int lookup_flags,
341                      struct lookup_intent *it)
342 {
343         struct md_op_data *op_data;
344         struct ptlrpc_request *req = NULL;
345         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
346         struct obd_export *exp;
347         struct inode *parent = de->d_parent->d_inode;
348         int rc, first = 0;
349
350         ENTRY;
351         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
352                LL_IT2STR(it));
353
354         if (de->d_inode == NULL) {
355                 /* We can only use negative dentries if this is stat or lookup,
356                    for opens and stuff we do need to query server. */
357                 /* If there is IT_CREAT in intent op set, then we must throw
358                    away this negative dentry and actually do the request to
359                    kernel to create whatever needs to be created (if possible)*/
360                 if (it && (it->it_op & IT_CREAT))
361                         RETURN(0);
362
363                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
364                         RETURN(0);
365
366                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE);
367                 GOTO(out_sa, rc);
368         }
369
370         /* Never execute intents for mount points.
371          * Attributes will be fixed up in ll_inode_revalidate_it */
372         if (d_mountpoint(de))
373                 GOTO(out_sa, rc = 1);
374
375         /* need to get attributes in case root got changed from other client */
376         if (de == de->d_sb->s_root) {
377                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
378                 if (rc == 0)
379                         rc = 1;
380                 GOTO(out_sa, rc);
381         }
382
383         exp = ll_i2mdexp(de->d_inode);
384
385         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
386         ll_frob_intent(&it, &lookup_it);
387         LASSERT(it);
388
389         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
390                 GOTO(out_sa, rc = 1);
391
392         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
393                                      de->d_name.name, de->d_name.len,
394                                      0, LUSTRE_OPC_ANY, NULL);
395         if (IS_ERR(op_data))
396                 RETURN(PTR_ERR(op_data));
397
398         if ((it->it_op == IT_OPEN) && de->d_inode) {
399                 struct inode *inode = de->d_inode;
400                 struct ll_inode_info *lli = ll_i2info(inode);
401                 struct obd_client_handle **och_p;
402                 __u64 *och_usecount;
403
404                 /*
405                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
406                  * just having LOOKUP lock is enough to justify inode is the
407                  * same. And if inode is the same and we have suitable
408                  * openhandle, then there is no point in doing another OPEN RPC
409                  * just to throw away newly received openhandle.  There are no
410                  * security implications too, if file owner or access mode is
411                  * change, LOOKUP lock is revoked.
412                  */
413
414
415                 if (it->it_flags & FMODE_WRITE) {
416                         och_p = &lli->lli_mds_write_och;
417                         och_usecount = &lli->lli_open_fd_write_count;
418                 } else if (it->it_flags & FMODE_EXEC) {
419                         och_p = &lli->lli_mds_exec_och;
420                         och_usecount = &lli->lli_open_fd_exec_count;
421                 } else {
422                         och_p = &lli->lli_mds_read_och;
423                         och_usecount = &lli->lli_open_fd_read_count;
424                 }
425                 /* Check for the proper lock. */
426                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
427                         goto do_lock;
428                 cfs_down(&lli->lli_och_sem);
429                 if (*och_p) { /* Everything is open already, do nothing */
430                         /*(*och_usecount)++;  Do not let them steal our open
431                           handle from under us */
432                         /* XXX The code above was my original idea, but in case
433                            we have the handle, but we cannot use it due to later
434                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
435                            would decrement counter increased here. So we just
436                            hope the lock won't be invalidated in between. But
437                            if it would be, we'll reopen the open request to
438                            MDS later during file open path */
439                         cfs_up(&lli->lli_och_sem);
440                         ll_finish_md_op_data(op_data);
441                         RETURN(1);
442                 } else {
443                         cfs_up(&lli->lli_och_sem);
444                 }
445         }
446
447         if (it->it_op == IT_GETATTR)
448                 first = ll_statahead_enter(parent, &de, 0);
449
450 do_lock:
451         it->it_create_mode &= ~current->fs->umask;
452         it->it_create_mode |= M_CHECK_STALE;
453         rc = md_intent_lock(exp, op_data, NULL, 0, it,
454                             lookup_flags,
455                             &req, ll_md_blocking_ast, 0);
456         it->it_create_mode &= ~M_CHECK_STALE;
457         ll_finish_md_op_data(op_data);
458         if (it->it_op == IT_GETATTR && !first)
459                 /* If there are too many locks on client-side, then some
460                  * locks taken by statahead maybe dropped automatically
461                  * before the real "revalidate" using them. */
462                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
463         else if (first == -EEXIST)
464                 ll_statahead_mark(parent, de);
465
466         /* If req is NULL, then md_intent_lock only tried to do a lock match;
467          * if all was well, it will return 1 if it found locks, 0 otherwise. */
468         if (req == NULL && rc >= 0) {
469                 if (!rc)
470                         goto do_lookup;
471                 GOTO(out, rc);
472         }
473
474         if (rc < 0) {
475                 if (rc != -ESTALE) {
476                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
477                                "%d\n", rc, it->d.lustre.it_status);
478                 }
479                 GOTO(out, rc = 0);
480         }
481
482 revalidate_finish:
483         rc = ll_revalidate_it_finish(req, it, de);
484         if (rc != 0) {
485                 if (rc != -ESTALE && rc != -ENOENT)
486                         ll_intent_release(it);
487                 GOTO(out, rc = 0);
488         }
489
490         if ((it->it_op & IT_OPEN) && de->d_inode &&
491             !S_ISREG(de->d_inode->i_mode) &&
492             !S_ISDIR(de->d_inode->i_mode)) {
493                 ll_release_openhandle(de, it);
494         }
495         rc = 1;
496
497         /* unfortunately ll_intent_lock may cause a callback and revoke our
498          * dentry */
499         spin_lock(&dcache_lock);
500         lock_dentry(de);
501         __d_drop(de);
502         unlock_dentry(de);
503         d_rehash_cond(de, 0);
504         spin_unlock(&dcache_lock);
505
506 out:
507         /* We do not free request as it may be reused during following lookup
508          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
509          * be freed in ll_lookup_it or in ll_intent_release. But if
510          * request was not completed, we need to free it. (bug 5154, 9903) */
511         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
512                 ptlrpc_req_finished(req);
513         if (rc == 0) {
514                 ll_unhash_aliases(de->d_inode);
515         } else {
516                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
517                        "inode %p refc %d\n", de->d_name.len,
518                        de->d_name.name, de, de->d_parent, de->d_inode,
519                        atomic_read(&de->d_count));
520                 ll_finish_locks(it, de);
521                 lock_dentry(de);
522                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
523                 unlock_dentry(de);
524         }
525         RETURN(rc);
526
527         /*
528          * This part is here to combat evil-evil race in real_lookup on 2.6
529          * kernels.  The race details are: We enter do_lookup() looking for some
530          * name, there is nothing in dcache for this name yet and d_lookup()
531          * returns NULL.  We proceed to real_lookup(), and while we do this,
532          * another process does open on the same file we looking up (most simple
533          * reproducer), open succeeds and the dentry is added. Now back to
534          * us. In real_lookup() we do d_lookup() again and suddenly find the
535          * dentry, so we call d_revalidate on it, but there is no lock, so
536          * without this code we would return 0, but unpatched real_lookup just
537          * returns -ENOENT in such a case instead of retrying the lookup. Once
538          * this is dealt with in real_lookup(), all of this ugly mess can go and
539          * we can just check locks in ->d_revalidate without doing any RPCs
540          * ever.
541          */
542 do_lookup:
543         if (it != &lookup_it) {
544                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
545                 if (it->it_op == IT_GETATTR)
546                         lookup_it.it_op = IT_GETATTR;
547                 ll_finish_locks(it, de);
548                 it = &lookup_it;
549         }
550
551         /* Do real lookup here. */
552         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
553                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
554                                                          LUSTRE_OPC_CREATE :
555                                                          LUSTRE_OPC_ANY), NULL);
556         if (IS_ERR(op_data))
557                 RETURN(PTR_ERR(op_data));
558
559         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
560                             ll_md_blocking_ast, 0);
561         if (rc >= 0) {
562                 struct mdt_body *mdt_body;
563                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
564                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
565
566                 if (de->d_inode)
567                         fid = *ll_inode2fid(de->d_inode);
568
569                 /* see if we got same inode, if not - return error */
570                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
571                         ll_finish_md_op_data(op_data);
572                         op_data = NULL;
573                         goto revalidate_finish;
574                 }
575                 ll_intent_release(it);
576         }
577         ll_finish_md_op_data(op_data);
578         GOTO(out, rc = 0);
579
580 out_sa:
581         /*
582          * For rc == 1 case, should not return directly to prevent losing
583          * statahead windows; for rc == 0 case, the "lookup" will be done later.
584          */
585         if (it && it->it_op == IT_GETATTR && rc == 1) {
586                 first = ll_statahead_enter(parent, &de, 0);
587                 if (!first)
588                         ll_statahead_exit(parent, de, 1);
589                 else if (first == -EEXIST)
590                         ll_statahead_mark(parent, de);
591         }
592
593         return rc;
594 }
595
596 #if 0
597 static void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
598 {
599         struct inode *inode= de->d_inode;
600         struct ll_sb_info *sbi = ll_i2sbi(inode);
601         struct ll_dentry_data *ldd = ll_d2d(de);
602         struct obd_client_handle *handle;
603         struct obd_capa *oc;
604         int rc = 0;
605         ENTRY;
606         LASSERT(ldd);
607
608         cfs_lock_kernel();
609         /* Strictly speaking this introduces an additional race: the
610          * increments should wait until the rpc has returned.
611          * However, given that at present the function is void, this
612          * issue is moot. */
613         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
614                 cfs_unlock_kernel();
615                 EXIT;
616                 return;
617         }
618
619         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
620                 cfs_unlock_kernel();
621                 EXIT;
622                 return;
623         }
624         cfs_unlock_kernel();
625
626         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
627         oc = ll_mdscapa_get(inode);
628         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
629         capa_put(oc);
630         if (rc) {
631                 cfs_lock_kernel();
632                 memset(handle, 0, sizeof(*handle));
633                 if (flag == 0)
634                         ldd->lld_cwd_count--;
635                 else
636                         ldd->lld_mnt_count--;
637                 cfs_unlock_kernel();
638         }
639
640         EXIT;
641         return;
642 }
643
644 static void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
645 {
646         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
647         struct ll_dentry_data *ldd = ll_d2d(de);
648         struct obd_client_handle handle;
649         int count, rc = 0;
650         ENTRY;
651         LASSERT(ldd);
652
653         cfs_lock_kernel();
654         /* Strictly speaking this introduces an additional race: the
655          * increments should wait until the rpc has returned.
656          * However, given that at present the function is void, this
657          * issue is moot. */
658         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
659         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
660                 /* the "pin" failed */
661                 cfs_unlock_kernel();
662                 EXIT;
663                 return;
664         }
665
666         if (flag)
667                 count = --ldd->lld_mnt_count;
668         else
669                 count = --ldd->lld_cwd_count;
670         cfs_unlock_kernel();
671
672         if (count != 0) {
673                 EXIT;
674                 return;
675         }
676
677         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
678         EXIT;
679         return;
680 }
681 #endif
682
683 #ifdef HAVE_VFS_INTENT_PATCHES
684 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
685 {
686         int rc;
687         ENTRY;
688
689         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
690                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
691         else
692                 rc = ll_revalidate_it(dentry, 0, NULL);
693
694         RETURN(rc);
695 }
696 #else
697 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
698 {
699         int rc;
700         ENTRY;
701
702         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
703                 struct lookup_intent *it;
704                 it = ll_convert_intent(&nd->intent.open, nd->flags);
705                 if (IS_ERR(it))
706                         RETURN(0);
707                 if (it->it_op == (IT_OPEN|IT_CREAT))
708                         if (nd->intent.open.flags & O_EXCL) {
709                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
710                                 rc = 0;
711                                 goto out_it;
712                         }
713
714                 rc = ll_revalidate_it(dentry, nd->flags, it);
715
716                 if (rc && (nd->flags & LOOKUP_OPEN) &&
717                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
718 #ifdef HAVE_FILE_IN_STRUCT_INTENT
719 // XXX Code duplication with ll_lookup_nd
720                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
721                                 // We cannot call open here as it would
722                                 // deadlock.
723                                 ptlrpc_req_finished(
724                                                (struct ptlrpc_request *)
725                                                   it->d.lustre.it_data);
726                         } else {
727 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
728 /* 2.6.1[456] have a bug in open_namei() that forgets to check
729  * nd->intent.open.file for error, so we need to return it as lookup's result
730  * instead */
731                                 struct file *filp;
732
733                                 nd->intent.open.file->private_data = it;
734                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
735                                 if (IS_ERR(filp)) {
736                                         rc = PTR_ERR(filp);
737                                 }
738 #else
739                                 nd->intent.open.file->private_data = it;
740                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
741 #endif
742                         }
743 #else
744                         ll_release_openhandle(dentry, it);
745 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
746                 }
747                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
748                     it_disposition(it, DISP_OPEN_CREATE)) {
749                         /* We created something but we may only return
750                          * negative dentry here, so save request in dentry,
751                          * if lookup will be called later on, it will
752                          * pick the request, otherwise it would be freed
753                          * with dentry */
754                         ll_d2d(dentry)->lld_it = it;
755                         it = NULL; /* avoid freeing */
756                 }
757
758 out_it:
759                 if (it) {
760                         ll_intent_release(it);
761                         OBD_FREE(it, sizeof(*it));
762                 }
763         } else {
764                 rc = ll_revalidate_it(dentry, 0, NULL);
765         }
766
767         RETURN(rc);
768 }
769 #endif
770
771 struct dentry_operations ll_d_ops = {
772         .d_revalidate = ll_revalidate_nd,
773         .d_release = ll_release,
774         .d_delete = ll_ddelete,
775         .d_compare = ll_dcompare,
776 #if 0
777         .d_pin = ll_pin,
778         .d_unpin = ll_unpin,
779 #endif
780 };