Whamcloud - gitweb
- Quiet the FIXMEs from CERRORs to CDEBUGs, because they were reducing LLNL
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> &
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define DEBUG_SUBSYSTEM S_LDLM
15
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/random.h>
19 #include <linux/lustre_dlm.h>
20 #include <linux/lustre_mds.h>
21
22 /* lock types */
23 char *ldlm_lockname[] = {
24         [0]      "--",
25         [LCK_EX] "EX",
26         [LCK_PW] "PW",
27         [LCK_PR] "PR",
28         [LCK_CW] "CW",
29         [LCK_CR] "CR",
30         [LCK_NL] "NL"
31 };
32 char *ldlm_typename[] = {
33         [LDLM_PLAIN]     "PLN",
34         [LDLM_EXTENT]    "EXT",
35         [LDLM_MDSINTENT] "INT"
36 };
37
38 extern kmem_cache_t *ldlm_lock_slab;
39 int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
40 int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
41
42 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
43 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
44                               ldlm_mode_t mode, void *data);
45
46 ldlm_res_compat ldlm_res_compat_table [] = {
47         [LDLM_PLAIN] ldlm_plain_compat,
48         [LDLM_EXTENT] ldlm_extent_compat,
49         [LDLM_MDSINTENT] ldlm_plain_compat
50 };
51
52 ldlm_res_policy ldlm_res_policy_table [] = {
53         [LDLM_PLAIN] NULL,
54         [LDLM_EXTENT] ldlm_extent_policy,
55         [LDLM_MDSINTENT] ldlm_intent_policy
56 };
57
58
59 /*
60  * REFCOUNTED LOCK OBJECTS
61  */
62
63
64 /*
65  * Lock refcounts, during creation:
66  *   - one special one for allocation, dec'd only once in destroy
67  *   - one for being a lock that's in-use
68  *   - one for the addref associated with a new lock
69  */
70 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
71 {
72         l_lock(&lock->l_resource->lr_namespace->ns_lock);
73         lock->l_refc++;
74         ldlm_resource_getref(lock->l_resource);
75         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
76         return lock;
77 }
78
79 void ldlm_lock_put(struct ldlm_lock *lock)
80 {
81         struct lustre_lock *nslock = &lock->l_resource->lr_namespace->ns_lock;
82         ENTRY;
83
84         l_lock(nslock);
85         lock->l_refc--;
86         LDLM_DEBUG(lock, "after refc--");
87         if (lock->l_refc < 0)
88                 LBUG();
89
90         ldlm_resource_put(lock->l_resource);
91         if (lock->l_parent)
92                 ldlm_lock_put(lock->l_parent);
93
94         if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
95                 if (lock->l_connection)
96                         ptlrpc_put_connection(lock->l_connection);
97                 CDEBUG(D_MALLOC, "kfreed 'lock': %d at %p (tot 1).\n",
98                        sizeof(*lock), lock);
99                 kmem_cache_free(ldlm_lock_slab, lock);
100         }
101         l_unlock(nslock);
102         EXIT;
103 }
104
105 void ldlm_lock_destroy(struct ldlm_lock *lock)
106 {
107         ENTRY;
108         l_lock(&lock->l_resource->lr_namespace->ns_lock);
109
110         if (!list_empty(&lock->l_children)) {
111                 LDLM_DEBUG(lock, "still has children (%p)!",
112                            lock->l_children.next);
113                 ldlm_lock_dump(lock);
114                 LBUG();
115         }
116         if (lock->l_readers || lock->l_writers) {
117                 LDLM_DEBUG(lock, "lock still has references");
118                 ldlm_lock_dump(lock);
119                 LBUG();
120         }
121
122         if (!list_empty(&lock->l_res_link)) {
123                 ldlm_lock_dump(lock);
124                 LBUG();
125         }
126
127         if (lock->l_flags & LDLM_FL_DESTROYED) {
128                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
129                 EXIT;
130                 return;
131         }
132
133         lock->l_flags = LDLM_FL_DESTROYED;
134         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
135         ldlm_lock_put(lock);
136         EXIT;
137 }
138
139 /*
140    usage: pass in a resource on which you have done get
141           pass in a parent lock on which you have done a get
142           do not put the resource or the parent
143    returns: lock with refcount 1
144 */
145 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
146                                        struct ldlm_resource *resource)
147 {
148         struct ldlm_lock *lock;
149         ENTRY;
150
151         if (resource == NULL)
152                 LBUG();
153
154         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
155         if (lock == NULL)
156                 RETURN(NULL);
157         CDEBUG(D_MALLOC, "kmalloced 'lock': %d at "
158                "%p (tot %d).\n", sizeof(*lock), lock, 1);
159
160         memset(lock, 0, sizeof(*lock));
161         get_random_bytes(&lock->l_random, sizeof(__u64));
162
163         lock->l_resource = resource;
164         /* this refcount matches the one of the resource passed
165            in which is not being put away */
166         lock->l_refc = 1;
167         INIT_LIST_HEAD(&lock->l_children);
168         INIT_LIST_HEAD(&lock->l_res_link);
169         init_waitqueue_head(&lock->l_waitq);
170
171         if (parent != NULL) {
172                 l_lock(&parent->l_resource->lr_namespace->ns_lock);
173                 lock->l_parent = parent;
174                 list_add(&lock->l_childof, &parent->l_children);
175                 l_unlock(&parent->l_resource->lr_namespace->ns_lock);
176         }
177         /* this is the extra refcount, to prevent the lock
178            evaporating */
179         ldlm_lock_get(lock);
180         RETURN(lock);
181 }
182
183 int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
184 {
185         struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
186         struct ldlm_resource *oldres = lock->l_resource;
187         int type, i;
188         ENTRY;
189
190         l_lock(&ns->ns_lock);
191         type = lock->l_resource->lr_type;
192
193         lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1);
194         if (lock->l_resource == NULL) {
195                 LBUG();
196                 RETURN(-ENOMEM);
197         }
198
199         /* move references over */
200         for (i = 0; i < lock->l_refc; i++) {
201                 int rc;
202                 ldlm_resource_getref(lock->l_resource);
203                 rc = ldlm_resource_put(oldres);
204                 if (rc == 1 && i != lock->l_refc - 1)
205                         LBUG();
206         }
207         /* compensate for the initial get above.. */
208         ldlm_resource_put(lock->l_resource);
209
210         l_unlock(&ns->ns_lock);
211         RETURN(0);
212 }
213
214 /*
215  *  HANDLES
216  */
217
218 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
219 {
220         lockh->addr = (__u64)(unsigned long)lock;
221         lockh->cookie = lock->l_random;
222 }
223
224 struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
225 {
226         struct ldlm_lock *lock = NULL;
227         ENTRY;
228
229         if (!handle || !handle->addr)
230                 RETURN(NULL);
231
232         lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
233         if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock))
234                 RETURN(NULL);
235
236         l_lock(&lock->l_resource->lr_namespace->ns_lock);
237         if (lock->l_random != handle->cookie)
238                 GOTO(out, handle = NULL);
239
240         if (lock->l_flags & LDLM_FL_DESTROYED)
241                 GOTO(out, handle = NULL);
242
243         ldlm_lock_get(lock);
244         EXIT;
245  out:
246         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
247         return lock;
248 }
249
250
251
252 static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
253                               ldlm_mode_t mode, void *data)
254 {
255         struct ptlrpc_request *req = req_cookie;
256         int rc = 0;
257         ENTRY;
258
259         if (!req_cookie)
260                 RETURN(0);
261
262         if (req->rq_reqmsg->bufcount > 1) {
263                 /* an intent needs to be considered */
264                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
265                 struct mds_body *mds_rep;
266                 struct ldlm_reply *rep;
267                 __u64 new_resid[3] = {0, 0, 0}, old_res;
268                 int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
269                                                   sizeof(struct mds_body),
270                                                   sizeof(struct obdo)};
271
272                 it->opc = NTOH__u64(it->opc);
273
274                 LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
275
276                 /* prepare reply */
277                 switch(it->opc) {
278                 case IT_GETATTR:
279                         /* Note that in the negative case you may be returning
280                          * a file and its obdo */
281                 case IT_CREAT:
282                 case IT_CREAT|IT_OPEN:
283                 case IT_MKDIR:
284                 case IT_SYMLINK:
285                 case IT_MKNOD:
286                 case IT_LINK:
287                 case IT_OPEN:
288                 case IT_RENAME:
289                         bufcount = 3;
290                         break;
291                 case IT_RENAME2:
292                         bufcount = 5;
293                         break;
294                 case IT_UNLINK:
295                         bufcount = 2;
296                         size[1] = sizeof(struct obdo);
297                         break;
298                 case IT_RMDIR:
299                         bufcount = 1;
300                         break;
301                 default:
302                         LBUG();
303                 }
304
305                 rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
306                                      &req->rq_repmsg);
307                 if (rc) {
308                         rc = req->rq_status = -ENOMEM;
309                         RETURN(rc);
310                 }
311
312                 rep = lustre_msg_buf(req->rq_repmsg, 0);
313                 rep->lock_policy_res1 = 1;
314
315                 /* execute policy */
316                 switch (it->opc) {
317                 case IT_CREAT:
318                 case IT_CREAT|IT_OPEN:
319                 case IT_MKDIR:
320                 case IT_SETATTR:
321                 case IT_SYMLINK:
322                 case IT_MKNOD:
323                 case IT_LINK:
324                 case IT_UNLINK:
325                 case IT_RMDIR:
326                 case IT_RENAME2:
327                         if (mds_reint_p == NULL)
328                                 mds_reint_p =
329                                         inter_module_get_request
330                                         ("mds_reint", "mds");
331                         if (IS_ERR(mds_reint_p)) {
332                                 CERROR("MDSINTENT locks require the MDS "
333                                        "module.\n");
334                                 LBUG();
335                                 RETURN(-EINVAL);
336                         }
337                         rc = mds_reint_p(2, req);
338                         if (rc)
339                                 LBUG();
340                         break;
341                 case IT_GETATTR:
342                 case IT_READDIR:
343                 case IT_RENAME:
344                 case IT_OPEN:
345                         if (mds_getattr_name_p == NULL)
346                                 mds_getattr_name_p =
347                                         inter_module_get_request
348                                         ("mds_getattr_name", "mds");
349                         if (IS_ERR(mds_getattr_name_p)) {
350                                 CERROR("MDSINTENT locks require the MDS "
351                                        "module.\n");
352                                 LBUG();
353                                 RETURN(-EINVAL);
354                         }
355                         rc = mds_getattr_name_p(2, req);
356                         /* FIXME: we need to sit down and decide on who should
357                          * set req->rq_status, who should return negative and
358                          * positive return values, and what they all mean. */
359                         if (rc || req->rq_status != 0) {
360                                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
361                                 rep->lock_policy_res2 = req->rq_status;
362                                 RETURN(ELDLM_LOCK_ABORTED);
363                         }
364                         break;
365                 case IT_READDIR|IT_OPEN:
366                         LBUG();
367                         break;
368                 default:
369                         CERROR("Unhandled intent\n");
370                         LBUG();
371                 }
372
373                 if (it->opc == IT_UNLINK || it->opc == IT_RMDIR || 
374                     it->opc == IT_RENAME || it->opc == IT_RENAME2)
375                         RETURN(ELDLM_LOCK_ABORTED);
376
377                 mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
378                 rep->lock_policy_res2 = req->rq_status;
379                 new_resid[0] = NTOH__u32(mds_rep->ino);
380                 if (new_resid[0] == 0)
381                         LBUG();
382                 old_res = lock->l_resource->lr_name[0];
383
384                 CDEBUG(D_INFO, "remote intent: locking %d instead of"
385                        "%ld\n", mds_rep->ino, (long)old_res);
386
387                 ldlm_lock_change_resource(lock, new_resid);
388                 if (lock->l_resource == NULL) {
389                         LBUG();
390                         RETURN(-ENOMEM);
391                 }
392                 LDLM_DEBUG(lock, "intent policy, old res %ld",
393                            (long)old_res);
394                 RETURN(ELDLM_LOCK_CHANGED);
395         } else {
396                 int size = sizeof(struct ldlm_reply);
397                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
398                                      &req->rq_repmsg);
399                 if (rc) {
400                         CERROR("out of memory\n");
401                         LBUG();
402                         RETURN(-ENOMEM);
403                 }
404         }
405         RETURN(rc);
406 }
407
408 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
409 {
410         return lockmode_compat(a->l_req_mode, b->l_req_mode);
411 }
412
413 void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
414 {
415         ldlm_res2desc(lock->l_resource, &desc->l_resource);
416         desc->l_req_mode = lock->l_req_mode;
417         desc->l_granted_mode = lock->l_granted_mode;
418         memcpy(&desc->l_extent, &lock->l_extent, sizeof(desc->l_extent));
419         memcpy(desc->l_version, lock->l_version, sizeof(desc->l_version));
420 }
421
422 static void ldlm_add_ast_work_item(struct ldlm_lock *lock,
423                                    struct ldlm_lock *new)
424 {
425         struct ldlm_ast_work *w;
426         ENTRY;
427
428         l_lock(&lock->l_resource->lr_namespace->ns_lock);
429         if (new && (lock->l_flags & LDLM_FL_AST_SENT))
430                 GOTO(out, 0);
431
432         OBD_ALLOC(w, sizeof(*w));
433         if (!w) {
434                 LBUG();
435                 return;
436         }
437
438         if (new) {
439                 lock->l_flags |= LDLM_FL_AST_SENT;
440                 w->w_blocking = 1;
441                 ldlm_lock2desc(new, &w->w_desc);
442         }
443
444         w->w_lock = ldlm_lock_get(lock);
445         list_add(&w->w_list, lock->l_resource->lr_tmp);
446  out:
447         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
448         return;
449 }
450
451 void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
452 {
453         struct ldlm_lock *lock;
454
455         lock = ldlm_handle2lock(lockh);
456         ldlm_lock_addref_internal(lock, mode);
457         ldlm_lock_put(lock);
458 }
459
460 /* only called for local locks */
461 void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
462 {
463         l_lock(&lock->l_resource->lr_namespace->ns_lock);
464         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
465                 lock->l_readers++;
466         else
467                 lock->l_writers++;
468         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
469         ldlm_lock_get(lock);
470         LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
471 }
472
473 /* Args: unlocked lock */
474 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
475 {
476         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
477         ENTRY;
478
479         if (lock == NULL)
480                 LBUG();
481
482         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
483         l_lock(&lock->l_resource->lr_namespace->ns_lock);
484         if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
485                 lock->l_readers--;
486         else
487                 lock->l_writers--;
488
489         /* If we received a blocked AST and this was the last reference,
490          * run the callback. */
491         if (!lock->l_readers && !lock->l_writers &&
492             (lock->l_flags & LDLM_FL_CBPENDING)) {
493                 struct lustre_handle lockh;
494
495                 if (!lock->l_resource->lr_namespace->ns_client) {
496                         CERROR("LDLM_FL_CBPENDING set on non-local lock!\n");
497                         LBUG();
498                 }
499
500                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
501                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
502
503                 ldlm_lock2handle(lock, &lockh);
504                 /* FIXME: -1 is a really, really bad 'desc' */
505                 lock->l_blocking_ast(&lockh, (void *)-1, lock->l_data,
506                                      lock->l_data_len);
507         } else
508                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
509
510         ldlm_lock_put(lock); /* matches the ldlm_lock_get in addref */
511         ldlm_lock_put(lock); /* matches the handle2lock above */
512
513         EXIT;
514 }
515
516 static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs,
517                              struct list_head *queue)
518 {
519         struct list_head *tmp, *pos;
520         int rc = 1;
521
522         list_for_each_safe(tmp, pos, queue) {
523                 struct ldlm_lock *child;
524                 ldlm_res_compat compat;
525
526                 child = list_entry(tmp, struct ldlm_lock, l_res_link);
527                 if (lock == child)
528                         continue;
529
530                 compat = ldlm_res_compat_table[child->l_resource->lr_type];
531                 if (compat && compat(child, lock)) {
532                         CDEBUG(D_OTHER, "compat function succeded, next.\n");
533                         continue;
534                 }
535                 if (lockmode_compat(child->l_granted_mode, lock->l_req_mode)) {
536                         CDEBUG(D_OTHER, "lock modes are compatible, next.\n");
537                         continue;
538                 }
539
540                 rc = 0;
541
542                 if (send_cbs && child->l_blocking_ast != NULL) {
543                         CDEBUG(D_OTHER, "incompatible; sending blocking "
544                                "AST.\n");
545                         ldlm_add_ast_work_item(child, lock);
546                 }
547         }
548
549         return rc;
550 }
551
552 static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs)
553 {
554         int rc;
555         ENTRY;
556
557         l_lock(&lock->l_resource->lr_namespace->ns_lock);
558         rc = ldlm_lock_compat_list(lock, send_cbs,
559                                    &lock->l_resource->lr_granted);
560         /* FIXME: should we be sending ASTs to converting? */
561         if (rc)
562                 rc = ldlm_lock_compat_list
563                         (lock, send_cbs, &lock->l_resource->lr_converting);
564
565         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
566         RETURN(rc);
567 }
568
569 /* NOTE: called by
570    - ldlm_handle_enqueuque - resource
571 */
572 void ldlm_grant_lock(struct ldlm_lock *lock)
573 {
574         struct ldlm_resource *res = lock->l_resource;
575         ENTRY;
576
577         l_lock(&lock->l_resource->lr_namespace->ns_lock);
578         ldlm_resource_add_lock(res, &res->lr_granted, lock);
579         lock->l_granted_mode = lock->l_req_mode;
580
581         if (lock->l_granted_mode < res->lr_most_restr)
582                 res->lr_most_restr = lock->l_granted_mode;
583
584         if (lock->l_completion_ast) {
585                 ldlm_add_ast_work_item(lock, NULL);
586         }
587         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
588         EXIT;
589 }
590
591 static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
592                                       struct ldlm_extent *extent)
593 {
594         struct ldlm_lock *lock;
595         struct list_head *tmp;
596
597         list_for_each(tmp, queue) {
598                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
599
600                 if (lock->l_flags & LDLM_FL_CBPENDING)
601                         continue;
602
603                 /* lock_convert() takes the resource lock, so we're sure that
604                  * req_mode, lr_type, and l_cookie won't change beneath us */
605                 if (lock->l_req_mode != mode)
606                         continue;
607
608                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
609                     (lock->l_extent.start > extent->start ||
610                      lock->l_extent.end < extent->end))
611                         continue;
612
613                 ldlm_lock_addref_internal(lock, mode);
614                 return lock;
615         }
616
617         return NULL;
618 }
619
620 /* Must be called with no resource or lock locks held.
621  *
622  * Returns 1 if it finds an already-existing lock that is compatible; in this
623  * case, lockh is filled in with a addref()ed lock */
624 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
625                     void *cookie, int cookielen, ldlm_mode_t mode,
626                     struct lustre_handle *lockh)
627 {
628         struct ldlm_resource *res;
629         struct ldlm_lock *lock;
630         int rc = 0;
631         ENTRY;
632
633         res = ldlm_resource_get(ns, NULL, res_id, type, 0);
634         if (res == NULL)
635                 RETURN(0);
636
637         ns = res->lr_namespace;
638         l_lock(&ns->ns_lock);
639
640         if ((lock = search_queue(&res->lr_granted, mode, cookie)))
641                 GOTO(out, rc = 1);
642         if ((lock = search_queue(&res->lr_converting, mode, cookie)))
643                 GOTO(out, rc = 1);
644         if ((lock = search_queue(&res->lr_waiting, mode, cookie)))
645                 GOTO(out, rc = 1);
646
647         EXIT;
648  out:
649         ldlm_resource_put(res);
650         l_unlock(&ns->ns_lock);
651
652         if (lock) {
653                 ldlm_lock2handle(lock, lockh);
654                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
655                                          lock->l_granted_mode);
656         }
657         if (rc)
658                 LDLM_DEBUG(lock, "matched");
659         else
660                 LDLM_DEBUG_NOLOCK("not matched");
661         return rc;
662 }
663
664 /*   Returns a referenced, lock */
665 struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
666                                    struct lustre_handle *parent_lock_handle,
667                                    __u64 *res_id, __u32 type,
668                                    ldlm_mode_t mode,
669                                    void *data,
670                                    __u32 data_len)
671 {
672         struct ldlm_resource *res, *parent_res = NULL;
673         struct ldlm_lock *lock, *parent_lock;
674
675         parent_lock = ldlm_handle2lock(parent_lock_handle);
676         if (parent_lock)
677                 parent_res = parent_lock->l_resource;
678
679         res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
680         if (res == NULL)
681                 RETURN(NULL);
682
683         lock = ldlm_lock_new(parent_lock, res);
684         if (lock == NULL) {
685                 ldlm_resource_put(res);
686                 RETURN(NULL);
687         }
688
689         lock->l_req_mode = mode;
690         lock->l_data = data;
691         lock->l_data_len = data_len;
692
693         return lock;
694 }
695
696 /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
697 ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock,
698                                void *cookie, int cookie_len,
699                                int *flags,
700                                ldlm_lock_callback completion,
701                                ldlm_lock_callback blocking)
702 {
703         struct ldlm_resource *res;
704         int local;
705         ldlm_res_policy policy;
706         ENTRY;
707
708         res = lock->l_resource;
709         lock->l_blocking_ast = blocking;
710
711         if (res->lr_type == LDLM_EXTENT)
712                 memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent));
713
714         /* policies are not executed on the client */
715         local = res->lr_namespace->ns_client;
716         if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
717                 int rc;
718                 rc = policy(lock, cookie, lock->l_req_mode, NULL);
719
720                 if (rc == ELDLM_LOCK_CHANGED) {
721                         res = lock->l_resource;
722                         *flags |= LDLM_FL_LOCK_CHANGED;
723                 } else if (rc == ELDLM_LOCK_ABORTED) {
724                         ldlm_lock_destroy(lock);
725                         RETURN(rc);
726                 }
727         }
728
729         lock->l_cookie = cookie;
730         lock->l_cookie_len = cookie_len;
731
732         if (local && lock->l_req_mode == lock->l_granted_mode) {
733                 /* The server returned a blocked lock, but it was granted before
734                  * we got a chance to actually enqueue it.  We don't need to do
735                  * anything else. */
736                 GOTO(out, ELDLM_OK);
737         }
738
739         /* If this is a local resource, put it on the appropriate list. */
740         /* FIXME: don't like this: can we call ldlm_resource_unlink_lock? */
741         list_del_init(&lock->l_res_link);
742         if (local) {
743                 if (*flags & LDLM_FL_BLOCK_CONV)
744                         ldlm_resource_add_lock(res, res->lr_converting.prev,
745                                                lock);
746                 else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
747                         ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
748                 else
749                         ldlm_grant_lock(lock);
750                 GOTO(out, ELDLM_OK);
751         }
752
753         /* FIXME: We may want to optimize by checking lr_most_restr */
754         if (!list_empty(&res->lr_converting)) {
755                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
756                 *flags |= LDLM_FL_BLOCK_CONV;
757                 GOTO(out, ELDLM_OK);
758         }
759         if (!list_empty(&res->lr_waiting)) {
760                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
761                 *flags |= LDLM_FL_BLOCK_WAIT;
762                 GOTO(out, ELDLM_OK);
763         }
764         if (!ldlm_lock_compat(lock,0)) {
765                 ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
766                 *flags |= LDLM_FL_BLOCK_GRANTED;
767                 GOTO(out, ELDLM_OK);
768         }
769
770         ldlm_grant_lock(lock);
771         EXIT;
772  out:
773         /* Don't set 'completion_ast' until here so that if the lock is granted
774          * immediately we don't do an unnecessary completion call. */
775         lock->l_completion_ast = completion;
776         return ELDLM_OK;
777 }
778
779 /* Must be called with namespace taken: queue is waiting or converting. */
780 static int ldlm_reprocess_queue(struct ldlm_resource *res,
781                                 struct list_head *queue)
782 {
783         struct list_head *tmp, *pos;
784         ENTRY;
785
786         list_for_each_safe(tmp, pos, queue) {
787                 struct ldlm_lock *pending;
788                 pending = list_entry(tmp, struct ldlm_lock, l_res_link);
789
790                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
791
792                 if (!ldlm_lock_compat(pending, 1))
793                         RETURN(1);
794
795                 list_del_init(&pending->l_res_link);
796                 ldlm_grant_lock(pending);
797         }
798
799         RETURN(0);
800 }
801
802 void ldlm_run_ast_work(struct list_head *rpc_list)
803 {
804         struct list_head *tmp, *pos;
805         int rc;
806         ENTRY;
807
808         list_for_each_safe(tmp, pos, rpc_list) {
809                 struct ldlm_ast_work *w =
810                         list_entry(tmp, struct ldlm_ast_work, w_list);
811                 struct lustre_handle lockh;
812
813                 ldlm_lock2handle(w->w_lock, &lockh);
814                 if (w->w_blocking)
815                         rc = w->w_lock->l_blocking_ast
816                                 (&lockh, &w->w_desc, w->w_data, w->w_datalen);
817                 else
818                         rc = w->w_lock->l_completion_ast
819                                 (&lockh, NULL, w->w_data, w->w_datalen);
820                 if (rc)
821                         CERROR("Failed AST - should clean & disconnect "
822                                "client\n");
823                 ldlm_lock_put(w->w_lock);
824                 list_del(&w->w_list);
825                 OBD_FREE(w, sizeof(*w));
826         }
827         EXIT;
828 }
829
830 /* Must be called with resource->lr_lock not taken. */
831 void ldlm_reprocess_all(struct ldlm_resource *res)
832 {
833         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
834         ENTRY;
835
836         /* Local lock trees don't get reprocessed. */
837         if (res->lr_namespace->ns_client) {
838                 EXIT;
839                 return;
840         }
841
842         l_lock(&res->lr_namespace->ns_lock);
843         res->lr_tmp = &rpc_list;
844
845         ldlm_reprocess_queue(res, &res->lr_converting);
846         if (list_empty(&res->lr_converting))
847                 ldlm_reprocess_queue(res, &res->lr_waiting);
848
849         res->lr_tmp = NULL;
850         l_unlock(&res->lr_namespace->ns_lock);
851
852         ldlm_run_ast_work(&rpc_list);
853         EXIT;
854 }
855
856 /* Must be called with lock and lock->l_resource unlocked */
857 void ldlm_lock_cancel(struct ldlm_lock *lock)
858 {
859         struct ldlm_resource *res;
860         struct ldlm_namespace *ns;
861         ENTRY;
862
863         res = lock->l_resource;
864         ns = res->lr_namespace;
865
866         l_lock(&ns->ns_lock);
867         if (lock->l_readers || lock->l_writers)
868                 CDEBUG(D_INFO, "lock still has references (%d readers, %d "
869                        "writers)\n", lock->l_readers, lock->l_writers);
870
871         ldlm_resource_unlink_lock(lock);
872         ldlm_lock_destroy(lock);
873         l_unlock(&ns->ns_lock);
874 }
875
876 /* Must be called with lock and lock->l_resource unlocked */
877 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
878                                         int *flags)
879 {
880         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
881         struct ldlm_resource *res;
882         struct ldlm_namespace *ns;
883         int granted = 0;
884         ENTRY;
885
886         res = lock->l_resource;
887         ns = res->lr_namespace;
888
889         l_lock(&ns->ns_lock);
890
891         lock->l_req_mode = new_mode;
892         ldlm_resource_unlink_lock(lock);
893
894         /* If this is a local resource, put it on the appropriate list. */
895         if (res->lr_namespace->ns_client) {
896                 if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED))
897                         ldlm_resource_add_lock(res, res->lr_converting.prev,
898                                                lock);
899                 else {
900                         res->lr_tmp = &rpc_list;
901                         ldlm_grant_lock(lock);
902                         res->lr_tmp = NULL;
903                         granted = 1;
904                         /* FIXME: completion handling not with ns_lock held ! */
905                         wake_up(&lock->l_waitq);
906                 }
907         } else
908                 list_add(&lock->l_res_link, res->lr_converting.prev);
909
910         l_unlock(&ns->ns_lock);
911
912         if (granted)
913                 ldlm_run_ast_work(&rpc_list);
914         RETURN(res);
915 }
916
917 void ldlm_lock_dump(struct ldlm_lock *lock)
918 {
919         char ver[128];
920
921         if (!(portal_debug & D_OTHER))
922                 return;
923
924         if (RES_VERSION_SIZE != 4)
925                 LBUG();
926
927         if (!lock) {
928                 CDEBUG(D_OTHER, "  NULL LDLM lock\n");
929                 return;
930         }
931
932         snprintf(ver, sizeof(ver), "%x %x %x %x",
933                  lock->l_version[0], lock->l_version[1],
934                  lock->l_version[2], lock->l_version[3]);
935
936         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
937         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
938         CDEBUG(D_OTHER, "  Resource: %p (%Ld)\n", lock->l_resource,
939                lock->l_resource->lr_name[0]);
940         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
941                (int)lock->l_req_mode, (int)lock->l_granted_mode);
942         CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
943                lock->l_readers, lock->l_writers);
944         if (lock->l_resource->lr_type == LDLM_EXTENT)
945                 CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
946                        (unsigned long long)lock->l_extent.start,
947                        (unsigned long long)lock->l_extent.end);
948 }