1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #define DEBUG_SUBSYSTEM S_LDLM
27 #include <linux/module.h>
28 #include <linux/slab.h>
29 #include <linux/lustre_dlm.h>
30 #include <linux/init.h>
/* Objects declared here are defined outside this file. */
/* Slab caches: created in ldlm_init() and destroyed in ldlm_exit() below. */
32 extern kmem_cache_t *ldlm_resource_slab;
33 extern kmem_cache_t *ldlm_lock_slab;
/* Checked by ldlm_cleanup(), which complains if this list is not empty. */
34 extern struct list_head ldlm_namespace_list;
/* Function-pointer hooks; not referenced by any line visible in this file. */
35 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
36 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
/*
 * ldlm_server_blocking_ast - send an LDLM_BL_CALLBACK request for a lock.
 *
 * lock:      lock whose l_connection the request is prepared on and whose
 *            l_remote_handle is copied into the request body.
 * desc:      lock descriptor copied verbatim into body->lock_desc.
 * data,
 * data_len:  not referenced by any line visible here.
 *
 * The request is queued with ptlrpc_queue_wait() and its result is then
 * passed through ptlrpc_check_status().
 *
 * NOTE(review): this excerpt is missing several original lines (braces, the
 * tail of the ptlrpc_prep_req() call, any NULL check on req, and the final
 * return).  Presumably rc is returned; confirm against the full file.
 */
38 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
39 struct ldlm_lock_desc *desc,
40 void *data, __u32 data_len)
42 struct ldlm_request *body;
43 struct ptlrpc_request *req;
44 struct ptlrpc_client *cl;
45 int rc = 0, size = sizeof(*body);
/* The RPC client used is the one embedded in the lock's namespace. */
48 cl = &lock->l_resource->lr_namespace->ns_rpc_client;
49 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_BL_CALLBACK, 1,
/* Fill request buffer 0 with the remote handle and the caller's descriptor. */
54 body = lustre_msg_buf(req->rq_reqmsg, 0);
55 memcpy(&body->lock_handle1, &lock->l_remote_handle,
56 sizeof(body->lock_handle1));
57 memcpy(&body->lock_desc, desc, sizeof(*desc));
59 LDLM_DEBUG(lock, "server preparing blocking AST");
/* Reply length is computed for zero buffers (presumably an empty reply). */
60 req->rq_replen = lustre_msg_size(0, NULL);
/* The second assignment overwrites rc with the checked status. */
62 rc = ptlrpc_queue_wait(req);
63 rc = ptlrpc_check_status(req, rc);
/*
 * ldlm_server_completion_ast - send an LDLM_CP_CALLBACK request for a lock.
 *
 * lock:   lock whose l_connection the request is prepared on; its
 *         l_remote_handle and its descriptor (via ldlm_lock2desc()) are
 *         placed in the request body.
 * flags:  stored in body->lock_flags.
 *
 * The request is queued with ptlrpc_queue_wait() and its result is then
 * passed through ptlrpc_check_status().
 *
 * NOTE(review): original lines 75-81 and others (braces, the tail of the
 * ptlrpc_prep_req() call, the return) are not visible in this excerpt, so
 * any early-exit logic that precedes the RPC cannot be described here.
 */
69 static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
71 struct ldlm_request *body;
72 struct ptlrpc_request *req;
73 struct ptlrpc_client *cl;
74 int rc = 0, size = sizeof(*body);
/* The RPC client used is the one embedded in the lock's namespace. */
82 cl = &lock->l_resource->lr_namespace->ns_rpc_client;
83 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CP_CALLBACK, 1,
/* Fill request buffer 0: remote handle, flags, and the lock's descriptor. */
88 body = lustre_msg_buf(req->rq_reqmsg, 0);
89 memcpy(&body->lock_handle1, &lock->l_remote_handle,
90 sizeof(body->lock_handle1));
91 body->lock_flags = flags;
92 ldlm_lock2desc(lock, &body->lock_desc);
94 LDLM_DEBUG(lock, "server preparing completion AST");
/* Reply length is computed for zero buffers (presumably an empty reply). */
95 req->rq_replen = lustre_msg_size(0, NULL);
/* The second assignment overwrites rc with the checked status. */
97 rc = ptlrpc_queue_wait(req);
98 rc = ptlrpc_check_status(req, rc);
/*
 * ldlm_handle_enqueue - server-side handler for a lock enqueue request.
 *
 * req:  incoming request; buffer 0 of req->rq_reqmsg is read as a
 *       struct ldlm_request.
 *
 * Visible flow:
 *   - For LDLM_MDSINTENT resources no reply is packed here (see the existing
 *     comment) and cookielen is set to sizeof(*req); otherwise a reply of
 *     sizeof(struct ldlm_reply) is packed, and for LDLM_EXTENT resources the
 *     cookie is the extent carried in the request.
 *   - A lock is created in obddev->obd_namespace, the client's handle
 *     (lock_handle1) is recorded as l_remote_handle, and the lock is enqueued
 *     with the two server AST functions defined above as callbacks.
 *   - The reply is filled in, the request's connection is stored on the lock,
 *     req->rq_status is set to err, the reply is sent, and the lock's
 *     resource is reprocessed.
 *
 * NOTE(review): the declarations of cookie, flags and err, all braces and
 * else branches, the "out" label, and the return statement are among the
 * original lines missing from this excerpt; the structure described above is
 * inferred from the visible statements only.
 * NOTE(review): req->rq_export is dereferenced in the initializer below with
 * no NULL check visible here; confirm callers guarantee it is set.
 */
