Whamcloud - gitweb
- more LDLM refcount locking infrastructure
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> &
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define DEBUG_SUBSYSTEM S_LDLM
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/random.h>
19 #include <linux/lustre_dlm.h>
20 #include <linux/lustre_mds.h>
21
22 /* lock types */
23 char *ldlm_lockname[] = {
24         [0]      "--",
25         [LCK_EX] "EX",
26         [LCK_PW] "PW",
27         [LCK_PR] "PR",
28         [LCK_CW] "CW",
29         [LCK_CR] "CR",
30         [LCK_NL] "NL"
31 };
32 char *ldlm_typename[] = {
33         [LDLM_PLAIN]     "PLN",
34         [LDLM_EXTENT]    "EXT",
35         [LDLM_MDSINTENT] "INT"
36 };
37
/* Slab cache that lock objects are allocated from and freed to; it is
 * defined outside this file. */
extern kmem_cache_t *ldlm_lock_slab;
/* Entry points into the MDS module.  They start out NULL and are looked up
 * on first use by ldlm_intent_policy() via inter_module_get_request(). */
int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;

/* Forward declarations: both helpers are referenced by the per-type
 * dispatch tables before their definitions appear. */
static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
                              ldlm_mode_t mode, void *data);
45
46 ldlm_res_compat ldlm_res_compat_table [] = {
47         [LDLM_PLAIN] ldlm_plain_compat,
48         [LDLM_EXTENT] ldlm_extent_compat,
49         [LDLM_MDSINTENT] ldlm_plain_compat
50 };
51
52 ldlm_res_policy ldlm_res_policy_table [] = {
53         [LDLM_PLAIN] NULL,
54         [LDLM_EXTENT] ldlm_extent_policy,
55         [LDLM_MDSINTENT] ldlm_intent_policy
56 };
57
58
59 /*
60  * REFCOUNTED LOCK OBJECTS
61  */
62
63
64 /*
65  * Lock refcounts, during creation:
66  *   - one special one for allocation, dec'd only once in destroy
67  *   - one for being a lock that's in-use
68  *   - one for the addref associated with a new lock
69  */
70 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
71 {
72         l_lock(&lock->l_resource->lr_namespace->ns_lock);
73         lock->l_refc++;
74         ldlm_resource_getref(lock->l_resource);
75         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
76         return lock;
77 }
78
79 void ldlm_lock_put(struct ldlm_lock *lock)
80 {
81         struct lustre_lock *nslock = &lock->l_resource->lr_namespace->ns_lock;
82         ENTRY;
83
84         l_lock(nslock);
85         lock->l_refc--;
86         if (lock->l_refc < 0)
87                 LBUG();
88
89         ldlm_resource_put(lock->l_resource);
90         if (lock->l_parent)
91                 ldlm_lock_put(lock->l_parent);
92
93         if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
94                 if (lock->l_connection)
95                         ptlrpc_put_connection(lock->l_connection);
96                 CDEBUG(D_MALLOC, "kfreed 'lock': %d at %p (tot 1).\n",
97                        sizeof(*lock), lock);
98                 kmem_cache_free(ldlm_lock_slab, lock);
99         }
100         l_unlock(nslock);
101         EXIT;
102 }
103
104 void ldlm_lock_destroy(struct ldlm_lock *lock)
105 {
106         ENTRY;
107         l_lock(&lock->l_resource->lr_namespace->ns_lock);
108
109         if (!list_empty(&lock->l_children)) {
110                 CERROR("lock %p still has children (%p)!\n", lock,
111                        lock->l_children.next);
112                 ldlm_lock_dump(lock);
113                 LBUG();
114         }
115         if (lock->l_readers || lock->l_writers) {
116                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
117                        "writers)\n", lock->l_readers, lock->l_writers);
118                 LBUG();
119         }
120
121         if (!list_empty(&lock->l_res_link))
122                 LBUG();
123
124         if (lock->l_flags & LDLM_FL_DESTROYED) {
125                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
126                 EXIT;
127                 return;
128         }
129
130         lock->l_flags = LDLM_FL_DESTROYED;
131         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
132         ldlm_lock_put(lock);
133         EXIT;
134 }
135
136 /*
137    usage: pass in a resource on which you have done get
138           pass in a parent lock on which you have done a get
139           do not put the resource or the parent
140    returns: lock with refcount 1
141 */
142 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
143                                        struct ldlm_resource *resource)
144 {
145         struct ldlm_lock *lock;
146         ENTRY;
147
148         if (resource == NULL)
149                 LBUG();
150
151         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
152         if (lock == NULL)
153                 RETURN(NULL);
154         CDEBUG(D_MALLOC, "kmalloced 'lock': %d at "
155                "%p (tot %d).\n", sizeof(*lock), lock, 1);
156
157         memset(lock, 0, sizeof(*lock));
158         get_random_bytes(&lock->l_random, sizeof(__u64));
159
160         lock->l_resource = resource;
161         /* this refcount matches the one of the resource passed
162            in which is not being put away */
163         lock->l_refc = 1;
164         INIT_LIST_HEAD(&lock->l_children);
165         INIT_LIST_HEAD(&lock->l_res_link);
166         init_waitqueue_head(&lock->l_waitq);
167
168         if (parent != NULL) {
169                 l_lock(&parent->l_resource->lr_namespace->ns_lock);
170                 lock->l_parent = parent;
171                 list_add(&lock->l_childof, &parent->l_children);
172                 l_unlock(&parent->l_resource->lr_namespace->ns_lock);
173         }
174         /* this is the extra refcount, to prevent the lock
175            evaporating */
176         ldlm_lock_get(lock);
177         RETURN(lock);
178 }
179
180 int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
181 {
182         struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
183         struct ldlm_resource *oldres = lock->l_resource;
184         int type, i;
185         ENTRY;
186
187         l_lock(&ns->ns_lock);
188         type = lock->l_resource->lr_type;
189
190         lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
191         if (lock->l_resource == NULL) {
192                 LBUG();
193                 RETURN(-ENOMEM);
194         }
195
196         /* move references over */
197         for (i = 0; i < lock->l_refc; i++) {
198                 int rc;
199                 ldlm_resource_getref(lock->l_resource);
200                 rc = ldlm_resource_put(oldres);
201                 if (rc == 1 && i != lock->l_refc - 1)
202                         LBUG();
203         }
204         /* compensate for the initial get above.. */
205         ldlm_resource_put(lock->l_resource);
206
207         l_unlock(&ns->ns_lock);
208         RETURN(0);
209 }
210
211 /*
212  *  HANDLES
213  */
214
215 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
216 {
217         lockh->addr = (__u64)(unsigned long)lock;
218         lockh->cookie = lock->l_random;
219 }
220
221
222 struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
223 {
224         struct ldlm_lock *lock = NULL;
225         ENTRY;
226
227         if (!handle || !handle->addr)
228                 RETURN(NULL);
229
230         lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
231         if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock))
232                 RETURN(NULL);
233
234         l_lock(&lock->l_resource->lr_namespace->ns_lock);
235         if (lock->l_random != handle->cookie)
236                 GOTO(out, handle = NULL);
237
238         if (lock->l_flags & LDLM_FL_DESTROYED)
239                 GOTO(out, handle = NULL);
240
241         ldlm_lock_get(lock);
242         EXIT;
243  out:
244         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
245         return  lock;
246 }
247
248
249
250 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
251                               ldlm_mode_t mode, void *data)
252 {
253         struct ptlrpc_request *req = req_cookie;
254         int rc = 0;
255         ENTRY;
256
257         if (!req_cookie)
258                 RETURN(0);
259
260         if (req->rq_reqmsg->bufcount > 1) {
261                 /* an intent needs to be considered */
262                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
263                 struct mds_body *mds_rep;
264                 struct ldlm_reply *rep;
265                 __u64 new_resid[3] = {0, 0, 0}, old_res;
266                 int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
267                                                   sizeof(struct mds_body),
268                                                   sizeof(struct obdo)};
269
270                 it->opc = NTOH__u64(it->opc);
271
272                 LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
273
274                 /* prepare reply */
275                 switch(it->opc) {
276                 case IT_GETATTR:
277                         /* Note that in the negative case you may be returning
278                          * a file and its obdo */
279                 case IT_CREAT:
280                 case IT_CREAT|IT_OPEN:
281                 case IT_MKDIR:
282                 case IT_SYMLINK:
283                 case IT_MKNOD:
284                 case IT_LINK:
285                 case IT_OPEN:
286                 case IT_RENAME:
287                         bufcount = 3;
288                         break;
289                 case IT_UNLINK:
290                         bufcount = 2;
291                         size[1] = sizeof(struct obdo);
292                         break;
293                 case IT_RMDIR:
294                         bufcount = 1;
295                         break;
296                 default:
297                         LBUG();
298                 }
299
300                 rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
301                                      &req->rq_repmsg);
302                 if (rc) {
303                         rc = req->rq_status = -ENOMEM;
304                         RETURN(rc);
305                 }
306
307                 rep = lustre_msg_buf(req->rq_repmsg, 0);
308                 rep->lock_policy_res1 = 1;
309
310                 /* execute policy */
311                 switch (it->opc) {
312                 case IT_CREAT:
313                 case IT_CREAT|IT_OPEN:
314                 case IT_MKDIR:
315                 case IT_SETATTR:
316                 case IT_SYMLINK:
317                 case IT_MKNOD:
318                 case IT_LINK:
319                 case IT_UNLINK:
320                 case IT_RMDIR:
321                 case IT_RENAME2:
322                         if (mds_reint_p == NULL)
323                                 mds_reint_p =
324                                         inter_module_get_request
325                                         ("mds_reint", "mds");
326                         if (IS_ERR(mds_reint_p)) {
327                                 CERROR("MDSINTENT locks require the MDS "
328                                        "module.\n");
329                                 LBUG();
330                                 RETURN(-EINVAL);
331                         }
332                         rc = mds_reint_p(2, req);
333                         if (rc)
334                                 LBUG();
335                         break;
336                 case IT_GETATTR:
337                 case IT_READDIR:
338                 case IT_RENAME:
339                 case IT_OPEN:
340                         if (mds_getattr_name_p == NULL)
341                                 mds_getattr_name_p =
342                                         inter_module_get_request
343                                         ("mds_getattr_name", "mds");
344                         if (IS_ERR(mds_getattr_name_p)) {
345                                 CERROR("MDSINTENT locks require the MDS "
346                                        "module.\n");
347                                 LBUG();
348                                 RETURN(-EINVAL);
349                         }
350                         rc = mds_getattr_name_p(2, req);
351                         /* FIXME: we need to sit down and decide on who should
352                          * set req->rq_status, who should return negative and
353                          * positive return values, and what they all mean. */
354                         if (rc || req->rq_status != 0) {
355                                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
356                                 rep->lock_policy_res2 = req->rq_status;
357                                 RETURN(ELDLM_LOCK_ABORTED);
358                         }
359                         break;
360                 case IT_READDIR|IT_OPEN:
361                         LBUG();
362                         break;
363                 default:
364                         CERROR("Unhandled intent\n");
365                         LBUG();
366                 }
367
368                 if (it->opc == IT_UNLINK || it->opc == IT_RMDIR)
369                         RETURN(ELDLM_LOCK_ABORTED);
370
371                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
372                 rep->lock_policy_res2 = req->rq_status;
373                 new_resid[0] = NTOH__u32(mds_rep->ino);
374                 if (new_resid[0] == 0)
375                         LBUG();
376                 old_res = lock->l_resource->lr_name[0];
377
378                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
379                        "%ld\n", mds_rep->ino, (long)old_res);
380
381                 ldlm_lock_change_resource(lock, new_resid);
382                 if (lock->l_resource == NULL) {
383                         LBUG();
384                         RETURN(-ENOMEM);
385                 }
386                 LDLM_DEBUG(lock, "intent policy, old res %ld",
387                            (long)old_res);
388                 RETURN(ELDLM_LOCK_CHANGED);
389         } else {
390                 int size = sizeof(struct ldlm_reply);
391                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
392                                      &req->rq_repmsg);
393                 if (rc) {
394                         CERROR("out of memory\n");
395                         LBUG();
396                         RETURN(-ENOMEM);
397                 }
398         }
399         RETURN(rc);
400 }
401
402 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
403 {
404         return lockmode_compat(a->l_req_mode, b->l_req_mode);
405 }
406
407 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
408 {
409         ldlm_res2desc(lock->l_resource, &desc->l_resource);
410         desc->l_req_mode = lock->l_req_mode;
411         desc->l_granted_mode = lock->l_granted_mode;
412         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
413         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
414 }
415
416 static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
417                                    struct ldlm_lock *new)
418 {
419         struct ldlm_ast_work *w;
420         ENTRY;
421
422         l_lock(&lock->l_resource->lr_namespace->ns_lock);
423         if (new && (lock->l_flags & LDLM_FL_AST_SENT))
424                 GOTO(out, 0);
425
426         OBD_ALLOC(w, sizeof(*w));
427         if (!w) {
428                 LBUG();
429                 return;
430         }
431
432         if (new) {
433                 lock->l_flags |= LDLM_FL_AST_SENT;
434                 w->w_blocking = 1;
435                 ldlm_lock2desc(new, &w->w_desc);
436         }
437
438         w->w_lock = ldlm_lock_get(lock);
439         list_add(&w->w_list, lock->l_resource->lr_tmp);
440  out:
441         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
442         return;
443 }
444
445 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
446 {
447         struct ldlm_lock *lock;
448
449         lock = ldlm_handle2lock(lockh);
450         ldlm_lock_addref_internal(lock, mode);
451         ldlm_lock_put(lock);
452 }
453
454 /* only called for local locks */
455 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
456 {
457         l_lock(&lock->l_resource->lr_namespace->ns_lock);
458         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
459                 lock->l_readers++;
460         else
461                 lock->l_writers++;
462         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
463         ldlm_lock_get(lock);
464         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
465 }
466
467 /* Args: unlocked lock */
468 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
469 {
470         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
471         ENTRY;
472
473         if (lock == NULL)
474                 LBUG();
475
476         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
477         l_lock(&lock->l_resource->lr_namespace->ns_lock);
478         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
479                 lock->l_readers--;
480         else
481                 lock->l_writers--;
482
483         /* If we received a blocked AST and this was the last reference,
484          * run the callback. */
485         if (!lock->l_readers && !lock->l_writers &&
486             (lock->l_flags & LDLM_FL_CBPENDING)) {
487                 struct lustre_handle lockh;
488
489                 if (!lock->l_resource->lr_namespace->ns_client) {
490                         CERROR("LDLM_FL_CBPENDING set on non-local lock!\n");
491                         LBUG();
492                 }
493
494                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
495                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
496
497                 ldlm_lock2handle(lock, &lockh);
498                 /* FIXME: -1 is a really, really bad 'desc' */
499                 lock->l_blocking_ast(&lockh, (void *)-1, lock->l_data,
500                                      lock->l_data_len);
501         } else
502                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
503
504         ldlm_lock_put(lock); /* matches the ldlm_lock_get in addref */
505         ldlm_lock_put(lock); /* matches the handle2lock above */
506
507         EXIT;
508 }
509
510 static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
511                              struct list_head *queue)
512 {
513         struct list_head *tmp, *pos;
514         int rc = 1;
515
516         list_for_each_safe(tmp, pos, queue) {
517                 struct ldlm_lock *child;
518                 ldlm_res_compat compat;
519
520                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
521                 if (lock == child)
522                         continue;
523
524                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
525                 if (compat && compat(child, lock)) {
526                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
527                         continue;
528                 }
529                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
530                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
531                         continue;
532                 }
533
534                 rc = 0;
535
536                 if (send_cbs && child->l_blocking_ast != NULL) {
537                         CDEBUG(D_OTHER, "incompatible; sending blocking "
538                                "AST.\n");
539                         ldlm_add_ast_work_item(child, lock);
540                 }
541         }
542
543         return rc;
544 }
545
546 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
547 {
548         int rc;
549         ENTRY;
550
551         l_lock(&lock->l_resource->lr_namespace->ns_lock);
552         rc = ldlm_lock_compat_list(lock, send_cbs,
553                                    &lock->l_resource->lr_granted);
554         /* FIXME: should we be sending ASTs to converting? */
555         if (rc)
556                 rc = ldlm_lock_compat_list
557                         (lock, send_cbs, &lock->l_resource->lr_converting);
558
559         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
560         RETURN(rc);
561 }
562
563 /* NOTE: called by
564    - ldlm_handle_enqueuque - resource
565 */
566 void ldlm_grant_lock(struct ldlm_lock *lock)
567 {
568         struct ldlm_resource *res = lock->l_resource;
569         ENTRY;
570
571         l_lock(&lock->l_resource->lr_namespace->ns_lock);
572         ldlm_resource_add_lock(res, &res->lr_granted, lock);
573         lock->l_granted_mode = lock->l_req_mode;
574
575         if (lock->l_granted_mode < res->lr_most_restr)
576                 res->lr_most_restr = lock->l_granted_mode;
577
578         if (lock->l_completion_ast) {
579                 ldlm_add_ast_work_item(lock, NULL);
580         }
581         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
582         EXIT;
583 }
584
585 static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
586                                       struct ldlm_extent *extent)
587 {
588         struct ldlm_lock *lock;
589         struct list_head *tmp;
590
591         list_for_each(tmp, queue) {
592                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
593
594                 if (lock->l_flags & LDLM_FL_CBPENDING)
595                         continue;
596
597                 /* lock_convert() takes the resource lock, so we're sure that
598                  * req_mode, lr_type, and l_cookie won't change beneath us */
599                 if (lock->l_req_mode != mode)
600                         continue;
601
602                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
603                     (lock->l_extent.start > extent->start ||
604                      lock->l_extent.end < extent->end))
605                         continue;
606
607                 ldlm_lock_addref_internal(lock, mode);
608                 return lock;
609         }
610
611         return NULL;
612 }
613
614 /* Must be called with no resource or lock locks held.
615  *
616  * Returns 1 if it finds an already-existing lock that is compatible; in this
617  * case, lockh is filled in with a addref()ed lock */
618 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
619                     void *cookie, int cookielen, ldlm_mode_t mode,
620                     struct lustre_handle *lockh)
621 {
622         struct ldlm_resource *res;
623         struct ldlm_lock *lock;
624         int rc = 0;
625         ENTRY;
626
627         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
628         if (res == NULL)
629                 RETURN(0);
630
631         ns = res->lr_namespace;
632         l_lock(&ns->ns_lock);
633
634         if ((lock = search_queue(&res->lr_granted, mode, cookie)))
635                 GOTO(out, rc = 1);
636         if ((lock = search_queue(&res->lr_converting, mode, cookie)))
637                 GOTO(out, rc = 1);
638         if ((lock = search_queue(&res->lr_waiting, mode, cookie)))
639                 GOTO(out, rc = 1);
640
641         EXIT;
642  out:
643         ldlm_resource_put(res);
644         l_unlock(&ns->ns_lock);
645
646         if (lock) {
647                 ldlm_lock2handle(lock, lockh);
648                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
649                                          lock->l_granted_mode);
650         }
651         if (rc)
652                 LDLM_DEBUG(lock, "matched");
653         else
654                 LDLM_DEBUG_NOLOCK("not matched");
655         return rc;
656 }
657
658 /*   Returns a referenced, lock */
659 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
660                                    struct lustre_handle *parent_lock_handle,
661                                    __u64 *res_id, __u32 type,
662                                    ldlm_mode_t mode,
663                                    void *data,
664                                    __u32 data_len)
665 {
666         struct ldlm_resource *res, *parent_res = NULL;
667         struct ldlm_lock *lock, *parent_lock;
668
669         parent_lock = ldlm_handle2lock(parent_lock_handle);
670         if (parent_lock)
671                 parent_res = parent_lock->l_resource;
672
673         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
674         if (res == NULL)
675                 RETURN(NULL);
676
677         lock = ldlm_lock_new(parent_lock, res);
678         if (lock == NULL) {
679                 ldlm_resource_put(res);
680                 RETURN(NULL);
681         }
682
683         lock->l_req_mode = mode;
684         lock->l_data = data;
685         lock->l_data_len = data_len;
686
687         return lock;
688 }
689
690 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
691 ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock,
692                                void *cookie, int cookie_len,
693                                int *flags,
694                                ldlm_lock_callback completion,
695                                ldlm_lock_callback blocking)
696 {
697         struct ldlm_resource *res;
698         int local;
699         ldlm_res_policy policy;
700         ENTRY;
701
702         res = lock->l_resource;
703         lock->l_blocking_ast = blocking;
704
705         if (res->lr_type == LDLM_EXTENT)
706                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
707
708         /* policies are not executed on the client */
709         local = res->lr_namespace->ns_client;
710         if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
711                 int rc;
712                 rc = policy(lock, cookie, lock->l_req_mode, NULL);
713
714                 if (rc == ELDLM_LOCK_CHANGED) {
715                         res = lock->l_resource;
716                         *flags |= LDLM_FL_LOCK_CHANGED;
717                 } else if (rc == ELDLM_LOCK_ABORTED) {
718                         ldlm_lock_destroy(lock);
719                         RETURN(rc);
720                 }
721         }
722
723         lock->l_cookie = cookie;
724         lock->l_cookie_len = cookie_len;
725
726         if (local && lock->l_req_mode == lock->l_granted_mode) {
727                 /* The server returned a blocked lock, but it was granted before
728                  * we got a chance to actually enqueue it.  We don't need to do
729                  * anything else. */
730                 GOTO(out, ELDLM_OK);
731         }
732
733         /* If this is a local resource, put it on the appropriate list. */
734         /* FIXME: don't like this: can we call ldlm_resource_unlink_lock? */
735         list_del_init(&lock->l_res_link);
736         if (local) {
737                 if (*flags & LDLM_FL_BLOCK_CONV)
738                         ldlm_resource_add_lock(res, res->lr_converting.prev,
739                                                lock);
740                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
741                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
742                 else
743                         ldlm_grant_lock(lock);
744                 GOTO(out, ELDLM_OK);
745         }
746
747         /* FIXME: We may want to optimize by checking lr_most_restr */
748         if (!list_empty(&res->lr_converting)) {
749                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
750                 *flags |= LDLM_FL_BLOCK_CONV;
751                 GOTO(out, ELDLM_OK);
752         }
753         if (!list_empty(&res->lr_waiting)) {
754                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
755                 *flags |= LDLM_FL_BLOCK_WAIT;
756                 GOTO(out, ELDLM_OK);
757         }
758         if (!ldlm_lock_compat(lock,0)) {
759                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
760                 *flags |= LDLM_FL_BLOCK_GRANTED;
761                 GOTO(out, ELDLM_OK);
762         }
763
764         ldlm_grant_lock(lock);
765         EXIT;
766  out:
767         /* Don't set 'completion_ast' until here so that if the lock is granted
768          * immediately we don't do an unnecessary completion call. */
769         lock->l_completion_ast = completion;
770         return ELDLM_OK;
771 }
772
773 /* Must be called with namespace taken: queue is waiting or converting. */
774 static int ldlm_reprocess_queue(struct ldlm_resource *res,
775                                 struct list_head *queue)
776 {
777         struct list_head *tmp, *pos;
778         ENTRY;
779
780         list_for_each_safe(tmp, pos, queue) {
781                 struct ldlm_lock *pending;
782                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
783
784                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
785
786                 if (!ldlm_lock_compat(pending, 1))
787                         RETURN(1);
788
789                 list_del_init(&pending->l_res_link);
790                 ldlm_grant_lock(pending);
791         }
792
793         RETURN(0);
794 }
795
796 void ldlm_run_ast_work(struct list_head *rpc_list)
797 {
798         struct list_head *tmp, *pos;
799         int rc;
800         ENTRY;
801
802         list_for_each_safe(tmp, pos, rpc_list) {
803                 struct ldlm_ast_work *w =
804                         list_entry(tmp, struct ldlm_ast_work, w_list);
805                 struct lustre_handle lockh;
806
807                 ldlm_lock2handle(w->w_lock, &lockh);
808                 if (w->w_blocking)
809                         rc = w->w_lock->l_blocking_ast
810                                 (&lockh, &w->w_desc, w->w_data, w->w_datalen);
811                 else
812                         rc = w->w_lock->l_completion_ast
813                                 (&lockh, NULL, w->w_data, w->w_datalen);
814                 if (rc)
815                         CERROR("Failed AST - should clean & disconnect "
816                                "client\n");
817                 ldlm_lock_put(w->w_lock);
818                 list_del(&w->w_list);
819                 OBD_FREE(w, sizeof(*w));
820         }
821         EXIT;
822 }
823
824 /* Must be called with resource->lr_lock not taken. */
825 void ldlm_reprocess_all(struct ldlm_resource *res)
826 {
827         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
828         ENTRY;
829
830         /* Local lock trees don't get reprocessed. */
831         if (res->lr_namespace->ns_client) {
832                 EXIT;
833                 return;
834         }
835
836         l_lock(&res->lr_namespace->ns_lock);
837         res->lr_tmp = &rpc_list;
838
839         ldlm_reprocess_queue(res, &res->lr_converting);
840         if (list_empty(&res->lr_converting))
841                 ldlm_reprocess_queue(res, &res->lr_waiting);
842
843         res->lr_tmp = NULL;
844         l_unlock(&res->lr_namespace->ns_lock);
845
846         ldlm_run_ast_work(&rpc_list);
847         EXIT;
848 }
849
850 /* Must be called with lock and lock->l_resource unlocked */
851 void ldlm_lock_cancel(struct ldlm_lock *lock)
852 {
853         struct ldlm_resource *res;
854         struct ldlm_namespace *ns;
855         ENTRY;
856
857         res = lock->l_resource;
858         ns = res->lr_namespace;
859
860         l_lock(&ns->ns_lock);
861         if (lock->l_readers || lock->l_writers)
862                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
863                        "writers)\n", lock->l_readers, lock->l_writers);
864
865         ldlm_resource_unlink_lock(lock);
866         ldlm_lock_destroy(lock);
867         l_unlock(&ns->ns_lock);
868 }
869
870 /* Must be called with lock and lock->l_resource unlocked */
871 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
872                                         int *flags)
873 {
874         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
875         struct ldlm_resource *res;
876         struct ldlm_namespace *ns;
877         int granted = 0;
878         ENTRY;
879
880         res = lock->l_resource;
881         ns = res->lr_namespace;
882
883         l_lock(&ns->ns_lock);
884
885         lock->l_req_mode = new_mode;
886         ldlm_resource_unlink_lock(lock);
887
888         /* If this is a local resource, put it on the appropriate list. */
889         if (res->lr_namespace->ns_client) {
890                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
891                         ldlm_resource_add_lock(res, res->lr_converting.prev,
892                                                lock);
893                 else {
894                         res->lr_tmp = &rpc_list;
895                         ldlm_grant_lock(lock);
896                         res->lr_tmp = NULL;
897                         granted = 1;
898                         /* FIXME: completion handling not with ns_lock held ! */
899                         wake_up(&lock->l_waitq);
900                 }
901         } else
902                 list_add(&lock->l_res_link, res->lr_converting.prev);
903
904         l_unlock(&ns->ns_lock);
905
906         if (granted)
907                 ldlm_run_ast_work(&rpc_list);
908         RETURN(res);
909 }
910
911 void ldlm_lock_dump(struct ldlm_lock *lock)
912 {
913         char ver[128];
914
915         if (!(portal_debug & D_OTHER))
916                 return;
917
918         if (RES_VERSION_SIZE != 4)
919                 LBUG();
920
921         if (!lock) {
922                 CDEBUG(D_OTHER, "  NULL LDLM lock\n");
923                 return;
924         }
925
926         snprintf(ver, sizeof(ver), "%x %x %x %x",
927                  lock->l_version[0], lock->l_version[1],
928                  lock->l_version[2], lock->l_version[3]);
929
930         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
931         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
932         CDEBUG(D_OTHER, "  Resource: %p (%Ld)\n", lock->l_resource,
933                lock->l_resource->lr_name[0]);
934         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
935                (int)lock->l_req_mode, (int)lock->l_granted_mode);
936         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
937                lock->l_readers, lock->l_writers);
938         if (lock->l_resource->lr_type == LDLM_EXTENT)
939                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
940                        (unsigned long long)lock->l_extent.start,
941                        (unsigned long long)lock->l_extent.end);
942 }