1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
14 #include <linux/version.h>
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <asm/unistd.h>
19 #define DEBUG_SUBSYSTEM S_LDLM
21 #include <linux/lustre_dlm.h>
/* Slab caches defined in another translation unit.  In this file they are
 * created in ldlm_init() (sized for struct ldlm_resource and struct
 * ldlm_lock respectively) and destroyed in ldlm_exit(). */
23 extern kmem_cache_t *ldlm_resource_slab;
24 extern kmem_cache_t *ldlm_lock_slab;
/*
 * _ldlm_namespace_new - service a namespace-creation request.
 *
 * @obddev: device passed through to ldlm_namespace_new()
 * @req:    incoming request; its reply buffer/length and rq_status are
 *          written here
 *
 * Packs a reply via lustre_pack_msg() (called with 0/NULL/NULL, presumably
 * meaning "no reply buffers" -- TODO confirm), then takes the namespace id
 * from the lock descriptor in request buffer 0 and hands it to
 * ldlm_namespace_new().
 *
 * NOTE(review): the embedded original line numbers skip values (36 -> 38,
 * 39 -> 42, 44 -> 48), so the guards around the two CERROR() paths and the
 * return statements are not visible in this listing.
 */
26 static int _ldlm_namespace_new(struct obd_device *obddev,
27 struct ptlrpc_request *req)
29 struct ldlm_request *dlm_req;
30 struct ldlm_namespace *ns;
35 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
/* The packed reply buffer doubles as the reply message header. */
36 req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
/* Error path: log and record -ENOMEM as the request status (presumably
 * entered when lustre_pack_msg() failed -- guard not visible). */
38 CERROR("out of memory\n");
39 req->rq_status = -ENOMEM;
/* Request buffer 0 is interpreted as a struct ldlm_request. */
42 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
44 err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
48 CERROR("err = %d\n", err);
/*
 * _ldlm_enqueue - service a lock enqueue request.
 *
 * @req: incoming request; the reply is packed into it
 *
 * Packs a reply with one buffer of sizeof(struct ldlm_reply), seeds the
 * reply's extent from the request's lock descriptor, and calls
 * ldlm_local_lock_enqueue() with values taken from request buffer 0, plus
 * request buffer 1 and its length.  The resulting lock handle is written
 * into the reply.
 *
 * NOTE(review): the embedded original line numbers skip values (e.g.
 * 80 -> 85), so some enqueue arguments, the guard around the "out of
 * memory" path, and the return statements are not visible in this listing.
 */
53 static int _ldlm_enqueue(struct ptlrpc_request *req)
55 struct ldlm_reply *dlm_rep;
56 struct ldlm_request *dlm_req;
57 int rc, size = sizeof(*dlm_rep);
61 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
62 req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
/* Error path: log and record -ENOMEM as the request status (presumably
 * entered when lustre_pack_msg() failed -- guard not visible). */
64 CERROR("out of memory\n");
65 req->rq_status = -ENOMEM;
/* Buffer 0 of the reply and of the request hold the DLM structures. */
68 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
69 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
/* Start the reply extent as a copy of the requested extent; the reply's
 * copy is the one passed to ldlm_local_lock_enqueue() below. */
71 memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
72 sizeof(dlm_rep->lock_extent));
74 err = ldlm_local_lock_enqueue(req->rq_obd,
75 dlm_req->lock_desc.l_resource.lr_ns_id,
76 &dlm_req->lock_handle2,
77 dlm_req->lock_desc.l_resource.lr_name,
78 dlm_req->lock_desc.l_resource.lr_type,
79 &dlm_rep->lock_extent,
80 dlm_req->lock_desc.l_req_mode,
85 lustre_msg_buf(req->rq_reqmsg, 1),
86 req->rq_reqmsg->buflens[1],
87 &dlm_rep->lock_handle);
/* For every result except -ENOMEM and -ELDLM_BAD_NAMESPACE, stamp the lock
 * with the requesting peer and with lock_handle1 from the request (stored
 * as the lock's remote handle).
 * NOTE(review): the ldlm_handle2object() result is dereferenced with no
 * NULL check visible here -- confirm it cannot fail on this path. */
