1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Debug subsystem tag for this file.  It is defined ahead of the includes,
 * presumably so that the logging macros pulled in below pick it up --
 * TODO confirm against the debug header. */
25 #define DEBUG_SUBSYSTEM S_LDLM
27 #include <linux/module.h>
28 #include <linux/slab.h>
29 #include <linux/lustre_dlm.h>
30 #include <linux/init.h>
/* Objects defined outside this file.  The two slab caches are created in
 * ldlm_init() and destroyed in ldlm_exit(); the namespace list is tested in
 * ldlm_cleanup(). */
32 extern kmem_cache_t *ldlm_resource_slab;
33 extern kmem_cache_t *ldlm_lock_slab;
34 extern struct list_head ldlm_namespace_list;
/* Function-pointer hooks taking (offset, request).  NOTE(review): going by
 * their names they belong to the MDS; who sets and calls them is not
 * established by these declarations. */
35 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
36 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
/* Server-side blocking AST: builds an LDLM_BL_CALLBACK request for `lock`
 * and sends it over the connection of the lock's export.
 *
 * lock: lock being called back; it supplies the RPC client (through its
 *       resource's namespace), the target connection (through its export)
 *       and the remote handle placed in the request.
 * desc: lock descriptor copied verbatim into the request body.
 * data, data_len: not referenced by the statements below.
 *
 * Passed to ldlm_lock_enqueue() by ldlm_handle_enqueue().
 */
38 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
39 struct ldlm_lock_desc *desc,
40 void *data, __u32 data_len)
42 struct ldlm_request *body;
43 struct ptlrpc_request *req;
44 struct ptlrpc_client *cl;
45 int rc = 0, size = sizeof(*body);
/* Use the RPC client embedded in the namespace owning the lock's resource
 * and prepare a request with a single ldlm_request-sized buffer. */
48 cl = &lock->l_resource->lr_namespace->ns_rpc_client;
49 req = ptlrpc_prep_req(cl, lock->l_export->exp_connection,
50 LDLM_BL_CALLBACK, 1, &size, NULL);
/* Fill request buffer 0: the remote side's handle for this lock, followed by
 * the descriptor supplied by the caller. */
54 body = lustre_msg_buf(req->rq_reqmsg, 0);
55 memcpy(&body->lock_handle1, &lock->l_remote_handle,
56 sizeof(body->lock_handle1));
57 memcpy(&body->lock_desc, desc, sizeof(*desc));
59 LDLM_DEBUG(lock, "server preparing blocking AST");
/* Expected reply length is computed for a message with zero buffers. */
60 req->rq_replen = lustre_msg_size(0, NULL);
/* Send the request, then pass the send result through ptlrpc_check_status()
 * so rc holds its verdict. */
62 rc = ptlrpc_queue_wait(req);
63 rc = ptlrpc_check_status(req, rc);
/* Server-side completion AST: builds an LDLM_CP_CALLBACK request for `lock`
 * and sends it over the connection of the lock's export.
 *
 * lock:  lock the callback is about; it supplies the RPC client, the target
 *        connection, the remote handle and the descriptor sent in the
 *        request.
 * flags: stored unchanged in the request's lock_flags field.
 *
 * Passed to ldlm_lock_enqueue() by ldlm_handle_enqueue().
 */
69 static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
71 struct ldlm_request *body;
72 struct ptlrpc_request *req;
73 struct ptlrpc_client *cl;
74 int rc = 0, size = sizeof(*body);
/* Use the RPC client embedded in the namespace owning the lock's resource
 * and prepare a request with a single ldlm_request-sized buffer. */
82 cl = &lock->l_resource->lr_namespace->ns_rpc_client;
83 req = ptlrpc_prep_req(cl, lock->l_export->exp_connection,
84 LDLM_CP_CALLBACK, 1, &size, NULL);
/* Fill request buffer 0: the remote handle, the caller's flags, and a
 * descriptor generated from the lock itself by ldlm_lock2desc(). */