103 int ldlm_handle_enqueue(struct ptlrpc_request *req)
105 struct obd_device *obddev = req->rq_export->exp_obd;
106 struct ldlm_reply *dlm_rep;
107 struct ldlm_request *dlm_req;
108 int rc, size = sizeof(*dlm_rep), cookielen = 0;
111 struct ldlm_lock *lock = NULL;
115 LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
117 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
118 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
119 /* In this case, the reply buffer is allocated deep in
120 * local_lock_enqueue by the policy function. */
122 cookielen = sizeof(*req);
124 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
127 CERROR("out of memory\n");
/* Extent locks pass the requested extent through as the enqueue cookie. */
130 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
131 cookie = &dlm_req->lock_desc.l_extent;
132 cookielen = sizeof(struct ldlm_extent);
136 lock = ldlm_lock_create(obddev->obd_namespace,
137 &dlm_req->lock_handle2,
138 dlm_req->lock_desc.l_resource.lr_name,
139 dlm_req->lock_desc.l_resource.lr_type,
140 dlm_req->lock_desc.l_req_mode, NULL, 0);
/* Presumably taken when ldlm_lock_create() fails; the test is not visible. */
142 GOTO(out, err = -ENOMEM);
/* Remember the client's handle; the server ASTs above send it back. */
144 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
145 sizeof(lock->l_remote_handle));
146 LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
/* flags is passed by address, so the enqueue may modify it. */
148 flags = dlm_req->lock_flags;
149 err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
150 ldlm_server_completion_ast,
151 ldlm_server_blocking_ast);
/* Populate the reply from the (possibly updated) flags and lock state. */
155 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
156 dlm_rep->lock_flags = flags;
158 ldlm_lock2handle(lock, &dlm_rep->lock_handle);
159 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
160 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
161 sizeof(lock->l_extent));
/* When LDLM_FL_LOCK_CHANGED is set, also report resource name and mode. */
162 if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
163 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
164 sizeof(dlm_rep->lock_resource_name));
165 dlm_rep->lock_mode = lock->l_req_mode;
/* l_connection is what the server AST functions later send callbacks on. */
168 lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
172 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
174 req->rq_status = err;
176 if (ptlrpc_reply(req->rq_svc, req))
181 ldlm_reprocess_all(lock->l_resource);
184 LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
/*
 * ldlm_handle_convert - server-side handler for a lock convert request.
 *
 * req:  incoming request; buffer 0 of req->rq_reqmsg is read as a
 *       struct ldlm_request and a struct ldlm_reply is packed as the reply.
 *
 * The reply flags start as a copy of the request flags.  The lock is looked
 * up from lock_handle1 and converted to the requested mode, with the reply
 * flags passed by address to ldlm_lock_convert().  After the reply is sent
 * the lock's resource is reprocessed.
 *
 * NOTE(review): the conditions guarding the error and success paths, the
 * braces, and the return statement are not visible in this excerpt.
 * NOTE(review): rq_status is set to positive EINVAL here, whereas the error
 * codes assigned elsewhere in this file are negative (-ENOMEM, -EINVAL);
 * confirm the sign the peer expects.
 */