88 if (err != -ENOMEM && err != -ELDLM_BAD_NAMESPACE) {
89 struct ldlm_lock *lock;
90 lock = ldlm_handle2object(&dlm_rep->lock_handle);
91 memcpy(&lock->l_peer, &req->rq_peer, sizeof(lock->l_peer));
92 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
93 sizeof(lock->l_remote_handle));
97 CERROR("err = %d\n", err);
/*
 * _ldlm_convert - service a lock conversion request.
 *
 * @req: incoming request; the reply is packed into it
 *
 * Packs a reply via lustre_pack_msg(0, NULL, NULL, ...) and calls
 * ldlm_local_lock_convert() on lock_handle1 with the requested mode from
 * the request's lock descriptor.
 *
 * NOTE(review): the embedded original line numbers skip values
 * (109 -> 111, 119 -> 124), so the guard around the "out of memory" path,
 * the remaining convert arguments, and the return are not visible here.
 */
102 static int _ldlm_convert(struct ptlrpc_request *req)
104 struct ldlm_request *dlm_req;
108 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
109 req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
/* Error path: log and record -ENOMEM as the request status. */
111 CERROR("out of memory\n");
112 req->rq_status = -ENOMEM;
/* Request buffer 0 is interpreted as a struct ldlm_request. */
115 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
118 ldlm_local_lock_convert(req->rq_obd, &dlm_req->lock_handle1,
119 dlm_req->lock_desc.l_req_mode,
/*
 * _ldlm_cancel - service a lock cancel request.
 *
 * @req: incoming request; the reply is packed into it
 *
 * Packs a reply via lustre_pack_msg(0, NULL, NULL, ...) and calls
 * ldlm_local_lock_cancel() on lock_handle1 from request buffer 0.
 *
 * NOTE(review): the embedded original line numbers skip values
 * (131 -> 133, 134 -> 137), so the guard around the "out of memory" path
 * and the return statements are not visible in this listing.
 */
124 static int _ldlm_cancel(struct ptlrpc_request *req)
126 struct ldlm_request *dlm_req;
130 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
131 req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
/* Error path: log and record -ENOMEM as the request status. */
133 CERROR("out of memory\n");
134 req->rq_status = -ENOMEM;
/* Request buffer 0 is interpreted as a struct ldlm_request. */
137 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
140 ldlm_local_lock_cancel(req->rq_obd, &dlm_req->lock_handle1);
/*
 * _ldlm_callback - service a lock callback request.
 *
 * @req: incoming request; the reply is packed into it
 *
 * Packs a reply via lustre_pack_msg(0, NULL, NULL, ...), resolves
 * lock_handle1 from request buffer 0 to a lock object, and dumps that lock
 * with ldlm_lock_dump().  No other action on the lock is visible here.
 *
 * NOTE(review): the embedded original line numbers skip values
 * (152 -> 154, 161 -> 168), so the guard around the "out of memory" path
 * and the return statements are not visible in this listing.
 */
144 static int _ldlm_callback(struct ptlrpc_request *req)
146 struct ldlm_request *dlm_req;
147 struct ldlm_lock *lock;
151 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
152 req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
/* Error path: log and record -ENOMEM as the request status. */
154 CERROR("out of memory\n");
155 req->rq_status = -ENOMEM;
/* Request buffer 0 is interpreted as a struct ldlm_request. */
158 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
/* NOTE(review): no NULL check on the looked-up lock is visible before it
 * is passed to ldlm_lock_dump() -- confirm that is safe. */
160 lock = ldlm_handle2object(&dlm_req->lock_handle1);
161 ldlm_lock_dump(lock);
/*
 * ldlm_handle - request handler for the DLM service (it is the handler
 * passed to ptlrpc_init_svc() in ldlm_setup()).
 *
 * @dev: device, forwarded to _ldlm_namespace_new()
 * @svc: service used to send the reply or the error
 * @req: incoming request
 *
 * Unpacks the request buffer, rejects messages whose type is not
 * PTL_RPC_REQUEST with -EINVAL, then dispatches on the opcode to the
 * matching _ldlm_*() helper.  Finishes with either ptlrpc_error() or
 * ptlrpc_reply().
 *
 * NOTE(review): the embedded original line numbers skip values, so most
 * `case` labels, the `out:` label, and the condition choosing between
 * ptlrpc_error() and ptlrpc_reply() are not visible in this listing.
 */
168 static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
169 struct ptlrpc_request *req)
174 rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
/* The raw request buffer is used in place as the message header. */
175 req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf;
177 CERROR("lustre_ldlm: Invalid request\n");
/* Only request-type packets are served here. */
181 if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
182 CERROR("lustre_ldlm: wrong packet type sent %d\n",
183 req->rq_reqmsg->type);
184 GOTO(out, rc = -EINVAL);
/* Opcode dispatch.  Each visible arm logs, passes through an
 * OBD_FAIL_RETURN() check, then calls its helper. */
