1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
13 #define DEBUG_SUBSYSTEM S_LDLM
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
24 static int ldlm_handle_enqueue(struct ptlrpc_request *req)
26 struct obd_device *obddev = req->rq_export->exp_obd;
27 struct ldlm_reply *dlm_rep;
28 struct ldlm_request *dlm_req;
29 int rc, size = sizeof(*dlm_rep), cookielen = 0;
32 struct ldlm_lock *lock = NULL;
36 LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
38 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
39 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
40 /* In this case, the reply buffer is allocated deep in
41 * local_lock_enqueue by the policy function. */
43 cookielen = sizeof(*req);
45 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
48 CERROR("out of memory\n");
51 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
52 cookie = &dlm_req->lock_desc.l_extent;
53 cookielen = sizeof(struct ldlm_extent);
57 lock = ldlm_lock_create(obddev->obd_namespace,
58 &dlm_req->lock_handle2,
59 dlm_req->lock_desc.l_resource.lr_name,
60 dlm_req->lock_desc.l_resource.lr_type,
61 dlm_req->lock_desc.l_req_mode, NULL, 0);
63 GOTO(out, err = -ENOMEM);
65 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
66 sizeof(lock->l_remote_handle));
67 LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
69 flags = dlm_req->lock_flags;
70 err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
71 ldlm_server_ast, ldlm_server_ast);
75 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
76 dlm_rep->lock_flags = flags;
78 ldlm_lock2handle(lock, &dlm_rep->lock_handle);
79 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
80 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
81 sizeof(lock->l_extent));
82 if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
83 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
84 sizeof(dlm_rep->lock_resource_name));
86 lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
90 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
94 if (ptlrpc_reply(req->rq_svc, req))
99 ldlm_reprocess_all(lock->l_resource);
102 LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
107 static int ldlm_handle_convert(struct ptlrpc_request *req)
109 struct ldlm_request *dlm_req;
110 struct ldlm_reply *dlm_rep;
111 struct ldlm_lock *lock;
112 int rc, size = sizeof(*dlm_rep);
115 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
117 CERROR("out of memory\n");
120 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
121 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
122 dlm_rep->lock_flags = dlm_req->lock_flags;
124 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
126 req->rq_status = EINVAL;
128 LDLM_DEBUG(lock, "server-side convert handler START");
129 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
130 &dlm_rep->lock_flags);
133 if (ptlrpc_reply(req->rq_svc, req) != 0)
137 ldlm_reprocess_all(lock->l_resource);
138 LDLM_DEBUG(lock, "server-side convert handler END");
141 LDLM_DEBUG_NOLOCK("server-side convert handler END");
146 static int ldlm_handle_cancel(struct ptlrpc_request *req)
148 struct ldlm_request *dlm_req;
149 struct ldlm_lock *lock;
153 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
155 CERROR("out of memory\n");
158 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
160 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
162 req->rq_status = ESTALE;
164 LDLM_DEBUG(lock, "server-side cancel handler START");
165 ldlm_lock_cancel(lock);
169 if (ptlrpc_reply(req->rq_svc, req) != 0)
173 ldlm_reprocess_all(lock->l_resource);
174 LDLM_DEBUG(lock, "server-side cancel handler END");
177 LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)",
183 static int ldlm_handle_callback(struct ptlrpc_request *req)
185 struct ldlm_request *dlm_req;
186 struct ldlm_lock_desc *descp = NULL;
187 struct ldlm_lock *lock;
188 __u64 is_blocking_ast = 0;
192 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
194 CERROR("out of memory\n");
197 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
199 /* We must send the reply first, so that the thread is free to handle
200 * any requests made in common_callback() */
201 rc = ptlrpc_reply(req->rq_svc, req);
205 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
207 CERROR("callback on lock %Lx - lock disappeared\n",
208 dlm_req->lock_handle1.addr);
212 /* check if this is a blocking AST */
213 if (dlm_req->lock_desc.l_req_mode !=
214 dlm_req->lock_desc.l_granted_mode) {
215 descp = &dlm_req->lock_desc;
219 LDLM_DEBUG(lock, "client %s callback handler START",
220 is_blocking_ast ? "blocked" : "completion");
224 l_lock(&lock->l_resource->lr_namespace->ns_lock);
225 lock->l_flags |= LDLM_FL_CBPENDING;
226 do_ast = (!lock->l_readers && !lock->l_writers);
227 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
230 LDLM_DEBUG(lock, "already unused, calling "
231 "callback (%p)", lock->l_blocking_ast);
232 if (lock->l_blocking_ast != NULL) {
233 struct lustre_handle lockh;
234 ldlm_lock2handle(lock, &lockh);
235 lock->l_blocking_ast(&lockh, descp,
240 LDLM_DEBUG(lock, "Lock still has references, will be"
245 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
247 l_lock(&lock->l_resource->lr_namespace->ns_lock);
248 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
250 /* If we receive the completion AST before the actual enqueue
251 * returned, then we might need to switch resources. */
252 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
253 lock->l_resource->lr_name,
254 sizeof(__u64) * RES_NAME_SIZE) != 0) {
255 ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
256 LDLM_DEBUG(lock, "completion AST, new resource");
258 lock->l_resource->lr_tmp = &rpc_list;
259 ldlm_resource_unlink_lock(lock);
260 ldlm_grant_lock(lock);
261 /* FIXME: we want any completion function, not just wake_up */
262 wake_up(&lock->l_waitq);
263 lock->l_resource->lr_tmp = NULL;
264 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
267 ldlm_run_ast_work(&rpc_list);
270 LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
271 is_blocking_ast ? "blocked" : "completion", lock);
275 static int ldlm_handle(struct ptlrpc_request *req)
280 rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
282 CERROR("lustre_ldlm: Invalid request\n");
286 if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
287 CERROR("lustre_ldlm: wrong packet type sent %d\n",
288 req->rq_reqmsg->type);
289 GOTO(out, rc = -EINVAL);
292 if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) {
293 CERROR("No export handle for enqueue request.\n");
294 GOTO(out, rc = -ENOTCONN);
297 switch (req->rq_reqmsg->opc) {
299 CDEBUG(D_INODE, "enqueue\n");
300 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
301 rc = ldlm_handle_enqueue(req);
305 CDEBUG(D_INODE, "convert\n");
306 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
307 rc = ldlm_handle_convert(req);
311 CDEBUG(D_INODE, "cancel\n");
312 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
313 rc = ldlm_handle_cancel(req);
317 CDEBUG(D_INODE, "callback\n");
318 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
319 rc = ldlm_handle_callback(req);
323 rc = ptlrpc_error(req->rq_svc, req);
330 RETURN(ptlrpc_error(req->rq_svc, req));
334 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
335 void *karg, void *uarg)
337 struct obd_device *obddev = class_conn2obd(conn);
338 struct ptlrpc_connection *connection;
342 if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
343 _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
344 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
345 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
349 OBD_ALLOC(obddev->u.ldlm.ldlm_client,
350 sizeof(*obddev->u.ldlm.ldlm_client));
351 ptlrpc_init_client(NULL, NULL,
352 LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
353 obddev->u.ldlm.ldlm_client);
354 connection = ptlrpc_uuid_to_connection("ldlm");
356 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
359 case IOC_LDLM_TEST: {
360 err = ldlm_test(obddev, connection);
361 CERROR("-- done err %d\n", err);
365 GOTO(out, err = -EINVAL);
370 ptlrpc_put_connection(connection);
371 OBD_FREE(obddev->u.ldlm.ldlm_client,
372 sizeof(*obddev->u.ldlm.ldlm_client));
376 #define LDLM_NUM_THREADS 8
378 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
380 struct ldlm_obd *ldlm = &obddev->u.ldlm;
387 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
388 LDLM_REPLY_PORTAL, "self", ldlm_handle);
389 if (!ldlm->ldlm_service) {
391 GOTO(out_dec, rc = -ENOMEM);
394 if (mds_reint_p == NULL)
395 mds_reint_p = inter_module_get_request("mds_reint", "mds");
396 if (IS_ERR(mds_reint_p)) {
397 CERROR("MDSINTENT locks require the MDS module.\n");
398 GOTO(out_dec, rc = -EINVAL);
400 if (mds_getattr_name_p == NULL)
401 mds_getattr_name_p = inter_module_get_request
402 ("mds_getattr_name", "mds");
403 if (IS_ERR(mds_getattr_name_p)) {
404 CERROR("MDSINTENT locks require the MDS module.\n");
405 GOTO(out_dec, rc = -EINVAL);
408 for (i = 0; i < LDLM_NUM_THREADS; i++) {
409 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
412 CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
414 GOTO(out_thread, rc);
421 ptlrpc_stop_all_threads(ldlm->ldlm_service);
422 ptlrpc_unregister_service(ldlm->ldlm_service);
424 if (mds_reint_p != NULL)
425 inter_module_put("mds_reint");
426 if (mds_getattr_name_p != NULL)
427 inter_module_put("mds_getattr_name");
432 static int ldlm_cleanup(struct obd_device *obddev)
434 struct ldlm_obd *ldlm = &obddev->u.ldlm;
437 ptlrpc_stop_all_threads(ldlm->ldlm_service);
438 ptlrpc_unregister_service(ldlm->ldlm_service);
440 if (mds_reint_p != NULL)
441 inter_module_put("mds_reint");
442 if (mds_getattr_name_p != NULL)
443 inter_module_put("mds_getattr_name");
449 struct obd_ops ldlm_obd_ops = {
450 o_iocontrol: ldlm_iocontrol,
452 o_cleanup: ldlm_cleanup,
453 o_connect: class_connect,
454 o_disconnect: class_disconnect
458 static int __init ldlm_init(void)
460 int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
464 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
465 sizeof(struct ldlm_resource), 0,
466 SLAB_HWCACHE_ALIGN, NULL, NULL);
467 if (ldlm_resource_slab == NULL)
470 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
471 sizeof(struct ldlm_lock), 0,
472 SLAB_HWCACHE_ALIGN, NULL, NULL);
473 if (ldlm_lock_slab == NULL) {
474 kmem_cache_destroy(ldlm_resource_slab);
481 static void __exit ldlm_exit(void)
483 class_unregister_type(OBD_LDLM_DEVICENAME);
484 if (kmem_cache_destroy(ldlm_resource_slab) != 0)
485 CERROR("couldn't free ldlm resource slab\n");
486 if (kmem_cache_destroy(ldlm_lock_slab) != 0)
487 CERROR("couldn't free ldlm lock slab\n");
490 EXPORT_SYMBOL(ldlm_lockname);
491 EXPORT_SYMBOL(ldlm_typename);
492 EXPORT_SYMBOL(ldlm_handle2lock);
493 EXPORT_SYMBOL(ldlm_lock_match);
494 EXPORT_SYMBOL(ldlm_lock_addref);
495 EXPORT_SYMBOL(ldlm_lock_decref);
496 EXPORT_SYMBOL(ldlm_cli_convert);
497 EXPORT_SYMBOL(ldlm_cli_enqueue);
498 EXPORT_SYMBOL(ldlm_cli_cancel);
499 EXPORT_SYMBOL(ldlm_match_or_enqueue);
500 EXPORT_SYMBOL(ldlm_test);
501 EXPORT_SYMBOL(ldlm_lock_dump);
502 EXPORT_SYMBOL(ldlm_namespace_new);
503 EXPORT_SYMBOL(ldlm_namespace_free);
505 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
506 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
507 MODULE_LICENSE("GPL");
509 module_init(ldlm_init);
510 module_exit(ldlm_exit);