Whamcloud - gitweb
Many dlm and intent lock fixes.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> &
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define DEBUG_SUBSYSTEM S_LDLM
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/lustre_dlm.h>
19 #include <linux/lustre_mds.h>
20
21 extern kmem_cache_t *ldlm_lock_slab;
22 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
23 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
24
25 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
26 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
27                               ldlm_mode_t mode, void *data);
28
29 ldlm_res_compat ldlm_res_compat_table [] = {
30         [LDLM_PLAIN] ldlm_plain_compat,
31         [LDLM_EXTENT] ldlm_extent_compat,
32         [LDLM_MDSINTENT] ldlm_plain_compat
33 };
34
35 ldlm_res_policy ldlm_res_policy_table [] = {
36         [LDLM_PLAIN] NULL,
37         [LDLM_EXTENT] ldlm_extent_policy,
38         [LDLM_MDSINTENT] ldlm_intent_policy
39 };
40
41 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
42                               ldlm_mode_t mode, void *data)
43 {
44         struct ptlrpc_request *req = req_cookie;
45         int rc = 0;
46         ENTRY;
47
48         if (!req_cookie)
49                 RETURN(0);
50
51         if (req->rq_reqmsg->bufcount > 1) {
52                 /* an intent needs to be considered */
53                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
54                 struct mds_body *mds_rep;
55                 struct ldlm_reply *rep;
56                 struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
57                 __u32 type = lock->l_resource->lr_type;
58                 __u64 new_resid[3] = {0, 0, 0}, old_res;
59                 int bufcount, rc, size[3] = {sizeof(struct ldlm_reply),
60                                              sizeof(struct mds_body),
61                                              sizeof(struct obdo)};
62
63                 it->opc = NTOH__u64(it->opc);
64
65                 LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
66
67                 /* prepare reply */
68                 switch(it->opc) {
69                 case IT_GETATTR:
70                         /* Note that in the negative case you may be returning
71                          * a file and its obdo */
72                 case IT_CREAT:
73                 case IT_CREAT|IT_OPEN:
74                 case IT_MKDIR:
75                 case IT_SYMLINK:
76                 case IT_MKNOD:
77                 case IT_LINK:
78                 case IT_OPEN:
79                 case IT_RENAME:
80                         bufcount = 3;
81                         break;
82                 case IT_UNLINK:
83                         bufcount = 2;
84                         size[1] = sizeof(struct obdo); 
85                         break;
86                 default:
87                         LBUG();
88                 }
89
90                 rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
91                                      &req->rq_repmsg);
92                 if (rc) {
93                         rc = req->rq_status = -ENOMEM;
94                         RETURN(rc);
95                 }
96
97                 rep = lustre_msg_buf(req->rq_repmsg, 0);
98                 rep->lock_policy_res1 = 1;
99
100                 /* execute policy */
101                 switch ( it->opc ) {
102                 case IT_CREAT:
103                 case IT_CREAT|IT_OPEN:
104                 case IT_MKDIR:
105                 case IT_SETATTR:
106                 case IT_SYMLINK:
107                 case IT_MKNOD:
108                 case IT_LINK:
109                 case IT_UNLINK:
110                 case IT_RENAME2:
111                         if (mds_reint_p == NULL)
112                                 mds_reint_p =
113                                         inter_module_get_request
114                                         ("mds_reint", "mds");
115                         if (IS_ERR(mds_reint_p)) {
116                                 CERROR("MDSINTENT locks require the MDS "
117                                        "module.\n");
118                                 LBUG();
119                                 RETURN(-EINVAL);
120                         }
121                         rc = mds_reint_p(2, req);
122                         if (rc)
123                                 LBUG();
124                         break;
125                 case IT_GETATTR:
126                 case IT_READDIR:
127                 case IT_RENAME:
128                 case IT_OPEN:
129                         if (mds_getattr_name_p == NULL)
130                                 mds_getattr_name_p =
131                                         inter_module_get_request
132                                         ("mds_getattr_name", "mds");
133                         if (IS_ERR(mds_getattr_name_p)) {
134                                 CERROR("MDSINTENT locks require the MDS "
135                                        "module.\n");
136                                 LBUG();
137                                 RETURN(-EINVAL);
138                         }
139                         rc = mds_getattr_name_p(2, req);
140                         if (rc) {
141                                 req->rq_status = rc;
142                                 RETURN(rc);
143                         }
144                         break;
145                 case IT_READDIR|IT_OPEN:
146                         LBUG();
147                         break;
148                 default:
149                         CERROR("Unhandled intent\n");
150                         LBUG();
151                 }
152
153                 if (it->opc == IT_UNLINK || it->opc == IT_RMDIR)
154                         RETURN(ELDLM_LOCK_ABORTED);
155
156                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
157                 rep->lock_policy_res2 = req->rq_status;
158                 new_resid[0] = mds_rep->ino;
159                 old_res = lock->l_resource->lr_name[0];
160
161                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
162                        "%ld\n", mds_rep->ino, (long)old_res);
163                 ldlm_resource_put(lock->l_resource);
164
165                 lock->l_resource =
166                         ldlm_resource_get(ns, NULL, new_resid, type, 1);
167                 if (lock->l_resource == NULL) {
168                         LBUG();
169                         RETURN(-ENOMEM);
170                 }
171                 LDLM_DEBUG(lock, "intent policy, old res %ld",
172                            (long)old_res);
173                 RETURN(ELDLM_LOCK_CHANGED);
174         } else {
175                 int size = sizeof(struct ldlm_reply);
176                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
177                                      &req->rq_repmsg);
178                 if (rc) {
179                         CERROR("out of memory\n");
180                         LBUG();
181                         RETURN(-ENOMEM);
182                 }
183         }
184         RETURN(rc);
185 }
186
187 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
188 {
189         return lockmode_compat(a->l_req_mode, b->l_req_mode);
190 }
191
192 /* Args: referenced, unlocked parent (or NULL)
193  *       referenced, unlocked resource
194  * Locks: parent->l_lock */
195 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
196                                        struct ldlm_resource *resource)
197 {
198         struct ldlm_lock *lock;
199
200         if (resource == NULL)
201                 LBUG();
202
203         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
204         if (lock == NULL)
205                 return NULL;
206
207         memset(lock, 0, sizeof(*lock));
208         lock->l_resource = resource;
209         INIT_LIST_HEAD(&lock->l_children);
210         INIT_LIST_HEAD(&lock->l_res_link);
211         init_waitqueue_head(&lock->l_waitq);
212         lock->l_lock = SPIN_LOCK_UNLOCKED;
213
214         if (parent != NULL) {
215                 spin_lock(&parent->l_lock);
216                 lock->l_parent = parent;
217                 list_add(&lock->l_childof, &parent->l_children);
218                 spin_unlock(&parent->l_lock);
219         }
220
221         return lock;
222 }
223
224 /* Args: unreferenced, locked lock
225  *
226  * Caller must do its own ldlm_resource_put() on lock->l_resource */
227 void ldlm_lock_free(struct ldlm_lock *lock)
228 {
229         if (!list_empty(&lock->l_children)) {
230                 CERROR("lock %p still has children (%p)!\n", lock,
231                        lock->l_children.next);
232                 ldlm_lock_dump(lock);
233                 LBUG();
234         }
235
236         if (lock->l_readers || lock->l_writers)
237                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
238                        "writers)\n", lock->l_readers, lock->l_writers);
239
240         if (lock->l_connection)
241                 ptlrpc_put_connection(lock->l_connection);
242         kmem_cache_free(ldlm_lock_slab, lock);
243 }
244
245 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
246 {
247         ldlm_res2desc(lock->l_resource, &desc->l_resource);
248         desc->l_req_mode = lock->l_req_mode;
249         desc->l_granted_mode = lock->l_granted_mode;
250         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
251         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
252 }
253
254 /* Args: unlocked lock */
255 void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
256 {
257         spin_lock(&lock->l_lock);
258         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
259                 lock->l_readers++;
260         else
261                 lock->l_writers++;
262         spin_unlock(&lock->l_lock);
263 }
264
265 int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
266 {
267         struct ptlrpc_request *req = NULL;
268         ENTRY;
269
270         spin_lock(&lock->l_lock);
271         if (lock->l_flags & LDLM_FL_AST_SENT) {
272                 RETURN(0);
273         }
274
275         lock->l_flags |= LDLM_FL_AST_SENT;
276
277         lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
278         spin_unlock(&lock->l_lock);
279         if (req != NULL) {
280                 struct list_head *list = lock->l_resource->lr_tmp;
281                 list_add(&req->rq_multi, list);
282         }
283         RETURN(1);
284 }
285
286 /* Args: unlocked lock */
287 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
288 {
289         ENTRY;
290
291         if (lock == NULL)
292                 LBUG();
293
294         spin_lock(&lock->l_lock);
295         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
296                 lock->l_readers--;
297         else
298                 lock->l_writers--;
299         if (!lock->l_readers && !lock->l_writers &&
300             lock->l_flags & LDLM_FL_DYING) {
301                 /* Read this lock its rights. */
302                 if (!lock->l_resource->lr_namespace->ns_client) {
303                         CERROR("LDLM_FL_DYING set on non-local lock!\n");
304                         LBUG();
305                 }
306
307                 CDEBUG(D_INFO, "final decref done on dying lock, "
308                        "calling callback.\n");
309                 spin_unlock(&lock->l_lock);
310                 /* This function pointer is unfortunately overloaded.  This
311                  * call will not result in an RPC. */
312                 lock->l_blocking_ast(lock, NULL, lock->l_data,
313                                      lock->l_data_len, NULL);
314         } else
315                 spin_unlock(&lock->l_lock);
316         EXIT;
317 }
318
319 /* Args: unlocked lock */
320 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
321                              struct list_head *queue)
322 {
323         struct list_head *tmp, *pos;
324         int rc = 0;
325
326         list_for_each_safe(tmp, pos, queue) {
327                 struct ldlm_lock *child;
328                 ldlm_res_compat compat;
329
330                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
331                 if (lock == child)
332                         continue;
333
334                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
335                 if (compat(child, lock)) {
336                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
337                         continue;
338                 }
339                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
340                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
341                         continue;
342                 }
343
344                 rc = 1;
345
346                 CDEBUG(D_OTHER, "compat function failed and lock modes incompat\n");
347                 if (send_cbs && child->l_blocking_ast != NULL) {
348                         CDEBUG(D_OTHER, "incompatible; sending blocking AST.\n");
349                         /* It's very difficult to actually send the AST from
350                          * here, because we'd have to drop the lock before going
351                          * to sleep to wait for the reply.  Instead we build the
352                          * packet and send it later. */
353                         ldlm_send_blocking_ast(child, lock);
354                 }
355         }
356
357         return rc;
358 }
359
360 /* Args: unlocked lock */
361 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
362 {
363         int rc;
364         ENTRY;
365
366         rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
367         /* FIXME: should we be sending ASTs to converting? */
368         rc |= _ldlm_lock_compat(lock, send_cbs,
369                                 &lock->l_resource->lr_converting);
370
371         RETURN(rc);
372 }
373
374 /* Args: locked lock, locked resource */
375 void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
376 {
377         ENTRY;
378
379         ldlm_resource_add_lock(res, &res->lr_granted, lock);
380         lock->l_granted_mode = lock->l_req_mode;
381
382         if (lock->l_granted_mode < res->lr_most_restr)
383                 res->lr_most_restr = lock->l_granted_mode;
384
385         if (lock->l_completion_ast)
386                 lock->l_completion_ast(lock, NULL, lock->l_data,
387                                        lock->l_data_len, NULL);
388         EXIT;
389 }
390
391 static int search_queue(struct list_head *queue, ldlm_mode_t mode,
392                         struct ldlm_extent *extent, struct lustre_handle *lockh)
393 {
394         struct list_head *tmp;
395
396         list_for_each(tmp, queue) {
397                 struct ldlm_lock *lock;
398                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
399
400                 if (lock->l_flags & LDLM_FL_DYING)
401                         continue;
402
403                 /* lock_convert() takes the resource lock, so we're sure that
404                  * req_mode, lr_type, and l_cookie won't change beneath us */
405                 if (lock->l_req_mode != mode)
406                         continue;
407
408                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
409                     (lock->l_extent.start > extent->start ||
410                      lock->l_extent.end < extent->end))
411                         continue;
412
413                 ldlm_lock_addref(lock, mode);
414                 ldlm_object2handle(lock, lockh);
415                 return 1;
416         }
417
418         return 0;
419 }
420
421 /* Must be called with no resource or lock locks held.
422  *
423  * Returns 1 if it finds an already-existing lock that is compatible; in this
424  * case, lockh is filled in with a addref()ed lock */
425 int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
426                           void *cookie, int cookielen, ldlm_mode_t mode,
427                           struct lustre_handle *lockh)
428 {
429         struct ldlm_resource *res;
430         int rc = 0;
431         ENTRY;
432
433         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
434         if (res == NULL)
435                 RETURN(0);
436
437         spin_lock(&res->lr_lock);
438         if (search_queue(&res->lr_granted, mode, cookie, lockh))
439                 GOTO(out, rc = 1);
440         if (search_queue(&res->lr_converting, mode, cookie, lockh))
441                 GOTO(out, rc = 1);
442         if (search_queue(&res->lr_waiting, mode, cookie, lockh))
443                 GOTO(out, rc = 1);
444
445         EXIT;
446  out:
447         ldlm_resource_put(res);
448         spin_unlock(&res->lr_lock);
449         return rc;
450 }
451
452 /* Must be called without the resource lock held.  Returns a referenced,
453  * unlocked ldlm_lock. */
454 ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
455                                     struct lustre_handle *parent_lock_handle,
456                                     __u64 *res_id, __u32 type,
457                                      ldlm_mode_t mode,
458                                     void *data,
459                                     __u32 data_len,
460                                     struct lustre_handle *lockh)
461 {
462         struct ldlm_resource *res, *parent_res = NULL;
463         struct ldlm_lock *lock, *parent_lock;
464
465         parent_lock = lustre_handle2object(parent_lock_handle);
466         if (parent_lock)
467                 parent_res = parent_lock->l_resource;
468
469         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
470         if (res == NULL)
471                 RETURN(-ENOMEM);
472
473         lock = ldlm_lock_new(parent_lock, res);
474         if (lock == NULL) {
475                 spin_lock(&res->lr_lock);
476                 ldlm_resource_put(res);
477                 spin_unlock(&res->lr_lock);
478                 RETURN(-ENOMEM);
479         }
480
481         lock->l_req_mode = mode;
482         lock->l_data = data;
483         lock->l_data_len = data_len;
484         ldlm_lock_addref(lock, mode);
485
486         ldlm_object2handle(lock, lockh);
487         return ELDLM_OK;
488 }
489
490 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
491 ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
492                                      void *cookie, int cookie_len,
493                                      int *flags,
494                                      ldlm_lock_callback completion,
495                                      ldlm_lock_callback blocking)
496 {
497         struct ldlm_resource *res;
498         struct ldlm_lock *lock;
499         int incompat = 0, local;
500         ldlm_res_policy policy;
501         ENTRY;
502
503         lock = lustre_handle2object(lockh);
504         res = lock->l_resource;
505         local = res->lr_namespace->ns_client;
506         spin_lock(&res->lr_lock);
507
508         lock->l_blocking_ast = blocking;
509
510         if (res->lr_type == LDLM_EXTENT)
511                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
512
513         /* policies are not executed on the client */
514         if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
515                 int rc = policy(lock, cookie, lock->l_req_mode, NULL);
516                 if (rc == ELDLM_LOCK_CHANGED) {
517                         res = lock->l_resource;
518                         *flags |= LDLM_FL_LOCK_CHANGED;
519                 }
520                 if (rc == ELDLM_LOCK_ABORTED) {
521                         /* Abort. */
522                         ldlm_resource_put(lock->l_resource);
523                         ldlm_lock_free(lock);
524                         RETURN(rc);
525                 }
526         }
527
528         lock->l_cookie = cookie;
529         lock->l_cookie_len = cookie_len;
530
531         if (local && lock->l_req_mode == lock->l_granted_mode) {
532                 /* The server returned a blocked lock, but it was granted before
533                  * we got a chance to actually enqueue it.  We don't need to do
534                  * anything else. */
535                 GOTO(out, ELDLM_OK);
536         }
537
538         /* If this is a local resource, put it on the appropriate list. */
539         list_del_init(&lock->l_res_link);
540         if (local) {
541                 if (*flags & LDLM_FL_BLOCK_CONV)
542                         ldlm_resource_add_lock(res, res->lr_converting.prev,
543                                                lock);
544                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
545                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
546                 else
547                         ldlm_grant_lock(res, lock);
548                 GOTO(out, ELDLM_OK);
549         }
550
551         /* FIXME: We may want to optimize by checking lr_most_restr */
552         if (!list_empty(&res->lr_converting)) {
553                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
554                 *flags |= LDLM_FL_BLOCK_CONV;
555                 GOTO(out, ELDLM_OK);
556         }
557         if (!list_empty(&res->lr_waiting)) {
558                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
559                 *flags |= LDLM_FL_BLOCK_WAIT;
560                 GOTO(out, ELDLM_OK);
561         }
562         incompat = ldlm_lock_compat(lock, 0);
563         if (incompat) {
564                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
565                 *flags |= LDLM_FL_BLOCK_GRANTED;
566                 GOTO(out, ELDLM_OK);
567         }
568
569         ldlm_grant_lock(res, lock);
570         EXIT;
571  out:
572         /* Don't set 'completion_ast' until here so that if the lock is granted
573          * immediately we don't do an unnecessary completion call. */
574         lock->l_completion_ast = completion;
575         spin_unlock(&res->lr_lock);
576         return ELDLM_OK;
577 }
578
579 /* Must be called with resource->lr_lock taken. */
580 static int ldlm_reprocess_queue(struct ldlm_resource *res,
581                                 struct list_head *converting)
582 {
583         struct list_head *tmp, *pos;
584         ENTRY;
585
586         list_for_each_safe(tmp, pos, converting) {
587                 struct ldlm_lock *pending;
588                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
589
590                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
591
592                 /* the resource lock protects ldlm_lock_compat */
593                 if (ldlm_lock_compat(pending, 1))
594                         RETURN(1);
595
596                 list_del_init(&pending->l_res_link);
597                 ldlm_grant_lock(res, pending);
598
599                 ldlm_lock_addref(pending, pending->l_req_mode);
600                 ldlm_lock_decref(pending, pending->l_granted_mode);
601         }
602
603         RETURN(0);
604 }
605
606 /* Must be called with resource->lr_lock not taken. */
607 void ldlm_reprocess_all(struct ldlm_resource *res)
608 {
609         struct list_head rpc_list, *tmp, *pos;
610
611         INIT_LIST_HEAD(&rpc_list);
612
613         /* Local lock trees don't get reprocessed. */
614         if (res->lr_namespace->ns_client)
615                 return;
616
617         spin_lock(&res->lr_lock);
618         res->lr_tmp = &rpc_list;
619
620         ldlm_reprocess_queue(res, &res->lr_converting);
621         if (list_empty(&res->lr_converting))
622                 ldlm_reprocess_queue(res, &res->lr_waiting);
623
624         res->lr_tmp = NULL;
625         spin_unlock(&res->lr_lock);
626
627         list_for_each_safe(tmp, pos, &rpc_list) {
628                 int rc;
629                 struct ptlrpc_request *req =
630                         list_entry(tmp, struct ptlrpc_request, rq_multi);
631
632                 CDEBUG(D_INFO, "Sending callback.\n");
633
634                 rc = ptlrpc_queue_wait(req);
635                 rc = ptlrpc_check_status(req, rc);
636                 ptlrpc_free_req(req);
637                 if (rc)
638                         CERROR("Callback send failed: %d\n", rc);
639         }
640 }
641
642 /* Must be called with lock and lock->l_resource unlocked */
643 struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
644 {
645         struct ldlm_resource *res;
646         ENTRY;
647
648         res = lock->l_resource;
649
650         spin_lock(&res->lr_lock);
651         spin_lock(&lock->l_lock);
652
653         if (lock->l_readers || lock->l_writers)
654                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
655                        "writers)\n", lock->l_readers, lock->l_writers);
656
657         ldlm_resource_del_lock(lock);
658         if (ldlm_resource_put(res))
659                 res = NULL; /* res was freed, nothing else to do. */
660         else
661                 spin_unlock(&res->lr_lock);
662         ldlm_lock_free(lock);
663
664         RETURN(res);
665 }
666
667 /* Must be called with lock and lock->l_resource unlocked */
668 struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
669                                               int new_mode, int *flags)
670 {
671         struct ldlm_lock *lock;
672         struct ldlm_resource *res;
673         ENTRY;
674
675         lock = lustre_handle2object(lockh);
676         res = lock->l_resource;
677
678         spin_lock(&res->lr_lock);
679
680         lock->l_req_mode = new_mode;
681         list_del_init(&lock->l_res_link);
682
683         /* If this is a local resource, put it on the appropriate list. */
684         if (res->lr_namespace->ns_client) {
685                 if (*flags & LDLM_FL_BLOCK_CONV)
686                         ldlm_resource_add_lock(res, res->lr_converting.prev,
687                                                lock);
688                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
689                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
690                 else
691                         ldlm_grant_lock(res, lock);
692         } else {
693                 list_add(&lock->l_res_link, res->lr_converting.prev);
694         }
695
696         spin_unlock(&res->lr_lock);
697
698         RETURN(res);
699 }
700
701 void ldlm_lock_dump(struct ldlm_lock *lock)
702 {
703         char ver[128];
704
705         if (!(portal_debug & D_OTHER))
706                 return;
707
708         if (RES_VERSION_SIZE != 4)
709                 LBUG();
710
711         if (!lock) {
712                 CDEBUG(D_OTHER, "  NULL LDLM lock\n");
713                 return;
714         }
715
716         snprintf(ver, sizeof(ver), "%x %x %x %x",
717                  lock->l_version[0], lock->l_version[1],
718                  lock->l_version[2], lock->l_version[3]);
719
720         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
721         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
722         CDEBUG(D_OTHER, "  Resource: %p (%Ld)\n", lock->l_resource,
723                lock->l_resource->lr_name[0]);
724         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
725                (int)lock->l_req_mode, (int)lock->l_granted_mode);
726         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
727                lock->l_readers, lock->l_writers);
728         if (lock->l_resource->lr_type == LDLM_EXTENT)
729                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
730                        (unsigned long long)lock->l_extent.start,
731                        (unsigned long long)lock->l_extent.end);
732 }