Whamcloud - gitweb
- move the peter branch changes to the head
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
23
24 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
25 {
26         struct ldlm_request *body;
27         struct ptlrpc_request *req;
28         struct ptlrpc_client *cl;
29         int rc = 0, size = sizeof(*body);
30         ENTRY;
31
32         if (lock == NULL) {
33                 LBUG();
34                 RETURN(-EINVAL);
35         }
36
37         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
38         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
39                               &size, NULL);
40         if (!req)
41                 RETURN(-ENOMEM);
42
43         body = lustre_msg_buf(req->rq_reqmsg, 0);
44         memcpy(&body->lock_handle1, &lock->l_remote_handle,
45                sizeof(body->lock_handle1));
46         body->lock_flags = flags;
47
48         LDLM_DEBUG(lock, "server preparing completion AST");
49         req->rq_replen = lustre_msg_size(0, NULL);
50
51         rc = ptlrpc_queue_wait(req);
52         rc = ptlrpc_check_status(req, rc);
53         ptlrpc_free_req(req);
54         RETURN(rc);
55 }
56
57 int ldlm_handle_enqueue(struct ptlrpc_request *req)
58 {
59         struct obd_device *obddev = req->rq_export->exp_obd;
60         struct ldlm_reply *dlm_rep;
61         struct ldlm_request *dlm_req;
62         int rc, size = sizeof(*dlm_rep), cookielen = 0;
63         __u32 flags;
64         ldlm_error_t err;
65         struct ldlm_lock *lock = NULL;
66         void *cookie = NULL;
67         ENTRY;
68
69         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
70
71         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
72         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
73                 /* In this case, the reply buffer is allocated deep in
74                  * local_lock_enqueue by the policy function. */
75                 cookie = req;
76                 cookielen = sizeof(*req);
77         } else {
78                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
79                                      &req->rq_repmsg);
80                 if (rc) {
81                         CERROR("out of memory\n");
82                         RETURN(-ENOMEM);
83                 }
84                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
85                         cookie = &dlm_req->lock_desc.l_extent;
86                         cookielen = sizeof(struct ldlm_extent);
87                 }
88         }
89
90         lock = ldlm_lock_create(obddev->obd_namespace,
91                                 &dlm_req->lock_handle2,
92                                 dlm_req->lock_desc.l_resource.lr_name,
93                                 dlm_req->lock_desc.l_resource.lr_type,
94                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
95         if (!lock)
96                 GOTO(out, err = -ENOMEM);
97
98         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
99                sizeof(lock->l_remote_handle));
100         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
101
102         flags = dlm_req->lock_flags;
103         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
104                                 ldlm_server_completion_ast, ldlm_server_ast);
105         if (err != ELDLM_OK)
106                 GOTO(out, err);
107
108         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
109         dlm_rep->lock_flags = flags;
110
111         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
112         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
113                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
114                        sizeof(lock->l_extent));
115         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
116                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
117                        sizeof(dlm_rep->lock_resource_name));
118
119         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
120         EXIT;
121  out:
122         if (lock)
123                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
124                            "(err=%d)", err);
125         req->rq_status = err;
126
127         if (ptlrpc_reply(req->rq_svc, req))
128                 LBUG();
129
130         if (lock) {
131                 if (!err)
132                         ldlm_reprocess_all(lock->l_resource);
133                 LDLM_LOCK_PUT(lock);
134         }
135         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
136
137         return 0;
138 }
139
140 int ldlm_handle_convert(struct ptlrpc_request *req)
141 {
142         struct ldlm_request *dlm_req;
143         struct ldlm_reply *dlm_rep;
144         struct ldlm_lock *lock;
145         int rc, size = sizeof(*dlm_rep);
146         ENTRY;
147
148         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
149         if (rc) {
150                 CERROR("out of memory\n");
151                 RETURN(-ENOMEM);
152         }
153         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
154         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
155         dlm_rep->lock_flags = dlm_req->lock_flags;
156
157         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
158         if (!lock) {
159                 req->rq_status = EINVAL;
160         } else {
161                 LDLM_DEBUG(lock, "server-side convert handler START");
162                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
163                                   &dlm_rep->lock_flags);
164                 req->rq_status = 0;
165         }
166         if (ptlrpc_reply(req->rq_svc, req) != 0)
167                 LBUG();
168
169         if (lock) {
170                 ldlm_reprocess_all(lock->l_resource);
171                 LDLM_DEBUG(lock, "server-side convert handler END");
172                 LDLM_LOCK_PUT(lock);
173         } else
174                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
175
176         RETURN(0);
177 }
178
179 int ldlm_handle_cancel(struct ptlrpc_request *req)
180 {
181         struct ldlm_request *dlm_req;
182         struct ldlm_lock *lock;
183         int rc;
184         ENTRY;
185
186         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
187         if (rc) {
188                 CERROR("out of memory\n");
189                 RETURN(-ENOMEM);
190         }
191         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
192
193         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
194         if (!lock) {
195                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)",
196                                   (void *)(unsigned long) dlm_req->lock_handle1.addr);
197                 req->rq_status = ESTALE;
198         } else {
199                 LDLM_DEBUG(lock, "server-side cancel handler START");
200                 ldlm_lock_cancel(lock);
201                 req->rq_status = 0;
202         }
203
204         if (ptlrpc_reply(req->rq_svc, req) != 0)
205                 LBUG();
206
207         if (lock) {
208                 ldlm_reprocess_all(lock->l_resource);
209                 LDLM_DEBUG(lock, "server-side cancel handler END");
210                 LDLM_LOCK_PUT(lock);
211         } 
212
213         RETURN(0);
214 }
215
216 static int ldlm_handle_callback(struct ptlrpc_request *req)
217 {
218         struct ldlm_request *dlm_req;
219         struct ldlm_lock_desc *descp = NULL;
220         struct ldlm_lock *lock;
221         __u64 is_blocking_ast = 0;
222         int rc;
223         ENTRY;
224
225         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
226         if (rc) {
227                 CERROR("out of memory\n");
228                 RETURN(-ENOMEM);
229         }
230         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
231
232         /* We must send the reply first, so that the thread is free to handle
233          * any requests made in common_callback() */
234         rc = ptlrpc_reply(req->rq_svc, req);
235         if (rc != 0)
236                 RETURN(rc);
237
238         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
239         if (!lock) {
240                 CERROR("callback on lock %Lx - lock disappeared\n",
241                        dlm_req->lock_handle1.addr);
242                 RETURN(0);
243         }
244
245         /* check if this is a blocking AST */
246         if (dlm_req->lock_desc.l_req_mode !=
247             dlm_req->lock_desc.l_granted_mode) {
248                 descp = &dlm_req->lock_desc;
249                 is_blocking_ast = 1;
250         }
251
252         LDLM_DEBUG(lock, "client %s callback handler START",
253                    is_blocking_ast ? "blocked" : "completion");
254
255         if (descp) {
256                 int do_ast;
257                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
258                 lock->l_flags |= LDLM_FL_CBPENDING;
259                 do_ast = (!lock->l_readers && !lock->l_writers);
260                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
261
262                 if (do_ast) {
263                         LDLM_DEBUG(lock, "already unused, calling "
264                                    "callback (%p)", lock->l_blocking_ast);
265                         if (lock->l_blocking_ast != NULL) {
266                                 struct lustre_handle lockh;
267                                 ldlm_lock2handle(lock, &lockh);
268                                 lock->l_blocking_ast(&lockh, descp,
269                                                      lock->l_data,
270                                                      lock->l_data_len);
271                         }
272                 } else {
273                         LDLM_DEBUG(lock, "Lock still has references, will be"
274                                    " cancelled later");
275                 }
276                 LDLM_LOCK_PUT(lock);
277         } else {
278                 struct list_head ast_list = LIST_HEAD_INIT(ast_list);
279
280                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
281                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
282
283                 /* If we receive the completion AST before the actual enqueue
284                  * returned, then we might need to switch resources. */
285                 ldlm_resource_unlink_lock(lock);
286                 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
287                            lock->l_resource->lr_name,
288                            sizeof(__u64) * RES_NAME_SIZE) != 0) {
289                         ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
290                         LDLM_DEBUG(lock, "completion AST, new resource");
291                 }
292                 lock->l_resource->lr_tmp = &ast_list;
293                 ldlm_grant_lock(lock);
294                 lock->l_resource->lr_tmp = NULL;
295                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
296                 LDLM_LOCK_PUT(lock);
297
298                 ldlm_run_ast_work(&ast_list);
299         }
300
301         LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
302                           is_blocking_ast ? "blocked" : "completion", lock);
303         RETURN(0);
304 }
305
306
307 static int ldlm_callback_handler(struct ptlrpc_request *req)
308 {
309         int rc;
310         ENTRY;
311
312         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
313         if (rc) {
314                 CERROR("lustre_ldlm: Invalid request\n");
315                 GOTO(out, rc);
316         }
317
318         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
319                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
320                        req->rq_reqmsg->type);
321                 GOTO(out, rc = -EINVAL);
322         }
323         switch (req->rq_reqmsg->opc) {
324         case LDLM_CALLBACK:
325                 CDEBUG(D_INODE, "callback\n");
326                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
327                 rc = ldlm_handle_callback(req);
328                 break;
329
330         default:
331                 rc = ptlrpc_error(req->rq_svc, req);
332                 RETURN(rc);
333         }
334
335         EXIT;
336 out:
337         if (rc)
338                 RETURN(ptlrpc_error(req->rq_svc, req));
339         return 0;
340 }
341
342
343 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
344                           void *karg, void *uarg)
345 {
346         struct obd_device *obddev = class_conn2obd(conn);
347         struct ptlrpc_connection *connection;
348         int err;
349         ENTRY;
350
351         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
352             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
353                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
354                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
355                 RETURN(-EINVAL);
356         }
357
358         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
359                   sizeof(*obddev->u.ldlm.ldlm_client));
360         ptlrpc_init_client(NULL, NULL,
361                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
362                            obddev->u.ldlm.ldlm_client);
363         connection = ptlrpc_uuid_to_connection("ldlm");
364         if (!connection)
365                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
366
367         switch (cmd) {
368         case IOC_LDLM_TEST: {
369                 err = ldlm_test(obddev, connection);
370                 CERROR("-- done err %d\n", err);
371                 GOTO(out, err);
372         }
373         default:
374                 GOTO(out, err = -EINVAL);
375         }
376
377  out:
378         if (connection)
379                 ptlrpc_put_connection(connection);
380         OBD_FREE(obddev->u.ldlm.ldlm_client,
381                  sizeof(*obddev->u.ldlm.ldlm_client));
382         return err;
383 }
384
385 #define LDLM_NUM_THREADS        8
386
387 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
388 {
389         struct ldlm_obd *ldlm = &obddev->u.ldlm;
390         int rc;
391         int i;
392         ENTRY;
393
394         MOD_INC_USE_COUNT;
395         ldlm->ldlm_service =
396                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
397                                 LDLM_REPLY_PORTAL, "self", ldlm_callback_handler);
398         if (!ldlm->ldlm_service) {
399                 LBUG();
400                 GOTO(out_dec, rc = -ENOMEM);
401         }
402
403         for (i = 0; i < LDLM_NUM_THREADS; i++) {
404                 char name[32];
405                 sprintf(name, "lustre_dlm_%2d", i); 
406                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
407                 if (rc) {
408                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
409                         LBUG();
410                         GOTO(out_thread, rc);
411                 }
412         }
413
414         RETURN(0);
415
416 out_thread:
417         ptlrpc_stop_all_threads(ldlm->ldlm_service);
418         ptlrpc_unregister_service(ldlm->ldlm_service);
419 out_dec:
420         MOD_DEC_USE_COUNT;
421         return rc;
422 }
423
424 static int ldlm_cleanup(struct obd_device *obddev)
425 {
426         struct ldlm_obd *ldlm = &obddev->u.ldlm;
427         ENTRY;
428
429         ptlrpc_stop_all_threads(ldlm->ldlm_service);
430         ptlrpc_unregister_service(ldlm->ldlm_service);
431
432         MOD_DEC_USE_COUNT;
433         RETURN(0);
434 }
435
436 struct obd_ops ldlm_obd_ops = {
437         o_iocontrol:   ldlm_iocontrol,
438         o_setup:       ldlm_setup,
439         o_cleanup:     ldlm_cleanup,
440         o_connect:     class_connect,
441         o_disconnect:  class_disconnect
442 };
443
444
445 static int __init ldlm_init(void)
446 {
447         int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
448         if (rc != 0)
449                 return rc;
450
451         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
452                                                sizeof(struct ldlm_resource), 0,
453                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
454         if (ldlm_resource_slab == NULL)
455                 return -ENOMEM;
456
457         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
458                                            sizeof(struct ldlm_lock), 0,
459                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
460         if (ldlm_lock_slab == NULL) {
461                 kmem_cache_destroy(ldlm_resource_slab);
462                 return -ENOMEM;
463         }
464
465         return 0;
466 }
467
468 static void __exit ldlm_exit(void)
469 {
470         class_unregister_type(OBD_LDLM_DEVICENAME);
471         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
472                 CERROR("couldn't free ldlm resource slab\n");
473         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
474                 CERROR("couldn't free ldlm lock slab\n");
475 }
476
477 EXPORT_SYMBOL(ldlm_completion_ast);
478 EXPORT_SYMBOL(ldlm_handle_enqueue);
479 EXPORT_SYMBOL(ldlm_handle_cancel);
480 EXPORT_SYMBOL(ldlm_handle_convert);
481 EXPORT_SYMBOL(ldlm_register_intent);
482 EXPORT_SYMBOL(ldlm_unregister_intent); 
483 EXPORT_SYMBOL(ldlm_lockname);
484 EXPORT_SYMBOL(ldlm_typename);
485 EXPORT_SYMBOL(ldlm_handle2lock);
486 EXPORT_SYMBOL(ldlm_lock_match);
487 EXPORT_SYMBOL(ldlm_lock_addref);
488 EXPORT_SYMBOL(ldlm_lock_decref);
489 EXPORT_SYMBOL(ldlm_lock_change_resource);
490 EXPORT_SYMBOL(ldlm_cli_convert);
491 EXPORT_SYMBOL(ldlm_cli_enqueue);
492 EXPORT_SYMBOL(ldlm_cli_cancel);
493 EXPORT_SYMBOL(ldlm_match_or_enqueue);
494 EXPORT_SYMBOL(ldlm_it2str);
495 EXPORT_SYMBOL(ldlm_test);
496 EXPORT_SYMBOL(ldlm_lock_dump);
497 EXPORT_SYMBOL(ldlm_namespace_new);
498 EXPORT_SYMBOL(ldlm_namespace_free);
499
500 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
501 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
502 MODULE_LICENSE("GPL");
503
504 module_init(ldlm_init);
505 module_exit(ldlm_exit);