Whamcloud - gitweb
b2359eb62187b80b62b703fe465aba513e9db510
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13
14 #include <linux/version.h>
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <asm/unistd.h>
18
19 #define DEBUG_SUBSYSTEM S_LDLM
20
21 #include <linux/obd_class.h>
22 #include <linux/lustre_dlm.h>
23 #include <linux/lustre_net.h>
24
25 extern kmem_cache_t *ldlm_resource_slab;
26 extern kmem_cache_t *ldlm_lock_slab;
27
28 static int ldlm_client_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
29                                 void *data, __u32 data_len)
30 {
31         LBUG();
32         return 0;
33 }
34
35 static int ldlm_enqueue(struct ptlrpc_request *req)
36 {
37         struct ldlm_reply *dlm_rep;
38         struct ldlm_request *dlm_req;
39         ldlm_error_t err;
40         int rc, bufsize = sizeof(*dlm_rep);
41         
42         rc = lustre_pack_msg(1, &bufsize, NULL, &req->rq_replen,
43                              &req->rq_repbuf);
44         if (rc) {
45                 CERROR("out of memory\n");
46                 req->rq_status = -ENOMEM;
47                 RETURN(0);
48         }
49         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
50         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
51         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
52
53         err = ldlm_local_lock_enqueue(req->rq_obd,
54                                       dlm_req->ns_id,
55                                       &dlm_req->parent_lock_handle,
56                                       dlm_req->res_id,
57                                       dlm_req->res_type,
58                                       &dlm_req->lock_extent,
59                                       dlm_req->mode,
60                                       &dlm_req->flags,
61                                       ldlm_client_callback,
62                                       ldlm_client_callback,
63                                       lustre_msg_buf(req->rq_reqmsg, 1),
64                                       req->rq_reqmsg->buflens[1],
65                                       &dlm_rep->lock_handle);
66         req->rq_status = err;
67
68         CERROR("local_lock_enqueue: %d\n", err);
69
70         RETURN(0);
71 }
72
73 static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
74                        struct ptlrpc_request *req)
75 {
76         int rc;
77         ENTRY;
78
79         rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
80         req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf;
81         if (rc) {
82                 CERROR("lustre_ldlm: Invalid request\n");
83                 GOTO(out, rc);
84         }
85
86         if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
87                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
88                        req->rq_reqmsg->type);
89                 GOTO(out, rc = -EINVAL);
90         }
91
92         switch (req->rq_reqmsg->opc) {
93         case LDLM_ENQUEUE:
94                 CDEBUG(D_INODE, "enqueue\n");
95                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
96                 rc = ldlm_enqueue(req);
97                 break;
98 #if 0
99         case LDLM_CONVERT:
100                 CDEBUG(D_INODE, "convert\n");
101                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
102                 rc = ldlm_convert(req);
103                 break;
104
105         case LDLM_CANCEL:
106                 CDEBUG(D_INODE, "cancel\n");
107                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
108                 rc = ldlm_cancel(req);
109                 break;
110
111         case LDLM_CALLBACK:
112                 CDEBUG(D_INODE, "callback\n");
113                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
114                 rc = ldlm_callback(req);
115                 break;
116 #endif
117
118         default:
119                 rc = ptlrpc_error(dev, svc, req);
120                 RETURN(rc);
121         }
122
123         EXIT;
124 out:
125         if (rc) {
126                 CERROR("no header\n");
127                 return 0;
128         }
129
130         if( req->rq_status) {
131                 ptlrpc_error(dev, svc, req);
132         } else {
133                 CDEBUG(D_NET, "sending reply\n");
134                 ptlrpc_reply(dev, svc, req);
135         }
136
137         return 0;
138 }
139
140 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
141                           void *uarg)
142 {
143         struct obd_device *obddev = conn->oc_dev;
144         struct ptlrpc_client cl;
145         int err;
146
147         ENTRY;
148
149         if ( _IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
150              _IOC_NR(cmd) < IOC_LDLM_MIN_NR  ||
151              _IOC_NR(cmd) > IOC_LDLM_MAX_NR ) {
152                 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
153                                 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
154                 EXIT;
155                 return -EINVAL;
156         }
157
158         err = ptlrpc_connect_client(-1, "ldlm",
159                                     LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
160                                     &cl);
161         if (err) {
162                 CERROR("cannot create client\n");
163                 RETURN(-EINVAL);
164         }
165
166         switch (cmd) {
167         case IOC_LDLM_TEST: {
168                 err = ldlm_test(obddev);
169                 CERROR("-- done err %d\n", err);
170                 EXIT;
171                 break;
172         }
173         default:
174                 err = -EINVAL;
175                 EXIT;
176                 break;
177         }
178
179         return err;
180 }
181
182 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
183 {
184         struct ldlm_obd *ldlm = &obddev->u.ldlm;
185         int err;
186         ENTRY;
187
188         INIT_LIST_HEAD(&obddev->u.ldlm.ldlm_namespaces);
189         obddev->u.ldlm.ldlm_lock = SPIN_LOCK_UNLOCKED;
190
191         ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024,
192                                              LDLM_REQUEST_PORTAL,
193                                              LDLM_REPLY_PORTAL,
194                                              "self", ldlm_handle);
195         if (!ldlm->ldlm_service)
196                 LBUG();
197
198         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
199         if (err)
200                 CERROR("cannot start thread\n");
201
202         MOD_INC_USE_COUNT;
203         RETURN(0);
204 }
205
206 static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
207 {
208         struct list_head *tmp, *pos;
209         int rc = 0;
210
211         list_for_each_safe(tmp, pos, q) {
212                 struct ldlm_lock *lock;
213
214                 if (rc) {
215                         /* Res was already cleaned up. */
216                         LBUG();
217                 }
218
219                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
220
221                 ldlm_resource_del_lock(lock);
222                 ldlm_lock_free(lock);
223                 rc = ldlm_resource_put(res);
224         }
225
226         return rc;
227 }
228
229 static int do_free_namespace(struct ldlm_namespace *ns)
230 {
231         struct list_head *tmp, *pos;
232         int i, rc;
233
234         for (i = 0; i < RES_HASH_SIZE; i++) {
235                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
236                         struct ldlm_resource *res;
237                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
238                         list_del_init(&res->lr_hash);
239
240                         rc = cleanup_resource(res, &res->lr_granted);
241                         if (!rc)
242                                 rc = cleanup_resource(res, &res->lr_converting);
243                         if (!rc)
244                                 rc = cleanup_resource(res, &res->lr_waiting);
245
246                         while (rc == 0)
247                                 rc = ldlm_resource_put(res);
248                 }
249         }
250
251         return ldlm_namespace_free(ns);
252 }
253
254 static int ldlm_free_all(struct obd_device *obddev)
255 {
256         struct list_head *tmp, *pos;
257         int rc = 0;
258
259         ldlm_lock(obddev);
260
261         list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) { 
262                 struct ldlm_namespace *ns;
263                 ns = list_entry(tmp, struct ldlm_namespace, ns_link);
264
265                 rc |= do_free_namespace(ns);
266         }
267
268         ldlm_unlock(obddev);
269
270         return rc;
271 }
272
273 static int ldlm_cleanup(struct obd_device *obddev)
274 {
275         struct ldlm_obd *ldlm = &obddev->u.ldlm;
276         ENTRY;
277
278         ptlrpc_stop_thread(ldlm->ldlm_service);
279         rpc_unregister_service(ldlm->ldlm_service);
280
281         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
282                 // XXX reply with errors and clean up
283                 CERROR("Request list not empty!\n");
284         }
285
286         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
287
288         if (ldlm_free_all(obddev)) {
289                 CERROR("ldlm_free_all could not complete.\n");
290                 RETURN(-1);
291         }
292
293         MOD_DEC_USE_COUNT;
294         RETURN(0);
295 }
296
297 struct obd_ops ldlm_obd_ops = {
298         o_iocontrol:   ldlm_iocontrol,
299         o_setup:       ldlm_setup,
300         o_cleanup:     ldlm_cleanup,
301         o_connect:     gen_connect,
302         o_disconnect:  gen_disconnect
303 };
304
305
306 static int __init ldlm_init(void)
307 {
308         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
309         if (rc != 0)
310                 return rc;
311
312         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
313                                                sizeof(struct ldlm_resource), 0,
314                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
315         if (ldlm_resource_slab == NULL)
316                 return -ENOMEM;
317
318         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
319                                            sizeof(struct ldlm_lock), 0,
320                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
321         if (ldlm_lock_slab == NULL) {
322                 kmem_cache_destroy(ldlm_resource_slab);
323                 return -ENOMEM;
324         }
325
326         return 0;
327 }
328
329 static void __exit ldlm_exit(void)
330 {
331         obd_unregister_type(OBD_LDLM_DEVICENAME);
332         kmem_cache_destroy(ldlm_resource_slab);
333         kmem_cache_destroy(ldlm_lock_slab);
334 }
335
336 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
337 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
338 MODULE_LICENSE("GPL"); 
339
340 module_init(ldlm_init);
341 module_exit(ldlm_exit);