Whamcloud - gitweb
small cleanup
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/sched.h>
24 #include <linux/smp_lock.h>
25 #include <linux/quotaops.h>
26
27 #define DEBUG_SUBSYSTEM S_LLITE
28
29 #include <obd_support.h>
30 #include <lustre_lite.h>
31 #include <lustre/lustre_idl.h>
32 #include <lustre_dlm.h>
33 #include <lustre_mdc.h>
34 #include <lustre_ver.h>
35
36 #include "llite_internal.h"
37
38 /* should NOT be called with the dcache lock, see fs/dcache.c */
39 static void ll_release(struct dentry *de)
40 {
41         struct ll_dentry_data *lld;
42         ENTRY;
43         LASSERT(de != NULL);
44         lld = ll_d2d(de);
45         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
46                 EXIT;
47                 return;
48         }
49 #ifndef LUSTRE_KERNEL_VERSION
50         if (lld->lld_it) {
51                 ll_intent_release(lld->lld_it);
52                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
53         }
54 #endif
55         LASSERT(lld->lld_cwd_count == 0);
56         LASSERT(lld->lld_mnt_count == 0);
57         OBD_FREE(de->d_fsdata, sizeof(*lld));
58
59         EXIT;
60 }
61
62 #ifdef LUSTRE_KERNEL_VERSION
63 /* Compare if two dentries are the same.  Don't match if the existing dentry
64  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
65  *
66  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
67  * an AST before calling d_revalidate_it().  The dentry still exists (marked
68  * INVALID) so d_lookup() matches it, but we have no lock on it (so
69  * lock_match() fails) and we spin around real_lookup(). */
70 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
71 {
72         struct dentry *dchild;
73         ENTRY;
74
75         if (d_name->len != name->len)
76                 RETURN(1);
77
78         if (memcmp(d_name->name, name->name, name->len))
79                 RETURN(1);
80
81         /* XXX: d_name must be in-dentry structure */
82         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
83         if (dchild->d_flags & DCACHE_LUSTRE_INVALID) {
84                 CDEBUG(D_DENTRY,"INVALID dentry %p not matched, was bug 3784\n",
85                        dchild);
86                 RETURN(1);
87         }
88
89         RETURN(0);
90 }
91 #endif
92
93 /* should NOT be called with the dcache lock, see fs/dcache.c */
94 static int ll_ddelete(struct dentry *de)
95 {
96         ENTRY;
97         LASSERT(de);
98 #ifndef DCACHE_LUSTRE_INVALID
99 #define DCACHE_LUSTRE_INVALID 0
100 #endif
101
102         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
103                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
104                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
105                d_unhashed(de) ? "" : "hashed,",
106                list_empty(&de->d_subdirs) ? "" : "subdirs");
107 #if DCACHE_LUSTRE_INVALID == 0
108 #undef DCACHE_LUSTRE_INVALID
109 #endif
110
111         RETURN(0);
112 }
113
114 void ll_set_dd(struct dentry *de)
115 {
116         ENTRY;
117         LASSERT(de != NULL);
118
119         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
120                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
121                atomic_read(&de->d_count));
122         lock_kernel();
123         if (de->d_fsdata == NULL) {
124                 OBD_ALLOC(de->d_fsdata, sizeof(struct ll_dentry_data));
125         }
126         unlock_kernel();
127
128         EXIT;
129 }
130
131 void ll_intent_drop_lock(struct lookup_intent *it)
132 {
133         struct lustre_handle *handle;
134
135         if (it->it_op && it->d.lustre.it_lock_mode) {
136                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
137                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
138                        " from it %p\n", handle->cookie, it);
139                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
140
141                 /* bug 494: intent_release may be called multiple times, from
142                  * this thread and we don't want to double-decref this lock */
143                 it->d.lustre.it_lock_mode = 0;
144         }
145 }
146
147 void ll_intent_release(struct lookup_intent *it)
148 {
149         ENTRY;
150
151         CDEBUG(D_INFO, "intent %p released\n", it);
152         ll_intent_drop_lock(it);
153 #ifdef LUSTRE_KERNEL_VERSION
154         it->it_magic = 0;
155         it->it_op_release = 0;
156 #endif
157         /* We are still holding extra reference on a request, need to free it */
158         if (it_disposition(it, DISP_ENQ_OPEN_REF)) /* open req for llfile_open*/
159                 ptlrpc_req_finished(it->d.lustre.it_data);
160         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
161                 ptlrpc_req_finished(it->d.lustre.it_data);
162         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
163                                                     * to lookup */
164                 ptlrpc_req_finished(it->d.lustre.it_data);
165
166         it->d.lustre.it_disposition = 0;
167         it->d.lustre.it_data = NULL;
168         EXIT;
169 }
170
171 /* Drop dentry if it is not used already, unhash otherwise.
172    Should be called with dcache lock held!
173    Returns: 1 if dentry was dropped, 0 if unhashed. */
174 int ll_drop_dentry(struct dentry *dentry)
175 {
176         lock_dentry(dentry);
177         if (atomic_read(&dentry->d_count) == 0) {
178                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
179                        "inode %p\n", dentry->d_name.len,
180                        dentry->d_name.name, dentry, dentry->d_parent,
181                        dentry->d_inode);
182                 dget_locked(dentry);
183                 __d_drop(dentry);
184                 unlock_dentry(dentry);
185                 spin_unlock(&dcache_lock);
186                 dput(dentry);
187                 spin_lock(&dcache_lock);
188                 return 1;
189         }
190
191 #ifdef LUSTRE_KERNEL_VERSION
192         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
193 #else
194         if (!d_unhashed(dentry)) {
195 #endif
196                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
197                        "inode %p refc %d\n", dentry->d_name.len,
198                        dentry->d_name.name, dentry, dentry->d_parent,
199                        dentry->d_inode, atomic_read(&dentry->d_count));
200                 /* actually we don't unhash the dentry, rather just
201                  * mark it inaccessible for to __d_lookup(). otherwise
202                  * sys_getcwd() could return -ENOENT -bzzz */
203 #ifdef LUSTRE_KERNEL_VERSION
204                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
205 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
206                 __d_drop(dentry);
207                 if (dentry->d_inode) {
208                         /* Put positive dentries to orphan list */
209                         list_add(&dentry->d_hash,
210                                  &ll_i2sbi(dentry->d_inode)->ll_orphan_dentry_list);
211                 }
212 #endif
213 #else
214                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
215                         __d_drop(dentry);
216 #endif
217
218         }
219         unlock_dentry(dentry);
220         return 0;
221 }
222
223 void ll_unhash_aliases(struct inode *inode)
224 {
225         struct list_head *tmp, *head;
226         ENTRY;
227
228         if (inode == NULL) {
229                 CERROR("unexpected NULL inode, tell phil\n");
230                 return;
231         }
232
233         CDEBUG(D_INODE, "marking dentries for 111 ino %lu/%u(%p) invalid\n",
234                inode->i_ino, inode->i_generation, inode);
235
236         head = &inode->i_dentry;
237         spin_lock(&dcache_lock);
238 restart:
239         tmp = head;
240         while ((tmp = tmp->next) != head) {
241                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
242
243                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
244                        "inode %p flags %d\n", dentry->d_name.len,
245                        dentry->d_name.name, dentry, dentry->d_parent,
246                        dentry->d_inode, dentry->d_flags);
247
248                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
249                         CERROR("called on root (?) dentry=%p, inode=%p "
250                                "ino=%lu\n", dentry, inode, inode->i_ino);
251                         lustre_dump_dentry(dentry, 1);
252                         libcfs_debug_dumpstack(NULL);
253                 } else if (d_mountpoint(dentry)) {
254                         /* For mountpoints we skip removal of the dentry
255                            which happens solely because we have a lock on it
256                            obtained when this dentry was not a mountpoint yet */
257                         CDEBUG(D_DENTRY, "Skippind mountpoint dentry removal "
258                                          "%.*s (%p) parent %p\n",
259                                           dentry->d_name.len,
260                                           dentry->d_name.name,
261                                           dentry, dentry->d_parent);
262
263                         continue;
264                 }
265                 
266                 if (ll_drop_dentry(dentry))
267                           goto restart;
268         }
269         spin_unlock(&dcache_lock);
270         EXIT;
271 }
272
273 int ll_revalidate_it_finish(struct ptlrpc_request *request,
274                             int offset, struct lookup_intent *it,
275                             struct dentry *de)
276 {
277         int rc = 0;
278         ENTRY;
279
280         if (!request)
281                 RETURN(0);
282
283         if (it_disposition(it, DISP_LOOKUP_NEG)) 
284                 RETURN(-ENOENT);
285
286         rc = ll_prep_inode(&de->d_inode,
287                            request, offset, NULL);
288
289         RETURN(rc);
290 }
291
292 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
293 {
294         LASSERT(it != NULL);
295         LASSERT(dentry != NULL);
296
297         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
298                 struct inode *inode = dentry->d_inode;
299                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
300
301                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
302                        inode, inode->i_ino, inode->i_generation);
303                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
304                                  inode);
305         }
306
307         /* drop lookup or getattr locks immediately */
308         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
309 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
310                 /* on 2.6 there are situation when several lookups and
311                  * revalidations may be requested during single operation.
312                  * therefore, we don't release intent here -bzzz */
313                 ll_intent_drop_lock(it);
314 #else
315                 ll_intent_release(it);
316 #endif
317         }
318 }
319
320 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
321 {
322         struct lookup_intent *it = *itp;
323 #if defined(LUSTRE_KERNEL_VERSION)&&(LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
324         if (it) {
325                 LASSERTF(it->it_magic == INTENT_MAGIC, 
326                          "%p has bad intent magic: %x\n",
327                          it, it->it_magic);
328         }
329 #endif
330
331         if (!it || it->it_op == IT_GETXATTR)
332                 it = *itp = deft;
333
334 #ifdef LUSTRE_KERNEL_VERSION
335         it->it_op_release = ll_intent_release;
336 #endif
337 }
338
339 int ll_revalidate_it(struct dentry *de, int lookup_flags,
340                      struct lookup_intent *it)
341 {
342         int rc;
343         struct md_op_data *op_data;
344         struct ptlrpc_request *req = NULL;
345         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
346         struct obd_export *exp;
347         struct inode *parent;
348
349         ENTRY;
350         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
351                LL_IT2STR(it));
352
353         if (de->d_inode == NULL) {
354                 /* We can only use negative dentries if this is stat or lookup,
355                    for opens and stuff we do need to query server. */
356                 /* If there is IT_CREAT in intent op set, then we must throw
357                    away this negative dentry and actually do the request to
358                    kernel to create whatever needs to be created (if possible)*/
359                 if (it && (it->it_op & IT_CREAT))
360                         RETURN(0);
361
362 #ifdef LUSTRE_KERNEL_VERSION
363                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
364                         RETURN(0);
365 #endif
366
367                 rc = ll_have_md_lock(de->d_parent->d_inode, 
368                                      MDS_INODELOCK_UPDATE);
369         
370                 RETURN(rc);
371         }
372
373         exp = ll_i2mdexp(de->d_inode);
374
375         /* Never execute intents for mount points.
376          * Attributes will be fixed up in ll_inode_revalidate_it */
377         if (d_mountpoint(de))
378                 RETURN(1);
379
380         /* Root of the lustre tree. Always valid.
381          * Attributes will be fixed up in ll_inode_revalidate_it */
382         if (de->d_name.name[0] == '/' && de->d_name.len == 1)
383                 RETURN(1);
384
385         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
386         ll_frob_intent(&it, &lookup_it);
387         LASSERT(it);
388
389         parent = de->d_parent->d_inode;
390
391         OBD_ALLOC_PTR(op_data);
392         if (op_data == NULL)
393                 RETURN(-ENOMEM);
394        
395         if (it->it_op & IT_CREAT) {
396                 /* 
397                  * Allocate new fid for case of create or open(O_CREAT). In both
398                  * cases it->it_op will contain IT_CREAT. In case of
399                  * open(O_CREAT) agains existing file, fid allocating is not
400                  * needed, but this is not known until server returns
401                  * anything. Well, in this case new allocated fid is lost. But
402                  * this is not big deal, we have 64bit fids. --umka
403                  */
404                 struct lu_placement_hint hint = { .ph_pname = NULL,
405                                                   .ph_pfid = ll_inode2fid(parent),
406                                                   .ph_cname = &de->d_name,
407                                                   .ph_opc = LUSTRE_OPC_CREATE };
408
409                 ll_prepare_md_op_data(op_data, parent, NULL,
410                                       de->d_name.name, de->d_name.len, 0);
411                 rc = ll_fid_md_alloc(ll_i2sbi(parent), &op_data->fid2,
412                                      &hint);
413                 if (rc) {
414                         CERROR("can't allocate new fid, rc %d\n", rc);
415                         LBUG();
416                 }
417         } else {
418                 ll_prepare_md_op_data(op_data, parent, de->d_inode,
419                                       de->d_name.name, de->d_name.len, 0);
420         }
421
422         if ((it->it_op == IT_OPEN) && de->d_inode) {
423                 struct inode *inode = de->d_inode;
424                 struct ll_inode_info *lli = ll_i2info(inode);
425                 struct obd_client_handle **och_p;
426                 __u64 *och_usecount;
427                 
428                 /*
429                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
430                  * just having LOOKUP lock is enough to justify inode is the
431                  * same. And if inode is the same and we have suitable
432                  * openhandle, then there is no point in doing another OPEN RPC
433                  * just to throw away newly received openhandle.  There are no
434                  * security implications too, if file owner or access mode is
435                  * change, LOOKUP lock is revoked.
436                  */
437
438                 it->it_create_mode &= ~current->fs->umask;
439
440                 if (it->it_flags & FMODE_WRITE) {
441                         och_p = &lli->lli_mds_write_och;
442                         och_usecount = &lli->lli_open_fd_write_count;
443                 } else if (it->it_flags & FMODE_EXEC) {
444                         och_p = &lli->lli_mds_exec_och;
445                         och_usecount = &lli->lli_open_fd_exec_count;
446                 } else {
447                         och_p = &lli->lli_mds_read_och;
448                         och_usecount = &lli->lli_open_fd_read_count;
449                 }
450                 /* Check for the proper lock. */
451                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
452                         goto do_lock;
453                 down(&lli->lli_och_sem);
454                 if (*och_p) { /* Everything is open already, do nothing */
455                         /*(*och_usecount)++;  Do not let them steal our open
456                           handle from under us */
457                         /* XXX The code above was my original idea, but in case
458                            we have the handle, but we cannot use it due to later
459                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
460                            would decrement counter increased here. So we just
461                            hope the lock won't be invalidated in between. But
462                            if it would be, we'll reopen the open request to
463                            MDS later during file open path */
464                         up(&lli->lli_och_sem);
465                         OBD_FREE_PTR(op_data);
466                         RETURN(1);
467                 } else {
468                         up(&lli->lli_och_sem);
469                 }
470         }
471
472 do_lock:
473         it->it_flags |= O_CHECK_STALE;
474         rc = md_intent_lock(exp, op_data, NULL, 0, it, lookup_flags,
475                             &req, ll_md_blocking_ast, 0);
476         it->it_flags &= ~O_CHECK_STALE;
477         
478         OBD_FREE_PTR(op_data);
479         /* If req is NULL, then md_intent_lock only tried to do a lock match;
480          * if all was well, it will return 1 if it found locks, 0 otherwise. */
481         if (req == NULL && rc >= 0) {
482                 if (!rc)
483                         goto do_lookup;
484                 GOTO(out, rc);
485         }
486
487         if (rc < 0) {
488                 if (rc != -ESTALE) {
489                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
490                                "%d\n", rc, it->d.lustre.it_status);
491                 }
492                 GOTO(out, rc = 0);
493         }
494
495 revalidate_finish:
496         rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, de);
497         if (rc != 0) {
498                 if (rc != -ESTALE && rc != -ENOENT)
499                         ll_intent_release(it);
500                 GOTO(out, rc = 0);
501         }
502
503         if ((it->it_op & IT_OPEN) && de->d_inode && 
504             !S_ISREG(de->d_inode->i_mode) && 
505             !S_ISDIR(de->d_inode->i_mode)) {
506                 ll_release_openhandle(de, it);
507         }
508         rc = 1;
509
510         /* unfortunately ll_intent_lock may cause a callback and revoke our
511          * dentry */
512         spin_lock(&dcache_lock);
513         lock_dentry(de);
514         __d_drop(de);
515         unlock_dentry(de);
516         __d_rehash(de, 0);
517         spin_unlock(&dcache_lock);
518
519 out:
520         /* We do not free request as it may be reused during following lookup
521          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
522          * be freed in ll_lookup_it or in ll_intent_release. But if
523          * request was not completed, we need to free it. (bug 5154, 9903) */
524         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
525                 ptlrpc_req_finished(req);
526         if (rc == 0) {
527 #ifdef LUSTRE_KERNEL_VERSION
528                 ll_unhash_aliases(de->d_inode);
529                 /* done in ll_unhash_aliases()
530                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
531 #else
532                 /* We do not want d_invalidate to kill all child dentries too */
533                 d_drop(de);
534 #endif
535         } else {
536                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
537                        "inode %p refc %d\n", de->d_name.len,
538                        de->d_name.name, de, de->d_parent, de->d_inode,
539                        atomic_read(&de->d_count));
540                 ll_lookup_finish_locks(it, de);
541 #ifdef LUSTRE_KERNEL_VERSION
542                 lock_dentry(de);
543                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
544                 unlock_dentry(de);
545 #endif
546         }
547         RETURN(rc);
548         
549         /*
550          * This part is here to combat evil-evil race in real_lookup on 2.6
551          * kernels.  The race details are: We enter do_lookup() looking for some
552          * name, there is nothing in dcache for this name yet and d_lookup()
553          * returns NULL.  We proceed to real_lookup(), and while we do this,
554          * another process does open on the same file we looking up (most simple
555          * reproducer), open succeeds and the dentry is added. Now back to
556          * us. In real_lookup() we do d_lookup() again and suddenly find the
557          * dentry, so we call d_revalidate on it, but there is no lock, so
558          * without this code we would return 0, but unpatched real_lookup just
559          * returns -ENOENT in such a case instead of retrying the lookup. Once
560          * this is dealt with in real_lookup(), all of this ugly mess can go and
561          * we can just check locks in ->d_revalidate without doing any RPCs
562          * ever.
563          */
564 do_lookup:
565         if (it != &lookup_it) {
566                 ll_lookup_finish_locks(it, de);
567                 it = &lookup_it;
568         }
569         
570         OBD_ALLOC_PTR(op_data);
571         if (op_data == NULL)
572                 RETURN(-ENOMEM);
573
574         /* do real lookup here */
575         ll_prepare_md_op_data(op_data, parent, NULL,
576                               de->d_name.name, de->d_name.len, 0);
577         
578         if (it->it_op & IT_CREAT) {
579                 /* 
580                  * Allocate new fid for case of create or open with O_CREAT. In
581                  * both cases it->it_op will contain IT_CREAT.
582                  */
583                 struct lu_placement_hint hint = { .ph_pname = NULL,
584                                                   .ph_pfid = ll_inode2fid(parent),
585                                                   .ph_cname = &de->d_name,
586                                                   .ph_opc = LUSTRE_OPC_CREATE };
587
588                 rc = ll_fid_md_alloc(ll_i2sbi(parent), &op_data->fid2,
589                                      &hint);
590                 if (rc) {
591                         CERROR("can't allocate new fid, rc %d\n", rc);
592                         LBUG();
593                 }
594         }
595         
596         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
597                             ll_md_blocking_ast, 0);
598         if (rc >= 0) {
599                 struct mdt_body *mdt_body = lustre_msg_buf(req->rq_repmsg,
600                                                            DLM_REPLY_REC_OFF,
601                                                            sizeof(*mdt_body));
602                 /* see if we got same inode, if not - return error */
603                 if(lu_fid_eq(&op_data->fid2, &mdt_body->fid1)) {
604                         OBD_FREE_PTR(op_data);
605                         goto revalidate_finish;
606                 }
607                 ll_intent_release(it);
608         }
609         OBD_FREE_PTR(op_data);
610         GOTO(out, rc = 0);
611 }
612
613 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
614 {
615         struct inode *inode= de->d_inode;
616         struct ll_sb_info *sbi = ll_i2sbi(inode);
617         struct ll_dentry_data *ldd = ll_d2d(de);
618         struct obd_client_handle *handle;
619         int rc = 0;
620         ENTRY;
621         LASSERT(ldd);
622
623         lock_kernel();
624         /* Strictly speaking this introduces an additional race: the
625          * increments should wait until the rpc has returned.
626          * However, given that at present the function is void, this
627          * issue is moot. */
628         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
629                 unlock_kernel();
630                 EXIT;
631                 return;
632         }
633
634         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
635                 unlock_kernel();
636                 EXIT;
637                 return;
638         }
639         unlock_kernel();
640
641         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
642         rc = obd_pin(sbi->ll_md_exp, &ll_i2info(inode)->lli_fid,
643                      handle, flag);
644
645         if (rc) {
646                 lock_kernel();
647                 memset(handle, 0, sizeof(*handle));
648                 if (flag == 0)
649                         ldd->lld_cwd_count--;
650                 else
651                         ldd->lld_mnt_count--;
652                 unlock_kernel();
653         }
654
655         EXIT;
656         return;
657 }
658
659 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
660 {
661         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
662         struct ll_dentry_data *ldd = ll_d2d(de);
663         struct obd_client_handle handle;
664         int count, rc = 0;
665         ENTRY;
666         LASSERT(ldd);
667
668         lock_kernel();
669         /* Strictly speaking this introduces an additional race: the
670          * increments should wait until the rpc has returned.
671          * However, given that at present the function is void, this
672          * issue is moot. */
673         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
674         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
675                 /* the "pin" failed */
676                 unlock_kernel();
677                 EXIT;
678                 return;
679         }
680
681         if (flag)
682                 count = --ldd->lld_mnt_count;
683         else
684                 count = --ldd->lld_cwd_count;
685         unlock_kernel();
686
687         if (count != 0) {
688                 EXIT;
689                 return;
690         }
691
692         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
693         EXIT;
694         return;
695 }
696
697 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
698 #ifdef LUSTRE_KERNEL_VERSION
699 static int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
700 {
701         int rc;
702         ENTRY;
703
704         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
705                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
706         else
707                 rc = ll_revalidate_it(dentry, 0, NULL);
708
709         RETURN(rc);
710 }
711 #else
712 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
713 {
714         int rc;
715         ENTRY;
716
717         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
718                 struct lookup_intent *it;
719                 it = ll_convert_intent(&nd->intent.open, nd->flags);
720                 if (IS_ERR(it))
721                         RETURN(0);
722                 if (it->it_op == (IT_OPEN|IT_CREAT))
723                         if (nd->intent.open.flags & O_EXCL) {
724                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
725                                 rc = 0;
726                                 goto out_it;
727                         }
728
729                 rc = ll_revalidate_it(dentry, nd->flags, it);
730
731                 if (rc && (nd->flags & LOOKUP_OPEN) &&
732                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
733 #ifdef HAVE_FILE_IN_STRUCT_INTENT
734 // XXX Code duplication with ll_lookup_nd
735                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
736                                 // We cannot call open here as it would
737                                 // deadlock.
738                                 ptlrpc_req_finished(
739                                                (struct ptlrpc_request *)
740                                                   it->d.lustre.it_data);
741                         } else {
742                                 struct file *filp;
743
744                                 nd->intent.open.file->private_data = it;
745                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
746 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
747 /* 2.6.1[456] have a bug in open_namei() that forgets to check
748  * nd->intent.open.file for error, so we need to return it as lookup's result
749  * instead */
750                                 if (IS_ERR(filp))
751                                         rc = 0;
752 #endif
753                         }
754 #else
755                         ll_release_openhandle(dentry, it);
756 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
757                 }
758                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
759                     it_disposition(it, DISP_OPEN_CREATE)) {
760                         /* We created something but we may only return
761                          * negative dentry here, so save request in dentry,
762                          * if lookup will be called later on, it will
763                          * pick the request, otherwise it would be freed
764                          * with dentry */
765                         ll_d2d(dentry)->lld_it = it;
766                         it = NULL; /* avoid freeing */
767                 }
768                         
769 out_it:
770                 if (it) {
771                         ll_intent_release(it);
772                         OBD_FREE(it, sizeof(*it));
773                 }
774         } else {
775                 rc = ll_revalidate_it(dentry, 0, NULL);
776         }
777
778         RETURN(rc);
779 }
780 #endif
781 #endif
782
783 struct dentry_operations ll_d_ops = {
784 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
785         .d_revalidate = ll_revalidate_nd,
786 #else
787         .d_revalidate_it = ll_revalidate_it,
788 #endif
789         .d_release = ll_release,
790         .d_delete = ll_ddelete,
791 #ifdef LUSTRE_KERNEL_VERSION
792         .d_compare = ll_dcompare,
793 #endif
794 #if 0
795         .d_pin = ll_pin,
796         .d_unpin = ll_unpin,
797 #endif
798 };