1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #define DEBUG_SUBSYSTEM S_LDLM
27 #include <linux/module.h>
28 #include <linux/slab.h>
29 #include <linux/lustre_dlm.h>
/*
 * Symbols declared extern here, so presumably defined in another
 * translation unit: the two slab caches that ldlm_init() creates and
 * ldlm_exit() destroys, the namespace list that ldlm_cleanup() tests for
 * emptiness, and two MDS function pointers.
 * NOTE(review): mds_reint_p and mds_getattr_name_p are not referenced in
 * the visible part of this file -- confirm they are still needed here.
 */
31 extern kmem_cache_t *ldlm_resource_slab;
32 extern kmem_cache_t *ldlm_lock_slab;
33 extern struct list_head ldlm_namespace_list;
34 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
35 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
/*
 * Server-side blocking AST: prepares an LDLM_BL_CALLBACK request on the
 * lock's connection, fills its body from the lock and the supplied
 * descriptor, sends it with ptlrpc_queue_wait() and passes the outcome
 * through ptlrpc_check_status().
 *
 * lock     - lock the callback refers to; supplies the connection, the
 *            namespace RPC client and the remote handle.
 * desc     - lock descriptor copied verbatim into the request body.
 * data,
 * data_len - not referenced in the visible lines of this function.
 *
 * NOTE(review): the tail of the ptlrpc_prep_req() call, any NULL check on
 * req, request cleanup and the return statement are not visible in this
 * listing; presumably rc is returned -- confirm against the full file.
 */
37 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
38 struct ldlm_lock_desc *desc,
39 void *data, __u32 data_len)
41 struct ldlm_request *body;
42 struct ptlrpc_request *req;
43 struct ptlrpc_client *cl;
44 int rc = 0, size = sizeof(*body);
/* The RPC client is the one embedded in the namespace that owns the
 * lock's resource. */
47 cl = &lock->l_resource->lr_namespace->ns_rpc_client;
48 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_BL_CALLBACK, 1,
/* Request buffer 0 is the ldlm_request: lock_handle1 receives the lock's
 * remote handle, lock_desc a copy of the caller's descriptor. */
53 body = lustre_msg_buf(req->rq_reqmsg, 0);
54 memcpy(&body->lock_handle1, &lock->l_remote_handle,
55 sizeof(body->lock_handle1));
56 memcpy(&body->lock_desc, desc, sizeof(*desc));
58 LDLM_DEBUG(lock, "server preparing blocking AST");
/* Reply length is sized with no buffers (count 0, NULL length array). */
59 req->rq_replen = lustre_msg_size(0, NULL);
/* The rc from the send/wait step is handed to ptlrpc_check_status(),
 * whose result replaces it. */
61 rc = ptlrpc_queue_wait(req);
62 rc = ptlrpc_check_status(req, rc);
/*
 * Server-side completion AST: prepares an LDLM_CP_CALLBACK request on the
 * lock's connection, fills its body from the lock, sends it with
 * ptlrpc_queue_wait() and passes the outcome through
 * ptlrpc_check_status().
 *
 * lock  - lock the callback refers to; supplies the connection, the
 *         namespace RPC client, the remote handle and the descriptor.
 * flags - stored unchanged in the request body's lock_flags.
 *
 * NOTE(review): the tail of the ptlrpc_prep_req() call, any NULL checks,
 * request cleanup and the return statement are not visible in this
 * listing; presumably rc is returned -- confirm against the full file.
 */
68 static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
70 struct ldlm_request *body;
71 struct ptlrpc_request *req;
72 struct ptlrpc_client *cl;
73 int rc = 0, size = sizeof(*body);
/* The RPC client is the one embedded in the namespace that owns the
 * lock's resource. */
81 cl = &lock->l_resource->lr_namespace->ns_rpc_client;
82 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CP_CALLBACK, 1,
/* Request buffer 0 is the ldlm_request: remote handle, the caller's flags,
 * and a descriptor produced from the lock by ldlm_lock2desc(). */
87 body = lustre_msg_buf(req->rq_reqmsg, 0);
88 memcpy(&body->lock_handle1, &lock->l_remote_handle,
89 sizeof(body->lock_handle1));
90 body->lock_flags = flags;
91 ldlm_lock2desc(lock, &body->lock_desc);
93 LDLM_DEBUG(lock, "server preparing completion AST");
/* Reply length is sized with no buffers (count 0, NULL length array). */
94 req->rq_replen = lustre_msg_size(0, NULL);
/* The rc from the send/wait step is handed to ptlrpc_check_status(),
 * whose result replaces it. */
