Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/smp_lock.h>
40 #include <linux/quotaops.h>
41
42 #define DEBUG_SUBSYSTEM S_LLITE
43
44 #include <obd_support.h>
45 #include <lustre_lite.h>
46 #include <lustre/lustre_idl.h>
47 #include <lustre_dlm.h>
48 #include <lustre_mdc.h>
49 //#include <lustre_ver.h>
50 //#include <lustre_version.h>
51
52 #include "llite_internal.h"
53
54 spinlock_t ll_lookup_lock = SPIN_LOCK_UNLOCKED;
55
56 /* should NOT be called with the dcache lock, see fs/dcache.c */
57 void ll_release(struct dentry *de)
58 {
59         struct ll_dentry_data *lld;
60         ENTRY;
61         LASSERT(de != NULL);
62         lld = ll_d2d(de);
63         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
64                 EXIT;
65                 return;
66         }
67 #ifndef HAVE_VFS_INTENT_PATCHES
68         if (lld->lld_it) {
69                 ll_intent_release(lld->lld_it);
70                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
71         }
72 #endif
73         LASSERT(lld->lld_cwd_count == 0);
74         LASSERT(lld->lld_mnt_count == 0);
75         OBD_FREE(de->d_fsdata, sizeof(*lld));
76
77         EXIT;
78 }
79
80 #ifdef DCACHE_LUSTRE_INVALID
81 /* Compare if two dentries are the same.  Don't match if the existing dentry
82  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
83  *
84  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
85  * an AST before calling d_revalidate_it().  The dentry still exists (marked
86  * INVALID) so d_lookup() matches it, but we have no lock on it (so
87  * lock_match() fails) and we spin around real_lookup(). */
88 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
89 {
90         struct dentry *dchild;
91         ENTRY;
92
93         if (d_name->len != name->len)
94                 RETURN(1);
95
96         if (memcmp(d_name->name, name->name, name->len))
97                 RETURN(1);
98
99         /* XXX: d_name must be in-dentry structure */
100         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
101         if (dchild->d_flags & DCACHE_LUSTRE_INVALID) {
102                 CDEBUG(D_DENTRY,"INVALID dentry %p not matched, was bug 3784\n",
103                        dchild);
104                 RETURN(1);
105         }
106
107         RETURN(0);
108 }
109 #endif
110
111 /* should NOT be called with the dcache lock, see fs/dcache.c */
112 static int ll_ddelete(struct dentry *de)
113 {
114         ENTRY;
115         LASSERT(de);
116 #ifndef DCACHE_LUSTRE_INVALID
117 #define DCACHE_LUSTRE_INVALID 0
118 #endif
119
120         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
121                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
122                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
123                d_unhashed(de) ? "" : "hashed,",
124                list_empty(&de->d_subdirs) ? "" : "subdirs");
125 #if DCACHE_LUSTRE_INVALID == 0
126 #undef DCACHE_LUSTRE_INVALID
127 #endif
128
129         RETURN(0);
130 }
131
132 void ll_set_dd(struct dentry *de)
133 {
134         ENTRY;
135         LASSERT(de != NULL);
136
137         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
138                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
139                atomic_read(&de->d_count));
140
141         if (de->d_fsdata == NULL) {
142                 struct ll_dentry_data *lld;
143
144                 OBD_ALLOC_PTR(lld);
145                 if (likely(lld != NULL)) {
146                         cfs_waitq_init(&lld->lld_waitq);
147                         lock_dentry(de);
148                         if (likely(de->d_fsdata == NULL))
149                                 de->d_fsdata = lld;
150                         else
151                                 OBD_FREE_PTR(lld);
152                         unlock_dentry(de);
153                 }
154         }
155
156         EXIT;
157 }
158
159 void ll_intent_drop_lock(struct lookup_intent *it)
160 {
161         struct lustre_handle *handle;
162
163         if (it->it_op && it->d.lustre.it_lock_mode) {
164                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
165                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
166                        " from it %p\n", handle->cookie, it);
167                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
168
169                 /* bug 494: intent_release may be called multiple times, from
170                  * this thread and we don't want to double-decref this lock */
171                 it->d.lustre.it_lock_mode = 0;
172         }
173 }
174
175 void ll_intent_release(struct lookup_intent *it)
176 {
177         ENTRY;
178
179         CDEBUG(D_INFO, "intent %p released\n", it);
180         ll_intent_drop_lock(it);
181 #ifdef HAVE_VFS_INTENT_PATCHES
182         it->it_magic = 0;
183         it->it_op_release = 0;
184 #endif
185         /* We are still holding extra reference on a request, need to free it */
186         if (it_disposition(it, DISP_ENQ_OPEN_REF))
187                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
188         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
189                 ptlrpc_req_finished(it->d.lustre.it_data);
190         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
191                                                     * to lookup */
192                 ptlrpc_req_finished(it->d.lustre.it_data);
193
194         it->d.lustre.it_disposition = 0;
195         it->d.lustre.it_data = NULL;
196         EXIT;
197 }
198
199 /* Drop dentry if it is not used already, unhash otherwise.
200    Should be called with dcache lock held!
201    Returns: 1 if dentry was dropped, 0 if unhashed. */
202 int ll_drop_dentry(struct dentry *dentry)
203 {
204         lock_dentry(dentry);
205         if (atomic_read(&dentry->d_count) == 0) {
206                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
207                        "inode %p\n", dentry->d_name.len,
208                        dentry->d_name.name, dentry, dentry->d_parent,
209                        dentry->d_inode);
210                 dget_locked(dentry);
211                 __d_drop(dentry);
212                 unlock_dentry(dentry);
213                 spin_unlock(&dcache_lock);
214                 spin_unlock(&ll_lookup_lock);
215                 dput(dentry);
216                 spin_lock(&ll_lookup_lock);
217                 spin_lock(&dcache_lock);
218                 return 1;
219         }
220         /* disconected dentry can not be find without lookup, because we 
221          * not need his to unhash or mark invalid. */
222         if (dentry->d_flags & DCACHE_DISCONNECTED) {
223                 unlock_dentry(dentry);
224                 RETURN (0);
225         }
226
227 #ifdef DCACHE_LUSTRE_INVALID
228         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
229 #else
230         if (!d_unhashed(dentry)) {
231 #endif
232                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
233                        "inode %p refc %d\n", dentry->d_name.len,
234                        dentry->d_name.name, dentry, dentry->d_parent,
235                        dentry->d_inode, atomic_read(&dentry->d_count));
236                 /* actually we don't unhash the dentry, rather just
237                  * mark it inaccessible for to __d_lookup(). otherwise
238                  * sys_getcwd() could return -ENOENT -bzzz */
239 #ifdef DCACHE_LUSTRE_INVALID
240                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
241 #endif
242                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
243                         __d_drop(dentry);
244
245         }
246         unlock_dentry(dentry);
247         return 0;
248 }
249
250 void ll_unhash_aliases(struct inode *inode)
251 {
252         struct list_head *tmp, *head;
253         ENTRY;
254
255         if (inode == NULL) {
256                 CERROR("unexpected NULL inode, tell phil\n");
257                 return;
258         }
259
260         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
261                inode->i_ino, inode->i_generation, inode);
262
263         head = &inode->i_dentry;
264         spin_lock(&ll_lookup_lock);
265         spin_lock(&dcache_lock);
266 restart:
267         tmp = head;
268         while ((tmp = tmp->next) != head) {
269                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
270
271                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
272                        "inode %p flags %d\n", dentry->d_name.len,
273                        dentry->d_name.name, dentry, dentry->d_parent,
274                        dentry->d_inode, dentry->d_flags);
275
276                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
277                         CERROR("called on root (?) dentry=%p, inode=%p "
278                                "ino=%lu\n", dentry, inode, inode->i_ino);
279                         lustre_dump_dentry(dentry, 1);
280                         libcfs_debug_dumpstack(NULL);
281                 } else if (d_mountpoint(dentry)) {
282                         /* For mountpoints we skip removal of the dentry
283                            which happens solely because we have a lock on it
284                            obtained when this dentry was not a mountpoint yet */
285                         CDEBUG(D_DENTRY, "Skippind mountpoint dentry removal "
286                                          "%.*s (%p) parent %p\n",
287                                           dentry->d_name.len,
288                                           dentry->d_name.name,
289                                           dentry, dentry->d_parent);
290
291                         continue;
292                 }
293
294                 if (ll_drop_dentry(dentry))
295                           goto restart;
296         }
297         spin_unlock(&dcache_lock);
298         spin_unlock(&ll_lookup_lock);
299
300         EXIT;
301 }
302
303 int ll_revalidate_it_finish(struct ptlrpc_request *request,
304                             struct lookup_intent *it,
305                             struct dentry *de)
306 {
307         int rc = 0;
308         ENTRY;
309
310         if (!request)
311                 RETURN(0);
312
313         if (it_disposition(it, DISP_LOOKUP_NEG)) 
314                 RETURN(-ENOENT);
315
316         rc = ll_prep_inode(&de->d_inode, request, NULL);
317
318         RETURN(rc);
319 }
320
321 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
322 {
323         LASSERT(it != NULL);
324         LASSERT(dentry != NULL);
325
326         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
327                 struct inode *inode = dentry->d_inode;
328                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
329
330                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
331                        inode, inode->i_ino, inode->i_generation);
332                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
333                                  inode);
334         }
335
336         /* drop lookup or getattr locks immediately */
337         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
338                 /* on 2.6 there are situation when several lookups and
339                  * revalidations may be requested during single operation.
340                  * therefore, we don't release intent here -bzzz */
341                 ll_intent_drop_lock(it);
342         }
343 }
344
345 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
346 {
347         struct lookup_intent *it = *itp;
348 #ifdef HAVE_VFS_INTENT_PATCHES
349         if (it) {
350                 LASSERTF(it->it_magic == INTENT_MAGIC, 
351                          "%p has bad intent magic: %x\n",
352                          it, it->it_magic);
353         }
354 #endif
355
356         if (!it || it->it_op == IT_GETXATTR)
357                 it = *itp = deft;
358
359 #ifdef HAVE_VFS_INTENT_PATCHES
360         it->it_op_release = ll_intent_release;
361 #endif
362 }
363
364 int ll_revalidate_it(struct dentry *de, int lookup_flags,
365                      struct lookup_intent *it)
366 {
367         struct md_op_data *op_data;
368         struct ptlrpc_request *req = NULL;
369         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
370         struct obd_export *exp;
371         struct inode *parent;
372         int rc, first = 0;
373
374         ENTRY;
375         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
376                LL_IT2STR(it));
377
378         if (de->d_inode == NULL) {
379                 /* We can only use negative dentries if this is stat or lookup,
380                    for opens and stuff we do need to query server. */
381                 /* If there is IT_CREAT in intent op set, then we must throw
382                    away this negative dentry and actually do the request to
383                    kernel to create whatever needs to be created (if possible)*/
384                 if (it && (it->it_op & IT_CREAT))
385                         RETURN(0);
386
387 #ifdef DCACHE_LUSTRE_INVALID
388                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
389                         RETURN(0);
390 #endif
391
392                 rc = ll_have_md_lock(de->d_parent->d_inode,
393                                      MDS_INODELOCK_UPDATE);
394                 GOTO(out_sa, rc);
395         }
396
397         exp = ll_i2mdexp(de->d_inode);
398
399         /* Never execute intents for mount points.
400          * Attributes will be fixed up in ll_inode_revalidate_it */
401         if (d_mountpoint(de))
402                 GOTO(out_sa, rc = 1);
403
404         /* Root of the lustre tree. Always valid.
405          * Attributes will be fixed up in ll_inode_revalidate_it */
406         if (de == de->d_sb->s_root)
407                 GOTO(out_sa, rc = 1);
408
409         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
410         ll_frob_intent(&it, &lookup_it);
411         LASSERT(it);
412         parent = de->d_parent->d_inode;
413
414         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
415                                      de->d_name.name, de->d_name.len,
416                                      0, LUSTRE_OPC_ANY, NULL);
417         if (IS_ERR(op_data))
418                 RETURN(PTR_ERR(op_data));
419
420         if ((it->it_op == IT_OPEN) && de->d_inode) {
421                 struct inode *inode = de->d_inode;
422                 struct ll_inode_info *lli = ll_i2info(inode);
423                 struct obd_client_handle **och_p;
424                 __u64 *och_usecount;
425
426                 /*
427                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
428                  * just having LOOKUP lock is enough to justify inode is the
429                  * same. And if inode is the same and we have suitable
430                  * openhandle, then there is no point in doing another OPEN RPC
431                  * just to throw away newly received openhandle.  There are no
432                  * security implications too, if file owner or access mode is
433                  * change, LOOKUP lock is revoked.
434                  */
435
436
437                 if (it->it_flags & FMODE_WRITE) {
438                         och_p = &lli->lli_mds_write_och;
439                         och_usecount = &lli->lli_open_fd_write_count;
440                 } else if (it->it_flags & FMODE_EXEC) {
441                         och_p = &lli->lli_mds_exec_och;
442                         och_usecount = &lli->lli_open_fd_exec_count;
443                 } else {
444                         och_p = &lli->lli_mds_read_och;
445                         och_usecount = &lli->lli_open_fd_read_count;
446                 }
447                 /* Check for the proper lock. */
448                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
449                         goto do_lock;
450                 down(&lli->lli_och_sem);
451                 if (*och_p) { /* Everything is open already, do nothing */
452                         /*(*och_usecount)++;  Do not let them steal our open
453                           handle from under us */
454                         /* XXX The code above was my original idea, but in case
455                            we have the handle, but we cannot use it due to later
456                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
457                            would decrement counter increased here. So we just
458                            hope the lock won't be invalidated in between. But
459                            if it would be, we'll reopen the open request to
460                            MDS later during file open path */
461                         up(&lli->lli_och_sem);
462                         ll_finish_md_op_data(op_data);
463                         RETURN(1);
464                 } else {
465                         up(&lli->lli_och_sem);
466                 }
467         }
468
469         if (it->it_op == IT_GETATTR)
470                 first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
471
472 do_lock:
473         it->it_create_mode &= ~current->fs->umask;
474         it->it_flags |= O_CHECK_STALE;
475         rc = md_intent_lock(exp, op_data, NULL, 0, it,
476                             lookup_flags,
477                             &req, ll_md_blocking_ast, 0);
478         it->it_flags &= ~O_CHECK_STALE;
479         ll_finish_md_op_data(op_data);
480         if (it->it_op == IT_GETATTR && !first)
481                 ll_statahead_exit(de, rc);
482
483         /* If req is NULL, then md_intent_lock only tried to do a lock match;
484          * if all was well, it will return 1 if it found locks, 0 otherwise. */
485         if (req == NULL && rc >= 0) {
486                 if (!rc)
487                         goto do_lookup;
488                 GOTO(out, rc);
489         }
490
491         if (rc < 0) {
492                 if (rc != -ESTALE) {
493                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
494                                "%d\n", rc, it->d.lustre.it_status);
495                 }
496                 GOTO(out, rc = 0);
497         }
498
499 revalidate_finish:
500         rc = ll_revalidate_it_finish(req, it, de);
501         if (rc != 0) {
502                 if (rc != -ESTALE && rc != -ENOENT)
503                         ll_intent_release(it);
504                 GOTO(out, rc = 0);
505         }
506
507         if ((it->it_op & IT_OPEN) && de->d_inode && 
508             !S_ISREG(de->d_inode->i_mode) && 
509             !S_ISDIR(de->d_inode->i_mode)) {
510                 ll_release_openhandle(de, it);
511         }
512         rc = 1;
513
514         /* unfortunately ll_intent_lock may cause a callback and revoke our
515          * dentry */
516         spin_lock(&ll_lookup_lock);
517         spin_lock(&dcache_lock);
518         lock_dentry(de);
519         __d_drop(de);
520         unlock_dentry(de);
521         d_rehash_cond(de, 0);
522         spin_unlock(&dcache_lock);
523         spin_unlock(&ll_lookup_lock);
524
525 out:
526         /* We do not free request as it may be reused during following lookup
527          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
528          * be freed in ll_lookup_it or in ll_intent_release. But if
529          * request was not completed, we need to free it. (bug 5154, 9903) */
530         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
531                 ptlrpc_req_finished(req);
532         if (rc == 0) {
533 #ifdef DCACHE_LUSTRE_INVALID
534                 ll_unhash_aliases(de->d_inode);
535                 /* done in ll_unhash_aliases()
536                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
537 #else
538                 /* We do not want d_invalidate to kill all child dentries too */
539                 d_drop(de);
540 #endif
541         } else {
542                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
543                        "inode %p refc %d\n", de->d_name.len,
544                        de->d_name.name, de, de->d_parent, de->d_inode,
545                        atomic_read(&de->d_count));
546                 ll_lookup_finish_locks(it, de);
547 #ifdef DCACHE_LUSTRE_INVALID
548                 lock_dentry(de);
549                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
550                 unlock_dentry(de);
551 #endif
552         }
553         RETURN(rc);
554
555         /*
556          * This part is here to combat evil-evil race in real_lookup on 2.6
557          * kernels.  The race details are: We enter do_lookup() looking for some
558          * name, there is nothing in dcache for this name yet and d_lookup()
559          * returns NULL.  We proceed to real_lookup(), and while we do this,
560          * another process does open on the same file we looking up (most simple
561          * reproducer), open succeeds and the dentry is added. Now back to
562          * us. In real_lookup() we do d_lookup() again and suddenly find the
563          * dentry, so we call d_revalidate on it, but there is no lock, so
564          * without this code we would return 0, but unpatched real_lookup just
565          * returns -ENOENT in such a case instead of retrying the lookup. Once
566          * this is dealt with in real_lookup(), all of this ugly mess can go and
567          * we can just check locks in ->d_revalidate without doing any RPCs
568          * ever.
569          */
570 do_lookup:
571         if (it != &lookup_it) {
572                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
573                 if (it->it_op == IT_GETATTR)
574                         lookup_it.it_op = IT_GETATTR;
575                 ll_lookup_finish_locks(it, de);
576                 it = &lookup_it;
577         }
578
579         /* Do real lookup here. */
580         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
581                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
582                                                          LUSTRE_OPC_CREATE :
583                                                          LUSTRE_OPC_ANY), NULL);
584         if (IS_ERR(op_data))
585                 RETURN(PTR_ERR(op_data));
586
587         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
588                             ll_md_blocking_ast, 0);
589         if (rc >= 0) {
590                 struct mdt_body *mdt_body;
591                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
592                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
593
594                 if (de->d_inode)
595                         fid = *ll_inode2fid(de->d_inode);
596
597                 /* see if we got same inode, if not - return error */
598                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
599                         ll_finish_md_op_data(op_data);
600                         op_data = NULL;
601                         goto revalidate_finish;
602                 }
603                 ll_intent_release(it);
604         }
605         ll_finish_md_op_data(op_data);
606         GOTO(out, rc = 0);
607
608 out_sa:
609         /*
610          * For rc == 1 case, should not return directly to prevent losing
611          * statahead windows; for rc == 0 case, the "lookup" will be done later.
612          */
613         if (it && it->it_op == IT_GETATTR && rc == 1) {
614                 first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
615                 if (!first)
616                         ll_statahead_exit(de, rc);
617         }
618
619         return rc;
620 }
621
622 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
623 {
624         struct inode *inode= de->d_inode;
625         struct ll_sb_info *sbi = ll_i2sbi(inode);
626         struct ll_dentry_data *ldd = ll_d2d(de);
627         struct obd_client_handle *handle;
628         struct obd_capa *oc;
629         int rc = 0;
630         ENTRY;
631         LASSERT(ldd);
632
633         lock_kernel();
634         /* Strictly speaking this introduces an additional race: the
635          * increments should wait until the rpc has returned.
636          * However, given that at present the function is void, this
637          * issue is moot. */
638         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
639                 unlock_kernel();
640                 EXIT;
641                 return;
642         }
643
644         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
645                 unlock_kernel();
646                 EXIT;
647                 return;
648         }
649         unlock_kernel();
650
651         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
652         oc = ll_mdscapa_get(inode);
653         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
654         capa_put(oc);
655         if (rc) {
656                 lock_kernel();
657                 memset(handle, 0, sizeof(*handle));
658                 if (flag == 0)
659                         ldd->lld_cwd_count--;
660                 else
661                         ldd->lld_mnt_count--;
662                 unlock_kernel();
663         }
664
665         EXIT;
666         return;
667 }
668
669 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
670 {
671         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
672         struct ll_dentry_data *ldd = ll_d2d(de);
673         struct obd_client_handle handle;
674         int count, rc = 0;
675         ENTRY;
676         LASSERT(ldd);
677
678         lock_kernel();
679         /* Strictly speaking this introduces an additional race: the
680          * increments should wait until the rpc has returned.
681          * However, given that at present the function is void, this
682          * issue is moot. */
683         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
684         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
685                 /* the "pin" failed */
686                 unlock_kernel();
687                 EXIT;
688                 return;
689         }
690
691         if (flag)
692                 count = --ldd->lld_mnt_count;
693         else
694                 count = --ldd->lld_cwd_count;
695         unlock_kernel();
696
697         if (count != 0) {
698                 EXIT;
699                 return;
700         }
701
702         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
703         EXIT;
704         return;
705 }
706
707 #ifdef HAVE_VFS_INTENT_PATCHES
708 static int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
709 {
710         int rc;
711         ENTRY;
712
713         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
714                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
715         else
716                 rc = ll_revalidate_it(dentry, 0, NULL);
717
718         RETURN(rc);
719 }
720 #else
721 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
722 {
723         int rc;
724         ENTRY;
725
726         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
727                 struct lookup_intent *it;
728                 it = ll_convert_intent(&nd->intent.open, nd->flags);
729                 if (IS_ERR(it))
730                         RETURN(0);
731                 if (it->it_op == (IT_OPEN|IT_CREAT))
732                         if (nd->intent.open.flags & O_EXCL) {
733                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
734                                 rc = 0;
735                                 goto out_it;
736                         }
737
738                 rc = ll_revalidate_it(dentry, nd->flags, it);
739
740                 if (rc && (nd->flags & LOOKUP_OPEN) &&
741                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
742 #ifdef HAVE_FILE_IN_STRUCT_INTENT
743 // XXX Code duplication with ll_lookup_nd
744                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
745                                 // We cannot call open here as it would
746                                 // deadlock.
747                                 ptlrpc_req_finished(
748                                                (struct ptlrpc_request *)
749                                                   it->d.lustre.it_data);
750                         } else {
751                                 struct file *filp;
752
753                                 nd->intent.open.file->private_data = it;
754                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
755 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
756 /* 2.6.1[456] have a bug in open_namei() that forgets to check
757  * nd->intent.open.file for error, so we need to return it as lookup's result
758  * instead */
759                                 if (IS_ERR(filp))
760                                         rc = 0;
761 #endif
762                         }
763 #else
764                         ll_release_openhandle(dentry, it);
765 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
766                 }
767                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
768                     it_disposition(it, DISP_OPEN_CREATE)) {
769                         /* We created something but we may only return
770                          * negative dentry here, so save request in dentry,
771                          * if lookup will be called later on, it will
772                          * pick the request, otherwise it would be freed
773                          * with dentry */
774                         ll_d2d(dentry)->lld_it = it;
775                         it = NULL; /* avoid freeing */
776                 }
777
778 out_it:
779                 if (it) {
780                         ll_intent_release(it);
781                         OBD_FREE(it, sizeof(*it));
782                 }
783         } else {
784                 rc = ll_revalidate_it(dentry, 0, NULL);
785         }
786
787         RETURN(rc);
788 }
789 #endif
790
791 struct dentry_operations ll_d_ops = {
792         .d_revalidate = ll_revalidate_nd,
793         .d_release = ll_release,
794         .d_delete = ll_ddelete,
795 #ifdef DCACHE_LUSTRE_INVALID
796         .d_compare = ll_dcompare,
797 #endif
798 #if 0
799         .d_pin = ll_pin,
800         .d_unpin = ll_unpin,
801 #endif
802 };
803
804 static int ll_fini_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
805 {
806         ENTRY;
807         /* need lookup */
808         RETURN(0);
809 }
810
811 struct dentry_operations ll_fini_d_ops = {
812         .d_revalidate = ll_fini_revalidate_nd,
813         .d_release = ll_release,
814 };
815
816 /*
817  * It is for the following race condition:
818  * When someone (maybe statahead thread) adds the dentry to the dentry hash
819  * table, the dentry's "d_op" maybe NULL, at the same time, another (maybe
820  * "ls -l") process finds such dentry by "do_lookup()" without "do_revalidate()"
821  * called. It causes statahead window lost, and maybe other issues. --Fan Yong
822  */
823 static int ll_init_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
824 {
825         struct l_wait_info lwi = { 0 };
826         struct ll_dentry_data *lld;
827         ENTRY;
828
829         ll_set_dd(dentry);
830         lld = ll_d2d(dentry);
831         if (unlikely(lld == NULL))
832                 RETURN(-ENOMEM);
833
834         l_wait_event(lld->lld_waitq, dentry->d_op != &ll_init_d_ops, &lwi);
835         if (likely(dentry->d_op == &ll_d_ops))
836                 RETURN(ll_revalidate_nd(dentry, nd));
837         else
838                 RETURN(dentry->d_op == &ll_fini_d_ops ? 0 : -EINVAL);
839 }
840
841 struct dentry_operations ll_init_d_ops = {
842         .d_revalidate = ll_init_revalidate_nd,
843         .d_release = ll_release,
844 };