Whamcloud - gitweb
- remainder of rmdir changes
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> &
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define DEBUG_SUBSYSTEM S_LDLM
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/lustre_dlm.h>
19 #include <linux/lustre_mds.h>
20
21 extern kmem_cache_t *ldlm_lock_slab;
22 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
23 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
24
25 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
26 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
27                               ldlm_mode_t mode, void *data);
28
29 ldlm_res_compat ldlm_res_compat_table [] = {
30         [LDLM_PLAIN] ldlm_plain_compat,
31         [LDLM_EXTENT] ldlm_extent_compat,
32         [LDLM_MDSINTENT] ldlm_plain_compat
33 };
34
35 ldlm_res_policy ldlm_res_policy_table [] = {
36         [LDLM_PLAIN] NULL,
37         [LDLM_EXTENT] ldlm_extent_policy,
38         [LDLM_MDSINTENT] ldlm_intent_policy
39 };
40
41 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
42                               ldlm_mode_t mode, void *data)
43 {
44         struct ptlrpc_request *req = req_cookie;
45         int rc = 0;
46         ENTRY;
47
48         if (!req_cookie)
49                 RETURN(0);
50
51         if (req->rq_reqmsg->bufcount > 1) {
52                 /* an intent needs to be considered */
53                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
54                 struct mds_body *mds_rep;
55                 struct ldlm_reply *rep;
56                 struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
57                 __u32 type = lock->l_resource->lr_type;
58                 __u64 new_resid[3] = {0, 0, 0}, old_res;
59                 int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
60                                                   sizeof(struct mds_body),
61                                                   sizeof(struct obdo)};
62
63                 it->opc = NTOH__u64(it->opc);
64
65                 LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
66
67                 /* prepare reply */
68                 switch(it->opc) {
69                 case IT_GETATTR:
70                         /* Note that in the negative case you may be returning
71                          * a file and its obdo */
72                 case IT_CREAT:
73                 case IT_CREAT|IT_OPEN:
74                 case IT_MKDIR:
75                 case IT_SYMLINK:
76                 case IT_MKNOD:
77                 case IT_LINK:
78                 case IT_OPEN:
79                 case IT_RENAME:
80                         bufcount = 3;
81                         break;
82                 case IT_UNLINK:
83                         bufcount = 2;
84                         size[1] = sizeof(struct obdo); 
85                         break;
86                 case IT_RMDIR:
87                         bufcount = 1;
88                         break;
89                 default:
90                         LBUG();
91                 }
92
93                 rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
94                                      &req->rq_repmsg);
95                 if (rc) {
96                         rc = req->rq_status = -ENOMEM;
97                         RETURN(rc);
98                 }
99
100                 rep = lustre_msg_buf(req->rq_repmsg, 0);
101                 rep->lock_policy_res1 = 1;
102
103                 /* execute policy */
104                 switch ( it->opc ) {
105                 case IT_CREAT:
106                 case IT_CREAT|IT_OPEN:
107                 case IT_MKDIR:
108                 case IT_SETATTR:
109                 case IT_SYMLINK:
110                 case IT_MKNOD:
111                 case IT_LINK:
112                 case IT_UNLINK:
113                 case IT_RMDIR:
114                 case IT_RENAME2:
115                         if (mds_reint_p == NULL)
116                                 mds_reint_p =
117                                         inter_module_get_request
118                                         ("mds_reint", "mds");
119                         if (IS_ERR(mds_reint_p)) {
120                                 CERROR("MDSINTENT locks require the MDS "
121                                        "module.\n");
122                                 LBUG();
123                                 RETURN(-EINVAL);
124                         }
125                         rc = mds_reint_p(2, req);
126                         if (rc)
127                                 LBUG();
128                         break;
129                 case IT_GETATTR:
130                 case IT_READDIR:
131                 case IT_RENAME:
132                 case IT_OPEN:
133                         if (mds_getattr_name_p == NULL)
134                                 mds_getattr_name_p =
135                                         inter_module_get_request
136                                         ("mds_getattr_name", "mds");
137                         if (IS_ERR(mds_getattr_name_p)) {
138                                 CERROR("MDSINTENT locks require the MDS "
139                                        "module.\n");
140                                 LBUG();
141                                 RETURN(-EINVAL);
142                         }
143                         rc = mds_getattr_name_p(2, req);
144                         if (rc) {
145                                 req->rq_status = rc;
146                                 RETURN(rc);
147                         }
148                         break;
149                 case IT_READDIR|IT_OPEN:
150                         LBUG();
151                         break;
152                 default:
153                         CERROR("Unhandled intent\n");
154                         LBUG();
155                 }
156
157                 if (it->opc == IT_UNLINK || it->opc == IT_RMDIR)
158                         RETURN(ELDLM_LOCK_ABORTED);
159
160                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
161                 rep->lock_policy_res2 = req->rq_status;
162                 new_resid[0] = mds_rep->ino;
163                 old_res = lock->l_resource->lr_name[0];
164
165                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
166                        "%ld\n", mds_rep->ino, (long)old_res);
167                 spin_lock(&lock->l_resource->lr_lock);
168                 if (!ldlm_resource_put(lock->l_resource))
169                         /* unlock it unless the resource was freed */
170                         spin_unlock(&lock->l_resource->lr_lock);
171
172                 lock->l_resource =
173                         ldlm_resource_get(ns, NULL, new_resid, type, 1);
174                 if (lock->l_resource == NULL) {
175                         LBUG();
176                         RETURN(-ENOMEM);
177                 }
178                 LDLM_DEBUG(lock, "intent policy, old res %ld",
179                            (long)old_res);
180                 RETURN(ELDLM_LOCK_CHANGED);
181         } else {
182                 int size = sizeof(struct ldlm_reply);
183                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
184                                      &req->rq_repmsg);
185                 if (rc) {
186                         CERROR("out of memory\n");
187                         LBUG();
188                         RETURN(-ENOMEM);
189                 }
190         }
191         RETURN(rc);
192 }
193
194 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
195 {
196         return lockmode_compat(a->l_req_mode, b->l_req_mode);
197 }
198
199 /* Args: referenced, unlocked parent (or NULL)
200  *       referenced, unlocked resource
201  * Locks: parent->l_lock */
202 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
203                                        struct ldlm_resource *resource)
204 {
205         struct ldlm_lock *lock;
206
207         if (resource == NULL)
208                 LBUG();
209
210         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
211         if (lock == NULL)
212                 return NULL;
213
214         memset(lock, 0, sizeof(*lock));
215         lock->l_resource = resource;
216         INIT_LIST_HEAD(&lock->l_children);
217         INIT_LIST_HEAD(&lock->l_res_link);
218         init_waitqueue_head(&lock->l_waitq);
219         lock->l_lock = SPIN_LOCK_UNLOCKED;
220
221         if (parent != NULL) {
222                 spin_lock(&parent->l_lock);
223                 lock->l_parent = parent;
224                 list_add(&lock->l_childof, &parent->l_children);
225                 spin_unlock(&parent->l_lock);
226         }
227
228         return lock;
229 }
230
231 /* Args: unreferenced, locked lock
232  *
233  * Caller must do its own ldlm_resource_put() on lock->l_resource */
234 void ldlm_lock_free(struct ldlm_lock *lock)
235 {
236         if (!list_empty(&lock->l_children)) {
237                 CERROR("lock %p still has children (%p)!\n", lock,
238                        lock->l_children.next);
239                 ldlm_lock_dump(lock);
240                 LBUG();
241         }
242
243         if (lock->l_readers || lock->l_writers)
244                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
245                        "writers)\n", lock->l_readers, lock->l_writers);
246
247         if (lock->l_connection)
248                 ptlrpc_put_connection(lock->l_connection);
249         kmem_cache_free(ldlm_lock_slab, lock);
250 }
251
252 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
253 {
254         ldlm_res2desc(lock->l_resource, &desc->l_resource);
255         desc->l_req_mode = lock->l_req_mode;
256         desc->l_granted_mode = lock->l_granted_mode;
257         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
258         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
259 }
260
261 /* Args: unlocked lock */
262 void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode)
263 {
264         spin_lock(&lock->l_lock);
265         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
266                 lock->l_readers++;
267         else
268                 lock->l_writers++;
269         spin_unlock(&lock->l_lock);
270 }
271
272 int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
273 {
274         struct ptlrpc_request *req = NULL;
275         ENTRY;
276
277         spin_lock(&lock->l_lock);
278         if (lock->l_flags & LDLM_FL_AST_SENT) {
279                 RETURN(0);
280         }
281
282         lock->l_flags |= LDLM_FL_AST_SENT;
283
284         lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
285         spin_unlock(&lock->l_lock);
286         if (req != NULL) {
287                 struct list_head *list = lock->l_resource->lr_tmp;
288                 list_add(&req->rq_multi, list);
289         }
290         RETURN(1);
291 }
292
293 /* Args: unlocked lock */
294 void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode)
295 {
296         ENTRY;
297
298         if (lock == NULL)
299                 LBUG();
300
301         spin_lock(&lock->l_lock);
302         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
303                 lock->l_readers--;
304         else
305                 lock->l_writers--;
306         if (!lock->l_readers && !lock->l_writers &&
307             lock->l_flags & LDLM_FL_DYING) {
308                 /* Read this lock its rights. */
309                 if (!lock->l_resource->lr_namespace->ns_client) {
310                         CERROR("LDLM_FL_DYING set on non-local lock!\n");
311                         LBUG();
312                 }
313
314                 CDEBUG(D_INFO, "final decref done on dying lock, "
315                        "calling callback.\n");
316                 spin_unlock(&lock->l_lock);
317                 /* This function pointer is unfortunately overloaded.  This
318                  * call will not result in an RPC. */
319                 lock->l_blocking_ast(lock, NULL, lock->l_data,
320                                      lock->l_data_len, NULL);
321         } else
322                 spin_unlock(&lock->l_lock);
323         EXIT;
324 }
325
326 /* Args: unlocked lock */
327 static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
328                              struct list_head *queue)
329 {
330         struct list_head *tmp, *pos;
331         int rc = 0;
332
333         list_for_each_safe(tmp, pos, queue) {
334                 struct ldlm_lock *child;
335                 ldlm_res_compat compat;
336
337                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
338                 if (lock == child)
339                         continue;
340
341                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
342                 if (compat(child, lock)) {
343                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
344                         continue;
345                 }
346                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
347                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
348                         continue;
349                 }
350
351                 rc = 1;
352
353                 CDEBUG(D_OTHER, "compat function failed and lock modes incompat\n");
354                 if (send_cbs && child->l_blocking_ast != NULL) {
355                         CDEBUG(D_OTHER, "incompatible; sending blocking AST.\n");
356                         /* It's very difficult to actually send the AST from
357                          * here, because we'd have to drop the lock before going
358                          * to sleep to wait for the reply.  Instead we build the
359                          * packet and send it later. */
360                         ldlm_send_blocking_ast(child, lock);
361                 }
362         }
363
364         return rc;
365 }
366
367 /* Args: unlocked lock */
368 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
369 {
370         int rc;
371         ENTRY;
372
373         rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted);
374         /* FIXME: should we be sending ASTs to converting? */
375         rc |= _ldlm_lock_compat(lock, send_cbs,
376                                 &lock->l_resource->lr_converting);
377
378         RETURN(rc);
379 }
380
381 /* Args: locked lock, locked resource */
382 void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock)
383 {
384         ENTRY;
385
386         ldlm_resource_add_lock(res, &res->lr_granted, lock);
387         lock->l_granted_mode = lock->l_req_mode;
388
389         if (lock->l_granted_mode < res->lr_most_restr)
390                 res->lr_most_restr = lock->l_granted_mode;
391
392         if (lock->l_completion_ast)
393                 lock->l_completion_ast(lock, NULL, lock->l_data,
394                                        lock->l_data_len, NULL);
395         EXIT;
396 }
397
398 static int search_queue(struct list_head *queue, ldlm_mode_t mode,
399                         struct ldlm_extent *extent, struct lustre_handle *lockh)
400 {
401         struct list_head *tmp;
402
403         list_for_each(tmp, queue) {
404                 struct ldlm_lock *lock;
405                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
406
407                 if (lock->l_flags & LDLM_FL_DYING)
408                         continue;
409
410                 /* lock_convert() takes the resource lock, so we're sure that
411                  * req_mode, lr_type, and l_cookie won't change beneath us */
412                 if (lock->l_req_mode != mode)
413                         continue;
414
415                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
416                     (lock->l_extent.start > extent->start ||
417                      lock->l_extent.end < extent->end))
418                         continue;
419
420                 ldlm_lock_addref(lock, mode);
421                 ldlm_object2handle(lock, lockh);
422                 return 1;
423         }
424
425         return 0;
426 }
427
428 /* Must be called with no resource or lock locks held.
429  *
430  * Returns 1 if it finds an already-existing lock that is compatible; in this
431  * case, lockh is filled in with a addref()ed lock */
432 int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
433                           void *cookie, int cookielen, ldlm_mode_t mode,
434                           struct lustre_handle *lockh)
435 {
436         struct ldlm_resource *res;
437         int rc = 0;
438         ENTRY;
439
440         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
441         if (res == NULL)
442                 RETURN(0);
443
444         spin_lock(&res->lr_lock);
445         if (search_queue(&res->lr_granted, mode, cookie, lockh))
446                 GOTO(out, rc = 1);
447         if (search_queue(&res->lr_converting, mode, cookie, lockh))
448                 GOTO(out, rc = 1);
449         if (search_queue(&res->lr_waiting, mode, cookie, lockh))
450                 GOTO(out, rc = 1);
451
452         EXIT;
453  out:
454         ldlm_resource_put(res);
455         spin_unlock(&res->lr_lock);
456         return rc;
457 }
458
459 /* Must be called without the resource lock held.  Returns a referenced,
460  * unlocked ldlm_lock. */
461 ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns,
462                                     struct lustre_handle *parent_lock_handle,
463                                     __u64 *res_id, __u32 type,
464                                      ldlm_mode_t mode,
465                                     void *data,
466                                     __u32 data_len,
467                                     struct lustre_handle *lockh)
468 {
469         struct ldlm_resource *res, *parent_res = NULL;
470         struct ldlm_lock *lock, *parent_lock;
471
472         parent_lock = lustre_handle2object(parent_lock_handle);
473         if (parent_lock)
474                 parent_res = parent_lock->l_resource;
475
476         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
477         if (res == NULL)
478                 RETURN(-ENOMEM);
479
480         lock = ldlm_lock_new(parent_lock, res);
481         if (lock == NULL) {
482                 spin_lock(&res->lr_lock);
483                 ldlm_resource_put(res);
484                 spin_unlock(&res->lr_lock);
485                 RETURN(-ENOMEM);
486         }
487
488         lock->l_req_mode = mode;
489         lock->l_data = data;
490         lock->l_data_len = data_len;
491         ldlm_lock_addref(lock, mode);
492
493         ldlm_object2handle(lock, lockh);
494         return ELDLM_OK;
495 }
496
497 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
498 ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh,
499                                      void *cookie, int cookie_len,
500                                      int *flags,
501                                      ldlm_lock_callback completion,
502                                      ldlm_lock_callback blocking)
503 {
504         struct ldlm_resource *res;
505         struct ldlm_lock *lock;
506         int incompat = 0, local;
507         ldlm_res_policy policy;
508         ENTRY;
509
510         lock = lustre_handle2object(lockh);
511         res = lock->l_resource;
512         local = res->lr_namespace->ns_client;
513         spin_lock(&res->lr_lock);
514
515         lock->l_blocking_ast = blocking;
516
517         if (res->lr_type == LDLM_EXTENT)
518                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
519
520         /* policies are not executed on the client */
521         if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
522                 int rc;
523
524                 /* We do this dancing with refcounts and locks because the
525                  * policy function could send an RPC */
526                 res->lr_refcount++;
527                 spin_unlock(&res->lr_lock);
528
529                 rc = policy(lock, cookie, lock->l_req_mode, NULL);
530
531                 spin_lock(&res->lr_lock);
532                 ldlm_resource_put(res);
533
534                 if (rc == ELDLM_LOCK_CHANGED) {
535                         spin_unlock(&res->lr_lock);
536                         res = lock->l_resource;
537                         spin_lock(&res->lr_lock);
538                         *flags |= LDLM_FL_LOCK_CHANGED;
539                 } else if (rc == ELDLM_LOCK_ABORTED) {
540                         /* Abort. */
541                         ldlm_resource_put(lock->l_resource);
542                         ldlm_lock_free(lock);
543                         RETURN(rc);
544                 }
545         }
546
547         lock->l_cookie = cookie;
548         lock->l_cookie_len = cookie_len;
549
550         if (local && lock->l_req_mode == lock->l_granted_mode) {
551                 /* The server returned a blocked lock, but it was granted before
552                  * we got a chance to actually enqueue it.  We don't need to do
553                  * anything else. */
554                 GOTO(out_noput, ELDLM_OK);
555         }
556
557         /* If this is a local resource, put it on the appropriate list. */
558         list_del_init(&lock->l_res_link);
559         if (local) {
560                 if (*flags & LDLM_FL_BLOCK_CONV)
561                         ldlm_resource_add_lock(res, res->lr_converting.prev,
562                                                lock);
563                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
564                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
565                 else
566                         ldlm_grant_lock(res, lock);
567                 GOTO(out, ELDLM_OK);
568         }
569
570         /* FIXME: We may want to optimize by checking lr_most_restr */
571         if (!list_empty(&res->lr_converting)) {
572                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
573                 *flags |= LDLM_FL_BLOCK_CONV;
574                 GOTO(out, ELDLM_OK);
575         }
576         if (!list_empty(&res->lr_waiting)) {
577                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
578                 *flags |= LDLM_FL_BLOCK_WAIT;
579                 GOTO(out, ELDLM_OK);
580         }
581         incompat = ldlm_lock_compat(lock, 0);
582         if (incompat) {
583                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
584                 *flags |= LDLM_FL_BLOCK_GRANTED;
585                 GOTO(out, ELDLM_OK);
586         }
587
588         ldlm_grant_lock(res, lock);
589         EXIT;
590  out:
591         /* We're called with a lock that has a referenced resource and is not on
592          * any resource list.  When we added it to a list, we incurred an extra
593          * reference. */
594         ldlm_resource_put(lock->l_resource);
595  out_noput:
596         /* Don't set 'completion_ast' until here so that if the lock is granted
597          * immediately we don't do an unnecessary completion call. */
598         lock->l_completion_ast = completion;
599         spin_unlock(&res->lr_lock);
600         return ELDLM_OK;
601 }
602
603 /* Must be called with resource->lr_lock taken. */
604 static int ldlm_reprocess_queue(struct ldlm_resource *res,
605                                 struct list_head *converting)
606 {
607         struct list_head *tmp, *pos;
608         ENTRY;
609
610         list_for_each_safe(tmp, pos, converting) {
611                 struct ldlm_lock *pending;
612                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
613
614                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
615
616                 /* the resource lock protects ldlm_lock_compat */
617                 if (ldlm_lock_compat(pending, 1))
618                         RETURN(1);
619
620                 list_del_init(&pending->l_res_link);
621                 ldlm_grant_lock(res, pending);
622
623                 ldlm_lock_addref(pending, pending->l_req_mode);
624                 ldlm_lock_decref(pending, pending->l_granted_mode);
625         }
626
627         RETURN(0);
628 }
629
630 /* Must be called with resource->lr_lock not taken. */
631 void ldlm_reprocess_all(struct ldlm_resource *res)
632 {
633         struct list_head rpc_list, *tmp, *pos;
634
635         INIT_LIST_HEAD(&rpc_list);
636
637         /* Local lock trees don't get reprocessed. */
638         if (res->lr_namespace->ns_client)
639                 return;
640
641         spin_lock(&res->lr_lock);
642         res->lr_tmp = &rpc_list;
643
644         ldlm_reprocess_queue(res, &res->lr_converting);
645         if (list_empty(&res->lr_converting))
646                 ldlm_reprocess_queue(res, &res->lr_waiting);
647
648         res->lr_tmp = NULL;
649         spin_unlock(&res->lr_lock);
650
651         list_for_each_safe(tmp, pos, &rpc_list) {
652                 int rc;
653                 struct ptlrpc_request *req =
654                         list_entry(tmp, struct ptlrpc_request, rq_multi);
655
656                 CDEBUG(D_INFO, "Sending callback.\n");
657
658                 rc = ptlrpc_queue_wait(req);
659                 rc = ptlrpc_check_status(req, rc);
660                 ptlrpc_free_req(req);
661                 if (rc)
662                         CERROR("Callback send failed: %d\n", rc);
663         }
664 }
665
666 /* Must be called with lock and lock->l_resource unlocked */
667 struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock)
668 {
669         struct ldlm_resource *res;
670         ENTRY;
671
672         res = lock->l_resource;
673
674         spin_lock(&res->lr_lock);
675         spin_lock(&lock->l_lock);
676
677         if (lock->l_readers || lock->l_writers)
678                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
679                        "writers)\n", lock->l_readers, lock->l_writers);
680
681         if (ldlm_resource_del_lock(lock))
682                 res = NULL; /* res was freed, nothing else to do. */
683         else
684                 spin_unlock(&res->lr_lock);
685         ldlm_lock_free(lock);
686
687         RETURN(res);
688 }
689
690 /* Must be called with lock and lock->l_resource unlocked */
691 struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh,
692                                               int new_mode, int *flags)
693 {
694         struct ldlm_lock *lock;
695         struct ldlm_resource *res;
696         ENTRY;
697
698         lock = lustre_handle2object(lockh);
699         res = lock->l_resource;
700
701         spin_lock(&res->lr_lock);
702
703         lock->l_req_mode = new_mode;
704         list_del_init(&lock->l_res_link);
705
706         /* If this is a local resource, put it on the appropriate list. */
707         if (res->lr_namespace->ns_client) {
708                 if (*flags & LDLM_FL_BLOCK_CONV)
709                         ldlm_resource_add_lock(res, res->lr_converting.prev,
710                                                lock);
711                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
712                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
713                 else
714                         ldlm_grant_lock(res, lock);
715         } else {
716                 list_add(&lock->l_res_link, res->lr_converting.prev);
717         }
718
719         spin_unlock(&res->lr_lock);
720
721         RETURN(res);
722 }
723
724 void ldlm_lock_dump(struct ldlm_lock *lock)
725 {
726         char ver[128];
727
728         if (!(portal_debug & D_OTHER))
729                 return;
730
731         if (RES_VERSION_SIZE != 4)
732                 LBUG();
733
734         if (!lock) {
735                 CDEBUG(D_OTHER, "  NULL LDLM lock\n");
736                 return;
737         }
738
739         snprintf(ver, sizeof(ver), "%x %x %x %x",
740                  lock->l_version[0], lock->l_version[1],
741                  lock->l_version[2], lock->l_version[3]);
742
743         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
744         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
745         CDEBUG(D_OTHER, "  Resource: %p (%Ld)\n", lock->l_resource,
746                lock->l_resource->lr_name[0]);
747         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
748                (int)lock->l_req_mode, (int)lock->l_granted_mode);
749         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
750                lock->l_readers, lock->l_writers);
751         if (lock->l_resource->lr_type == LDLM_EXTENT)
752                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
753                        (unsigned long long)lock->l_extent.start,
754                        (unsigned long long)lock->l_extent.end);
755 }