96 rc = ptlrpc_queue_wait(req);
97 rc = ptlrpc_check_status(req, rc);
/*
 * Server-side handler for a lock enqueue request.
 *
 * Visible steps: read the ldlm_request from request buffer 0, create a
 * lock in the export's obd namespace, remember the requester's handle in
 * l_remote_handle, enqueue the lock with the server completion/blocking
 * AST callbacks, fill the ldlm_reply (flags, handle, and extent / resource
 * name / mode where applicable), take a connection reference for the
 * lock, store err in rq_status, send the reply and reprocess the lock's
 * resource.
 *
 * req - incoming request; its reply message is either packed here or,
 *       for LDLM_MDSINTENT resources, allocated elsewhere (see the
 *       comment in the body).
 *
 * NOTE(review): the declarations of cookie, flags and err, the
 * if/else structure, the "out" label and the return statement are not
 * visible in this listing; the branch notes below are inferred from the
 * surrounding lines -- confirm against the full file.
 */
102 int ldlm_handle_enqueue(struct ptlrpc_request *req)
104 struct obd_device *obddev = req->rq_export->exp_obd;
105 struct ldlm_reply *dlm_rep;
106 struct ldlm_request *dlm_req;
107 int rc, size = sizeof(*dlm_rep), cookielen = 0;
110 struct ldlm_lock *lock = NULL;
114 LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
116 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
117 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
118 /* In this case, the reply buffer is allocated deep in
119 * local_lock_enqueue by the policy function. */
/* Intent case: the cookie length is the size of the request structure;
 * presumably the cookie itself is req (assignment not visible). */
121 cookielen = sizeof(*req);
/* Presumably the non-intent branch: pack a reply with a single buffer of
 * sizeof(*dlm_rep) bytes. */
123 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
126 CERROR("out of memory\n");
/* Extent locks pass the requested extent to the enqueue as the cookie. */
129 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
130 cookie = &dlm_req->lock_desc.l_extent;
131 cookielen = sizeof(struct ldlm_extent);
/* Create the lock from the request's lock_handle2, resource name,
 * resource type and requested mode. */
135 lock = ldlm_lock_create(obddev->obd_namespace,
136 &dlm_req->lock_handle2,
137 dlm_req->lock_desc.l_resource.lr_name,
138 dlm_req->lock_desc.l_resource.lr_type,
139 dlm_req->lock_desc.l_req_mode, NULL, 0);
/* Presumably taken when ldlm_lock_create() returned NULL. */
141 GOTO(out, err = -ENOMEM);
/* lock_handle1 from the request is kept as the lock's remote handle; the
 * AST senders in this file copy it back into their requests. */
143 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
144 sizeof(lock->l_remote_handle));
145 LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
/* flags is passed by address, so ldlm_lock_enqueue() may modify it; the
 * resulting value is what goes into the reply below. */
147 flags = dlm_req->lock_flags;
148 err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
149 ldlm_server_completion_ast,
150 ldlm_server_blocking_ast);
/* Fill reply buffer 0. */
154 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
155 dlm_rep->lock_flags = flags;
157 ldlm_lock2handle(lock, &dlm_rep->lock_handle);
/* For extent locks the reply carries the lock's own extent rather than
 * the one in the request. */
158 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
159 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
160 sizeof(lock->l_extent));
/* When LDLM_FL_LOCK_CHANGED is set, also report the lock's current
 * resource name and requested mode. */
161 if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
162 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
163 sizeof(dlm_rep->lock_resource_name));
164 dlm_rep->lock_mode = lock->l_req_mode;
/* The lock holds its own reference on the request's connection; the AST
 * senders in this file use lock->l_connection. */
167 lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
/* The enqueue result is reported to the peer through rq_status. */
171 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
173 req->rq_status = err;
175 if (ptlrpc_reply(req->rq_svc, req))
/* Reprocess the lock's resource after the reply has been handed off. */
180 ldlm_reprocess_all(lock->l_resource);
183 LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
/*
 * Server-side handler for a lock convert request.
 *
 * Packs a one-buffer reply, seeds the reply flags from the request, looks
 * the lock up by lock_handle1, converts it to the requested mode with
 * ldlm_lock_convert(), sends the reply and reprocesses the lock's
 * resource.
 *
 * req - incoming request; the reply message is allocated here.
 *
 * NOTE(review): the if/else structure and return statements are not
 * visible in this listing. rq_status is assigned a positive EINVAL here,
 * whereas other error paths in this file use negative values (-ENOMEM,
 * -EINVAL) -- confirm the intended sign.
 */
