1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
13 #define DEBUG_SUBSYSTEM S_LDLM
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
24 #define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
26 static int is_local_conn(struct ptlrpc_connection *conn)
32 RETURN(LOOPBACK(conn->c_peer.peer_nid));
35 /* _ldlm_callback and local_callback setup the variables then call this common
37 static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
38 ldlm_mode_t mode, void *data, __u32 data_len)
44 if (!lock->l_resource)
49 spin_lock(&lock->l_resource->lr_lock);
50 spin_lock(&lock->l_lock);
52 CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
53 lock->l_req_mode = mode;
54 list_del_init(&lock->l_res_link);
55 ldlm_grant_lock(lock->l_resource, lock);
56 wake_up(&lock->l_waitq);
57 spin_unlock(&lock->l_lock);
58 spin_unlock(&lock->l_resource->lr_lock);
60 CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
61 lock->l_flags |= LDLM_FL_DYING;
62 spin_unlock(&lock->l_lock);
63 spin_unlock(&lock->l_resource->lr_lock);
64 if (!lock->l_readers && !lock->l_writers) {
65 CDEBUG(D_INFO, "Lock already unused, calling "
66 "callback (%p).\n", lock->l_blocking_ast);
67 if (lock->l_blocking_ast != NULL)
68 lock->l_blocking_ast(lock, new, lock->l_data,
71 CDEBUG(D_INFO, "Lock still has references; lock will be"
72 " cancelled later.\n");
78 /* FIXME: I think that this is no longer necessary. */
79 static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new,
80 void *data, __u32 data_len)
82 struct ldlm_lock *lock;
83 /* the 'remote handle' is the lock in the FS's namespace */
84 lock = lustre_handle2object(&l->l_remote_handle);
86 return common_callback(lock, new, l->l_granted_mode, data, data_len);
89 static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
90 struct ptlrpc_request *req)
92 struct ldlm_reply *dlm_rep;
93 struct ldlm_request *dlm_req;
94 int rc, size = sizeof(*dlm_rep), cookielen;
97 struct ldlm_lock *lock = NULL;
98 ldlm_lock_callback callback;
99 struct lustre_handle lockh;
103 /* Is this lock managed locally? */
104 if (is_local_conn(req->rq_connection))
105 callback = local_callback;
107 callback = ldlm_cli_callback;
109 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
110 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
111 /* In this case, the reply buffer is allocated deep in
112 * local_lock_enqueue by the policy function. */
114 cookielen = sizeof(*req);
116 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
119 CERROR("out of memory\n");
122 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
123 cookie = &dlm_req->lock_desc.l_extent;
124 cookielen = sizeof(struct ldlm_extent);
128 err = ldlm_local_lock_create(obddev->obd_namespace,
129 &dlm_req->lock_handle2,
130 dlm_req->lock_desc.l_resource.lr_name,
131 dlm_req->lock_desc.l_resource.lr_type,
132 dlm_req->lock_desc.l_req_mode,
137 flags = dlm_req->lock_flags;
138 err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
143 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
144 dlm_rep->lock_flags = flags;
146 memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
147 lock = lustre_handle2object(&lockh);
148 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
149 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
150 sizeof(lock->l_extent));
151 if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
152 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
153 sizeof(dlm_rep->lock_resource_name));
155 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
156 sizeof(lock->l_remote_handle));
157 lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
160 req->rq_status = err;
161 CDEBUG(D_INFO, "err = %d\n", err);
163 if (ptlrpc_reply(svc, req))
167 ldlm_reprocess_all(lock->l_resource);
172 static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
174 struct ldlm_request *dlm_req;
175 struct ldlm_reply *dlm_rep;
176 struct ldlm_resource *res;
177 int rc, size = sizeof(*dlm_rep);
180 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
182 CERROR("out of memory\n");
185 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
186 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
187 dlm_rep->lock_flags = dlm_req->lock_flags;
189 res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
190 dlm_req->lock_desc.l_req_mode,
191 &dlm_rep->lock_flags);
193 if (ptlrpc_reply(svc, req) != 0)
196 ldlm_reprocess_all(res);
201 static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
203 struct ldlm_request *dlm_req;
204 struct ldlm_lock *lock;
205 struct ldlm_resource *res;
209 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
211 CERROR("out of memory\n");
214 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
216 lock = lustre_handle2object(&dlm_req->lock_handle1);
217 res = ldlm_local_lock_cancel(lock);
219 if (ptlrpc_reply(svc, req) != 0)
223 ldlm_reprocess_all(res);
228 static int _ldlm_callback(struct ptlrpc_service *svc,
229 struct ptlrpc_request *req)
231 struct ldlm_request *dlm_req;
232 struct ldlm_lock *lock1, *lock2;
236 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
238 CERROR("out of memory\n");
241 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
243 /* We must send the reply first, so that the thread is free to handle
244 * any requests made in common_callback() */
245 rc = ptlrpc_reply(svc, req);
249 lock1 = lustre_handle2object(&dlm_req->lock_handle1);
250 lock2 = lustre_handle2object(&dlm_req->lock_handle2);
252 common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
257 static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
258 struct ptlrpc_request *req)
260 struct obd_device *req_dev;
264 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
266 CERROR("lustre_ldlm: Invalid request\n");
270 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
271 CERROR("lustre_ldlm: wrong packet type sent %d\n",
272 req->rq_reqmsg->type);
273 GOTO(out, rc = -EINVAL);
276 id = req->rq_reqmsg->target_id;
277 if (id < 0 || id > MAX_OBD_DEVICES)
278 GOTO(out, rc = -ENODEV);
279 req_dev = req->rq_obd = &obd_dev[id];
281 switch (req->rq_reqmsg->opc) {
283 CDEBUG(D_INODE, "enqueue\n");
284 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
285 rc = _ldlm_enqueue(req_dev, svc, req);
289 CDEBUG(D_INODE, "convert\n");
290 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
291 rc = _ldlm_convert(svc, req);
295 CDEBUG(D_INODE, "cancel\n");
296 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
297 rc = _ldlm_cancel(svc, req);
301 CDEBUG(D_INODE, "callback\n");
302 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
303 rc = _ldlm_callback(svc, req);
307 rc = ptlrpc_error(svc, req);
314 RETURN(ptlrpc_error(svc, req));
318 static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg,
321 struct obd_device *obddev = conn->oc_dev;
322 struct ptlrpc_connection *connection;
326 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
327 _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
328 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
329 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
333 OBD_ALLOC(obddev->u.ldlm.ldlm_client,
334 sizeof(*obddev->u.ldlm.ldlm_client));
335 ptlrpc_init_client(NULL, NULL,
336 LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
337 obddev->u.ldlm.ldlm_client);
338 connection = ptlrpc_uuid_to_connection("ldlm");
340 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
343 case IOC_LDLM_TEST: {
344 err = ldlm_test(obddev, connection);
345 CERROR("-- done err %d\n", err);
349 GOTO(out, err = -EINVAL);
354 ptlrpc_put_connection(connection);
355 OBD_FREE(obddev->u.ldlm.ldlm_client,
356 sizeof(*obddev->u.ldlm.ldlm_client));
360 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
362 struct ldlm_obd *ldlm = &obddev->u.ldlm;
367 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
368 LDLM_REPLY_PORTAL, "self", lustre_handle);
369 if (!ldlm->ldlm_service)
372 err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
374 CERROR("cannot start thread\n");
377 err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
379 CERROR("cannot start thread\n");
382 err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
384 CERROR("cannot start thread\n");
387 err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
389 CERROR("cannot start thread\n");
397 static int ldlm_cleanup(struct obd_device *obddev)
399 struct ldlm_obd *ldlm = &obddev->u.ldlm;
402 ptlrpc_stop_all_threads(ldlm->ldlm_service);
403 rpc_unregister_service(ldlm->ldlm_service);
405 if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
406 // XXX reply with errors and clean up
407 CERROR("Request list not empty!\n");
410 OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
412 if (mds_reint_p != NULL)
413 inter_module_put("mds_reint");
414 if (mds_getattr_name_p != NULL)
415 inter_module_put("mds_getattr_name");
421 struct obd_ops ldlm_obd_ops = {
422 o_iocontrol: ldlm_iocontrol,
424 o_cleanup: ldlm_cleanup,
425 o_connect: gen_connect,
426 o_disconnect: gen_disconnect
430 static int __init ldlm_init(void)
432 int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
436 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
437 sizeof(struct ldlm_resource), 0,
438 SLAB_HWCACHE_ALIGN, NULL, NULL);
439 if (ldlm_resource_slab == NULL)
442 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
443 sizeof(struct ldlm_lock), 0,
444 SLAB_HWCACHE_ALIGN, NULL, NULL);
445 if (ldlm_lock_slab == NULL) {
446 kmem_cache_destroy(ldlm_resource_slab);
453 static void __exit ldlm_exit(void)
455 obd_unregister_type(OBD_LDLM_DEVICENAME);
456 if (kmem_cache_destroy(ldlm_resource_slab) != 0)
457 CERROR("couldn't free ldlm resource slab\n");
458 if (kmem_cache_destroy(ldlm_lock_slab) != 0)
459 CERROR("couldn't free ldlm lock slab\n");
462 EXPORT_SYMBOL(ldlm_local_lock_match);
463 EXPORT_SYMBOL(ldlm_lock_addref);
464 EXPORT_SYMBOL(ldlm_lock_decref);
465 EXPORT_SYMBOL(ldlm_cli_convert);
466 EXPORT_SYMBOL(ldlm_cli_enqueue);
467 EXPORT_SYMBOL(ldlm_cli_cancel);
468 EXPORT_SYMBOL(lustre_handle2object);
469 EXPORT_SYMBOL(ldlm_test);
470 EXPORT_SYMBOL(ldlm_lock_dump);
471 EXPORT_SYMBOL(ldlm_namespace_new);
472 EXPORT_SYMBOL(ldlm_namespace_free);
474 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
475 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
476 MODULE_LICENSE("GPL");
478 module_init(ldlm_init);
479 module_exit(ldlm_exit);