Whamcloud - gitweb
capa code cleanup.
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/sched.h>
24 #include <linux/smp_lock.h>
25 #include <linux/quotaops.h>
26
27 #define DEBUG_SUBSYSTEM S_LLITE
28
29 #include <obd_support.h>
30 #include <lustre_lite.h>
31 #include <lustre/lustre_idl.h>
32 #include <lustre_dlm.h>
33 #include <lustre_mdc.h>
34 #include <lustre_ver.h>
35
36 #include "llite_internal.h"
37
38 /* should NOT be called with the dcache lock, see fs/dcache.c */
39 static void ll_release(struct dentry *de)
40 {
41         struct ll_dentry_data *lld;
42         ENTRY;
43         LASSERT(de != NULL);
44         lld = ll_d2d(de);
45         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
46                 EXIT;
47                 return;
48         }
49 #ifndef LUSTRE_KERNEL_VERSION
50         if (lld->lld_it) {
51                 ll_intent_release(lld->lld_it);
52                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
53         }
54 #endif
55         LASSERT(lld->lld_cwd_count == 0);
56         LASSERT(lld->lld_mnt_count == 0);
57         OBD_FREE(de->d_fsdata, sizeof(*lld));
58
59         EXIT;
60 }
61
62 #ifdef LUSTRE_KERNEL_VERSION
63 /* Compare if two dentries are the same.  Don't match if the existing dentry
64  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
65  *
66  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
67  * an AST before calling d_revalidate_it().  The dentry still exists (marked
68  * INVALID) so d_lookup() matches it, but we have no lock on it (so
69  * lock_match() fails) and we spin around real_lookup(). */
70 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
71 {
72         struct dentry *dchild;
73         ENTRY;
74
75         if (d_name->len != name->len)
76                 RETURN(1);
77
78         if (memcmp(d_name->name, name->name, name->len))
79                 RETURN(1);
80
81         /* XXX: d_name must be in-dentry structure */
82         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
83         if (dchild->d_flags & DCACHE_LUSTRE_INVALID) {
84                 CDEBUG(D_DENTRY,"INVALID dentry %p not matched, was bug 3784\n",
85                        dchild);
86                 RETURN(1);
87         }
88
89         RETURN(0);
90 }
91 #endif
92
93 /* should NOT be called with the dcache lock, see fs/dcache.c */
94 static int ll_ddelete(struct dentry *de)
95 {
96         ENTRY;
97         LASSERT(de);
98 #ifndef DCACHE_LUSTRE_INVALID
99 #define DCACHE_LUSTRE_INVALID 0
100 #endif
101
102         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
103                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
104                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
105                d_unhashed(de) ? "" : "hashed,",
106                list_empty(&de->d_subdirs) ? "" : "subdirs");
107 #if DCACHE_LUSTRE_INVALID == 0
108 #undef DCACHE_LUSTRE_INVALID
109 #endif
110
111         RETURN(0);
112 }
113
114 void ll_set_dd(struct dentry *de)
115 {
116         ENTRY;
117         LASSERT(de != NULL);
118
119         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
120                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
121                atomic_read(&de->d_count));
122         lock_kernel();
123         if (de->d_fsdata == NULL) {
124                 OBD_ALLOC(de->d_fsdata, sizeof(struct ll_dentry_data));
125         }
126         unlock_kernel();
127
128         EXIT;
129 }
130
131 void ll_intent_drop_lock(struct lookup_intent *it)
132 {
133         struct lustre_handle *handle;
134
135         if (it->it_op && it->d.lustre.it_lock_mode) {
136                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
137                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
138                        " from it %p\n", handle->cookie, it);
139                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
140
141                 /* bug 494: intent_release may be called multiple times, from
142                  * this thread and we don't want to double-decref this lock */
143                 it->d.lustre.it_lock_mode = 0;
144         }
145 }
146
147 void ll_intent_release(struct lookup_intent *it)
148 {
149         ENTRY;
150
151         CDEBUG(D_INFO, "intent %p released\n", it);
152         ll_intent_drop_lock(it);
153 #ifdef LUSTRE_KERNEL_VERSION
154         it->it_magic = 0;
155         it->it_op_release = 0;
156 #endif
157         /* We are still holding extra reference on a request, need to free it */
158         if (it_disposition(it, DISP_ENQ_OPEN_REF)) /* open req for llfile_open*/
159                 ptlrpc_req_finished(it->d.lustre.it_data);
160         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
161                 ptlrpc_req_finished(it->d.lustre.it_data);
162         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
163                                                     * to lookup */
164                 ptlrpc_req_finished(it->d.lustre.it_data);
165
166         it->d.lustre.it_disposition = 0;
167         it->d.lustre.it_data = NULL;
168         EXIT;
169 }
170
171 /* Drop dentry if it is not used already, unhash otherwise.
172    Should be called with dcache lock held!
173    Returns: 1 if dentry was dropped, 0 if unhashed. */
174 int ll_drop_dentry(struct dentry *dentry)
175 {
176         lock_dentry(dentry);
177         if (atomic_read(&dentry->d_count) == 0) {
178                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
179                        "inode %p\n", dentry->d_name.len,
180                        dentry->d_name.name, dentry, dentry->d_parent,
181                        dentry->d_inode);
182                 dget_locked(dentry);
183                 __d_drop(dentry);
184                 unlock_dentry(dentry);
185                 spin_unlock(&dcache_lock);
186                 dput(dentry);
187                 spin_lock(&dcache_lock);
188                 return 1;
189         }
190
191 #ifdef LUSTRE_KERNEL_VERSION
192         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
193 #else
194         if (!d_unhashed(dentry)) {
195 #endif
196                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
197                        "inode %p refc %d\n", dentry->d_name.len,
198                        dentry->d_name.name, dentry, dentry->d_parent,
199                        dentry->d_inode, atomic_read(&dentry->d_count));
200                 /* actually we don't unhash the dentry, rather just
201                  * mark it inaccessible for to __d_lookup(). otherwise
202                  * sys_getcwd() could return -ENOENT -bzzz */
203 #ifdef LUSTRE_KERNEL_VERSION
204                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
205
206                 /* 
207                  * XXX: Try to drop negative not directory dentries to check if
208                  * this is source of OOM on clients on big numbers of created
209                  * files.  --umka
210                  */
211                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
212                         __d_drop(dentry);
213 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
214                 __d_drop(dentry);
215                 if (dentry->d_inode) {
216                         /* Put positive dentries to orphan list */
217                         list_add(&dentry->d_hash,
218                                  &ll_i2sbi(dentry->d_inode)->ll_orphan_dentry_list);
219                 }
220 #endif
221 #else
222                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
223                         __d_drop(dentry);
224 #endif
225
226         }
227         unlock_dentry(dentry);
228         return 0;
229 }
230
231 void ll_unhash_aliases(struct inode *inode)
232 {
233         struct list_head *tmp, *head;
234         ENTRY;
235
236         if (inode == NULL) {
237                 CERROR("unexpected NULL inode, tell phil\n");
238                 return;
239         }
240
241         CDEBUG(D_INODE, "marking dentries for 111 ino %lu/%u(%p) invalid\n",
242                inode->i_ino, inode->i_generation, inode);
243
244         head = &inode->i_dentry;
245         spin_lock(&dcache_lock);
246 restart:
247         tmp = head;
248         while ((tmp = tmp->next) != head) {
249                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
250
251                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
252                        "inode %p flags %d\n", dentry->d_name.len,
253                        dentry->d_name.name, dentry, dentry->d_parent,
254                        dentry->d_inode, dentry->d_flags);
255
256                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
257                         CERROR("called on root (?) dentry=%p, inode=%p "
258                                "ino=%lu\n", dentry, inode, inode->i_ino);
259                         lustre_dump_dentry(dentry, 1);
260                         libcfs_debug_dumpstack(NULL);
261                 } else if (d_mountpoint(dentry)) {
262                         /* For mountpoints we skip removal of the dentry
263                            which happens solely because we have a lock on it
264                            obtained when this dentry was not a mountpoint yet */
265                         CDEBUG(D_DENTRY, "Skippind mountpoint dentry removal "
266                                          "%.*s (%p) parent %p\n",
267                                           dentry->d_name.len,
268                                           dentry->d_name.name,
269                                           dentry, dentry->d_parent);
270
271                         continue;
272                 }
273                 
274                 if (ll_drop_dentry(dentry))
275                           goto restart;
276         }
277         spin_unlock(&dcache_lock);
278         EXIT;
279 }
280
281 int ll_revalidate_it_finish(struct ptlrpc_request *request,
282                             int offset, struct lookup_intent *it,
283                             struct dentry *de)
284 {
285         int rc = 0;
286         ENTRY;
287
288         if (!request)
289                 RETURN(0);
290
291         if (it_disposition(it, DISP_LOOKUP_NEG)) 
292                 RETURN(-ENOENT);
293
294         rc = ll_prep_inode(&de->d_inode,
295                            request, offset, NULL);
296
297         RETURN(rc);
298 }
299
300 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
301 {
302         LASSERT(it != NULL);
303         LASSERT(dentry != NULL);
304
305         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
306                 struct inode *inode = dentry->d_inode;
307                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
308
309                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
310                        inode, inode->i_ino, inode->i_generation);
311                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
312                                  inode);
313         }
314
315         /* drop lookup or getattr locks immediately */
316         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
317 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
318                 /* on 2.6 there are situation when several lookups and
319                  * revalidations may be requested during single operation.
320                  * therefore, we don't release intent here -bzzz */
321                 ll_intent_drop_lock(it);
322 #else
323                 ll_intent_release(it);
324 #endif
325         }
326 }
327
328 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
329 {
330         struct lookup_intent *it = *itp;
331 #if defined(LUSTRE_KERNEL_VERSION)&&(LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
332         if (it) {
333                 LASSERTF(it->it_magic == INTENT_MAGIC, 
334                          "%p has bad intent magic: %x\n",
335                          it, it->it_magic);
336         }
337 #endif
338
339         if (!it || it->it_op == IT_GETXATTR)
340                 it = *itp = deft;
341
342 #ifdef LUSTRE_KERNEL_VERSION
343         it->it_op_release = ll_intent_release;
344 #endif
345 }
346
347 int ll_revalidate_it(struct dentry *de, int lookup_flags,
348                      struct lookup_intent *it)
349 {
350         int rc;
351         struct md_op_data *op_data;
352         struct ptlrpc_request *req = NULL;
353         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
354         struct obd_export *exp;
355         struct inode *parent;
356
357         ENTRY;
358         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
359                LL_IT2STR(it));
360
361         if (de->d_inode == NULL) {
362                 /* We can only use negative dentries if this is stat or lookup,
363                    for opens and stuff we do need to query server. */
364                 /* If there is IT_CREAT in intent op set, then we must throw
365                    away this negative dentry and actually do the request to
366                    kernel to create whatever needs to be created (if possible)*/
367                 if (it && (it->it_op & IT_CREAT))
368                         RETURN(0);
369
370 #ifdef LUSTRE_KERNEL_VERSION
371                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
372                         RETURN(0);
373 #endif
374
375                 rc = ll_have_md_lock(de->d_parent->d_inode, 
376                                      MDS_INODELOCK_UPDATE);
377         
378                 RETURN(rc);
379         }
380
381         exp = ll_i2mdexp(de->d_inode);
382
383         /* Never execute intents for mount points.
384          * Attributes will be fixed up in ll_inode_revalidate_it */
385         if (d_mountpoint(de))
386                 RETURN(1);
387
388         /* Root of the lustre tree. Always valid.
389          * Attributes will be fixed up in ll_inode_revalidate_it */
390         if (de->d_name.name[0] == '/' && de->d_name.len == 1)
391                 RETURN(1);
392
393         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
394         ll_frob_intent(&it, &lookup_it);
395         LASSERT(it);
396
397         parent = de->d_parent->d_inode;
398
399         if (it->it_op & IT_CREAT) {
400                 /* 
401                  * Allocate new fid for case of create or open(O_CREAT). In both
402                  * cases it->it_op will contain IT_CREAT. In case of
403                  * open(O_CREAT) agains existing file, fid allocating is not
404                  * needed, but this is not known until server returns
405                  * anything. Well, in this case new allocated fid is lost. But
406                  * this is not big deal, we have 64bit fids. --umka
407                  */
408                 struct lu_placement_hint hint = { .ph_pname = NULL,
409                                                   .ph_pfid = ll_inode2fid(parent),
410                                                   .ph_cname = &de->d_name,
411                                                   .ph_opc = LUSTRE_OPC_CREATE };
412
413                 op_data = ll_prep_md_op_data(NULL, parent, NULL,
414                                             de->d_name.name, de->d_name.len, 0);
415                 if (op_data == NULL)
416                         RETURN(-ENOMEM);
417                 rc = ll_fid_md_alloc(ll_i2sbi(parent), &op_data->fid2, &hint);
418                 if (rc) {
419                         ll_finish_md_op_data(op_data);
420                         RETURN(rc);
421                 }
422         } else {
423                 op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
424                                              de->d_name.name, de->d_name.len, 0);
425                 if (op_data == NULL)
426                         RETURN(-ENOMEM);
427         }
428
429         if ((it->it_op == IT_OPEN) && de->d_inode) {
430                 struct inode *inode = de->d_inode;
431                 struct ll_inode_info *lli = ll_i2info(inode);
432                 struct obd_client_handle **och_p;
433                 __u64 *och_usecount;
434                 
435                 /*
436                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
437                  * just having LOOKUP lock is enough to justify inode is the
438                  * same. And if inode is the same and we have suitable
439                  * openhandle, then there is no point in doing another OPEN RPC
440                  * just to throw away newly received openhandle.  There are no
441                  * security implications too, if file owner or access mode is
442                  * change, LOOKUP lock is revoked.
443                  */
444
445                 it->it_create_mode &= ~current->fs->umask;
446
447                 if (it->it_flags & FMODE_WRITE) {
448                         och_p = &lli->lli_mds_write_och;
449                         och_usecount = &lli->lli_open_fd_write_count;
450                 } else if (it->it_flags & FMODE_EXEC) {
451                         och_p = &lli->lli_mds_exec_och;
452                         och_usecount = &lli->lli_open_fd_exec_count;
453                 } else {
454                         och_p = &lli->lli_mds_read_och;
455                         och_usecount = &lli->lli_open_fd_read_count;
456                 }
457                 /* Check for the proper lock. */
458                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
459                         goto do_lock;
460                 down(&lli->lli_och_sem);
461                 if (*och_p) { /* Everything is open already, do nothing */
462                         /*(*och_usecount)++;  Do not let them steal our open
463                           handle from under us */
464                         /* XXX The code above was my original idea, but in case
465                            we have the handle, but we cannot use it due to later
466                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
467                            would decrement counter increased here. So we just
468                            hope the lock won't be invalidated in between. But
469                            if it would be, we'll reopen the open request to
470                            MDS later during file open path */
471                         up(&lli->lli_och_sem);
472                         ll_finish_md_op_data(op_data);
473                         RETURN(1);
474                 } else {
475                         up(&lli->lli_och_sem);
476                 }
477         }
478
479 do_lock:
480         it->it_flags |= O_CHECK_STALE;
481         rc = md_intent_lock(exp, op_data, NULL, 0, it, lookup_flags,
482                             &req, ll_md_blocking_ast, 0);
483         it->it_flags &= ~O_CHECK_STALE;
484         
485         ll_finish_md_op_data(op_data);
486         /* If req is NULL, then md_intent_lock only tried to do a lock match;
487          * if all was well, it will return 1 if it found locks, 0 otherwise. */
488         if (req == NULL && rc >= 0) {
489                 if (!rc)
490                         goto do_lookup;
491                 GOTO(out, rc);
492         }
493
494         if (rc < 0) {
495                 if (rc != -ESTALE) {
496                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
497                                "%d\n", rc, it->d.lustre.it_status);
498                 }
499                 GOTO(out, rc = 0);
500         }
501
502 revalidate_finish:
503         rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, de);
504         if (rc != 0) {
505                 if (rc != -ESTALE && rc != -ENOENT)
506                         ll_intent_release(it);
507                 GOTO(out, rc = 0);
508         }
509
510         if ((it->it_op & IT_OPEN) && de->d_inode && 
511             !S_ISREG(de->d_inode->i_mode) && 
512             !S_ISDIR(de->d_inode->i_mode)) {
513                 ll_release_openhandle(de, it);
514         }
515         rc = 1;
516
517         /* unfortunately ll_intent_lock may cause a callback and revoke our
518          * dentry */
519         spin_lock(&dcache_lock);
520         lock_dentry(de);
521         __d_drop(de);
522         unlock_dentry(de);
523         __d_rehash(de, 0);
524         spin_unlock(&dcache_lock);
525
526 out:
527         /* We do not free request as it may be reused during following lookup
528          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
529          * be freed in ll_lookup_it or in ll_intent_release. But if
530          * request was not completed, we need to free it. (bug 5154, 9903) */
531         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
532                 ptlrpc_req_finished(req);
533         if (rc == 0) {
534 #ifdef LUSTRE_KERNEL_VERSION
535                 ll_unhash_aliases(de->d_inode);
536                 /* done in ll_unhash_aliases()
537                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
538 #else
539                 /* We do not want d_invalidate to kill all child dentries too */
540                 d_drop(de);
541 #endif
542         } else {
543                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
544                        "inode %p refc %d\n", de->d_name.len,
545                        de->d_name.name, de, de->d_parent, de->d_inode,
546                        atomic_read(&de->d_count));
547                 ll_lookup_finish_locks(it, de);
548 #ifdef LUSTRE_KERNEL_VERSION
549                 lock_dentry(de);
550                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
551                 unlock_dentry(de);
552 #endif
553         }
554         RETURN(rc);
555         
556         /*
557          * This part is here to combat evil-evil race in real_lookup on 2.6
558          * kernels.  The race details are: We enter do_lookup() looking for some
559          * name, there is nothing in dcache for this name yet and d_lookup()
560          * returns NULL.  We proceed to real_lookup(), and while we do this,
561          * another process does open on the same file we looking up (most simple
562          * reproducer), open succeeds and the dentry is added. Now back to
563          * us. In real_lookup() we do d_lookup() again and suddenly find the
564          * dentry, so we call d_revalidate on it, but there is no lock, so
565          * without this code we would return 0, but unpatched real_lookup just
566          * returns -ENOENT in such a case instead of retrying the lookup. Once
567          * this is dealt with in real_lookup(), all of this ugly mess can go and
568          * we can just check locks in ->d_revalidate without doing any RPCs
569          * ever.
570          */
571 do_lookup:
572         if (it != &lookup_it) {
573                 ll_lookup_finish_locks(it, de);
574                 it = &lookup_it;
575         }
576         
577         /* do real lookup here */
578         op_data = ll_prep_md_op_data(NULL, parent, NULL,
579                                      de->d_name.name, de->d_name.len, 0);
580         if (op_data == NULL)
581                 RETURN(-ENOMEM);
582         
583         if (it->it_op & IT_CREAT) {
584                 /* 
585                  * Allocate new fid for case of create or open with O_CREAT. In
586                  * both cases it->it_op will contain IT_CREAT.
587                  */
588                 struct lu_placement_hint hint = { .ph_pname = NULL,
589                                                   .ph_pfid = ll_inode2fid(parent),
590                                                   .ph_cname = &de->d_name,
591                                                   .ph_opc = LUSTRE_OPC_CREATE };
592
593                 rc = ll_fid_md_alloc(ll_i2sbi(parent), &op_data->fid2, &hint);
594                 if (rc) {
595                         ll_finish_md_op_data(op_data);
596                         RETURN(rc);
597                 }
598         }
599         
600         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
601                             ll_md_blocking_ast, 0);
602         if (rc >= 0) {
603                 struct mdt_body *mdt_body = lustre_msg_buf(req->rq_repmsg,
604                                                            DLM_REPLY_REC_OFF,
605                                                            sizeof(*mdt_body));
606                 /* see if we got same inode, if not - return error */
607                 if (lu_fid_eq(&op_data->fid2, &mdt_body->fid1)) {
608                         ll_finish_md_op_data(op_data);
609                         op_data = NULL;
610                         goto revalidate_finish;
611                 }
612                 ll_intent_release(it);
613         }
614         ll_finish_md_op_data(op_data);
615         GOTO(out, rc = 0);
616 }
617
618 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
619 {
620         struct inode *inode= de->d_inode;
621         struct ll_sb_info *sbi = ll_i2sbi(inode);
622         struct ll_dentry_data *ldd = ll_d2d(de);
623         struct obd_client_handle *handle;
624         struct obd_capa *oc;
625         int rc = 0;
626         ENTRY;
627         LASSERT(ldd);
628
629         lock_kernel();
630         /* Strictly speaking this introduces an additional race: the
631          * increments should wait until the rpc has returned.
632          * However, given that at present the function is void, this
633          * issue is moot. */
634         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
635                 unlock_kernel();
636                 EXIT;
637                 return;
638         }
639
640         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
641                 unlock_kernel();
642                 EXIT;
643                 return;
644         }
645         unlock_kernel();
646
647         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
648         oc = ll_mdscapa_get(inode);
649         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
650         capa_put(oc);
651         if (rc) {
652                 lock_kernel();
653                 memset(handle, 0, sizeof(*handle));
654                 if (flag == 0)
655                         ldd->lld_cwd_count--;
656                 else
657                         ldd->lld_mnt_count--;
658                 unlock_kernel();
659         }
660
661         EXIT;
662         return;
663 }
664
665 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
666 {
667         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
668         struct ll_dentry_data *ldd = ll_d2d(de);
669         struct obd_client_handle handle;
670         int count, rc = 0;
671         ENTRY;
672         LASSERT(ldd);
673
674         lock_kernel();
675         /* Strictly speaking this introduces an additional race: the
676          * increments should wait until the rpc has returned.
677          * However, given that at present the function is void, this
678          * issue is moot. */
679         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
680         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
681                 /* the "pin" failed */
682                 unlock_kernel();
683                 EXIT;
684                 return;
685         }
686
687         if (flag)
688                 count = --ldd->lld_mnt_count;
689         else
690                 count = --ldd->lld_cwd_count;
691         unlock_kernel();
692
693         if (count != 0) {
694                 EXIT;
695                 return;
696         }
697
698         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
699         EXIT;
700         return;
701 }
702
703 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
704 #ifdef LUSTRE_KERNEL_VERSION
705 static int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
706 {
707         int rc;
708         ENTRY;
709
710         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
711                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
712         else
713                 rc = ll_revalidate_it(dentry, 0, NULL);
714
715         RETURN(rc);
716 }
717 #else
718 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
719 {
720         int rc;
721         ENTRY;
722
723         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
724                 struct lookup_intent *it;
725                 it = ll_convert_intent(&nd->intent.open, nd->flags);
726                 if (IS_ERR(it))
727                         RETURN(0);
728                 if (it->it_op == (IT_OPEN|IT_CREAT))
729                         if (nd->intent.open.flags & O_EXCL) {
730                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
731                                 rc = 0;
732                                 goto out_it;
733                         }
734
735                 rc = ll_revalidate_it(dentry, nd->flags, it);
736
737                 if (rc && (nd->flags & LOOKUP_OPEN) &&
738                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
739 #ifdef HAVE_FILE_IN_STRUCT_INTENT
740 // XXX Code duplication with ll_lookup_nd
741                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
742                                 // We cannot call open here as it would
743                                 // deadlock.
744                                 ptlrpc_req_finished(
745                                                (struct ptlrpc_request *)
746                                                   it->d.lustre.it_data);
747                         } else {
748                                 struct file *filp;
749
750                                 nd->intent.open.file->private_data = it;
751                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
752 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
753 /* 2.6.1[456] have a bug in open_namei() that forgets to check
754  * nd->intent.open.file for error, so we need to return it as lookup's result
755  * instead */
756                                 if (IS_ERR(filp))
757                                         rc = 0;
758 #endif
759                         }
760 #else
761                         ll_release_openhandle(dentry, it);
762 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
763                 }
764                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
765                     it_disposition(it, DISP_OPEN_CREATE)) {
766                         /* We created something but we may only return
767                          * negative dentry here, so save request in dentry,
768                          * if lookup will be called later on, it will
769                          * pick the request, otherwise it would be freed
770                          * with dentry */
771                         ll_d2d(dentry)->lld_it = it;
772                         it = NULL; /* avoid freeing */
773                 }
774                         
775 out_it:
776                 if (it) {
777                         ll_intent_release(it);
778                         OBD_FREE(it, sizeof(*it));
779                 }
780         } else {
781                 rc = ll_revalidate_it(dentry, 0, NULL);
782         }
783
784         RETURN(rc);
785 }
786 #endif
787 #endif
788
789 struct dentry_operations ll_d_ops = {
790 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
791         .d_revalidate = ll_revalidate_nd,
792 #else
793         .d_revalidate_it = ll_revalidate_it,
794 #endif
795         .d_release = ll_release,
796         .d_delete = ll_ddelete,
797 #ifdef LUSTRE_KERNEL_VERSION
798         .d_compare = ll_dcompare,
799 #endif
800 #if 0
801         .d_pin = ll_pin,
802         .d_unpin = ll_unpin,
803 #endif
804 };