187 switch (req->rq_reqmsg->opc) {
188 case LDLM_NAMESPACE_NEW:
189 CDEBUG(D_INODE, "namespace_new\n");
190 OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
191 rc = _ldlm_namespace_new(dev, req);
195 CDEBUG(D_INODE, "enqueue\n");
196 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
197 rc = _ldlm_enqueue(req);
201 CDEBUG(D_INODE, "convert\n");
202 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
203 rc = _ldlm_convert(req);
207 CDEBUG(D_INODE, "cancel\n");
208 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
209 rc = _ldlm_cancel(req);
213 CDEBUG(D_INODE, "callback\n");
214 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
215 rc = _ldlm_callback(req);
/* Presumably the default arm for unknown opcodes -- label not visible. */
219 rc = ptlrpc_error(svc, req);
225 RETURN(ptlrpc_error(svc, req));
227 RETURN(ptlrpc_reply(svc, req));
/*
 * ldlm_iocontrol - ioctl entry point for the DLM device (installed as
 * o_iocontrol in ldlm_obd_ops).
 *
 * @cmd:  ioctl command; its type must be IOC_LDLM_TYPE and its number must
 *        lie in [IOC_LDLM_MIN_NR, IOC_LDLM_MAX_NR]
 * @conn: connection; conn->oc_dev supplies the device
 * @len, @karg: not referenced in the lines visible here
 *
 * After validating @cmd, sets up and connects an "ldlm" client, then
 * handles the command.  The only command visible is IOC_LDLM_TEST, which
 * runs ldlm_test() on the device and logs the result.
 *
 * NOTE(review): the embedded original line numbers skip values, so the
 * rest of the parameter list, the declaration of `cl`, the error guards
 * and the return statements are not visible in this listing.
 */
230 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
233 struct obd_device *obddev = conn->oc_dev;
/* Reject commands outside the LDLM ioctl type/number range. */
237 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
238 _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
239 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
240 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
246 /* XX phil -- put the peer back in */
/* The client is initialised with a NULL first argument and connected with
 * a NULL peer argument (see the XX note above). */
248 ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, &cl);
249 err = ptlrpc_connect_client("ldlm", &cl, NULL);
252 CERROR("cannot create client\n");
/* The result is logged with CERROR() unconditionally, as far as this
 * listing shows. */
