Whamcloud - gitweb
de07cca8253dfd7edb782cd212303d0d7c29675d
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/smp_lock.h>
40 #include <linux/quotaops.h>
41
42 #define DEBUG_SUBSYSTEM S_LLITE
43
44 #include <obd_support.h>
45 #include <lustre_lite.h>
46 #include <lustre/lustre_idl.h>
47 #include <lustre_dlm.h>
48 #include <lustre_mdc.h>
49 //#include <lustre_ver.h>
50 //#include <lustre_version.h>
51
52 #include "llite_internal.h"
53
54 cfs_spinlock_t ll_lookup_lock = CFS_SPIN_LOCK_UNLOCKED;
55
56 /* should NOT be called with the dcache lock, see fs/dcache.c */
57 static void ll_release(struct dentry *de)
58 {
59         struct ll_dentry_data *lld;
60         ENTRY;
61         LASSERT(de != NULL);
62         lld = ll_d2d(de);
63         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
64                 EXIT;
65                 return;
66         }
67 #ifndef HAVE_VFS_INTENT_PATCHES
68         if (lld->lld_it) {
69                 ll_intent_release(lld->lld_it);
70                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
71         }
72 #endif
73         LASSERT(lld->lld_cwd_count == 0);
74         LASSERT(lld->lld_mnt_count == 0);
75         OBD_FREE(de->d_fsdata, sizeof(*lld));
76
77         EXIT;
78 }
79
80 /* Compare if two dentries are the same.  Don't match if the existing dentry
81  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
82  *
83  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
84  * an AST before calling d_revalidate_it().  The dentry still exists (marked
85  * INVALID) so d_lookup() matches it, but we have no lock on it (so
86  * lock_match() fails) and we spin around real_lookup(). */
87 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
88 {
89         struct dentry *dchild;
90         ENTRY;
91
92         if (d_name->len != name->len)
93                 RETURN(1);
94
95         if (memcmp(d_name->name, name->name, name->len))
96                 RETURN(1);
97
98         /* XXX: d_name must be in-dentry structure */
99         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
100
101         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
102                name->len, name->name, dchild,
103                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
104                atomic_read(&dchild->d_count));
105
106          /* mountpoint is always valid */
107         if (d_mountpoint(dchild))
108                 RETURN(0);
109
110         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
111                 RETURN(1);
112
113
114         RETURN(0);
115 }
116
117 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
118 {
119         return LDLM_ITER_STOP;
120 }
121
122 /* find any ldlm lock of the inode in mdc and lov
123  * return 0    not find
124  *        1    find one
125  *      < 0    error */
126 static int find_cbdata(struct inode *inode)
127 {
128         struct ll_inode_info *lli = ll_i2info(inode);
129         struct ll_sb_info *sbi = ll_i2sbi(inode);
130         int rc = 0;
131         ENTRY;
132
133         LASSERT(inode);
134         rc = md_find_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
135                             return_if_equal, NULL);
136         if (rc != 0)
137                  RETURN(rc);
138
139         if (lli->lli_smd)
140                 rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
141                                      return_if_equal, NULL);
142
143         RETURN(rc);
144 }
145
146 /* should NOT be called with the dcache lock, see fs/dcache.c */
147 static int ll_ddelete(struct dentry *de)
148 {
149         ENTRY;
150         LASSERT(de);
151
152         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
153                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
154                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
155                d_unhashed(de) ? "" : "hashed,",
156                list_empty(&de->d_subdirs) ? "" : "subdirs");
157
158         /* if not ldlm lock for this inode, set i_nlink to 0 so that
159          * this inode can be recycled later b=20433 */
160         LASSERT(atomic_read(&de->d_count) == 0);
161         if (de->d_inode && !find_cbdata(de->d_inode))
162                 de->d_inode->i_nlink = 0;
163
164         if (de->d_flags & DCACHE_LUSTRE_INVALID)
165                 RETURN(1);
166
167         RETURN(0);
168 }
169
170 void ll_set_dd(struct dentry *de)
171 {
172         ENTRY;
173         LASSERT(de != NULL);
174
175         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
176                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
177                atomic_read(&de->d_count));
178
179         if (de->d_fsdata == NULL) {
180                 struct ll_dentry_data *lld;
181
182                 OBD_ALLOC_PTR(lld);
183                 if (likely(lld != NULL)) {
184                         lock_dentry(de);
185                         if (likely(de->d_fsdata == NULL))
186                                 de->d_fsdata = lld;
187                         else
188                                 OBD_FREE_PTR(lld);
189                         unlock_dentry(de);
190                 }
191         }
192
193         EXIT;
194 }
195
196 void ll_intent_drop_lock(struct lookup_intent *it)
197 {
198         struct lustre_handle *handle;
199
200         if (it->it_op && it->d.lustre.it_lock_mode) {
201                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
202                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
203                        " from it %p\n", handle->cookie, it);
204                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
205
206                 /* bug 494: intent_release may be called multiple times, from
207                  * this thread and we don't want to double-decref this lock */
208                 it->d.lustre.it_lock_mode = 0;
209         }
210 }
211
212 void ll_intent_release(struct lookup_intent *it)
213 {
214         ENTRY;
215
216         CDEBUG(D_INFO, "intent %p released\n", it);
217         ll_intent_drop_lock(it);
218 #ifdef HAVE_VFS_INTENT_PATCHES
219         it->it_magic = 0;
220         it->it_op_release = 0;
221 #endif
222         /* We are still holding extra reference on a request, need to free it */
223         if (it_disposition(it, DISP_ENQ_OPEN_REF))
224                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
225         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
226                 ptlrpc_req_finished(it->d.lustre.it_data);
227         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
228                                                     * to lookup */
229                 ptlrpc_req_finished(it->d.lustre.it_data);
230
231         it->d.lustre.it_disposition = 0;
232         it->d.lustre.it_data = NULL;
233         EXIT;
234 }
235
236 /* Drop dentry if it is not used already, unhash otherwise.
237    Should be called with dcache lock held!
238    Returns: 1 if dentry was dropped, 0 if unhashed. */
239 int ll_drop_dentry(struct dentry *dentry)
240 {
241         lock_dentry(dentry);
242         if (atomic_read(&dentry->d_count) == 0) {
243                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
244                        "inode %p\n", dentry->d_name.len,
245                        dentry->d_name.name, dentry, dentry->d_parent,
246                        dentry->d_inode);
247                 dget_locked(dentry);
248                 __d_drop(dentry);
249                 unlock_dentry(dentry);
250                 spin_unlock(&dcache_lock);
251                 cfs_spin_unlock(&ll_lookup_lock);
252                 dput(dentry);
253                 cfs_spin_lock(&ll_lookup_lock);
254                 spin_lock(&dcache_lock);
255                 return 1;
256         }
257         /* disconected dentry can not be find without lookup, because we
258          * not need his to unhash or mark invalid. */
259         if (dentry->d_flags & DCACHE_DISCONNECTED) {
260                 unlock_dentry(dentry);
261                 RETURN (0);
262         }
263
264         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
265                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
266                        "inode %p refc %d\n", dentry->d_name.len,
267                        dentry->d_name.name, dentry, dentry->d_parent,
268                        dentry->d_inode, atomic_read(&dentry->d_count));
269                 /* actually we don't unhash the dentry, rather just
270                  * mark it inaccessible for to __d_lookup(). otherwise
271                  * sys_getcwd() could return -ENOENT -bzzz */
272                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
273                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
274                         __d_drop(dentry);
275         }
276         unlock_dentry(dentry);
277         return 0;
278 }
279
280 void ll_unhash_aliases(struct inode *inode)
281 {
282         struct list_head *tmp, *head;
283         ENTRY;
284
285         if (inode == NULL) {
286                 CERROR("unexpected NULL inode, tell phil\n");
287                 return;
288         }
289
290         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
291                inode->i_ino, inode->i_generation, inode);
292
293         head = &inode->i_dentry;
294         cfs_spin_lock(&ll_lookup_lock);
295         spin_lock(&dcache_lock);
296 restart:
297         tmp = head;
298         while ((tmp = tmp->next) != head) {
299                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
300
301                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
302                        "inode %p flags %d\n", dentry->d_name.len,
303                        dentry->d_name.name, dentry, dentry->d_parent,
304                        dentry->d_inode, dentry->d_flags);
305
306                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
307                         CERROR("called on root (?) dentry=%p, inode=%p "
308                                "ino=%lu\n", dentry, inode, inode->i_ino);
309                         lustre_dump_dentry(dentry, 1);
310                         libcfs_debug_dumpstack(NULL);
311                 }
312
313                 if (ll_drop_dentry(dentry))
314                           goto restart;
315         }
316         spin_unlock(&dcache_lock);
317         cfs_spin_unlock(&ll_lookup_lock);
318
319         EXIT;
320 }
321
322 int ll_revalidate_it_finish(struct ptlrpc_request *request,
323                             struct lookup_intent *it,
324                             struct dentry *de)
325 {
326         int rc = 0;
327         ENTRY;
328
329         if (!request)
330                 RETURN(0);
331
332         if (it_disposition(it, DISP_LOOKUP_NEG))
333                 RETURN(-ENOENT);
334
335         rc = ll_prep_inode(&de->d_inode, request, NULL);
336
337         RETURN(rc);
338 }
339
340 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
341 {
342         LASSERT(it != NULL);
343         LASSERT(dentry != NULL);
344
345         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
346                 struct inode *inode = dentry->d_inode;
347                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
348
349                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
350                        inode, inode->i_ino, inode->i_generation);
351                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
352                                  inode, NULL);
353         }
354
355         /* drop lookup or getattr locks immediately */
356         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
357                 /* on 2.6 there are situation when several lookups and
358                  * revalidations may be requested during single operation.
359                  * therefore, we don't release intent here -bzzz */
360                 ll_intent_drop_lock(it);
361         }
362 }
363
364 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
365 {
366         struct lookup_intent *it = *itp;
367 #ifdef HAVE_VFS_INTENT_PATCHES
368         if (it) {
369                 LASSERTF(it->it_magic == INTENT_MAGIC,
370                          "%p has bad intent magic: %x\n",
371                          it, it->it_magic);
372         }
373 #endif
374
375         if (!it || it->it_op == IT_GETXATTR)
376                 it = *itp = deft;
377
378 #ifdef HAVE_VFS_INTENT_PATCHES
379         it->it_op_release = ll_intent_release;
380 #endif
381 }
382
383 int ll_revalidate_it(struct dentry *de, int lookup_flags,
384                      struct lookup_intent *it)
385 {
386         struct md_op_data *op_data;
387         struct ptlrpc_request *req = NULL;
388         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
389         struct obd_export *exp;
390         struct inode *parent = de->d_parent->d_inode;
391         int rc, first = 0;
392
393         ENTRY;
394         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
395                LL_IT2STR(it));
396
397         if (de->d_inode == NULL) {
398                 /* We can only use negative dentries if this is stat or lookup,
399                    for opens and stuff we do need to query server. */
400                 /* If there is IT_CREAT in intent op set, then we must throw
401                    away this negative dentry and actually do the request to
402                    kernel to create whatever needs to be created (if possible)*/
403                 if (it && (it->it_op & IT_CREAT))
404                         RETURN(0);
405
406                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
407                         RETURN(0);
408
409                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE);
410                 GOTO(out_sa, rc);
411         }
412
413         /* Never execute intents for mount points.
414          * Attributes will be fixed up in ll_inode_revalidate_it */
415         if (d_mountpoint(de))
416                 GOTO(out_sa, rc = 1);
417
418         /* need to get attributes in case root got changed from other client */
419         if (de == de->d_sb->s_root) {
420                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
421                 if (rc == 0)
422                         rc = 1;
423                 GOTO(out_sa, rc);
424         }
425
426         exp = ll_i2mdexp(de->d_inode);
427
428         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
429         ll_frob_intent(&it, &lookup_it);
430         LASSERT(it);
431
432         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
433                 GOTO(out_sa, rc = 1);
434
435         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
436                                      de->d_name.name, de->d_name.len,
437                                      0, LUSTRE_OPC_ANY, NULL);
438         if (IS_ERR(op_data))
439                 RETURN(PTR_ERR(op_data));
440
441         if ((it->it_op == IT_OPEN) && de->d_inode) {
442                 struct inode *inode = de->d_inode;
443                 struct ll_inode_info *lli = ll_i2info(inode);
444                 struct obd_client_handle **och_p;
445                 __u64 *och_usecount;
446
447                 /*
448                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
449                  * just having LOOKUP lock is enough to justify inode is the
450                  * same. And if inode is the same and we have suitable
451                  * openhandle, then there is no point in doing another OPEN RPC
452                  * just to throw away newly received openhandle.  There are no
453                  * security implications too, if file owner or access mode is
454                  * change, LOOKUP lock is revoked.
455                  */
456
457
458                 if (it->it_flags & FMODE_WRITE) {
459                         och_p = &lli->lli_mds_write_och;
460                         och_usecount = &lli->lli_open_fd_write_count;
461                 } else if (it->it_flags & FMODE_EXEC) {
462                         och_p = &lli->lli_mds_exec_och;
463                         och_usecount = &lli->lli_open_fd_exec_count;
464                 } else {
465                         och_p = &lli->lli_mds_read_och;
466                         och_usecount = &lli->lli_open_fd_read_count;
467                 }
468                 /* Check for the proper lock. */
469                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
470                         goto do_lock;
471                 cfs_down(&lli->lli_och_sem);
472                 if (*och_p) { /* Everything is open already, do nothing */
473                         /*(*och_usecount)++;  Do not let them steal our open
474                           handle from under us */
475                         /* XXX The code above was my original idea, but in case
476                            we have the handle, but we cannot use it due to later
477                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
478                            would decrement counter increased here. So we just
479                            hope the lock won't be invalidated in between. But
480                            if it would be, we'll reopen the open request to
481                            MDS later during file open path */
482                         cfs_up(&lli->lli_och_sem);
483                         ll_finish_md_op_data(op_data);
484                         RETURN(1);
485                 } else {
486                         cfs_up(&lli->lli_och_sem);
487                 }
488         }
489
490         if (it->it_op == IT_GETATTR) {
491                 first = ll_statahead_enter(parent, &de, 0);
492                 if (first == 1) {
493                         ll_statahead_exit(parent, de, 1);
494                         ll_finish_md_op_data(op_data);
495                         GOTO(out, rc = 1);
496                 }
497         }
498
499 do_lock:
500         it->it_create_mode &= ~current->fs->umask;
501         it->it_create_mode |= M_CHECK_STALE;
502         rc = md_intent_lock(exp, op_data, NULL, 0, it,
503                             lookup_flags,
504                             &req, ll_md_blocking_ast, 0);
505         it->it_create_mode &= ~M_CHECK_STALE;
506         ll_finish_md_op_data(op_data);
507         if (it->it_op == IT_GETATTR && !first)
508                 /* If there are too many locks on client-side, then some
509                  * locks taken by statahead maybe dropped automatically
510                  * before the real "revalidate" using them. */
511                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
512         else if (first == -EEXIST)
513                 ll_statahead_mark(parent, de);
514
515         /* If req is NULL, then md_intent_lock only tried to do a lock match;
516          * if all was well, it will return 1 if it found locks, 0 otherwise. */
517         if (req == NULL && rc >= 0) {
518                 if (!rc)
519                         goto do_lookup;
520                 GOTO(out, rc);
521         }
522
523         if (rc < 0) {
524                 if (rc != -ESTALE) {
525                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
526                                "%d\n", rc, it->d.lustre.it_status);
527                 }
528                 GOTO(out, rc = 0);
529         }
530
531 revalidate_finish:
532         rc = ll_revalidate_it_finish(req, it, de);
533         if (rc != 0) {
534                 if (rc != -ESTALE && rc != -ENOENT)
535                         ll_intent_release(it);
536                 GOTO(out, rc = 0);
537         }
538
539         if ((it->it_op & IT_OPEN) && de->d_inode &&
540             !S_ISREG(de->d_inode->i_mode) &&
541             !S_ISDIR(de->d_inode->i_mode)) {
542                 ll_release_openhandle(de, it);
543         }
544         rc = 1;
545
546         /* unfortunately ll_intent_lock may cause a callback and revoke our
547          * dentry */
548         cfs_spin_lock(&ll_lookup_lock);
549         spin_lock(&dcache_lock);
550         lock_dentry(de);
551         __d_drop(de);
552         unlock_dentry(de);
553         d_rehash_cond(de, 0);
554         spin_unlock(&dcache_lock);
555         cfs_spin_unlock(&ll_lookup_lock);
556
557 out:
558         /* We do not free request as it may be reused during following lookup
559          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
560          * be freed in ll_lookup_it or in ll_intent_release. But if
561          * request was not completed, we need to free it. (bug 5154, 9903) */
562         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
563                 ptlrpc_req_finished(req);
564         if (rc == 0) {
565                 ll_unhash_aliases(de->d_inode);
566                 /* done in ll_unhash_aliases()
567                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
568         } else {
569                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
570                        "inode %p refc %d\n", de->d_name.len,
571                        de->d_name.name, de, de->d_parent, de->d_inode,
572                        atomic_read(&de->d_count));
573                 if (first != 1) {
574                         if (de->d_flags & DCACHE_LUSTRE_INVALID) {
575                                 lock_dentry(de);
576                                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
577                                 unlock_dentry(de);
578                         }
579                         ll_lookup_finish_locks(it, de);
580                 }
581         }
582         RETURN(rc);
583
584         /*
585          * This part is here to combat evil-evil race in real_lookup on 2.6
586          * kernels.  The race details are: We enter do_lookup() looking for some
587          * name, there is nothing in dcache for this name yet and d_lookup()
588          * returns NULL.  We proceed to real_lookup(), and while we do this,
589          * another process does open on the same file we looking up (most simple
590          * reproducer), open succeeds and the dentry is added. Now back to
591          * us. In real_lookup() we do d_lookup() again and suddenly find the
592          * dentry, so we call d_revalidate on it, but there is no lock, so
593          * without this code we would return 0, but unpatched real_lookup just
594          * returns -ENOENT in such a case instead of retrying the lookup. Once
595          * this is dealt with in real_lookup(), all of this ugly mess can go and
596          * we can just check locks in ->d_revalidate without doing any RPCs
597          * ever.
598          */
599 do_lookup:
600         if (it != &lookup_it) {
601                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
602                 if (it->it_op == IT_GETATTR)
603                         lookup_it.it_op = IT_GETATTR;
604                 ll_lookup_finish_locks(it, de);
605                 it = &lookup_it;
606         }
607
608         /* Do real lookup here. */
609         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
610                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
611                                                          LUSTRE_OPC_CREATE :
612                                                          LUSTRE_OPC_ANY), NULL);
613         if (IS_ERR(op_data))
614                 RETURN(PTR_ERR(op_data));
615
616         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
617                             ll_md_blocking_ast, 0);
618         if (rc >= 0) {
619                 struct mdt_body *mdt_body;
620                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
621                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
622
623                 if (de->d_inode)
624                         fid = *ll_inode2fid(de->d_inode);
625
626                 /* see if we got same inode, if not - return error */
627                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
628                         ll_finish_md_op_data(op_data);
629                         op_data = NULL;
630                         goto revalidate_finish;
631                 }
632                 ll_intent_release(it);
633         }
634         ll_finish_md_op_data(op_data);
635         GOTO(out, rc = 0);
636
637 out_sa:
638         /*
639          * For rc == 1 case, should not return directly to prevent losing
640          * statahead windows; for rc == 0 case, the "lookup" will be done later.
641          */
642         if (it && it->it_op == IT_GETATTR && rc == 1) {
643                 first = ll_statahead_enter(parent, &de, 0);
644                 if (first >= 0)
645                         ll_statahead_exit(parent, de, 1);
646                 else if (first == -EEXIST)
647                         ll_statahead_mark(parent, de);
648         }
649
650         return rc;
651 }
652
653 #if 0
654 static void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
655 {
656         struct inode *inode= de->d_inode;
657         struct ll_sb_info *sbi = ll_i2sbi(inode);
658         struct ll_dentry_data *ldd = ll_d2d(de);
659         struct obd_client_handle *handle;
660         struct obd_capa *oc;
661         int rc = 0;
662         ENTRY;
663         LASSERT(ldd);
664
665         cfs_lock_kernel();
666         /* Strictly speaking this introduces an additional race: the
667          * increments should wait until the rpc has returned.
668          * However, given that at present the function is void, this
669          * issue is moot. */
670         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
671                 cfs_unlock_kernel();
672                 EXIT;
673                 return;
674         }
675
676         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
677                 cfs_unlock_kernel();
678                 EXIT;
679                 return;
680         }
681         cfs_unlock_kernel();
682
683         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
684         oc = ll_mdscapa_get(inode);
685         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
686         capa_put(oc);
687         if (rc) {
688                 cfs_lock_kernel();
689                 memset(handle, 0, sizeof(*handle));
690                 if (flag == 0)
691                         ldd->lld_cwd_count--;
692                 else
693                         ldd->lld_mnt_count--;
694                 cfs_unlock_kernel();
695         }
696
697         EXIT;
698         return;
699 }
700
701 static void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
702 {
703         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
704         struct ll_dentry_data *ldd = ll_d2d(de);
705         struct obd_client_handle handle;
706         int count, rc = 0;
707         ENTRY;
708         LASSERT(ldd);
709
710         cfs_lock_kernel();
711         /* Strictly speaking this introduces an additional race: the
712          * increments should wait until the rpc has returned.
713          * However, given that at present the function is void, this
714          * issue is moot. */
715         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
716         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
717                 /* the "pin" failed */
718                 cfs_unlock_kernel();
719                 EXIT;
720                 return;
721         }
722
723         if (flag)
724                 count = --ldd->lld_mnt_count;
725         else
726                 count = --ldd->lld_cwd_count;
727         cfs_unlock_kernel();
728
729         if (count != 0) {
730                 EXIT;
731                 return;
732         }
733
734         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
735         EXIT;
736         return;
737 }
738 #endif
739
740 #ifdef HAVE_VFS_INTENT_PATCHES
741 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
742 {
743         int rc;
744         ENTRY;
745
746         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
747                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
748         else
749                 rc = ll_revalidate_it(dentry, 0, NULL);
750
751         RETURN(rc);
752 }
753 #else
754 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
755 {
756         int rc;
757         ENTRY;
758
759         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
760                 struct lookup_intent *it;
761                 it = ll_convert_intent(&nd->intent.open, nd->flags);
762                 if (IS_ERR(it))
763                         RETURN(0);
764                 if (it->it_op == (IT_OPEN|IT_CREAT))
765                         if (nd->intent.open.flags & O_EXCL) {
766                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
767                                 rc = 0;
768                                 goto out_it;
769                         }
770
771                 rc = ll_revalidate_it(dentry, nd->flags, it);
772
773                 if (rc && (nd->flags & LOOKUP_OPEN) &&
774                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
775 #ifdef HAVE_FILE_IN_STRUCT_INTENT
776 // XXX Code duplication with ll_lookup_nd
777                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
778                                 // We cannot call open here as it would
779                                 // deadlock.
780                                 ptlrpc_req_finished(
781                                                (struct ptlrpc_request *)
782                                                   it->d.lustre.it_data);
783                         } else {
784 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
785 /* 2.6.1[456] have a bug in open_namei() that forgets to check
786  * nd->intent.open.file for error, so we need to return it as lookup's result
787  * instead */
788                                 struct file *filp;
789
790                                 nd->intent.open.file->private_data = it;
791                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
792                                 if (IS_ERR(filp)) {
793                                         rc = PTR_ERR(filp);
794                                 }
795 #else
796                                 nd->intent.open.file->private_data = it;
797                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
798 #endif
799                         }
800 #else
801                         ll_release_openhandle(dentry, it);
802 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
803                 }
804                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
805                     it_disposition(it, DISP_OPEN_CREATE)) {
806                         /* We created something but we may only return
807                          * negative dentry here, so save request in dentry,
808                          * if lookup will be called later on, it will
809                          * pick the request, otherwise it would be freed
810                          * with dentry */
811                         ll_d2d(dentry)->lld_it = it;
812                         it = NULL; /* avoid freeing */
813                 }
814
815 out_it:
816                 if (it) {
817                         ll_intent_release(it);
818                         OBD_FREE(it, sizeof(*it));
819                 }
820         } else {
821                 rc = ll_revalidate_it(dentry, 0, NULL);
822         }
823
824         RETURN(rc);
825 }
826 #endif
827
828 void ll_d_iput(struct dentry *de, struct inode *inode)
829 {
830         LASSERT(inode);
831         if (!find_cbdata(inode))
832                 inode->i_nlink = 0;
833         iput(inode);
834 }
835
836 struct dentry_operations ll_d_ops = {
837         .d_revalidate = ll_revalidate_nd,
838         .d_release = ll_release,
839         .d_delete  = ll_ddelete,
840         .d_iput    = ll_d_iput,
841         .d_compare = ll_dcompare,
842 #if 0
843         .d_pin = ll_pin,
844         .d_unpin = ll_unpin,
845 #endif
846 };