Whamcloud - gitweb
- comment out the noisy get/put LDLM_DEBUGs; I'll remove them when I'm sure
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LDLM
26
27 #include <linux/module.h>
28 #include <linux/slab.h>
29 #include <linux/lustre_dlm.h>
30
31 extern kmem_cache_t *ldlm_resource_slab;
32 extern kmem_cache_t *ldlm_lock_slab;
33 extern struct list_head ldlm_namespace_list;
34 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
35 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
36
37 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
38                                     struct ldlm_lock_desc *desc,
39                                     void *data, __u32 data_len)
40 {
41         struct ldlm_request *body;
42         struct ptlrpc_request *req;
43         struct ptlrpc_client *cl;
44         int rc = 0, size = sizeof(*body);
45         ENTRY;
46
47         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
48         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_BL_CALLBACK, 1,
49                               &size, NULL);
50         if (!req)
51                 RETURN(-ENOMEM);
52
53         body = lustre_msg_buf(req->rq_reqmsg, 0);
54         memcpy(&body->lock_handle1, &lock->l_remote_handle,
55                sizeof(body->lock_handle1));
56         memcpy(&body->lock_desc, desc, sizeof(*desc));
57
58         LDLM_DEBUG(lock, "server preparing blocking AST");
59         req->rq_replen = lustre_msg_size(0, NULL);
60
61         rc = ptlrpc_queue_wait(req);
62         rc = ptlrpc_check_status(req, rc);
63         ptlrpc_free_req(req);
64
65         RETURN(rc);
66 }
67
68 static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
69 {
70         struct ldlm_request *body;
71         struct ptlrpc_request *req;
72         struct ptlrpc_client *cl;
73         int rc = 0, size = sizeof(*body);
74         ENTRY;
75
76         if (lock == NULL) {
77                 LBUG();
78                 RETURN(-EINVAL);
79         }
80
81         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
82         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CP_CALLBACK, 1,
83                               &size, NULL);
84         if (!req)
85                 RETURN(-ENOMEM);
86
87         body = lustre_msg_buf(req->rq_reqmsg, 0);
88         memcpy(&body->lock_handle1, &lock->l_remote_handle,
89                sizeof(body->lock_handle1));
90         body->lock_flags = flags;
91         ldlm_lock2desc(lock, &body->lock_desc);
92
93         LDLM_DEBUG(lock, "server preparing completion AST");
94         req->rq_replen = lustre_msg_size(0, NULL);
95
96         rc = ptlrpc_queue_wait(req);
97         rc = ptlrpc_check_status(req, rc);
98         ptlrpc_free_req(req);
99         RETURN(rc);
100 }
101
102 int ldlm_handle_enqueue(struct ptlrpc_request *req)
103 {
104         struct obd_device *obddev = req->rq_export->exp_obd;
105         struct ldlm_reply *dlm_rep;
106         struct ldlm_request *dlm_req;
107         int rc, size = sizeof(*dlm_rep), cookielen = 0;
108         __u32 flags;
109         ldlm_error_t err;
110         struct ldlm_lock *lock = NULL;
111         void *cookie = NULL;
112         ENTRY;
113
114         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
115
116         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
117         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
118                 /* In this case, the reply buffer is allocated deep in
119                  * local_lock_enqueue by the policy function. */
120                 cookie = req;
121                 cookielen = sizeof(*req);
122         } else {
123                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
124                                      &req->rq_repmsg);
125                 if (rc) {
126                         CERROR("out of memory\n");
127                         RETURN(-ENOMEM);
128                 }
129                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
130                         cookie = &dlm_req->lock_desc.l_extent;
131                         cookielen = sizeof(struct ldlm_extent);
132                 }
133         }
134
135         lock = ldlm_lock_create(obddev->obd_namespace,
136                                 &dlm_req->lock_handle2,
137                                 dlm_req->lock_desc.l_resource.lr_name,
138                                 dlm_req->lock_desc.l_resource.lr_type,
139                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
140         if (!lock)
141                 GOTO(out, err = -ENOMEM);
142
143         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
144                sizeof(lock->l_remote_handle));
145         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
146
147         flags = dlm_req->lock_flags;
148         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
149                                 ldlm_server_completion_ast,
150                                 ldlm_server_blocking_ast);
151         if (err != ELDLM_OK)
152                 GOTO(out, err);
153
154         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
155         dlm_rep->lock_flags = flags;
156
157         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
158         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
159                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
160                        sizeof(lock->l_extent));
161         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
162                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
163                        sizeof(dlm_rep->lock_resource_name));
164                 dlm_rep->lock_mode = lock->l_req_mode;
165         }
166
167         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
168         EXIT;
169  out:
170         if (lock)
171                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
172                            "(err=%d)", err);
173         req->rq_status = err;
174
175         if (ptlrpc_reply(req->rq_svc, req))
176                 LBUG();
177
178         if (lock) {
179                 if (!err)
180                         ldlm_reprocess_all(lock->l_resource);
181                 LDLM_LOCK_PUT(lock);
182         }
183         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
184
185         return 0;
186 }
187
188 int ldlm_handle_convert(struct ptlrpc_request *req)
189 {
190         struct ldlm_request *dlm_req;
191         struct ldlm_reply *dlm_rep;
192         struct ldlm_lock *lock;
193         int rc, size = sizeof(*dlm_rep);
194         ENTRY;
195
196         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
197         if (rc) {
198                 CERROR("out of memory\n");
199                 RETURN(-ENOMEM);
200         }
201         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
202         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
203         dlm_rep->lock_flags = dlm_req->lock_flags;
204
205         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
206         if (!lock) {
207                 req->rq_status = EINVAL;
208         } else {
209                 LDLM_DEBUG(lock, "server-side convert handler START");
210                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
211                                   &dlm_rep->lock_flags);
212                 req->rq_status = 0;
213         }
214         if (ptlrpc_reply(req->rq_svc, req) != 0)
215                 LBUG();
216
217         if (lock) {
218                 ldlm_reprocess_all(lock->l_resource);
219                 LDLM_DEBUG(lock, "server-side convert handler END");
220                 LDLM_LOCK_PUT(lock);
221         } else
222                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
223
224         RETURN(0);
225 }
226
227 int ldlm_handle_cancel(struct ptlrpc_request *req)
228 {
229         struct ldlm_request *dlm_req;
230         struct ldlm_lock *lock;
231         int rc;
232         ENTRY;
233
234         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
235         if (rc) {
236                 CERROR("out of memory\n");
237                 RETURN(-ENOMEM);
238         }
239         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
240
241         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
242         if (!lock) {
243                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock "
244                                   "%p)", (void *)(unsigned long)
245                                   dlm_req->lock_handle1.addr);
246                 req->rq_status = ESTALE;
247         } else {
248                 LDLM_DEBUG(lock, "server-side cancel handler START");
249                 ldlm_lock_cancel(lock);
250                 req->rq_status = 0;
251         }
252
253         if (ptlrpc_reply(req->rq_svc, req) != 0)
254                 LBUG();
255
256         if (lock) {
257                 ldlm_reprocess_all(lock->l_resource);
258                 LDLM_DEBUG(lock, "server-side cancel handler END");
259                 LDLM_LOCK_PUT(lock);
260         } 
261
262         RETURN(0);
263 }
264
265 static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
266 {
267         struct ldlm_request *dlm_req;
268         struct ldlm_lock *lock;
269         int rc, do_ast;
270         ENTRY;
271
272         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
273         if (rc)
274                 RETURN(-ENOMEM);
275         rc = ptlrpc_reply(req->rq_svc, req);
276         if (rc)
277                 RETURN(rc);
278
279         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
280
281         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
282         if (!lock) {
283                 CERROR("blocking callback on lock %Lx - lock disappeared\n",
284                        dlm_req->lock_handle1.addr);
285                 RETURN(0);
286         }
287
288         LDLM_DEBUG(lock, "client blocking AST callback handler START");
289
290         l_lock(&lock->l_resource->lr_namespace->ns_lock);
291         lock->l_flags |= LDLM_FL_CBPENDING;
292         do_ast = (!lock->l_readers && !lock->l_writers);
293         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
294
295         if (do_ast) {
296                 LDLM_DEBUG(lock, "already unused, calling "
297                            "callback (%p)", lock->l_blocking_ast);
298                 if (lock->l_blocking_ast != NULL) {
299                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
300                                              lock->l_data, lock->l_data_len);
301                 }
302         } else
303                 LDLM_DEBUG(lock, "Lock still has references, will be"
304                            " cancelled later");
305
306         LDLM_DEBUG(lock, "client blocking callback handler END");
307         LDLM_LOCK_PUT(lock);
308         RETURN(0);
309 }
310
311 static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
312 {
313         struct list_head ast_list = LIST_HEAD_INIT(ast_list);
314         struct ldlm_request *dlm_req;
315         struct ldlm_lock *lock;
316         int rc;
317         ENTRY;
318
319         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
320         if (rc)
321                 RETURN(-ENOMEM);
322         rc = ptlrpc_reply(req->rq_svc, req);
323         if (rc)
324                 RETURN(rc);
325
326         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
327
328         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
329         if (!lock) {
330                 CERROR("completion callback on lock %Lx - lock disappeared\n",
331                        dlm_req->lock_handle1.addr);
332                 RETURN(0);
333         }
334
335         LDLM_DEBUG(lock, "client completion callback handler START");
336
337         l_lock(&lock->l_resource->lr_namespace->ns_lock);
338
339         /* If we receive the completion AST before the actual enqueue returned,
340          * then we might need to switch resources or lock modes. */
341         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
342                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
343                 LDLM_DEBUG(lock, "completion AST, new lock mode");
344         }
345         ldlm_resource_unlink_lock(lock);
346         if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
347                    lock->l_resource->lr_name,
348                    sizeof(__u64) * RES_NAME_SIZE) != 0) {
349                 ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
350                 LDLM_DEBUG(lock, "completion AST, new resource");
351         }
352         lock->l_resource->lr_tmp = &ast_list;
353         ldlm_grant_lock(lock);
354         lock->l_resource->lr_tmp = NULL;
355         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
356         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
357         LDLM_LOCK_PUT(lock);
358
359         ldlm_run_ast_work(&ast_list);
360
361         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
362                           lock);
363         RETURN(0);
364 }
365
366 static int ldlm_callback_handler(struct ptlrpc_request *req)
367 {
368         int rc;
369         ENTRY;
370
371         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
372         if (rc) {
373                 CERROR("lustre_ldlm: Invalid request\n");
374                 GOTO(out, rc);
375         }
376
377         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
378                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
379                        req->rq_reqmsg->type);
380                 GOTO(out, rc = -EINVAL);
381         }
382         switch (req->rq_reqmsg->opc) {
383         case LDLM_BL_CALLBACK:
384                 CDEBUG(D_INODE, "blocking ast\n");
385                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
386                 rc = ldlm_handle_bl_callback(req);
387                 break;
388         case LDLM_CP_CALLBACK:
389                 CDEBUG(D_INODE, "completion ast\n");
390                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
391                 rc = ldlm_handle_cp_callback(req);
392                 break;
393
394         default:
395                 rc = ptlrpc_error(req->rq_svc, req);
396                 RETURN(rc);
397         }
398
399         EXIT;
400 out:
401         if (rc)
402                 RETURN(ptlrpc_error(req->rq_svc, req));
403         return 0;
404 }
405
406
407 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
408                           void *karg, void *uarg)
409 {
410         struct obd_device *obddev = class_conn2obd(conn);
411         struct ptlrpc_connection *connection;
412         int err = 0;
413         ENTRY;
414
415         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
416             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
417                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
418                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
419                 RETURN(-EINVAL);
420         }
421
422         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
423                   sizeof(*obddev->u.ldlm.ldlm_client));
424         ptlrpc_init_client(NULL, NULL,
425                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
426                            obddev->u.ldlm.ldlm_client);
427         connection = ptlrpc_uuid_to_connection("ldlm");
428         if (!connection)
429                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
430
431         switch (cmd) {
432         case IOC_LDLM_TEST:
433                 err = ldlm_test(obddev, conn);
434                 CERROR("-- done err %d\n", err);
435                 GOTO(out, err);
436         case IOC_LDLM_DUMP:
437                 ldlm_dump_all_namespaces();
438                 GOTO(out, err);
439         default:
440                 GOTO(out, err = -EINVAL);
441         }
442
443  out:
444         if (connection)
445                 ptlrpc_put_connection(connection);
446         OBD_FREE(obddev->u.ldlm.ldlm_client,
447                  sizeof(*obddev->u.ldlm.ldlm_client));
448         return err;
449 }
450
451 #define LDLM_NUM_THREADS        8
452
453 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
454 {
455         struct ldlm_obd *ldlm = &obddev->u.ldlm;
456         int rc, i;
457         ENTRY;
458
459         MOD_INC_USE_COUNT;
460         rc = ldlm_proc_setup(obddev);
461         if (rc != 0)
462                 GOTO(out_dec, rc);
463
464         ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
465                                              LDLM_REPLY_PORTAL, "self",
466                                              ldlm_callback_handler);
467         if (!ldlm->ldlm_service)
468                 GOTO(out_proc, rc = -ENOMEM);
469
470         for (i = 0; i < LDLM_NUM_THREADS; i++) {
471                 char name[32];
472                 sprintf(name, "lustre_dlm_%02d", i);
473                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
474                 if (rc) {
475                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
476                         LBUG();
477                         GOTO(out_thread, rc);
478                 }
479         }
480
481         RETURN(0);
482
483  out_thread:
484         ptlrpc_stop_all_threads(ldlm->ldlm_service);
485         ptlrpc_unregister_service(ldlm->ldlm_service);
486  out_proc:
487         ldlm_proc_cleanup(obddev);
488  out_dec:
489         MOD_DEC_USE_COUNT;
490         return rc;
491 }
492
493 static int ldlm_cleanup(struct obd_device *obddev)
494 {
495         struct ldlm_obd *ldlm = &obddev->u.ldlm;
496         ENTRY;
497
498         if (!list_empty(&ldlm_namespace_list)) {
499                 CERROR("ldlm still has namespaces; clean these up first.\n");
500                 RETURN(-EBUSY);
501         }
502
503         ptlrpc_stop_all_threads(ldlm->ldlm_service);
504         ptlrpc_unregister_service(ldlm->ldlm_service);
505         ldlm_proc_cleanup(obddev);
506
507         MOD_DEC_USE_COUNT;
508         RETURN(0);
509 }
510
511 struct obd_ops ldlm_obd_ops = {
512         o_iocontrol:   ldlm_iocontrol,
513         o_setup:       ldlm_setup,
514         o_cleanup:     ldlm_cleanup,
515         o_connect:     class_connect,
516         o_disconnect:  class_disconnect
517 };
518
519 static int __init ldlm_init(void)
520 {
521         int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
522         if (rc != 0)
523                 return rc;
524
525         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
526                                                sizeof(struct ldlm_resource), 0,
527                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
528         if (ldlm_resource_slab == NULL)
529                 return -ENOMEM;
530
531         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
532                                            sizeof(struct ldlm_lock), 0,
533                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
534         if (ldlm_lock_slab == NULL) {
535                 kmem_cache_destroy(ldlm_resource_slab);
536                 return -ENOMEM;
537         }
538
539         return 0;
540 }
541
542 static void __exit ldlm_exit(void)
543 {
544         class_unregister_type(OBD_LDLM_DEVICENAME);
545         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
546                 CERROR("couldn't free ldlm resource slab\n");
547         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
548                 CERROR("couldn't free ldlm lock slab\n");
549 }
550
/* Request handlers defined in this file. */
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_handle_cancel);

/* Symbols not defined in this file, listed alphabetically. */
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_match_or_enqueue);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_regression_start);
EXPORT_SYMBOL(ldlm_regression_stop);
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_unregister_intent);
576
577 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
578 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
579 MODULE_LICENSE("GPL");
580
581 module_init(ldlm_init);
582 module_exit(ldlm_exit);