189 int ldlm_handle_convert(struct ptlrpc_request *req)
191 struct ldlm_request *dlm_req;
192 struct ldlm_reply *dlm_rep;
193 struct ldlm_lock *lock;
194 int rc, size = sizeof(*dlm_rep);
197 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
199 CERROR("out of memory\n");
202 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
203 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
204 dlm_rep->lock_flags = dlm_req->lock_flags;
206 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
/* Presumably the branch for a failed handle lookup; the test is not visible. */
208 req->rq_status = EINVAL;
210 LDLM_DEBUG(lock, "server-side convert handler START");
211 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
212 &dlm_rep->lock_flags);
215 if (ptlrpc_reply(req->rq_svc, req) != 0)
219 ldlm_reprocess_all(lock->l_resource);
220 LDLM_DEBUG(lock, "server-side convert handler END");
223 LDLM_DEBUG_NOLOCK("server-side convert handler END");
/*
 * ldlm_handle_cancel - server-side handler for a lock cancel request.
 *
 * req:  incoming request; buffer 0 of req->rq_reqmsg is read as a
 *       struct ldlm_request.  A reply with zero buffers is packed.
 *
 * The lock is looked up from lock_handle1.  The visible lines log a "stale
 * lock" message and set rq_status to ESTALE on one path, and call
 * ldlm_lock_cancel() on the other.  After the reply is sent the lock's
 * resource is reprocessed.
 *
 * NOTE(review): the declaration of rc, the conditions selecting each path,
 * the braces, and the return statement are not visible in this excerpt.
 * NOTE(review): rq_status is set to positive ESTALE here, whereas the error
 * codes assigned elsewhere in this file are negative; confirm the sign the
 * peer expects.
 */
228 int ldlm_handle_cancel(struct ptlrpc_request *req)
230 struct ldlm_request *dlm_req;
231 struct ldlm_lock *lock;
235 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
237 CERROR("out of memory\n");
240 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
242 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
/* The handle's addr field is printed as a pointer for the log message. */
244 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock "
245 "%p)", (void *)(unsigned long)
246 dlm_req->lock_handle1.addr);
247 req->rq_status = ESTALE;
249 LDLM_DEBUG(lock, "server-side cancel handler START");
250 ldlm_lock_cancel(lock);
254 if (ptlrpc_reply(req->rq_svc, req) != 0)
258 ldlm_reprocess_all(lock->l_resource);
259 LDLM_DEBUG(lock, "server-side cancel handler END");
/*
 * ldlm_handle_bl_callback - handle an incoming blocking AST callback.
 *
 * req:  incoming request; buffer 0 of req->rq_reqmsg is read as a
 *       struct ldlm_request.
 *
 * An empty reply is packed and sent before the lock is looked up, so the
 * sender is answered before any local processing happens.  With the
 * namespace lock held, LDLM_FL_CBPENDING is set on the lock and do_ast
 * records whether the lock has neither readers nor writers.  The visible
 * lines then either invoke lock->l_blocking_ast (when non-NULL) or log that
 * the lock still has references.
 *
 * NOTE(review): the declarations of rc and do_ast, the tests on rc, on the
 * lookup result and on do_ast, the braces, and the return statement are not
 * visible in this excerpt.
 */
