Whamcloud - gitweb
Landing the ldlm_testing branch; now the only difference is that the locking
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> & 
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define EXPORT_SYMTAB
15 #define DEBUG_SUBSYSTEM S_LDLM
16
17 #include <linux/slab.h>
18 #include <linux/lustre_dlm.h>
19
20 extern kmem_cache_t *ldlm_lock_slab;
21
22 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
23 static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b);
24
25 ldlm_res_compat ldlm_res_compat_table [] = {
26         [LDLM_PLAIN] ldlm_plain_compat,
27         [LDLM_EXTENT] ldlm_extent_compat,
28         [LDLM_MDSINTENT] ldlm_intent_compat
29 };
30
31 ldlm_res_policy ldlm_res_policy_table [] = {
32         [LDLM_PLAIN] NULL,
33         [LDLM_EXTENT] ldlm_extent_policy,
34         [LDLM_MDSINTENT] NULL
35 };
36
37 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
38 {
39         return lockmode_compat(a->l_req_mode, b->l_req_mode);
40 }
41
/* Intent-lock compatibility is not implemented; reaching this function
 * is a bug, hence the LBUG() assertion.  The return statement exists
 * only to satisfy the ldlm_res_compat function-pointer signature. */
static int ldlm_intent_compat(struct ldlm_lock *a, struct ldlm_lock *b)
{
        LBUG();
        return 0;
}
47
48 /* Args: referenced, unlocked parent (or NULL)
49  *       referenced, unlocked resource
50  * Locks: parent->l_lock */
51 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
52                                        struct ldlm_resource *resource)
53 {
54         struct ldlm_lock *lock;
55
56         if (resource == NULL)
57                 LBUG();
58
59         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
60         if (lock == NULL)
61                 return NULL;
62
63         memset(lock, 0, sizeof(*lock));
64         lock->l_resource = resource;
65         INIT_LIST_HEAD(&lock->l_children);
66         INIT_LIST_HEAD(&lock->l_res_link);
67         init_waitqueue_head(&lock->l_waitq);
68         lock->l_lock = SPIN_LOCK_UNLOCKED;
69
70         if (parent != NULL) {
71                 spin_lock(&parent->l_lock);
72                 lock->l_parent = parent;
73                 list_add(&lock->l_childof, &parent->l_children);
74                 spin_unlock(&parent->l_lock);
75         }
76
77         return lock;
78 }
79
80 /* Args: unreferenced, locked lock
81  *
82  * Caller must do its own ldlm_resource_put() on lock->l_resource */
83 void ldlm_lock_free(struct ldlm_lock *lock)
84 {
85         if (!list_empty(&lock->l_children)) {
86                 CERROR("lock %p still has children (%p)!\n", lock,
87                        lock->l_children.next);
88                 ldlm_lock_dump(lock);
89                 LBUG();
90         }
91
92         if (lock->l_readers || lock->l_writers)
93                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
94                        "writers)\n", lock->l_readers, lock->l_writers);
95
96         if (lock->l_connection)
97                 ptlrpc_put_connection(lock->l_connection);
98         kmem_cache_free(ldlm_lock_slab, lock);
99 }
100
101 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
102 {
103         ldlm_res2desc(lock->l_resource, &desc->l_resource);
104         desc->l_req_mode = lock->l_req_mode;
105         desc->l_granted_mode = lock->l_granted_mode;
106         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
107         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
108 }
109
110 /* Args: unlocked lock */
111 void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
112 {
113         spin_lock(&lock->l_lock);
114         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
115                 lock->l_readers++;
116         else
117                 lock->l_writers++;
118         spin_unlock(&lock->l_lock);
119 }
120
121 /* Args: unlocked lock */
122 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
123 {
124         int rc;
125
126         spin_lock(&lock->l_lock);
127         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
128                 lock->l_readers--;
129         else
130                 lock->l_writers--;
131         if (!lock->l_readers && !lock->l_writers &&
132             lock->l_flags & LDLM_FL_DYING) {
133                 /* Read this lock its rights. */
134                 if (!lock->l_resource->lr_namespace->ns_local) {
135                         CERROR("LDLM_FL_DYING set on non-local lock!\n");
136                         LBUG();
137                 }
138
139                 CDEBUG(D_INFO, "final decref done on dying lock, "
140                        "cancelling.\n");
141                 spin_unlock(&lock->l_lock);
142                 rc = ldlm_cli_cancel(lock->l_client, lock);
143                 if (rc) {
144                         /* FIXME: do something more dramatic */
145                         CERROR("ldlm_cli_cancel: %d\n", rc);
146                 }
147         } else
148                 spin_unlock(&lock->l_lock);
149 }
150
151 /* Args: locked lock */
152 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
153                              struct list_head *queue)
154 {
155         struct list_head *tmp, *pos;
156         int rc = 0;
157
158         list_for_each_safe(tmp, pos, queue) {
159                 struct ldlm_lock *child;
160                 ldlm_res_compat compat;
161
162                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
163                 if (lock == child)
164                         continue;
165
166                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
167                 if (compat(child, lock)) {
168                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
169                         continue;
170                 }
171                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
172                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
173                         continue;
174                 }
175
176                 rc = 1;
177
178                 CDEBUG(D_OTHER, "compat function failed and lock modes are "
179                        "incompatible; sending blocking AST.\n");
180                 if (send_cbs && child->l_blocking_ast != NULL)
181                         child->l_blocking_ast(child, lock, child->l_data,
182                                               child->l_data_len);
183         }
184
185         return rc;
186 }
187
188 /* Args: unlocked lock */
189 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
190 {
191         int rc;
192         ENTRY;
193
194         rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
195         rc |= _ldlm_lock_compat(lock, send_cbs,
196                                 &lock->l_resource->lr_converting);
197
198         RETURN(rc);
199 }
200
201 /* Args: locked lock, locked resource */
202 void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
203 {
204         ENTRY;
205
206         ldlm_resource_add_lock(res, &res->lr_granted, lock);
207         lock->l_granted_mode = lock->l_req_mode;
208
209         if (lock->l_granted_mode < res->lr_most_restr)
210                 res->lr_most_restr = lock->l_granted_mode;
211
212         if (lock->l_completion_ast)
213                 lock->l_completion_ast(lock, NULL,
214                                        lock->l_data, lock->l_data_len);
215         EXIT;
216 }
217
218 static int search_queue(struct list_head *queue, ldlm_mode_t mode,
219                         struct ldlm_extent *extent, struct ldlm_handle *lockh)
220 {
221         struct list_head *tmp;
222
223         list_for_each(tmp, queue) { 
224                 struct ldlm_lock *lock;
225                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
226
227                 if (lock->l_flags & LDLM_FL_DYING)
228                         continue;
229
230                 /* lock_convert() takes the resource lock, so we're sure that
231                  * req_mode, lr_type, and l_extent won't change beneath us */
232                 if (lock->l_req_mode != mode)
233                         continue;
234
235                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
236                     (lock->l_extent.start > extent->start ||
237                      lock->l_extent.end < extent->end))
238                         continue;
239
240                 ldlm_lock_addref(lock, mode);
241                 ldlm_object2handle(lock, lockh);
242                 return 1;
243         }
244
245         return 0;
246 }
247
248 /* Must be called with no resource or lock locks held.
249  *
250  * Returns 1 if it finds an already-existing lock that is compatible; in this
251  * case, lockh is filled in with a addref()ed lock */
252 int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
253                           struct ldlm_extent *extent, ldlm_mode_t mode,
254                           struct ldlm_handle *lockh)
255 {
256         struct ldlm_resource *res;
257         int rc = 0;
258         ENTRY;
259
260         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
261         if (res == NULL)
262                 RETURN(0);
263
264         spin_lock(&res->lr_lock);
265         if (search_queue(&res->lr_granted, mode, extent, lockh))
266                 GOTO(out, rc = 1);
267         if (search_queue(&res->lr_converting, mode, extent, lockh))
268                 GOTO(out, rc = 1);
269         if (search_queue(&res->lr_waiting, mode, extent, lockh))
270                 GOTO(out, rc = 1);
271
272         EXIT;
273  out:
274         ldlm_resource_put(res);
275         spin_unlock(&res->lr_lock);
276         return rc;
277 }
278
279 /* Must be called without the resource lock held. */
280 ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
281                                     struct ldlm_handle *parent_lock_handle,
282                                     __u64 *res_id, __u32 type,
283                                      ldlm_mode_t mode,
284                                     void *data,
285                                     __u32 data_len,
286                                     struct ldlm_handle *lockh)
287 {
288         struct ldlm_resource *res, *parent_res = NULL;
289         struct ldlm_lock *lock, *parent_lock;
290
291         parent_lock = ldlm_handle2object(parent_lock_handle);
292         if (parent_lock)
293                 parent_res = parent_lock->l_resource;
294
295         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
296         if (res == NULL)
297                 RETURN(-ENOMEM);
298
299         lock = ldlm_lock_new(parent_lock, res);
300         if (lock == NULL) {
301                 spin_lock(&res->lr_lock);
302                 ldlm_resource_put(res);
303                 spin_unlock(&res->lr_lock);
304                 RETURN(-ENOMEM);
305         }
306
307         lock->l_req_mode = mode;
308         lock->l_data = data;
309         lock->l_data_len = data_len;
310         ldlm_lock_addref(lock, mode);
311
312         ldlm_object2handle(lock, lockh);
313         return ELDLM_OK;
314 }
315
/* Enqueue a lock previously created with ldlm_local_lock_create():
 * run the resource-type policy, then either grant the lock or queue it
 * behind whatever is blocking it.
 *
 * Must be called with lock->l_lock and lock->l_resource->lr_lock not held.
 *
 * @lockh:      handle of the lock to enqueue
 * @req_ex:     requested extent; required iff the resource is LDLM_EXTENT
 * @flags:      in: LDLM_FL_BLOCK_* hints (used for local namespaces);
 *              out: LDLM_FL_LOCK_CHANGED and/or the LDLM_FL_BLOCK_*
 *              reason the lock was queued rather than granted
 * @completion: AST invoked when the lock is (eventually) granted
 * @blocking:   AST invoked when this lock blocks a later request
 *
 * Always returns ELDLM_OK; blocked-ness is reported through @flags. */
