Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/sched.h>
24 #include <linux/smp_lock.h>
25 #include <linux/quotaops.h>
26
27 #define DEBUG_SUBSYSTEM S_LLITE
28
29 #include <obd_support.h>
30 #include <lustre_lite.h>
31 #include <lustre/lustre_idl.h>
32 #include <lustre_dlm.h>
33 #include <lustre_mdc.h>
34 //#include <lustre_ver.h>
35 //#include <lustre_version.h>
36
37 #include "llite_internal.h"
38
39 /* should NOT be called with the dcache lock, see fs/dcache.c */
40 static void ll_release(struct dentry *de)
41 {
42         struct ll_dentry_data *lld;
43         ENTRY;
44         LASSERT(de != NULL);
45         lld = ll_d2d(de);
46         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
47                 EXIT;
48                 return;
49         }
50 #ifndef LUSTRE_KERNEL_VERSION
51         if (lld->lld_it) {
52                 ll_intent_release(lld->lld_it);
53                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
54         }
55 #endif
56         LASSERT(lld->lld_cwd_count == 0);
57         LASSERT(lld->lld_mnt_count == 0);
58         OBD_FREE(de->d_fsdata, sizeof(*lld));
59
60         EXIT;
61 }
62
63 #ifdef LUSTRE_KERNEL_VERSION
64 /* Compare if two dentries are the same.  Don't match if the existing dentry
65  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
66  *
67  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
68  * an AST before calling d_revalidate_it().  The dentry still exists (marked
69  * INVALID) so d_lookup() matches it, but we have no lock on it (so
70  * lock_match() fails) and we spin around real_lookup(). */
71 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
72 {
73         struct dentry *dchild;
74         ENTRY;
75
76         if (d_name->len != name->len)
77                 RETURN(1);
78
79         if (memcmp(d_name->name, name->name, name->len))
80                 RETURN(1);
81
82         /* XXX: d_name must be in-dentry structure */
83         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
84         if (dchild->d_flags & DCACHE_LUSTRE_INVALID) {
85                 CDEBUG(D_DENTRY,"INVALID dentry %p not matched, was bug 3784\n",
86                        dchild);
87                 RETURN(1);
88         }
89
90         RETURN(0);
91 }
92 #endif
93
94 /* should NOT be called with the dcache lock, see fs/dcache.c */
95 static int ll_ddelete(struct dentry *de)
96 {
97         ENTRY;
98         LASSERT(de);
99 #ifndef DCACHE_LUSTRE_INVALID
100 #define DCACHE_LUSTRE_INVALID 0
101 #endif
102
103         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
104                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
105                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
106                d_unhashed(de) ? "" : "hashed,",
107                list_empty(&de->d_subdirs) ? "" : "subdirs");
108 #if DCACHE_LUSTRE_INVALID == 0
109 #undef DCACHE_LUSTRE_INVALID
110 #endif
111
112         RETURN(0);
113 }
114
115 void ll_set_dd(struct dentry *de)
116 {
117         ENTRY;
118         LASSERT(de != NULL);
119
120         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
121                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
122                atomic_read(&de->d_count));
123         lock_kernel();
124         if (de->d_fsdata == NULL) {
125                 OBD_ALLOC(de->d_fsdata, sizeof(struct ll_dentry_data));
126         }
127         unlock_kernel();
128
129         EXIT;
130 }
131
132 void ll_intent_drop_lock(struct lookup_intent *it)
133 {
134         struct lustre_handle *handle;
135
136         if (it->it_op && it->d.lustre.it_lock_mode) {
137                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
138                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
139                        " from it %p\n", handle->cookie, it);
140                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
141
142                 /* bug 494: intent_release may be called multiple times, from
143                  * this thread and we don't want to double-decref this lock */
144                 it->d.lustre.it_lock_mode = 0;
145         }
146 }
147
148 void ll_intent_release(struct lookup_intent *it)
149 {
150         ENTRY;
151
152         CDEBUG(D_INFO, "intent %p released\n", it);
153         ll_intent_drop_lock(it);
154 #ifdef LUSTRE_KERNEL_VERSION
155         it->it_magic = 0;
156         it->it_op_release = 0;
157 #endif
158         /* We are still holding extra reference on a request, need to free it */
159         if (it_disposition(it, DISP_ENQ_OPEN_REF)) /* open req for llfile_open*/
160                 ptlrpc_req_finished(it->d.lustre.it_data);
161         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
162                 ptlrpc_req_finished(it->d.lustre.it_data);
163         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
164                                                     * to lookup */
165                 ptlrpc_req_finished(it->d.lustre.it_data);
166
167         it->d.lustre.it_disposition = 0;
168         it->d.lustre.it_data = NULL;
169         EXIT;
170 }
171
172 /* Drop dentry if it is not used already, unhash otherwise.
173    Should be called with dcache lock held!
174    Returns: 1 if dentry was dropped, 0 if unhashed. */
175 int ll_drop_dentry(struct dentry *dentry)
176 {
177         lock_dentry(dentry);
178         if (atomic_read(&dentry->d_count) == 0) {
179                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
180                        "inode %p\n", dentry->d_name.len,
181                        dentry->d_name.name, dentry, dentry->d_parent,
182                        dentry->d_inode);
183                 dget_locked(dentry);
184                 __d_drop(dentry);
185                 unlock_dentry(dentry);
186                 spin_unlock(&dcache_lock);
187                 dput(dentry);
188                 spin_lock(&dcache_lock);
189                 return 1;
190         }
191         /* disconected dentry can not be find without lookup, because we 
192          * not need his to unhash or mark invalid. */
193         if (dentry->d_flags & DCACHE_DISCONNECTED) {
194                 unlock_dentry(dentry);
195                 RETURN (0);
196         }
197
198 #ifdef LUSTRE_KERNEL_VERSION
199         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
200 #else
201         if (!d_unhashed(dentry)) {
202 #endif
203                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
204                        "inode %p refc %d\n", dentry->d_name.len,
205                        dentry->d_name.name, dentry, dentry->d_parent,
206                        dentry->d_inode, atomic_read(&dentry->d_count));
207                 /* actually we don't unhash the dentry, rather just
208                  * mark it inaccessible for to __d_lookup(). otherwise
209                  * sys_getcwd() could return -ENOENT -bzzz */
210 #ifdef LUSTRE_KERNEL_VERSION
211                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
212 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
213                 __d_drop(dentry);
214                 if (dentry->d_inode) {
215                         /* Put positive dentries to orphan list */
216                         list_add(&dentry->d_hash,
217                                  &ll_i2sbi(dentry->d_inode)->ll_orphan_dentry_list);
218                 }
219 #else
220                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
221                         __d_drop(dentry);
222 #endif
223 #else
224                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
225                         __d_drop(dentry);
226 #endif
227
228         }
229         unlock_dentry(dentry);
230         return 0;
231 }
232
233 void ll_unhash_aliases(struct inode *inode)
234 {
235         struct list_head *tmp, *head;
236         ENTRY;
237
238         if (inode == NULL) {
239                 CERROR("unexpected NULL inode, tell phil\n");
240                 return;
241         }
242
243         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
244                inode->i_ino, inode->i_generation, inode);
245
246         head = &inode->i_dentry;
247         spin_lock(&dcache_lock);
248 restart:
249         tmp = head;
250         while ((tmp = tmp->next) != head) {
251                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
252
253                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
254                        "inode %p flags %d\n", dentry->d_name.len,
255                        dentry->d_name.name, dentry, dentry->d_parent,
256                        dentry->d_inode, dentry->d_flags);
257
258                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
259                         CERROR("called on root (?) dentry=%p, inode=%p "
260                                "ino=%lu\n", dentry, inode, inode->i_ino);
261                         lustre_dump_dentry(dentry, 1);
262                         libcfs_debug_dumpstack(NULL);
263                 } else if (d_mountpoint(dentry)) {
264                         /* For mountpoints we skip removal of the dentry
265                            which happens solely because we have a lock on it
266                            obtained when this dentry was not a mountpoint yet */
267                         CDEBUG(D_DENTRY, "Skippind mountpoint dentry removal "
268                                          "%.*s (%p) parent %p\n",
269                                           dentry->d_name.len,
270                                           dentry->d_name.name,
271                                           dentry, dentry->d_parent);
272
273                         continue;
274                 }
275                 
276                 if (ll_drop_dentry(dentry))
277                           goto restart;
278         }
279         spin_unlock(&dcache_lock);
280         EXIT;
281 }
282
283 int ll_revalidate_it_finish(struct ptlrpc_request *request,
284                             int offset, struct lookup_intent *it,
285                             struct dentry *de)
286 {
287         int rc = 0;
288         ENTRY;
289
290         if (!request)
291                 RETURN(0);
292
293         if (it_disposition(it, DISP_LOOKUP_NEG)) 
294                 RETURN(-ENOENT);
295
296         rc = ll_prep_inode(&de->d_inode,
297                            request, offset, NULL);
298
299         RETURN(rc);
300 }
301
302 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
303 {
304         LASSERT(it != NULL);
305         LASSERT(dentry != NULL);
306
307         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
308                 struct inode *inode = dentry->d_inode;
309                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
310
311                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
312                        inode, inode->i_ino, inode->i_generation);
313                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
314                                  inode);
315         }
316
317         /* drop lookup or getattr locks immediately */
318         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
319 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
320                 /* on 2.6 there are situation when several lookups and
321                  * revalidations may be requested during single operation.
322                  * therefore, we don't release intent here -bzzz */
323                 ll_intent_drop_lock(it);
324 #else
325                 ll_intent_release(it);
326 #endif
327         }
328 }
329
330 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
331 {
332         struct lookup_intent *it = *itp;
333 #if defined(LUSTRE_KERNEL_VERSION)&&(LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
334         if (it) {
335                 LASSERTF(it->it_magic == INTENT_MAGIC, 
336                          "%p has bad intent magic: %x\n",
337                          it, it->it_magic);
338         }
339 #endif
340
341         if (!it || it->it_op == IT_GETXATTR)
342                 it = *itp = deft;
343
344 #ifdef LUSTRE_KERNEL_VERSION
345         it->it_op_release = ll_intent_release;
346 #endif
347 }
348
349 int ll_revalidate_it(struct dentry *de, int lookup_flags,
350                      struct lookup_intent *it)
351 {
352         int rc;
353         struct md_op_data *op_data;
354         struct ptlrpc_request *req = NULL;
355         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
356         struct obd_export *exp;
357         struct inode *parent;
358
359         ENTRY;
360         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
361                LL_IT2STR(it));
362
363         if (de->d_inode == NULL) {
364                 /* We can only use negative dentries if this is stat or lookup,
365                    for opens and stuff we do need to query server. */
366                 /* If there is IT_CREAT in intent op set, then we must throw
367                    away this negative dentry and actually do the request to
368                    kernel to create whatever needs to be created (if possible)*/
369                 if (it && (it->it_op & IT_CREAT))
370                         RETURN(0);
371
372 #ifdef LUSTRE_KERNEL_VERSION
373                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
374                         RETURN(0);
375 #endif
376
377                 rc = ll_have_md_lock(de->d_parent->d_inode, 
378                                      MDS_INODELOCK_UPDATE);
379         
380                 RETURN(rc);
381         }
382
383         exp = ll_i2mdexp(de->d_inode);
384
385         /* Never execute intents for mount points.
386          * Attributes will be fixed up in ll_inode_revalidate_it */
387         if (d_mountpoint(de))
388                 RETURN(1);
389
390         /* Root of the lustre tree. Always valid.
391          * Attributes will be fixed up in ll_inode_revalidate_it */
392         if (de->d_name.name[0] == '/' && de->d_name.len == 1)
393                 RETURN(1);
394
395         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
396         ll_frob_intent(&it, &lookup_it);
397         LASSERT(it);
398
399         parent = de->d_parent->d_inode;
400
401         if (it->it_op & IT_CREAT) {
402                 op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
403                                              de->d_name.len, 0, LUSTRE_OPC_CREATE);
404         } else {
405                 op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
406                                              de->d_name.name, de->d_name.len,
407                                              0, LUSTRE_OPC_ANY);
408         }
409         if (IS_ERR(op_data))
410                 RETURN(PTR_ERR(op_data));
411
412
413         if ((it->it_op == IT_OPEN) && de->d_inode) {
414                 struct inode *inode = de->d_inode;
415                 struct ll_inode_info *lli = ll_i2info(inode);
416                 struct obd_client_handle **och_p;
417                 __u64 *och_usecount;
418                 
419                 /*
420                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
421                  * just having LOOKUP lock is enough to justify inode is the
422                  * same. And if inode is the same and we have suitable
423                  * openhandle, then there is no point in doing another OPEN RPC
424                  * just to throw away newly received openhandle.  There are no
425                  * security implications too, if file owner or access mode is
426                  * change, LOOKUP lock is revoked.
427                  */
428
429
430                 if (it->it_flags & FMODE_WRITE) {
431                         och_p = &lli->lli_mds_write_och;
432                         och_usecount = &lli->lli_open_fd_write_count;
433                 } else if (it->it_flags & FMODE_EXEC) {
434                         och_p = &lli->lli_mds_exec_och;
435                         och_usecount = &lli->lli_open_fd_exec_count;
436                 } else {
437                         och_p = &lli->lli_mds_read_och;
438                         och_usecount = &lli->lli_open_fd_read_count;
439                 }
440                 /* Check for the proper lock. */
441                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
442                         goto do_lock;
443                 down(&lli->lli_och_sem);
444                 if (*och_p) { /* Everything is open already, do nothing */
445                         /*(*och_usecount)++;  Do not let them steal our open
446                           handle from under us */
447                         /* XXX The code above was my original idea, but in case
448                            we have the handle, but we cannot use it due to later
449                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
450                            would decrement counter increased here. So we just
451                            hope the lock won't be invalidated in between. But
452                            if it would be, we'll reopen the open request to
453                            MDS later during file open path */
454                         up(&lli->lli_och_sem);
455                         ll_finish_md_op_data(op_data);
456                         RETURN(1);
457                 } else {
458                         up(&lli->lli_och_sem);
459                 }
460         }
461
462 do_lock:
463         it->it_create_mode &= ~current->fs->umask;
464         it->it_flags |= O_CHECK_STALE;
465         rc = md_intent_lock(exp, op_data, NULL, 0, it,
466                             lookup_flags,
467                             &req, ll_md_blocking_ast, 0);
468         it->it_flags &= ~O_CHECK_STALE;
469         ll_finish_md_op_data(op_data);
470         /* If req is NULL, then md_intent_lock only tried to do a lock match;
471          * if all was well, it will return 1 if it found locks, 0 otherwise. */
472         if (req == NULL && rc >= 0) {
473                 if (!rc)
474                         goto do_lookup;
475                 GOTO(out, rc);
476         }
477
478         if (rc < 0) {
479                 if (rc != -ESTALE) {
480                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
481                                "%d\n", rc, it->d.lustre.it_status);
482                 }
483                 GOTO(out, rc = 0);
484         }
485
486 revalidate_finish:
487         rc = ll_revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, de);
488         if (rc != 0) {
489                 if (rc != -ESTALE && rc != -ENOENT)
490                         ll_intent_release(it);
491                 GOTO(out, rc = 0);
492         }
493
494         if ((it->it_op & IT_OPEN) && de->d_inode && 
495             !S_ISREG(de->d_inode->i_mode) && 
496             !S_ISDIR(de->d_inode->i_mode)) {
497                 ll_release_openhandle(de, it);
498         }
499         rc = 1;
500
501         /* unfortunately ll_intent_lock may cause a callback and revoke our
502          * dentry */
503         spin_lock(&dcache_lock);
504         lock_dentry(de);
505         __d_drop(de);
506         unlock_dentry(de);
507         __d_rehash(de, 0);
508         spin_unlock(&dcache_lock);
509
510 out:
511         /* We do not free request as it may be reused during following lookup
512          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
513          * be freed in ll_lookup_it or in ll_intent_release. But if
514          * request was not completed, we need to free it. (bug 5154, 9903) */
515         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
516                 ptlrpc_req_finished(req);
517         if (rc == 0) {
518 #ifdef LUSTRE_KERNEL_VERSION
519                 ll_unhash_aliases(de->d_inode);
520                 /* done in ll_unhash_aliases()
521                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
522 #else
523                 /* We do not want d_invalidate to kill all child dentries too */
524                 d_drop(de);
525 #endif
526         } else {
527                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
528                        "inode %p refc %d\n", de->d_name.len,
529                        de->d_name.name, de, de->d_parent, de->d_inode,
530                        atomic_read(&de->d_count));
531                 ll_lookup_finish_locks(it, de);
532 #ifdef LUSTRE_KERNEL_VERSION
533                 lock_dentry(de);
534                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
535                 unlock_dentry(de);
536 #endif
537         }
538         RETURN(rc);
539         
540         /*
541          * This part is here to combat evil-evil race in real_lookup on 2.6
542          * kernels.  The race details are: We enter do_lookup() looking for some
543          * name, there is nothing in dcache for this name yet and d_lookup()
544          * returns NULL.  We proceed to real_lookup(), and while we do this,
545          * another process does open on the same file we looking up (most simple
546          * reproducer), open succeeds and the dentry is added. Now back to
547          * us. In real_lookup() we do d_lookup() again and suddenly find the
548          * dentry, so we call d_revalidate on it, but there is no lock, so
549          * without this code we would return 0, but unpatched real_lookup just
550          * returns -ENOENT in such a case instead of retrying the lookup. Once
551          * this is dealt with in real_lookup(), all of this ugly mess can go and
552          * we can just check locks in ->d_revalidate without doing any RPCs
553          * ever.
554          */
555 do_lookup:
556         if (it != &lookup_it) {
557                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
558                 if (it->it_op == IT_GETATTR)
559                         lookup_it.it_op = IT_GETATTR;
560                 ll_lookup_finish_locks(it, de);
561                 it = &lookup_it;
562         }
563         
564         /* Do real lookup here. */
565         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
566                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
567                                                          LUSTRE_OPC_CREATE :
568                                                          LUSTRE_OPC_ANY));
569         if (IS_ERR(op_data))
570                 RETURN(PTR_ERR(op_data));
571
572         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
573                             ll_md_blocking_ast, 0);
574         if (rc >= 0) {
575                 struct mdt_body *mdt_body = lustre_msg_buf(req->rq_repmsg,
576                                                            DLM_REPLY_REC_OFF,
577                                                            sizeof(*mdt_body));
578                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
579                 
580                 if (de->d_inode)
581                         fid = *ll_inode2fid(de->d_inode);
582
583                 /* see if we got same inode, if not - return error */
584                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
585                         ll_finish_md_op_data(op_data);
586                         op_data = NULL;
587                         goto revalidate_finish;
588                 }
589                 ll_intent_release(it);
590         }
591         ll_finish_md_op_data(op_data);
592         GOTO(out, rc = 0);
593 }
594
595 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
596 {
597         struct inode *inode= de->d_inode;
598         struct ll_sb_info *sbi = ll_i2sbi(inode);
599         struct ll_dentry_data *ldd = ll_d2d(de);
600         struct obd_client_handle *handle;
601         struct obd_capa *oc;
602         int rc = 0;
603         ENTRY;
604         LASSERT(ldd);
605
606         lock_kernel();
607         /* Strictly speaking this introduces an additional race: the
608          * increments should wait until the rpc has returned.
609          * However, given that at present the function is void, this
610          * issue is moot. */
611         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
612                 unlock_kernel();
613                 EXIT;
614                 return;
615         }
616
617         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
618                 unlock_kernel();
619                 EXIT;
620                 return;
621         }
622         unlock_kernel();
623
624         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
625         oc = ll_mdscapa_get(inode);
626         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
627         capa_put(oc);
628         if (rc) {
629                 lock_kernel();
630                 memset(handle, 0, sizeof(*handle));
631                 if (flag == 0)
632                         ldd->lld_cwd_count--;
633                 else
634                         ldd->lld_mnt_count--;
635                 unlock_kernel();
636         }
637
638         EXIT;
639         return;
640 }
641
642 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
643 {
644         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
645         struct ll_dentry_data *ldd = ll_d2d(de);
646         struct obd_client_handle handle;
647         int count, rc = 0;
648         ENTRY;
649         LASSERT(ldd);
650
651         lock_kernel();
652         /* Strictly speaking this introduces an additional race: the
653          * increments should wait until the rpc has returned.
654          * However, given that at present the function is void, this
655          * issue is moot. */
656         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
657         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
658                 /* the "pin" failed */
659                 unlock_kernel();
660                 EXIT;
661                 return;
662         }
663
664         if (flag)
665                 count = --ldd->lld_mnt_count;
666         else
667                 count = --ldd->lld_cwd_count;
668         unlock_kernel();
669
670         if (count != 0) {
671                 EXIT;
672                 return;
673         }
674
675         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
676         EXIT;
677         return;
678 }
679
680 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
681 #ifdef LUSTRE_KERNEL_VERSION
682 static int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
683 {
684         int rc;
685         ENTRY;
686
687         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
688                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
689         else
690                 rc = ll_revalidate_it(dentry, 0, NULL);
691
692         RETURN(rc);
693 }
694 #else
695 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
696 {
697         int rc;
698         ENTRY;
699
700         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
701                 struct lookup_intent *it;
702                 it = ll_convert_intent(&nd->intent.open, nd->flags);
703                 if (IS_ERR(it))
704                         RETURN(0);
705                 if (it->it_op == (IT_OPEN|IT_CREAT))
706                         if (nd->intent.open.flags & O_EXCL) {
707                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
708                                 rc = 0;
709                                 goto out_it;
710                         }
711
712                 rc = ll_revalidate_it(dentry, nd->flags, it);
713
714                 if (rc && (nd->flags & LOOKUP_OPEN) &&
715                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
716 #ifdef HAVE_FILE_IN_STRUCT_INTENT
717 // XXX Code duplication with ll_lookup_nd
718                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
719                                 // We cannot call open here as it would
720                                 // deadlock.
721                                 ptlrpc_req_finished(
722                                                (struct ptlrpc_request *)
723                                                   it->d.lustre.it_data);
724                         } else {
725                                 struct file *filp;
726
727                                 nd->intent.open.file->private_data = it;
728                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
729 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
730 /* 2.6.1[456] have a bug in open_namei() that forgets to check
731  * nd->intent.open.file for error, so we need to return it as lookup's result
732  * instead */
733                                 if (IS_ERR(filp))
734                                         rc = 0;
735 #endif
736                         }
737 #else
738                         ll_release_openhandle(dentry, it);
739 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
740                 }
741                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
742                     it_disposition(it, DISP_OPEN_CREATE)) {
743                         /* We created something but we may only return
744                          * negative dentry here, so save request in dentry,
745                          * if lookup will be called later on, it will
746                          * pick the request, otherwise it would be freed
747                          * with dentry */
748                         ll_d2d(dentry)->lld_it = it;
749                         it = NULL; /* avoid freeing */
750                 }
751                         
752 out_it:
753                 if (it) {
754                         ll_intent_release(it);
755                         OBD_FREE(it, sizeof(*it));
756                 }
757         } else {
758                 rc = ll_revalidate_it(dentry, 0, NULL);
759         }
760
761         RETURN(rc);
762 }
763 #endif
764 #endif
765
766 struct dentry_operations ll_d_ops = {
767 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
768         .d_revalidate = ll_revalidate_nd,
769 #else
770         .d_revalidate_it = ll_revalidate_it,
771 #endif
772         .d_release = ll_release,
773         .d_delete = ll_ddelete,
774 #ifdef LUSTRE_KERNEL_VERSION
775         .d_compare = ll_dcompare,
776 #endif
777 #if 0
778         .d_pin = ll_pin,
779         .d_unpin = ll_unpin,
780 #endif
781 };