Whamcloud - gitweb
Remove silly LBUG
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
23
24 static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
25                                     struct ldlm_lock_desc *desc,
26                                     void *data, __u32 data_len)
27 {
28         struct ldlm_request *body;
29         struct ptlrpc_request *req;
30         struct ptlrpc_client *cl;
31         int rc = 0, size = sizeof(*body);
32         ENTRY;
33
34         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
35         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_BL_CALLBACK, 1,
36                               &size, NULL);
37         if (!req)
38                 RETURN(-ENOMEM);
39
40         body = lustre_msg_buf(req->rq_reqmsg, 0);
41         memcpy(&body->lock_handle1, &lock->l_remote_handle,
42                sizeof(body->lock_handle1));
43         memcpy(&body->lock_desc, desc, sizeof(*desc));
44
45         LDLM_DEBUG(lock, "server preparing blocking AST");
46         req->rq_replen = lustre_msg_size(0, NULL);
47
48         rc = ptlrpc_queue_wait(req);
49         rc = ptlrpc_check_status(req, rc);
50         ptlrpc_free_req(req);
51
52         RETURN(rc);
53 }
54
55 static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
56 {
57         struct ldlm_request *body;
58         struct ptlrpc_request *req;
59         struct ptlrpc_client *cl;
60         int rc = 0, size = sizeof(*body);
61         ENTRY;
62
63         if (lock == NULL) {
64                 LBUG();
65                 RETURN(-EINVAL);
66         }
67
68         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
69         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CP_CALLBACK, 1,
70                               &size, NULL);
71         if (!req)
72                 RETURN(-ENOMEM);
73
74         body = lustre_msg_buf(req->rq_reqmsg, 0);
75         memcpy(&body->lock_handle1, &lock->l_remote_handle,
76                sizeof(body->lock_handle1));
77         body->lock_flags = flags;
78         ldlm_lock2desc(lock, &body->lock_desc);
79
80         LDLM_DEBUG(lock, "server preparing completion AST");
81         req->rq_replen = lustre_msg_size(0, NULL);
82
83         rc = ptlrpc_queue_wait(req);
84         rc = ptlrpc_check_status(req, rc);
85         ptlrpc_free_req(req);
86         RETURN(rc);
87 }
88
89 int ldlm_handle_enqueue(struct ptlrpc_request *req)
90 {
91         struct obd_device *obddev = req->rq_export->exp_obd;
92         struct ldlm_reply *dlm_rep;
93         struct ldlm_request *dlm_req;
94         int rc, size = sizeof(*dlm_rep), cookielen = 0;
95         __u32 flags;
96         ldlm_error_t err;
97         struct ldlm_lock *lock = NULL;
98         void *cookie = NULL;
99         ENTRY;
100
101         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
102
103         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
104         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
105                 /* In this case, the reply buffer is allocated deep in
106                  * local_lock_enqueue by the policy function. */
107                 cookie = req;
108                 cookielen = sizeof(*req);
109         } else {
110                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
111                                      &req->rq_repmsg);
112                 if (rc) {
113                         CERROR("out of memory\n");
114                         RETURN(-ENOMEM);
115                 }
116                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
117                         cookie = &dlm_req->lock_desc.l_extent;
118                         cookielen = sizeof(struct ldlm_extent);
119                 }
120         }
121
122         lock = ldlm_lock_create(obddev->obd_namespace,
123                                 &dlm_req->lock_handle2,
124                                 dlm_req->lock_desc.l_resource.lr_name,
125                                 dlm_req->lock_desc.l_resource.lr_type,
126                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
127         if (!lock)
128                 GOTO(out, err = -ENOMEM);
129
130         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
131                sizeof(lock->l_remote_handle));
132         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
133
134         flags = dlm_req->lock_flags;
135         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
136                                 ldlm_server_completion_ast,
137                                 ldlm_server_blocking_ast);
138         if (err != ELDLM_OK)
139                 GOTO(out, err);
140
141         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
142         dlm_rep->lock_flags = flags;
143
144         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
145         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
146                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
147                        sizeof(lock->l_extent));
148         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
149                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
150                        sizeof(dlm_rep->lock_resource_name));
151                 dlm_rep->lock_mode = lock->l_req_mode;
152         }
153
154         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
155         EXIT;
156  out:
157         if (lock)
158                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
159                            "(err=%d)", err);
160         req->rq_status = err;
161
162         if (ptlrpc_reply(req->rq_svc, req))
163                 LBUG();
164
165         if (lock) {
166                 if (!err)
167                         ldlm_reprocess_all(lock->l_resource);
168                 LDLM_LOCK_PUT(lock);
169         }
170         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
171
172         return 0;
173 }
174
175 int ldlm_handle_convert(struct ptlrpc_request *req)
176 {
177         struct ldlm_request *dlm_req;
178         struct ldlm_reply *dlm_rep;
179         struct ldlm_lock *lock;
180         int rc, size = sizeof(*dlm_rep);
181         ENTRY;
182
183         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
184         if (rc) {
185                 CERROR("out of memory\n");
186                 RETURN(-ENOMEM);
187         }
188         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
189         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
190         dlm_rep->lock_flags = dlm_req->lock_flags;
191
192         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
193         if (!lock) {
194                 req->rq_status = EINVAL;
195         } else {
196                 LDLM_DEBUG(lock, "server-side convert handler START");
197                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
198                                   &dlm_rep->lock_flags);
199                 req->rq_status = 0;
200         }
201         if (ptlrpc_reply(req->rq_svc, req) != 0)
202                 LBUG();
203
204         if (lock) {
205                 ldlm_reprocess_all(lock->l_resource);
206                 LDLM_DEBUG(lock, "server-side convert handler END");
207                 LDLM_LOCK_PUT(lock);
208         } else
209                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
210
211         RETURN(0);
212 }
213
214 int ldlm_handle_cancel(struct ptlrpc_request *req)
215 {
216         struct ldlm_request *dlm_req;
217         struct ldlm_lock *lock;
218         int rc;
219         ENTRY;
220
221         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
222         if (rc) {
223                 CERROR("out of memory\n");
224                 RETURN(-ENOMEM);
225         }
226         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
227
228         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
229         if (!lock) {
230                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)",
231                                   (void *)(unsigned long) dlm_req->lock_handle1.addr);
232                 req->rq_status = ESTALE;
233         } else {
234                 LDLM_DEBUG(lock, "server-side cancel handler START");
235                 ldlm_lock_cancel(lock);
236                 req->rq_status = 0;
237         }
238
239         if (ptlrpc_reply(req->rq_svc, req) != 0)
240                 LBUG();
241
242         if (lock) {
243                 ldlm_reprocess_all(lock->l_resource);
244                 LDLM_DEBUG(lock, "server-side cancel handler END");
245                 LDLM_LOCK_PUT(lock);
246         } 
247
248         RETURN(0);
249 }
250
251 static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
252 {
253         struct ldlm_request *dlm_req;
254         struct ldlm_lock *lock;
255         int rc, do_ast;
256         ENTRY;
257
258         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
259         if (rc)
260                 RETURN(-ENOMEM);
261         rc = ptlrpc_reply(req->rq_svc, req);
262         if (rc)
263                 RETURN(rc);
264
265         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
266
267         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
268         if (!lock) {
269                 CERROR("blocking callback on lock %Lx - lock disappeared\n",
270                        dlm_req->lock_handle1.addr);
271                 RETURN(0);
272         }
273
274         LDLM_DEBUG(lock, "client blocking AST callback handler START");
275
276         l_lock(&lock->l_resource->lr_namespace->ns_lock);
277         lock->l_flags |= LDLM_FL_CBPENDING;
278         do_ast = (!lock->l_readers && !lock->l_writers);
279         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
280
281         if (do_ast) {
282                 LDLM_DEBUG(lock, "already unused, calling "
283                            "callback (%p)", lock->l_blocking_ast);
284                 if (lock->l_blocking_ast != NULL) {
285                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
286                                              lock->l_data, lock->l_data_len);
287                 }
288         } else
289                 LDLM_DEBUG(lock, "Lock still has references, will be"
290                            " cancelled later");
291
292         LDLM_DEBUG(lock, "client blocking callback handler END");
293         LDLM_LOCK_PUT(lock);
294         RETURN(0);
295 }
296
297 static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
298 {
299         struct list_head ast_list = LIST_HEAD_INIT(ast_list);
300         struct ldlm_request *dlm_req;
301         struct ldlm_lock *lock;
302         int rc;
303         ENTRY;
304
305         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
306         if (rc)
307                 RETURN(-ENOMEM);
308         rc = ptlrpc_reply(req->rq_svc, req);
309         if (rc)
310                 RETURN(rc);
311
312         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
313
314         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
315         if (!lock) {
316                 CERROR("completion callback on lock %Lx - lock disappeared\n",
317                        dlm_req->lock_handle1.addr);
318                 RETURN(0);
319         }
320
321         LDLM_DEBUG(lock, "client completion callback handler START");
322
323         l_lock(&lock->l_resource->lr_namespace->ns_lock);
324         lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
325
326         /* If we receive the completion AST before the actual enqueue returned,
327          * then we might need to switch resources. */
328         ldlm_resource_unlink_lock(lock);
329         if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
330                    lock->l_resource->lr_name,
331                    sizeof(__u64) * RES_NAME_SIZE) != 0) {
332                 ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
333                 LDLM_DEBUG(lock, "completion AST, new resource");
334         }
335         lock->l_resource->lr_tmp = &ast_list;
336         ldlm_grant_lock(lock);
337         lock->l_resource->lr_tmp = NULL;
338         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
339         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
340         LDLM_LOCK_PUT(lock);
341
342         ldlm_run_ast_work(&ast_list);
343
344         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
345                           lock);
346         RETURN(0);
347 }
348
349 static int ldlm_callback_handler(struct ptlrpc_request *req)
350 {
351         int rc;
352         ENTRY;
353
354         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
355         if (rc) {
356                 CERROR("lustre_ldlm: Invalid request\n");
357                 GOTO(out, rc);
358         }
359
360         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
361                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
362                        req->rq_reqmsg->type);
363                 GOTO(out, rc = -EINVAL);
364         }
365         switch (req->rq_reqmsg->opc) {
366         case LDLM_BL_CALLBACK:
367                 CDEBUG(D_INODE, "blocking ast\n");
368                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
369                 rc = ldlm_handle_bl_callback(req);
370                 break;
371         case LDLM_CP_CALLBACK:
372                 CDEBUG(D_INODE, "completion ast\n");
373                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
374                 rc = ldlm_handle_cp_callback(req);
375                 break;
376
377         default:
378                 rc = ptlrpc_error(req->rq_svc, req);
379                 RETURN(rc);
380         }
381
382         EXIT;
383 out:
384         if (rc)
385                 RETURN(ptlrpc_error(req->rq_svc, req));
386         return 0;
387 }
388
389
390 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
391                           void *karg, void *uarg)
392 {
393         struct obd_device *obddev = class_conn2obd(conn);
394         struct ptlrpc_connection *connection;
395         int err = 0;
396         ENTRY;
397
398         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
399             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
400                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
401                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
402                 RETURN(-EINVAL);
403         }
404
405         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
406                   sizeof(*obddev->u.ldlm.ldlm_client));
407         ptlrpc_init_client(NULL, NULL,
408                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
409                            obddev->u.ldlm.ldlm_client);
410         connection = ptlrpc_uuid_to_connection("ldlm");
411         if (!connection)
412                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
413
414         switch (cmd) {
415         case IOC_LDLM_TEST:
416                 err = ldlm_test(obddev, connection);
417                 CERROR("-- done err %d\n", err);
418                 GOTO(out, err);
419         case IOC_LDLM_DUMP:
420                 ldlm_dump_all_namespaces();
421                 GOTO(out, err);
422         default:
423                 GOTO(out, err = -EINVAL);
424         }
425
426  out:
427         if (connection)
428                 ptlrpc_put_connection(connection);
429         OBD_FREE(obddev->u.ldlm.ldlm_client,
430                  sizeof(*obddev->u.ldlm.ldlm_client));
431         return err;
432 }
433
434 #define LDLM_NUM_THREADS        8
435
436 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
437 {
438         struct ldlm_obd *ldlm = &obddev->u.ldlm;
439         int rc, i;
440         ENTRY;
441
442         MOD_INC_USE_COUNT;
443         rc = ldlm_proc_setup(obddev);
444         if (rc != 0)
445                 GOTO(out_dec, rc);
446
447         ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
448                                              LDLM_REPLY_PORTAL, "self",
449                                              ldlm_callback_handler);
450         if (!ldlm->ldlm_service)
451                 GOTO(out_proc, rc = -ENOMEM);
452
453         for (i = 0; i < LDLM_NUM_THREADS; i++) {
454                 char name[32];
455                 sprintf(name, "lustre_dlm_%02d", i);
456                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
457                 if (rc) {
458                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
459                         LBUG();
460                         GOTO(out_thread, rc);
461                 }
462         }
463
464         RETURN(0);
465
466  out_thread:
467         ptlrpc_stop_all_threads(ldlm->ldlm_service);
468         ptlrpc_unregister_service(ldlm->ldlm_service);
469  out_proc:
470         ldlm_proc_cleanup(obddev);
471  out_dec:
472         MOD_DEC_USE_COUNT;
473         return rc;
474 }
475
476 static int ldlm_cleanup(struct obd_device *obddev)
477 {
478         struct ldlm_obd *ldlm = &obddev->u.ldlm;
479         ENTRY;
480
481         ptlrpc_stop_all_threads(ldlm->ldlm_service);
482         ptlrpc_unregister_service(ldlm->ldlm_service);
483         ldlm_proc_cleanup(obddev);
484
485         MOD_DEC_USE_COUNT;
486         RETURN(0);
487 }
488
489 struct obd_ops ldlm_obd_ops = {
490         o_iocontrol:   ldlm_iocontrol,
491         o_setup:       ldlm_setup,
492         o_cleanup:     ldlm_cleanup,
493         o_connect:     class_connect,
494         o_disconnect:  class_disconnect
495 };
496
497 static int __init ldlm_init(void)
498 {
499         int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
500         if (rc != 0)
501                 return rc;
502
503         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
504                                                sizeof(struct ldlm_resource), 0,
505                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
506         if (ldlm_resource_slab == NULL)
507                 return -ENOMEM;
508
509         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
510                                            sizeof(struct ldlm_lock), 0,
511                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
512         if (ldlm_lock_slab == NULL) {
513                 kmem_cache_destroy(ldlm_resource_slab);
514                 return -ENOMEM;
515         }
516
517         return 0;
518 }
519
520 static void __exit ldlm_exit(void)
521 {
522         class_unregister_type(OBD_LDLM_DEVICENAME);
523         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
524                 CERROR("couldn't free ldlm resource slab\n");
525         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
526                 CERROR("couldn't free ldlm lock slab\n");
527 }
528
529 EXPORT_SYMBOL(ldlm_completion_ast);
530 EXPORT_SYMBOL(ldlm_handle_enqueue);
531 EXPORT_SYMBOL(ldlm_handle_cancel);
532 EXPORT_SYMBOL(ldlm_handle_convert);
533 EXPORT_SYMBOL(ldlm_register_intent);
534 EXPORT_SYMBOL(ldlm_unregister_intent); 
535 EXPORT_SYMBOL(ldlm_lockname);
536 EXPORT_SYMBOL(ldlm_typename);
537 EXPORT_SYMBOL(ldlm_handle2lock);
538 EXPORT_SYMBOL(ldlm_lock2handle);
539 EXPORT_SYMBOL(ldlm_lock_match);
540 EXPORT_SYMBOL(ldlm_lock_addref);
541 EXPORT_SYMBOL(ldlm_lock_decref);
542 EXPORT_SYMBOL(ldlm_lock_change_resource);
543 EXPORT_SYMBOL(ldlm_cli_convert);
544 EXPORT_SYMBOL(ldlm_cli_enqueue);
545 EXPORT_SYMBOL(ldlm_cli_cancel);
546 EXPORT_SYMBOL(ldlm_match_or_enqueue);
547 EXPORT_SYMBOL(ldlm_it2str);
548 EXPORT_SYMBOL(ldlm_test);
549 EXPORT_SYMBOL(ldlm_lock_dump);
550 EXPORT_SYMBOL(ldlm_namespace_new);
551 EXPORT_SYMBOL(ldlm_namespace_free);
552
553 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
554 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
555 MODULE_LICENSE("GPL");
556
557 module_init(ldlm_init);
558 module_exit(ldlm_exit);