Whamcloud - gitweb
f81a7a202dc50f753daeeaf7158ce0e6ae327cfe
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
23
24 static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
25                          struct ptlrpc_request *req)
26 {
27         struct ldlm_reply *dlm_rep;
28         struct ldlm_request *dlm_req;
29         int rc, size = sizeof(*dlm_rep), cookielen = 0;
30         __u32 flags;
31         ldlm_error_t err;
32         struct ldlm_lock *lock = NULL;
33         void *cookie = NULL;
34         ENTRY;
35
36         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
37
38         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
39         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
40                 /* In this case, the reply buffer is allocated deep in
41                  * local_lock_enqueue by the policy function. */
42                 cookie = req;
43                 cookielen = sizeof(*req);
44         } else {
45                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
46                                      &req->rq_repmsg);
47                 if (rc) {
48                         CERROR("out of memory\n");
49                         RETURN(-ENOMEM);
50                 }
51                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
52                         cookie = &dlm_req->lock_desc.l_extent;
53                         cookielen = sizeof(struct ldlm_extent);
54                 }
55         }
56
57         lock = ldlm_lock_create(obddev->obd_namespace,
58                                 &dlm_req->lock_handle2,
59                                 dlm_req->lock_desc.l_resource.lr_name,
60                                 dlm_req->lock_desc.l_resource.lr_type,
61                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
62         if (!lock)
63                 GOTO(out, -ENOMEM);
64
65         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
66                sizeof(lock->l_remote_handle));
67         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
68
69         flags = dlm_req->lock_flags;
70         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
71                                 ldlm_cli_callback, ldlm_cli_callback);
72         if (err != ELDLM_OK)
73                 GOTO(out, err);
74
75         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
76         dlm_rep->lock_flags = flags;
77
78         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
79         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
80                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
81                        sizeof(lock->l_extent));
82         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
83                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
84                        sizeof(dlm_rep->lock_resource_name));
85
86         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
87         EXIT;
88  out:
89         if (lock)
90                 ldlm_lock_put(lock);
91         req->rq_status = err;
92         CDEBUG(D_INFO, "err = %d\n", err);
93
94         if (ptlrpc_reply(svc, req))
95                 LBUG();
96
97         if (err)
98                 LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
99         else {
100                 ldlm_reprocess_all(lock->l_resource);
101                 LDLM_DEBUG(lock, "server-side enqueue handler END");
102         }
103
104         return 0;
105 }
106
107 static int ldlm_handle_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
108 {
109         struct ldlm_request *dlm_req;
110         struct ldlm_reply *dlm_rep;
111         struct ldlm_lock *lock;
112         int rc, size = sizeof(*dlm_rep);
113         ENTRY;
114
115         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
116         if (rc) {
117                 CERROR("out of memory\n");
118                 RETURN(-ENOMEM);
119         }
120         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
121         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
122         dlm_rep->lock_flags = dlm_req->lock_flags;
123
124         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
125         if (!lock) { 
126                 req->rq_status = EINVAL;
127         } else {         
128                 LDLM_DEBUG(lock, "server-side convert handler START");
129                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
130                                   &dlm_rep->lock_flags);
131                 req->rq_status = 0;
132         }
133         if (ptlrpc_reply(svc, req) != 0)
134                 LBUG();
135
136         ldlm_reprocess_all(lock->l_resource);
137         ldlm_lock_put(lock); 
138         LDLM_DEBUG(lock, "server-side convert handler END");
139
140         RETURN(0);
141 }
142
143 static int ldlm_handle_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
144 {
145         struct ldlm_request *dlm_req;
146         struct ldlm_lock *lock;
147         int rc;
148         ENTRY;
149
150         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
151         if (rc) {
152                 CERROR("out of memory\n");
153                 RETURN(-ENOMEM);
154         }
155         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
156
157         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
158         if (!lock) { 
159                 req->rq_status = ESTALE;
160         } else { 
161                 LDLM_DEBUG(lock, "server-side cancel handler START");
162                 ldlm_lock_cancel(lock);
163                 req->rq_status = 0;
164         }
165
166         if (ptlrpc_reply(svc, req) != 0)
167                 LBUG();
168
169         ldlm_reprocess_all(lock->l_resource);
170         ldlm_lock_put(lock);
171         LDLM_DEBUG_NOLOCK("server-side cancel handler END");
172
173         RETURN(0);
174 }
175
176 static int ldlm_handle_callback(struct ptlrpc_service *svc,
177                           struct ptlrpc_request *req)
178 {
179         struct ldlm_request *dlm_req;
180         struct ldlm_lock_desc desc; 
181         struct ldlm_lock_desc *descp; 
182         struct ldlm_lock *lock;
183         __u64 is_blocking_ast;
184         int rc;
185         ENTRY;
186
187         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
188         if (rc) {
189                 CERROR("out of memory\n");
190                 RETURN(-ENOMEM);
191         }
192         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
193
194         /* We must send the reply first, so that the thread is free to handle
195          * any requests made in common_callback() */
196         rc = ptlrpc_reply(svc, req);
197         if (rc != 0)
198                 RETURN(rc);
199
200         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
201         is_blocking_ast = dlm_req->lock_handle2.addr;
202         if (is_blocking_ast) { 
203                 /* FIXME: copy lock information into desc here */
204                 descp = &desc;
205         }  else 
206                 descp = NULL; 
207
208         if (!lock) { 
209                 CERROR("callback on lock %Lx - lock disappeared\n", 
210                        dlm_req->lock_handle1.addr);
211                 RETURN(0);
212         }
213
214         LDLM_DEBUG(lock, "client %s callback handler START",
215                    is_blocking_ast ? "blocked" : "completion");
216
217         if (is_blocking_ast) {
218                 int do_ast; 
219                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
220                 lock->l_flags |= LDLM_FL_CBPENDING;
221                 do_ast = (!lock->l_readers && !lock->l_writers); 
222                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
223                 
224                 if (do_ast) { 
225                         CDEBUG(D_INFO, "Lock already unused, calling "
226                                "callback (%p).\n", lock->l_blocking_ast);
227                         if (lock->l_blocking_ast != NULL)
228                                 lock->l_blocking_ast(lock, descp, lock->l_data,
229                                                      lock->l_data_len, NULL);
230                 } else {
231                         LDLM_DEBUG(lock, "Lock still has references, will be"
232                                " cancelled later");
233                 }
234                 ldlm_lock_put(lock); 
235         } else {
236                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
237
238                 /* If we receive the completion AST before the actual enqueue
239                  * returned, then we might need to switch resources. */
240                 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
241                            lock->l_resource->lr_name,
242                            sizeof(__u64) * RES_NAME_SIZE) != 0) {
243                         ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
244                         LDLM_DEBUG(lock, "completion AST, new resource");
245                 }
246                 ldlm_grant_lock(lock);
247                 /*  FIXME: we want any completion function, not just wake_up */
248                 wake_up(&lock->l_waitq);
249                 ldlm_lock_put(lock);
250         }
251
252         LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
253                           is_blocking_ast ? "blocked" : "completion", lock);
254         RETURN(0);
255 }
256
257 static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
258                        struct ptlrpc_request *req)
259 {
260         struct obd_device *req_dev;
261         int id, rc;
262         ENTRY;
263
264         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
265         if (rc) {
266                 CERROR("lustre_ldlm: Invalid request\n");
267                 GOTO(out, rc);
268         }
269
270         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
271                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
272                        req->rq_reqmsg->type);
273                 GOTO(out, rc = -EINVAL);
274         }
275
276         id = req->rq_reqmsg->target_id;
277         if (id < 0 || id > MAX_OBD_DEVICES)
278                 GOTO(out, rc = -ENODEV);
279         req_dev = req->rq_obd = &obd_dev[id];
280
281         switch (req->rq_reqmsg->opc) {
282         case LDLM_ENQUEUE:
283                 CDEBUG(D_INODE, "enqueue\n");
284                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
285                 rc = ldlm_handle_enqueue(req_dev, svc, req);
286                 break;
287
288         case LDLM_CONVERT:
289                 CDEBUG(D_INODE, "convert\n");
290                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
291                 rc = ldlm_handle_convert(svc, req);
292                 break;
293
294         case LDLM_CANCEL:
295                 CDEBUG(D_INODE, "cancel\n");
296                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
297                 rc = ldlm_handle_cancel(svc, req);
298                 break;
299
300         case LDLM_CALLBACK:
301                 CDEBUG(D_INODE, "callback\n");
302                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
303                 rc = ldlm_handle_callback(svc, req);
304                 break;
305
306         default:
307                 rc = ptlrpc_error(svc, req);
308                 RETURN(rc);
309         }
310
311         EXIT;
312 out:
313         if (rc)
314                 RETURN(ptlrpc_error(svc, req));
315         return 0;
316 }
317
318 static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg,
319                           void *uarg)
320 {
321         struct obd_device *obddev = conn->oc_dev;
322         struct ptlrpc_connection *connection;
323         int err;
324         ENTRY;
325
326         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
327             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
328                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
329                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
330                 RETURN(-EINVAL);
331         }
332
333         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
334                   sizeof(*obddev->u.ldlm.ldlm_client));
335         ptlrpc_init_client(NULL, NULL,
336                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
337                            obddev->u.ldlm.ldlm_client);
338         connection = ptlrpc_uuid_to_connection("ldlm");
339         if (!connection)
340                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
341
342         switch (cmd) {
343         case IOC_LDLM_TEST: {
344                 err = ldlm_test(obddev, connection);
345                 CERROR("-- done err %d\n", err);
346                 GOTO(out, err);
347         }
348         default:
349                 GOTO(out, err = -EINVAL);
350         }
351
352  out:
353         if (connection)
354                 ptlrpc_put_connection(connection);
355         OBD_FREE(obddev->u.ldlm.ldlm_client,
356                  sizeof(*obddev->u.ldlm.ldlm_client));
357         return err;
358 }
359
360 #define LDLM_NUM_THREADS        8
361
362 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
363 {
364         struct ldlm_obd *ldlm = &obddev->u.ldlm;
365         int rc;
366         int i;
367         ENTRY;
368
369         MOD_INC_USE_COUNT;
370         ldlm->ldlm_service =
371                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
372                                 LDLM_REPLY_PORTAL, "self", lustre_handle);
373         if (!ldlm->ldlm_service) {
374                 LBUG();
375                 GOTO(out_dec, rc = -ENOMEM);
376         }
377
378         for (i = 0; i < LDLM_NUM_THREADS; i++) {
379                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
380                                          "lustre_dlm");
381                 /* XXX We could just continue if we had started at least
382                  *     a few threads here.
383                  */
384                 if (rc) {
385                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
386                         LBUG();
387                         GOTO(out_thread, rc);
388                 }
389         }
390
391         RETURN(0);
392
393 out_thread:
394         ptlrpc_stop_all_threads(ldlm->ldlm_service);
395         ptlrpc_unregister_service(ldlm->ldlm_service);
396 out_dec:
397         MOD_DEC_USE_COUNT;
398         return rc;
399 }
400
401 static int ldlm_cleanup(struct obd_device *obddev)
402 {
403         struct ldlm_obd *ldlm = &obddev->u.ldlm;
404         ENTRY;
405
406         ptlrpc_stop_all_threads(ldlm->ldlm_service);
407         ptlrpc_unregister_service(ldlm->ldlm_service);
408
409         if (mds_reint_p != NULL)
410                 inter_module_put("mds_reint");
411         if (mds_getattr_name_p != NULL)
412                 inter_module_put("mds_getattr_name");
413
414         MOD_DEC_USE_COUNT;
415         RETURN(0);
416 }
417
418 struct obd_ops ldlm_obd_ops = {
419         o_iocontrol:   ldlm_iocontrol,
420         o_setup:       ldlm_setup,
421         o_cleanup:     ldlm_cleanup,
422         o_connect:     gen_connect,
423         o_disconnect:  gen_disconnect
424 };
425
426
427 static int __init ldlm_init(void)
428 {
429         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
430         if (rc != 0)
431                 return rc;
432
433         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
434                                                sizeof(struct ldlm_resource), 0,
435                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
436         if (ldlm_resource_slab == NULL)
437                 return -ENOMEM;
438
439         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
440                                            sizeof(struct ldlm_lock), 0,
441                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
442         if (ldlm_lock_slab == NULL) {
443                 kmem_cache_destroy(ldlm_resource_slab);
444                 return -ENOMEM;
445         }
446
447         return 0;
448 }
449
450 static void __exit ldlm_exit(void)
451 {
452         obd_unregister_type(OBD_LDLM_DEVICENAME);
453         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
454                 CERROR("couldn't free ldlm resource slab\n");
455         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
456                 CERROR("couldn't free ldlm lock slab\n");
457 }
458
459 EXPORT_SYMBOL(ldlm_lock_match);
460 EXPORT_SYMBOL(ldlm_lock_addref);
461 EXPORT_SYMBOL(ldlm_lock_decref);
462 EXPORT_SYMBOL(ldlm_cli_convert);
463 EXPORT_SYMBOL(ldlm_cli_enqueue);
464 EXPORT_SYMBOL(ldlm_cli_cancel);
465 EXPORT_SYMBOL(lustre_handle2object);
466 EXPORT_SYMBOL(ldlm_test);
467 EXPORT_SYMBOL(ldlm_lock_dump);
468 EXPORT_SYMBOL(ldlm_namespace_new);
469 EXPORT_SYMBOL(ldlm_namespace_free);
470
471 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
472 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
473 MODULE_LICENSE("GPL");
474
475 module_init(ldlm_init);
476 module_exit(ldlm_exit);