Whamcloud - gitweb
64-bit warning fixes. Someone should take a closer look at ext2_obd.c
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> & 
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define EXPORT_SYMTAB
15 #define DEBUG_SUBSYSTEM S_LDLM
16
17 #include <linux/slab.h>
18 #include <linux/lustre_dlm.h>
19
20 extern kmem_cache_t *ldlm_lock_slab;
21
22 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
23 static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b);
24
25 ldlm_res_compat ldlm_res_compat_table [] = {
26         [LDLM_PLAIN] ldlm_plain_compat,
27         [LDLM_EXTENT] ldlm_extent_compat,
28         [LDLM_MDSINTENT] ldlm_intent_compat
29 };
30
31 ldlm_res_policy ldlm_res_policy_table [] = {
32         [LDLM_PLAIN] NULL,
33         [LDLM_EXTENT] ldlm_extent_policy,
34         [LDLM_MDSINTENT] NULL
35 };
36
37 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
38 {
39         return lockmode_compat(a->l_req_mode, b->l_req_mode);
40 }
41
/* Compatibility callback for LDLM_MDSINTENT resources.  It unconditionally
 * trips LBUG(); the 0 result only matters if LBUG() returns. */
static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
{
        int rc = 0;

        LBUG();

        return rc;
}
47
48 /* Caller should do ldlm_resource_get() on this resource first. */
49 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
50                                        struct ldlm_resource *resource)
51 {
52         struct ldlm_lock *lock;
53
54         if (resource == NULL)
55                 LBUG();
56
57         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
58         if (lock == NULL)
59                 return NULL;
60
61         memset(lock, 0, sizeof(*lock));
62         lock->l_resource = resource;
63         INIT_LIST_HEAD(&lock->l_children);
64         init_waitqueue_head(&lock->l_waitq);
65
66         if (parent != NULL) {
67                 lock->l_parent = parent;
68                 list_add(&lock->l_childof, &parent->l_children);
69         }
70
71         return lock;
72 }
73
74 /* Caller must do its own ldlm_resource_put() on lock->l_resource */
75 void ldlm_lock_free(struct ldlm_lock *lock)
76 {
77         if (!list_empty(&lock->l_children)) {
78                 CERROR("lock still has children!\n");
79                 LBUG();
80         }
81         kmem_cache_free(ldlm_lock_slab, lock);
82 }
83
84 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
85 {
86         ldlm_res2desc(lock->l_resource, &desc->l_resource);
87         desc->l_req_mode = lock->l_req_mode;
88         desc->l_granted_mode = lock->l_granted_mode;
89         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
90         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
91 }
92
93 static int ldlm_lock_compat(struct ldlm_lock *lock)
94 {
95         struct list_head *tmp;
96         int rc = 0;
97
98         list_for_each(tmp, &lock->l_resource->lr_granted) {
99                 struct ldlm_lock *child;
100                 ldlm_res_compat compat;
101
102                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
103
104                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
105                 if (compat(child, lock) ||
106                     lockmode_compat(child->l_req_mode, lock->l_req_mode))
107                         continue;
108
109                 rc = 1;
110
111                 if (child->l_blocking_ast != NULL)
112                         child->l_blocking_ast(child, lock, child->l_data,
113                                               child->l_data_len);
114         }
115
116         return rc;
117 }
118
119 static void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
120 {
121         ldlm_resource_add_lock(res, &res->lr_granted, lock);
122         lock->l_granted_mode = lock->l_req_mode;
123
124         if (lock->l_granted_mode < res->lr_most_restr)
125                 res->lr_most_restr = lock->l_granted_mode;
126
127         if (lock->l_completion_ast)
128                 lock->l_completion_ast(lock, NULL,
129                                        lock->l_data, lock->l_data_len);
130 }
131
132 ldlm_error_t ldlm_local_lock_create(__u32 ns_id,
133                                     struct ldlm_handle *parent_lock_handle,
134                                     __u64 *res_id, __u32 type,
135                                     struct ldlm_handle *lockh)
136 {
137         struct ldlm_namespace *ns;
138         struct ldlm_resource *res, *parent_res = NULL;
139         struct ldlm_lock *lock, *parent_lock;
140
141         ns = ldlm_namespace_find(ns_id);
142         if (ns == NULL || ns->ns_hash == NULL) 
143                 RETURN(-ELDLM_BAD_NAMESPACE);
144
145         parent_lock = ldlm_handle2object(parent_lock_handle);
146         if (parent_lock)
147                 parent_res = parent_lock->l_resource;
148
149         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
150         if (res == NULL)
151                 RETURN(-ENOMEM);
152
153         lock = ldlm_lock_new(parent_lock, res);
154         if (lock == NULL)
155                 RETURN(-ENOMEM);
156
157         ldlm_object2handle(lock, lockh);
158
159         return ELDLM_OK;
160 }
161
162 /* XXX: Revisit the error handling; we do not, for example, do
163  * ldlm_resource_put()s in our error cases, and we probably leak any allocated
164  * memory. */
165 ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh,
166                                      ldlm_mode_t mode,
167                                      struct ldlm_extent *req_ex,
168                                      int *flags,
169                                      ldlm_lock_callback completion,
170                                      ldlm_lock_callback blocking,
171                                      void *data,
172                                      __u32 data_len)
173 {
174         struct ldlm_lock *lock;
175         struct ldlm_extent new_ex;
176         int incompat = 0, rc;
177         ldlm_res_policy policy;
178         ENTRY;
179
180         lock = ldlm_handle2object(lockh);
181         if ((policy = ldlm_res_policy_table[lock->l_resource->lr_type])) {
182                 rc = policy(lock->l_resource, req_ex, &new_ex, mode, NULL);
183                 if (rc == ELDLM_LOCK_CHANGED) {
184                         *flags |= LDLM_FL_LOCK_CHANGED;
185                         memcpy(req_ex, &new_ex, sizeof(new_ex));
186                 }
187         }
188
189         if ((lock->l_resource->lr_type == LDLM_EXTENT && !req_ex) ||
190             (lock->l_resource->lr_type != LDLM_EXTENT && req_ex))
191                 LBUG();
192         if (req_ex)
193                 memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));
194         lock->l_req_mode = mode;
195         lock->l_data = data;
196         lock->l_data_len = data_len;
197         lock->l_blocking_ast = blocking;
198         spin_lock(&lock->l_resource->lr_lock);
199
200         /* FIXME: We may want to optimize by checking lr_most_restr */
201
202         if (!list_empty(&lock->l_resource->lr_converting)) {
203                 ldlm_resource_add_lock(lock->l_resource,
204                                        lock->l_resource->lr_waiting.prev, lock);
205                 *flags |= LDLM_FL_BLOCK_CONV;
206                 GOTO(out, ELDLM_OK);
207         }
208         if (!list_empty(&lock->l_resource->lr_waiting)) {
209                 ldlm_resource_add_lock(lock->l_resource,
210                                        lock->l_resource->lr_waiting.prev, lock);
211                 *flags |= LDLM_FL_BLOCK_WAIT;
212                 GOTO(out, ELDLM_OK);
213         }
214
215         incompat = ldlm_lock_compat(lock);
216         if (incompat) {
217                 ldlm_resource_add_lock(lock->l_resource,
218                                        lock->l_resource->lr_waiting.prev, lock);
219                 *flags |= LDLM_FL_BLOCK_GRANTED;
220                 GOTO(out, ELDLM_OK);
221         }
222
223         ldlm_grant_lock(lock->l_resource, lock);
224         EXIT;
225  out:
226         /* Don't set 'completion_ast' until here so that if the lock is granted
227          * immediately we don't do an unnecessary completion call. */
228         lock->l_completion_ast = completion;
229         spin_unlock(&lock->l_resource->lr_lock);
230         return ELDLM_OK;
231 }
232
233 static int ldlm_reprocess_queue(struct ldlm_resource *res,
234                                 struct list_head *converting)
235 {
236         struct list_head *tmp, *pos;
237         int incompat = 0;
238
239         list_for_each_safe(tmp, pos, converting) { 
240                 struct ldlm_lock *pending;
241                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
242
243                 incompat = ldlm_lock_compat(pending);
244                 if (incompat)
245                         break;
246
247                 list_del(&pending->l_res_link); 
248                 ldlm_grant_lock(res, pending);
249         }
250
251         return incompat;
252 }
253
254 static void ldlm_reprocess_all(struct ldlm_resource *res)
255 {
256         ldlm_reprocess_queue(res, &res->lr_converting);
257         if (list_empty(&res->lr_converting))
258                 ldlm_reprocess_queue(res, &res->lr_waiting);
259 }
260
261 ldlm_error_t ldlm_local_lock_cancel(struct ldlm_handle *lockh)
262 {
263         struct ldlm_lock *lock;
264         struct ldlm_resource *res;
265         ENTRY;
266
267         lock = ldlm_handle2object(lockh);
268         res = lock->l_resource;
269
270         ldlm_resource_del_lock(lock);
271
272         ldlm_lock_free(lock);
273         if (ldlm_resource_put(res))
274                 RETURN(ELDLM_OK);
275         ldlm_reprocess_all(res);
276
277         RETURN(ELDLM_OK);
278 }
279
280 ldlm_error_t ldlm_local_lock_convert(struct ldlm_handle *lockh,
281                                      int new_mode, int *flags)
282 {
283         struct ldlm_lock *lock;
284         struct ldlm_resource *res;
285         ENTRY;
286
287         lock = ldlm_handle2object(lockh);
288         res = lock->l_resource;
289         list_del(&lock->l_res_link);
290         lock->l_req_mode = new_mode;
291
292         list_add(&lock->l_res_link, res->lr_converting.prev);
293
294         ldlm_reprocess_all(res);
295
296         RETURN(ELDLM_OK);
297 }
298
299 void ldlm_lock_dump(struct ldlm_lock *lock)
300 {
301         char ver[128];
302
303         if (RES_VERSION_SIZE != 4)
304                 LBUG();
305
306         snprintf(ver, sizeof(ver), "%x %x %x %x",
307                  lock->l_version[0], lock->l_version[1],
308                  lock->l_version[2], lock->l_version[3]);
309
310         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
311         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
312         CDEBUG(D_OTHER, "  Resource: %p\n", lock->l_resource);
313         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
314                (int)lock->l_req_mode, (int)lock->l_granted_mode);
315         if (lock->l_resource->lr_type == LDLM_EXTENT)
316                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
317                        (unsigned long long)lock->l_extent.start,
318                        (unsigned long long)lock->l_extent.end);
319 }