1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
13 #define DEBUG_SUBSYSTEM S_LDLM
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern struct list_head ldlm_namespace_list;
22 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
23 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
25 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
26 struct ldlm_lock_desc *desc,
27 void *data, __u32 data_len)
29 struct ldlm_request *body;
30 struct ptlrpc_request *req;
31 struct ptlrpc_client *cl;
32 int rc = 0, size = sizeof(*body);
35 cl = &lock->l_resource->lr_namespace->ns_rpc_client;
36 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_BL_CALLBACK, 1,
41 body = lustre_msg_buf(req->rq_reqmsg, 0);
42 memcpy(&body->lock_handle1, &lock->l_remote_handle,
43 sizeof(body->lock_handle1));
44 memcpy(&body->lock_desc, desc, sizeof(*desc));
46 LDLM_DEBUG(lock, "server preparing blocking AST");
47 req->rq_replen = lustre_msg_size(0, NULL);
49 rc = ptlrpc_queue_wait(req);
50 rc = ptlrpc_check_status(req, rc);
56 static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
58 struct ldlm_request *body;
59 struct ptlrpc_request *req;
60 struct ptlrpc_client *cl;
61 int rc = 0, size = sizeof(*body);
69 cl = &lock->l_resource->lr_namespace->ns_rpc_client;
70 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CP_CALLBACK, 1,
75 body = lustre_msg_buf(req->rq_reqmsg, 0);
76 memcpy(&body->lock_handle1, &lock->l_remote_handle,
77 sizeof(body->lock_handle1));
78 body->lock_flags = flags;
79 ldlm_lock2desc(lock, &body->lock_desc);
81 LDLM_DEBUG(lock, "server preparing completion AST");
82 req->rq_replen = lustre_msg_size(0, NULL);
84 rc = ptlrpc_queue_wait(req);
85 rc = ptlrpc_check_status(req, rc);
90 int ldlm_handle_enqueue(struct ptlrpc_request *req)
92 struct obd_device *obddev = req->rq_export->exp_obd;
93 struct ldlm_reply *dlm_rep;
94 struct ldlm_request *dlm_req;
95 int rc, size = sizeof(*dlm_rep), cookielen = 0;
98 struct ldlm_lock *lock = NULL;
102 LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
104 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
105 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
106 /* In this case, the reply buffer is allocated deep in
107 * local_lock_enqueue by the policy function. */
109 cookielen = sizeof(*req);
111 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
114 CERROR("out of memory\n");
117 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
118 cookie = &dlm_req->lock_desc.l_extent;
119 cookielen = sizeof(struct ldlm_extent);
123 lock = ldlm_lock_create(obddev->obd_namespace,
124 &dlm_req->lock_handle2,
125 dlm_req->lock_desc.l_resource.lr_name,
126 dlm_req->lock_desc.l_resource.lr_type,
127 dlm_req->lock_desc.l_req_mode, NULL, 0);
129 GOTO(out, err = -ENOMEM);
131 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
132 sizeof(lock->l_remote_handle));
133 LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
135 flags = dlm_req->lock_flags;
136 err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
137 ldlm_server_completion_ast,
138 ldlm_server_blocking_ast);
142 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
143 dlm_rep->lock_flags = flags;
145 ldlm_lock2handle(lock, &dlm_rep->lock_handle);
146 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
147 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
148 sizeof(lock->l_extent));
149 if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
150 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
151 sizeof(dlm_rep->lock_resource_name));
152 dlm_rep->lock_mode = lock->l_req_mode;
155 lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
159 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
161 req->rq_status = err;
163 if (ptlrpc_reply(req->rq_svc, req))
168 ldlm_reprocess_all(lock->l_resource);
171 LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
176 int ldlm_handle_convert(struct ptlrpc_request *req)
178 struct ldlm_request *dlm_req;
179 struct ldlm_reply *dlm_rep;
180 struct ldlm_lock *lock;
181 int rc, size = sizeof(*dlm_rep);
184 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
186 CERROR("out of memory\n");
189 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
190 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
191 dlm_rep->lock_flags = dlm_req->lock_flags;
193 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
195 req->rq_status = EINVAL;
197 LDLM_DEBUG(lock, "server-side convert handler START");
198 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
199 &dlm_rep->lock_flags);
202 if (ptlrpc_reply(req->rq_svc, req) != 0)
206 ldlm_reprocess_all(lock->l_resource);
207 LDLM_DEBUG(lock, "server-side convert handler END");
210 LDLM_DEBUG_NOLOCK("server-side convert handler END");
215 int ldlm_handle_cancel(struct ptlrpc_request *req)
217 struct ldlm_request *dlm_req;
218 struct ldlm_lock *lock;
222 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
224 CERROR("out of memory\n");
227 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
229 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
231 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)",
232 (void *)(unsigned long) dlm_req->lock_handle1.addr);
233 req->rq_status = ESTALE;
235 LDLM_DEBUG(lock, "server-side cancel handler START");
236 ldlm_lock_cancel(lock);
240 if (ptlrpc_reply(req->rq_svc, req) != 0)
244 ldlm_reprocess_all(lock->l_resource);
245 LDLM_DEBUG(lock, "server-side cancel handler END");
252 static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
254 struct ldlm_request *dlm_req;
255 struct ldlm_lock *lock;
259 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
262 rc = ptlrpc_reply(req->rq_svc, req);
266 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
268 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
270 CERROR("blocking callback on lock %Lx - lock disappeared\n",
271 dlm_req->lock_handle1.addr);
275 LDLM_DEBUG(lock, "client blocking AST callback handler START");
277 l_lock(&lock->l_resource->lr_namespace->ns_lock);
278 lock->l_flags |= LDLM_FL_CBPENDING;
279 do_ast = (!lock->l_readers && !lock->l_writers);
280 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
283 LDLM_DEBUG(lock, "already unused, calling "
284 "callback (%p)", lock->l_blocking_ast);
285 if (lock->l_blocking_ast != NULL) {
286 lock->l_blocking_ast(lock, &dlm_req->lock_desc,
287 lock->l_data, lock->l_data_len);
290 LDLM_DEBUG(lock, "Lock still has references, will be"
293 LDLM_DEBUG(lock, "client blocking callback handler END");
298 static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
300 struct list_head ast_list = LIST_HEAD_INIT(ast_list);
301 struct ldlm_request *dlm_req;
302 struct ldlm_lock *lock;
306 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
309 rc = ptlrpc_reply(req->rq_svc, req);
313 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
315 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
317 CERROR("completion callback on lock %Lx - lock disappeared\n",
318 dlm_req->lock_handle1.addr);
322 LDLM_DEBUG(lock, "client completion callback handler START");
324 l_lock(&lock->l_resource->lr_namespace->ns_lock);
325 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
327 /* If we receive the completion AST before the actual enqueue returned,
328 * then we might need to switch resources. */
329 ldlm_resource_unlink_lock(lock);
330 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
331 lock->l_resource->lr_name,
332 sizeof(__u64) * RES_NAME_SIZE) != 0) {
333 ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
334 LDLM_DEBUG(lock, "completion AST, new resource");
336 lock->l_resource->lr_tmp = &ast_list;
337 ldlm_grant_lock(lock);
338 lock->l_resource->lr_tmp = NULL;
339 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
340 LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
343 ldlm_run_ast_work(&ast_list);
345 LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
350 static int ldlm_callback_handler(struct ptlrpc_request *req)
355 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
357 CERROR("lustre_ldlm: Invalid request\n");
361 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
362 CERROR("lustre_ldlm: wrong packet type sent %d\n",
363 req->rq_reqmsg->type);
364 GOTO(out, rc = -EINVAL);
366 switch (req->rq_reqmsg->opc) {
367 case LDLM_BL_CALLBACK:
368 CDEBUG(D_INODE, "blocking ast\n");
369 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
370 rc = ldlm_handle_bl_callback(req);
372 case LDLM_CP_CALLBACK:
373 CDEBUG(D_INODE, "completion ast\n");
374 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
375 rc = ldlm_handle_cp_callback(req);
379 rc = ptlrpc_error(req->rq_svc, req);
386 RETURN(ptlrpc_error(req->rq_svc, req));
391 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
392 void *karg, void *uarg)
394 struct obd_device *obddev = class_conn2obd(conn);
395 struct ptlrpc_connection *connection;
399 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
400 _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
401 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
402 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
406 OBD_ALLOC(obddev->u.ldlm.ldlm_client,
407 sizeof(*obddev->u.ldlm.ldlm_client));
408 ptlrpc_init_client(NULL, NULL,
409 LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
410 obddev->u.ldlm.ldlm_client);
411 connection = ptlrpc_uuid_to_connection("ldlm");
413 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
417 err = ldlm_test(obddev, connection);
418 CERROR("-- done err %d\n", err);
421 ldlm_dump_all_namespaces();
424 GOTO(out, err = -EINVAL);
429 ptlrpc_put_connection(connection);
430 OBD_FREE(obddev->u.ldlm.ldlm_client,
431 sizeof(*obddev->u.ldlm.ldlm_client));
435 #define LDLM_NUM_THREADS 8
437 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
439 struct ldlm_obd *ldlm = &obddev->u.ldlm;
444 rc = ldlm_proc_setup(obddev);
448 ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
449 LDLM_REPLY_PORTAL, "self",
450 ldlm_callback_handler);
451 if (!ldlm->ldlm_service)
452 GOTO(out_proc, rc = -ENOMEM);
454 for (i = 0; i < LDLM_NUM_THREADS; i++) {
456 sprintf(name, "lustre_dlm_%02d", i);
457 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
459 CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
461 GOTO(out_thread, rc);
468 ptlrpc_stop_all_threads(ldlm->ldlm_service);
469 ptlrpc_unregister_service(ldlm->ldlm_service);
471 ldlm_proc_cleanup(obddev);
477 static int ldlm_cleanup(struct obd_device *obddev)
479 struct ldlm_obd *ldlm = &obddev->u.ldlm;
482 if (!list_empty(&ldlm_namespace_list)) {
483 CERROR("ldlm still has namespaces; clean these up first.\n");
487 ptlrpc_stop_all_threads(ldlm->ldlm_service);
488 ptlrpc_unregister_service(ldlm->ldlm_service);
489 ldlm_proc_cleanup(obddev);
495 struct obd_ops ldlm_obd_ops = {
496 o_iocontrol: ldlm_iocontrol,
498 o_cleanup: ldlm_cleanup,
499 o_connect: class_connect,
500 o_disconnect: class_disconnect
503 static int __init ldlm_init(void)
505 int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
509 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
510 sizeof(struct ldlm_resource), 0,
511 SLAB_HWCACHE_ALIGN, NULL, NULL);
512 if (ldlm_resource_slab == NULL)
515 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
516 sizeof(struct ldlm_lock), 0,
517 SLAB_HWCACHE_ALIGN, NULL, NULL);
518 if (ldlm_lock_slab == NULL) {
519 kmem_cache_destroy(ldlm_resource_slab);
526 static void __exit ldlm_exit(void)
528 class_unregister_type(OBD_LDLM_DEVICENAME);
529 if (kmem_cache_destroy(ldlm_resource_slab) != 0)
530 CERROR("couldn't free ldlm resource slab\n");
531 if (kmem_cache_destroy(ldlm_lock_slab) != 0)
532 CERROR("couldn't free ldlm lock slab\n");
535 EXPORT_SYMBOL(ldlm_completion_ast);
536 EXPORT_SYMBOL(ldlm_handle_enqueue);
537 EXPORT_SYMBOL(ldlm_handle_cancel);
538 EXPORT_SYMBOL(ldlm_handle_convert);
539 EXPORT_SYMBOL(ldlm_register_intent);
540 EXPORT_SYMBOL(ldlm_unregister_intent);
541 EXPORT_SYMBOL(ldlm_lockname);
542 EXPORT_SYMBOL(ldlm_typename);
543 EXPORT_SYMBOL(ldlm_handle2lock);
544 EXPORT_SYMBOL(ldlm_lock2handle);
545 EXPORT_SYMBOL(ldlm_lock_match);
546 EXPORT_SYMBOL(ldlm_lock_addref);
547 EXPORT_SYMBOL(ldlm_lock_decref);
548 EXPORT_SYMBOL(ldlm_lock_change_resource);
549 EXPORT_SYMBOL(ldlm_cli_convert);
550 EXPORT_SYMBOL(ldlm_cli_enqueue);
551 EXPORT_SYMBOL(ldlm_cli_cancel);
552 EXPORT_SYMBOL(ldlm_match_or_enqueue);
553 EXPORT_SYMBOL(ldlm_it2str);
554 EXPORT_SYMBOL(ldlm_test);
555 EXPORT_SYMBOL(ldlm_lock_dump);
556 EXPORT_SYMBOL(ldlm_namespace_new);
557 EXPORT_SYMBOL(ldlm_namespace_free);
559 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
560 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
561 MODULE_LICENSE("GPL");
563 module_init(ldlm_init);
564 module_exit(ldlm_exit);