1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
13 #define DEBUG_SUBSYSTEM S_LDLM
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
24 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
26 struct ldlm_request *body;
27 struct ptlrpc_request *req;
28 struct ptlrpc_client *cl;
29 int rc = 0, size = sizeof(*body);
37 cl = &lock->l_resource->lr_namespace->ns_rpc_client;
38 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
43 body = lustre_msg_buf(req->rq_reqmsg, 0);
44 memcpy(&body->lock_handle1, &lock->l_remote_handle,
45 sizeof(body->lock_handle1));
46 body->lock_flags = flags;
48 LDLM_DEBUG(lock, "server preparing completion AST");
49 req->rq_replen = lustre_msg_size(0, NULL);
51 rc = ptlrpc_queue_wait(req);
52 rc = ptlrpc_check_status(req, rc);
57 int ldlm_handle_enqueue(struct ptlrpc_request *req)
59 struct obd_device *obddev = req->rq_export->exp_obd;
60 struct ldlm_reply *dlm_rep;
61 struct ldlm_request *dlm_req;
62 int rc, size = sizeof(*dlm_rep), cookielen = 0;
65 struct ldlm_lock *lock = NULL;
69 LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
71 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
72 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
73 /* In this case, the reply buffer is allocated deep in
74 * local_lock_enqueue by the policy function. */
76 cookielen = sizeof(*req);
78 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
81 CERROR("out of memory\n");
84 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
85 cookie = &dlm_req->lock_desc.l_extent;
86 cookielen = sizeof(struct ldlm_extent);
90 lock = ldlm_lock_create(obddev->obd_namespace,
91 &dlm_req->lock_handle2,
92 dlm_req->lock_desc.l_resource.lr_name,
93 dlm_req->lock_desc.l_resource.lr_type,
94 dlm_req->lock_desc.l_req_mode, NULL, 0);
96 GOTO(out, err = -ENOMEM);
98 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
99 sizeof(lock->l_remote_handle));
100 LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
102 flags = dlm_req->lock_flags;
103 err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
104 ldlm_server_completion_ast, ldlm_server_ast);
108 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
109 dlm_rep->lock_flags = flags;
111 ldlm_lock2handle(lock, &dlm_rep->lock_handle);
112 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
113 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
114 sizeof(lock->l_extent));
115 if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
116 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
117 sizeof(dlm_rep->lock_resource_name));
119 lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
123 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
125 req->rq_status = err;
127 if (ptlrpc_reply(req->rq_svc, req))
132 ldlm_reprocess_all(lock->l_resource);
135 LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
140 int ldlm_handle_convert(struct ptlrpc_request *req)
142 struct ldlm_request *dlm_req;
143 struct ldlm_reply *dlm_rep;
144 struct ldlm_lock *lock;
145 int rc, size = sizeof(*dlm_rep);
148 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
150 CERROR("out of memory\n");
153 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
154 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
155 dlm_rep->lock_flags = dlm_req->lock_flags;
157 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
159 req->rq_status = EINVAL;
161 LDLM_DEBUG(lock, "server-side convert handler START");
162 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
163 &dlm_rep->lock_flags);
166 if (ptlrpc_reply(req->rq_svc, req) != 0)
170 ldlm_reprocess_all(lock->l_resource);
171 LDLM_DEBUG(lock, "server-side convert handler END");
174 LDLM_DEBUG_NOLOCK("server-side convert handler END");
179 int ldlm_handle_cancel(struct ptlrpc_request *req)
181 struct ldlm_request *dlm_req;
182 struct ldlm_lock *lock;
186 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
188 CERROR("out of memory\n");
191 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
193 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
195 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)",
196 (void *)(unsigned long) dlm_req->lock_handle1.addr);
197 req->rq_status = ESTALE;
199 LDLM_DEBUG(lock, "server-side cancel handler START");
200 ldlm_lock_cancel(lock);
204 if (ptlrpc_reply(req->rq_svc, req) != 0)
208 ldlm_reprocess_all(lock->l_resource);
209 LDLM_DEBUG(lock, "server-side cancel handler END");
216 static int ldlm_handle_callback(struct ptlrpc_request *req)
218 struct ldlm_request *dlm_req;
219 struct ldlm_lock_desc *descp = NULL;
220 struct ldlm_lock *lock;
221 __u64 is_blocking_ast = 0;
225 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
227 CERROR("out of memory\n");
230 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
232 /* We must send the reply first, so that the thread is free to handle
233 * any requests made in common_callback() */
234 rc = ptlrpc_reply(req->rq_svc, req);
238 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
240 CERROR("callback on lock %Lx - lock disappeared\n",
241 dlm_req->lock_handle1.addr);
245 /* check if this is a blocking AST */
246 if (dlm_req->lock_desc.l_req_mode !=
247 dlm_req->lock_desc.l_granted_mode) {
248 descp = &dlm_req->lock_desc;
252 LDLM_DEBUG(lock, "client %s callback handler START",
253 is_blocking_ast ? "blocked" : "completion");
257 l_lock(&lock->l_resource->lr_namespace->ns_lock);
258 lock->l_flags |= LDLM_FL_CBPENDING;
259 do_ast = (!lock->l_readers && !lock->l_writers);
260 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
263 LDLM_DEBUG(lock, "already unused, calling "
264 "callback (%p)", lock->l_blocking_ast);
265 if (lock->l_blocking_ast != NULL) {
266 struct lustre_handle lockh;
267 ldlm_lock2handle(lock, &lockh);
268 lock->l_blocking_ast(&lockh, descp,
273 LDLM_DEBUG(lock, "Lock still has references, will be"
278 struct list_head ast_list = LIST_HEAD_INIT(ast_list);
280 l_lock(&lock->l_resource->lr_namespace->ns_lock);
281 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
283 /* If we receive the completion AST before the actual enqueue
284 * returned, then we might need to switch resources. */
285 ldlm_resource_unlink_lock(lock);
286 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
287 lock->l_resource->lr_name,
288 sizeof(__u64) * RES_NAME_SIZE) != 0) {
289 ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
290 LDLM_DEBUG(lock, "completion AST, new resource");
292 lock->l_resource->lr_tmp = &ast_list;
293 ldlm_grant_lock(lock);
294 lock->l_resource->lr_tmp = NULL;
295 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
298 ldlm_run_ast_work(&ast_list);
301 LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
302 is_blocking_ast ? "blocked" : "completion", lock);
307 static int ldlm_callback_handler(struct ptlrpc_request *req)
312 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
314 CERROR("lustre_ldlm: Invalid request\n");
318 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
319 CERROR("lustre_ldlm: wrong packet type sent %d\n",
320 req->rq_reqmsg->type);
321 GOTO(out, rc = -EINVAL);
323 switch (req->rq_reqmsg->opc) {
325 CDEBUG(D_INODE, "callback\n");
326 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
327 rc = ldlm_handle_callback(req);
331 rc = ptlrpc_error(req->rq_svc, req);
338 RETURN(ptlrpc_error(req->rq_svc, req));
343 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
344 void *karg, void *uarg)
346 struct obd_device *obddev = class_conn2obd(conn);
347 struct ptlrpc_connection *connection;
351 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
352 _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
353 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
354 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
358 OBD_ALLOC(obddev->u.ldlm.ldlm_client,
359 sizeof(*obddev->u.ldlm.ldlm_client));
360 ptlrpc_init_client(NULL, NULL,
361 LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
362 obddev->u.ldlm.ldlm_client);
363 connection = ptlrpc_uuid_to_connection("ldlm");
365 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
368 case IOC_LDLM_TEST: {
369 err = ldlm_test(obddev, connection);
370 CERROR("-- done err %d\n", err);
374 GOTO(out, err = -EINVAL);
379 ptlrpc_put_connection(connection);
380 OBD_FREE(obddev->u.ldlm.ldlm_client,
381 sizeof(*obddev->u.ldlm.ldlm_client));
385 #define LDLM_NUM_THREADS 8
387 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
389 struct ldlm_obd *ldlm = &obddev->u.ldlm;
396 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
397 LDLM_REPLY_PORTAL, "self", ldlm_callback_handler);
398 if (!ldlm->ldlm_service) {
400 GOTO(out_dec, rc = -ENOMEM);
403 for (i = 0; i < LDLM_NUM_THREADS; i++) {
405 sprintf(name, "lustre_dlm_%2d", i);
406 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
408 CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
410 GOTO(out_thread, rc);
417 ptlrpc_stop_all_threads(ldlm->ldlm_service);
418 ptlrpc_unregister_service(ldlm->ldlm_service);
424 static int ldlm_cleanup(struct obd_device *obddev)
426 struct ldlm_obd *ldlm = &obddev->u.ldlm;
429 ptlrpc_stop_all_threads(ldlm->ldlm_service);
430 ptlrpc_unregister_service(ldlm->ldlm_service);
436 struct obd_ops ldlm_obd_ops = {
437 o_iocontrol: ldlm_iocontrol,
439 o_cleanup: ldlm_cleanup,
440 o_connect: class_connect,
441 o_disconnect: class_disconnect
445 static int __init ldlm_init(void)
447 int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
451 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
452 sizeof(struct ldlm_resource), 0,
453 SLAB_HWCACHE_ALIGN, NULL, NULL);
454 if (ldlm_resource_slab == NULL)
457 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
458 sizeof(struct ldlm_lock), 0,
459 SLAB_HWCACHE_ALIGN, NULL, NULL);
460 if (ldlm_lock_slab == NULL) {
461 kmem_cache_destroy(ldlm_resource_slab);
468 static void __exit ldlm_exit(void)
470 class_unregister_type(OBD_LDLM_DEVICENAME);
471 if (kmem_cache_destroy(ldlm_resource_slab) != 0)
472 CERROR("couldn't free ldlm resource slab\n");
473 if (kmem_cache_destroy(ldlm_lock_slab) != 0)
474 CERROR("couldn't free ldlm lock slab\n");
477 EXPORT_SYMBOL(ldlm_completion_ast);
478 EXPORT_SYMBOL(ldlm_handle_enqueue);
479 EXPORT_SYMBOL(ldlm_handle_cancel);
480 EXPORT_SYMBOL(ldlm_handle_convert);
481 EXPORT_SYMBOL(ldlm_register_intent);
482 EXPORT_SYMBOL(ldlm_unregister_intent);
483 EXPORT_SYMBOL(ldlm_lockname);
484 EXPORT_SYMBOL(ldlm_typename);
485 EXPORT_SYMBOL(ldlm_handle2lock);
486 EXPORT_SYMBOL(ldlm_lock_match);
487 EXPORT_SYMBOL(ldlm_lock_addref);
488 EXPORT_SYMBOL(ldlm_lock_decref);
489 EXPORT_SYMBOL(ldlm_lock_change_resource);
490 EXPORT_SYMBOL(ldlm_cli_convert);
491 EXPORT_SYMBOL(ldlm_cli_enqueue);
492 EXPORT_SYMBOL(ldlm_cli_cancel);
493 EXPORT_SYMBOL(ldlm_match_or_enqueue);
494 EXPORT_SYMBOL(ldlm_it2str);
495 EXPORT_SYMBOL(ldlm_test);
496 EXPORT_SYMBOL(ldlm_lock_dump);
497 EXPORT_SYMBOL(ldlm_namespace_new);
498 EXPORT_SYMBOL(ldlm_namespace_free);
500 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
501 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
502 MODULE_LICENSE("GPL");
504 module_init(ldlm_init);
505 module_exit(ldlm_exit);