Whamcloud - gitweb
Big diff, mostly cosmetic.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
23
24 static int ldlm_handle_enqueue(struct ptlrpc_request *req)
25 {
26         struct obd_device *obddev = req->rq_export->exp_obd;
27         struct ldlm_reply *dlm_rep;
28         struct ldlm_request *dlm_req;
29         int rc, size = sizeof(*dlm_rep), cookielen = 0;
30         __u32 flags;
31         ldlm_error_t err;
32         struct ldlm_lock *lock = NULL;
33         void *cookie = NULL;
34         ENTRY;
35
36         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
37
38         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
39         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
40                 /* In this case, the reply buffer is allocated deep in
41                  * local_lock_enqueue by the policy function. */
42                 cookie = req;
43                 cookielen = sizeof(*req);
44         } else {
45                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
46                                      &req->rq_repmsg);
47                 if (rc) {
48                         CERROR("out of memory\n");
49                         RETURN(-ENOMEM);
50                 }
51                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
52                         cookie = &dlm_req->lock_desc.l_extent;
53                         cookielen = sizeof(struct ldlm_extent);
54                 }
55         }
56
57         lock = ldlm_lock_create(obddev->obd_namespace,
58                                 &dlm_req->lock_handle2,
59                                 dlm_req->lock_desc.l_resource.lr_name,
60                                 dlm_req->lock_desc.l_resource.lr_type,
61                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
62         if (!lock)
63                 GOTO(out, err = -ENOMEM);
64
65         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
66                sizeof(lock->l_remote_handle));
67         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
68
69         flags = dlm_req->lock_flags;
70         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
71                                 ldlm_server_ast, ldlm_server_ast);
72         if (err != ELDLM_OK)
73                 GOTO(out, err);
74
75         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
76         dlm_rep->lock_flags = flags;
77
78         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
79         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
80                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
81                        sizeof(lock->l_extent));
82         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
83                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
84                        sizeof(dlm_rep->lock_resource_name));
85
86         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
87         EXIT;
88  out:
89         if (lock)
90                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
91                            "(err=%d)", err);
92         req->rq_status = err;
93
94         if (ptlrpc_reply(req->rq_svc, req))
95                 LBUG();
96
97         if (lock) {
98                 if (!err)
99                         ldlm_reprocess_all(lock->l_resource);
100                 LDLM_LOCK_PUT(lock);
101         }
102         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
103
104         return 0;
105 }
106
107 static int ldlm_handle_convert(struct ptlrpc_request *req)
108 {
109         struct ldlm_request *dlm_req;
110         struct ldlm_reply *dlm_rep;
111         struct ldlm_lock *lock;
112         int rc, size = sizeof(*dlm_rep);
113         ENTRY;
114
115         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
116         if (rc) {
117                 CERROR("out of memory\n");
118                 RETURN(-ENOMEM);
119         }
120         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
121         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
122         dlm_rep->lock_flags = dlm_req->lock_flags;
123
124         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
125         if (!lock) {
126                 req->rq_status = EINVAL;
127         } else {
128                 LDLM_DEBUG(lock, "server-side convert handler START");
129                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
130                                   &dlm_rep->lock_flags);
131                 req->rq_status = 0;
132         }
133         if (ptlrpc_reply(req->rq_svc, req) != 0)
134                 LBUG();
135
136         if (lock) {
137                 ldlm_reprocess_all(lock->l_resource);
138                 LDLM_DEBUG(lock, "server-side convert handler END");
139                 LDLM_LOCK_PUT(lock);
140         } else
141                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
142
143         RETURN(0);
144 }
145
146 static int ldlm_handle_cancel(struct ptlrpc_request *req)
147 {
148         struct ldlm_request *dlm_req;
149         struct ldlm_lock *lock;
150         int rc;
151         ENTRY;
152
153         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
154         if (rc) {
155                 CERROR("out of memory\n");
156                 RETURN(-ENOMEM);
157         }
158         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
159
160         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
161         if (!lock) {
162                 req->rq_status = ESTALE;
163         } else {
164                 LDLM_DEBUG(lock, "server-side cancel handler START");
165                 ldlm_lock_cancel(lock);
166                 req->rq_status = 0;
167         }
168
169         if (ptlrpc_reply(req->rq_svc, req) != 0)
170                 LBUG();
171
172         if (lock) {
173                 ldlm_reprocess_all(lock->l_resource);
174                 LDLM_DEBUG(lock, "server-side cancel handler END");
175                 LDLM_LOCK_PUT(lock);
176         } else
177                 LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)",
178                                   lock);
179
180         RETURN(0);
181 }
182
183 static int ldlm_handle_callback(struct ptlrpc_request *req)
184 {
185         struct ldlm_request *dlm_req;
186         struct ldlm_lock_desc *descp = NULL;
187         struct ldlm_lock *lock;
188         __u64 is_blocking_ast = 0;
189         int rc;
190         ENTRY;
191
192         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
193         if (rc) {
194                 CERROR("out of memory\n");
195                 RETURN(-ENOMEM);
196         }
197         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
198
199         /* We must send the reply first, so that the thread is free to handle
200          * any requests made in common_callback() */
201         rc = ptlrpc_reply(req->rq_svc, req);
202         if (rc != 0)
203                 RETURN(rc);
204
205         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
206         if (!lock) {
207                 CERROR("callback on lock %Lx - lock disappeared\n",
208                        dlm_req->lock_handle1.addr);
209                 RETURN(0);
210         }
211
212         /* check if this is a blocking AST */
213         if (dlm_req->lock_desc.l_req_mode !=
214             dlm_req->lock_desc.l_granted_mode) {
215                 descp = &dlm_req->lock_desc;
216                 is_blocking_ast = 1;
217         }
218
219         LDLM_DEBUG(lock, "client %s callback handler START",
220                    is_blocking_ast ? "blocked" : "completion");
221
222         if (descp) {
223                 int do_ast;
224                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
225                 lock->l_flags |= LDLM_FL_CBPENDING;
226                 do_ast = (!lock->l_readers && !lock->l_writers);
227                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
228
229                 if (do_ast) {
230                         LDLM_DEBUG(lock, "already unused, calling "
231                                    "callback (%p)", lock->l_blocking_ast);
232                         if (lock->l_blocking_ast != NULL) {
233                                 struct lustre_handle lockh;
234                                 ldlm_lock2handle(lock, &lockh);
235                                 lock->l_blocking_ast(&lockh, descp,
236                                                      lock->l_data,
237                                                      lock->l_data_len);
238                         }
239                 } else {
240                         LDLM_DEBUG(lock, "Lock still has references, will be"
241                                    " cancelled later");
242                 }
243                 LDLM_LOCK_PUT(lock);
244         } else {
245                 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
246
247                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
248                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
249
250                 /* If we receive the completion AST before the actual enqueue
251                  * returned, then we might need to switch resources. */
252                 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
253                            lock->l_resource->lr_name,
254                            sizeof(__u64) * RES_NAME_SIZE) != 0) {
255                         ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
256                         LDLM_DEBUG(lock, "completion AST, new resource");
257                 }
258                 lock->l_resource->lr_tmp = &rpc_list;
259                 ldlm_resource_unlink_lock(lock);
260                 ldlm_grant_lock(lock);
261                 /*  FIXME: we want any completion function, not just wake_up */
262                 wake_up(&lock->l_waitq);
263                 lock->l_resource->lr_tmp = NULL;
264                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
265                 LDLM_LOCK_PUT(lock);
266
267                 ldlm_run_ast_work(&rpc_list);
268         }
269
270         LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
271                           is_blocking_ast ? "blocked" : "completion", lock);
272         RETURN(0);
273 }
274
275 static int ldlm_handle(struct ptlrpc_request *req)
276 {
277         int rc;
278         ENTRY;
279
280         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
281         if (rc) {
282                 CERROR("lustre_ldlm: Invalid request\n");
283                 GOTO(out, rc);
284         }
285
286         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
287                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
288                        req->rq_reqmsg->type);
289                 GOTO(out, rc = -EINVAL);
290         }
291
292         if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) {
293                 CERROR("No export handle for enqueue request.\n");
294                 GOTO(out, rc = -ENOTCONN);
295         }
296
297         switch (req->rq_reqmsg->opc) {
298         case LDLM_ENQUEUE:
299                 CDEBUG(D_INODE, "enqueue\n");
300                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
301                 rc = ldlm_handle_enqueue(req);
302                 break;
303
304         case LDLM_CONVERT:
305                 CDEBUG(D_INODE, "convert\n");
306                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
307                 rc = ldlm_handle_convert(req);
308                 break;
309
310         case LDLM_CANCEL:
311                 CDEBUG(D_INODE, "cancel\n");
312                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
313                 rc = ldlm_handle_cancel(req);
314                 break;
315
316         case LDLM_CALLBACK:
317                 CDEBUG(D_INODE, "callback\n");
318                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
319                 rc = ldlm_handle_callback(req);
320                 break;
321
322         default:
323                 rc = ptlrpc_error(req->rq_svc, req);
324                 RETURN(rc);
325         }
326
327         EXIT;
328 out:
329         if (rc)
330                 RETURN(ptlrpc_error(req->rq_svc, req));
331         return 0;
332 }
333
334 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
335                           void *karg, void *uarg)
336 {
337         struct obd_device *obddev = class_conn2obd(conn);
338         struct ptlrpc_connection *connection;
339         int err;
340         ENTRY;
341
342         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
343             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
344                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
345                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
346                 RETURN(-EINVAL);
347         }
348
349         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
350                   sizeof(*obddev->u.ldlm.ldlm_client));
351         ptlrpc_init_client(NULL, NULL,
352                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
353                            obddev->u.ldlm.ldlm_client);
354         connection = ptlrpc_uuid_to_connection("ldlm");
355         if (!connection)
356                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
357
358         switch (cmd) {
359         case IOC_LDLM_TEST: {
360                 err = ldlm_test(obddev, connection);
361                 CERROR("-- done err %d\n", err);
362                 GOTO(out, err);
363         }
364         default:
365                 GOTO(out, err = -EINVAL);
366         }
367
368  out:
369         if (connection)
370                 ptlrpc_put_connection(connection);
371         OBD_FREE(obddev->u.ldlm.ldlm_client,
372                  sizeof(*obddev->u.ldlm.ldlm_client));
373         return err;
374 }
375
376 #define LDLM_NUM_THREADS        8
377
378 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
379 {
380         struct ldlm_obd *ldlm = &obddev->u.ldlm;
381         int rc;
382         int i;
383         ENTRY;
384
385         MOD_INC_USE_COUNT;
386         ldlm->ldlm_service =
387                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
388                                 LDLM_REPLY_PORTAL, "self", ldlm_handle);
389         if (!ldlm->ldlm_service) {
390                 LBUG();
391                 GOTO(out_dec, rc = -ENOMEM);
392         }
393
394         if (mds_reint_p == NULL)
395                 mds_reint_p = inter_module_get_request("mds_reint", "mds");
396         if (IS_ERR(mds_reint_p)) {
397                 CERROR("MDSINTENT locks require the MDS module.\n");
398                 GOTO(out_dec, rc = -EINVAL);
399         }
400         if (mds_getattr_name_p == NULL)
401                 mds_getattr_name_p = inter_module_get_request
402                         ("mds_getattr_name", "mds");
403         if (IS_ERR(mds_getattr_name_p)) {
404                 CERROR("MDSINTENT locks require the MDS module.\n");
405                 GOTO(out_dec, rc = -EINVAL);
406         }
407
408         for (i = 0; i < LDLM_NUM_THREADS; i++) {
409                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
410                                          "lustre_dlm");
411                 if (rc) {
412                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
413                         LBUG();
414                         GOTO(out_thread, rc);
415                 }
416         }
417
418         RETURN(0);
419
420 out_thread:
421         ptlrpc_stop_all_threads(ldlm->ldlm_service);
422         ptlrpc_unregister_service(ldlm->ldlm_service);
423 out_dec:
424         if (mds_reint_p != NULL)
425                 inter_module_put("mds_reint");
426         if (mds_getattr_name_p != NULL)
427                 inter_module_put("mds_getattr_name");
428         MOD_DEC_USE_COUNT;
429         return rc;
430 }
431
432 static int ldlm_cleanup(struct obd_device *obddev)
433 {
434         struct ldlm_obd *ldlm = &obddev->u.ldlm;
435         ENTRY;
436
437         ptlrpc_stop_all_threads(ldlm->ldlm_service);
438         ptlrpc_unregister_service(ldlm->ldlm_service);
439
440         if (mds_reint_p != NULL)
441                 inter_module_put("mds_reint");
442         if (mds_getattr_name_p != NULL)
443                 inter_module_put("mds_getattr_name");
444
445         MOD_DEC_USE_COUNT;
446         RETURN(0);
447 }
448
449 struct obd_ops ldlm_obd_ops = {
450         o_iocontrol:   ldlm_iocontrol,
451         o_setup:       ldlm_setup,
452         o_cleanup:     ldlm_cleanup,
453         o_connect:     class_connect,
454         o_disconnect:  class_disconnect
455 };
456
457
458 static int __init ldlm_init(void)
459 {
460         int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
461         if (rc != 0)
462                 return rc;
463
464         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
465                                                sizeof(struct ldlm_resource), 0,
466                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
467         if (ldlm_resource_slab == NULL)
468                 return -ENOMEM;
469
470         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
471                                            sizeof(struct ldlm_lock), 0,
472                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
473         if (ldlm_lock_slab == NULL) {
474                 kmem_cache_destroy(ldlm_resource_slab);
475                 return -ENOMEM;
476         }
477
478         return 0;
479 }
480
481 static void __exit ldlm_exit(void)
482 {
483         class_unregister_type(OBD_LDLM_DEVICENAME);
484         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
485                 CERROR("couldn't free ldlm resource slab\n");
486         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
487                 CERROR("couldn't free ldlm lock slab\n");
488 }
489
490 EXPORT_SYMBOL(ldlm_lockname);
491 EXPORT_SYMBOL(ldlm_typename);
492 EXPORT_SYMBOL(ldlm_handle2lock);
493 EXPORT_SYMBOL(ldlm_lock_match);
494 EXPORT_SYMBOL(ldlm_lock_addref);
495 EXPORT_SYMBOL(ldlm_lock_decref);
496 EXPORT_SYMBOL(ldlm_cli_convert);
497 EXPORT_SYMBOL(ldlm_cli_enqueue);
498 EXPORT_SYMBOL(ldlm_cli_cancel);
499 EXPORT_SYMBOL(ldlm_match_or_enqueue);
500 EXPORT_SYMBOL(ldlm_test);
501 EXPORT_SYMBOL(ldlm_lock_dump);
502 EXPORT_SYMBOL(ldlm_namespace_new);
503 EXPORT_SYMBOL(ldlm_namespace_free);
504
505 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
506 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
507 MODULE_LICENSE("GPL");
508
509 module_init(ldlm_init);
510 module_exit(ldlm_exit);