Whamcloud - gitweb
- Fixed a DLM deadlock bug
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> &
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define DEBUG_SUBSYSTEM S_LDLM
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/lustre_dlm.h>
19 #include <linux/lustre_mds.h>
20
21 extern kmem_cache_t *ldlm_lock_slab;
22 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
23 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
24
25 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
26 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
27                               ldlm_mode_t mode, void *data);
28
29 ldlm_res_compat ldlm_res_compat_table [] = {
30         [LDLM_PLAIN] ldlm_plain_compat,
31         [LDLM_EXTENT] ldlm_extent_compat,
32         [LDLM_MDSINTENT] ldlm_plain_compat
33 };
34
35 ldlm_res_policy ldlm_res_policy_table [] = {
36         [LDLM_PLAIN] NULL,
37         [LDLM_EXTENT] ldlm_extent_policy,
38         [LDLM_MDSINTENT] ldlm_intent_policy
39 };
40
41 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
42                               ldlm_mode_t mode, void *data)
43 {
44         struct ptlrpc_request *req = req_cookie;
45         int rc = 0;
46         ENTRY;
47
48         if (!req_cookie)
49                 RETURN(0);
50
51         if (req->rq_reqmsg->bufcount > 1) {
52                 /* an intent needs to be considered */
53                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
54                 struct mds_body *mds_rep;
55                 struct ldlm_reply *rep;
56                 struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
57                 __u32 type = lock->l_resource->lr_type;
58                 __u64 new_resid[3] = {0, 0, 0}, old_res;
59                 int bufcount, rc, size[3] = {sizeof(struct ldlm_reply),
60                                              sizeof(struct mds_body),
61                                              sizeof(struct obdo)};
62
63                 it->opc = NTOH__u64(it->opc);
64
65                 LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
66
67                 /* prepare reply */
68                 switch(it->opc) {
69                 case IT_GETATTR:
70                         /* Note that in the negative case you may be returning
71                          * a file and its obdo */
72                 case IT_CREAT:
73                 case IT_CREAT|IT_OPEN:
74                 case IT_MKDIR:
75                 case IT_SYMLINK:
76                 case IT_MKNOD:
77                 case IT_LINK:
78                 case IT_OPEN:
79                 case IT_RENAME:
80                         bufcount = 3;
81                         break;
82                 case IT_UNLINK:
83                         bufcount = 2;
84                         size[1] = sizeof(struct obdo); 
85                         break;
86                 default:
87                         LBUG();
88                 }
89
90                 rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
91                                      &req->rq_repmsg);
92                 if (rc) {
93                         rc = req->rq_status = -ENOMEM;
94                         RETURN(rc);
95                 }
96
97                 rep = lustre_msg_buf(req->rq_repmsg, 0);
98                 rep->lock_policy_res1 = 1;
99
100                 /* execute policy */
101                 switch ( it->opc ) {
102                 case IT_CREAT:
103                 case IT_CREAT|IT_OPEN:
104                 case IT_MKDIR:
105                 case IT_SETATTR:
106                 case IT_SYMLINK:
107                 case IT_MKNOD:
108                 case IT_LINK:
109                 case IT_UNLINK:
110                 case IT_RENAME2:
111                         if (mds_reint_p == NULL)
112                                 mds_reint_p =
113                                         inter_module_get_request
114                                         ("mds_reint", "mds");
115                         if (IS_ERR(mds_reint_p)) {
116                                 CERROR("MDSINTENT locks require the MDS "
117                                        "module.\n");
118                                 LBUG();
119                                 RETURN(-EINVAL);
120                         }
121                         rc = mds_reint_p(2, req);
122                         if (rc)
123                                 LBUG();
124                         break;
125                 case IT_GETATTR:
126                 case IT_READDIR:
127                 case IT_RENAME:
128                 case IT_OPEN:
129                         if (mds_getattr_name_p == NULL)
130                                 mds_getattr_name_p =
131                                         inter_module_get_request
132                                         ("mds_getattr_name", "mds");
133                         if (IS_ERR(mds_getattr_name_p)) {
134                                 CERROR("MDSINTENT locks require the MDS "
135                                        "module.\n");
136                                 LBUG();
137                                 RETURN(-EINVAL);
138                         }
139                         rc = mds_getattr_name_p(2, req);
140                         if (rc) {
141                                 req->rq_status = rc;
142                                 RETURN(rc);
143                         }
144                         break;
145                 case IT_READDIR|IT_OPEN:
146                         LBUG();
147                         break;
148                 default:
149                         CERROR("Unhandled intent\n");
150                         LBUG();
151                 }
152
153                 if (it->opc == IT_UNLINK || it->opc == IT_RMDIR)
154                         RETURN(ELDLM_LOCK_ABORTED);
155
156                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
157                 rep->lock_policy_res2 = req->rq_status;
158                 new_resid[0] = mds_rep->ino;
159                 old_res = lock->l_resource->lr_name[0];
160
161                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
162                        "%ld\n", mds_rep->ino, (long)old_res);
163                 ldlm_resource_put(lock->l_resource);
164
165                 lock->l_resource =
166                         ldlm_resource_get(ns, NULL, new_resid, type, 1);
167                 if (lock->l_resource == NULL) {
168                         LBUG();
169                         RETURN(-ENOMEM);
170                 }
171                 LDLM_DEBUG(lock, "intent policy, old res %ld",
172                            (long)old_res);
173                 RETURN(ELDLM_LOCK_CHANGED);
174         } else {
175                 int size = sizeof(struct ldlm_reply);
176                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
177                                      &req->rq_repmsg);
178                 if (rc) {
179                         CERROR("out of memory\n");
180                         LBUG();
181                         RETURN(-ENOMEM);
182                 }
183         }
184         RETURN(rc);
185 }
186
187 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
188 {
189         return lockmode_compat(a->l_req_mode, b->l_req_mode);
190 }
191
192 /* Args: referenced, unlocked parent (or NULL)
193  *       referenced, unlocked resource
194  * Locks: parent->l_lock */
195 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
196                                        struct ldlm_resource *resource)
197 {
198         struct ldlm_lock *lock;
199
200         if (resource == NULL)
201                 LBUG();
202
203         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
204         if (lock == NULL)
205                 return NULL;
206
207         memset(lock, 0, sizeof(*lock));
208         lock->l_resource = resource;
209         INIT_LIST_HEAD(&lock->l_children);
210         INIT_LIST_HEAD(&lock->l_res_link);
211         init_waitqueue_head(&lock->l_waitq);
212         lock->l_lock = SPIN_LOCK_UNLOCKED;
213
214         if (parent != NULL) {
215                 spin_lock(&parent->l_lock);
216                 lock->l_parent = parent;
217                 list_add(&lock->l_childof, &parent->l_children);
218                 spin_unlock(&parent->l_lock);
219         }
220
221         return lock;
222 }
223
224 /* Args: unreferenced, locked lock
225  *
226  * Caller must do its own ldlm_resource_put() on lock->l_resource */
227 void ldlm_lock_free(struct ldlm_lock *lock)
228 {
229         if (!list_empty(&lock->l_children)) {
230                 CERROR("lock %p still has children (%p)!\n", lock,
231                        lock->l_children.next);
232                 ldlm_lock_dump(lock);
233                 LBUG();
234         }
235
236         if (lock->l_readers || lock->l_writers)
237                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
238                        "writers)\n", lock->l_readers, lock->l_writers);
239
240         if (lock->l_connection)
241                 ptlrpc_put_connection(lock->l_connection);
242         kmem_cache_free(ldlm_lock_slab, lock);
243 }
244
245 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
246 {
247         ldlm_res2desc(lock->l_resource, &desc->l_resource);
248         desc->l_req_mode = lock->l_req_mode;
249         desc->l_granted_mode = lock->l_granted_mode;
250         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
251         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
252 }
253
254 /* Args: unlocked lock */
255 void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
256 {
257         spin_lock(&lock->l_lock);
258         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
259                 lock->l_readers++;
260         else
261                 lock->l_writers++;
262         spin_unlock(&lock->l_lock);
263 }
264
265 int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
266 {
267         struct ptlrpc_request *req = NULL;
268         ENTRY;
269
270         spin_lock(&lock->l_lock);
271         if (lock->l_flags & LDLM_FL_AST_SENT) {
272                 RETURN(0);
273         }
274
275         lock->l_flags |= LDLM_FL_AST_SENT;
276
277         lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
278         spin_unlock(&lock->l_lock);
279         if (req != NULL) {
280                 struct list_head *list = lock->l_resource->lr_tmp;
281                 list_add(&req->rq_multi, list);
282         }
283         RETURN(1);
284 }
285
286 /* Args: unlocked lock */
287 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
288 {
289         ENTRY;
290
291         if (lock == NULL)
292                 LBUG();
293
294         spin_lock(&lock->l_lock);
295         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
296                 lock->l_readers--;
297         else
298                 lock->l_writers--;
299         if (!lock->l_readers && !lock->l_writers &&
300             lock->l_flags & LDLM_FL_DYING) {
301                 /* Read this lock its rights. */
302                 if (!lock->l_resource->lr_namespace->ns_client) {
303                         CERROR("LDLM_FL_DYING set on non-local lock!\n");
304                         LBUG();
305                 }
306
307                 CDEBUG(D_INFO, "final decref done on dying lock, "
308                        "calling callback.\n");
309                 spin_unlock(&lock->l_lock);
310                 /* This function pointer is unfortunately overloaded.  This
311                  * call will not result in an RPC. */
312                 lock->l_blocking_ast(lock, NULL, lock->l_data,
313                                      lock->l_data_len, NULL);
314         } else
315                 spin_unlock(&lock->l_lock);
316         EXIT;
317 }
318
319 /* Args: unlocked lock */
320 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
321                              struct list_head *queue)
322 {
323         struct list_head *tmp, *pos;
324         int rc = 0;
325
326         list_for_each_safe(tmp, pos, queue) {
327                 struct ldlm_lock *child;
328                 ldlm_res_compat compat;
329
330                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
331                 if (lock == child)
332                         continue;
333
334                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
335                 if (compat(child, lock)) {
336                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
337                         continue;
338                 }
339                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
340                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
341                         continue;
342                 }
343
344                 rc = 1;
345
346                 CDEBUG(D_OTHER, "compat function failed and lock modes incompat\n");
347                 if (send_cbs && child->l_blocking_ast != NULL) {
348                         CDEBUG(D_OTHER, "incompatible; sending blocking AST.\n");
349                         /* It's very difficult to actually send the AST from
350                          * here, because we'd have to drop the lock before going
351                          * to sleep to wait for the reply.  Instead we build the
352                          * packet and send it later. */
353                         ldlm_send_blocking_ast(child, lock);
354                 }
355         }
356
357         return rc;
358 }
359
360 /* Args: unlocked lock */
361 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
362 {
363         int rc;
364         ENTRY;
365
366         rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
367         /* FIXME: should we be sending ASTs to converting? */
368         rc |= _ldlm_lock_compat(lock, send_cbs,
369                                 &lock->l_resource->lr_converting);
370
371         RETURN(rc);
372 }
373
374 /* Args: locked lock, locked resource */
375 void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
376 {
377         ENTRY;
378
379         ldlm_resource_add_lock(res, &res->lr_granted, lock);
380         lock->l_granted_mode = lock->l_req_mode;
381
382         if (lock->l_granted_mode < res->lr_most_restr)
383                 res->lr_most_restr = lock->l_granted_mode;
384
385         if (lock->l_completion_ast)
386                 lock->l_completion_ast(lock, NULL, lock->l_data,
387                                        lock->l_data_len, NULL);
388         EXIT;
389 }
390
391 static int search_queue(struct list_head *queue, ldlm_mode_t mode,
392                         struct ldlm_extent *extent, struct lustre_handle *lockh)
393 {
394         struct list_head *tmp;
395
396         list_for_each(tmp, queue) {
397                 struct ldlm_lock *lock;
398                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
399
400                 if (lock->l_flags & LDLM_FL_DYING)
401                         continue;
402
403                 /* lock_convert() takes the resource lock, so we're sure that
404                  * req_mode, lr_type, and l_cookie won't change beneath us */
405                 if (lock->l_req_mode != mode)
406                         continue;
407
408                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
409                     (lock->l_extent.start > extent->start ||
410                      lock->l_extent.end < extent->end))
411                         continue;
412
413                 ldlm_lock_addref(lock, mode);
414                 ldlm_object2handle(lock, lockh);
415                 return 1;
416         }
417
418         return 0;
419 }
420
421 /* Must be called with no resource or lock locks held.
422  *
423  * Returns 1 if it finds an already-existing lock that is compatible; in this
424  * case, lockh is filled in with a addref()ed lock */
425 int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
426                           void *cookie, int cookielen, ldlm_mode_t mode,
427                           struct lustre_handle *lockh)
428 {
429         struct ldlm_resource *res;
430         int rc = 0;
431         ENTRY;
432
433         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
434         if (res == NULL)
435                 RETURN(0);
436
437         spin_lock(&res->lr_lock);
438         if (search_queue(&res->lr_granted, mode, cookie, lockh))
439                 GOTO(out, rc = 1);
440         if (search_queue(&res->lr_converting, mode, cookie, lockh))
441                 GOTO(out, rc = 1);
442         if (search_queue(&res->lr_waiting, mode, cookie, lockh))
443                 GOTO(out, rc = 1);
444
445         EXIT;
446  out:
447         ldlm_resource_put(res);
448         spin_unlock(&res->lr_lock);
449         return rc;
450 }
451
452 /* Must be called without the resource lock held.  Returns a referenced,
453  * unlocked ldlm_lock. */
454 ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
455                                     struct lustre_handle *parent_lock_handle,
456                                     __u64 *res_id, __u32 type,
457                                      ldlm_mode_t mode,
458                                     void *data,
459                                     __u32 data_len,
460                                     struct lustre_handle *lockh)
461 {
462         struct ldlm_resource *res, *parent_res = NULL;
463         struct ldlm_lock *lock, *parent_lock;
464
465         parent_lock = lustre_handle2object(parent_lock_handle);
466         if (parent_lock)
467                 parent_res = parent_lock->l_resource;
468
469         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
470         if (res == NULL)
471                 RETURN(-ENOMEM);
472
473         lock = ldlm_lock_new(parent_lock, res);
474         if (lock == NULL) {
475                 spin_lock(&res->lr_lock);
476                 ldlm_resource_put(res);
477                 spin_unlock(&res->lr_lock);
478                 RETURN(-ENOMEM);
479         }
480
481         lock->l_req_mode = mode;
482         lock->l_data = data;
483         lock->l_data_len = data_len;
484         ldlm_lock_addref(lock, mode);
485
486         ldlm_object2handle(lock, lockh);
487         return ELDLM_OK;
488 }
489
490 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
491 ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
492                                      void *cookie, int cookie_len,
493                                      int *flags,
494                                      ldlm_lock_callback completion,
495                                      ldlm_lock_callback blocking)
496 {
497         struct ldlm_resource *res;
498         struct ldlm_lock *lock;
499         int incompat = 0, local;
500         ldlm_res_policy policy;
501         ENTRY;
502
503         lock = lustre_handle2object(lockh);
504         res = lock->l_resource;
505         local = res->lr_namespace->ns_client;
506         spin_lock(&res->lr_lock);
507
508         lock->l_blocking_ast = blocking;
509
510         if (res->lr_type == LDLM_EXTENT)
511                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
512
513         /* policies are not executed on the client */
514         if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
515                 int rc;
516
517                 /* We do this dancing with refcounts and locks because the
518                  * policy function could send an RPC */
519                 res->lr_refcount++;
520                 spin_unlock(&res->lr_lock);
521
522                 rc = policy(lock, cookie, lock->l_req_mode, NULL);
523
524                 spin_lock(&res->lr_lock);
525                 ldlm_resource_put(res);
526
527                 if (rc == ELDLM_LOCK_CHANGED) {
528                         res = lock->l_resource;
529                         *flags |= LDLM_FL_LOCK_CHANGED;
530                 } else if (rc == ELDLM_LOCK_ABORTED) {
531                         /* Abort. */
532                         ldlm_resource_put(lock->l_resource);
533                         ldlm_lock_free(lock);
534                         RETURN(rc);
535                 }
536         }
537
538         lock->l_cookie = cookie;
539         lock->l_cookie_len = cookie_len;
540
541         if (local && lock->l_req_mode == lock->l_granted_mode) {
542                 /* The server returned a blocked lock, but it was granted before
543                  * we got a chance to actually enqueue it.  We don't need to do
544                  * anything else. */
545                 GOTO(out_noput, ELDLM_OK);
546         }
547
548         /* If this is a local resource, put it on the appropriate list. */
549         list_del_init(&lock->l_res_link);
550         if (local) {
551                 if (*flags & LDLM_FL_BLOCK_CONV)
552                         ldlm_resource_add_lock(res, res->lr_converting.prev,
553                                                lock);
554                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
555                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
556                 else
557                         ldlm_grant_lock(res, lock);
558                 GOTO(out, ELDLM_OK);
559         }
560
561         /* FIXME: We may want to optimize by checking lr_most_restr */
562         if (!list_empty(&res->lr_converting)) {
563                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
564                 *flags |= LDLM_FL_BLOCK_CONV;
565                 GOTO(out, ELDLM_OK);
566         }
567         if (!list_empty(&res->lr_waiting)) {
568                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
569                 *flags |= LDLM_FL_BLOCK_WAIT;
570                 GOTO(out, ELDLM_OK);
571         }
572         incompat = ldlm_lock_compat(lock, 0);
573         if (incompat) {
574                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
575                 *flags |= LDLM_FL_BLOCK_GRANTED;
576                 GOTO(out, ELDLM_OK);
577         }
578
579         ldlm_grant_lock(res, lock);
580         EXIT;
581  out:
582         /* We're called with a lock that has a referenced resource and is not on
583          * any resource list.  When we added it to a list, we incurred an extra
584          * reference. */
585         ldlm_resource_put(lock->l_resource);
586  out_noput:
587         /* Don't set 'completion_ast' until here so that if the lock is granted
588          * immediately we don't do an unnecessary completion call. */
589         lock->l_completion_ast = completion;
590         spin_unlock(&res->lr_lock);
591         return ELDLM_OK;
592 }
593
594 /* Must be called with resource->lr_lock taken. */
595 static int ldlm_reprocess_queue(struct ldlm_resource *res,
596                                 struct list_head *converting)
597 {
598         struct list_head *tmp, *pos;
599         ENTRY;
600
601         list_for_each_safe(tmp, pos, converting) {
602                 struct ldlm_lock *pending;
603                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
604
605                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
606
607                 /* the resource lock protects ldlm_lock_compat */
608                 if (ldlm_lock_compat(pending, 1))
609                         RETURN(1);
610
611                 list_del_init(&pending->l_res_link);
612                 ldlm_grant_lock(res, pending);
613
614                 ldlm_lock_addref(pending, pending->l_req_mode);
615                 ldlm_lock_decref(pending, pending->l_granted_mode);
616         }
617
618         RETURN(0);
619 }
620
621 /* Must be called with resource->lr_lock not taken. */
622 void ldlm_reprocess_all(struct ldlm_resource *res)
623 {
624         struct list_head rpc_list, *tmp, *pos;
625
626         INIT_LIST_HEAD(&rpc_list);
627
628         /* Local lock trees don't get reprocessed. */
629         if (res->lr_namespace->ns_client)
630                 return;
631
632         spin_lock(&res->lr_lock);
633         res->lr_tmp = &rpc_list;
634
635         ldlm_reprocess_queue(res, &res->lr_converting);
636         if (list_empty(&res->lr_converting))
637                 ldlm_reprocess_queue(res, &res->lr_waiting);
638
639         res->lr_tmp = NULL;
640         spin_unlock(&res->lr_lock);
641
642         list_for_each_safe(tmp, pos, &rpc_list) {
643                 int rc;
644                 struct ptlrpc_request *req =
645                         list_entry(tmp, struct ptlrpc_request, rq_multi);
646
647                 CDEBUG(D_INFO, "Sending callback.\n");
648
649                 rc = ptlrpc_queue_wait(req);
650                 rc = ptlrpc_check_status(req, rc);
651                 ptlrpc_free_req(req);
652                 if (rc)
653                         CERROR("Callback send failed: %d\n", rc);
654         }
655 }
656
657 /* Must be called with lock and lock->l_resource unlocked */
658 struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
659 {
660         struct ldlm_resource *res;
661         ENTRY;
662
663         res = lock->l_resource;
664
665         spin_lock(&res->lr_lock);
666         spin_lock(&lock->l_lock);
667
668         if (lock->l_readers || lock->l_writers)
669                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
670                        "writers)\n", lock->l_readers, lock->l_writers);
671
672         if (ldlm_resource_del_lock(lock))
673                 res = NULL; /* res was freed, nothing else to do. */
674         else
675                 spin_unlock(&res->lr_lock);
676         ldlm_lock_free(lock);
677
678         RETURN(res);
679 }
680
681 /* Must be called with lock and lock->l_resource unlocked */
682 struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
683                                               int new_mode, int *flags)
684 {
685         struct ldlm_lock *lock;
686         struct ldlm_resource *res;
687         ENTRY;
688
689         lock = lustre_handle2object(lockh);
690         res = lock->l_resource;
691
692         spin_lock(&res->lr_lock);
693
694         lock->l_req_mode = new_mode;
695         list_del_init(&lock->l_res_link);
696
697         /* If this is a local resource, put it on the appropriate list. */
698         if (res->lr_namespace->ns_client) {
699                 if (*flags & LDLM_FL_BLOCK_CONV)
700                         ldlm_resource_add_lock(res, res->lr_converting.prev,
701                                                lock);
702                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
703                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
704                 else
705                         ldlm_grant_lock(res, lock);
706         } else {
707                 list_add(&lock->l_res_link, res->lr_converting.prev);
708         }
709
710         spin_unlock(&res->lr_lock);
711
712         RETURN(res);
713 }
714
715 void ldlm_lock_dump(struct ldlm_lock *lock)
716 {
717         char ver[128];
718
719         if (!(portal_debug & D_OTHER))
720                 return;
721
722         if (RES_VERSION_SIZE != 4)
723                 LBUG();
724
725         if (!lock) {
726                 CDEBUG(D_OTHER, "  NULL LDLM lock\n");
727                 return;
728         }
729
730         snprintf(ver, sizeof(ver), "%x %x %x %x",
731                  lock->l_version[0], lock->l_version[1],
732                  lock->l_version[2], lock->l_version[3]);
733
734         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
735         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
736         CDEBUG(D_OTHER, "  Resource: %p (%Ld)\n", lock->l_resource,
737                lock->l_resource->lr_name[0]);
738         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
739                (int)lock->l_req_mode, (int)lock->l_granted_mode);
740         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
741                lock->l_readers, lock->l_writers);
742         if (lock->l_resource->lr_type == LDLM_EXTENT)
743                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
744                        (unsigned long long)lock->l_extent.start,
745                        (unsigned long long)lock->l_extent.end);
746 }