Whamcloud - gitweb
Change module data to use "info@clusterfs.com" email per Peter.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
23
24 static int ldlm_handle_enqueue(struct ptlrpc_request *req)
25 {
26         struct obd_device *obddev = req->rq_export->export_obd;
27         struct ldlm_reply *dlm_rep;
28         struct ldlm_request *dlm_req;
29         int rc, size = sizeof(*dlm_rep), cookielen = 0;
30         __u32 flags;
31         ldlm_error_t err;
32         struct ldlm_lock *lock = NULL;
33         void *cookie = NULL;
34         ENTRY;
35
36         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
37
38         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
39         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
40                 /* In this case, the reply buffer is allocated deep in
41                  * local_lock_enqueue by the policy function. */
42                 cookie = req;
43                 cookielen = sizeof(*req);
44         } else {
45                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
46                                      &req->rq_repmsg);
47                 if (rc) {
48                         CERROR("out of memory\n");
49                         RETURN(-ENOMEM);
50                 }
51                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
52                         cookie = &dlm_req->lock_desc.l_extent;
53                         cookielen = sizeof(struct ldlm_extent);
54                 }
55         }
56
57         lock = ldlm_lock_create(obddev->obd_namespace,
58                                 &dlm_req->lock_handle2,
59                                 dlm_req->lock_desc.l_resource.lr_name,
60                                 dlm_req->lock_desc.l_resource.lr_type,
61                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
62         if (!lock)
63                 GOTO(out, err = -ENOMEM);
64
65         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
66                sizeof(lock->l_remote_handle));
67         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
68
69         flags = dlm_req->lock_flags;
70         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
71                                 ldlm_server_ast, ldlm_server_ast);
72         if (err != ELDLM_OK)
73                 GOTO(out, err);
74
75         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
76         dlm_rep->lock_flags = flags;
77
78         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
79         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
80                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
81                        sizeof(lock->l_extent));
82         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
83                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
84                        sizeof(dlm_rep->lock_resource_name));
85
86         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
87         EXIT;
88  out:
89         if (lock) {
90                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply");
91                 ldlm_lock_put(lock);
92         }
93         req->rq_status = err;
94         CDEBUG(D_INFO, "err = %d\n", err);
95
96         if (ptlrpc_reply(req->rq_svc, req))
97                 LBUG();
98
99         if (!err)
100                 ldlm_reprocess_all(lock->l_resource);
101         LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
102
103         return 0;
104 }
105
106 static int ldlm_handle_convert(struct ptlrpc_request *req)
107 {
108         struct ldlm_request *dlm_req;
109         struct ldlm_reply *dlm_rep;
110         struct ldlm_lock *lock;
111         int rc, size = sizeof(*dlm_rep);
112         ENTRY;
113
114         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
115         if (rc) {
116                 CERROR("out of memory\n");
117                 RETURN(-ENOMEM);
118         }
119         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
120         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
121         dlm_rep->lock_flags = dlm_req->lock_flags;
122
123         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
124         if (!lock) { 
125                 req->rq_status = EINVAL;
126         } else {         
127                 LDLM_DEBUG(lock, "server-side convert handler START");
128                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
129                                   &dlm_rep->lock_flags);
130                 req->rq_status = 0;
131         }
132         if (ptlrpc_reply(req->rq_svc, req) != 0)
133                 LBUG();
134
135         ldlm_reprocess_all(lock->l_resource);
136         ldlm_lock_put(lock); 
137         LDLM_DEBUG(lock, "server-side convert handler END");
138
139         RETURN(0);
140 }
141
142 static int ldlm_handle_cancel(struct ptlrpc_request *req)
143 {
144         struct ldlm_request *dlm_req;
145         struct ldlm_lock *lock;
146         int rc;
147         ENTRY;
148
149         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
150         if (rc) {
151                 CERROR("out of memory\n");
152                 RETURN(-ENOMEM);
153         }
154         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
155
156         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
157         if (!lock) { 
158                 req->rq_status = ESTALE;
159         } else { 
160                 LDLM_DEBUG(lock, "server-side cancel handler START");
161                 ldlm_lock_cancel(lock);
162                 req->rq_status = 0;
163         }
164
165         if (ptlrpc_reply(req->rq_svc, req) != 0)
166                 LBUG();
167
168         ldlm_reprocess_all(lock->l_resource);
169         ldlm_lock_put(lock);
170         LDLM_DEBUG_NOLOCK("server-side cancel handler END");
171
172         RETURN(0);
173 }
174
175 static int ldlm_handle_callback(struct ptlrpc_request *req)
176 {
177         struct ldlm_request *dlm_req;
178         struct ldlm_lock_desc *descp = NULL; 
179         struct ldlm_lock *lock;
180         __u64 is_blocking_ast = 0;
181         int rc;
182         ENTRY;
183
184         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
185         if (rc) {
186                 CERROR("out of memory\n");
187                 RETURN(-ENOMEM);
188         }
189         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
190
191         /* We must send the reply first, so that the thread is free to handle
192          * any requests made in common_callback() */
193         rc = ptlrpc_reply(req->rq_svc, req);
194         if (rc != 0)
195                 RETURN(rc);
196         
197         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
198         if (!lock) { 
199                 CERROR("callback on lock %Lx - lock disappeared\n", 
200                        dlm_req->lock_handle1.addr);
201                 RETURN(0);
202         }
203
204         /* check if this is a blocking AST */ 
205         if (dlm_req->lock_desc.l_req_mode !=
206             dlm_req->lock_desc.l_granted_mode) {
207                 descp = &dlm_req->lock_desc;
208                 is_blocking_ast = 1;
209         }
210
211         LDLM_DEBUG(lock, "client %s callback handler START",
212                    is_blocking_ast ? "blocked" : "completion");
213
214         if (descp) {
215                 int do_ast; 
216                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
217                 lock->l_flags |= LDLM_FL_CBPENDING;
218                 do_ast = (!lock->l_readers && !lock->l_writers); 
219                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
220                 
221                 if (do_ast) {
222                         LDLM_DEBUG(lock, "already unused, calling "
223                                    "callback (%p)", lock->l_blocking_ast);
224                         if (lock->l_blocking_ast != NULL) {
225                                 struct lustre_handle lockh;
226                                 ldlm_lock2handle(lock, &lockh);
227                                 lock->l_blocking_ast(&lockh, descp,
228                                                      lock->l_data,
229                                                      lock->l_data_len);
230                         }
231                 } else {
232                         LDLM_DEBUG(lock, "Lock still has references, will be"
233                                " cancelled later");
234                 }
235                 ldlm_lock_put(lock); 
236         } else {
237                 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
238
239                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
240                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
241
242                 /* If we receive the completion AST before the actual enqueue
243                  * returned, then we might need to switch resources. */
244                 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
245                            lock->l_resource->lr_name,
246                            sizeof(__u64) * RES_NAME_SIZE) != 0) {
247                         ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
248                         LDLM_DEBUG(lock, "completion AST, new resource");
249                 }
250                 lock->l_resource->lr_tmp = &rpc_list;
251                 ldlm_resource_unlink_lock(lock);
252                 ldlm_grant_lock(lock);
253                 /*  FIXME: we want any completion function, not just wake_up */
254                 wake_up(&lock->l_waitq);
255                 lock->l_resource->lr_tmp = NULL;
256                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
257                 ldlm_lock_put(lock);
258
259                 ldlm_run_ast_work(&rpc_list);
260         }
261
262         LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
263                           is_blocking_ast ? "blocked" : "completion", lock);
264         RETURN(0);
265 }
266
267 static int ldlm_handle(struct ptlrpc_request *req)
268 {
269         int rc;
270         ENTRY;
271
272         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
273         if (rc) {
274                 CERROR("lustre_ldlm: Invalid request\n");
275                 GOTO(out, rc);
276         }
277
278         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
279                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
280                        req->rq_reqmsg->type);
281                 GOTO(out, rc = -EINVAL);
282         }
283
284         if (!req->rq_export && 
285             req->rq_reqmsg->opc == LDLM_ENQUEUE) { 
286                 CERROR("No export handle for enqueue request.\n");
287                 GOTO(out, rc = -ENOTCONN);
288         }
289         switch (req->rq_reqmsg->opc) {
290         case LDLM_ENQUEUE:
291                 CDEBUG(D_INODE, "enqueue\n");
292                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
293                 rc = ldlm_handle_enqueue(req);
294                 break;
295
296         case LDLM_CONVERT:
297                 CDEBUG(D_INODE, "convert\n");
298                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
299                 rc = ldlm_handle_convert(req);
300                 break;
301
302         case LDLM_CANCEL:
303                 CDEBUG(D_INODE, "cancel\n");
304                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
305                 rc = ldlm_handle_cancel(req);
306                 break;
307
308         case LDLM_CALLBACK:
309                 CDEBUG(D_INODE, "callback\n");
310                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
311                 rc = ldlm_handle_callback(req);
312                 break;
313
314         default:
315                 rc = ptlrpc_error(req->rq_svc, req);
316                 RETURN(rc);
317         }
318
319         EXIT;
320 out:
321         if (rc)
322                 RETURN(ptlrpc_error(req->rq_svc, req));
323         return 0;
324 }
325
326 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len, void *karg,
327                           void *uarg)
328 {
329         struct obd_device *obddev = class_conn2obd(conn);
330         struct ptlrpc_connection *connection;
331         int err;
332         ENTRY;
333
334         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
335             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
336                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
337                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
338                 RETURN(-EINVAL);
339         }
340
341         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
342                   sizeof(*obddev->u.ldlm.ldlm_client));
343         ptlrpc_init_client(NULL, NULL,
344                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
345                            obddev->u.ldlm.ldlm_client);
346         connection = ptlrpc_uuid_to_connection("ldlm");
347         if (!connection)
348                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
349
350         switch (cmd) {
351         case IOC_LDLM_TEST: {
352                 err = ldlm_test(obddev, connection);
353                 CERROR("-- done err %d\n", err);
354                 GOTO(out, err);
355         }
356         default:
357                 GOTO(out, err = -EINVAL);
358         }
359
360  out:
361         if (connection)
362                 ptlrpc_put_connection(connection);
363         OBD_FREE(obddev->u.ldlm.ldlm_client,
364                  sizeof(*obddev->u.ldlm.ldlm_client));
365         return err;
366 }
367
368 #define LDLM_NUM_THREADS        8
369
370 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
371 {
372         struct ldlm_obd *ldlm = &obddev->u.ldlm;
373         int rc;
374         int i;
375         ENTRY;
376
377         MOD_INC_USE_COUNT;
378         ldlm->ldlm_service =
379                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
380                                 LDLM_REPLY_PORTAL, "self", ldlm_handle);
381         if (!ldlm->ldlm_service) {
382                 LBUG();
383                 GOTO(out_dec, rc = -ENOMEM);
384         }
385
386         for (i = 0; i < LDLM_NUM_THREADS; i++) {
387                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
388                                          "lustre_dlm");
389                 if (rc) {
390                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
391                         LBUG();
392                         GOTO(out_thread, rc);
393                 }
394         }
395
396         RETURN(0);
397
398 out_thread:
399         ptlrpc_stop_all_threads(ldlm->ldlm_service);
400         ptlrpc_unregister_service(ldlm->ldlm_service);
401 out_dec:
402         MOD_DEC_USE_COUNT;
403         return rc;
404 }
405
406 static int ldlm_cleanup(struct obd_device *obddev)
407 {
408         struct ldlm_obd *ldlm = &obddev->u.ldlm;
409         ENTRY;
410
411         ptlrpc_stop_all_threads(ldlm->ldlm_service);
412         ptlrpc_unregister_service(ldlm->ldlm_service);
413
414         if (mds_reint_p != NULL)
415                 inter_module_put("mds_reint");
416         if (mds_getattr_name_p != NULL)
417                 inter_module_put("mds_getattr_name");
418
419         MOD_DEC_USE_COUNT;
420         RETURN(0);
421 }
422
423 struct obd_ops ldlm_obd_ops = {
424         o_iocontrol:   ldlm_iocontrol,
425         o_setup:       ldlm_setup,
426         o_cleanup:     ldlm_cleanup,
427         o_connect:     class_connect,
428         o_disconnect:  class_disconnect
429 };
430
431
432 static int __init ldlm_init(void)
433 {
434         int rc = class_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
435         if (rc != 0)
436                 return rc;
437
438         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
439                                                sizeof(struct ldlm_resource), 0,
440                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
441         if (ldlm_resource_slab == NULL)
442                 return -ENOMEM;
443
444         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
445                                            sizeof(struct ldlm_lock), 0,
446                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
447         if (ldlm_lock_slab == NULL) {
448                 kmem_cache_destroy(ldlm_resource_slab);
449                 return -ENOMEM;
450         }
451
452         return 0;
453 }
454
455 static void __exit ldlm_exit(void)
456 {
457         class_unregister_type(OBD_LDLM_DEVICENAME);
458         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
459                 CERROR("couldn't free ldlm resource slab\n");
460         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
461                 CERROR("couldn't free ldlm lock slab\n");
462 }
463
464 EXPORT_SYMBOL(ldlm_lockname);
465 EXPORT_SYMBOL(ldlm_typename);
466 EXPORT_SYMBOL(ldlm_handle2lock);
467 EXPORT_SYMBOL(ldlm_lock_match);
468 EXPORT_SYMBOL(ldlm_lock_addref);
469 EXPORT_SYMBOL(ldlm_lock_decref);
470 EXPORT_SYMBOL(ldlm_cli_convert);
471 EXPORT_SYMBOL(ldlm_cli_enqueue);
472 EXPORT_SYMBOL(ldlm_cli_cancel);
473 EXPORT_SYMBOL(ldlm_test);
474 EXPORT_SYMBOL(ldlm_lock_dump);
475 EXPORT_SYMBOL(ldlm_namespace_new);
476 EXPORT_SYMBOL(ldlm_namespace_free);
477
478 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
479 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
480 MODULE_LICENSE("GPL");
481
482 module_init(ldlm_init);
483 module_exit(ldlm_exit);