88 body = lustre_msg_buf(req->rq_reqmsg, 0);
89 memcpy(&body->lock_handle1, &lock->l_remote_handle,
90 sizeof(body->lock_handle1));
91 body->lock_flags = flags;
92 ldlm_lock2desc(lock, &body->lock_desc);
94 LDLM_DEBUG(lock, "server preparing completion AST");
/* Expected reply length is computed for a message with zero buffers. */
95 req->rq_replen = lustre_msg_size(0, NULL);
/* Send the request, then pass the send result through ptlrpc_check_status()
 * so rc holds its verdict. */
97 rc = ptlrpc_queue_wait(req);
98 rc = ptlrpc_check_status(req, rc);
/* Server-side handler for a lock enqueue request.
 *
 * Creates a lock in the namespace of the obd device behind the request's
 * export, enqueues it with the server completion/blocking AST callbacks,
 * fills in the ldlm_reply (flags, server handle, and where applicable the
 * extent, resource name and mode), stores the enqueue result in
 * req->rq_status, sends the reply and reprocesses the lock's resource.
 *
 * req: incoming request; buffer 0 of the request message is read as a
 *      struct ldlm_request.
 */
103 int ldlm_handle_enqueue(struct ptlrpc_request *req)
105 struct obd_device *obddev = req->rq_export->exp_obd;
106 struct ldlm_reply *dlm_rep;
107 struct ldlm_request *dlm_req;
108 int rc, size = sizeof(*dlm_rep), cookielen = 0;
111 struct ldlm_lock *lock = NULL;
115 LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
117 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
118 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
119 /* In this case, the reply buffer is allocated deep in
120 * local_lock_enqueue by the policy function. */
/* The cookie length is the size of the request structure; presumably the
 * request itself is handed over as the cookie -- TODO confirm. */
122 cookielen = sizeof(*req);
/* Pack a reply message holding one buffer sized for struct ldlm_reply. */
124 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
127 CERROR("out of memory\n");
/* Extent locks pass the requested extent from the descriptor as the cookie. */
130 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
131 cookie = &dlm_req->lock_desc.l_extent;
132 cookielen = sizeof(struct ldlm_extent);
/* Create the lock from lock_handle2, the resource name and type, and the
 * requested mode carried in the request. */
136 lock = ldlm_lock_create(obddev->obd_namespace,
137 &dlm_req->lock_handle2,
138 dlm_req->lock_desc.l_resource.lr_name,
139 dlm_req->lock_desc.l_resource.lr_type,
140 dlm_req->lock_desc.l_req_mode, NULL, 0);
142 GOTO(out, err = -ENOMEM);
/* Remember the requester's handle (lock_handle1); the server AST functions
 * in this file copy l_remote_handle into the callbacks they send. */
144 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
145 sizeof(lock->l_remote_handle));
146 LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
/* flags is in/out: it starts as the request's flags, is passed by pointer
 * to ldlm_lock_enqueue(), and its value afterwards goes into the reply. */
148 flags = dlm_req->lock_flags;
149 err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
150 ldlm_server_completion_ast,
151 ldlm_server_blocking_ast);
155 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
156 dlm_rep->lock_flags = flags;
/* Reply contents: the handle for the new lock; for extent locks the lock's
 * extent; and, when LDLM_FL_LOCK_CHANGED is set, the lock's current
 * resource name and requested mode. */
158 ldlm_lock2handle(lock, &dlm_rep->lock_handle);
159 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
160 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
161 sizeof(lock->l_extent));
162 if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
163 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
164 sizeof(dlm_rep->lock_resource_name));
165 dlm_rep->lock_mode = lock->l_req_mode;
/* Tie the lock to the requesting export and take a reference on the
 * request's connection. */
168 lock->l_export = req->rq_export;
169 ptlrpc_connection_addref(req->rq_connection);
173 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
/* The enqueue result travels back to the requester in rq_status. */
175 req->rq_status = err;
177 if (ptlrpc_reply(req->rq_svc, req))
182 ldlm_reprocess_all(lock->l_resource);
185 LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
/* Server-side handler for a lock convert request.
 *
 * Packs a reply with one ldlm_reply-sized buffer, looks up the lock named by
 * lock_handle1 in the request, converts it to the requested mode, sends the
 * reply and reprocesses the lock's resource.
 *
 * req: incoming request; buffer 0 of the request message is read as a
 *      struct ldlm_request.
 */