188 int ldlm_handle_convert(struct ptlrpc_request *req)
190 struct ldlm_request *dlm_req;
191 struct ldlm_reply *dlm_rep;
192 struct ldlm_lock *lock;
193 int rc, size = sizeof(*dlm_rep);
196 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
198 CERROR("out of memory\n");
201 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
202 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
/* Reply flags start as the request's flags; ldlm_lock_convert() below is
 * given their address and may update them. */
203 dlm_rep->lock_flags = dlm_req->lock_flags;
205 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
/* Presumably the lookup-failed branch. */
207 req->rq_status = EINVAL;
209 LDLM_DEBUG(lock, "server-side convert handler START");
210 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
211 &dlm_rep->lock_flags);
214 if (ptlrpc_reply(req->rq_svc, req) != 0)
/* Reprocess the lock's resource following the conversion. */
218 ldlm_reprocess_all(lock->l_resource);
219 LDLM_DEBUG(lock, "server-side convert handler END");
222 LDLM_DEBUG_NOLOCK("server-side convert handler END");
/*
 * Server-side handler for a lock cancel request.
 *
 * Packs a reply with no buffers, looks the lock up by lock_handle1,
 * cancels it with ldlm_lock_cancel(), sends the reply and reprocesses the
 * lock's resource.
 *
 * req - incoming request; the reply message is allocated here.
 *
 * NOTE(review): the declaration of rc, the if/else structure and the
 * return statements are not visible in this listing. rq_status is
 * assigned a positive ESTALE here, whereas other error paths in this file
 * use negative values -- confirm the intended sign.
 */
227 int ldlm_handle_cancel(struct ptlrpc_request *req)
229 struct ldlm_request *dlm_req;
230 struct ldlm_lock *lock;
234 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
236 CERROR("out of memory\n");
239 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
241 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
/* Presumably the lookup-failed branch: log the handle's addr field as a
 * pointer and report a stale lock to the peer. */
243 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock "
244 "%p)", (void *)(unsigned long)
245 dlm_req->lock_handle1.addr);
246 req->rq_status = ESTALE;
248 LDLM_DEBUG(lock, "server-side cancel handler START");
249 ldlm_lock_cancel(lock);
253 if (ptlrpc_reply(req->rq_svc, req) != 0)
/* Reprocess the lock's resource following the cancel. */
257 ldlm_reprocess_all(lock->l_resource);
258 LDLM_DEBUG(lock, "server-side cancel handler END");
/*
 * Handler for an incoming blocking AST (LDLM_BL_CALLBACK); the debug
 * strings identify this as the client-side handler.
 *
 * Packs and sends a reply with no buffers, then looks the lock up by
 * lock_handle1, marks it LDLM_FL_CBPENDING under the namespace lock and,
 * if it has no readers and no writers, invokes the lock's l_blocking_ast
 * callback (when one is set).
 *
 * req - incoming callback request.
 *
 * NOTE(review): the declarations of rc and do_ast, the if/else structure
 * and the return statements are not visible in this listing; the reply
 * appears to be sent before the lock is processed -- confirm against the
 * full file.
 */
265 static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
267 struct ldlm_request *dlm_req;
268 struct ldlm_lock *lock;
272 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
275 rc = ptlrpc_reply(req->rq_svc, req);
279 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
281 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
/* Presumably the lookup-failed branch. */
283 CERROR("blocking callback on lock %Lx - lock disappeared\n",
284 dlm_req->lock_handle1.addr);
288 LDLM_DEBUG(lock, "client blocking AST callback handler START");
/* Under the namespace lock: flag the callback as pending and sample
 * whether the lock is currently unreferenced. */
290 l_lock(&lock->l_resource->lr_namespace->ns_lock);
291 lock->l_flags |= LDLM_FL_CBPENDING;
292 do_ast = (!lock->l_readers && !lock->l_writers);
293 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
/* Presumably guarded by do_ast: the callback runs after ns_lock has been
 * released and receives the descriptor from the request plus the lock's
 * own l_data / l_data_len. */