ldlm_error_t ldlm_local_lock_enqueue(struct ldlm_handle *lockh,
                                     struct ldlm_extent *req_ex,
                                     int *flags,
                                     ldlm_lock_callback completion,
                                     ldlm_lock_callback blocking)
{
        struct ldlm_resource *res;
        struct ldlm_lock *lock;
        int incompat = 0, local;
        ldlm_res_policy policy;
        ENTRY;

        lock = ldlm_handle2object(lockh);
        res = lock->l_resource;
        local = res->lr_namespace->ns_local;
        spin_lock(&res->lr_lock);

        lock->l_blocking_ast = blocking;

        /* An extent is mandatory for extent resources and forbidden for
         * every other type. */
        if ((res->lr_type == LDLM_EXTENT && !req_ex) ||
            (res->lr_type != LDLM_EXTENT && req_ex))
                LBUG();

        /* Let the per-type policy adjust the requested extent; tell the
         * caller via LDLM_FL_LOCK_CHANGED when it did. */
        if ((policy = ldlm_res_policy_table[res->lr_type])) {
                struct ldlm_extent new_ex;
                int rc = policy(res, req_ex, &new_ex, lock->l_req_mode, NULL);
                if (rc == ELDLM_LOCK_CHANGED) {
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        memcpy(req_ex, &new_ex, sizeof(new_ex));
                }
        }

        if (req_ex)
                memcpy(&lock->l_extent, req_ex, sizeof(*req_ex));

        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted before
                 * we got a chance to actually enqueue it.  We don't need to do
                 * anything else. */
                GOTO(out, ELDLM_OK);
        }

        /* If this is a local resource, put it on the appropriate list. */
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, res->lr_converting.prev,
                                               lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                else
                        ldlm_grant_lock(res, lock);
                GOTO(out, ELDLM_OK);
        }

        /* Non-local: converting and waiting locks go ahead of us in FIFO
         * order, so any lock already on those queues blocks this one. */
        /* FIXME: We may want to optimize by checking lr_most_restr */
        if (!list_empty(&res->lr_converting)) {
                ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                *flags |= LDLM_FL_BLOCK_CONV;
                GOTO(out, ELDLM_OK);
        }
        if (!list_empty(&res->lr_waiting)) {
                ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                *flags |= LDLM_FL_BLOCK_WAIT;
                GOTO(out, ELDLM_OK);
        }
        /* send_cbs == 0: we only probe for conflicts here; blocking ASTs
         * are sent when the queues are reprocessed. */
        incompat = ldlm_lock_compat(lock, 0);
        if (incompat) {
                ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                *flags |= LDLM_FL_BLOCK_GRANTED;
                GOTO(out, ELDLM_OK);
        }

        ldlm_grant_lock(res, lock);
        EXIT;
 out:
        /* Don't set 'completion_ast' until here so that if the lock is granted
         * immediately we don't do an unnecessary completion call. */
        lock->l_completion_ast = completion;
        spin_unlock(&res->lr_lock);
        return ELDLM_OK;
}
398
/* Walk @converting (the same logic serves the waiting queue) and grant
 * each lock in turn, stopping at the first one that is still blocked.
 *
 * Must be called with resource->lr_lock taken.
 *
 * Returns 1 if a still-incompatible lock stopped the scan, 0 if the
 * whole queue was drained. */
