Whamcloud - gitweb
LU-163 MDS returns 32/64-bit dir name hash according to client type
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #include <linux/fs.h>
38 #include <linux/sched.h>
39 #include <linux/smp_lock.h>
40 #include <linux/quotaops.h>
41
42 #define DEBUG_SUBSYSTEM S_LLITE
43
44 #include <obd_support.h>
45 #include <lustre_lite.h>
46 #include <lustre/lustre_idl.h>
47 #include <lustre_dlm.h>
48 #include <lustre_mdc.h>
49 //#include <lustre_ver.h>
50 //#include <lustre_version.h>
51
52 #include "llite_internal.h"
53
54 cfs_spinlock_t ll_lookup_lock = CFS_SPIN_LOCK_UNLOCKED;
55
56 /* should NOT be called with the dcache lock, see fs/dcache.c */
57 static void ll_release(struct dentry *de)
58 {
59         struct ll_dentry_data *lld;
60         ENTRY;
61         LASSERT(de != NULL);
62         lld = ll_d2d(de);
63         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
64                 EXIT;
65                 return;
66         }
67         if (lld->lld_it) {
68                 ll_intent_release(lld->lld_it);
69                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
70         }
71         LASSERT(lld->lld_cwd_count == 0);
72         LASSERT(lld->lld_mnt_count == 0);
73         OBD_FREE(de->d_fsdata, sizeof(*lld));
74
75         EXIT;
76 }
77
78 /* Compare if two dentries are the same.  Don't match if the existing dentry
79  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
80  *
81  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
82  * an AST before calling d_revalidate_it().  The dentry still exists (marked
83  * INVALID) so d_lookup() matches it, but we have no lock on it (so
84  * lock_match() fails) and we spin around real_lookup(). */
85 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
86 {
87         struct dentry *dchild;
88         ENTRY;
89
90         if (d_name->len != name->len)
91                 RETURN(1);
92
93         if (memcmp(d_name->name, name->name, name->len))
94                 RETURN(1);
95
96         /* XXX: d_name must be in-dentry structure */
97         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
98
99         CDEBUG(D_DENTRY,"found name %.*s(%p) - flags %d/%x - refc %d\n",
100                name->len, name->name, dchild,
101                d_mountpoint(dchild), dchild->d_flags & DCACHE_LUSTRE_INVALID,
102                atomic_read(&dchild->d_count));
103
104          /* mountpoint is always valid */
105         if (d_mountpoint(dchild))
106                 RETURN(0);
107
108         if (dchild->d_flags & DCACHE_LUSTRE_INVALID)
109                 RETURN(1);
110
111         RETURN(0);
112 }
113
114 static inline int return_if_equal(struct ldlm_lock *lock, void *data)
115 {
116         if (lock->l_flags & LDLM_FL_CANCELING)
117                 return LDLM_ITER_CONTINUE;
118         return LDLM_ITER_STOP;
119 }
120
121 /* find any ldlm lock of the inode in mdc and lov
122  * return 0    not find
123  *        1    find one
124  *      < 0    error */
125 static int find_cbdata(struct inode *inode)
126 {
127         struct ll_inode_info *lli = ll_i2info(inode);
128         struct ll_sb_info *sbi = ll_i2sbi(inode);
129         int rc = 0;
130         ENTRY;
131
132         LASSERT(inode);
133         rc = md_find_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
134                             return_if_equal, NULL);
135         if (rc != 0)
136                  RETURN(rc);
137
138         if (lli->lli_smd)
139                 rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
140                                      return_if_equal, NULL);
141
142         RETURN(rc);
143 }
144
145 /**
146  * Called when last reference to a dentry is dropped and dcache wants to know
147  * whether or not it should cache it:
148  * - return 1 to delete the dentry immediately
149  * - return 0 to cache the dentry
150  * Should NOT be called with the dcache lock, see fs/dcache.c
151  */
152 static int ll_ddelete(struct dentry *de)
153 {
154         ENTRY;
155         LASSERT(de);
156
157         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
158                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
159                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
160                d_unhashed(de) ? "" : "hashed,",
161                list_empty(&de->d_subdirs) ? "" : "subdirs");
162
163         /* if not ldlm lock for this inode, set i_nlink to 0 so that
164          * this inode can be recycled later b=20433 */
165         LASSERT(atomic_read(&de->d_count) == 0);
166         if (de->d_inode && !find_cbdata(de->d_inode))
167                 de->d_inode->i_nlink = 0;
168
169         if (de->d_flags & DCACHE_LUSTRE_INVALID)
170                 RETURN(1);
171
172         RETURN(0);
173 }
174
175 static int ll_set_dd(struct dentry *de)
176 {
177         ENTRY;
178         LASSERT(de != NULL);
179
180         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
181                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
182                atomic_read(&de->d_count));
183
184         if (de->d_fsdata == NULL) {
185                 struct ll_dentry_data *lld;
186
187                 OBD_ALLOC_PTR(lld);
188                 if (likely(lld != NULL)) {
189                         lock_dentry(de);
190                         if (likely(de->d_fsdata == NULL))
191                                 de->d_fsdata = lld;
192                         else
193                                 OBD_FREE_PTR(lld);
194                         unlock_dentry(de);
195                 } else {
196                         RETURN(-ENOMEM);
197                 }
198         }
199
200         RETURN(0);
201 }
202
203 int ll_dops_init(struct dentry *de, int block, int init_sa)
204 {
205         struct ll_dentry_data *lld = ll_d2d(de);
206         int rc = 0;
207
208         if (lld == NULL && block != 0) {
209                 rc = ll_set_dd(de);
210                 if (rc)
211                         return rc;
212
213                 lld = ll_d2d(de);
214         }
215
216         if (lld != NULL && init_sa != 0)
217                 lld->lld_sa_generation = 0;
218
219         de->d_op = &ll_d_ops;
220         return rc;
221 }
222
223 void ll_intent_drop_lock(struct lookup_intent *it)
224 {
225         struct lustre_handle *handle;
226
227         if (it->it_op && it->d.lustre.it_lock_mode) {
228                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
229                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
230                        " from it %p\n", handle->cookie, it);
231                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
232
233                 /* bug 494: intent_release may be called multiple times, from
234                  * this thread and we don't want to double-decref this lock */
235                 it->d.lustre.it_lock_mode = 0;
236         }
237 }
238
239 void ll_intent_release(struct lookup_intent *it)
240 {
241         ENTRY;
242
243         CDEBUG(D_INFO, "intent %p released\n", it);
244         ll_intent_drop_lock(it);
245         /* We are still holding extra reference on a request, need to free it */
246         if (it_disposition(it, DISP_ENQ_OPEN_REF))
247                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
248         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
249                 ptlrpc_req_finished(it->d.lustre.it_data);
250         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
251                                                     * to lookup */
252                 ptlrpc_req_finished(it->d.lustre.it_data);
253
254         it->d.lustre.it_disposition = 0;
255         it->d.lustre.it_data = NULL;
256         EXIT;
257 }
258
259 /* Drop dentry if it is not used already, unhash otherwise.
260    Should be called with dcache lock held!
261    Returns: 1 if dentry was dropped, 0 if unhashed. */
262 int ll_drop_dentry(struct dentry *dentry)
263 {
264         lock_dentry(dentry);
265         if (atomic_read(&dentry->d_count) == 0) {
266                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
267                        "inode %p\n", dentry->d_name.len,
268                        dentry->d_name.name, dentry, dentry->d_parent,
269                        dentry->d_inode);
270                 dget_locked(dentry);
271                 __d_drop(dentry);
272                 unlock_dentry(dentry);
273                 spin_unlock(&dcache_lock);
274                 cfs_spin_unlock(&ll_lookup_lock);
275                 dput(dentry);
276                 cfs_spin_lock(&ll_lookup_lock);
277                 spin_lock(&dcache_lock);
278                 return 1;
279         }
280         /* disconected dentry can not be find without lookup, because we
281          * not need his to unhash or mark invalid. */
282         if (dentry->d_flags & DCACHE_DISCONNECTED) {
283                 unlock_dentry(dentry);
284                 RETURN (0);
285         }
286
287         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
288                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
289                        "inode %p refc %d\n", dentry->d_name.len,
290                        dentry->d_name.name, dentry, dentry->d_parent,
291                        dentry->d_inode, atomic_read(&dentry->d_count));
292                 /* actually we don't unhash the dentry, rather just
293                  * mark it inaccessible for to __d_lookup(). otherwise
294                  * sys_getcwd() could return -ENOENT -bzzz */
295                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
296                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
297                         __d_drop(dentry);
298         }
299         unlock_dentry(dentry);
300         return 0;
301 }
302
303 void ll_unhash_aliases(struct inode *inode)
304 {
305         struct list_head *tmp, *head;
306         ENTRY;
307
308         if (inode == NULL) {
309                 CERROR("unexpected NULL inode, tell phil\n");
310                 return;
311         }
312
313         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
314                inode->i_ino, inode->i_generation, inode);
315
316         head = &inode->i_dentry;
317         cfs_spin_lock(&ll_lookup_lock);
318         spin_lock(&dcache_lock);
319 restart:
320         tmp = head;
321         while ((tmp = tmp->next) != head) {
322                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
323
324                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
325                        "inode %p flags %d\n", dentry->d_name.len,
326                        dentry->d_name.name, dentry, dentry->d_parent,
327                        dentry->d_inode, dentry->d_flags);
328
329                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
330                         CERROR("called on root (?) dentry=%p, inode=%p "
331                                "ino=%lu\n", dentry, inode, inode->i_ino);
332                         lustre_dump_dentry(dentry, 1);
333                         libcfs_debug_dumpstack(NULL);
334                 }
335
336                 if (ll_drop_dentry(dentry))
337                           goto restart;
338         }
339         spin_unlock(&dcache_lock);
340         cfs_spin_unlock(&ll_lookup_lock);
341
342         EXIT;
343 }
344
345 int ll_revalidate_it_finish(struct ptlrpc_request *request,
346                             struct lookup_intent *it,
347                             struct dentry *de)
348 {
349         int rc = 0;
350         ENTRY;
351
352         if (!request)
353                 RETURN(0);
354
355         if (it_disposition(it, DISP_LOOKUP_NEG))
356                 RETURN(-ENOENT);
357
358         rc = ll_prep_inode(&de->d_inode, request, NULL);
359
360         RETURN(rc);
361 }
362
363 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
364 {
365         LASSERT(it != NULL);
366         LASSERT(dentry != NULL);
367
368         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
369                 struct inode *inode = dentry->d_inode;
370                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
371
372                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
373                        inode, inode->i_ino, inode->i_generation);
374                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
375                                  inode, NULL);
376         }
377
378         /* drop lookup or getattr locks immediately */
379         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
380                 /* on 2.6 there are situation when several lookups and
381                  * revalidations may be requested during single operation.
382                  * therefore, we don't release intent here -bzzz */
383                 ll_intent_drop_lock(it);
384         }
385 }
386
387 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
388 {
389         struct lookup_intent *it = *itp;
390
391         if (!it || it->it_op == IT_GETXATTR)
392                 it = *itp = deft;
393
394 }
395
396 int ll_revalidate_it(struct dentry *de, int lookup_flags,
397                      struct lookup_intent *it)
398 {
399         struct md_op_data *op_data;
400         struct ptlrpc_request *req = NULL;
401         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
402         struct obd_export *exp;
403         struct inode *parent = de->d_parent->d_inode;
404         int rc, first = 0;
405
406         ENTRY;
407         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
408                LL_IT2STR(it));
409
410         if (de->d_inode == NULL) {
411                 /* We can only use negative dentries if this is stat or lookup,
412                    for opens and stuff we do need to query server. */
413                 /* If there is IT_CREAT in intent op set, then we must throw
414                    away this negative dentry and actually do the request to
415                    kernel to create whatever needs to be created (if possible)*/
416                 if (it && (it->it_op & IT_CREAT))
417                         RETURN(0);
418
419                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
420                         RETURN(0);
421
422                 rc = ll_have_md_lock(parent, MDS_INODELOCK_UPDATE, LCK_MINMODE);
423                 GOTO(out_sa, rc);
424         }
425
426         /* Never execute intents for mount points.
427          * Attributes will be fixed up in ll_inode_revalidate_it */
428         if (d_mountpoint(de))
429                 GOTO(out_sa, rc = 1);
430
431         /* need to get attributes in case root got changed from other client */
432         if (de == de->d_sb->s_root) {
433                 rc = __ll_inode_revalidate_it(de, it, MDS_INODELOCK_LOOKUP);
434                 if (rc == 0)
435                         rc = 1;
436                 GOTO(out_sa, rc);
437         }
438
439         exp = ll_i2mdexp(de->d_inode);
440
441         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
442         ll_frob_intent(&it, &lookup_it);
443         LASSERT(it);
444
445         if (it->it_op == IT_LOOKUP && !(de->d_flags & DCACHE_LUSTRE_INVALID))
446                 GOTO(out_sa, rc = 1);
447
448         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
449                                      de->d_name.name, de->d_name.len,
450                                      0, LUSTRE_OPC_ANY, NULL);
451         if (IS_ERR(op_data))
452                 RETURN(PTR_ERR(op_data));
453
454         if ((it->it_op == IT_OPEN) && de->d_inode) {
455                 struct inode *inode = de->d_inode;
456                 struct ll_inode_info *lli = ll_i2info(inode);
457                 struct obd_client_handle **och_p;
458                 __u64 *och_usecount;
459
460                 /*
461                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
462                  * just having LOOKUP lock is enough to justify inode is the
463                  * same. And if inode is the same and we have suitable
464                  * openhandle, then there is no point in doing another OPEN RPC
465                  * just to throw away newly received openhandle.  There are no
466                  * security implications too, if file owner or access mode is
467                  * change, LOOKUP lock is revoked.
468                  */
469
470
471                 if (it->it_flags & FMODE_WRITE) {
472                         och_p = &lli->lli_mds_write_och;
473                         och_usecount = &lli->lli_open_fd_write_count;
474                 } else if (it->it_flags & FMODE_EXEC) {
475                         och_p = &lli->lli_mds_exec_och;
476                         och_usecount = &lli->lli_open_fd_exec_count;
477                 } else {
478                         och_p = &lli->lli_mds_read_och;
479                         och_usecount = &lli->lli_open_fd_read_count;
480                 }
481                 /* Check for the proper lock. */
482                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP, LCK_MINMODE))
483                         goto do_lock;
484                 cfs_down(&lli->lli_och_sem);
485                 if (*och_p) { /* Everything is open already, do nothing */
486                         /*(*och_usecount)++;  Do not let them steal our open
487                           handle from under us */
488                         /* XXX The code above was my original idea, but in case
489                            we have the handle, but we cannot use it due to later
490                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
491                            would decrement counter increased here. So we just
492                            hope the lock won't be invalidated in between. But
493                            if it would be, we'll reopen the open request to
494                            MDS later during file open path */
495                         cfs_up(&lli->lli_och_sem);
496                         ll_finish_md_op_data(op_data);
497                         RETURN(1);
498                 } else {
499                         cfs_up(&lli->lli_och_sem);
500                 }
501         }
502
503         if (it->it_op == IT_GETATTR)
504                 first = ll_statahead_enter(parent, &de, 0);
505
506 do_lock:
507         it->it_create_mode &= ~cfs_curproc_umask();
508         it->it_create_mode |= M_CHECK_STALE;
509         rc = md_intent_lock(exp, op_data, NULL, 0, it,
510                             lookup_flags,
511                             &req, ll_md_blocking_ast, 0);
512         it->it_create_mode &= ~M_CHECK_STALE;
513         ll_finish_md_op_data(op_data);
514         if (it->it_op == IT_GETATTR && !first)
515                 /* If there are too many locks on client-side, then some
516                  * locks taken by statahead maybe dropped automatically
517                  * before the real "revalidate" using them. */
518                 ll_statahead_exit(parent, de, req == NULL ? rc : 0);
519         else if (first == -EEXIST)
520                 ll_statahead_mark(parent, de);
521
522         /* If req is NULL, then md_intent_lock only tried to do a lock match;
523          * if all was well, it will return 1 if it found locks, 0 otherwise. */
524         if (req == NULL && rc >= 0) {
525                 if (!rc)
526                         goto do_lookup;
527                 GOTO(out, rc);
528         }
529
530         if (rc < 0) {
531                 if (rc != -ESTALE) {
532                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
533                                "%d\n", rc, it->d.lustre.it_status);
534                 }
535                 GOTO(out, rc = 0);
536         }
537
538 revalidate_finish:
539         rc = ll_revalidate_it_finish(req, it, de);
540         if (rc != 0) {
541                 if (rc != -ESTALE && rc != -ENOENT)
542                         ll_intent_release(it);
543                 GOTO(out, rc = 0);
544         }
545
546         if ((it->it_op & IT_OPEN) && de->d_inode &&
547             !S_ISREG(de->d_inode->i_mode) &&
548             !S_ISDIR(de->d_inode->i_mode)) {
549                 ll_release_openhandle(de, it);
550         }
551         rc = 1;
552
553         /* unfortunately ll_intent_lock may cause a callback and revoke our
554          * dentry */
555         cfs_spin_lock(&ll_lookup_lock);
556         spin_lock(&dcache_lock);
557         lock_dentry(de);
558         __d_drop(de);
559         unlock_dentry(de);
560         d_rehash_cond(de, 0);
561         spin_unlock(&dcache_lock);
562         cfs_spin_unlock(&ll_lookup_lock);
563
564 out:
565         /* We do not free request as it may be reused during following lookup
566          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
567          * be freed in ll_lookup_it or in ll_intent_release. But if
568          * request was not completed, we need to free it. (bug 5154, 9903) */
569         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
570                 ptlrpc_req_finished(req);
571         if (rc == 0) {
572                 ll_unhash_aliases(de->d_inode);
573                 /* done in ll_unhash_aliases()
574                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
575         } else {
576                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
577                        "inode %p refc %d\n", de->d_name.len,
578                        de->d_name.name, de, de->d_parent, de->d_inode,
579                        atomic_read(&de->d_count));
580                 if (de->d_flags & DCACHE_LUSTRE_INVALID) {
581                         lock_dentry(de);
582                         de->d_flags &= ~DCACHE_LUSTRE_INVALID;
583                         unlock_dentry(de);
584                 }
585                 ll_lookup_finish_locks(it, de);
586         }
587         RETURN(rc);
588
589         /*
590          * This part is here to combat evil-evil race in real_lookup on 2.6
591          * kernels.  The race details are: We enter do_lookup() looking for some
592          * name, there is nothing in dcache for this name yet and d_lookup()
593          * returns NULL.  We proceed to real_lookup(), and while we do this,
594          * another process does open on the same file we looking up (most simple
595          * reproducer), open succeeds and the dentry is added. Now back to
596          * us. In real_lookup() we do d_lookup() again and suddenly find the
597          * dentry, so we call d_revalidate on it, but there is no lock, so
598          * without this code we would return 0, but unpatched real_lookup just
599          * returns -ENOENT in such a case instead of retrying the lookup. Once
600          * this is dealt with in real_lookup(), all of this ugly mess can go and
601          * we can just check locks in ->d_revalidate without doing any RPCs
602          * ever.
603          */
604 do_lookup:
605         if (it != &lookup_it) {
606                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
607                 if (it->it_op == IT_GETATTR)
608                         lookup_it.it_op = IT_GETATTR;
609                 ll_lookup_finish_locks(it, de);
610                 it = &lookup_it;
611         }
612
613         /* Do real lookup here. */
614         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
615                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
616                                                          LUSTRE_OPC_CREATE :
617                                                          LUSTRE_OPC_ANY), NULL);
618         if (IS_ERR(op_data))
619                 RETURN(PTR_ERR(op_data));
620
621         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
622                             ll_md_blocking_ast, 0);
623         if (rc >= 0) {
624                 struct mdt_body *mdt_body;
625                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
626                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
627
628                 if (de->d_inode)
629                         fid = *ll_inode2fid(de->d_inode);
630
631                 /* see if we got same inode, if not - return error */
632                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
633                         ll_finish_md_op_data(op_data);
634                         op_data = NULL;
635                         goto revalidate_finish;
636                 }
637                 ll_intent_release(it);
638         }
639         ll_finish_md_op_data(op_data);
640         GOTO(out, rc = 0);
641
642 out_sa:
643         /*
644          * For rc == 1 case, should not return directly to prevent losing
645          * statahead windows; for rc == 0 case, the "lookup" will be done later.
646          */
647         if (it && it->it_op == IT_GETATTR && rc == 1) {
648                 first = ll_statahead_enter(parent, &de, 0);
649                 if (first >= 0)
650                         ll_statahead_exit(parent, de, 1);
651                 else if (first == -EEXIST)
652                         ll_statahead_mark(parent, de);
653         }
654
655         return rc;
656 }
657
658 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
659 {
660         int rc;
661         ENTRY;
662
663         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
664                 struct lookup_intent *it;
665
666                 it = ll_convert_intent(&nd->intent.open, nd->flags);
667                 if (IS_ERR(it))
668                         RETURN(0);
669
670                 if (it->it_op == (IT_OPEN|IT_CREAT) &&
671                     nd->intent.open.flags & O_EXCL) {
672                         CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
673                         rc = 0;
674                         goto out_it;
675                 }
676
677                 rc = ll_revalidate_it(dentry, nd->flags, it);
678
679                 if (rc && (nd->flags & LOOKUP_OPEN) &&
680                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
681 #ifdef HAVE_FILE_IN_STRUCT_INTENT
682 // XXX Code duplication with ll_lookup_nd
683                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
684                                 // We cannot call open here as it would
685                                 // deadlock.
686                                 ptlrpc_req_finished(
687                                                (struct ptlrpc_request *)
688                                                   it->d.lustre.it_data);
689                         } else {
690 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
691 /* 2.6.1[456] have a bug in open_namei() that forgets to check
692  * nd->intent.open.file for error, so we need to return it as lookup's result
693  * instead */
694                                 struct file *filp;
695
696                                 nd->intent.open.file->private_data = it;
697                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
698                                 if (IS_ERR(filp)) {
699                                         rc = PTR_ERR(filp);
700                                 }
701 #else
702                                 nd->intent.open.file->private_data = it;
703                                 (void)lookup_instantiate_filp(nd, dentry,NULL);
704 #endif
705                         }
706 #else
707                         ll_release_openhandle(dentry, it);
708 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
709                 }
710                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
711                     it_disposition(it, DISP_OPEN_CREATE)) {
712                         /* We created something but we may only return
713                          * negative dentry here, so save request in dentry,
714                          * if lookup will be called later on, it will
715                          * pick the request, otherwise it would be freed
716                          * with dentry */
717                         ll_d2d(dentry)->lld_it = it;
718                         it = NULL; /* avoid freeing */
719                 }
720
721 out_it:
722                 if (it) {
723                         ll_intent_release(it);
724                         OBD_FREE(it, sizeof(*it));
725                 }
726         } else {
727                 rc = ll_revalidate_it(dentry, 0, NULL);
728         }
729
730         RETURN(rc);
731 }
732
733 void ll_d_iput(struct dentry *de, struct inode *inode)
734 {
735         LASSERT(inode);
736         if (!find_cbdata(inode))
737                 inode->i_nlink = 0;
738         iput(inode);
739 }
740
741 struct dentry_operations ll_d_ops = {
742         .d_revalidate = ll_revalidate_nd,
743         .d_release = ll_release,
744         .d_delete  = ll_ddelete,
745         .d_iput    = ll_d_iput,
746         .d_compare = ll_dcompare,
747 };