266 static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
268 struct ldlm_request *dlm_req;
269 struct ldlm_lock *lock;
273 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
276 rc = ptlrpc_reply(req->rq_svc, req);
280 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
282 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
284 CERROR("blocking callback on lock %Lx - lock disappeared\n",
285 dlm_req->lock_handle1.addr);
289 LDLM_DEBUG(lock, "client blocking AST callback handler START");
/* Flag update and reader/writer check happen under the namespace lock. */
291 l_lock(&lock->l_resource->lr_namespace->ns_lock);
292 lock->l_flags |= LDLM_FL_CBPENDING;
293 do_ast = (!lock->l_readers && !lock->l_writers);
294 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
297 LDLM_DEBUG(lock, "already unused, calling "
298 "callback (%p)", lock->l_blocking_ast);
/* The callback receives the descriptor sent in the request. */
299 if (lock->l_blocking_ast != NULL) {
300 lock->l_blocking_ast(lock, &dlm_req->lock_desc,
301 lock->l_data, lock->l_data_len);
304 LDLM_DEBUG(lock, "Lock still has references, will be"
307 LDLM_DEBUG(lock, "client blocking callback handler END");
/*
 * ldlm_handle_cp_callback - handle an incoming completion AST callback.
 *
 * req:  incoming request; buffer 0 of req->rq_reqmsg is read as a
 *       struct ldlm_request.
 *
 * An empty reply is packed and sent before the lock is looked up.  With the
 * namespace lock held, the lock's requested mode is replaced by the granted
 * mode from the request if they differ, the lock is unlinked from its
 * resource, moved to the resource named in the request if the names differ,
 * and then granted.  The resource's lr_tmp field points at a local list for
 * the duration of ldlm_grant_lock() and is reset to NULL afterwards; that
 * list is handed to ldlm_run_ast_work() once the namespace lock is dropped.
 *
 * NOTE(review): the declaration of rc, the tests on rc and on the lookup
 * result, the closing braces of the two if blocks, and the return statement
 * are not visible in this excerpt.
 */
312 static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
314 struct list_head ast_list = LIST_HEAD_INIT(ast_list);
315 struct ldlm_request *dlm_req;
316 struct ldlm_lock *lock;
320 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
323 rc = ptlrpc_reply(req->rq_svc, req);
327 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
329 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
331 CERROR("completion callback on lock %Lx - lock disappeared\n",
332 dlm_req->lock_handle1.addr);
336 LDLM_DEBUG(lock, "client completion callback handler START");
338 l_lock(&lock->l_resource->lr_namespace->ns_lock);
340 /* If we receive the completion AST before the actual enqueue returned,
341 * then we might need to switch resources or lock modes. */
342 if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
343 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
344 LDLM_DEBUG(lock, "completion AST, new lock mode");
346 ldlm_resource_unlink_lock(lock);
/* Compare the full resource name: RES_NAME_SIZE 64-bit words. */
347 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
348 lock->l_resource->lr_name,
349 sizeof(__u64) * RES_NAME_SIZE) != 0) {
350 ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
351 LDLM_DEBUG(lock, "completion AST, new resource");
/* Presumably ldlm_grant_lock() queues AST work onto lr_tmp; confirm. */
353 lock->l_resource->lr_tmp = &ast_list;
354 ldlm_grant_lock(lock);
355 lock->l_resource->lr_tmp = NULL;
356 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
357 LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
/* Runs after l_unlock(), i.e. without the namespace lock held. */
360 ldlm_run_ast_work(&ast_list);
362 LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
/*
 * ldlm_callback_handler - request dispatcher registered with the LDLM
 * service (passed to ptlrpc_init_svc() in ldlm_setup()).
 *
 * req:  incoming request.
 *
 * The request message is unpacked and its type must be PTL_RPC_MSG_REQUEST,
 * otherwise control jumps to "out" with rc = -EINVAL.  The opcode then
 * selects ldlm_handle_bl_callback() or ldlm_handle_cp_callback().
 *
 * NOTE(review): the declaration of rc, the test after lustre_unpack_msg(),
 * the break statements, the default label, the "out" label, and the success
 * return are not visible in this excerpt.  The two ptlrpc_error() calls
 * presumably belong to the default case and the "out" path respectively.
 */
367 static int ldlm_callback_handler(struct ptlrpc_request *req)
372 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
374 CERROR("lustre_ldlm: Invalid request\n");
378 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
379 CERROR("lustre_ldlm: wrong packet type sent %d\n",
380 req->rq_reqmsg->type);
381 GOTO(out, rc = -EINVAL);
383 switch (req->rq_reqmsg->opc) {
384 case LDLM_BL_CALLBACK:
385 CDEBUG(D_INODE, "blocking ast\n");
/* OBD_FAIL_RETURN: presumably a fault-injection early return; confirm. */
386 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
387 rc = ldlm_handle_bl_callback(req);
389 case LDLM_CP_CALLBACK:
390 CDEBUG(D_INODE, "completion ast\n");
391 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
392 rc = ldlm_handle_cp_callback(req);
396 rc = ptlrpc_error(req->rq_svc, req);
403 RETURN(ptlrpc_error(req->rq_svc, req));
/*
 * ldlm_iocontrol - ioctl entry point for the LDLM device (installed as
 * o_iocontrol in ldlm_obd_ops).
 *
 * cmd:   ioctl command; rejected unless its type is IOC_LDLM_TYPE and its
 *        number lies in [IOC_LDLM_MIN_NR, IOC_LDLM_MAX_NR].
 * conn:  handle resolved to the obd device via class_conn2obd() and passed
 *        on to ldlm_test().
 * len, karg, uarg:  not referenced by any line visible here.
 *
 * A temporary ptlrpc client is allocated and initialised and a connection
 * for the UUID "ldlm" is looked up.  The visible actions are ldlm_test() and
 * ldlm_dump_all_namespaces(); an unmatched command yields -EINVAL.  Before
 * returning, the connection is released and the client is freed.
 *
 * NOTE(review): the declaration of err, the switch statement and its case
 * labels, the test on the connection lookup, the "out" label, and the return
 * statement are not visible in this excerpt.
 * NOTE(review): no check of the OBD_ALLOC() result is visible before
 * ptlrpc_init_client() uses it; confirm against the full file.
 */
408 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
409 void *karg, void *uarg)
411 struct obd_device *obddev = class_conn2obd(conn);
412 struct ptlrpc_connection *connection;
416 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
417 _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
418 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
419 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
/* The client lives only for this call; it is freed at the end. */
423 OBD_ALLOC(obddev->u.ldlm.ldlm_client,
424 sizeof(*obddev->u.ldlm.ldlm_client));
425 ptlrpc_init_client(NULL, NULL,
426 LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
427 obddev->u.ldlm.ldlm_client);
428 connection = ptlrpc_uuid_to_connection("ldlm");
430 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
434 err = ldlm_test(obddev, conn);
435 CERROR("-- done err %d\n", err);
438 ldlm_dump_all_namespaces();
441 GOTO(out, err = -EINVAL);
/* Teardown: release the connection and free the temporary client. */
446 ptlrpc_put_connection(connection);
447 OBD_FREE(obddev->u.ldlm.ldlm_client,
448 sizeof(*obddev->u.ldlm.ldlm_client));
/* Number of service threads started by ldlm_setup(). */
452 #define LDLM_NUM_THREADS 8
/*
 * ldlm_setup - bring up the LDLM device.
 *
 * obddev:    device whose u.ldlm member holds the service pointer.
 * len, buf:  not referenced by any line visible here.
 *
 * Steps: ldlm_proc_setup(), create the ptlrpc service with
 * ldlm_callback_handler as its handler, then start LDLM_NUM_THREADS threads
 * named "lustre_dlm_NN".  A NULL service jumps to out_proc with -ENOMEM; a
 * thread start failure is logged and jumps to out_thread.  The trailing
 * statements are the unwind sequence: stop all threads, unregister the
 * service, clean up proc.
 *
 * NOTE(review): the declarations of rc, i and name, the tests on rc, the
 * success return, and the out_thread/out_proc labels are not visible in this
 * excerpt.
 */
454 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
456 struct ldlm_obd *ldlm = &obddev->u.ldlm;
461 rc = ldlm_proc_setup(obddev);
465 ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
466 LDLM_REPLY_PORTAL, "self",
467 ldlm_callback_handler);
468 if (!ldlm->ldlm_service)
469 GOTO(out_proc, rc = -ENOMEM);
471 for (i = 0; i < LDLM_NUM_THREADS; i++) {
/* With i < 8 this formats 13 characters plus the terminating NUL. */
473 sprintf(name, "lustre_dlm_%02d", i);
474 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
476 CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
478 GOTO(out_thread, rc);
/* Error unwind, in reverse order of setup. */
485 ptlrpc_stop_all_threads(ldlm->ldlm_service);
486 ptlrpc_unregister_service(ldlm->ldlm_service);
488 ldlm_proc_cleanup(obddev);
/*
 * ldlm_cleanup - tear down the LDLM device (installed as o_cleanup in
 * ldlm_obd_ops).
 *
 * obddev:  device whose u.ldlm.ldlm_service is shut down.
 *
 * If ldlm_namespace_list is not empty an error is logged.  Otherwise the
 * service threads are stopped, the service is unregistered and the proc
 * entries are cleaned up, mirroring the unwind path of ldlm_setup().
 *
 * NOTE(review): the return statements are not visible in this excerpt;
 * presumably the non-empty-list branch returns an error without tearing
 * anything down.  Confirm against the full file.
 */
494 static int ldlm_cleanup(struct obd_device *obddev)
496 struct ldlm_obd *ldlm = &obddev->u.ldlm;
499 if (!list_empty(&ldlm_namespace_list)) {
500 CERROR("ldlm still has namespaces; clean these up first.\n");
504 ptlrpc_stop_all_threads(ldlm->ldlm_service);
505 ptlrpc_unregister_service(ldlm->ldlm_service);
506 ldlm_proc_cleanup(obddev);
/*
 * Operations table for the LDLM device type; registered under
 * OBD_LDLM_DEVICENAME by ldlm_init().  Uses the GNU "field: value"
 * initializer syntax.
 *
 * NOTE(review): original line 514 is not visible in this excerpt; it
 * presumably installs ldlm_setup as o_setup.  Confirm against the full file.
 */
512 struct obd_ops ldlm_obd_ops = {
513 o_iocontrol: ldlm_iocontrol,
515 o_cleanup: ldlm_cleanup,
/* Connect and disconnect use the generic class helpers. */
516 o_connect: class_connect,
517 o_disconnect: class_disconnect
/*
 * ldlm_init - module initialisation.
 *
 * Registers the LDLM device type with ldlm_obd_ops, then creates the
 * "ldlm_resources" and "ldlm_locks" slab caches, sized for
 * struct ldlm_resource and struct ldlm_lock respectively.  If the lock cache
 * cannot be created, the resource cache is destroyed again.
 *
 * NOTE(review): the test on rc and all return statements are not visible in
 * this excerpt.
 * NOTE(review): no class_unregister_type() call is visible on the slab
 * failure paths, so the type registered on the first line may stay
 * registered when initialisation fails; confirm against the full file.
 */
520 static int __init ldlm_init(void)
522 int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
526 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
527 sizeof(struct ldlm_resource), 0,
528 SLAB_HWCACHE_ALIGN, NULL, NULL);
529 if (ldlm_resource_slab == NULL)
532 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
533 sizeof(struct ldlm_lock), 0,
534 SLAB_HWCACHE_ALIGN, NULL, NULL);
/* Undo the first cache if the second one cannot be created. */
535 if (ldlm_lock_slab == NULL) {
536 kmem_cache_destroy(ldlm_resource_slab);
/*
 * ldlm_exit - module teardown.
 *
 * Unregisters the LDLM device type and destroys both slab caches created by
 * ldlm_init().  A non-zero return from kmem_cache_destroy() is logged and
 * otherwise ignored.
 */
543 static void __exit ldlm_exit(void)
545 class_unregister_type(OBD_LDLM_DEVICENAME);
546 if (kmem_cache_destroy(ldlm_resource_slab) != 0)
547 CERROR("couldn't free ldlm resource slab\n");
548 if (kmem_cache_destroy(ldlm_lock_slab) != 0)
549 CERROR("couldn't free ldlm lock slab\n");
/*
 * Symbols exported to other kernel modules.  Only ldlm_handle_enqueue,
 * ldlm_handle_cancel and ldlm_handle_convert are defined in the visible part
 * of this file; the remaining symbols are defined elsewhere.
 */
552 EXPORT_SYMBOL(ldlm_completion_ast);
/* Server-side request handlers defined in this file. */
553 EXPORT_SYMBOL(ldlm_handle_enqueue);
554 EXPORT_SYMBOL(ldlm_handle_cancel);
555 EXPORT_SYMBOL(ldlm_handle_convert);
556 EXPORT_SYMBOL(ldlm_register_intent);
557 EXPORT_SYMBOL(ldlm_unregister_intent);
558 EXPORT_SYMBOL(ldlm_lockname);
559 EXPORT_SYMBOL(ldlm_typename);
560 EXPORT_SYMBOL(ldlm_handle2lock);
561 EXPORT_SYMBOL(ldlm_lock2handle);
562 EXPORT_SYMBOL(ldlm_lock_put);
563 EXPORT_SYMBOL(ldlm_lock_match);
564 EXPORT_SYMBOL(ldlm_lock_addref);
565 EXPORT_SYMBOL(ldlm_lock_decref);
566 EXPORT_SYMBOL(ldlm_lock_change_resource);
567 EXPORT_SYMBOL(ldlm_cli_convert);
568 EXPORT_SYMBOL(ldlm_cli_enqueue);
569 EXPORT_SYMBOL(ldlm_cli_cancel);
570 EXPORT_SYMBOL(ldlm_match_or_enqueue);
571 EXPORT_SYMBOL(ldlm_it2str);
572 EXPORT_SYMBOL(ldlm_test);
573 EXPORT_SYMBOL(ldlm_regression_start);
574 EXPORT_SYMBOL(ldlm_regression_stop);
575 EXPORT_SYMBOL(ldlm_lock_dump);
576 EXPORT_SYMBOL(ldlm_namespace_new);
577 EXPORT_SYMBOL(ldlm_namespace_free);
578 EXPORT_SYMBOL(l_lock);
579 EXPORT_SYMBOL(l_unlock);
/* Module metadata. */
581 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
582 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
583 MODULE_LICENSE("GPL");
/* Hook ldlm_init() and ldlm_exit() up as the module entry and exit points. */
585 module_init(ldlm_init);
586 module_exit(ldlm_exit);