Whamcloud - gitweb
79955906522da5ad1493b08e2b3b5792034cfceb
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13
14 #include <linux/version.h>
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <asm/unistd.h>
18
19 #define DEBUG_SUBSYSTEM S_LDLM
20
21 #include <linux/lustre_dlm.h>
22
/*
 * Slab caches for lock-manager objects.  They are defined in another
 * translation unit; this file creates them in ldlm_init() and destroys
 * them in ldlm_exit().
 */
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
25
26 static int _ldlm_namespace_new(struct obd_device *obddev,
27                                struct ptlrpc_request *req)
28 {
29         struct ldlm_request *dlm_req;
30         struct ldlm_namespace *ns;
31         int rc;
32         ldlm_error_t err;
33         ENTRY;
34
35         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
36         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
37         if (rc) {
38                 CERROR("out of memory\n");
39                 req->rq_status = -ENOMEM;
40                 RETURN(0);
41         }
42         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
43
44         err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
45                                  &ns);
46         req->rq_status = err;
47
48         CERROR("err = %d\n", err);
49
50         RETURN(0);
51 }
52
53 static int _ldlm_enqueue(struct ptlrpc_request *req)
54 {
55         struct ldlm_reply *dlm_rep;
56         struct ldlm_request *dlm_req;
57         int rc, size = sizeof(*dlm_rep);
58         ldlm_error_t err;
59         ENTRY;
60
61         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
62         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
63         if (rc) {
64                 CERROR("out of memory\n");
65                 req->rq_status = -ENOMEM;
66                 RETURN(0);
67         }
68         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
69         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
70
71         memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
72                sizeof(dlm_rep->lock_extent));
73
74         err = ldlm_local_lock_enqueue(req->rq_obd,
75                                       dlm_req->lock_desc.l_resource.lr_ns_id,
76                                       &dlm_req->lock_handle2,
77                                       dlm_req->lock_desc.l_resource.lr_name,
78                                       dlm_req->lock_desc.l_resource.lr_type,
79                                       &dlm_rep->lock_extent,
80                                       dlm_req->lock_desc.l_req_mode,
81                                       &dlm_req->flags,
82                                       //ldlm_cli_callback,
83                                       NULL,
84                                       ldlm_cli_callback,
85                                       lustre_msg_buf(req->rq_reqmsg, 1),
86                                       req->rq_reqmsg->buflens[1],
87                                       &dlm_rep->lock_handle);
88         if (err != -ENOMEM && err != -ELDLM_BAD_NAMESPACE) {
89                 struct ldlm_lock *lock;
90                 lock = ldlm_handle2object(&dlm_rep->lock_handle);
91                 memcpy(&lock->l_peer, &req->rq_peer, sizeof(lock->l_peer));
92                 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
93                        sizeof(lock->l_remote_handle));
94         }
95         req->rq_status = err;
96
97         CERROR("err = %d\n", err);
98
99         RETURN(0);
100 }
101
102 static int _ldlm_convert(struct ptlrpc_request *req)
103 {
104         struct ldlm_request *dlm_req;
105         int rc;
106         ENTRY;
107
108         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
109         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
110         if (rc) {
111                 CERROR("out of memory\n");
112                 req->rq_status = -ENOMEM;
113                 RETURN(0);
114         }
115         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
116
117         req->rq_status =
118                 ldlm_local_lock_convert(req->rq_obd, &dlm_req->lock_handle1,
119                                         dlm_req->lock_desc.l_req_mode,
120                                         &dlm_req->flags);
121         RETURN(0);
122 }
123
124 static int _ldlm_cancel(struct ptlrpc_request *req)
125 {
126         struct ldlm_request *dlm_req;
127         int rc;
128         ENTRY;
129
130         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
131         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
132         if (rc) {
133                 CERROR("out of memory\n");
134                 req->rq_status = -ENOMEM;
135                 RETURN(0);
136         }
137         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
138
139         req->rq_status =
140                 ldlm_local_lock_cancel(req->rq_obd, &dlm_req->lock_handle1);
141         RETURN(0);
142 }
143
144 static int _ldlm_callback(struct ptlrpc_request *req)
145 {
146         struct ldlm_request *dlm_req;
147         struct ldlm_lock *lock;
148         int rc;
149         ENTRY;
150
151         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
152         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
153         if (rc) {
154                 CERROR("out of memory\n");
155                 req->rq_status = -ENOMEM;
156                 RETURN(0);
157         }
158         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
159
160         lock = ldlm_handle2object(&dlm_req->lock_handle1);
161         ldlm_lock_dump(lock);
162
163         req->rq_status = 0;
164
165         RETURN(0);
166 }
167
168 static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
169                        struct ptlrpc_request *req)
170 {
171         int rc;
172         ENTRY;
173
174         rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
175         req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf;
176         if (rc) {
177                 CERROR("lustre_ldlm: Invalid request\n");
178                 GOTO(out, rc);
179         }
180
181         if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
182                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
183                        req->rq_reqmsg->type);
184                 GOTO(out, rc = -EINVAL);
185         }
186
187         switch (req->rq_reqmsg->opc) {
188         case LDLM_NAMESPACE_NEW:
189                 CDEBUG(D_INODE, "namespace_new\n");
190                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
191                 rc = _ldlm_namespace_new(dev, req);
192                 break;
193
194         case LDLM_ENQUEUE:
195                 CDEBUG(D_INODE, "enqueue\n");
196                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
197                 rc = _ldlm_enqueue(req);
198                 break;
199
200         case LDLM_CONVERT:
201                 CDEBUG(D_INODE, "convert\n");
202                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
203                 rc = _ldlm_convert(req);
204                 break;
205
206         case LDLM_CANCEL:
207                 CDEBUG(D_INODE, "cancel\n");
208                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
209                 rc = _ldlm_cancel(req);
210                 break;
211
212         case LDLM_CALLBACK:
213                 CDEBUG(D_INODE, "callback\n");
214                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
215                 rc = _ldlm_callback(req);
216                 break;
217
218         default:
219                 rc = ptlrpc_error(svc, req);
220                 RETURN(rc);
221         }
222
223 out:
224         if (rc)
225                 RETURN(ptlrpc_error(svc, req));
226         else
227                 RETURN(ptlrpc_reply(svc, req));
228 }
229
230 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
231                           void *uarg)
232 {
233         struct obd_device *obddev = conn->oc_dev;
234         int err;
235         ENTRY;
236
237         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
238             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
239                 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
240                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
241                 RETURN(-EINVAL);
242         }
243
244         switch (cmd) {
245         case IOC_LDLM_TEST: {
246                 err = ldlm_test(obddev);
247                 CERROR("-- done err %d\n", err);
248                 GOTO(out, err);
249         }
250         default:
251                 GOTO(out, err = -EINVAL);
252         }
253
254  out:
255         return err;
256 }
257
258 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
259 {
260         struct ldlm_obd *ldlm = &obddev->u.ldlm;
261         int err;
262         ENTRY;
263
264         INIT_LIST_HEAD(&obddev->u.ldlm.ldlm_namespaces);
265         obddev->u.ldlm.ldlm_lock = SPIN_LOCK_UNLOCKED;
266
267         ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024,
268                                              LDLM_REQUEST_PORTAL,
269                                              LDLM_REPLY_PORTAL,
270                                              "self", ldlm_handle);
271         if (!ldlm->ldlm_service)
272                 LBUG();
273
274         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
275         if (err) {
276                 CERROR("cannot start thread\n");
277                 LBUG();
278         }
279
280         OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
281         if (ldlm->ldlm_client == NULL)
282                 LBUG();
283
284         ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
285                            ldlm->ldlm_client);
286         err = ptlrpc_connect_client("ldlm", ldlm->ldlm_client,
287                                     &ldlm->ldlm_server_peer);
288         if (err) {
289                 CERROR("cannot create client\n");
290                 LBUG();
291         }
292
293         MOD_INC_USE_COUNT;
294         RETURN(0);
295 }
296
297 static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
298 {
299         struct list_head *tmp, *pos;
300         int rc = 0;
301
302         list_for_each_safe(tmp, pos, q) {
303                 struct ldlm_lock *lock;
304
305                 if (rc) {
306                         /* Res was already cleaned up. */
307                         LBUG();
308                 }
309
310                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
311
312                 ldlm_resource_del_lock(lock);
313                 ldlm_lock_free(lock);
314                 rc = ldlm_resource_put(res);
315         }
316
317         return rc;
318 }
319
320 static int do_free_namespace(struct ldlm_namespace *ns)
321 {
322         struct list_head *tmp, *pos;
323         int i, rc;
324
325         for (i = 0; i < RES_HASH_SIZE; i++) {
326                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
327                         struct ldlm_resource *res;
328                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
329                         list_del_init(&res->lr_hash);
330
331                         rc = cleanup_resource(res, &res->lr_granted);
332                         if (!rc)
333                                 rc = cleanup_resource(res, &res->lr_converting);
334                         if (!rc)
335                                 rc = cleanup_resource(res, &res->lr_waiting);
336
337                         while (rc == 0)
338                                 rc = ldlm_resource_put(res);
339                 }
340         }
341
342         return ldlm_namespace_free(ns);
343 }
344
345 static int ldlm_free_all(struct obd_device *obddev)
346 {
347         struct list_head *tmp, *pos;
348         int rc = 0;
349
350         ldlm_lock(obddev);
351
352         list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) {
353                 struct ldlm_namespace *ns;
354                 ns = list_entry(tmp, struct ldlm_namespace, ns_link);
355
356                 rc |= do_free_namespace(ns);
357         }
358
359         ldlm_unlock(obddev);
360
361         return rc;
362 }
363
364 static int ldlm_cleanup(struct obd_device *obddev)
365 {
366         struct ldlm_obd *ldlm = &obddev->u.ldlm;
367         ENTRY;
368
369         ptlrpc_stop_thread(ldlm->ldlm_service);
370         rpc_unregister_service(ldlm->ldlm_service);
371
372         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
373                 // XXX reply with errors and clean up
374                 CERROR("Request list not empty!\n");
375         }
376
377         OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
378         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
379
380         if (ldlm_free_all(obddev)) {
381                 CERROR("ldlm_free_all could not complete.\n");
382                 RETURN(-1);
383         }
384
385         MOD_DEC_USE_COUNT;
386         RETURN(0);
387 }
388
389 struct obd_ops ldlm_obd_ops = {
390         o_iocontrol:   ldlm_iocontrol,
391         o_setup:       ldlm_setup,
392         o_cleanup:     ldlm_cleanup,
393         o_connect:     gen_connect,
394         o_disconnect:  gen_disconnect
395 };
396
397
398 static int __init ldlm_init(void)
399 {
400         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
401         if (rc != 0)
402                 return rc;
403
404         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
405                                                sizeof(struct ldlm_resource), 0,
406                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
407         if (ldlm_resource_slab == NULL)
408                 return -ENOMEM;
409
410         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
411                                            sizeof(struct ldlm_lock), 0,
412                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
413         if (ldlm_lock_slab == NULL) {
414                 kmem_cache_destroy(ldlm_resource_slab);
415                 return -ENOMEM;
416         }
417
418         return 0;
419 }
420
421 static void __exit ldlm_exit(void)
422 {
423         obd_unregister_type(OBD_LDLM_DEVICENAME);
424         kmem_cache_destroy(ldlm_resource_slab);
425         kmem_cache_destroy(ldlm_lock_slab);
426 }
427
/* Module metadata: author, description and licence strings. */
MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
MODULE_LICENSE("GPL");

/* Register ldlm_init() and ldlm_exit() as the module's init and exit
 * functions. */
module_init(ldlm_init);
module_exit(ldlm_exit);