Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/sched.h>
24 #include <linux/smp_lock.h>
25 #include <linux/quotaops.h>
26
27 #define DEBUG_SUBSYSTEM S_LLITE
28
29 #include <obd_support.h>
30 #include <lustre_lite.h>
31 #include <lustre/lustre_idl.h>
32 #include <lustre_dlm.h>
33 #include <lustre_mdc.h>
34 //#include <lustre_ver.h>
35 //#include <lustre_version.h>
36
37 #include "llite_internal.h"
38
39 /* should NOT be called with the dcache lock, see fs/dcache.c */
40 static void ll_release(struct dentry *de)
41 {
42         struct ll_dentry_data *lld;
43         ENTRY;
44         LASSERT(de != NULL);
45         lld = ll_d2d(de);
46         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
47                 EXIT;
48                 return;
49         }
50 #ifndef HAVE_VFS_INTENT_PATCHES
51         if (lld->lld_it) {
52                 ll_intent_release(lld->lld_it);
53                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
54         }
55 #endif
56         LASSERT(lld->lld_cwd_count == 0);
57         LASSERT(lld->lld_mnt_count == 0);
58         OBD_FREE(de->d_fsdata, sizeof(*lld));
59
60         EXIT;
61 }
62
63 #ifdef DCACHE_LUSTRE_INVALID
64 /* Compare if two dentries are the same.  Don't match if the existing dentry
65  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
66  *
67  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
68  * an AST before calling d_revalidate_it().  The dentry still exists (marked
69  * INVALID) so d_lookup() matches it, but we have no lock on it (so
70  * lock_match() fails) and we spin around real_lookup(). */
71 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
72 {
73         struct dentry *dchild;
74         ENTRY;
75
76         if (d_name->len != name->len)
77                 RETURN(1);
78
79         if (memcmp(d_name->name, name->name, name->len))
80                 RETURN(1);
81
82         /* XXX: d_name must be in-dentry structure */
83         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
84         if (dchild->d_flags & DCACHE_LUSTRE_INVALID) {
85                 CDEBUG(D_DENTRY,"INVALID dentry %p not matched, was bug 3784\n",
86                        dchild);
87                 RETURN(1);
88         }
89
90         RETURN(0);
91 }
92 #endif
93
94 /* should NOT be called with the dcache lock, see fs/dcache.c */
95 static int ll_ddelete(struct dentry *de)
96 {
97         ENTRY;
98         LASSERT(de);
99 #ifndef DCACHE_LUSTRE_INVALID
100 #define DCACHE_LUSTRE_INVALID 0
101 #endif
102
103         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
104                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
105                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
106                d_unhashed(de) ? "" : "hashed,",
107                list_empty(&de->d_subdirs) ? "" : "subdirs");
108 #if DCACHE_LUSTRE_INVALID == 0
109 #undef DCACHE_LUSTRE_INVALID
110 #endif
111
112         RETURN(0);
113 }
114
115 void ll_set_dd(struct dentry *de)
116 {
117         ENTRY;
118         LASSERT(de != NULL);
119
120         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
121                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
122                atomic_read(&de->d_count));
123         lock_kernel();
124         if (de->d_fsdata == NULL) {
125                 OBD_ALLOC(de->d_fsdata, sizeof(struct ll_dentry_data));
126         }
127         unlock_kernel();
128
129         EXIT;
130 }
131
132 void ll_intent_drop_lock(struct lookup_intent *it)
133 {
134         struct lustre_handle *handle;
135
136         if (it->it_op && it->d.lustre.it_lock_mode) {
137                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
138                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
139                        " from it %p\n", handle->cookie, it);
140                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
141
142                 /* bug 494: intent_release may be called multiple times, from
143                  * this thread and we don't want to double-decref this lock */
144                 it->d.lustre.it_lock_mode = 0;
145         }
146 }
147
148 void ll_intent_release(struct lookup_intent *it)
149 {
150         ENTRY;
151
152         CDEBUG(D_INFO, "intent %p released\n", it);
153         ll_intent_drop_lock(it);
154 #ifdef HAVE_VFS_INTENT_PATCHES
155         it->it_magic = 0;
156         it->it_op_release = 0;
157 #endif
158         /* We are still holding extra reference on a request, need to free it */
159         if (it_disposition(it, DISP_ENQ_OPEN_REF))
160                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
161         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
162                 ptlrpc_req_finished(it->d.lustre.it_data);
163         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
164                                                     * to lookup */
165                 ptlrpc_req_finished(it->d.lustre.it_data);
166
167         it->d.lustre.it_disposition = 0;
168         it->d.lustre.it_data = NULL;
169         EXIT;
170 }
171
172 /* Drop dentry if it is not used already, unhash otherwise.
173    Should be called with dcache lock held!
174    Returns: 1 if dentry was dropped, 0 if unhashed. */
175 int ll_drop_dentry(struct dentry *dentry)
176 {
177         lock_dentry(dentry);
178         if (atomic_read(&dentry->d_count) == 0) {
179                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
180                        "inode %p\n", dentry->d_name.len,
181                        dentry->d_name.name, dentry, dentry->d_parent,
182                        dentry->d_inode);
183                 dget_locked(dentry);
184                 __d_drop(dentry);
185                 unlock_dentry(dentry);
186                 spin_unlock(&dcache_lock);
187                 dput(dentry);
188                 spin_lock(&dcache_lock);
189                 return 1;
190         }
191         /* disconected dentry can not be find without lookup, because we 
192          * not need his to unhash or mark invalid. */
193         if (dentry->d_flags & DCACHE_DISCONNECTED) {
194                 unlock_dentry(dentry);
195                 RETURN (0);
196         }
197
198 #ifdef DCACHE_LUSTRE_INVALID
199         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
200 #else
201         if (!d_unhashed(dentry)) {
202 #endif
203                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
204                        "inode %p refc %d\n", dentry->d_name.len,
205                        dentry->d_name.name, dentry, dentry->d_parent,
206                        dentry->d_inode, atomic_read(&dentry->d_count));
207                 /* actually we don't unhash the dentry, rather just
208                  * mark it inaccessible for to __d_lookup(). otherwise
209                  * sys_getcwd() could return -ENOENT -bzzz */
210 #ifdef DCACHE_LUSTRE_INVALID
211                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
212 #endif
213                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
214                         __d_drop(dentry);
215
216         }
217         unlock_dentry(dentry);
218         return 0;
219 }
220
221 void ll_unhash_aliases(struct inode *inode)
222 {
223         struct list_head *tmp, *head;
224         ENTRY;
225
226         if (inode == NULL) {
227                 CERROR("unexpected NULL inode, tell phil\n");
228                 return;
229         }
230
231         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
232                inode->i_ino, inode->i_generation, inode);
233
234         head = &inode->i_dentry;
235         spin_lock(&dcache_lock);
236 restart:
237         tmp = head;
238         while ((tmp = tmp->next) != head) {
239                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
240
241                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
242                        "inode %p flags %d\n", dentry->d_name.len,
243                        dentry->d_name.name, dentry, dentry->d_parent,
244                        dentry->d_inode, dentry->d_flags);
245
246                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
247                         CERROR("called on root (?) dentry=%p, inode=%p "
248                                "ino=%lu\n", dentry, inode, inode->i_ino);
249                         lustre_dump_dentry(dentry, 1);
250                         libcfs_debug_dumpstack(NULL);
251                 } else if (d_mountpoint(dentry)) {
252                         /* For mountpoints we skip removal of the dentry
253                            which happens solely because we have a lock on it
254                            obtained when this dentry was not a mountpoint yet */
255                         CDEBUG(D_DENTRY, "Skippind mountpoint dentry removal "
256                                          "%.*s (%p) parent %p\n",
257                                           dentry->d_name.len,
258                                           dentry->d_name.name,
259                                           dentry, dentry->d_parent);
260
261                         continue;
262                 }
263
264                 if (ll_drop_dentry(dentry))
265                           goto restart;
266         }
267         spin_unlock(&dcache_lock);
268         EXIT;
269 }
270
271 int ll_revalidate_it_finish(struct ptlrpc_request *request,
272                             struct lookup_intent *it,
273                             struct dentry *de)
274 {
275         int rc = 0;
276         ENTRY;
277
278         if (!request)
279                 RETURN(0);
280
281         if (it_disposition(it, DISP_LOOKUP_NEG)) 
282                 RETURN(-ENOENT);
283
284         rc = ll_prep_inode(&de->d_inode, request, NULL);
285
286         RETURN(rc);
287 }
288
289 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
290 {
291         LASSERT(it != NULL);
292         LASSERT(dentry != NULL);
293
294         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
295                 struct inode *inode = dentry->d_inode;
296                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
297
298                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
299                        inode, inode->i_ino, inode->i_generation);
300                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
301                                  inode);
302         }
303
304         /* drop lookup or getattr locks immediately */
305         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
306                 /* on 2.6 there are situation when several lookups and
307                  * revalidations may be requested during single operation.
308                  * therefore, we don't release intent here -bzzz */
309                 ll_intent_drop_lock(it);
310         }
311 }
312
313 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
314 {
315         struct lookup_intent *it = *itp;
316 #ifdef HAVE_VFS_INTENT_PATCHES
317         if (it) {
318                 LASSERTF(it->it_magic == INTENT_MAGIC, 
319                          "%p has bad intent magic: %x\n",
320                          it, it->it_magic);
321         }
322 #endif
323
324         if (!it || it->it_op == IT_GETXATTR)
325                 it = *itp = deft;
326
327 #ifdef HAVE_VFS_INTENT_PATCHES
328         it->it_op_release = ll_intent_release;
329 #endif
330 }
331
332 int ll_revalidate_it(struct dentry *de, int lookup_flags,
333                      struct lookup_intent *it)
334 {
335         int rc;
336         struct md_op_data *op_data;
337         struct ptlrpc_request *req = NULL;
338         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
339         struct obd_export *exp;
340         struct inode *parent;
341
342         ENTRY;
343         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
344                LL_IT2STR(it));
345
346         if (de->d_inode == NULL) {
347                 /* We can only use negative dentries if this is stat or lookup,
348                    for opens and stuff we do need to query server. */
349                 /* If there is IT_CREAT in intent op set, then we must throw
350                    away this negative dentry and actually do the request to
351                    kernel to create whatever needs to be created (if possible)*/
352                 if (it && (it->it_op & IT_CREAT))
353                         RETURN(0);
354
355 #ifdef DCACHE_LUSTRE_INVALID
356                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
357                         RETURN(0);
358 #endif
359
360                 rc = ll_have_md_lock(de->d_parent->d_inode,
361                                      MDS_INODELOCK_UPDATE);
362                 RETURN(rc);
363         }
364
365         exp = ll_i2mdexp(de->d_inode);
366
367         /* Never execute intents for mount points.
368          * Attributes will be fixed up in ll_inode_revalidate_it */
369         if (d_mountpoint(de))
370                 RETURN(1);
371
372         /* Root of the lustre tree. Always valid.
373          * Attributes will be fixed up in ll_inode_revalidate_it */
374         if (de == de->d_sb->s_root)
375                 RETURN(1);
376
377         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
378         ll_frob_intent(&it, &lookup_it);
379         LASSERT(it);
380         parent = de->d_parent->d_inode;
381
382         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
383                                      de->d_name.name, de->d_name.len,
384                                      0, LUSTRE_OPC_ANY, NULL);
385         if (IS_ERR(op_data))
386                 RETURN(PTR_ERR(op_data));
387
388         if ((it->it_op == IT_OPEN) && de->d_inode) {
389                 struct inode *inode = de->d_inode;
390                 struct ll_inode_info *lli = ll_i2info(inode);
391                 struct obd_client_handle **och_p;
392                 __u64 *och_usecount;
393
394                 /*
395                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
396                  * just having LOOKUP lock is enough to justify inode is the
397                  * same. And if inode is the same and we have suitable
398                  * openhandle, then there is no point in doing another OPEN RPC
399                  * just to throw away newly received openhandle.  There are no
400                  * security implications too, if file owner or access mode is
401                  * change, LOOKUP lock is revoked.
402                  */
403
404
405                 if (it->it_flags & FMODE_WRITE) {
406                         och_p = &lli->lli_mds_write_och;
407                         och_usecount = &lli->lli_open_fd_write_count;
408                 } else if (it->it_flags & FMODE_EXEC) {
409                         och_p = &lli->lli_mds_exec_och;
410                         och_usecount = &lli->lli_open_fd_exec_count;
411                 } else {
412                         och_p = &lli->lli_mds_read_och;
413                         och_usecount = &lli->lli_open_fd_read_count;
414                 }
415                 /* Check for the proper lock. */
416                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
417                         goto do_lock;
418                 down(&lli->lli_och_sem);
419                 if (*och_p) { /* Everything is open already, do nothing */
420                         /*(*och_usecount)++;  Do not let them steal our open
421                           handle from under us */
422                         /* XXX The code above was my original idea, but in case
423                            we have the handle, but we cannot use it due to later
424                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
425                            would decrement counter increased here. So we just
426                            hope the lock won't be invalidated in between. But
427                            if it would be, we'll reopen the open request to
428                            MDS later during file open path */
429                         up(&lli->lli_och_sem);
430                         ll_finish_md_op_data(op_data);
431                         RETURN(1);
432                 } else {
433                         up(&lli->lli_och_sem);
434                 }
435         }
436
437 do_lock:
438         it->it_create_mode &= ~current->fs->umask;
439         it->it_flags |= O_CHECK_STALE;
440         rc = md_intent_lock(exp, op_data, NULL, 0, it,
441                             lookup_flags,
442                             &req, ll_md_blocking_ast, 0);
443         it->it_flags &= ~O_CHECK_STALE;
444         ll_finish_md_op_data(op_data);
445         /* If req is NULL, then md_intent_lock only tried to do a lock match;
446          * if all was well, it will return 1 if it found locks, 0 otherwise. */
447         if (req == NULL && rc >= 0) {
448                 if (!rc)
449                         goto do_lookup;
450                 GOTO(out, rc);
451         }
452
453         if (rc < 0) {
454                 if (rc != -ESTALE) {
455                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
456                                "%d\n", rc, it->d.lustre.it_status);
457                 }
458                 GOTO(out, rc = 0);
459         }
460
461 revalidate_finish:
462         rc = ll_revalidate_it_finish(req, it, de);
463         if (rc != 0) {
464                 if (rc != -ESTALE && rc != -ENOENT)
465                         ll_intent_release(it);
466                 GOTO(out, rc = 0);
467         }
468
469         if ((it->it_op & IT_OPEN) && de->d_inode && 
470             !S_ISREG(de->d_inode->i_mode) && 
471             !S_ISDIR(de->d_inode->i_mode)) {
472                 ll_release_openhandle(de, it);
473         }
474         rc = 1;
475
476         /* unfortunately ll_intent_lock may cause a callback and revoke our
477          * dentry */
478         spin_lock(&dcache_lock);
479         lock_dentry(de);
480         __d_drop(de);
481         unlock_dentry(de);
482         d_rehash_cond(de, 0);
483         spin_unlock(&dcache_lock);
484
485 out:
486         /* We do not free request as it may be reused during following lookup
487          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
488          * be freed in ll_lookup_it or in ll_intent_release. But if
489          * request was not completed, we need to free it. (bug 5154, 9903) */
490         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
491                 ptlrpc_req_finished(req);
492         if (rc == 0) {
493 #ifdef DCACHE_LUSTRE_INVALID
494                 ll_unhash_aliases(de->d_inode);
495                 /* done in ll_unhash_aliases()
496                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
497 #else
498                 /* We do not want d_invalidate to kill all child dentries too */
499                 d_drop(de);
500 #endif
501         } else {
502                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
503                        "inode %p refc %d\n", de->d_name.len,
504                        de->d_name.name, de, de->d_parent, de->d_inode,
505                        atomic_read(&de->d_count));
506                 ll_lookup_finish_locks(it, de);
507 #ifdef DCACHE_LUSTRE_INVALID
508                 lock_dentry(de);
509                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
510                 unlock_dentry(de);
511 #endif
512         }
513         RETURN(rc);
514
515         /*
516          * This part is here to combat evil-evil race in real_lookup on 2.6
517          * kernels.  The race details are: We enter do_lookup() looking for some
518          * name, there is nothing in dcache for this name yet and d_lookup()
519          * returns NULL.  We proceed to real_lookup(), and while we do this,
520          * another process does open on the same file we looking up (most simple
521          * reproducer), open succeeds and the dentry is added. Now back to
522          * us. In real_lookup() we do d_lookup() again and suddenly find the
523          * dentry, so we call d_revalidate on it, but there is no lock, so
524          * without this code we would return 0, but unpatched real_lookup just
525          * returns -ENOENT in such a case instead of retrying the lookup. Once
526          * this is dealt with in real_lookup(), all of this ugly mess can go and
527          * we can just check locks in ->d_revalidate without doing any RPCs
528          * ever.
529          */
530 do_lookup:
531         if (it != &lookup_it) {
532                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
533                 if (it->it_op == IT_GETATTR)
534                         lookup_it.it_op = IT_GETATTR;
535                 ll_lookup_finish_locks(it, de);
536                 it = &lookup_it;
537         }
538
539         /* Do real lookup here. */
540         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
541                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
542                                                          LUSTRE_OPC_CREATE :
543                                                          LUSTRE_OPC_ANY), NULL);
544         if (IS_ERR(op_data))
545                 RETURN(PTR_ERR(op_data));
546
547         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
548                             ll_md_blocking_ast, 0);
549         if (rc >= 0) {
550                 struct mdt_body *mdt_body;
551                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
552                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
553
554                 if (de->d_inode)
555                         fid = *ll_inode2fid(de->d_inode);
556
557                 /* see if we got same inode, if not - return error */
558                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
559                         ll_finish_md_op_data(op_data);
560                         op_data = NULL;
561                         goto revalidate_finish;
562                 }
563                 ll_intent_release(it);
564         }
565         ll_finish_md_op_data(op_data);
566         GOTO(out, rc = 0);
567 }
568
569 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
570 {
571         struct inode *inode= de->d_inode;
572         struct ll_sb_info *sbi = ll_i2sbi(inode);
573         struct ll_dentry_data *ldd = ll_d2d(de);
574         struct obd_client_handle *handle;
575         struct obd_capa *oc;
576         int rc = 0;
577         ENTRY;
578         LASSERT(ldd);
579
580         lock_kernel();
581         /* Strictly speaking this introduces an additional race: the
582          * increments should wait until the rpc has returned.
583          * However, given that at present the function is void, this
584          * issue is moot. */
585         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
586                 unlock_kernel();
587                 EXIT;
588                 return;
589         }
590
591         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
592                 unlock_kernel();
593                 EXIT;
594                 return;
595         }
596         unlock_kernel();
597
598         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
599         oc = ll_mdscapa_get(inode);
600         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
601         capa_put(oc);
602         if (rc) {
603                 lock_kernel();
604                 memset(handle, 0, sizeof(*handle));
605                 if (flag == 0)
606                         ldd->lld_cwd_count--;
607                 else
608                         ldd->lld_mnt_count--;
609                 unlock_kernel();
610         }
611
612         EXIT;
613         return;
614 }
615
616 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
617 {
618         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
619         struct ll_dentry_data *ldd = ll_d2d(de);
620         struct obd_client_handle handle;
621         int count, rc = 0;
622         ENTRY;
623         LASSERT(ldd);
624
625         lock_kernel();
626         /* Strictly speaking this introduces an additional race: the
627          * increments should wait until the rpc has returned.
628          * However, given that at present the function is void, this
629          * issue is moot. */
630         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
631         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
632                 /* the "pin" failed */
633                 unlock_kernel();
634                 EXIT;
635                 return;
636         }
637
638         if (flag)
639                 count = --ldd->lld_mnt_count;
640         else
641                 count = --ldd->lld_cwd_count;
642         unlock_kernel();
643
644         if (count != 0) {
645                 EXIT;
646                 return;
647         }
648
649         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
650         EXIT;
651         return;
652 }
653
654 #ifdef HAVE_VFS_INTENT_PATCHES
655 static int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
656 {
657         int rc;
658         ENTRY;
659
660         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
661                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
662         else
663                 rc = ll_revalidate_it(dentry, 0, NULL);
664
665         RETURN(rc);
666 }
667 #else
668 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
669 {
670         int rc;
671         ENTRY;
672
673         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
674                 struct lookup_intent *it;
675                 it = ll_convert_intent(&nd->intent.open, nd->flags);
676                 if (IS_ERR(it))
677                         RETURN(0);
678                 if (it->it_op == (IT_OPEN|IT_CREAT))
679                         if (nd->intent.open.flags & O_EXCL) {
680                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
681                                 rc = 0;
682                                 goto out_it;
683                         }
684
685                 rc = ll_revalidate_it(dentry, nd->flags, it);
686
687                 if (rc && (nd->flags & LOOKUP_OPEN) &&
688                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
689 #ifdef HAVE_FILE_IN_STRUCT_INTENT
690 // XXX Code duplication with ll_lookup_nd
691                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
692                                 // We cannot call open here as it would
693                                 // deadlock.
694                                 ptlrpc_req_finished(
695                                                (struct ptlrpc_request *)
696                                                   it->d.lustre.it_data);
697                         } else {
698                                 struct file *filp;
699
700                                 nd->intent.open.file->private_data = it;
701                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
702 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
703 /* 2.6.1[456] have a bug in open_namei() that forgets to check
704  * nd->intent.open.file for error, so we need to return it as lookup's result
705  * instead */
706                                 if (IS_ERR(filp))
707                                         rc = 0;
708 #endif
709                         }
710 #else
711                         ll_release_openhandle(dentry, it);
712 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
713                 }
714                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
715                     it_disposition(it, DISP_OPEN_CREATE)) {
716                         /* We created something but we may only return
717                          * negative dentry here, so save request in dentry,
718                          * if lookup will be called later on, it will
719                          * pick the request, otherwise it would be freed
720                          * with dentry */
721                         ll_d2d(dentry)->lld_it = it;
722                         it = NULL; /* avoid freeing */
723                 }
724
725 out_it:
726                 if (it) {
727                         ll_intent_release(it);
728                         OBD_FREE(it, sizeof(*it));
729                 }
730         } else {
731                 rc = ll_revalidate_it(dentry, 0, NULL);
732         }
733
734         RETURN(rc);
735 }
736 #endif
737
738 struct dentry_operations ll_d_ops = {
739         .d_revalidate = ll_revalidate_nd,
740         .d_release = ll_release,
741         .d_delete = ll_ddelete,
742 #ifdef DCACHE_LUSTRE_INVALID
743         .d_compare = ll_dcompare,
744 #endif
745 #if 0
746         .d_pin = ll_pin,
747         .d_unpin = ll_unpin,
748 #endif
749 };