Whamcloud - gitweb
b=20433 decrease the usage of memory on clients.
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/smp_lock.h>
40 #include <linux/quotaops.h>
41
42 #define DEBUG_SUBSYSTEM S_LLITE
43
44 #include <obd_support.h>
45 #include <lustre_lite.h>
46 #include <lustre/lustre_idl.h>
47 #include <lustre_dlm.h>
48 #include <linux/lustre_version.h>
49
50 #include "llite_internal.h"
51
52 spinlock_t ll_lookup_lock = SPIN_LOCK_UNLOCKED;
53
54 /* should NOT be called with the dcache lock, see fs/dcache.c */
55 static void ll_release(struct dentry *de)
56 {
57         struct ll_dentry_data *lld;
58         ENTRY;
59         LASSERT(de != NULL);
60         lld = ll_d2d(de);
61         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
62                 EXIT;
63                 return;
64         }
65 #ifndef HAVE_VFS_INTENT_PATCHES
66         if (lld->lld_it) {
67                 ll_intent_release(lld->lld_it);
68                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
69         }
70 #endif
71         LASSERT(lld->lld_cwd_count == 0);
72         LASSERT(lld->lld_mnt_count == 0);
73         OBD_FREE(de->d_fsdata, sizeof(*lld));
74
75         EXIT;
76 }
77
78 /* Compare if two dentries are the same.  Don't match if the existing dentry
79  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
80  *
81  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
82  * an AST before calling d_revalidate_it().  The dentry still exists (marked
83  * INVALID) so d_lookup() matches it, but we have no lock on it (so
84  * lock_match() fails) and we spin around real_lookup(). */
85 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
86 {
87         struct dentry *dchild;
88         ENTRY;
89
90         /* XXX: d_name must be in-dentry structure */
91         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
92
93         if (d_name->len != name->len)
94                 RETURN(1);
95
96         if (memcmp(d_name->name, name->name, name->len))
97                 RETURN(1);
98
99         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
100                name->len, name->name, dchild,
101                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
102                atomic_read(&dchild->d_count));
103
104          /* mountpoint is always valid */
105         if (d_mountpoint(dchild))
106                 RETURN(0);
107
108         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
109                 RETURN(1);
110
111         RETURN(0);
112 }
113
114 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
115 {
116         return LDLM_ITER_STOP;
117 }
118
119 /* find any ldlm lock of the inode in mdc and lov
120  * return 0    not find
121  *        1    find one
122  *      < 0    error */
123 int find_cbdata(struct inode *inode)
124 {
125         struct ll_fid fid;
126         struct ll_inode_info *lli = ll_i2info(inode);
127         struct ll_sb_info *sbi = ll_i2sbi(inode);
128         int rc = 0;
129         ENTRY;
130
131         LASSERT(inode);
132         ll_inode2fid(&fid, inode);
133         rc = mdc_find_cbdata(sbi->ll_mdc_exp, &fid, return_if_equal, NULL);
134         if (rc != 0)
135                 RETURN(rc);
136
137         if (lli->lli_smd)
138                 rc = obd_find_cbdata(sbi->ll_osc_exp, lli->lli_smd,
139                                      return_if_equal, NULL);
140
141         RETURN(rc);
142 }
143
144 /* should NOT be called with the dcache lock, see fs/dcache.c */
145 static int ll_ddelete(struct dentry *de)
146 {
147         ENTRY;
148         LASSERT(de);
149
150         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
151                (de->d_flags & DCACHE_LUSTRE_INVALID ? "hiden" : "keeping"),
152                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
153                d_unhashed(de) ? "" : "hashed,",
154                list_empty(&de->d_subdirs) ? "" : "subdirs");
155
156         /* if not ldlm lock for this inode, set i_nlink to 0 so that
157          * this inode can be recycled later b=20433 */
158         LASSERT(atomic_read(&de->d_count) == 0);
159         if (de->d_inode && !find_cbdata(de->d_inode))
160                 de->d_inode->i_nlink = 0;
161
162         if (de->d_flags & DCACHE_LUSTRE_INVALID)
163                 RETURN(1);
164
165         RETURN(0);
166 }
167
168 void ll_set_dd(struct dentry *de)
169 {
170         ENTRY;
171         LASSERT(de != NULL);
172
173         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
174                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
175                atomic_read(&de->d_count));
176
177         if (de->d_fsdata == NULL) {
178                 struct ll_dentry_data *lld;
179
180                 OBD_ALLOC_PTR(lld);
181                 if (likely(lld != NULL)) {
182                         lock_dentry(de);
183                         if (likely(de->d_fsdata == NULL))
184                                 de->d_fsdata = lld;
185                         else
186                                 OBD_FREE_PTR(lld);
187                         unlock_dentry(de);
188                 }
189         }
190
191         EXIT;
192 }
193
194 void ll_intent_drop_lock(struct lookup_intent *it)
195 {
196         struct lustre_handle *handle;
197
198         if (it->it_op && it->d.lustre.it_lock_mode) {
199                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
200                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
201                        " from it %p\n", handle->cookie, it);
202                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
203
204                 /* bug 494: intent_release may be called multiple times, from
205                  * this thread and we don't want to double-decref this lock */
206                 it->d.lustre.it_lock_mode = 0;
207         }
208 }
209
210 void ll_intent_release(struct lookup_intent *it)
211 {
212         ENTRY;
213
214         ll_intent_drop_lock(it);
215 #ifdef HAVE_VFS_INTENT_PATCHES
216         it->it_magic = 0;
217         it->it_op_release = 0;
218 #endif
219         /* We are still holding extra reference on a request, need to free it */
220         if (it_disposition(it, DISP_ENQ_OPEN_REF)) /* open req for llfile_open*/
221                 ptlrpc_req_finished(it->d.lustre.it_data);
222         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
223                 ptlrpc_req_finished(it->d.lustre.it_data);
224         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
225                                                     * to lookup */
226                 ptlrpc_req_finished(it->d.lustre.it_data);
227
228         it->d.lustre.it_disposition = 0;
229         it->d.lustre.it_data = NULL;
230         EXIT;
231 }
232
233 /* Drop dentry if it is not used already, unhash otherwise.
234    Should be called with dcache lock held!
235    Returns: 1 if dentry was dropped, 0 if unhashed. */
236 int ll_drop_dentry(struct dentry *dentry)
237 {
238         lock_dentry(dentry);
239         if (atomic_read(&dentry->d_count) == 0) {
240                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
241                        "inode %p\n", dentry->d_name.len,
242                        dentry->d_name.name, dentry, dentry->d_parent,
243                        dentry->d_inode);
244                 dget_locked(dentry);
245                 __d_drop(dentry);
246                 unlock_dentry(dentry);
247                 spin_unlock(&dcache_lock);
248                 spin_unlock(&ll_lookup_lock);
249                 dput(dentry);
250                 spin_lock(&ll_lookup_lock);
251                 spin_lock(&dcache_lock);
252                 return 1;
253         }
254         /* disconected dentry can not be find without lookup, because we
255          * not need his to unhash or mark invalid. */
256         if (dentry->d_flags & DCACHE_DISCONNECTED) {
257                 unlock_dentry(dentry);
258                 RETURN (0);
259         }
260
261 #ifdef DCACHE_LUSTRE_INVALID
262         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
263 #else
264         if (!d_unhashed(dentry)) {
265 #endif
266                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
267                        "inode %p refc %d\n", dentry->d_name.len,
268                        dentry->d_name.name, dentry, dentry->d_parent,
269                        dentry->d_inode, atomic_read(&dentry->d_count));
270                 /* actually we don't unhash the dentry, rather just
271                  * mark it inaccessible for to __d_lookup(). otherwise
272                  * sys_getcwd() could return -ENOENT -bzzz */
273 #ifdef DCACHE_LUSTRE_INVALID
274                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
275 #else
276                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
277                         __d_drop(dentry);
278 #endif
279
280         }
281         unlock_dentry(dentry);
282         return 0;
283 }
284
285 void ll_unhash_aliases(struct inode *inode)
286 {
287         struct list_head *tmp, *head;
288         ENTRY;
289
290         if (inode == NULL) {
291                 CERROR("unexpected NULL inode, tell phil\n");
292                 return;
293         }
294
295         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
296                inode->i_ino, inode->i_generation, inode);
297
298         head = &inode->i_dentry;
299         spin_lock(&ll_lookup_lock);
300         spin_lock(&dcache_lock);
301 restart:
302         tmp = head;
303         while ((tmp = tmp->next) != head) {
304                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
305
306                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
307                         CERROR("called on root (?) dentry=%p, inode=%p "
308                                "ino=%lu\n", dentry, inode, inode->i_ino);
309                         lustre_dump_dentry(dentry, 1);
310                         libcfs_debug_dumpstack(NULL);
311                 }
312
313                 if (ll_drop_dentry(dentry))
314                           goto restart;
315         }
316         spin_unlock(&dcache_lock);
317         spin_unlock(&ll_lookup_lock);
318
319         EXIT;
320 }
321
322 int revalidate_it_finish(struct ptlrpc_request *request, int offset,
323                          struct lookup_intent *it, struct dentry *de)
324 {
325         int rc = 0;
326         ENTRY;
327
328         if (!request)
329                 RETURN(0);
330
331         if (it_disposition(it, DISP_LOOKUP_NEG))
332                 RETURN(-ENOENT);
333
334         rc = ll_prep_inode(ll_i2sbi(de->d_inode)->ll_osc_exp, &de->d_inode,
335                            request, offset, NULL);
336
337         RETURN(rc);
338 }
339
340 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
341 {
342         LASSERT(it != NULL);
343         LASSERT(dentry != NULL);
344
345         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
346                 struct inode *inode = dentry->d_inode;
347                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
348                        inode, inode->i_ino, inode->i_generation);
349                 mdc_set_lock_data(&it->d.lustre.it_lock_handle, inode, NULL);
350         }
351
352         /* drop lookup or getattr locks immediately */
353         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
354                 /* on 2.6 there are situation when several lookups and
355                  * revalidations may be requested during single operation.
356                  * therefore, we don't release intent here -bzzz */
357                 ll_intent_drop_lock(it);
358         }
359 }
360
361 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
362 {
363         struct lookup_intent *it = *itp;
364 #ifdef HAVE_VFS_INTENT_PATCHES
365         if (it) {
366                 LASSERTF(it->it_magic == INTENT_MAGIC, "bad intent magic: %x\n",
367                          it->it_magic);
368         }
369 #endif
370
371         if (!it || it->it_op == IT_GETXATTR)
372                 it = *itp = deft;
373
374 #ifdef HAVE_VFS_INTENT_PATCHES
375         it->it_op_release = ll_intent_release;
376 #endif
377 }
378
379 int ll_revalidate_it(struct dentry *de, int lookup_flags,
380                      struct lookup_intent *it)
381 {
382         struct mdc_op_data op_data = { { 0 } };
383         struct ptlrpc_request *req = NULL;
384         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
385         struct obd_export *exp;
386         struct inode *parent = de->d_parent->d_inode;
387         int first = 0, rc;
388
389         ENTRY;
390         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
391                LL_IT2STR(it));
392
393         if (de->d_inode == NULL) {
394                 /* We can only use negative dentries if this is stat or lookup,
395                    for opens and stuff we do need to query server. */
396                 /* If there is IT_CREAT in intent op set, then we must throw
397                    away this negative dentry and actually do the request to
398                    kernel to create whatever needs to be created (if possible)*/
399                 if (it && (it->it_op & IT_CREAT))
400                         RETURN(0);
401
402 #ifdef DCACHE_LUSTRE_INVALID
403                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
404                         RETURN(0);
405 #endif
406
407                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE);
408                 GOTO(out_sa, rc);
409         }
410
411         exp = ll_i2mdcexp(de->d_inode);
412
413         /* Never execute intents for mount points.
414          * Attributes will be fixed up in ll_inode_revalidate_it */
415         if (d_mountpoint(de))
416                 GOTO(out_sa, rc = 1);
417
418         /*need to get attributes in case it got changed from other client*/
419         if (de == de->d_sb->s_root)  {
420                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
421                 if (rc == 0)
422                         rc = 1;
423                 GOTO(out_sa, rc);
424         }
425
426         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
427         ll_frob_intent(&it, &lookup_it);
428         LASSERT(it);
429
430         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
431                 GOTO(out_sa, rc = 1);
432
433         ll_prepare_mdc_op_data(&op_data, parent, de->d_inode,
434                                de->d_name.name, de->d_name.len, 0, NULL);
435
436         if ((it->it_op == IT_OPEN) && de->d_inode) {
437                 struct inode *inode = de->d_inode;
438                 struct ll_inode_info *lli = ll_i2info(inode);
439                 struct obd_client_handle **och_p;
440                 __u64 *och_usecount;
441                 /* We used to check for MDS_INODELOCK_OPEN here, but in fact
442                  * just having LOOKUP lock is enough to justify inode is the
443                  * same. And if inode is the same and we have suitable
444                  * openhandle, then there is no point in doing another OPEN RPC
445                  * just to throw away newly received openhandle.
446                  * There are no security implications too, if file owner or
447                  * access mode is change, LOOKUP lock is revoked */
448
449                 if (it->it_flags & FMODE_WRITE) {
450                         och_p = &lli->lli_mds_write_och;
451                         och_usecount = &lli->lli_open_fd_write_count;
452                 } else if (it->it_flags & FMODE_EXEC) {
453                         och_p = &lli->lli_mds_exec_och;
454                         och_usecount = &lli->lli_open_fd_exec_count;
455                 } else {
456                         och_p = &lli->lli_mds_read_och;
457                         och_usecount = &lli->lli_open_fd_read_count;
458                 }
459                 /* Check for the proper lock. */
460                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
461                         goto do_lock;
462                 down(&lli->lli_och_sem);
463                 if (*och_p) { /* Everything is open already, do nothing */
464                         /*(*och_usecount)++;  Do not let them steal our open
465                                               handle from under us */
466                         /* XXX The code above was my original idea, but in case
467                            we have the handle, but we cannot use it due to later
468                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
469                            would decrement counter increased here. So we just
470                            hope the lock won't be invalidated in between. But
471                            if it would be, we'll reopen the open request to
472                            MDS later during file open path */
473                         up(&lli->lli_och_sem);
474                         RETURN(1);
475                 } else {
476                         up(&lli->lli_och_sem);
477                 }
478         }
479
480         if (it->it_op == IT_GETATTR)
481                 first = ll_statahead_enter(parent, &de, 0);
482
483 do_lock:
484         it->it_create_mode &= ~current->fs->umask;
485         it->it_create_mode |= M_CHECK_STALE;
486         rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags,
487                              &req, ll_mdc_blocking_ast, 0);
488         it->it_create_mode &= ~M_CHECK_STALE;
489         if (it->it_op == IT_GETATTR && !first)
490                 /* If there are too many locks on client-side, then some
491                  * locks taken by statahead maybe dropped automatically
492                  * before the real "revalidate" using them. */
493                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
494         else if (first == -EEXIST)
495                 ll_statahead_mark(parent, de);
496
497         /* If req is NULL, then mdc_intent_lock only tried to do a lock match;
498          * if all was well, it will return 1 if it found locks, 0 otherwise. */
499         if (req == NULL && rc >= 0) {
500                 if (!rc)
501                         goto do_lookup;
502                 GOTO(out, rc);
503         }
504
505         if (rc < 0) {
506                 if (rc != -ESTALE) {
507                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
508                                "%d\n", rc, it->d.lustre.it_status);
509                 }
510                 GOTO(out, rc = 0);
511         }
512
513 revalidate_finish:
514         rc = revalidate_it_finish(req, DLM_REPLY_REC_OFF, it, de);
515         if (rc != 0) {
516                 /* we are going release the intent, so clear DISP_ENQ_COMPLETE
517                  * to prevent a double free of the request */
518                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
519                 ll_intent_release(it);
520                 GOTO(out, rc = 0);
521         }
522         if ((it->it_op & IT_OPEN) && de->d_inode &&
523             !S_ISREG(de->d_inode->i_mode) &&
524             !S_ISDIR(de->d_inode->i_mode)) {
525                 ll_release_openhandle(de, it);
526         }
527         rc = 1;
528
529         /* unfortunately ll_intent_lock may cause a callback and revoke our
530          * dentry */
531         spin_lock(&ll_lookup_lock);
532         spin_lock(&dcache_lock);
533         lock_dentry(de);
534         __d_drop(de);
535         unlock_dentry(de);
536         d_rehash_cond(de, 0);
537         spin_unlock(&dcache_lock);
538         spin_unlock(&ll_lookup_lock);
539
540  out:
541         /* We do not free request as it may be reused during following lookup
542          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
543          * be freed in ll_lookup_it or in ll_intent_release. But if
544          * request was not completed, we need to free it. (bug 5154, 9903) */
545         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
546                 ptlrpc_req_finished(req);
547         if (rc == 0) {
548 #ifdef DCACHE_LUSTRE_INVALID
549                 ll_unhash_aliases(de->d_inode);
550                 /* done in ll_unhash_aliases()
551                 dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
552 #else
553                 /* We do not want d_invalidate to kill all child dentries too */
554                 d_drop(de);
555 #endif
556         } else {
557                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
558                                "inode %p refc %d\n", de->d_name.len,
559                                de->d_name.name, de, de->d_parent, de->d_inode,
560                                atomic_read(&de->d_count));
561                 ll_lookup_finish_locks(it, de);
562 #ifdef DCACHE_LUSTRE_INVALID
563                 lock_dentry(de);
564                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
565                 unlock_dentry(de);
566 #endif
567         }
568         RETURN(rc);
569 /* This part is here to combat evil-evil race in real_lookup on 2.6 kernels.
570  * The race details are: We enter do_lookup() looking for some name,
571  * there is nothing in dcache for this name yet and d_lookup() returns NULL.
572  * We proceed to real_lookup(), and while we do this, another process does
573  * open on the same file we looking up (most simple reproducer), open succeeds
574  * and the dentry is added. Now back to us. In real_lookup() we do d_lookup()
575  * again and suddenly find the dentry, so we call d_revalidate on it, but there
576  * is no lock, so without this code we would return 0, but unpatched
577  * real_lookup just returns -ENOENT in such a case instead of retrying the
578  * lookup. Once this is dealt with in real_lookup(), all of this ugly mess
579  * can go and we can just check locks in ->d_revalidate without doing any
580  * RPCs ever. */
581 do_lookup:
582         if (it != &lookup_it) {
583                 ll_lookup_finish_locks(it, de);
584                 it = &lookup_it;
585         }
586         /*do real lookup here */
587         ll_prepare_mdc_op_data(&op_data, parent, NULL,
588                                de->d_name.name, de->d_name.len, 0, NULL);
589         rc = mdc_intent_lock(exp, &op_data, NULL, 0,  it, 0, &req,
590                              ll_mdc_blocking_ast, 0);
591         if (rc >= 0) {
592                 struct mds_body *mds_body = lustre_msg_buf(req->rq_repmsg,
593                                                            DLM_REPLY_REC_OFF,
594                                                            sizeof(*mds_body));
595                 struct ll_fid fid = { 0 };
596
597                 if (de->d_inode)
598                          ll_inode2fid(&fid, de->d_inode);
599
600                 /* see if we got same inode, if not - return error */
601                 if(!memcmp(&fid, &mds_body->fid1, sizeof(struct ll_fid)))
602                         goto revalidate_finish;
603                 /* we are going release the intent, so clear DISP_ENQ_COMPLETE
604                  * to prevent a double free of the request */
605                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
606                 ll_intent_release(it);
607         }
608         GOTO(out, rc = 0);
609
610 out_sa:
611         /*
612          * For rc == 1 case, should not return directly to prevent losing
613          * statahead windows; for rc == 0 case, the "lookup" will be done later.
614          */
615         if (it && it->it_op == IT_GETATTR && rc == 1) {
616                 first = ll_statahead_enter(parent, &de, 0);
617                 if (!first)
618                         ll_statahead_exit(parent, de, 1);
619                 else if (first == -EEXIST)
620                         ll_statahead_mark(parent, de);
621         }
622
623         return rc;
624 }
625
626 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
627 {
628         struct inode *inode= de->d_inode;
629         struct ll_sb_info *sbi = ll_i2sbi(inode);
630         struct ll_dentry_data *ldd = ll_d2d(de);
631         struct obd_client_handle *handle;
632         int rc = 0;
633         ENTRY;
634         LASSERT(ldd);
635
636         lock_kernel();
637         /* Strictly speaking this introduces an additional race: the
638          * increments should wait until the rpc has returned.
639          * However, given that at present the function is void, this
640          * issue is moot. */
641         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
642                 unlock_kernel();
643                 EXIT;
644                 return;
645         }
646
647         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
648                 unlock_kernel();
649                 EXIT;
650                 return;
651         }
652         unlock_kernel();
653
654         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
655         rc = obd_pin(sbi->ll_mdc_exp, ll_inode_ll_fid(inode),
656                      handle, flag);
657
658         if (rc) {
659                 lock_kernel();
660                 memset(handle, 0, sizeof(*handle));
661                 if (flag == 0)
662                         ldd->lld_cwd_count--;
663                 else
664                         ldd->lld_mnt_count--;
665                 unlock_kernel();
666         }
667
668         EXIT;
669         return;
670 }
671
672 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
673 {
674         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
675         struct ll_dentry_data *ldd = ll_d2d(de);
676         struct obd_client_handle handle;
677         int count, rc = 0;
678         ENTRY;
679         LASSERT(ldd);
680
681         lock_kernel();
682         /* Strictly speaking this introduces an additional race: the
683          * increments should wait until the rpc has returned.
684          * However, given that at present the function is void, this
685          * issue is moot. */
686         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
687         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
688                 /* the "pin" failed */
689                 unlock_kernel();
690                 EXIT;
691                 return;
692         }
693
694         if (flag)
695                 count = --ldd->lld_mnt_count;
696         else
697                 count = --ldd->lld_cwd_count;
698         unlock_kernel();
699
700         if (count != 0) {
701                 EXIT;
702                 return;
703         }
704
705         rc = obd_unpin(sbi->ll_mdc_exp, &handle, flag);
706         EXIT;
707         return;
708 }
709
710 #ifdef HAVE_VFS_INTENT_PATCHES
711 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
712 {
713         int rc;
714         ENTRY;
715
716         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
717                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
718         else
719                 rc = ll_revalidate_it(dentry, 0, NULL);
720
721         RETURN(rc);
722 }
723 #else
724 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
725 {
726         int rc;
727         ENTRY;
728
729         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
730                 struct lookup_intent *it;
731                 it = ll_convert_intent(&nd->intent.open, nd->flags);
732                 if (IS_ERR(it))
733                         RETURN(0);
734                 if (it->it_op == (IT_OPEN|IT_CREAT))
735                         if (nd->intent.open.flags & O_EXCL) {
736                                 CDEBUG(D_VFSTRACE,
737                                        "create O_EXCL, returning 0\n");
738                                 rc = 0;
739                                 goto out_it;
740                         }
741
742                 rc = ll_revalidate_it(dentry, nd->flags, it);
743
744                 if (rc && (nd->flags & LOOKUP_OPEN) &&
745                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
746 #ifdef HAVE_FILE_IN_STRUCT_INTENT
747 // XXX Code duplication with ll_lookup_nd
748                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
749                                 // We cannot call open here as it would
750                                 // deadlock.
751                                 ptlrpc_req_finished(
752                                                (struct ptlrpc_request *)
753                                                   it->d.lustre.it_data);
754                         } else {
755 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
756 /* 2.6.1[456] have a bug in open_namei() that forgets to check
757  * nd->intent.open.file for error, so we need to return it as lookup's result
758  * instead */
759                                 struct file *filp;
760
761                                 nd->intent.open.file->private_data = it;
762                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
763                                 if (IS_ERR(filp)) {
764                                         rc = PTR_ERR(filp);
765                                 }
766 #else
767                                 nd->intent.open.file->private_data = it;
768                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
769 #endif
770                         }
771 #else
772                         ll_release_openhandle(dentry, it);
773 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
774                 }
775                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
776                     it_disposition(it, DISP_OPEN_CREATE)) {
777                         /* We created something but we may only return
778                          * negative dentry here, so save request in dentry,
779                          * if lookup will be called later on, it will
780                          * pick the request, otherwise it would be freed
781                          * with dentry */
782                         ll_d2d(dentry)->lld_it = it;
783                         it = NULL; /* avoid freeing */
784                 }
785
786 out_it:
787                 if (it) {
788                         ll_intent_release(it);
789                         OBD_FREE(it, sizeof(*it));
790                 }
791         } else {
792                 rc = ll_revalidate_it(dentry, 0, NULL);
793         }
794
795         RETURN(rc);
796 }
797 #endif
798
799 void ll_d_iput(struct dentry *de, struct inode *inode)
800 {
801         LASSERT(inode);
802         if (inode && !find_cbdata(inode))
803                 inode->i_nlink = 0;
804         iput(inode);
805 }
806
807 struct dentry_operations ll_d_ops = {
808         .d_revalidate = ll_revalidate_nd,
809         .d_release = ll_release,
810         .d_delete = ll_ddelete,
811         .d_iput   = ll_d_iput,
812 #ifdef DCACHE_LUSTRE_INVALID
813         .d_compare = ll_dcompare,
814 #endif
815 #if 0
816         .d_pin = ll_pin,
817         .d_unpin = ll_unpin,
818 #endif
819 };