Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ldlm / ldlm_inodebits.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003, 2004 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define DEBUG_SUBSYSTEM S_LDLM
25 #ifndef __KERNEL__
26 # include <liblustre.h>
27 #endif
28
29 #include <lustre_dlm.h>
30 #include <obd_support.h>
31 #include <lustre_lib.h>
32
33 #include "ldlm_internal.h"
34
/* Determine if the lock is compatible with all locks on the queue.
 *
 * Walks @queue (either the granted or waiting list of a resource) and
 * checks every lock against @req, which is an IBITS lock: two locks
 * conflict only when their modes conflict AND their inodebits masks
 * overlap.
 *
 * \param queue      list of locks to scan (linked via l_res_link)
 * \param req        the lock being enqueued/reprocessed
 * \param work_list  if non-NULL, collect blocking-AST work items for
 *                   every conflicting lock and keep scanning; if NULL,
 *                   stop and return 0 at the first conflict
 *
 * \retval 1  req is compatible with every lock ahead of it in the queue
 * \retval 0  at least one conflicting lock was found
 *
 * The queue is organized into "skip list" groups: locks with the same
 * mode are adjacent (l_sl_mode links the group head to its tail), and
 * within a mode group, locks with the same policy (bits) are adjacent
 * (l_sl_policy).  LDLM_SL_HEAD()/LDLM_SL_TAIL() test group boundaries,
 * letting the scan jump over whole groups at once.
 */
static int
ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                            struct list_head *work_list)
{
        struct list_head *tmp, *tmp_tail;
        struct ldlm_lock *lock;
        ldlm_mode_t req_mode = req->l_req_mode;
        __u64 req_bits = req->l_policy_data.l_inodebits.bits;
        int compat = 1;
        ENTRY;

        LASSERT(req_bits); /* There is no sense in lock with no bits set,
                              I think. Also such a lock would be compatible
                               with any other bit lock */
        list_for_each(tmp, queue) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                /* Reached req itself: everything ahead of it was checked,
                 * so the answer accumulated in 'compat' is final. */
                if (req == lock)
                        RETURN(compat);

                /* locks are compatible, bits don't matter */
                if (lockmode_compat(lock->l_req_mode, req_mode)) {
                        /* jump to next mode group: for a group head,
                         * l_sl_mode.next points at the group tail, so
                         * advancing tmp to the tail's l_res_link makes
                         * list_for_each step past the whole group. */
                        if (LDLM_SL_HEAD(&lock->l_sl_mode))
                                tmp = &list_entry(lock->l_sl_mode.next, 
                                                  struct ldlm_lock,
                                                  l_sl_mode)->l_res_link;
                        continue;
                }
                
                /* Modes conflict: must inspect the bits of every lock in
                 * this mode group.  tmp_tail marks the group's last
                 * member (or the single lock itself when ungrouped). */
                tmp_tail = tmp;
                if (LDLM_SL_HEAD(&lock->l_sl_mode))
                        tmp_tail = &list_entry(lock->l_sl_mode.next,
                                               struct ldlm_lock,
                                               l_sl_mode)->l_res_link;
                for (;;) {
                        /* locks with bits overlapped are conflicting locks */
                        if (lock->l_policy_data.l_inodebits.bits & req_bits) {
                                /* conflicting policy */
                                if (!work_list)
                                        RETURN(0);

                                compat = 0;
                                if (lock->l_blocking_ast)
                                        ldlm_add_ast_work_item(lock, req, 
                                                               work_list);
                                /* add all members of the policy group:
                                 * they share the same bits, so every one
                                 * of them conflicts with req too. */
                                if (LDLM_SL_HEAD(&lock->l_sl_policy)) {
                                        do {
                                                tmp = lock->l_res_link.next;
                                                lock = list_entry(tmp,
                                                            struct ldlm_lock,
                                                            l_res_link);
                                                if (lock->l_blocking_ast)
                                                        ldlm_add_ast_work_item(
                                                                     lock,
                                                                     req,
                                                                     work_list);
                                        } while (!LDLM_SL_TAIL(&lock->l_sl_policy));
                                }
                        } else if (LDLM_SL_HEAD(&lock->l_sl_policy)) {
                                /* jump to next policy group: no overlap
                                 * with the head means no overlap with any
                                 * member, since they share the bits. */
                                tmp = &list_entry(lock->l_sl_policy.next,
                                                  struct ldlm_lock,
                                                  l_sl_policy)->l_res_link;
                        }
                        /* Stop at the mode-group tail, otherwise advance
                         * to the next lock inside the group. */
                        if (tmp == tmp_tail)
                                break;
                        else
                                tmp = tmp->next;
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                }       /* for locks in a mode group */
        }       /* for each lock in the queue */

        /* req was not found on the queue: all of it was scanned. */
        RETURN(compat);
}
112
113 /* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
114   *   - blocking ASTs have already been sent
115   *   - must call this function with the ns lock held
116   *
117   * If first_enq is 1 (ie, called from ldlm_lock_enqueue):
118   *   - blocking ASTs have not been sent
119   *   - must call this function with the ns lock held once */
120 int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
121                                 int first_enq, ldlm_error_t *err,
122                                 struct list_head *work_list)
123 {
124         struct ldlm_resource *res = lock->l_resource;
125         struct list_head rpc_list = CFS_LIST_HEAD_INIT(rpc_list);
126         int rc;
127         ENTRY;
128
129         LASSERT(list_empty(&res->lr_converting));
130         check_res_locked(res);
131
132         if (!first_enq) {
133                 LASSERT(work_list != NULL);
134                 rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, NULL);
135                 if (!rc)
136                         RETURN(LDLM_ITER_STOP);
137                 rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL);
138                 if (!rc)
139                         RETURN(LDLM_ITER_STOP);
140
141                 ldlm_resource_unlink_lock(lock);
142                 ldlm_grant_lock(lock, work_list);
143                 RETURN(LDLM_ITER_CONTINUE);
144         }
145
146  restart:
147         rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list);
148         rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list);
149
150         if (rc != 2) {
151                 /* If either of the compat_queue()s returned 0, then we
152                  * have ASTs to send and must go onto the waiting list.
153                  *
154                  * bug 2322: we used to unlink and re-add here, which was a
155                  * terrible folly -- if we goto restart, we could get
156                  * re-ordered!  Causes deadlock, because ASTs aren't sent! */
157                 if (list_empty(&lock->l_res_link))
158                         ldlm_resource_add_lock(res, &res->lr_waiting, lock);
159                 unlock_res(res);
160                 rc = ldlm_run_bl_ast_work(&rpc_list);
161                 lock_res(res);
162                 if (rc == -ERESTART)
163                         GOTO(restart, -ERESTART);
164                 *flags |= LDLM_FL_BLOCK_GRANTED;
165         } else {
166                 ldlm_resource_unlink_lock(lock);
167                 ldlm_grant_lock(lock, NULL);
168         }
169         RETURN(0);
170 }