Whamcloud - gitweb
- add more infrastructure to handle extents and debug.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> & 
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define EXPORT_SYMTAB
15
16 #include <linux/version.h>
17 #include <linux/module.h>
18 #include <linux/slab.h>
19 #include <asm/unistd.h>
20
21 #define DEBUG_SUBSYSTEM S_LDLM
22
23 #include <linux/obd_support.h>
24 #include <linux/obd_class.h>
25
26 #include <linux/lustre_dlm.h>
27
28 extern kmem_cache_t *ldlm_lock_slab;
29
30 ldlm_res_compat ldlm_res_compat_table [] = {
31         [LDLM_PLAIN] NULL,
32         [LDLM_EXTENT] ldlm_extent_compat,
33         [LDLM_MDSINTENT] NULL
34 };
35
36 ldlm_res_policy ldlm_res_policy_table [] = {
37         [LDLM_PLAIN] NULL,
38         [LDLM_EXTENT] ldlm_extent_policy,
39         [LDLM_MDSINTENT] NULL
40 };
41
42 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
43                                        struct ldlm_resource *resource,
44                                        ldlm_mode_t mode)
45 {
46         struct ldlm_lock *lock;
47
48         if (resource == NULL)
49                 LBUG();
50
51         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
52         if (lock == NULL)
53                 return NULL;
54
55         memset(lock, 0, sizeof(*lock));
56         lock->l_resource = resource;
57         lock->l_req_mode = mode;
58         INIT_LIST_HEAD(&lock->l_children);
59
60         if (parent != NULL) {
61                 lock->l_parent = parent;
62                 list_add(&lock->l_childof, &parent->l_children);
63         }
64
65         return lock;
66 }
67
68 static int ldlm_notify_incompatible(struct list_head *list,
69                                     struct ldlm_lock *new)
70 {
71         struct list_head *tmp;
72         int rc = 0;
73
74         list_for_each(tmp, list) {
75                 struct ldlm_lock *lock;
76                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
77                 if (lockmode_compat(lock->l_req_mode, new->l_req_mode))
78                         continue;
79
80                 rc = 1;
81
82                 if (lock->l_blocking_ast != NULL)
83                         lock->l_blocking_ast(lock, new, lock->l_data,
84                                              lock->l_data_len);
85         }
86
87         return rc;
88 }
89
90 static int ldlm_lock_compat(struct ldlm_lock *lock)
91 {
92         struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
93         ldlm_res_compat compat;
94
95         if (parent_res &&
96             (compat = ldlm_res_compat_table[parent_res->lr_type])) {
97                 struct list_head *tmp;
98                 int incompat = 0;
99                 list_for_each(tmp, &parent_res->lr_children) {
100                         struct ldlm_resource *child;
101                         child = list_entry(tmp, struct ldlm_resource,
102                                            lr_childof);
103
104                         /* compat will return 0 when child == l_resource
105                          * hence notifications on the same resource are incl. */
106                         if (compat(child, lock->l_resource))
107                                 continue;
108
109                         incompat |= ldlm_notify_incompatible(&child->lr_granted,
110                                                              lock);
111                 }
112
113                 return incompat;
114         }
115
116         return ldlm_notify_incompatible(&lock->l_resource->lr_granted, lock);
117 }
118
119 static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
120 {
121         ldlm_resource_add_lock(res, &res->lr_granted, lock);
122         lock->l_granted_mode = lock->l_req_mode;
123
124         if (lock->l_granted_mode < res->lr_most_restr)
125                 res->lr_most_restr = lock->l_granted_mode;
126
127         if (lock->l_completion_ast)
128                 lock->l_completion_ast(lock, NULL, NULL, 0);
129 }
130
131 static int ldlm_reprocess_queue(struct ldlm_lock *lock,
132                                 struct list_head *converting,
133                                 struct list_head *granted_list)
134 {
135         struct list_head *tmp, *pos;
136         int incompat = 0;
137
138         list_for_each_safe(tmp, pos, converting) { 
139                 struct ldlm_lock *pending;
140                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
141
142                 incompat = ldlm_lock_compat(pending);
143                 if (incompat)
144                         break;
145
146                 list_del(&pending->l_res_link); 
147                 ldlm_grant_lock(pending->l_resource, pending);
148         }
149
150         return incompat;
151 }
152
153 /* XXX: Revisit the error handling; we do not, for example, do
154  * ldlm_resource_put()s in our error cases, and we probably leak an allocated
155  * memory. */
156 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev,
157                                      __u32 ns_id,
158                                      struct ldlm_handle *parent_lock_handle,
159                                      __u64 *res_id,
160                                      __u32 type,
161                                      ldlm_mode_t mode,
162                                      int *flags,
163                                      ldlm_lock_callback completion,
164                                      ldlm_lock_callback blocking,
165                                      void *data,
166                                      __u32 data_len,
167                                      struct ldlm_handle *lockh)
168 {
169         struct ldlm_namespace *ns;
170         struct ldlm_resource *res, *parent_res;
171         struct ldlm_lock *lock, *parent_lock;
172         int incompat = 0, rc;
173         __u64 new_id[RES_NAME_SIZE];
174         ldlm_res_policy policy;
175
176         ENTRY;
177         
178         parent_lock = ldlm_handle2object(parent_lock_handle);
179         if ( parent_lock ) 
180                 parent_res = parent_lock->l_resource;
181         else 
182                 parent_res = NULL;
183
184         ns = ldlm_namespace_find(obddev, ns_id);
185         if (ns == NULL || ns->ns_hash == NULL) 
186                 RETURN(-ELDLM_BAD_NAMESPACE);
187
188         if (parent_res &&
189             (policy = ldlm_res_policy_table[parent_res->lr_type])) {
190                 rc = policy(parent_res, res_id, new_id, mode, NULL);
191                 if (rc == ELDLM_RES_CHANGED) {
192                         *flags |= LDLM_FL_RES_CHANGED;
193                         memcpy(res_id, new_id, sizeof(__u64) * RES_NAME_SIZE);
194                 }
195         }
196
197         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
198         if (res == NULL)
199                 RETURN(-ENOMEM);
200
201         lock = ldlm_lock_new(parent_lock, res, mode);
202         if (lock == NULL)
203                 RETURN(-ENOMEM);
204
205         lock->l_data = data;
206         lock->l_data_len = data_len;
207         if ((*flags) & LDLM_FL_COMPLETION_AST)
208                 lock->l_completion_ast = completion;
209         if ((*flags) & LDLM_FL_BLOCKING_AST)
210                 lock->l_blocking_ast = blocking;
211         ldlm_object2handle(lock, lockh);
212         spin_lock(&res->lr_lock);
213
214         /* FIXME: We may want to optimize by checking lr_most_restr */
215
216         if (!list_empty(&res->lr_converting)) {
217                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
218                 GOTO(out, rc = -ELDLM_BLOCK_CONV);
219         }
220         if (!list_empty(&res->lr_waiting)) {
221                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
222                 GOTO(out, rc = -ELDLM_BLOCK_WAIT);
223         }
224
225         incompat = ldlm_lock_compat(lock);
226         if (incompat) {
227                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
228                 GOTO(out, rc = -ELDLM_BLOCK_GRANTED);
229         }
230
231         ldlm_grant_lock(res, lock);
232         GOTO(out, rc = ELDLM_OK);
233
234  out:
235         spin_unlock(&res->lr_lock);
236         return rc;
237 }
238
239 static void ldlm_reprocess_res_compat(struct ldlm_lock *lock)
240 {
241         struct ldlm_resource *parent_res = lock->l_resource->lr_parent;
242         struct list_head *tmp;
243         int do_waiting;
244
245         list_for_each(tmp, &parent_res->lr_children) {
246                 struct ldlm_resource *child;
247                 child = list_entry(tmp, struct ldlm_resource, lr_childof);
248
249                 ldlm_reprocess_queue(lock, &child->lr_converting,
250                                      &child->lr_granted);
251                 if (!list_empty(&child->lr_converting))
252                         do_waiting = 0;
253         }
254
255         if (!do_waiting)
256                 return;
257
258         list_for_each(tmp, &parent_res->lr_children) {
259                 struct ldlm_resource *child;
260                 child = list_entry(tmp, struct ldlm_resource, lr_childof);
261
262                 ldlm_reprocess_queue(lock, &child->lr_waiting,
263                                      &child->lr_granted);
264         }
265 }
266
267 static void ldlm_reprocess_all(struct ldlm_lock *lock)
268 {
269         struct ldlm_resource *res = lock->l_resource;
270         struct ldlm_resource *parent_res = res->lr_parent;
271
272         if (parent_res && ldlm_res_compat_table[parent_res->lr_type]) {
273                 ldlm_reprocess_res_compat(lock);
274                 return;
275         }
276
277         ldlm_reprocess_queue(lock, &res->lr_converting, &res->lr_granted);
278         if (list_empty(&res->lr_converting))
279                 ldlm_reprocess_queue(lock, &res->lr_waiting, &res->lr_granted);
280 }
281
282 ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
283                                     struct ldlm_handle *lockh)
284 {
285         struct ldlm_lock *lock;
286         struct ldlm_resource *res;
287         ENTRY;
288
289         lock = ldlm_handle2object(lockh);
290         res = lock->l_resource;
291
292         ldlm_resource_del_lock(lock);
293
294         kmem_cache_free(ldlm_lock_slab, lock);
295         if (ldlm_resource_put(res))
296                 RETURN(ELDLM_OK);
297         ldlm_reprocess_all(lock);
298
299         RETURN(ELDLM_OK);
300 }
301
/* Convert the lock named by 'lockh' to 'new_mode': remove it from its
 * current queue, update the requested mode, append it to the tail of the
 * resource's converting queue, and reprocess.  Always returns ELDLM_OK.
 * 'flags' is accepted but not consulted here.
 *
 * NOTE(review): unlike ldlm_local_lock_enqueue(), this manipulates the
 * resource queues without holding res->lr_lock — confirm that callers
 * serialize access, or this races with concurrent enqueues. */
ldlm_error_t ldlm_local_lock_convert(struct obd_device *obddev,
                                     struct ldlm_handle *lockh,
                                     int new_mode, int *flags)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        lock = ldlm_handle2object(lockh);
        res = lock->l_resource;
        list_del(&lock->l_res_link);
        lock->l_req_mode = new_mode;

        /* list_add after .prev == append at the tail of lr_converting */
        list_add(&lock->l_res_link, res->lr_converting.prev);

        ldlm_reprocess_all(lock);

        RETURN(ELDLM_OK);
}
321
322 void ldlm_lock_dump(struct ldlm_lock *lock)
323 {
324         char ver[128];
325
326         if (RES_VERSION_SIZE != 4)
327                 LBUG();
328
329         snprintf(ver, sizeof(ver), "%x %x %x %x",
330                  lock->l_version[0], lock->l_version[1],
331                  lock->l_version[2], lock->l_version[3]);
332
333         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
334         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
335         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
336                (int)lock->l_req_mode, (int)lock->l_granted_mode);
337 }