Whamcloud - gitweb
2102e454f670d20ce8126278b572b1671f3c2a20
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
23
24 static int ldlm_handle_enqueue(struct obd_device *obddev,
25                                struct ptlrpc_service *svc,
26                                struct ptlrpc_request *req)
27 {
28         struct ldlm_reply *dlm_rep;
29         struct ldlm_request *dlm_req;
30         int rc, size = sizeof(*dlm_rep), cookielen = 0;
31         __u32 flags;
32         ldlm_error_t err;
33         struct ldlm_lock *lock = NULL;
34         void *cookie = NULL;
35         ENTRY;
36
37         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
38
39         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
40         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
41                 /* In this case, the reply buffer is allocated deep in
42                  * local_lock_enqueue by the policy function. */
43                 cookie = req;
44                 cookielen = sizeof(*req);
45         } else {
46                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
47                                      &req->rq_repmsg);
48                 if (rc) {
49                         CERROR("out of memory\n");
50                         RETURN(-ENOMEM);
51                 }
52                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
53                         cookie = &dlm_req->lock_desc.l_extent;
54                         cookielen = sizeof(struct ldlm_extent);
55                 }
56         }
57
58         lock = ldlm_lock_create(obddev->obd_namespace,
59                                 &dlm_req->lock_handle2,
60                                 dlm_req->lock_desc.l_resource.lr_name,
61                                 dlm_req->lock_desc.l_resource.lr_type,
62                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
63         if (!lock)
64                 GOTO(out, err = -ENOMEM);
65
66         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
67                sizeof(lock->l_remote_handle));
68         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
69
70         flags = dlm_req->lock_flags;
71         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
72                                 ldlm_server_ast, ldlm_server_ast);
73         if (err != ELDLM_OK)
74                 GOTO(out, err);
75
76         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
77         dlm_rep->lock_flags = flags;
78
79         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
80         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
81                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
82                        sizeof(lock->l_extent));
83         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
84                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
85                        sizeof(dlm_rep->lock_resource_name));
86
87         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
88         EXIT;
89  out:
90         if (lock)
91                 ldlm_lock_put(lock);
92         req->rq_status = err;
93         CDEBUG(D_INFO, "err = %d\n", err);
94
95         if (ptlrpc_reply(svc, req))
96                 LBUG();
97
98         if (err)
99                 LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
100         else {
101                 ldlm_reprocess_all(lock->l_resource);
102                 LDLM_DEBUG(lock, "server-side enqueue handler END");
103         }
104
105         return 0;
106 }
107
108 static int ldlm_handle_convert(struct ptlrpc_service *svc,
109                                struct ptlrpc_request *req)
110 {
111         struct ldlm_request *dlm_req;
112         struct ldlm_reply *dlm_rep;
113         struct ldlm_lock *lock;
114         int rc, size = sizeof(*dlm_rep);
115         ENTRY;
116
117         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
118         if (rc) {
119                 CERROR("out of memory\n");
120                 RETURN(-ENOMEM);
121         }
122         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
123         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
124         dlm_rep->lock_flags = dlm_req->lock_flags;
125
126         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
127         if (!lock) { 
128                 req->rq_status = EINVAL;
129         } else {         
130                 LDLM_DEBUG(lock, "server-side convert handler START");
131                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
132                                   &dlm_rep->lock_flags);
133                 req->rq_status = 0;
134         }
135         if (ptlrpc_reply(svc, req) != 0)
136                 LBUG();
137
138         ldlm_reprocess_all(lock->l_resource);
139         ldlm_lock_put(lock); 
140         LDLM_DEBUG(lock, "server-side convert handler END");
141
142         RETURN(0);
143 }
144
145 static int ldlm_handle_cancel(struct ptlrpc_service *svc,
146                               struct ptlrpc_request *req)
147 {
148         struct ldlm_request *dlm_req;
149         struct ldlm_lock *lock;
150         int rc;
151         ENTRY;
152
153         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
154         if (rc) {
155                 CERROR("out of memory\n");
156                 RETURN(-ENOMEM);
157         }
158         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
159
160         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
161         if (!lock) { 
162                 req->rq_status = ESTALE;
163         } else { 
164                 LDLM_DEBUG(lock, "server-side cancel handler START");
165                 ldlm_lock_cancel(lock);
166                 req->rq_status = 0;
167         }
168
169         if (ptlrpc_reply(svc, req) != 0)
170                 LBUG();
171
172         ldlm_reprocess_all(lock->l_resource);
173         ldlm_lock_put(lock);
174         LDLM_DEBUG_NOLOCK("server-side cancel handler END");
175
176         RETURN(0);
177 }
178
179 static int ldlm_handle_callback(struct ptlrpc_service *svc,
180                                 struct ptlrpc_request *req)
181 {
182         struct ldlm_request *dlm_req;
183         struct ldlm_lock_desc *descp = NULL; 
184         struct ldlm_lock *lock;
185         __u64 is_blocking_ast = 0;
186         int rc;
187         ENTRY;
188
189         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
190         if (rc) {
191                 CERROR("out of memory\n");
192                 RETURN(-ENOMEM);
193         }
194         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
195
196         /* We must send the reply first, so that the thread is free to handle
197          * any requests made in common_callback() */
198         rc = ptlrpc_reply(svc, req);
199         if (rc != 0)
200                 RETURN(rc);
201         
202         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
203         if (!lock) { 
204                 CERROR("callback on lock %Lx - lock disappeared\n", 
205                        dlm_req->lock_handle1.addr);
206                 RETURN(0);
207         }
208
209         /* check if this is a blocking AST */ 
210         if (dlm_req->lock_desc.l_req_mode !=
211             dlm_req->lock_desc.l_granted_mode) {
212                 descp = &dlm_req->lock_desc;
213                 is_blocking_ast = 1;
214         }
215
216         LDLM_DEBUG(lock, "client %s callback handler START",
217                    is_blocking_ast ? "blocked" : "completion");
218
219         if (descp) {
220                 int do_ast; 
221                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
222                 lock->l_flags |= LDLM_FL_CBPENDING;
223                 do_ast = (!lock->l_readers && !lock->l_writers); 
224                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
225                 
226                 if (do_ast) {
227                         CDEBUG(D_INFO, "Lock already unused, calling "
228                                "callback (%p).\n", lock->l_blocking_ast);
229                         if (lock->l_blocking_ast != NULL) {
230                                 struct lustre_handle lockh;
231                                 ldlm_lock2handle(lock, &lockh);
232                                 lock->l_blocking_ast(&lockh, descp,
233                                                      lock->l_data,
234                                                      lock->l_data_len);
235                         }
236                 } else {
237                         LDLM_DEBUG(lock, "Lock still has references, will be"
238                                " cancelled later");
239                 }
240                 ldlm_lock_put(lock); 
241         } else {
242                 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
243
244                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
245                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
246
247                 /* If we receive the completion AST before the actual enqueue
248                  * returned, then we might need to switch resources. */
249                 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
250                            lock->l_resource->lr_name,
251                            sizeof(__u64) * RES_NAME_SIZE) != 0) {
252                         ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
253                         LDLM_DEBUG(lock, "completion AST, new resource");
254                 }
255                 lock->l_resource->lr_tmp = &rpc_list;
256                 ldlm_resource_unlink_lock(lock);
257                 ldlm_grant_lock(lock);
258                 /*  FIXME: we want any completion function, not just wake_up */
259                 wake_up(&lock->l_waitq);
260                 ldlm_lock_put(lock);
261                 lock->l_resource->lr_tmp = NULL;
262                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
263
264                 ldlm_run_ast_work(&rpc_list);
265         }
266
267         LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
268                           is_blocking_ast ? "blocked" : "completion", lock);
269         RETURN(0);
270 }
271
272 static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
273                          struct ptlrpc_request *req)
274 {
275         struct obd_device *req_dev;
276         int id, rc;
277         ENTRY;
278
279         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
280         if (rc) {
281                 CERROR("lustre_ldlm: Invalid request\n");
282                 GOTO(out, rc);
283         }
284
285         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
286                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
287                        req->rq_reqmsg->type);
288                 GOTO(out, rc = -EINVAL);
289         }
290
291         id = req->rq_reqmsg->target_id;
292         if (id < 0 || id > MAX_OBD_DEVICES)
293                 GOTO(out, rc = -ENODEV);
294         req_dev = req->rq_obd = &obd_dev[id];
295
296         switch (req->rq_reqmsg->opc) {
297         case LDLM_ENQUEUE:
298                 CDEBUG(D_INODE, "enqueue\n");
299                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
300                 rc = ldlm_handle_enqueue(req_dev, svc, req);
301                 break;
302
303         case LDLM_CONVERT:
304                 CDEBUG(D_INODE, "convert\n");
305                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
306                 rc = ldlm_handle_convert(svc, req);
307                 break;
308
309         case LDLM_CANCEL:
310                 CDEBUG(D_INODE, "cancel\n");
311                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
312                 rc = ldlm_handle_cancel(svc, req);
313                 break;
314
315         case LDLM_CALLBACK:
316                 CDEBUG(D_INODE, "callback\n");
317                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
318                 rc = ldlm_handle_callback(svc, req);
319                 break;
320
321         default:
322                 rc = ptlrpc_error(svc, req);
323                 RETURN(rc);
324         }
325
326         EXIT;
327 out:
328         if (rc)
329                 RETURN(ptlrpc_error(svc, req));
330         return 0;
331 }
332
333 static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg,
334                           void *uarg)
335 {
336         struct obd_device *obddev = conn->oc_dev;
337         struct ptlrpc_connection *connection;
338         int err;
339         ENTRY;
340
341         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
342             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
343                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
344                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
345                 RETURN(-EINVAL);
346         }
347
348         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
349                   sizeof(*obddev->u.ldlm.ldlm_client));
350         ptlrpc_init_client(NULL, NULL,
351                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
352                            obddev->u.ldlm.ldlm_client);
353         connection = ptlrpc_uuid_to_connection("ldlm");
354         if (!connection)
355                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
356
357         switch (cmd) {
358         case IOC_LDLM_TEST: {
359                 err = ldlm_test(obddev, connection);
360                 CERROR("-- done err %d\n", err);
361                 GOTO(out, err);
362         }
363         default:
364                 GOTO(out, err = -EINVAL);
365         }
366
367  out:
368         if (connection)
369                 ptlrpc_put_connection(connection);
370         OBD_FREE(obddev->u.ldlm.ldlm_client,
371                  sizeof(*obddev->u.ldlm.ldlm_client));
372         return err;
373 }
374
375 #define LDLM_NUM_THREADS        8
376
377 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
378 {
379         struct ldlm_obd *ldlm = &obddev->u.ldlm;
380         int rc;
381         int i;
382         ENTRY;
383
384         MOD_INC_USE_COUNT;
385         ldlm->ldlm_service =
386                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
387                                 LDLM_REPLY_PORTAL, "self", lustre_handle);
388         if (!ldlm->ldlm_service) {
389                 LBUG();
390                 GOTO(out_dec, rc = -ENOMEM);
391         }
392
393         for (i = 0; i < LDLM_NUM_THREADS; i++) {
394                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
395                                          "lustre_dlm");
396                 /* XXX We could just continue if we had started at least
397                  *     a few threads here.
398                  */
399                 if (rc) {
400                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
401                         LBUG();
402                         GOTO(out_thread, rc);
403                 }
404         }
405
406         RETURN(0);
407
408 out_thread:
409         ptlrpc_stop_all_threads(ldlm->ldlm_service);
410         ptlrpc_unregister_service(ldlm->ldlm_service);
411 out_dec:
412         MOD_DEC_USE_COUNT;
413         return rc;
414 }
415
416 static int ldlm_cleanup(struct obd_device *obddev)
417 {
418         struct ldlm_obd *ldlm = &obddev->u.ldlm;
419         ENTRY;
420
421         ptlrpc_stop_all_threads(ldlm->ldlm_service);
422         ptlrpc_unregister_service(ldlm->ldlm_service);
423
424         if (mds_reint_p != NULL)
425                 inter_module_put("mds_reint");
426         if (mds_getattr_name_p != NULL)
427                 inter_module_put("mds_getattr_name");
428
429         MOD_DEC_USE_COUNT;
430         RETURN(0);
431 }
432
433 struct obd_ops ldlm_obd_ops = {
434         o_iocontrol:   ldlm_iocontrol,
435         o_setup:       ldlm_setup,
436         o_cleanup:     ldlm_cleanup,
437         o_connect:     gen_connect,
438         o_disconnect:  gen_disconnect
439 };
440
441
442 static int __init ldlm_init(void)
443 {
444         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
445         if (rc != 0)
446                 return rc;
447
448         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
449                                                sizeof(struct ldlm_resource), 0,
450                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
451         if (ldlm_resource_slab == NULL)
452                 return -ENOMEM;
453
454         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
455                                            sizeof(struct ldlm_lock), 0,
456                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
457         if (ldlm_lock_slab == NULL) {
458                 kmem_cache_destroy(ldlm_resource_slab);
459                 return -ENOMEM;
460         }
461
462         return 0;
463 }
464
465 static void __exit ldlm_exit(void)
466 {
467         obd_unregister_type(OBD_LDLM_DEVICENAME);
468         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
469                 CERROR("couldn't free ldlm resource slab\n");
470         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
471                 CERROR("couldn't free ldlm lock slab\n");
472 }
473
474 EXPORT_SYMBOL(ldlm_lockname);
475 EXPORT_SYMBOL(ldlm_typename);
476 EXPORT_SYMBOL(ldlm_handle2lock);
477 EXPORT_SYMBOL(ldlm_lock_match);
478 EXPORT_SYMBOL(ldlm_lock_addref);
479 EXPORT_SYMBOL(ldlm_lock_decref);
480 EXPORT_SYMBOL(ldlm_cli_convert);
481 EXPORT_SYMBOL(ldlm_cli_enqueue);
482 EXPORT_SYMBOL(ldlm_cli_cancel);
483 EXPORT_SYMBOL(ldlm_test);
484 EXPORT_SYMBOL(ldlm_lock_dump);
485 EXPORT_SYMBOL(ldlm_namespace_new);
486 EXPORT_SYMBOL(ldlm_namespace_free);
487
488 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
489 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
490 MODULE_LICENSE("GPL");
491
492 module_init(ldlm_init);
493 module_exit(ldlm_exit);