Whamcloud - gitweb
- Don't shutdown the LDLM if other modules still have namespaces
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern struct list_head ldlm_namespace_list;
22 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
23 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
24
25 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
26                                     struct ldlm_lock_desc *desc,
27                                     void *data, __u32 data_len)
28 {
29         struct ldlm_request *body;
30         struct ptlrpc_request *req;
31         struct ptlrpc_client *cl;
32         int rc = 0, size = sizeof(*body);
33         ENTRY;
34
35         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
36         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_BL_CALLBACK, 1,
37                               &size, NULL);
38         if (!req)
39                 RETURN(-ENOMEM);
40
41         body = lustre_msg_buf(req->rq_reqmsg, 0);
42         memcpy(&body->lock_handle1, &lock->l_remote_handle,
43                sizeof(body->lock_handle1));
44         memcpy(&body->lock_desc, desc, sizeof(*desc));
45
46         LDLM_DEBUG(lock, "server preparing blocking AST");
47         req->rq_replen = lustre_msg_size(0, NULL);
48
49         rc = ptlrpc_queue_wait(req);
50         rc = ptlrpc_check_status(req, rc);
51         ptlrpc_free_req(req);
52
53         RETURN(rc);
54 }
55
56 static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
57 {
58         struct ldlm_request *body;
59         struct ptlrpc_request *req;
60         struct ptlrpc_client *cl;
61         int rc = 0, size = sizeof(*body);
62         ENTRY;
63
64         if (lock == NULL) {
65                 LBUG();
66                 RETURN(-EINVAL);
67         }
68
69         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
70         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CP_CALLBACK, 1,
71                               &size, NULL);
72         if (!req)
73                 RETURN(-ENOMEM);
74
75         body = lustre_msg_buf(req->rq_reqmsg, 0);
76         memcpy(&body->lock_handle1, &lock->l_remote_handle,
77                sizeof(body->lock_handle1));
78         body->lock_flags = flags;
79         ldlm_lock2desc(lock, &body->lock_desc);
80
81         LDLM_DEBUG(lock, "server preparing completion AST");
82         req->rq_replen = lustre_msg_size(0, NULL);
83
84         rc = ptlrpc_queue_wait(req);
85         rc = ptlrpc_check_status(req, rc);
86         ptlrpc_free_req(req);
87         RETURN(rc);
88 }
89
90 int ldlm_handle_enqueue(struct ptlrpc_request *req)
91 {
92         struct obd_device *obddev = req->rq_export->exp_obd;
93         struct ldlm_reply *dlm_rep;
94         struct ldlm_request *dlm_req;
95         int rc, size = sizeof(*dlm_rep), cookielen = 0;
96         __u32 flags;
97         ldlm_error_t err;
98         struct ldlm_lock *lock = NULL;
99         void *cookie = NULL;
100         ENTRY;
101
102         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
103
104         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
105         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
106                 /* In this case, the reply buffer is allocated deep in
107                  * local_lock_enqueue by the policy function. */
108                 cookie = req;
109                 cookielen = sizeof(*req);
110         } else {
111                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
112                                      &req->rq_repmsg);
113                 if (rc) {
114                         CERROR("out of memory\n");
115                         RETURN(-ENOMEM);
116                 }
117                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
118                         cookie = &dlm_req->lock_desc.l_extent;
119                         cookielen = sizeof(struct ldlm_extent);
120                 }
121         }
122
123         lock = ldlm_lock_create(obddev->obd_namespace,
124                                 &dlm_req->lock_handle2,
125                                 dlm_req->lock_desc.l_resource.lr_name,
126                                 dlm_req->lock_desc.l_resource.lr_type,
127                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
128         if (!lock)
129                 GOTO(out, err = -ENOMEM);
130
131         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
132                sizeof(lock->l_remote_handle));
133         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
134
135         flags = dlm_req->lock_flags;
136         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
137                                 ldlm_server_completion_ast,
138                                 ldlm_server_blocking_ast);
139         if (err != ELDLM_OK)
140                 GOTO(out, err);
141
142         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
143         dlm_rep->lock_flags = flags;
144
145         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
146         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
147                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
148                        sizeof(lock->l_extent));
149         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
150                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
151                        sizeof(dlm_rep->lock_resource_name));
152                 dlm_rep->lock_mode = lock->l_req_mode;
153         }
154
155         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
156         EXIT;
157  out:
158         if (lock)
159                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
160                            "(err=%d)", err);
161         req->rq_status = err;
162
163         if (ptlrpc_reply(req->rq_svc, req))
164                 LBUG();
165
166         if (lock) {
167                 if (!err)
168                         ldlm_reprocess_all(lock->l_resource);
169                 LDLM_LOCK_PUT(lock);
170         }
171         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
172
173         return 0;
174 }
175
176 int ldlm_handle_convert(struct ptlrpc_request *req)
177 {
178         struct ldlm_request *dlm_req;
179         struct ldlm_reply *dlm_rep;
180         struct ldlm_lock *lock;
181         int rc, size = sizeof(*dlm_rep);
182         ENTRY;
183
184         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
185         if (rc) {
186                 CERROR("out of memory\n");
187                 RETURN(-ENOMEM);
188         }
189         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
190         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
191         dlm_rep->lock_flags = dlm_req->lock_flags;
192
193         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
194         if (!lock) {
195                 req->rq_status = EINVAL;
196         } else {
197                 LDLM_DEBUG(lock, "server-side convert handler START");
198                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
199                                   &dlm_rep->lock_flags);
200                 req->rq_status = 0;
201         }
202         if (ptlrpc_reply(req->rq_svc, req) != 0)
203                 LBUG();
204
205         if (lock) {
206                 ldlm_reprocess_all(lock->l_resource);
207                 LDLM_DEBUG(lock, "server-side convert handler END");
208                 LDLM_LOCK_PUT(lock);
209         } else
210                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
211
212         RETURN(0);
213 }
214
215 int ldlm_handle_cancel(struct ptlrpc_request *req)
216 {
217         struct ldlm_request *dlm_req;
218         struct ldlm_lock *lock;
219         int rc;
220         ENTRY;
221
222         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
223         if (rc) {
224                 CERROR("out of memory\n");
225                 RETURN(-ENOMEM);
226         }
227         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
228
229         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
230         if (!lock) {
231                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)",
232                                   (void *)(unsigned long) dlm_req->lock_handle1.addr);
233                 req->rq_status = ESTALE;
234         } else {
235                 LDLM_DEBUG(lock, "server-side cancel handler START");
236                 ldlm_lock_cancel(lock);
237                 req->rq_status = 0;
238         }
239
240         if (ptlrpc_reply(req->rq_svc, req) != 0)
241                 LBUG();
242
243         if (lock) {
244                 ldlm_reprocess_all(lock->l_resource);
245                 LDLM_DEBUG(lock, "server-side cancel handler END");
246                 LDLM_LOCK_PUT(lock);
247         } 
248
249         RETURN(0);
250 }
251
252 static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
253 {
254         struct ldlm_request *dlm_req;
255         struct ldlm_lock *lock;
256         int rc, do_ast;
257         ENTRY;
258
259         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
260         if (rc)
261                 RETURN(-ENOMEM);
262         rc = ptlrpc_reply(req->rq_svc, req);
263         if (rc)
264                 RETURN(rc);
265
266         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
267
268         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
269         if (!lock) {
270                 CERROR("blocking callback on lock %Lx - lock disappeared\n",
271                        dlm_req->lock_handle1.addr);
272                 RETURN(0);
273         }
274
275         LDLM_DEBUG(lock, "client blocking AST callback handler START");
276
277         l_lock(&lock->l_resource->lr_namespace->ns_lock);
278         lock->l_flags |= LDLM_FL_CBPENDING;
279         do_ast = (!lock->l_readers && !lock->l_writers);
280         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
281
282         if (do_ast) {
283                 LDLM_DEBUG(lock, "already unused, calling "
284                            "callback (%p)", lock->l_blocking_ast);
285                 if (lock->l_blocking_ast != NULL) {
286                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
287                                              lock->l_data, lock->l_data_len);
288                 }
289         } else
290                 LDLM_DEBUG(lock, "Lock still has references, will be"
291                            " cancelled later");
292
293         LDLM_DEBUG(lock, "client blocking callback handler END");
294         LDLM_LOCK_PUT(lock);
295         RETURN(0);
296 }
297
298 static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
299 {
300         struct list_head ast_list = LIST_HEAD_INIT(ast_list);
301         struct ldlm_request *dlm_req;
302         struct ldlm_lock *lock;
303         int rc;
304         ENTRY;
305
306         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
307         if (rc)
308                 RETURN(-ENOMEM);
309         rc = ptlrpc_reply(req->rq_svc, req);
310         if (rc)
311                 RETURN(rc);
312
313         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
314
315         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
316         if (!lock) {
317                 CERROR("completion callback on lock %Lx - lock disappeared\n",
318                        dlm_req->lock_handle1.addr);
319                 RETURN(0);
320         }
321
322         LDLM_DEBUG(lock, "client completion callback handler START");
323
324         l_lock(&lock->l_resource->lr_namespace->ns_lock);
325         lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
326
327         /* If we receive the completion AST before the actual enqueue returned,
328          * then we might need to switch resources. */
329         ldlm_resource_unlink_lock(lock);
330         if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
331                    lock->l_resource->lr_name,
332                    sizeof(__u64) * RES_NAME_SIZE) != 0) {
333                 ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
334                 LDLM_DEBUG(lock, "completion AST, new resource");
335         }
336         lock->l_resource->lr_tmp = &ast_list;
337         ldlm_grant_lock(lock);
338         lock->l_resource->lr_tmp = NULL;
339         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
340         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
341         LDLM_LOCK_PUT(lock);
342
343         ldlm_run_ast_work(&ast_list);
344
345         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
346                           lock);
347         RETURN(0);
348 }
349
350 static int ldlm_callback_handler(struct ptlrpc_request *req)
351 {
352         int rc;
353         ENTRY;
354
355         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
356         if (rc) {
357                 CERROR("lustre_ldlm: Invalid request\n");
358                 GOTO(out, rc);
359         }
360
361         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
362                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
363                        req->rq_reqmsg->type);
364                 GOTO(out, rc = -EINVAL);
365         }
366         switch (req->rq_reqmsg->opc) {
367         case LDLM_BL_CALLBACK:
368                 CDEBUG(D_INODE, "blocking ast\n");
369                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
370                 rc = ldlm_handle_bl_callback(req);
371                 break;
372         case LDLM_CP_CALLBACK:
373                 CDEBUG(D_INODE, "completion ast\n");
374                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
375                 rc = ldlm_handle_cp_callback(req);
376                 break;
377
378         default:
379                 rc = ptlrpc_error(req->rq_svc, req);
380                 RETURN(rc);
381         }
382
383         EXIT;
384 out:
385         if (rc)
386                 RETURN(ptlrpc_error(req->rq_svc, req));
387         return 0;
388 }
389
390
391 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
392                           void *karg, void *uarg)
393 {
394         struct obd_device *obddev = class_conn2obd(conn);
395         struct ptlrpc_connection *connection;
396         int err = 0;
397         ENTRY;
398
399         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
400             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
401                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
402                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
403                 RETURN(-EINVAL);
404         }
405
406         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
407                   sizeof(*obddev->u.ldlm.ldlm_client));
408         ptlrpc_init_client(NULL, NULL,
409                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
410                            obddev->u.ldlm.ldlm_client);
411         connection = ptlrpc_uuid_to_connection("ldlm");
412         if (!connection)
413                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
414
415         switch (cmd) {
416         case IOC_LDLM_TEST:
417                 err = ldlm_test(obddev, connection);
418                 CERROR("-- done err %d\n", err);
419                 GOTO(out, err);
420         case IOC_LDLM_DUMP:
421                 ldlm_dump_all_namespaces();
422                 GOTO(out, err);
423         default:
424                 GOTO(out, err = -EINVAL);
425         }
426
427  out:
428         if (connection)
429                 ptlrpc_put_connection(connection);
430         OBD_FREE(obddev->u.ldlm.ldlm_client,
431                  sizeof(*obddev->u.ldlm.ldlm_client));
432         return err;
433 }
434
435 #define LDLM_NUM_THREADS        8
436
437 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
438 {
439         struct ldlm_obd *ldlm = &obddev->u.ldlm;
440         int rc, i;
441         ENTRY;
442
443         MOD_INC_USE_COUNT;
444         rc = ldlm_proc_setup(obddev);
445         if (rc != 0)
446                 GOTO(out_dec, rc);
447
448         ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
449                                              LDLM_REPLY_PORTAL, "self",
450                                              ldlm_callback_handler);
451         if (!ldlm->ldlm_service)
452                 GOTO(out_proc, rc = -ENOMEM);
453
454         for (i = 0; i < LDLM_NUM_THREADS; i++) {
455                 char name[32];
456                 sprintf(name, "lustre_dlm_%02d", i);
457                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
458                 if (rc) {
459                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
460                         LBUG();
461                         GOTO(out_thread, rc);
462                 }
463         }
464
465         RETURN(0);
466
467  out_thread:
468         ptlrpc_stop_all_threads(ldlm->ldlm_service);
469         ptlrpc_unregister_service(ldlm->ldlm_service);
470  out_proc:
471         ldlm_proc_cleanup(obddev);
472  out_dec:
473         MOD_DEC_USE_COUNT;
474         return rc;
475 }
476
477 static int ldlm_cleanup(struct obd_device *obddev)
478 {
479         struct ldlm_obd *ldlm = &obddev->u.ldlm;
480         ENTRY;
481
482         if (!list_empty(&ldlm_namespace_list)) {
483                 CERROR("ldlm still has namespaces; clean these up first.\n");
484                 RETURN(-EBUSY);
485         }
486
487         ptlrpc_stop_all_threads(ldlm->ldlm_service);
488         ptlrpc_unregister_service(ldlm->ldlm_service);
489         ldlm_proc_cleanup(obddev);
490
491         MOD_DEC_USE_COUNT;
492         RETURN(0);
493 }
494
495 struct obd_ops ldlm_obd_ops = {
496         o_iocontrol:   ldlm_iocontrol,
497         o_setup:       ldlm_setup,
498         o_cleanup:     ldlm_cleanup,
499         o_connect:     class_connect,
500         o_disconnect:  class_disconnect
501 };
502
503 static int __init ldlm_init(void)
504 {
505         int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
506         if (rc != 0)
507                 return rc;
508
509         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
510                                                sizeof(struct ldlm_resource), 0,
511                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
512         if (ldlm_resource_slab == NULL)
513                 return -ENOMEM;
514
515         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
516                                            sizeof(struct ldlm_lock), 0,
517                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
518         if (ldlm_lock_slab == NULL) {
519                 kmem_cache_destroy(ldlm_resource_slab);
520                 return -ENOMEM;
521         }
522
523         return 0;
524 }
525
526 static void __exit ldlm_exit(void)
527 {
528         class_unregister_type(OBD_LDLM_DEVICENAME);
529         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
530                 CERROR("couldn't free ldlm resource slab\n");
531         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
532                 CERROR("couldn't free ldlm lock slab\n");
533 }
534
535 EXPORT_SYMBOL(ldlm_completion_ast);
536 EXPORT_SYMBOL(ldlm_handle_enqueue);
537 EXPORT_SYMBOL(ldlm_handle_cancel);
538 EXPORT_SYMBOL(ldlm_handle_convert);
539 EXPORT_SYMBOL(ldlm_register_intent);
540 EXPORT_SYMBOL(ldlm_unregister_intent); 
541 EXPORT_SYMBOL(ldlm_lockname);
542 EXPORT_SYMBOL(ldlm_typename);
543 EXPORT_SYMBOL(ldlm_handle2lock);
544 EXPORT_SYMBOL(ldlm_lock2handle);
545 EXPORT_SYMBOL(ldlm_lock_match);
546 EXPORT_SYMBOL(ldlm_lock_addref);
547 EXPORT_SYMBOL(ldlm_lock_decref);
548 EXPORT_SYMBOL(ldlm_lock_change_resource);
549 EXPORT_SYMBOL(ldlm_cli_convert);
550 EXPORT_SYMBOL(ldlm_cli_enqueue);
551 EXPORT_SYMBOL(ldlm_cli_cancel);
552 EXPORT_SYMBOL(ldlm_match_or_enqueue);
553 EXPORT_SYMBOL(ldlm_it2str);
554 EXPORT_SYMBOL(ldlm_test);
555 EXPORT_SYMBOL(ldlm_lock_dump);
556 EXPORT_SYMBOL(ldlm_namespace_new);
557 EXPORT_SYMBOL(ldlm_namespace_free);
558
559 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
560 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
561 MODULE_LICENSE("GPL");
562
563 module_init(ldlm_init);
564 module_exit(ldlm_exit);