Whamcloud - gitweb
- added a 'dying' head to fix very bad bug in yesterday's request code
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13
14 #include <linux/version.h>
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <asm/unistd.h>
18
19 #define DEBUG_SUBSYSTEM S_LDLM
20
21 #include <linux/lustre_dlm.h>
22
23 extern kmem_cache_t *ldlm_resource_slab;
24 extern kmem_cache_t *ldlm_lock_slab;
25
26 static int _ldlm_namespace_new(struct obd_device *obddev,
27                                struct ptlrpc_request *req)
28 {
29         struct ldlm_request *dlm_req;
30         struct ldlm_namespace *ns;
31         int rc;
32         ldlm_error_t err;
33         ENTRY;
34
35         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
36         if (rc) {
37                 CERROR("out of memory\n");
38                 req->rq_status = -ENOMEM;
39                 RETURN(0);
40         }
41         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
42
43         err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
44                                  &ns);
45         req->rq_status = err;
46
47         CERROR("err = %d\n", err);
48
49         RETURN(0);
50 }
51
52 static int _ldlm_enqueue(struct ptlrpc_request *req)
53 {
54         struct ldlm_reply *dlm_rep;
55         struct ldlm_request *dlm_req;
56         int rc, size = sizeof(*dlm_rep);
57         ldlm_error_t err;
58         struct ldlm_lock *lock;
59         ENTRY;
60
61         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
62         if (rc) {
63                 CERROR("out of memory\n");
64                 req->rq_status = -ENOMEM;
65                 RETURN(0);
66         }
67         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
68         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
69
70         memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
71                sizeof(dlm_rep->lock_extent));
72         dlm_rep->flags = dlm_req->flags;
73
74         err = ldlm_local_lock_create(dlm_req->lock_desc.l_resource.lr_ns_id,
75                                      &dlm_req->lock_handle2,
76                                      dlm_req->lock_desc.l_resource.lr_name,
77                                      dlm_req->lock_desc.l_resource.lr_type,
78                                      &dlm_rep->lock_handle);
79         if (err != ELDLM_OK)
80                 GOTO(out, err);
81
82         err = ldlm_local_lock_enqueue(&dlm_rep->lock_handle,
83                                       dlm_req->lock_desc.l_req_mode,
84                                       &dlm_rep->lock_extent,
85                                       &dlm_rep->flags,
86                                       ldlm_cli_callback,
87                                       ldlm_cli_callback,
88                                       lustre_msg_buf(req->rq_reqmsg, 1),
89                                       req->rq_reqmsg->buflens[1]);
90         if (err != ELDLM_OK)
91                 GOTO(out, err);
92
93         lock = ldlm_handle2object(&dlm_rep->lock_handle);
94         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
95                sizeof(lock->l_remote_handle));
96         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
97         EXIT;
98  out:
99         req->rq_status = err;
100         CERROR("err = %d\n", err);
101
102         return 0;
103 }
104
105 static int _ldlm_convert(struct ptlrpc_request *req)
106 {
107         struct ldlm_request *dlm_req;
108         int rc;
109         ENTRY;
110
111         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
112         if (rc) {
113                 CERROR("out of memory\n");
114                 req->rq_status = -ENOMEM;
115                 RETURN(0);
116         }
117         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
118
119         req->rq_status =
120                 ldlm_local_lock_convert(&dlm_req->lock_handle1,
121                                         dlm_req->lock_desc.l_req_mode,
122                                         &dlm_req->flags);
123         RETURN(0);
124 }
125
126 static int _ldlm_cancel(struct ptlrpc_request *req)
127 {
128         struct ldlm_request *dlm_req;
129         int rc;
130         ENTRY;
131
132         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
133         if (rc) {
134                 CERROR("out of memory\n");
135                 req->rq_status = -ENOMEM;
136                 RETURN(0);
137         }
138         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
139
140         req->rq_status = ldlm_local_lock_cancel(&dlm_req->lock_handle1);
141         RETURN(0);
142 }
143
144 static int _ldlm_callback(struct ptlrpc_request *req)
145 {
146         struct ldlm_request *dlm_req;
147         struct ldlm_lock *lock;
148         int rc;
149         ENTRY;
150
151         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
152         if (rc) {
153                 CERROR("out of memory\n");
154                 req->rq_status = -ENOMEM;
155                 RETURN(0);
156         }
157         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
158
159         lock = ldlm_handle2object(&dlm_req->lock_handle1);
160         ldlm_lock_dump(lock);
161         if (dlm_req->lock_handle2.addr) {
162                 CERROR("Got blocked callback for lock %p.\n", lock);
163                 /* FIXME: do something impressive. */
164         } else {
165                 CERROR("Got granted callback for lock %p.\n", lock);
166                 lock->l_granted_mode = lock->l_req_mode;
167                 wake_up(&lock->l_waitq);
168         }
169
170         req->rq_status = 0;
171
172         RETURN(0);
173 }
174
175 static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
176                        struct ptlrpc_request *req)
177 {
178         int rc;
179         ENTRY;
180
181         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
182         if (rc) {
183                 CERROR("lustre_ldlm: Invalid request\n");
184                 GOTO(out, rc);
185         }
186
187         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
188                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
189                        req->rq_reqmsg->type);
190                 GOTO(out, rc = -EINVAL);
191         }
192
193         switch (req->rq_reqmsg->opc) {
194         case LDLM_NAMESPACE_NEW:
195                 CDEBUG(D_INODE, "namespace_new\n");
196                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
197                 rc = _ldlm_namespace_new(dev, req);
198                 break;
199
200         case LDLM_ENQUEUE:
201                 CDEBUG(D_INODE, "enqueue\n");
202                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
203                 rc = _ldlm_enqueue(req);
204                 break;
205
206         case LDLM_CONVERT:
207                 CDEBUG(D_INODE, "convert\n");
208                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
209                 rc = _ldlm_convert(req);
210                 break;
211
212         case LDLM_CANCEL:
213                 CDEBUG(D_INODE, "cancel\n");
214                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
215                 rc = _ldlm_cancel(req);
216                 break;
217
218         case LDLM_CALLBACK:
219                 CDEBUG(D_INODE, "callback\n");
220                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
221                 rc = _ldlm_callback(req);
222                 break;
223
224         default:
225                 rc = ptlrpc_error(svc, req);
226                 RETURN(rc);
227         }
228
229 out:
230         if (rc)
231                 RETURN(ptlrpc_error(svc, req));
232         else
233                 RETURN(ptlrpc_reply(svc, req));
234 }
235
236 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
237                           void *uarg)
238 {
239         struct obd_device *obddev = conn->oc_dev;
240         struct ptlrpc_connection *connection;
241         int err;
242         ENTRY;
243
244         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
245             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
246                 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
247                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
248                 RETURN(-EINVAL);
249         }
250
251         ptlrpc_init_client(NULL, NULL,
252                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
253                            obddev->u.ldlm.ldlm_client);
254         connection = ptlrpc_uuid_to_connection("ldlm");
255         if (!connection)
256                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
257
258         switch (cmd) {
259         case IOC_LDLM_TEST: {
260                 err = ldlm_test(obddev, connection);
261                 CERROR("-- done err %d\n", err);
262                 GOTO(out, err);
263         }
264         default:
265                 GOTO(out, err = -EINVAL);
266         }
267
268  out:
269         if (connection)
270                 ptlrpc_put_connection(connection);
271         return err;
272 }
273
274 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
275 {
276         struct ldlm_obd *ldlm = &obddev->u.ldlm;
277         int err;
278         ENTRY;
279
280         ldlm_spinlock = SPIN_LOCK_UNLOCKED;
281
282         ldlm->ldlm_service =
283                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
284                                 LDLM_REPLY_PORTAL, "self", ldlm_handle);
285         if (!ldlm->ldlm_service)
286                 LBUG();
287
288         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
289         if (err) {
290                 CERROR("cannot start thread\n");
291                 LBUG();
292         }
293
294         OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
295         if (ldlm->ldlm_client == NULL)
296                 LBUG();
297         ptlrpc_init_client(NULL, NULL,
298                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
299                            ldlm->ldlm_client);
300
301         MOD_INC_USE_COUNT;
302         RETURN(0);
303 }
304
305 static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
306 {
307         struct list_head *tmp, *pos;
308         int rc = 0;
309
310         list_for_each_safe(tmp, pos, q) {
311                 struct ldlm_lock *lock;
312
313                 if (rc) {
314                         /* Res was already cleaned up. */
315                         LBUG();
316                 }
317
318                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
319
320                 ldlm_resource_del_lock(lock);
321                 ldlm_lock_free(lock);
322                 rc = ldlm_resource_put(res);
323         }
324
325         return rc;
326 }
327
328 static int do_free_namespace(struct ldlm_namespace *ns)
329 {
330         struct list_head *tmp, *pos;
331         int i, rc;
332
333         for (i = 0; i < RES_HASH_SIZE; i++) {
334                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
335                         struct ldlm_resource *res;
336                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
337                         list_del_init(&res->lr_hash);
338
339                         rc = cleanup_resource(res, &res->lr_granted);
340                         if (!rc)
341                                 rc = cleanup_resource(res, &res->lr_converting);
342                         if (!rc)
343                                 rc = cleanup_resource(res, &res->lr_waiting);
344
345                         while (rc == 0)
346                                 rc = ldlm_resource_put(res);
347                 }
348         }
349
350         return ldlm_namespace_free(ns);
351 }
352
353 static int ldlm_free_all(struct obd_device *obddev)
354 {
355         struct list_head *tmp, *pos;
356         int rc = 0;
357
358         ldlm_lock();
359
360         list_for_each_safe(tmp, pos, &ldlm_namespaces) {
361                 struct ldlm_namespace *ns;
362                 ns = list_entry(tmp, struct ldlm_namespace, ns_link);
363
364                 rc |= do_free_namespace(ns);
365         }
366
367         ldlm_unlock();
368
369         return rc;
370 }
371
372 static int ldlm_cleanup(struct obd_device *obddev)
373 {
374         struct ldlm_obd *ldlm = &obddev->u.ldlm;
375         ENTRY;
376
377         ptlrpc_stop_all_threads(ldlm->ldlm_service);
378         rpc_unregister_service(ldlm->ldlm_service);
379
380         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
381                 // XXX reply with errors and clean up
382                 CERROR("Request list not empty!\n");
383         }
384
385         OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
386         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
387
388         if (ldlm_free_all(obddev)) {
389                 CERROR("ldlm_free_all could not complete.\n");
390                 RETURN(-1);
391         }
392
393         MOD_DEC_USE_COUNT;
394         RETURN(0);
395 }
396
397 struct obd_ops ldlm_obd_ops = {
398         o_iocontrol:   ldlm_iocontrol,
399         o_setup:       ldlm_setup,
400         o_cleanup:     ldlm_cleanup,
401         o_connect:     gen_connect,
402         o_disconnect:  gen_disconnect
403 };
404
405
406 static int __init ldlm_init(void)
407 {
408         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
409         if (rc != 0)
410                 return rc;
411
412         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
413                                                sizeof(struct ldlm_resource), 0,
414                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
415         if (ldlm_resource_slab == NULL)
416                 return -ENOMEM;
417
418         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
419                                            sizeof(struct ldlm_lock), 0,
420                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
421         if (ldlm_lock_slab == NULL) {
422                 kmem_cache_destroy(ldlm_resource_slab);
423                 return -ENOMEM;
424         }
425
426         return 0;
427 }
428
429 static void __exit ldlm_exit(void)
430 {
431         obd_unregister_type(OBD_LDLM_DEVICENAME);
432         kmem_cache_destroy(ldlm_resource_slab);
433         kmem_cache_destroy(ldlm_lock_slab);
434 }
435
436 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
437 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
438 MODULE_LICENSE("GPL");
439
440 module_init(ldlm_init);
441 module_exit(ldlm_exit);