Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / llite / dcache.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include <linux/fs.h>
23 #include <linux/sched.h>
24 #include <linux/smp_lock.h>
25 #include <linux/quotaops.h>
26
27 #define DEBUG_SUBSYSTEM S_LLITE
28
29 #include <obd_support.h>
30 #include <lustre_lite.h>
31 #include <lustre/lustre_idl.h>
32 #include <lustre_dlm.h>
33 #include <lustre_mdc.h>
34 //#include <lustre_ver.h>
35 //#include <lustre_version.h>
36
37 #include "llite_internal.h"
38
39 /* should NOT be called with the dcache lock, see fs/dcache.c */
40 static void ll_release(struct dentry *de)
41 {
42         struct ll_dentry_data *lld;
43         ENTRY;
44         LASSERT(de != NULL);
45         lld = ll_d2d(de);
46         if (lld == NULL) { /* NFS copies the de->d_op methods (bug 4655) */
47                 EXIT;
48                 return;
49         }
50 #ifndef HAVE_VFS_INTENT_PATCHES
51         if (lld->lld_it) {
52                 ll_intent_release(lld->lld_it);
53                 OBD_FREE(lld->lld_it, sizeof(*lld->lld_it));
54         }
55 #endif
56         LASSERT(lld->lld_cwd_count == 0);
57         LASSERT(lld->lld_mnt_count == 0);
58         OBD_FREE(de->d_fsdata, sizeof(*lld));
59
60         EXIT;
61 }
62
63 #ifdef DCACHE_LUSTRE_INVALID
64 /* Compare if two dentries are the same.  Don't match if the existing dentry
65  * is marked DCACHE_LUSTRE_INVALID.  Returns 1 if different, 0 if the same.
66  *
67  * This avoids a race where ll_lookup_it() instantiates a dentry, but we get
68  * an AST before calling d_revalidate_it().  The dentry still exists (marked
69  * INVALID) so d_lookup() matches it, but we have no lock on it (so
70  * lock_match() fails) and we spin around real_lookup(). */
71 int ll_dcompare(struct dentry *parent, struct qstr *d_name, struct qstr *name)
72 {
73         struct dentry *dchild;
74         ENTRY;
75
76         if (d_name->len != name->len)
77                 RETURN(1);
78
79         if (memcmp(d_name->name, name->name, name->len))
80                 RETURN(1);
81
82         /* XXX: d_name must be in-dentry structure */
83         dchild = container_of(d_name, struct dentry, d_name); /* ugh */
84         if (dchild->d_flags & DCACHE_LUSTRE_INVALID) {
85                 CDEBUG(D_DENTRY,"INVALID dentry %p not matched, was bug 3784\n",
86                        dchild);
87                 RETURN(1);
88         }
89
90         RETURN(0);
91 }
92 #endif
93
94 /* should NOT be called with the dcache lock, see fs/dcache.c */
95 static int ll_ddelete(struct dentry *de)
96 {
97         ENTRY;
98         LASSERT(de);
99 #ifndef DCACHE_LUSTRE_INVALID
100 #define DCACHE_LUSTRE_INVALID 0
101 #endif
102
103         CDEBUG(D_DENTRY, "%s dentry %.*s (%p, parent %p, inode %p) %s%s\n",
104                (de->d_flags & DCACHE_LUSTRE_INVALID ? "deleting" : "keeping"),
105                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
106                d_unhashed(de) ? "" : "hashed,",
107                list_empty(&de->d_subdirs) ? "" : "subdirs");
108 #if DCACHE_LUSTRE_INVALID == 0
109 #undef DCACHE_LUSTRE_INVALID
110 #endif
111
112         RETURN(0);
113 }
114
115 void ll_set_dd(struct dentry *de)
116 {
117         ENTRY;
118         LASSERT(de != NULL);
119
120         CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
121                de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
122                atomic_read(&de->d_count));
123
124         if (de->d_fsdata == NULL) {
125                 struct ll_dentry_data *lld;
126
127                 OBD_ALLOC_PTR(lld);
128                 if (likely(lld != NULL)) {
129                         cfs_waitq_init(&lld->lld_waitq);
130                         lock_dentry(de);
131                         if (likely(de->d_fsdata == NULL))
132                                 de->d_fsdata = lld;
133                         else
134                                 OBD_FREE_PTR(lld);
135                         unlock_dentry(de);
136                 }
137         }
138
139         EXIT;
140 }
141
142 void ll_intent_drop_lock(struct lookup_intent *it)
143 {
144         struct lustre_handle *handle;
145
146         if (it->it_op && it->d.lustre.it_lock_mode) {
147                 handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
148                 CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
149                        " from it %p\n", handle->cookie, it);
150                 ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
151
152                 /* bug 494: intent_release may be called multiple times, from
153                  * this thread and we don't want to double-decref this lock */
154                 it->d.lustre.it_lock_mode = 0;
155         }
156 }
157
158 void ll_intent_release(struct lookup_intent *it)
159 {
160         ENTRY;
161
162         CDEBUG(D_INFO, "intent %p released\n", it);
163         ll_intent_drop_lock(it);
164 #ifdef HAVE_VFS_INTENT_PATCHES
165         it->it_magic = 0;
166         it->it_op_release = 0;
167 #endif
168         /* We are still holding extra reference on a request, need to free it */
169         if (it_disposition(it, DISP_ENQ_OPEN_REF))
170                  ptlrpc_req_finished(it->d.lustre.it_data); /* ll_file_open */
171         if (it_disposition(it, DISP_ENQ_CREATE_REF)) /* create rec */
172                 ptlrpc_req_finished(it->d.lustre.it_data);
173         if (it_disposition(it, DISP_ENQ_COMPLETE)) /* saved req from revalidate
174                                                     * to lookup */
175                 ptlrpc_req_finished(it->d.lustre.it_data);
176
177         it->d.lustre.it_disposition = 0;
178         it->d.lustre.it_data = NULL;
179         EXIT;
180 }
181
182 /* Drop dentry if it is not used already, unhash otherwise.
183    Should be called with dcache lock held!
184    Returns: 1 if dentry was dropped, 0 if unhashed. */
185 int ll_drop_dentry(struct dentry *dentry)
186 {
187         lock_dentry(dentry);
188         if (atomic_read(&dentry->d_count) == 0) {
189                 CDEBUG(D_DENTRY, "deleting dentry %.*s (%p) parent %p "
190                        "inode %p\n", dentry->d_name.len,
191                        dentry->d_name.name, dentry, dentry->d_parent,
192                        dentry->d_inode);
193                 dget_locked(dentry);
194                 __d_drop(dentry);
195                 unlock_dentry(dentry);
196                 spin_unlock(&dcache_lock);
197                 dput(dentry);
198                 spin_lock(&dcache_lock);
199                 return 1;
200         }
201         /* disconected dentry can not be find without lookup, because we 
202          * not need his to unhash or mark invalid. */
203         if (dentry->d_flags & DCACHE_DISCONNECTED) {
204                 unlock_dentry(dentry);
205                 RETURN (0);
206         }
207
208 #ifdef DCACHE_LUSTRE_INVALID
209         if (!(dentry->d_flags & DCACHE_LUSTRE_INVALID)) {
210 #else
211         if (!d_unhashed(dentry)) {
212 #endif
213                 CDEBUG(D_DENTRY, "unhashing dentry %.*s (%p) parent %p "
214                        "inode %p refc %d\n", dentry->d_name.len,
215                        dentry->d_name.name, dentry, dentry->d_parent,
216                        dentry->d_inode, atomic_read(&dentry->d_count));
217                 /* actually we don't unhash the dentry, rather just
218                  * mark it inaccessible for to __d_lookup(). otherwise
219                  * sys_getcwd() could return -ENOENT -bzzz */
220 #ifdef DCACHE_LUSTRE_INVALID
221                 dentry->d_flags |= DCACHE_LUSTRE_INVALID;
222 #endif
223                 if (!dentry->d_inode || !S_ISDIR(dentry->d_inode->i_mode))
224                         __d_drop(dentry);
225
226         }
227         unlock_dentry(dentry);
228         return 0;
229 }
230
231 void ll_unhash_aliases(struct inode *inode)
232 {
233         struct list_head *tmp, *head;
234         ENTRY;
235
236         if (inode == NULL) {
237                 CERROR("unexpected NULL inode, tell phil\n");
238                 return;
239         }
240
241         CDEBUG(D_INODE, "marking dentries for ino %lu/%u(%p) invalid\n",
242                inode->i_ino, inode->i_generation, inode);
243
244         head = &inode->i_dentry;
245         spin_lock(&dcache_lock);
246 restart:
247         tmp = head;
248         while ((tmp = tmp->next) != head) {
249                 struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
250
251                 CDEBUG(D_DENTRY, "dentry in drop %.*s (%p) parent %p "
252                        "inode %p flags %d\n", dentry->d_name.len,
253                        dentry->d_name.name, dentry, dentry->d_parent,
254                        dentry->d_inode, dentry->d_flags);
255
256                 if (dentry->d_name.len == 1 && dentry->d_name.name[0] == '/') {
257                         CERROR("called on root (?) dentry=%p, inode=%p "
258                                "ino=%lu\n", dentry, inode, inode->i_ino);
259                         lustre_dump_dentry(dentry, 1);
260                         libcfs_debug_dumpstack(NULL);
261                 } else if (d_mountpoint(dentry)) {
262                         /* For mountpoints we skip removal of the dentry
263                            which happens solely because we have a lock on it
264                            obtained when this dentry was not a mountpoint yet */
265                         CDEBUG(D_DENTRY, "Skippind mountpoint dentry removal "
266                                          "%.*s (%p) parent %p\n",
267                                           dentry->d_name.len,
268                                           dentry->d_name.name,
269                                           dentry, dentry->d_parent);
270
271                         continue;
272                 }
273
274                 if (ll_drop_dentry(dentry))
275                           goto restart;
276         }
277         spin_unlock(&dcache_lock);
278         EXIT;
279 }
280
281 int ll_revalidate_it_finish(struct ptlrpc_request *request,
282                             struct lookup_intent *it,
283                             struct dentry *de)
284 {
285         int rc = 0;
286         ENTRY;
287
288         if (!request)
289                 RETURN(0);
290
291         if (it_disposition(it, DISP_LOOKUP_NEG)) 
292                 RETURN(-ENOENT);
293
294         rc = ll_prep_inode(&de->d_inode, request, NULL);
295
296         RETURN(rc);
297 }
298
299 void ll_lookup_finish_locks(struct lookup_intent *it, struct dentry *dentry)
300 {
301         LASSERT(it != NULL);
302         LASSERT(dentry != NULL);
303
304         if (it->d.lustre.it_lock_mode && dentry->d_inode != NULL) {
305                 struct inode *inode = dentry->d_inode;
306                 struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
307
308                 CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
309                        inode, inode->i_ino, inode->i_generation);
310                 md_set_lock_data(sbi->ll_md_exp, &it->d.lustre.it_lock_handle,
311                                  inode);
312         }
313
314         /* drop lookup or getattr locks immediately */
315         if (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR) {
316                 /* on 2.6 there are situation when several lookups and
317                  * revalidations may be requested during single operation.
318                  * therefore, we don't release intent here -bzzz */
319                 ll_intent_drop_lock(it);
320         }
321 }
322
323 void ll_frob_intent(struct lookup_intent **itp, struct lookup_intent *deft)
324 {
325         struct lookup_intent *it = *itp;
326 #ifdef HAVE_VFS_INTENT_PATCHES
327         if (it) {
328                 LASSERTF(it->it_magic == INTENT_MAGIC, 
329                          "%p has bad intent magic: %x\n",
330                          it, it->it_magic);
331         }
332 #endif
333
334         if (!it || it->it_op == IT_GETXATTR)
335                 it = *itp = deft;
336
337 #ifdef HAVE_VFS_INTENT_PATCHES
338         it->it_op_release = ll_intent_release;
339 #endif
340 }
341
342 int ll_revalidate_it(struct dentry *de, int lookup_flags,
343                      struct lookup_intent *it)
344 {
345         struct md_op_data *op_data;
346         struct ptlrpc_request *req = NULL;
347         struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
348         struct obd_export *exp;
349         struct inode *parent;
350         int rc, first = 0;
351
352         ENTRY;
353         CDEBUG(D_VFSTRACE, "VFS Op:name=%s,intent=%s\n", de->d_name.name,
354                LL_IT2STR(it));
355
356         if (de->d_inode == NULL) {
357                 /* We can only use negative dentries if this is stat or lookup,
358                    for opens and stuff we do need to query server. */
359                 /* If there is IT_CREAT in intent op set, then we must throw
360                    away this negative dentry and actually do the request to
361                    kernel to create whatever needs to be created (if possible)*/
362                 if (it && (it->it_op & IT_CREAT))
363                         RETURN(0);
364
365 #ifdef DCACHE_LUSTRE_INVALID
366                 if (de->d_flags & DCACHE_LUSTRE_INVALID)
367                         RETURN(0);
368 #endif
369
370                 rc = ll_have_md_lock(de->d_parent->d_inode,
371                                      MDS_INODELOCK_UPDATE);
372                 GOTO(out_sa, rc);
373         }
374
375         exp = ll_i2mdexp(de->d_inode);
376
377         /* Never execute intents for mount points.
378          * Attributes will be fixed up in ll_inode_revalidate_it */
379         if (d_mountpoint(de))
380                 GOTO(out_sa, rc = 1);
381
382         /* Root of the lustre tree. Always valid.
383          * Attributes will be fixed up in ll_inode_revalidate_it */
384         if (de == de->d_sb->s_root)
385                 GOTO(out_sa, rc = 1);
386
387         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_REVALIDATE_PAUSE, 5);
388         ll_frob_intent(&it, &lookup_it);
389         LASSERT(it);
390         parent = de->d_parent->d_inode;
391
392         op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
393                                      de->d_name.name, de->d_name.len,
394                                      0, LUSTRE_OPC_ANY, NULL);
395         if (IS_ERR(op_data))
396                 RETURN(PTR_ERR(op_data));
397
398         if ((it->it_op == IT_OPEN) && de->d_inode) {
399                 struct inode *inode = de->d_inode;
400                 struct ll_inode_info *lli = ll_i2info(inode);
401                 struct obd_client_handle **och_p;
402                 __u64 *och_usecount;
403
404                 /*
405                  * We used to check for MDS_INODELOCK_OPEN here, but in fact
406                  * just having LOOKUP lock is enough to justify inode is the
407                  * same. And if inode is the same and we have suitable
408                  * openhandle, then there is no point in doing another OPEN RPC
409                  * just to throw away newly received openhandle.  There are no
410                  * security implications too, if file owner or access mode is
411                  * change, LOOKUP lock is revoked.
412                  */
413
414
415                 if (it->it_flags & FMODE_WRITE) {
416                         och_p = &lli->lli_mds_write_och;
417                         och_usecount = &lli->lli_open_fd_write_count;
418                 } else if (it->it_flags & FMODE_EXEC) {
419                         och_p = &lli->lli_mds_exec_och;
420                         och_usecount = &lli->lli_open_fd_exec_count;
421                 } else {
422                         och_p = &lli->lli_mds_read_och;
423                         och_usecount = &lli->lli_open_fd_read_count;
424                 }
425                 /* Check for the proper lock. */
426                 if (!ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
427                         goto do_lock;
428                 down(&lli->lli_och_sem);
429                 if (*och_p) { /* Everything is open already, do nothing */
430                         /*(*och_usecount)++;  Do not let them steal our open
431                           handle from under us */
432                         /* XXX The code above was my original idea, but in case
433                            we have the handle, but we cannot use it due to later
434                            checks (e.g. O_CREAT|O_EXCL flags set), nobody
435                            would decrement counter increased here. So we just
436                            hope the lock won't be invalidated in between. But
437                            if it would be, we'll reopen the open request to
438                            MDS later during file open path */
439                         up(&lli->lli_och_sem);
440                         ll_finish_md_op_data(op_data);
441                         RETURN(1);
442                 } else {
443                         up(&lli->lli_och_sem);
444                 }
445         }
446
447         if (it->it_op == IT_GETATTR)
448                 first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
449
450 do_lock:
451         it->it_create_mode &= ~current->fs->umask;
452         it->it_flags |= O_CHECK_STALE;
453         rc = md_intent_lock(exp, op_data, NULL, 0, it,
454                             lookup_flags,
455                             &req, ll_md_blocking_ast, 0);
456         it->it_flags &= ~O_CHECK_STALE;
457         ll_finish_md_op_data(op_data);
458         if (it->it_op == IT_GETATTR && !first)
459                 ll_statahead_exit(de, rc);
460
461         /* If req is NULL, then md_intent_lock only tried to do a lock match;
462          * if all was well, it will return 1 if it found locks, 0 otherwise. */
463         if (req == NULL && rc >= 0) {
464                 if (!rc)
465                         goto do_lookup;
466                 GOTO(out, rc);
467         }
468
469         if (rc < 0) {
470                 if (rc != -ESTALE) {
471                         CDEBUG(D_INFO, "ll_intent_lock: rc %d : it->it_status "
472                                "%d\n", rc, it->d.lustre.it_status);
473                 }
474                 GOTO(out, rc = 0);
475         }
476
477 revalidate_finish:
478         rc = ll_revalidate_it_finish(req, it, de);
479         if (rc != 0) {
480                 if (rc != -ESTALE && rc != -ENOENT)
481                         ll_intent_release(it);
482                 GOTO(out, rc = 0);
483         }
484
485         if ((it->it_op & IT_OPEN) && de->d_inode && 
486             !S_ISREG(de->d_inode->i_mode) && 
487             !S_ISDIR(de->d_inode->i_mode)) {
488                 ll_release_openhandle(de, it);
489         }
490         rc = 1;
491
492         /* unfortunately ll_intent_lock may cause a callback and revoke our
493          * dentry */
494         spin_lock(&dcache_lock);
495         lock_dentry(de);
496         __d_drop(de);
497         unlock_dentry(de);
498         d_rehash_cond(de, 0);
499         spin_unlock(&dcache_lock);
500
501 out:
502         /* We do not free request as it may be reused during following lookup
503          * (see comment in mdc/mdc_locks.c::mdc_intent_lock()), request will
504          * be freed in ll_lookup_it or in ll_intent_release. But if
505          * request was not completed, we need to free it. (bug 5154, 9903) */
506         if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
507                 ptlrpc_req_finished(req);
508         if (rc == 0) {
509 #ifdef DCACHE_LUSTRE_INVALID
510                 ll_unhash_aliases(de->d_inode);
511                 /* done in ll_unhash_aliases()
512                    dentry->d_flags |= DCACHE_LUSTRE_INVALID; */
513 #else
514                 /* We do not want d_invalidate to kill all child dentries too */
515                 d_drop(de);
516 #endif
517         } else {
518                 CDEBUG(D_DENTRY, "revalidated dentry %.*s (%p) parent %p "
519                        "inode %p refc %d\n", de->d_name.len,
520                        de->d_name.name, de, de->d_parent, de->d_inode,
521                        atomic_read(&de->d_count));
522                 ll_lookup_finish_locks(it, de);
523 #ifdef DCACHE_LUSTRE_INVALID
524                 lock_dentry(de);
525                 de->d_flags &= ~DCACHE_LUSTRE_INVALID;
526                 unlock_dentry(de);
527 #endif
528         }
529         RETURN(rc);
530
531         /*
532          * This part is here to combat evil-evil race in real_lookup on 2.6
533          * kernels.  The race details are: We enter do_lookup() looking for some
534          * name, there is nothing in dcache for this name yet and d_lookup()
535          * returns NULL.  We proceed to real_lookup(), and while we do this,
536          * another process does open on the same file we looking up (most simple
537          * reproducer), open succeeds and the dentry is added. Now back to
538          * us. In real_lookup() we do d_lookup() again and suddenly find the
539          * dentry, so we call d_revalidate on it, but there is no lock, so
540          * without this code we would return 0, but unpatched real_lookup just
541          * returns -ENOENT in such a case instead of retrying the lookup. Once
542          * this is dealt with in real_lookup(), all of this ugly mess can go and
543          * we can just check locks in ->d_revalidate without doing any RPCs
544          * ever.
545          */
546 do_lookup:
547         if (it != &lookup_it) {
548                 /* MDS_INODELOCK_UPDATE needed for IT_GETATTR case. */
549                 if (it->it_op == IT_GETATTR)
550                         lookup_it.it_op = IT_GETATTR;
551                 ll_lookup_finish_locks(it, de);
552                 it = &lookup_it;
553         }
554
555         /* Do real lookup here. */
556         op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
557                                      de->d_name.len, 0, (it->it_op & IT_CREAT ?
558                                                          LUSTRE_OPC_CREATE :
559                                                          LUSTRE_OPC_ANY), NULL);
560         if (IS_ERR(op_data))
561                 RETURN(PTR_ERR(op_data));
562
563         rc = md_intent_lock(exp, op_data, NULL, 0,  it, 0, &req,
564                             ll_md_blocking_ast, 0);
565         if (rc >= 0) {
566                 struct mdt_body *mdt_body;
567                 struct lu_fid fid = {.f_seq = 0, .f_oid = 0, .f_ver = 0};
568                 mdt_body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
569
570                 if (de->d_inode)
571                         fid = *ll_inode2fid(de->d_inode);
572
573                 /* see if we got same inode, if not - return error */
574                 if (lu_fid_eq(&fid, &mdt_body->fid1)) {
575                         ll_finish_md_op_data(op_data);
576                         op_data = NULL;
577                         goto revalidate_finish;
578                 }
579                 ll_intent_release(it);
580         }
581         ll_finish_md_op_data(op_data);
582         GOTO(out, rc = 0);
583
584 out_sa:
585         /*
586          * For rc == 1 case, should not return directly to prevent losing
587          * statahead windows; for rc == 0 case, the "lookup" will be done later.
588          */
589         if (it && it->it_op == IT_GETATTR && rc == 1) {
590                 first = ll_statahead_enter(de->d_parent->d_inode, &de, 0);
591                 if (!first)
592                         ll_statahead_exit(de, rc);
593         }
594
595         return rc;
596 }
597
598 /*static*/ void ll_pin(struct dentry *de, struct vfsmount *mnt, int flag)
599 {
600         struct inode *inode= de->d_inode;
601         struct ll_sb_info *sbi = ll_i2sbi(inode);
602         struct ll_dentry_data *ldd = ll_d2d(de);
603         struct obd_client_handle *handle;
604         struct obd_capa *oc;
605         int rc = 0;
606         ENTRY;
607         LASSERT(ldd);
608
609         lock_kernel();
610         /* Strictly speaking this introduces an additional race: the
611          * increments should wait until the rpc has returned.
612          * However, given that at present the function is void, this
613          * issue is moot. */
614         if (flag == 1 && (++ldd->lld_mnt_count) > 1) {
615                 unlock_kernel();
616                 EXIT;
617                 return;
618         }
619
620         if (flag == 0 && (++ldd->lld_cwd_count) > 1) {
621                 unlock_kernel();
622                 EXIT;
623                 return;
624         }
625         unlock_kernel();
626
627         handle = (flag) ? &ldd->lld_mnt_och : &ldd->lld_cwd_och;
628         oc = ll_mdscapa_get(inode);
629         rc = obd_pin(sbi->ll_md_exp, ll_inode2fid(inode), oc, handle, flag);
630         capa_put(oc);
631         if (rc) {
632                 lock_kernel();
633                 memset(handle, 0, sizeof(*handle));
634                 if (flag == 0)
635                         ldd->lld_cwd_count--;
636                 else
637                         ldd->lld_mnt_count--;
638                 unlock_kernel();
639         }
640
641         EXIT;
642         return;
643 }
644
645 /*static*/ void ll_unpin(struct dentry *de, struct vfsmount *mnt, int flag)
646 {
647         struct ll_sb_info *sbi = ll_i2sbi(de->d_inode);
648         struct ll_dentry_data *ldd = ll_d2d(de);
649         struct obd_client_handle handle;
650         int count, rc = 0;
651         ENTRY;
652         LASSERT(ldd);
653
654         lock_kernel();
655         /* Strictly speaking this introduces an additional race: the
656          * increments should wait until the rpc has returned.
657          * However, given that at present the function is void, this
658          * issue is moot. */
659         handle = (flag) ? ldd->lld_mnt_och : ldd->lld_cwd_och;
660         if (handle.och_magic != OBD_CLIENT_HANDLE_MAGIC) {
661                 /* the "pin" failed */
662                 unlock_kernel();
663                 EXIT;
664                 return;
665         }
666
667         if (flag)
668                 count = --ldd->lld_mnt_count;
669         else
670                 count = --ldd->lld_cwd_count;
671         unlock_kernel();
672
673         if (count != 0) {
674                 EXIT;
675                 return;
676         }
677
678         rc = obd_unpin(sbi->ll_md_exp, &handle, flag);
679         EXIT;
680         return;
681 }
682
683 #ifdef HAVE_VFS_INTENT_PATCHES
684 static int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
685 {
686         int rc;
687         ENTRY;
688
689         if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
690                 rc = ll_revalidate_it(dentry, nd->flags, &nd->intent);
691         else
692                 rc = ll_revalidate_it(dentry, 0, NULL);
693
694         RETURN(rc);
695 }
696 #else
697 int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
698 {
699         int rc;
700         ENTRY;
701
702         if (nd && !(nd->flags & (LOOKUP_CONTINUE|LOOKUP_PARENT))) {
703                 struct lookup_intent *it;
704                 it = ll_convert_intent(&nd->intent.open, nd->flags);
705                 if (IS_ERR(it))
706                         RETURN(0);
707                 if (it->it_op == (IT_OPEN|IT_CREAT))
708                         if (nd->intent.open.flags & O_EXCL) {
709                                 CDEBUG(D_VFSTRACE, "create O_EXCL, returning 0\n");
710                                 rc = 0;
711                                 goto out_it;
712                         }
713
714                 rc = ll_revalidate_it(dentry, nd->flags, it);
715
716                 if (rc && (nd->flags & LOOKUP_OPEN) &&
717                     it_disposition(it, DISP_OPEN_OPEN)) {/*Open*/
718 #ifdef HAVE_FILE_IN_STRUCT_INTENT
719 // XXX Code duplication with ll_lookup_nd
720                         if (S_ISFIFO(dentry->d_inode->i_mode)) {
721                                 // We cannot call open here as it would
722                                 // deadlock.
723                                 ptlrpc_req_finished(
724                                                (struct ptlrpc_request *)
725                                                   it->d.lustre.it_data);
726                         } else {
727                                 struct file *filp;
728
729                                 nd->intent.open.file->private_data = it;
730                                 filp = lookup_instantiate_filp(nd, dentry,NULL);
731 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,17))
732 /* 2.6.1[456] have a bug in open_namei() that forgets to check
733  * nd->intent.open.file for error, so we need to return it as lookup's result
734  * instead */
735                                 if (IS_ERR(filp))
736                                         rc = 0;
737 #endif
738                         }
739 #else
740                         ll_release_openhandle(dentry, it);
741 #endif /* HAVE_FILE_IN_STRUCT_INTENT */
742                 }
743                 if (!rc && (nd->flags & LOOKUP_CREATE) &&
744                     it_disposition(it, DISP_OPEN_CREATE)) {
745                         /* We created something but we may only return
746                          * negative dentry here, so save request in dentry,
747                          * if lookup will be called later on, it will
748                          * pick the request, otherwise it would be freed
749                          * with dentry */
750                         ll_d2d(dentry)->lld_it = it;
751                         it = NULL; /* avoid freeing */
752                 }
753
754 out_it:
755                 if (it) {
756                         ll_intent_release(it);
757                         OBD_FREE(it, sizeof(*it));
758                 }
759         } else {
760                 rc = ll_revalidate_it(dentry, 0, NULL);
761         }
762
763         RETURN(rc);
764 }
765 #endif
766
767 struct dentry_operations ll_d_ops = {
768         .d_revalidate = ll_revalidate_nd,
769         .d_release = ll_release,
770         .d_delete = ll_ddelete,
771 #ifdef DCACHE_LUSTRE_INVALID
772         .d_compare = ll_dcompare,
773 #endif
774 #if 0
775         .d_pin = ll_pin,
776         .d_unpin = ll_unpin,
777 #endif
778 };
779
780 static int ll_fini_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
781 {
782         ENTRY;
783         /* need lookup */
784         RETURN(0);
785 }
786
787 struct dentry_operations ll_fini_d_ops = {
788         .d_revalidate = ll_fini_revalidate_nd,
789         .d_release = ll_release,
790 };
791
792 /*
793  * It is for the following race condition:
794  * When someone (maybe statahead thread) adds the dentry to the dentry hash
795  * table, the dentry's "d_op" maybe NULL, at the same time, another (maybe
796  * "ls -l") process finds such dentry by "do_lookup()" without "do_revalidate()"
797  * called. It causes statahead window lost, and maybe other issues. --Fan Yong
798  */
799 static int ll_init_revalidate_nd(struct dentry *dentry, struct nameidata *nd)
800 {
801         struct l_wait_info lwi = { 0 };
802         struct ll_dentry_data *lld;
803         ENTRY;
804
805         ll_set_dd(dentry);
806         lld = ll_d2d(dentry);
807         if (unlikely(lld == NULL))
808                 RETURN(-ENOMEM);
809
810         l_wait_event(lld->lld_waitq, dentry->d_op != &ll_init_d_ops, &lwi);
811         if (likely(dentry->d_op == &ll_d_ops))
812                 RETURN(ll_revalidate_nd(dentry, nd));
813         else
814                 RETURN(dentry->d_op == &ll_fini_d_ops ? 0 : -EINVAL);
815 }
816
817 struct dentry_operations ll_init_d_ops = {
818         .d_revalidate = ll_init_revalidate_nd,
819         .d_release = ll_release,
820 };