257 case IOC_LDLM_TEST: {
258 err = ldlm_test(obddev);
259 CERROR("-- done err %d\n", err);
/* Presumably the default arm for unsupported commands -- label not
 * visible. */
263 GOTO(out, err = -EINVAL);
/*
 * ldlm_setup - bring up the DLM device state held in obddev->u.ldlm.
 *
 * @obddev: device being set up
 * @len, @data: not referenced in the lines visible here
 *
 * In order: initialises the namespace list and the ldlm spinlock, creates
 * the ptlrpc service with ldlm_handle() as its handler, starts the
 * "lustre_dlm" service thread, allocates the client structure, and
 * initialises and connects the "ldlm" client (the peer is stored in
 * ldlm_server_peer).
 *
 * NOTE(review): the embedded original line numbers skip values, so several
 * ptlrpc_init_svc()/ptlrpc_init_client() arguments, the bodies of the
 * failure checks and the return statements are not visible in this listing.
 */
270 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
272 struct ldlm_obd *ldlm = &obddev->u.ldlm;
/* `ldlm` and obddev->u.ldlm refer to the same object. */
276 INIT_LIST_HEAD(&obddev->u.ldlm.ldlm_namespaces);
277 obddev->u.ldlm.ldlm_lock = SPIN_LOCK_UNLOCKED;
279 ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024,
282 "self", ldlm_handle);
283 if (!ldlm->ldlm_service)
286 err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
288 CERROR("cannot start thread\n");
292 OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
293 if (ldlm->ldlm_client == NULL)
296 ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
298 err = ptlrpc_connect_client("ldlm", ldlm->ldlm_client,
299 &ldlm->ldlm_server_peer);
301 CERROR("cannot create client\n");
/*
 * cleanup_resource - release every lock on one queue of a resource.
 *
 * @res: resource that owns the queue
 * @q:   list of locks linked through l_res_link (called from
 *       do_free_namespace() with the granted, converting and waiting
 *       queues)
 *
 * For each lock on @q: removes it from the resource, frees it, and calls
 * ldlm_resource_put() on @res, keeping that call's result in rc.
 *
 * NOTE(review): the embedded original line numbers skip values, so the
 * condition behind the "already cleaned up" comment and the return value
 * are not visible in this listing.
 */
309 static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
311 struct list_head *tmp, *pos;
/* `tmp` is the cursor and `pos` the saved next entry, so the current lock
 * can be unlinked and freed inside the loop. */
