Whamcloud - gitweb
- More Peter's additions for the ha manager. This doesn't seem to break much -
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13
14 #include <linux/version.h>
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <asm/unistd.h>
18
19 #define DEBUG_SUBSYSTEM S_LDLM
20
21 #include <linux/lustre_dlm.h>
22
23 extern kmem_cache_t *ldlm_resource_slab;
24 extern kmem_cache_t *ldlm_lock_slab;
25
26 static int _ldlm_namespace_new(struct obd_device *obddev,
27                                struct ptlrpc_request *req)
28 {
29         struct ldlm_request *dlm_req;
30         struct ldlm_namespace *ns;
31         int rc;
32         ldlm_error_t err;
33         ENTRY;
34
35         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
36         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
37         if (rc) {
38                 CERROR("out of memory\n");
39                 req->rq_status = -ENOMEM;
40                 RETURN(0);
41         }
42         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
43
44         err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
45                                  &ns);
46         req->rq_status = err;
47
48         CERROR("err = %d\n", err);
49
50         RETURN(0);
51 }
52
53 static int _ldlm_enqueue(struct ptlrpc_request *req)
54 {
55         struct ldlm_reply *dlm_rep;
56         struct ldlm_request *dlm_req;
57         int rc, size = sizeof(*dlm_rep);
58         ldlm_error_t err;
59         ENTRY;
60
61         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repbuf);
62         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
63         if (rc) {
64                 CERROR("out of memory\n");
65                 req->rq_status = -ENOMEM;
66                 RETURN(0);
67         }
68         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
69         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
70
71         memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
72                sizeof(dlm_rep->lock_extent));
73
74         err = ldlm_local_lock_enqueue(req->rq_obd,
75                                       dlm_req->lock_desc.l_resource.lr_ns_id,
76                                       &dlm_req->lock_handle2,
77                                       dlm_req->lock_desc.l_resource.lr_name,
78                                       dlm_req->lock_desc.l_resource.lr_type,
79                                       &dlm_rep->lock_extent,
80                                       dlm_req->lock_desc.l_req_mode,
81                                       &dlm_req->flags,
82                                       //ldlm_cli_callback,
83                                       NULL,
84                                       ldlm_cli_callback,
85                                       lustre_msg_buf(req->rq_reqmsg, 1),
86                                       req->rq_reqmsg->buflens[1],
87                                       &dlm_rep->lock_handle);
88         if (err != -ENOMEM && err != -ELDLM_BAD_NAMESPACE) {
89                 struct ldlm_lock *lock;
90                 lock = ldlm_handle2object(&dlm_rep->lock_handle);
91                 memcpy(&lock->l_peer, &req->rq_peer, sizeof(lock->l_peer));
92                 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
93                        sizeof(lock->l_remote_handle));
94         }
95         req->rq_status = err;
96
97         CERROR("err = %d\n", err);
98
99         RETURN(0);
100 }
101
102 static int _ldlm_convert(struct ptlrpc_request *req)
103 {
104         struct ldlm_request *dlm_req;
105         int rc;
106         ENTRY;
107
108         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
109         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
110         if (rc) {
111                 CERROR("out of memory\n");
112                 req->rq_status = -ENOMEM;
113                 RETURN(0);
114         }
115         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
116
117         req->rq_status =
118                 ldlm_local_lock_convert(req->rq_obd, &dlm_req->lock_handle1,
119                                         dlm_req->lock_desc.l_req_mode,
120                                         &dlm_req->flags);
121         RETURN(0);
122 }
123
124 static int _ldlm_cancel(struct ptlrpc_request *req)
125 {
126         struct ldlm_request *dlm_req;
127         int rc;
128         ENTRY;
129
130         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
131         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
132         if (rc) {
133                 CERROR("out of memory\n");
134                 req->rq_status = -ENOMEM;
135                 RETURN(0);
136         }
137         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
138
139         req->rq_status =
140                 ldlm_local_lock_cancel(req->rq_obd, &dlm_req->lock_handle1);
141         RETURN(0);
142 }
143
144 static int _ldlm_callback(struct ptlrpc_request *req)
145 {
146         struct ldlm_request *dlm_req;
147         struct ldlm_lock *lock;
148         int rc;
149         ENTRY;
150
151         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repbuf);
152         req->rq_repmsg = (struct lustre_msg *)req->rq_repbuf;
153         if (rc) {
154                 CERROR("out of memory\n");
155                 req->rq_status = -ENOMEM;
156                 RETURN(0);
157         }
158         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
159
160         lock = ldlm_handle2object(&dlm_req->lock_handle1);
161         ldlm_lock_dump(lock);
162
163         req->rq_status = 0;
164
165         RETURN(0);
166 }
167
168 static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
169                        struct ptlrpc_request *req)
170 {
171         int rc;
172         ENTRY;
173
174         rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
175         req->rq_reqmsg = (struct lustre_msg *)req->rq_reqbuf;
176         if (rc) {
177                 CERROR("lustre_ldlm: Invalid request\n");
178                 GOTO(out, rc);
179         }
180
181         if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
182                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
183                        req->rq_reqmsg->type);
184                 GOTO(out, rc = -EINVAL);
185         }
186
187         switch (req->rq_reqmsg->opc) {
188         case LDLM_NAMESPACE_NEW:
189                 CDEBUG(D_INODE, "namespace_new\n");
190                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
191                 rc = _ldlm_namespace_new(dev, req);
192                 break;
193
194         case LDLM_ENQUEUE:
195                 CDEBUG(D_INODE, "enqueue\n");
196                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
197                 rc = _ldlm_enqueue(req);
198                 break;
199
200         case LDLM_CONVERT:
201                 CDEBUG(D_INODE, "convert\n");
202                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
203                 rc = _ldlm_convert(req);
204                 break;
205
206         case LDLM_CANCEL:
207                 CDEBUG(D_INODE, "cancel\n");
208                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
209                 rc = _ldlm_cancel(req);
210                 break;
211
212         case LDLM_CALLBACK:
213                 CDEBUG(D_INODE, "callback\n");
214                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
215                 rc = _ldlm_callback(req);
216                 break;
217
218         default:
219                 rc = ptlrpc_error(svc, req);
220                 RETURN(rc);
221         }
222
223 out:
224         if (rc)
225                 RETURN(ptlrpc_error(svc, req));
226         else
227                 RETURN(ptlrpc_reply(svc, req));
228 }
229
230 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
231                           void *uarg)
232 {
233         struct obd_device *obddev = conn->oc_dev;
234         int err;
235         ENTRY;
236
237         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
238             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
239                 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
240                                 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
241                 EXIT;
242                 return -EINVAL;
243         }
244
245 #if 0
246         /* XX phil -- put the peer back in */
247
248         ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, &cl);
249         err = ptlrpc_connect_client("ldlm", &cl, NULL);
250 #endif
251         if (err) {
252                 CERROR("cannot create client\n");
253                 RETURN(-EINVAL);
254         }
255
256         switch (cmd) {
257         case IOC_LDLM_TEST: {
258                 err = ldlm_test(obddev);
259                 CERROR("-- done err %d\n", err);
260                 GOTO(out, err);
261         }
262         default:
263                 GOTO(out, err = -EINVAL);
264         }
265
266  out:
267         return err;
268 }
269
270 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
271 {
272         struct ldlm_obd *ldlm = &obddev->u.ldlm;
273         int err;
274         ENTRY;
275
276         INIT_LIST_HEAD(&obddev->u.ldlm.ldlm_namespaces);
277         obddev->u.ldlm.ldlm_lock = SPIN_LOCK_UNLOCKED;
278
279         ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024,
280                                              LDLM_REQUEST_PORTAL,
281                                              LDLM_REPLY_PORTAL,
282                                              "self", ldlm_handle);
283         if (!ldlm->ldlm_service)
284                 LBUG();
285
286         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
287         if (err) {
288                 CERROR("cannot start thread\n");
289                 LBUG();
290         }
291
292         OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
293         if (ldlm->ldlm_client == NULL)
294                 LBUG();
295
296         ptlrpc_init_client(-1, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
297                            ldlm->ldlm_client);
298         err = ptlrpc_connect_client("ldlm", ldlm->ldlm_client,
299                                     &ldlm->ldlm_server_peer);
300         if (err) {
301                 CERROR("cannot create client\n");
302                 LBUG();
303         }
304
305         MOD_INC_USE_COUNT;
306         RETURN(0);
307 }
308
309 static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
310 {
311         struct list_head *tmp, *pos;
312         int rc = 0;
313
314         list_for_each_safe(tmp, pos, q) {
315                 struct ldlm_lock *lock;
316
317                 if (rc) {
318                         /* Res was already cleaned up. */
319                         LBUG();
320                 }
321
322                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
323
324                 ldlm_resource_del_lock(lock);
325                 ldlm_lock_free(lock);
326                 rc = ldlm_resource_put(res);
327         }
328
329         return rc;
330 }
331
332 static int do_free_namespace(struct ldlm_namespace *ns)
333 {
334         struct list_head *tmp, *pos;
335         int i, rc;
336
337         for (i = 0; i < RES_HASH_SIZE; i++) {
338                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
339                         struct ldlm_resource *res;
340                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
341                         list_del_init(&res->lr_hash);
342
343                         rc = cleanup_resource(res, &res->lr_granted);
344                         if (!rc)
345                                 rc = cleanup_resource(res, &res->lr_converting);
346                         if (!rc)
347                                 rc = cleanup_resource(res, &res->lr_waiting);
348
349                         while (rc == 0)
350                                 rc = ldlm_resource_put(res);
351                 }
352         }
353
354         return ldlm_namespace_free(ns);
355 }
356
357 static int ldlm_free_all(struct obd_device *obddev)
358 {
359         struct list_head *tmp, *pos;
360         int rc = 0;
361
362         ldlm_lock(obddev);
363
364         list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) {
365                 struct ldlm_namespace *ns;
366                 ns = list_entry(tmp, struct ldlm_namespace, ns_link);
367
368                 rc |= do_free_namespace(ns);
369         }
370
371         ldlm_unlock(obddev);
372
373         return rc;
374 }
375
376 static int ldlm_cleanup(struct obd_device *obddev)
377 {
378         struct ldlm_obd *ldlm = &obddev->u.ldlm;
379         ENTRY;
380
381         ptlrpc_stop_thread(ldlm->ldlm_service);
382         rpc_unregister_service(ldlm->ldlm_service);
383
384         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
385                 // XXX reply with errors and clean up
386                 CERROR("Request list not empty!\n");
387         }
388
389         OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
390         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
391
392         if (ldlm_free_all(obddev)) {
393                 CERROR("ldlm_free_all could not complete.\n");
394                 RETURN(-1);
395         }
396
397         MOD_DEC_USE_COUNT;
398         RETURN(0);
399 }
400
401 struct obd_ops ldlm_obd_ops = {
402         o_iocontrol:   ldlm_iocontrol,
403         o_setup:       ldlm_setup,
404         o_cleanup:     ldlm_cleanup,
405         o_connect:     gen_connect,
406         o_disconnect:  gen_disconnect
407 };
408
409
410 static int __init ldlm_init(void)
411 {
412         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
413         if (rc != 0)
414                 return rc;
415
416         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
417                                                sizeof(struct ldlm_resource), 0,
418                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
419         if (ldlm_resource_slab == NULL)
420                 return -ENOMEM;
421
422         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
423                                            sizeof(struct ldlm_lock), 0,
424                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
425         if (ldlm_lock_slab == NULL) {
426                 kmem_cache_destroy(ldlm_resource_slab);
427                 return -ENOMEM;
428         }
429
430         return 0;
431 }
432
433 static void __exit ldlm_exit(void)
434 {
435         obd_unregister_type(OBD_LDLM_DEVICENAME);
436         kmem_cache_destroy(ldlm_resource_slab);
437         kmem_cache_destroy(ldlm_lock_slab);
438 }
439
440 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
441 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
442 MODULE_LICENSE("GPL");
443
444 module_init(ldlm_init);
445 module_exit(ldlm_exit);