static int ldlm_reprocess_queue(struct ldlm_resource *res,
                                struct list_head *converting)
{
        struct list_head *tmp, *pos;
        ENTRY;

        list_for_each_safe(tmp, pos, converting) { 
                struct ldlm_lock *pending;
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);

                /* the resource lock protects ldlm_lock_compat */
                if (ldlm_lock_compat(pending, 1))
                        RETURN(1);

                list_del_init(&pending->l_res_link); 
                ldlm_grant_lock(res, pending);

                /* NOTE(review): this pair looks intended to move the
                 * caller's reference from the old granted mode to the new
                 * one, but ldlm_grant_lock() has already set
                 * l_granted_mode = l_req_mode above, so both calls use the
                 * same mode and the net effect on the counts is zero —
                 * confirm whether the old mode should be captured before
                 * the grant. */
                ldlm_lock_addref(pending, pending->l_req_mode);
                ldlm_lock_decref(pending, pending->l_granted_mode);
        }

        RETURN(0);
}
423
424 /* Must be called with resource->lr_lock not taken. */
425 void ldlm_reprocess_all(struct ldlm_resource *res)
426 {
427         /* Local lock trees don't get reprocessed. */
428         if (res->lr_namespace->ns_local)
429                 return;
430
431         spin_lock(&res->lr_lock);
432         ldlm_reprocess_queue(res, &res->lr_converting);
433         if (list_empty(&res->lr_converting))
434                 ldlm_reprocess_queue(res, &res->lr_waiting);
435         spin_unlock(&res->lr_lock);
436 }
437
/* Tear down @lock: unlink it from its resource, drop the resource
 * reference, and free the lock.
 *
 * Must be called with lock and lock->l_resource unlocked.
 *
 * Returns the still-locked resource, or NULL when dropping the last
 * reference freed it (in which case there is nothing left to unlock). */
struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
{
        struct ldlm_resource *res;
        ENTRY;

        res = lock->l_resource;

        spin_lock(&res->lr_lock);
        spin_lock(&lock->l_lock);

        /* Cancelling a lock that still has users is suspicious but not
         * fatal; leave a breadcrumb in the logs. */
        if (lock->l_readers || lock->l_writers)
                CDEBUG(D_INFO, "lock still has references (%d readers, %d "
                       "writers)\n", lock->l_readers, lock->l_writers);

        ldlm_resource_del_lock(lock);
        /* ldlm_resource_put() returns nonzero when it freed the resource;
         * lr_lock is gone with it and must not be unlocked. */
        if (ldlm_resource_put(res))
                res = NULL; /* res was freed, nothing else to do. */
        else
                spin_unlock(&res->lr_lock);
        /* l_lock is still held here, but the lock is being freed and can
         * never be taken again. */
        ldlm_lock_free(lock);

        RETURN(res);
}
462
/* Change the mode of an existing lock to @new_mode.
 *
 * Must be called with lock and lock->l_resource unlocked.
 *
 * @lockh:    handle of the lock to convert
 * @new_mode: the newly requested mode
 * @flags:    LDLM_FL_BLOCK_* hints (used for local namespaces)
 *
 * Returns the lock's resource. */
struct ldlm_resource *ldlm_local_lock_convert(struct ldlm_handle *lockh,
                                              int new_mode, int *flags)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        lock = ldlm_handle2object(lockh);
        res = lock->l_resource;

        spin_lock(&res->lr_lock);

        lock->l_req_mode = new_mode;
        /* Remove the lock from whichever queue it currently sits on. */
        list_del_init(&lock->l_res_link);

        /* If this is a local resource, put it on the appropriate list. */
        if (res->lr_namespace->ns_local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, res->lr_converting.prev,
                                               lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
                else
                        ldlm_grant_lock(res, lock);
        } else {
                /* Non-local: queue at the tail of the converting list and
                 * let reprocessing decide when it can be granted. */
                list_add(&lock->l_res_link, res->lr_converting.prev);
        }

        spin_unlock(&res->lr_lock);

        RETURN(res);
}
496
497 void ldlm_lock_dump(struct ldlm_lock *lock)
498 {
499         char ver[128];
500
501         if (!(portal_debug & D_OTHER))
502                 return;
503
504         if (RES_VERSION_SIZE != 4)
505                 LBUG();
506
507         snprintf(ver, sizeof(ver), "%x %x %x %x",
508                  lock->l_version[0], lock->l_version[1],
509                  lock->l_version[2], lock->l_version[3]);
510
511         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
512         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
513         CDEBUG(D_OTHER, "  Resource: %p\n", lock->l_resource);
514         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
515                (int)lock->l_req_mode, (int)lock->l_granted_mode);
516         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
517                lock->l_readers, lock->l_writers);
518         if (lock->l_resource->lr_type == LDLM_EXTENT)
519                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
520                        (unsigned long long)lock->l_extent.start,
521                        (unsigned long long)lock->l_extent.end);
522 }