190 int ldlm_handle_convert(struct ptlrpc_request *req)
192 struct ldlm_request *dlm_req;
193 struct ldlm_reply *dlm_rep;
194 struct ldlm_lock *lock;
195 int rc, size = sizeof(*dlm_rep);
198 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
200 CERROR("out of memory\n");
/* Seed the reply flags from the request; ldlm_lock_convert() below receives
 * a pointer to them. */
203 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
204 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
205 dlm_rep->lock_flags = dlm_req->lock_flags;
207 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
/* NOTE(review): the status is the positive EINVAL, whereas other error codes
 * in this file are negative (e.g. -ENOMEM) -- confirm this sign is what the
 * peer expects. */
209 req->rq_status = EINVAL;
211 LDLM_DEBUG(lock, "server-side convert handler START");
212 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
213 &dlm_rep->lock_flags);
216 if (ptlrpc_reply(req->rq_svc, req) != 0)
/* Reprocess the resource after the conversion. */
220 ldlm_reprocess_all(lock->l_resource);
221 LDLM_DEBUG(lock, "server-side convert handler END");
224 LDLM_DEBUG_NOLOCK("server-side convert handler END");
/* Server-side handler for a lock cancel request.
 *
 * Packs a reply with no buffers, looks up the lock named by lock_handle1 in
 * the request, cancels it, sends the reply and reprocesses the lock's
 * resource.  A handle that no longer resolves is logged as stale.
 *
 * req: incoming request; buffer 0 of the request message is read as a
 *      struct ldlm_request.
 */
229 int ldlm_handle_cancel(struct ptlrpc_request *req)
231 struct ldlm_request *dlm_req;
232 struct ldlm_lock *lock;
236 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
238 CERROR("out of memory\n");
241 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
243 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
/* Stale-handle report: the handle's addr field is printed as a pointer. */
245 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock "
246 "%p)", (void *)(unsigned long)
247 dlm_req->lock_handle1.addr);
/* NOTE(review): the status is the positive ESTALE, whereas other error codes
 * in this file are negative (e.g. -ENOMEM) -- confirm this sign is what the
 * peer expects. */
248 req->rq_status = ESTALE;
250 LDLM_DEBUG(lock, "server-side cancel handler START");
251 ldlm_lock_cancel(lock);
255 if (ptlrpc_reply(req->rq_svc, req) != 0)
/* Reprocess the resource after the cancel. */
259 ldlm_reprocess_all(lock->l_resource);
260 LDLM_DEBUG(lock, "server-side cancel handler END");
/* Handler for an incoming blocking-AST RPC (LDLM_BL_CALLBACK), dispatched
 * from ldlm_callback_handler().
 *
 * The empty reply is packed and sent before the lock is looked at.  The lock
 * named by lock_handle1 is then flagged LDLM_FL_CBPENDING and, if it has no
 * readers and no writers, its l_blocking_ast callback is invoked.
 *
 * req: incoming request; buffer 0 of the request message is read as a
 *      struct ldlm_request.
 */
267 static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
269 struct ldlm_request *dlm_req;
270 struct ldlm_lock *lock;
/* Reply first: a message with zero buffers. */
274 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
277 rc = ptlrpc_reply(req->rq_svc, req);
281 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
283 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
285 CERROR("blocking callback on lock %Lx - lock disappeared\n",
286 dlm_req->lock_handle1.addr);
290 LDLM_DEBUG(lock, "client blocking AST callback handler START");
/* With the namespace lock held: mark the callback as pending and sample
 * whether the lock is currently unused (no readers and no writers). */
292 l_lock(&lock->l_resource->lr_namespace->ns_lock);
293 lock->l_flags |= LDLM_FL_CBPENDING;
294 do_ast = (!lock->l_readers && !lock->l_writers);
295 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
/* Unused lock: run the lock's blocking callback, if one is set, with the
 * descriptor from the request and the lock's own data/data_len. */
