Whamcloud - gitweb
Don't LBUG() in resource_cleanup() when cancel fails
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> &
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define DEBUG_SUBSYSTEM S_LDLM
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/random.h>
19 #include <linux/lustre_dlm.h>
20 #include <linux/lustre_mds.h>
21
22 /* lock types */
23 char *ldlm_lockname[] = {
24         [0]      "--",
25         [LCK_EX] "EX",
26         [LCK_PW] "PW",
27         [LCK_PR] "PR",
28         [LCK_CW] "CW",
29         [LCK_CR] "CR",
30         [LCK_NL] "NL"
31 };
32 char *ldlm_typename[] = {
33         [LDLM_PLAIN]     "PLN",
34         [LDLM_EXTENT]    "EXT",
35         [LDLM_MDSINTENT] "INT"
36 };
37
38 extern kmem_cache_t *ldlm_lock_slab;
39 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
40 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
41
42 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
43 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
44                               ldlm_mode_t mode, void *data);
45
46 ldlm_res_compat ldlm_res_compat_table [] = {
47         [LDLM_PLAIN] ldlm_plain_compat,
48         [LDLM_EXTENT] ldlm_extent_compat,
49         [LDLM_MDSINTENT] ldlm_plain_compat
50 };
51
52 ldlm_res_policy ldlm_res_policy_table [] = {
53         [LDLM_PLAIN] NULL,
54         [LDLM_EXTENT] ldlm_extent_policy,
55         [LDLM_MDSINTENT] ldlm_intent_policy
56 };
57
58
59 /*
60  * REFCOUNTED LOCK OBJECTS
61  */
62
63
64 /*
65  * Lock refcounts, during creation:
66  *   - one special one for allocation, dec'd only once in destroy
67  *   - one for being a lock that's in-use
68  *   - one for the addref associated with a new lock
69  */
70 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
71 {
72         l_lock(&lock->l_resource->lr_namespace->ns_lock);
73         lock->l_refc++;
74         ldlm_resource_getref(lock->l_resource);
75         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
76         return lock;
77 }
78
79 void ldlm_lock_put(struct ldlm_lock *lock)
80 {
81         struct lustre_lock *nslock = &lock->l_resource->lr_namespace->ns_lock;
82         ENTRY;
83
84         l_lock(nslock);
85         lock->l_refc--;
86         LDLM_DEBUG(lock, "after refc--");
87         if (lock->l_refc < 0)
88                 LBUG();
89
90         ldlm_resource_put(lock->l_resource);
91         if (lock->l_parent)
92                 LDLM_LOCK_PUT(lock->l_parent);
93
94         if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
95                 lock->l_resource = NULL;
96                 LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
97                 if (lock->l_connection)
98                         ptlrpc_put_connection(lock->l_connection);
99                 CDEBUG(D_MALLOC, "kfreed 'lock': %d at %p (tot 1).\n",
100                        sizeof(*lock), lock);
101                 kmem_cache_free(ldlm_lock_slab, lock);
102         }
103         l_unlock(nslock);
104         EXIT;
105 }
106
107 void ldlm_lock_destroy(struct ldlm_lock *lock)
108 {
109         ENTRY;
110         l_lock(&lock->l_resource->lr_namespace->ns_lock);
111
112         if (!list_empty(&lock->l_children)) {
113                 LDLM_DEBUG(lock, "still has children (%p)!",
114                            lock->l_children.next);
115                 ldlm_lock_dump(lock);
116                 LBUG();
117         }
118         if (lock->l_readers || lock->l_writers) {
119                 LDLM_DEBUG(lock, "lock still has references");
120                 ldlm_lock_dump(lock);
121                 LBUG();
122         }
123
124         if (!list_empty(&lock->l_res_link)) {
125                 ldlm_lock_dump(lock);
126                 LBUG();
127         }
128
129         if (lock->l_flags & LDLM_FL_DESTROYED) {
130                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
131                 EXIT;
132                 return;
133         }
134
135         lock->l_flags = LDLM_FL_DESTROYED;
136         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
137         LDLM_LOCK_PUT(lock);
138         EXIT;
139 }
140
141 /*
142    usage: pass in a resource on which you have done get
143           pass in a parent lock on which you have done a get
144           do not put the resource or the parent
145    returns: lock with refcount 1
146 */
147 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
148                                        struct ldlm_resource *resource)
149 {
150         struct ldlm_lock *lock;
151         ENTRY;
152
153         if (resource == NULL)
154                 LBUG();
155
156         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
157         if (lock == NULL)
158                 RETURN(NULL);
159         CDEBUG(D_MALLOC, "kmalloced 'lock': %d at "
160                "%p (tot %d).\n", sizeof(*lock), lock, 1);
161
162         memset(lock, 0, sizeof(*lock));
163         get_random_bytes(&lock->l_random, sizeof(__u64));
164
165         lock->l_resource = resource;
166         /* this refcount matches the one of the resource passed
167            in which is not being put away */
168         lock->l_refc = 1;
169         INIT_LIST_HEAD(&lock->l_children);
170         INIT_LIST_HEAD(&lock->l_res_link);
171         init_waitqueue_head(&lock->l_waitq);
172
173         if (parent != NULL) {
174                 l_lock(&parent->l_resource->lr_namespace->ns_lock);
175                 lock->l_parent = parent;
176                 list_add(&lock->l_childof, &parent->l_children);
177                 l_unlock(&parent->l_resource->lr_namespace->ns_lock);
178         }
179         /* this is the extra refcount, to prevent the lock
180            evaporating */
181         LDLM_LOCK_GET(lock);
182         RETURN(lock);
183 }
184
185 int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
186 {
187         struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
188         struct ldlm_resource *oldres = lock->l_resource;
189         int type, i;
190         ENTRY;
191
192         l_lock(&ns->ns_lock);
193         type = lock->l_resource->lr_type;
194
195         lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
196         if (lock->l_resource == NULL) {
197                 LBUG();
198                 RETURN(-ENOMEM);
199         }
200
201         /* move references over */
202         for (i = 0; i < lock->l_refc; i++) {
203                 int rc;
204                 ldlm_resource_getref(lock->l_resource);
205                 rc = ldlm_resource_put(oldres);
206                 if (rc == 1 && i != lock->l_refc - 1)
207                         LBUG();
208         }
209         /* compensate for the initial get above.. */
210         ldlm_resource_put(lock->l_resource);
211
212         l_unlock(&ns->ns_lock);
213         RETURN(0);
214 }
215
216 /*
217  *  HANDLES
218  */
219
220 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
221 {
222         lockh->addr = (__u64)(unsigned long)lock;
223         lockh->cookie = lock->l_random;
224 }
225
226 struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
227 {
228         struct ldlm_lock *lock = NULL, *retval = NULL;
229         ENTRY;
230
231         if (!handle || !handle->addr)
232                 RETURN(NULL);
233
234         lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
235         if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock))
236                 RETURN(NULL);
237
238         l_lock(&lock->l_resource->lr_namespace->ns_lock);
239         if (lock->l_random != handle->cookie)
240                 GOTO(out, NULL);
241
242         if (lock->l_flags & LDLM_FL_DESTROYED)
243                 GOTO(out, NULL);
244
245         retval = LDLM_LOCK_GET(lock);
246         EXIT;
247  out:
248         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
249         return retval;
250 }
251
252
253
254 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
255                               ldlm_mode_t mode, void *data)
256 {
257         struct ptlrpc_request *req = req_cookie;
258         int rc = 0;
259         ENTRY;
260
261         if (!req_cookie)
262                 RETURN(0);
263
264         if (req->rq_reqmsg->bufcount > 1) {
265                 /* an intent needs to be considered */
266                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
267                 struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
268                 struct mds_body *mds_rep;
269                 struct ldlm_reply *rep;
270                 __u64 new_resid[3] = {0, 0, 0}, old_res;
271                 int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
272                                                   sizeof(struct mds_body),
273                                                   mds->mds_max_mdsize};
274
275                 it->opc = NTOH__u64(it->opc);
276
277                 LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
278
279                 /* prepare reply */
280                 switch(it->opc) {
281                 case IT_GETATTR:
282                         /* Note that in the negative case you may be returning
283                          * a file and its obdo */
284                 case IT_CREAT:
285                 case IT_CREAT|IT_OPEN:
286                 case IT_LINK:
287                 case IT_LOOKUP:
288                 case IT_MKDIR:
289                 case IT_MKNOD:
290                 case IT_OPEN:
291                 case IT_READLINK:
292                 case IT_RENAME:
293                 case IT_RMDIR:
294                 case IT_SETATTR:
295                 case IT_SYMLINK:
296                 case IT_UNLINK:
297                         bufcount = 3;
298                         break;
299                 case IT_RENAME2:
300                         bufcount = 1;
301                         break;
302                 default:
303                         LBUG();
304                 }
305
306                 rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
307                                      &req->rq_repmsg);
308                 if (rc) {
309                         rc = req->rq_status = -ENOMEM;
310                         RETURN(rc);
311                 }
312
313                 rep = lustre_msg_buf(req->rq_repmsg, 0);
314                 rep->lock_policy_res1 = 1;
315
316                 /* execute policy */
317                 switch (it->opc) {
318                 case IT_CREAT:
319                 case IT_CREAT|IT_OPEN:
320                 case IT_LINK:
321                 case IT_MKDIR:
322                 case IT_MKNOD:
323                 case IT_RENAME2:
324                 case IT_RMDIR:
325                 case IT_SYMLINK:
326                 case IT_UNLINK:
327                         rc = mds_reint_p(2, req);
328                         if (rc || req->rq_status != 0) {
329                                 rep->lock_policy_res2 = req->rq_status;
330                                 RETURN(ELDLM_LOCK_ABORTED);
331                         }
332                         break;
333                 case IT_GETATTR:
334                 case IT_LOOKUP:
335                 case IT_OPEN:
336                 case IT_READDIR:
337                 case IT_READLINK:
338                 case IT_RENAME:
339                 case IT_SETATTR:
340                         rc = mds_getattr_name_p(2, req);
341                         /* FIXME: we need to sit down and decide on who should
342                          * set req->rq_status, who should return negative and
343                          * positive return values, and what they all mean. */
344                         if (rc || req->rq_status != 0) {
345                                 rep->lock_policy_res2 = req->rq_status;
346                                 RETURN(ELDLM_LOCK_ABORTED);
347                         }
348                         break;
349                 case IT_READDIR|IT_OPEN:
350                         LBUG();
351                         break;
352                 default:
353                         CERROR("Unhandled intent\n");
354                         LBUG();
355                 }
356
357                 if (it->opc == IT_UNLINK || it->opc == IT_RMDIR ||
358                     it->opc == IT_RENAME || it->opc == IT_RENAME2)
359                         RETURN(ELDLM_LOCK_ABORTED);
360
361                 rep->lock_policy_res2 = req->rq_status;
362                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
363                 new_resid[0] = NTOH__u32(mds_rep->ino);
364                 if (new_resid[0] == 0)
365                         LBUG();
366                 old_res = lock->l_resource->lr_name[0];
367
368                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
369                        "%ld\n", mds_rep->ino, (long)old_res);
370
371                 ldlm_lock_change_resource(lock, new_resid);
372                 if (lock->l_resource == NULL) {
373                         LBUG();
374                         RETURN(-ENOMEM);
375                 }
376                 LDLM_DEBUG(lock, "intent policy, old res %ld",
377                            (long)old_res);
378                 RETURN(ELDLM_LOCK_CHANGED);
379         } else {
380                 int size = sizeof(struct ldlm_reply);
381                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
382                                      &req->rq_repmsg);
383                 if (rc) {
384                         CERROR("out of memory\n");
385                         LBUG();
386                         RETURN(-ENOMEM);
387                 }
388         }
389         RETURN(rc);
390 }
391
392 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
393 {
394         return lockmode_compat(a->l_req_mode, b->l_req_mode);
395 }
396
397 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
398 {
399         ldlm_res2desc(lock->l_resource, &desc->l_resource);
400         desc->l_req_mode = lock->l_req_mode;
401         desc->l_granted_mode = lock->l_granted_mode;
402         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
403         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
404 }
405
406 static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
407                                    struct ldlm_lock *new)
408 {
409         struct ldlm_ast_work *w;
410         ENTRY;
411
412         l_lock(&lock->l_resource->lr_namespace->ns_lock);
413         if (new && (lock->l_flags & LDLM_FL_AST_SENT))
414                 GOTO(out, 0);
415
416         OBD_ALLOC(w, sizeof(*w));
417         if (!w) {
418                 LBUG();
419                 GOTO(out, 0);
420         }
421
422         if (new) {
423                 lock->l_flags |= LDLM_FL_AST_SENT;
424                 w->w_blocking = 1;
425                 ldlm_lock2desc(new, &w->w_desc);
426         }
427
428         w->w_lock = LDLM_LOCK_GET(lock);
429         list_add(&w->w_list, lock->l_resource->lr_tmp);
430  out:
431         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
432         return;
433 }
434
435 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
436 {
437         struct ldlm_lock *lock;
438
439         lock = ldlm_handle2lock(lockh);
440         ldlm_lock_addref_internal(lock, mode);
441         LDLM_LOCK_PUT(lock);
442 }
443
444 /* only called for local locks */
445 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
446 {
447         l_lock(&lock->l_resource->lr_namespace->ns_lock);
448         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
449                 lock->l_readers++;
450         else
451                 lock->l_writers++;
452         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
453         LDLM_LOCK_GET(lock);
454         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
455 }
456
457 /* Args: unlocked lock */
458 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
459 {
460         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
461         ENTRY;
462
463         if (lock == NULL)
464                 LBUG();
465
466         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
467         l_lock(&lock->l_resource->lr_namespace->ns_lock);
468         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
469                 lock->l_readers--;
470         else
471                 lock->l_writers--;
472
473         /* If we received a blocked AST and this was the last reference,
474          * run the callback. */
475         if (!lock->l_readers && !lock->l_writers &&
476             (lock->l_flags & LDLM_FL_CBPENDING)) {
477                 struct lustre_handle lockh;
478
479                 if (!lock->l_resource->lr_namespace->ns_client) {
480                         CERROR("LDLM_FL_CBPENDING set on non-local lock!\n");
481                         LBUG();
482                 }
483
484                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
485                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
486
487                 ldlm_lock2handle(lock, &lockh);
488                 /* FIXME: -1 is a really, really bad 'desc' */
489                 lock->l_blocking_ast(&lockh, (void *)-1, lock->l_data,
490                                      lock->l_data_len);
491         } else
492                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
493
494         LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
495         LDLM_LOCK_PUT(lock); /* matches the handle2lock above */
496
497         EXIT;
498 }
499
500 static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
501                              struct list_head *queue)
502 {
503         struct list_head *tmp, *pos;
504         int rc = 1;
505
506         list_for_each_safe(tmp, pos, queue) {
507                 struct ldlm_lock *child;
508                 ldlm_res_compat compat;
509
510                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
511                 if (lock == child)
512                         continue;
513
514                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
515                 if (compat && compat(child, lock)) {
516                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
517                         continue;
518                 }
519                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
520                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
521                         continue;
522                 }
523
524                 rc = 0;
525
526                 if (send_cbs && child->l_blocking_ast != NULL) {
527                         CDEBUG(D_OTHER, "incompatible; sending blocking "
528                                "AST.\n");
529                         ldlm_add_ast_work_item(child, lock);
530                 }
531         }
532
533         return rc;
534 }
535
536 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
537 {
538         int rc;
539         ENTRY;
540
541         l_lock(&lock->l_resource->lr_namespace->ns_lock);
542         rc = ldlm_lock_compat_list(lock, send_cbs,
543                                    &lock->l_resource->lr_granted);
544         /* FIXME: should we be sending ASTs to converting? */
545         if (rc)
546                 rc = ldlm_lock_compat_list
547                         (lock, send_cbs, &lock->l_resource->lr_converting);
548
549         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
550         RETURN(rc);
551 }
552
553 /* NOTE: called by
554    - ldlm_handle_enqueuque - resource
555 */
556 void ldlm_grant_lock(struct ldlm_lock *lock)
557 {
558         struct ldlm_resource *res = lock->l_resource;
559         ENTRY;
560
561         l_lock(&lock->l_resource->lr_namespace->ns_lock);
562         ldlm_resource_add_lock(res, &res->lr_granted, lock);
563         lock->l_granted_mode = lock->l_req_mode;
564
565         if (lock->l_granted_mode < res->lr_most_restr)
566                 res->lr_most_restr = lock->l_granted_mode;
567
568         if (lock->l_completion_ast) {
569                 ldlm_add_ast_work_item(lock, NULL);
570         }
571         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
572         EXIT;
573 }
574
575 static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
576                                       struct ldlm_extent *extent)
577 {
578         struct ldlm_lock *lock;
579         struct list_head *tmp;
580
581         list_for_each(tmp, queue) {
582                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
583
584                 if (lock->l_flags & LDLM_FL_CBPENDING)
585                         continue;
586
587                 /* lock_convert() takes the resource lock, so we're sure that
588                  * req_mode, lr_type, and l_cookie won't change beneath us */
589                 if (lock->l_req_mode != mode)
590                         continue;
591
592                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
593                     (lock->l_extent.start > extent->start ||
594                      lock->l_extent.end < extent->end))
595                         continue;
596
597                 ldlm_lock_addref_internal(lock, mode);
598                 return lock;
599         }
600
601         return NULL;
602 }
603
604 /* Must be called with no resource or lock locks held.
605  *
606  * Returns 1 if it finds an already-existing lock that is compatible; in this
607  * case, lockh is filled in with a addref()ed lock */
608 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
609                     void *cookie, int cookielen, ldlm_mode_t mode,
610                     struct lustre_handle *lockh)
611 {
612         struct ldlm_resource *res;
613         struct ldlm_lock *lock;
614         int rc = 0;
615         ENTRY;
616
617         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
618         if (res == NULL)
619                 RETURN(0);
620
621         ns = res->lr_namespace;
622         l_lock(&ns->ns_lock);
623
624         if ((lock = search_queue(&res->lr_granted, mode, cookie)))
625                 GOTO(out, rc = 1);
626         if ((lock = search_queue(&res->lr_converting, mode, cookie)))
627                 GOTO(out, rc = 1);
628         if ((lock = search_queue(&res->lr_waiting, mode, cookie)))
629                 GOTO(out, rc = 1);
630
631         EXIT;
632  out:
633         ldlm_resource_put(res);
634         l_unlock(&ns->ns_lock);
635
636         if (lock) {
637                 ldlm_lock2handle(lock, lockh);
638                 wait_event(lock->l_waitq,
639                            lock->l_req_mode == lock->l_granted_mode);
640         }
641         if (rc)
642                 LDLM_DEBUG(lock, "matched");
643         else
644                 LDLM_DEBUG_NOLOCK("not matched");
645         return rc;
646 }
647
648 /*   Returns a referenced, lock */
649 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
650                                    struct lustre_handle *parent_lock_handle,
651                                    __u64 *res_id, __u32 type,
652                                    ldlm_mode_t mode,
653                                    void *data,
654                                    __u32 data_len)
655 {
656         struct ldlm_resource *res, *parent_res = NULL;
657         struct ldlm_lock *lock, *parent_lock;
658
659         parent_lock = ldlm_handle2lock(parent_lock_handle);
660         if (parent_lock)
661                 parent_res = parent_lock->l_resource;
662
663         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
664         if (res == NULL)
665                 RETURN(NULL);
666
667         lock = ldlm_lock_new(parent_lock, res);
668         if (lock == NULL) {
669                 ldlm_resource_put(res);
670                 RETURN(NULL);
671         }
672
673         lock->l_req_mode = mode;
674         lock->l_data = data;
675         lock->l_data_len = data_len;
676
677         return lock;
678 }
679
680 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
681 ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock,
682                                void *cookie, int cookie_len,
683                                int *flags,
684                                ldlm_lock_callback completion,
685                                ldlm_lock_callback blocking)
686 {
687         struct ldlm_resource *res;
688         int local;
689         ldlm_res_policy policy;
690         ENTRY;
691
692         res = lock->l_resource;
693         lock->l_blocking_ast = blocking;
694
695         if (res->lr_type == LDLM_EXTENT)
696                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
697
698         /* policies are not executed on the client */
699         local = res->lr_namespace->ns_client;
700         if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
701                 int rc;
702                 rc = policy(lock, cookie, lock->l_req_mode, NULL);
703
704                 if (rc == ELDLM_LOCK_CHANGED) {
705                         res = lock->l_resource;
706                         *flags |= LDLM_FL_LOCK_CHANGED;
707                 } else if (rc == ELDLM_LOCK_ABORTED) {
708                         ldlm_lock_destroy(lock);
709                         RETURN(rc);
710                 }
711         }
712
713         lock->l_cookie = cookie;
714         lock->l_cookie_len = cookie_len;
715
716         if (local && lock->l_req_mode == lock->l_granted_mode) {
717                 /* The server returned a blocked lock, but it was granted before
718                  * we got a chance to actually enqueue it.  We don't need to do
719                  * anything else. */
720                 GOTO(out, ELDLM_OK);
721         }
722
723         /* If this is a local resource, put it on the appropriate list. */
724         /* FIXME: don't like this: can we call ldlm_resource_unlink_lock? */
725         list_del_init(&lock->l_res_link);
726         if (local) {
727                 if (*flags & LDLM_FL_BLOCK_CONV)
728                         ldlm_resource_add_lock(res, res->lr_converting.prev,
729                                                lock);
730                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
731                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
732                 else
733                         ldlm_grant_lock(lock);
734                 GOTO(out, ELDLM_OK);
735         }
736
737         /* FIXME: We may want to optimize by checking lr_most_restr */
738         if (!list_empty(&res->lr_converting)) {
739                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
740                 *flags |= LDLM_FL_BLOCK_CONV;
741                 GOTO(out, ELDLM_OK);
742         }
743         if (!list_empty(&res->lr_waiting)) {
744                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
745                 *flags |= LDLM_FL_BLOCK_WAIT;
746                 GOTO(out, ELDLM_OK);
747         }
748         if (!ldlm_lock_compat(lock,0)) {
749                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
750                 *flags |= LDLM_FL_BLOCK_GRANTED;
751                 GOTO(out, ELDLM_OK);
752         }
753
754         ldlm_grant_lock(lock);
755         EXIT;
756  out:
757         /* Don't set 'completion_ast' until here so that if the lock is granted
758          * immediately we don't do an unnecessary completion call. */
759         lock->l_completion_ast = completion;
760         return ELDLM_OK;
761 }
762
763 /* Must be called with namespace taken: queue is waiting or converting. */
764 static int ldlm_reprocess_queue(struct ldlm_resource *res,
765                                 struct list_head *queue)
766 {
767         struct list_head *tmp, *pos;
768         ENTRY;
769
770         list_for_each_safe(tmp, pos, queue) {
771                 struct ldlm_lock *pending;
772                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
773
774                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
775
776                 if (!ldlm_lock_compat(pending, 1))
777                         RETURN(1);
778
779                 list_del_init(&pending->l_res_link);
780                 ldlm_grant_lock(pending);
781         }
782
783         RETURN(0);
784 }
785
786 void ldlm_run_ast_work(struct list_head *rpc_list)
787 {
788         struct list_head *tmp, *pos;
789         int rc;
790         ENTRY;
791
792         list_for_each_safe(tmp, pos, rpc_list) {
793                 struct ldlm_ast_work *w =
794                         list_entry(tmp, struct ldlm_ast_work, w_list);
795                 struct lustre_handle lockh;
796
797                 ldlm_lock2handle(w->w_lock, &lockh);
798                 if (w->w_blocking)
799                         rc = w->w_lock->l_blocking_ast
800                                 (&lockh, &w->w_desc, w->w_data, w->w_datalen);
801                 else
802                         rc = w->w_lock->l_completion_ast
803                                 (&lockh, NULL, w->w_data, w->w_datalen);
804                 if (rc)
805                         CERROR("Failed AST - should clean & disconnect "
806                                "client\n");
807                 LDLM_LOCK_PUT(w->w_lock);
808                 list_del(&w->w_list);
809                 OBD_FREE(w, sizeof(*w));
810         }
811         EXIT;
812 }
813
814 /* Must be called with resource->lr_lock not taken. */
815 void ldlm_reprocess_all(struct ldlm_resource *res)
816 {
817         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
818         ENTRY;
819
820         /* Local lock trees don't get reprocessed. */
821         if (res->lr_namespace->ns_client) {
822                 EXIT;
823                 return;
824         }
825
826         l_lock(&res->lr_namespace->ns_lock);
827         res->lr_tmp = &rpc_list;
828
829         ldlm_reprocess_queue(res, &res->lr_converting);
830         if (list_empty(&res->lr_converting))
831                 ldlm_reprocess_queue(res, &res->lr_waiting);
832
833         res->lr_tmp = NULL;
834         l_unlock(&res->lr_namespace->ns_lock);
835
836         ldlm_run_ast_work(&rpc_list);
837         EXIT;
838 }
839
840 void ldlm_lock_cancel(struct ldlm_lock *lock)
841 {
842         struct ldlm_resource *res;
843         struct ldlm_namespace *ns;
844         ENTRY;
845
846         res = lock->l_resource;
847         ns = res->lr_namespace;
848
849         l_lock(&ns->ns_lock);
850         if (lock->l_readers || lock->l_writers)
851                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
852                        "writers)\n", lock->l_readers, lock->l_writers);
853
854         ldlm_resource_unlink_lock(lock);
855         ldlm_lock_destroy(lock);
856         l_unlock(&ns->ns_lock);
857         EXIT;
858 }
859
860 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
861                                         int *flags)
862 {
863         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
864         struct ldlm_resource *res;
865         struct ldlm_namespace *ns;
866         int granted = 0;
867         ENTRY;
868
869         res = lock->l_resource;
870         ns = res->lr_namespace;
871
872         l_lock(&ns->ns_lock);
873
874         lock->l_req_mode = new_mode;
875         ldlm_resource_unlink_lock(lock);
876
877         /* If this is a local resource, put it on the appropriate list. */
878         if (res->lr_namespace->ns_client) {
879                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
880                         ldlm_resource_add_lock(res, res->lr_converting.prev,
881                                                lock);
882                 else {
883                         res->lr_tmp = &rpc_list;
884                         ldlm_grant_lock(lock);
885                         res->lr_tmp = NULL;
886                         granted = 1;
887                         /* FIXME: completion handling not with ns_lock held ! */
888                         wake_up(&lock->l_waitq);
889                 }
890         } else
891                 list_add(&lock->l_res_link, res->lr_converting.prev);
892
893         l_unlock(&ns->ns_lock);
894
895         if (granted)
896                 ldlm_run_ast_work(&rpc_list);
897         RETURN(res);
898 }
899
900 void ldlm_lock_dump(struct ldlm_lock *lock)
901 {
902         char ver[128];
903
904         if (!(portal_debug & D_OTHER))
905                 return;
906
907         if (RES_VERSION_SIZE != 4)
908                 LBUG();
909
910         if (!lock) {
911                 CDEBUG(D_OTHER, "  NULL LDLM lock\n");
912                 return;
913         }
914
915         snprintf(ver, sizeof(ver), "%x %x %x %x",
916                  lock->l_version[0], lock->l_version[1],
917                  lock->l_version[2], lock->l_version[3]);
918
919         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
920         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
921         CDEBUG(D_OTHER, "  Resource: %p (%Ld)\n", lock->l_resource,
922                lock->l_resource->lr_name[0]);
923         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
924                (int)lock->l_req_mode, (int)lock->l_granted_mode);
925         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
926                lock->l_readers, lock->l_writers);
927         if (lock->l_resource->lr_type == LDLM_EXTENT)
928                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
929                        (unsigned long long)lock->l_extent.start,
930                        (unsigned long long)lock->l_extent.end);
931 }