Whamcloud - gitweb
- updated documentation (introduction)
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  * authors, Peter Braam <braam@clusterfs.com> & 
11  * Phil Schwan <phil@clusterfs.com>
12  */
13
14 #define EXPORT_SYMTAB
15
16 #include <linux/version.h>
17 #include <linux/module.h>
18 #include <linux/slab.h>
19 #include <asm/unistd.h>
20
21 #define DEBUG_SUBSYSTEM S_LDLM
22
23 #include <linux/obd_support.h>
24 #include <linux/obd_class.h>
25
26 #include <linux/lustre_dlm.h>
27
28 extern kmem_cache_t *ldlm_lock_slab;
29
30 static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
31                                        struct ldlm_resource *resource,
32                                        ldlm_mode_t mode)
33 {
34         struct ldlm_lock *lock;
35
36         lock = kmem_cache_alloc(ldlm_lock_slab, SLAB_KERNEL);
37         if (lock == NULL)
38                 BUG();
39
40         memset(lock, 0, sizeof(*lock));
41         lock->l_resource = resource;
42         lock->l_req_mode = mode;
43         INIT_LIST_HEAD(&lock->l_children);
44
45         if (parent != NULL) {
46                 lock->l_parent = parent;
47                 list_add(&lock->l_childof, &parent->l_children);
48         }
49
50         return lock;
51 }
52
53 static int ldlm_notify_incompatible(struct list_head *list,
54                                     struct ldlm_lock *new)
55 {
56         struct list_head *tmp;
57         int rc = 0;
58
59         list_for_each(tmp, list) {
60                 struct ldlm_lock *lock;
61                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
62                 if (lockmode_compat(lock->l_req_mode, new->l_req_mode))
63                         continue;
64
65                 rc = 1;
66
67                 if (lock->l_resource->lr_blocking != NULL)
68                         lock->l_resource->lr_blocking(lock, new);
69         }
70
71         return rc;
72 }
73
74
75 static int ldlm_reprocess_queue(struct list_head *queue, 
76                                 struct list_head *granted_list)
77 {
78         struct list_head *tmp1, *tmp2;
79         struct ldlm_resource *res;
80         int rc = 0;
81
82         list_for_each(tmp1, queue) { 
83                 struct ldlm_lock *pending;
84                 rc = 0; 
85                 pending = list_entry(tmp1, struct ldlm_lock, l_res_link);
86
87                 /* check if pending can go in ... */ 
88                 list_for_each(tmp2, granted_list) {
89                         struct ldlm_lock *lock;
90                         lock = list_entry(tmp2, struct ldlm_lock, l_res_link);
91                         if (lockmode_compat(lock->l_granted_mode, 
92                                             pending->l_req_mode))
93                                 continue;
94                         else { 
95                                 /* no, we are done */
96                                 rc = 1;
97                                 break;
98                         }
99                 }
100
101                 if (rc) { 
102                         /* no - we are done */
103                         break;
104                 }
105
106                 res = pending->l_resource;
107                 list_del(&pending->l_res_link); 
108                 list_add(&pending->l_res_link, &res->lr_granted);
109                 pending->l_granted_mode = pending->l_req_mode;
110
111                 if (pending->l_granted_mode < res->lr_most_restr)
112                         res->lr_most_restr = pending->l_granted_mode;
113
114                 /* XXX call completion here */ 
115                 
116
117         }
118
119         return rc;
120 }
121
122 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id,
123                                      struct ldlm_resource *parent_res,
124                                      struct ldlm_lock *parent_lock,
125                                      __u32 *res_id, ldlm_mode_t mode, 
126                                      struct ldlm_handle *lockh)
127 {
128         struct ldlm_namespace *ns;
129         struct ldlm_resource *res;
130         struct ldlm_lock *lock;
131         int incompat, rc;
132
133         ENTRY;
134
135         ns = ldlm_namespace_find(obddev, ns_id);
136         if (ns == NULL || ns->ns_hash == NULL)
137                 BUG();
138
139         res = ldlm_resource_get(ns, parent_res, res_id, 1);
140         if (res == NULL)
141                 BUG();
142
143         lock = ldlm_lock_new(parent_lock, res, mode);
144         if (lock == NULL)
145                 BUG();
146
147         lockh->addr = (__u64)(unsigned long)lock;
148         spin_lock(&res->lr_lock);
149
150         /* FIXME: We may want to optimize by checking lr_most_restr */
151
152         if (!list_empty(&res->lr_converting)) {
153                 list_add(&lock->l_res_link, res->lr_waiting.prev);
154                 rc = ELDLM_BLOCK_CONV;
155                 GOTO(out, rc);
156         }
157         if (!list_empty(&res->lr_waiting)) {
158                 list_add(&lock->l_res_link, res->lr_waiting.prev);
159                 rc = ELDLM_BLOCK_WAIT;
160                 GOTO(out, rc);
161         }
162         incompat = ldlm_notify_incompatible(&res->lr_granted, lock);
163         if (incompat) {
164                 list_add(&lock->l_res_link, res->lr_waiting.prev);
165                 rc = ELDLM_BLOCK_GRANTED;
166                 GOTO(out, rc);
167         }
168
169         list_add(&lock->l_res_link, &res->lr_granted);
170         lock->l_granted_mode = mode;
171         if (mode < res->lr_most_restr)
172                 res->lr_most_restr = mode;
173
174         /* XXX call the completion call back function */ 
175
176         rc = ELDLM_OK;
177         GOTO(out, rc);
178
179  out:
180         spin_unlock(&res->lr_lock);
181         return rc;
182 }
183
184 ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, 
185                                      struct ldlm_handle *lockh)
186 {
187         struct ldlm_lock *lock;
188         struct ldlm_resource *res = lock->l_resource;
189         ENTRY;
190
191         lock = (struct ldlm_lock *)(unsigned long)lockh->addr;
192         list_del(&lock->l_res_link); 
193
194         kmem_cache_free(ldlm_lock_slab, lock); 
195         if (ldlm_resource_put(lock->l_resource)) {
196                 EXIT;
197                 return 0;
198         }
199         
200         ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted); 
201         if (list_empty(&res->lr_converting))
202                 ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted); 
203         
204         return 0;
205 }
206
207 void ldlm_lock_dump(struct ldlm_lock *lock)
208 {
209         char ver[128];
210
211         if (RES_VERSION_SIZE != 4)
212                 BUG();
213
214         snprintf(ver, sizeof(ver), "%x %x %x %x",
215                  lock->l_version[0], lock->l_version[1],
216                  lock->l_version[2], lock->l_version[3]);
217
218         CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
219         CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
220         CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
221                (int)lock->l_req_mode, (int)lock->l_granted_mode);
222 }