298 LDLM_DEBUG(lock, "already unused, calling "
299 "callback (%p)", lock->l_blocking_ast);
300 if (lock->l_blocking_ast != NULL) {
301 lock->l_blocking_ast(lock, &dlm_req->lock_desc,
302 lock->l_data, lock->l_data_len);
305 LDLM_DEBUG(lock, "Lock still has references, will be"
308 LDLM_DEBUG(lock, "client blocking callback handler END");
/* Handler for an incoming completion-AST RPC (LDLM_CP_CALLBACK), dispatched
 * from ldlm_callback_handler().
 *
 * The empty reply is packed and sent first.  Then, with the namespace lock
 * held, the local lock is brought in line with the descriptor in the request
 * (mode and resource name), unlinked from its resource and granted.  The
 * work gathered on the local ast_list is run after the namespace lock is
 * released.
 *
 * req: incoming request; buffer 0 of the request message is read as a
 *      struct ldlm_request.
 */
313 static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
315 struct list_head ast_list = LIST_HEAD_INIT(ast_list);
316 struct ldlm_request *dlm_req;
317 struct ldlm_lock *lock;
/* Reply first: a message with zero buffers. */
321 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
324 rc = ptlrpc_reply(req->rq_svc, req);
328 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
330 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
332 CERROR("completion callback on lock %Lx - lock disappeared\n",
333 dlm_req->lock_handle1.addr);
337 LDLM_DEBUG(lock, "client completion callback handler START");
339 l_lock(&lock->l_resource->lr_namespace->ns_lock);
341 /* If we receive the completion AST before the actual enqueue returned,
342 * then we might need to switch resources or lock modes. */
343 if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
344 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
345 LDLM_DEBUG(lock, "completion AST, new lock mode");
347 ldlm_resource_unlink_lock(lock);
/* Compare the full resource name (RES_NAME_SIZE 64-bit words) and switch
 * the lock to the resource named in the descriptor if it differs. */
348 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
349 lock->l_resource->lr_name,
350 sizeof(__u64) * RES_NAME_SIZE) != 0) {
351 ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
352 LDLM_DEBUG(lock, "completion AST, new resource");
/* lr_tmp points at the local list only for the duration of
 * ldlm_grant_lock(); presumably that is where the grant queues its AST
 * work -- TODO confirm in ldlm_grant_lock(). */
354 lock->l_resource->lr_tmp = &ast_list;
355 ldlm_grant_lock(lock);
356 lock->l_resource->lr_tmp = NULL;
357 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
358 LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
/* Run the collected work now that the namespace lock has been released. */
361 ldlm_run_ast_work(&ast_list);
363 LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
/* Request handler for the LDLM service created in ldlm_setup().
 *
 * Unpacks the request message, checks that it is of type
 * PTL_RPC_MSG_REQUEST, and dispatches on the opcode to the blocking or
 * completion callback handler.
 *
 * req: incoming request.
 */
368 static int ldlm_callback_handler(struct ptlrpc_request *req)
373 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
375 CERROR("lustre_ldlm: Invalid request\n");
379 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
380 CERROR("lustre_ldlm: wrong packet type sent %d\n",
381 req->rq_reqmsg->type);
382 GOTO(out, rc = -EINVAL);
384 switch (req->rq_reqmsg->opc) {
385 case LDLM_BL_CALLBACK:
386 CDEBUG(D_INODE, "blocking ast\n");
/* NOTE(review): OBD_FAIL_RETURN looks like a fault-injection hook that can
 * return 0 before the handler runs -- confirm against its definition. */
387 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
388 rc = ldlm_handle_bl_callback(req);
390 case LDLM_CP_CALLBACK:
391 CDEBUG(D_INODE, "completion ast\n");
392 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
393 rc = ldlm_handle_cp_callback(req);
/* Error replies are sent with ptlrpc_error(); presumably the first call
 * answers opcodes not handled above and the second is the common error
 * exit -- TODO confirm. */
397 rc = ptlrpc_error(req->rq_svc, req);
404 RETURN(ptlrpc_error(req->rq_svc, req));
/* ioctl entry point of the LDLM device (installed as o_iocontrol in
 * ldlm_obd_ops).
 *
 * cmd:  ioctl command; its type must be IOC_LDLM_TYPE and its number must
 *       lie in [IOC_LDLM_MIN_NR, IOC_LDLM_MAX_NR].
 * conn: handle used to find the obd device and passed on to ldlm_test().
 * len, karg, uarg: not referenced by the statements below.
 *
 * A ptlrpc client is allocated and initialized for the duration of the call
 * and freed again on the way out.
 */
409 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
410 void *karg, void *uarg)
412 struct obd_device *obddev = class_conn2obd(conn);
413 struct ptlrpc_connection *connection;
/* Validate the command's type and number before doing any work. */
417 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
418 _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
419 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
420 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
/* Allocate the device's ldlm_client and initialize it on the LDLM
 * request/reply portals. */
424 OBD_ALLOC(obddev->u.ldlm.ldlm_client,
425 sizeof(*obddev->u.ldlm.ldlm_client));
426 ptlrpc_init_client(NULL, NULL,
427 LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
428 obddev->u.ldlm.ldlm_client);
/* Look up a connection for the "ldlm" UUID; per the message below, a failed
 * lookup is treated as "ldlm is local". */
429 connection = ptlrpc_uuid_to_connection("ldlm");
431 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
/* Command bodies: run the LDLM test, or dump all namespaces; anything else
 * ends in -EINVAL. */
435 err = ldlm_test(obddev, conn);
436 CERROR("-- done err %d\n", err);
439 ldlm_dump_all_namespaces();
442 GOTO(out, err = -EINVAL);
/* Teardown: drop the connection and free the client allocated above. */
447 ptlrpc_put_connection(connection);
448 OBD_FREE(obddev->u.ldlm.ldlm_client,
449 sizeof(*obddev->u.ldlm.ldlm_client));
/* Number of service threads ldlm_setup() starts for the LDLM service. */
453 #define LDLM_NUM_THREADS 8
/* Device setup for the LDLM obd device.
 *
 * Runs ldlm_proc_setup(), creates the LDLM service on the request/reply
 * portals with ldlm_callback_handler() as its handler, and starts
 * LDLM_NUM_THREADS service threads.  The statements at the end undo these
 * steps in reverse order.
 *
 * obddev: device being set up; its u.ldlm member holds the service.
 * len, buf: not referenced by the statements below.
 */
455 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
457 struct ldlm_obd *ldlm = &obddev->u.ldlm;
462 rc = ldlm_proc_setup(obddev);
/* NOTE(review): 64 * 1024 is the first argument of ptlrpc_init_svc();
 * presumably a buffer size in bytes -- confirm against its prototype. */
466 ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
467 LDLM_REPLY_PORTAL, "self",
468 ldlm_callback_handler);
469 if (!ldlm->ldlm_service)
470 GOTO(out_proc, rc = -ENOMEM);
/* Threads are named "lustre_dlm_00", "lustre_dlm_01", ...; the first failure
 * to start one aborts setup through out_thread. */