296 LDLM_DEBUG(lock, "already unused, calling "
297 "callback (%p)", lock->l_blocking_ast);
298 if (lock->l_blocking_ast != NULL) {
299 lock->l_blocking_ast(lock, &dlm_req->lock_desc,
300 lock->l_data, lock->l_data_len);
303 LDLM_DEBUG(lock, "Lock still has references, will be"
306 LDLM_DEBUG(lock, "client blocking callback handler END");
/*
 * Handler for an incoming completion AST (LDLM_CP_CALLBACK); the debug
 * strings identify this as the client-side handler.
 *
 * Packs and sends a reply with no buffers, looks the lock up by
 * lock_handle1 and then, under the namespace lock, adopts the granted
 * mode and resource name from the request where they differ, and grants
 * the lock. AST work collected on the local ast_list during the grant is
 * run by ldlm_run_ast_work() after the namespace lock is released.
 *
 * req - incoming callback request.
 *
 * NOTE(review): the declaration of rc, closing braces, the tail of the
 * final LDLM_DEBUG_NOLOCK call and the return statement are not visible
 * in this listing -- confirm against the full file.
 */
311 static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
313 struct list_head ast_list = LIST_HEAD_INIT(ast_list);
314 struct ldlm_request *dlm_req;
315 struct ldlm_lock *lock;
319 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
322 rc = ptlrpc_reply(req->rq_svc, req);
326 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
328 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
/* Presumably the lookup-failed branch. */
330 CERROR("completion callback on lock %Lx - lock disappeared\n",
331 dlm_req->lock_handle1.addr);
335 LDLM_DEBUG(lock, "client completion callback handler START");
337 l_lock(&lock->l_resource->lr_namespace->ns_lock);
339 /* If we receive the completion AST before the actual enqueue returned,
340 * then we might need to switch resources or lock modes. */
341 if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
342 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
343 LDLM_DEBUG(lock, "completion AST, new lock mode");
/* The lock is unlinked from its resource before the resource name is
 * compared and, on mismatch, the lock is moved to the named resource. */
345 ldlm_resource_unlink_lock(lock);
346 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
347 lock->l_resource->lr_name,
348 sizeof(__u64) * RES_NAME_SIZE) != 0) {
349 ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
350 LDLM_DEBUG(lock, "completion AST, new resource");
/* lr_tmp points at the local list only for the duration of
 * ldlm_grant_lock(); presumably the grant queues AST work there. */
352 lock->l_resource->lr_tmp = &ast_list;
353 ldlm_grant_lock(lock);
354 lock->l_resource->lr_tmp = NULL;
355 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
356 LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
/* Queued AST work runs outside the namespace lock. */
359 ldlm_run_ast_work(&ast_list);
361 LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
/*
 * Request handler registered with the LDLM service by ldlm_setup().
 *
 * Unpacks the request message, rejects anything that is not of type
 * PTL_RPC_MSG_REQUEST, and dispatches on the opcode: LDLM_BL_CALLBACK to
 * ldlm_handle_bl_callback(), LDLM_CP_CALLBACK to
 * ldlm_handle_cp_callback(). Other paths answer with ptlrpc_error().
 *
 * req - incoming request.
 *
 * NOTE(review): the declaration of rc, the default case label, the "out"
 * label and break/return statements are not visible in this listing --
 * confirm against the full file.
 */
366 static int ldlm_callback_handler(struct ptlrpc_request *req)
371 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
373 CERROR("lustre_ldlm: Invalid request\n");
377 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
378 CERROR("lustre_ldlm: wrong packet type sent %d\n",
379 req->rq_reqmsg->type);
380 GOTO(out, rc = -EINVAL);
382 switch (req->rq_reqmsg->opc) {
383 case LDLM_BL_CALLBACK:
384 CDEBUG(D_INODE, "blocking ast\n");
/* OBD_FAIL_RETURN is presumably a fault-injection hook that returns 0
 * when the named fail point is armed -- confirm its definition. */
385 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
386 rc = ldlm_handle_bl_callback(req);
388 case LDLM_CP_CALLBACK:
389 CDEBUG(D_INODE, "completion ast\n");
390 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
391 rc = ldlm_handle_cp_callback(req);
/* Presumably the default case for unrecognised opcodes. */
395 rc = ptlrpc_error(req->rq_svc, req);
/* Presumably reached via the "out" label used by the GOTO above. */
402 RETURN(ptlrpc_error(req->rq_svc, req));
/*
 * ioctl entry point for the LDLM obd device (installed as o_iocontrol in
 * ldlm_obd_ops).
 *
 * Validates the command's ioctl type and number range, allocates and
 * initialises a temporary RPC client on the LDLM request/reply portals,
 * resolves a connection for the UUID "ldlm", then runs the requested
 * operation: the visible ones are ldlm_test() and
 * ldlm_dump_all_namespaces(). On the way out the connection is put and
 * the client freed.
 *
 * cmd  - ioctl command word.
 * conn - handle used to find the obd device and passed to ldlm_test().
 * len, karg, uarg - not referenced in the visible lines.
 *
 * NOTE(review): the declaration of err, the switch/case labels, the
 * "out" label and the return statements are not visible in this listing;
 * no check of the OBD_ALLOC result is visible either -- confirm against
 * the full file.
 */
407 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
408 void *karg, void *uarg)
410 struct obd_device *obddev = class_conn2obd(conn);
411 struct ptlrpc_connection *connection;
/* Reject commands of the wrong ioctl type or outside
 * [IOC_LDLM_MIN_NR, IOC_LDLM_MAX_NR]. */
415 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
416 _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
417 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
418 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
/* The client allocated here is freed again at the end of this function. */
422 OBD_ALLOC(obddev->u.ldlm.ldlm_client,
423 sizeof(*obddev->u.ldlm.ldlm_client));
424 ptlrpc_init_client(NULL, NULL,
425 LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
426 obddev->u.ldlm.ldlm_client);
427 connection = ptlrpc_uuid_to_connection("ldlm");
/* Presumably logged when no connection was found for the UUID. */
429 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
433 err = ldlm_test(obddev, conn);
434 CERROR("-- done err %d\n", err);
437 ldlm_dump_all_namespaces();
/* Presumably the default case for unhandled commands. */
440 GOTO(out, err = -EINVAL);
/* Release what was acquired above. */
445 ptlrpc_put_connection(connection);
446 OBD_FREE(obddev->u.ldlm.ldlm_client,
447 sizeof(*obddev->u.ldlm.ldlm_client));
/* Number of service threads ldlm_setup() starts for the LDLM service. */
451 #define LDLM_NUM_THREADS 8
/*
 * Device setup: registers the proc entries via ldlm_proc_setup(), creates
 * the LDLM ptlrpc service on the LDLM request/reply portals with
 * ldlm_callback_handler as its handler, and starts LDLM_NUM_THREADS
 * service threads named "lustre_dlm_NN". The trailing calls undo these
 * steps in reverse order (stop threads, unregister service, proc
 * cleanup).
 *
 * obddev   - device being set up; the service is stored in its u.ldlm.
 * len, buf - not referenced in the visible lines.
 *
 * NOTE(review): the declarations of rc, i and name, the out_thread /
 * out_proc labels and the return statements are not visible in this
 * listing. sprintf() writes 14 bytes including the terminator for
 * i < 100 -- confirm that name is at least that large.
 */
453 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
455 struct ldlm_obd *ldlm = &obddev->u.ldlm;
460 rc = ldlm_proc_setup(obddev);
464 ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
465 LDLM_REPLY_PORTAL, "self",
466 ldlm_callback_handler);
467 if (!ldlm->ldlm_service)
468 GOTO(out_proc, rc = -ENOMEM);
470 for (i = 0; i < LDLM_NUM_THREADS; i++) {
472 sprintf(name, "lustre_dlm_%02d", i);
473 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
475 CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
477 GOTO(out_thread, rc);
/* Presumably the out_thread error path: tear down the service. */
484 ptlrpc_stop_all_threads(ldlm->ldlm_service);
485 ptlrpc_unregister_service(ldlm->ldlm_service);
/* Presumably the out_proc error path. */
487 ldlm_proc_cleanup(obddev);
/*
 * Device cleanup (installed as o_cleanup in ldlm_obd_ops): complains if
 * ldlm_namespace_list is not empty, otherwise stops the service threads,
 * unregisters the LDLM service and removes the proc entries.
 *
 * obddev - device being cleaned up.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably the non-empty-list branch returns an error without tearing
 * anything down -- confirm against the full file.
 */
493 static int ldlm_cleanup(struct obd_device *obddev)
495 struct ldlm_obd *ldlm = &obddev->u.ldlm;
498 if (!list_empty(&ldlm_namespace_list)) {
499 CERROR("ldlm still has namespaces; clean these up first.\n");
503 ptlrpc_stop_all_threads(ldlm->ldlm_service);
504 ptlrpc_unregister_service(ldlm->ldlm_service);
505 ldlm_proc_cleanup(obddev);
/*
 * obd method table for the LDLM device type, registered by ldlm_init().
 * Uses the GNU "field: value" initializer form. Connect and disconnect
 * use the generic class_connect / class_disconnect.
 * NOTE(review): no o_setup entry is visible in this listing although
 * ldlm_setup() is defined in this file -- confirm it is present in the
 * full file.
 */
511 struct obd_ops ldlm_obd_ops = {
512 o_iocontrol: ldlm_iocontrol,
514 o_cleanup: ldlm_cleanup,
515 o_connect: class_connect,
516 o_disconnect: class_disconnect
/*
 * Module init: registers the LDLM obd type with ldlm_obd_ops and creates
 * the two slab caches, "ldlm_resources" (struct ldlm_resource) and
 * "ldlm_locks" (struct ldlm_lock), both with SLAB_HWCACHE_ALIGN and no
 * constructor or destructor.
 *
 * NOTE(review): the checks on rc, the bodies of the failure branches
 * (other than the kmem_cache_destroy below) and the return statements are
 * not visible in this listing -- confirm that a failed slab creation also
 * unregisters the obd type.
 */
519 static int __init ldlm_init(void)
521 int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
525 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
526 sizeof(struct ldlm_resource), 0,
527 SLAB_HWCACHE_ALIGN, NULL, NULL);
528 if (ldlm_resource_slab == NULL)
531 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
532 sizeof(struct ldlm_lock), 0,
533 SLAB_HWCACHE_ALIGN, NULL, NULL);
/* If the second cache cannot be created, the first one is destroyed
 * again. */
534 if (ldlm_lock_slab == NULL) {
535 kmem_cache_destroy(ldlm_resource_slab);
/*
 * Module exit: unregisters the LDLM obd type and destroys both slab
 * caches, logging an error for each cache whose kmem_cache_destroy()
 * call returns non-zero.
 */
542 static void __exit ldlm_exit(void)
544 class_unregister_type(OBD_LDLM_DEVICENAME);
545 if (kmem_cache_destroy(ldlm_resource_slab) != 0)
546 CERROR("couldn't free ldlm resource slab\n");
547 if (kmem_cache_destroy(ldlm_lock_slab) != 0)
548 CERROR("couldn't free ldlm lock slab\n");
/*
 * Symbols exported to other kernel modules. Only ldlm_handle_enqueue,
 * ldlm_handle_cancel and ldlm_handle_convert are defined in the visible
 * part of this file; the rest are presumably defined in other LDLM
 * source files linked into the same module.
 */
551 EXPORT_SYMBOL(ldlm_completion_ast);
552 EXPORT_SYMBOL(ldlm_handle_enqueue);
553 EXPORT_SYMBOL(ldlm_handle_cancel);
554 EXPORT_SYMBOL(ldlm_handle_convert);
555 EXPORT_SYMBOL(ldlm_register_intent);
556 EXPORT_SYMBOL(ldlm_unregister_intent);
557 EXPORT_SYMBOL(ldlm_lockname);
558 EXPORT_SYMBOL(ldlm_typename);
559 EXPORT_SYMBOL(ldlm_handle2lock);
560 EXPORT_SYMBOL(ldlm_lock2handle);
561 EXPORT_SYMBOL(ldlm_lock_match);
562 EXPORT_SYMBOL(ldlm_lock_addref);
563 EXPORT_SYMBOL(ldlm_lock_decref);
564 EXPORT_SYMBOL(ldlm_lock_change_resource);
565 EXPORT_SYMBOL(ldlm_cli_convert);
566 EXPORT_SYMBOL(ldlm_cli_enqueue);
567 EXPORT_SYMBOL(ldlm_cli_cancel);
568 EXPORT_SYMBOL(ldlm_match_or_enqueue);
569 EXPORT_SYMBOL(ldlm_it2str);
570 EXPORT_SYMBOL(ldlm_test);
571 EXPORT_SYMBOL(ldlm_regression_start);
572 EXPORT_SYMBOL(ldlm_regression_stop);
573 EXPORT_SYMBOL(ldlm_lock_dump);
574 EXPORT_SYMBOL(ldlm_namespace_new);
575 EXPORT_SYMBOL(ldlm_namespace_free);
/* Module metadata and the init/exit entry points defined above. */
577 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
578 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
579 MODULE_LICENSE("GPL");
581 module_init(ldlm_init);
582 module_exit(ldlm_exit);