314 list_for_each_safe(tmp, pos, q) {
315 struct ldlm_lock *lock;
318 /* Res was already cleaned up. */
322 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
324 ldlm_resource_del_lock(lock);
325 ldlm_lock_free(lock);
326 rc = ldlm_resource_put(res);
/*
 * do_free_namespace - tear down every resource in a namespace, then free
 * the namespace itself.
 *
 * @ns: namespace to destroy
 *
 * Walks all RES_HASH_SIZE hash chains.  Each resource is unhashed, its
 * granted, converting and waiting queues are emptied via
 * cleanup_resource(), and ldlm_resource_put() is called on it.  Returns
 * the result of ldlm_namespace_free().
 *
 * NOTE(review): the embedded original line numbers skip values between the
 * cleanup_resource() calls, so any checks on `rc` between them are not
 * visible in this listing.
 */
332 static int do_free_namespace(struct ldlm_namespace *ns)
334 struct list_head *tmp, *pos;
337 for (i = 0; i < RES_HASH_SIZE; i++) {
/* Safe iteration: the current resource is unlinked from the chain. */
338 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
339 struct ldlm_resource *res;
340 res = list_entry(tmp, struct ldlm_resource, lr_hash);
341 list_del_init(&res->lr_hash);
343 rc = cleanup_resource(res, &res->lr_granted);
345 rc = cleanup_resource(res, &res->lr_converting);
347 rc = cleanup_resource(res, &res->lr_waiting);
350 rc = ldlm_resource_put(res);
354 return ldlm_namespace_free(ns);
/*
 * ldlm_free_all - free every namespace registered on the device.
 *
 * @obddev: device whose u.ldlm.ldlm_namespaces list is walked
 *
 * Calls do_free_namespace() on each namespace and ORs the results into rc,
 * so rc stays zero only if every call returned zero (assuming rc starts at
 * zero -- its initialisation is not visible in this listing).
 */
357 static int ldlm_free_all(struct obd_device *obddev)
359 struct list_head *tmp, *pos;
/* Safe iteration, since each namespace is freed during the walk. */
364 list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) {
365 struct ldlm_namespace *ns;
366 ns = list_entry(tmp, struct ldlm_namespace, ns_link);
368 rc |= do_free_namespace(ns);
/*
 * ldlm_cleanup - tear down the DLM device (installed as o_cleanup in
 * ldlm_obd_ops).
 *
 * @obddev: device being cleaned up
 *
 * Stops the service thread, unregisters the service, complains if the
 * service's request list is not empty, frees the client and service
 * structures, and finally frees all namespaces via ldlm_free_all().
 *
 * NOTE(review): the embedded original line numbers skip values, so what
 * happens after each CERROR() and the return value are not visible in this
 * listing.
 */
376 static int ldlm_cleanup(struct obd_device *obddev)
378 struct ldlm_obd *ldlm = &obddev->u.ldlm;
381 ptlrpc_stop_thread(ldlm->ldlm_service);
382 rpc_unregister_service(ldlm->ldlm_service);
/* The service structure is still read here; it is freed only below. */
384 if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
385 // XXX reply with errors and clean up
386 CERROR("Request list not empty!\n");
389 OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
390 OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
/* A non-zero result from ldlm_free_all() is treated as failure. */
392 if (ldlm_free_all(obddev)) {
393 CERROR("ldlm_free_all could not complete.\n");
/*
 * Method table for the DLM device type; registered under
 * OBD_LDLM_DEVICENAME by ldlm_init().  Uses the `field: value` initializer
 * syntax.  Connect and disconnect use the gen_connect / gen_disconnect
 * helpers.
 *
 * NOTE(review): original line 403 is missing from this listing, so one
 * initializer is not visible here.
 */
401 struct obd_ops ldlm_obd_ops = {
402 o_iocontrol: ldlm_iocontrol,
404 o_cleanup: ldlm_cleanup,
405 o_connect: gen_connect,
406 o_disconnect: gen_disconnect
/*
 * ldlm_init - module entry point.
 *
 * Registers the DLM device type, then creates the two slab caches used for
 * struct ldlm_resource and struct ldlm_lock objects.  If the lock cache
 * cannot be created, the resource cache created just before it is
 * destroyed again.
 *
 * NOTE(review): the embedded original line numbers skip values, so the
 * check on `rc` after registration and the return statements are not
 * visible in this listing; whether the type is unregistered on the slab
 * failure paths cannot be told from here.
 */
410 static int __init ldlm_init(void)
412 int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
416 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
417 sizeof(struct ldlm_resource), 0,
418 SLAB_HWCACHE_ALIGN, NULL, NULL);
419 if (ldlm_resource_slab == NULL)
422 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
423 sizeof(struct ldlm_lock), 0,
424 SLAB_HWCACHE_ALIGN, NULL, NULL);
/* Undo the first cache if the second one fails. */
425 if (ldlm_lock_slab == NULL) {
426 kmem_cache_destroy(ldlm_resource_slab);
/*
 * ldlm_exit - module exit point.
 *
 * Unregisters the DLM device type and destroys both slab caches created by
 * ldlm_init().
 */
433 static void __exit ldlm_exit(void)
435 obd_unregister_type(OBD_LDLM_DEVICENAME);
436 kmem_cache_destroy(ldlm_resource_slab);
437 kmem_cache_destroy(ldlm_lock_slab);
/* Module metadata, followed by registration of ldlm_init()/ldlm_exit() as
 * the module's load and unload hooks. */
440 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
441 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
442 MODULE_LICENSE("GPL");
444 module_init(ldlm_init);
445 module_exit(ldlm_exit);