472 for (i = 0; i < LDLM_NUM_THREADS; i++) {
474 sprintf(name, "lustre_dlm_%02d", i);
475 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
477 CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
479 GOTO(out_thread, rc);
/* Unwinding: stop the threads, unregister the service, then undo
 * ldlm_proc_setup(). */
486 ptlrpc_stop_all_threads(ldlm->ldlm_service);
487 ptlrpc_unregister_service(ldlm->ldlm_service);
489 ldlm_proc_cleanup(obddev);
/* Device cleanup for the LDLM obd device (installed as o_cleanup in
 * ldlm_obd_ops).
 *
 * Complains if any namespace is still on ldlm_namespace_list; otherwise the
 * teardown stops the service threads, unregisters the service and undoes
 * ldlm_proc_setup().
 *
 * obddev: device being cleaned up.
 */
495 static int ldlm_cleanup(struct obd_device *obddev)
497 struct ldlm_obd *ldlm = &obddev->u.ldlm;
/* Namespaces must be gone first; presumably cleanup is refused otherwise --
 * TODO confirm the value returned on this path. */
500 if (!list_empty(&ldlm_namespace_list)) {
501 CERROR("ldlm still has namespaces; clean these up first.\n");
505 ptlrpc_stop_all_threads(ldlm->ldlm_service);
506 ptlrpc_unregister_service(ldlm->ldlm_service);
507 ldlm_proc_cleanup(obddev);
/* Method table for the LDLM device type; registered under
 * OBD_LDLM_DEVICENAME by ldlm_init().  Connect and disconnect use the
 * generic class_connect()/class_disconnect().  The initializer uses GCC's
 * legacy "field: value" syntax. */
513 struct obd_ops ldlm_obd_ops = {
514 o_iocontrol: ldlm_iocontrol,
516 o_cleanup: ldlm_cleanup,
517 o_connect: class_connect,
518 o_disconnect: class_disconnect
/* Module init: registers the LDLM device type, then creates the slab caches
 * for struct ldlm_resource and struct ldlm_lock. */
521 static int __init ldlm_init(void)
523 int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
/* Both caches are hardware-cacheline aligned and have no constructor or
 * destructor. */
527 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
528 sizeof(struct ldlm_resource), 0,
529 SLAB_HWCACHE_ALIGN, NULL, NULL);
530 if (ldlm_resource_slab == NULL)
533 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
534 sizeof(struct ldlm_lock), 0,
535 SLAB_HWCACHE_ALIGN, NULL, NULL);
/* If the second cache cannot be created, the first one is destroyed again. */
536 if (ldlm_lock_slab == NULL) {
537 kmem_cache_destroy(ldlm_resource_slab);
/* Module exit: unregisters the LDLM device type and destroys both slab
 * caches, logging an error for any cache whose destruction reports a
 * non-zero result. */
544 static void __exit ldlm_exit(void)
546 class_unregister_type(OBD_LDLM_DEVICENAME);
547 if (kmem_cache_destroy(ldlm_resource_slab) != 0)
548 CERROR("couldn't free ldlm resource slab\n");
549 if (kmem_cache_destroy(ldlm_lock_slab) != 0)
550 CERROR("couldn't free ldlm lock slab\n");
/* Symbols this module exports to other kernel modules.  Of these, only
 * ldlm_handle_enqueue, ldlm_handle_cancel and ldlm_handle_convert are
 * defined in the code above; the rest are defined elsewhere. */
553 EXPORT_SYMBOL(ldlm_completion_ast);
554 EXPORT_SYMBOL(ldlm_handle_enqueue);
555 EXPORT_SYMBOL(ldlm_handle_cancel);
556 EXPORT_SYMBOL(ldlm_handle_convert);
557 EXPORT_SYMBOL(ldlm_register_intent);
558 EXPORT_SYMBOL(ldlm_unregister_intent);
559 EXPORT_SYMBOL(ldlm_lockname);
560 EXPORT_SYMBOL(ldlm_typename);
561 EXPORT_SYMBOL(ldlm_handle2lock);
562 EXPORT_SYMBOL(ldlm_lock2handle);
563 EXPORT_SYMBOL(ldlm_lock_put);
564 EXPORT_SYMBOL(ldlm_lock_match);
565 EXPORT_SYMBOL(ldlm_lock_addref);
566 EXPORT_SYMBOL(ldlm_lock_decref);
567 EXPORT_SYMBOL(ldlm_lock_change_resource);
568 EXPORT_SYMBOL(ldlm_cli_convert);
569 EXPORT_SYMBOL(ldlm_cli_enqueue);
570 EXPORT_SYMBOL(ldlm_cli_cancel);
571 EXPORT_SYMBOL(ldlm_match_or_enqueue);
572 EXPORT_SYMBOL(ldlm_it2str);
573 EXPORT_SYMBOL(ldlm_test);
574 EXPORT_SYMBOL(ldlm_regression_start);
575 EXPORT_SYMBOL(ldlm_regression_stop);
576 EXPORT_SYMBOL(ldlm_lock_dump);
577 EXPORT_SYMBOL(ldlm_namespace_new);
578 EXPORT_SYMBOL(ldlm_namespace_free);
579 EXPORT_SYMBOL(l_lock);
580 EXPORT_SYMBOL(l_unlock);
/* Module metadata, followed by registration of ldlm_init() and ldlm_exit()
 * as the module's load and unload entry points. */
582 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
583 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
584 MODULE_LICENSE("GPL");
586 module_init(ldlm_init);
587 module_exit(ldlm_exit);