Whamcloud - gitweb
dc56bcc1ba036958eb60cce1f3e3e90b5c393faa
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13
14 #include <linux/version.h>
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <asm/unistd.h>
18
19 #define DEBUG_SUBSYSTEM S_LDLM
20
21 #include <linux/lustre_dlm.h>
22
23 extern kmem_cache_t *ldlm_resource_slab;
24 extern kmem_cache_t *ldlm_lock_slab;
25
26 static int _ldlm_namespace_new(struct obd_device *obddev,
27                                struct ptlrpc_request *req)
28 {
29         struct ldlm_request *dlm_req;
30         struct ldlm_namespace *ns;
31         int rc;
32         ldlm_error_t err;
33         ENTRY;
34
35         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
36         if (rc) {
37                 CERROR("out of memory\n");
38                 req->rq_status = -ENOMEM;
39                 RETURN(0);
40         }
41         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
42
43         err = ldlm_namespace_new(obddev, dlm_req->lock_desc.l_resource.lr_ns_id,
44                                  &ns);
45         req->rq_status = err;
46
47         CERROR("err = %d\n", err);
48
49         RETURN(0);
50 }
51
52 static int _ldlm_enqueue(struct ptlrpc_request *req)
53 {
54         struct ldlm_reply *dlm_rep;
55         struct ldlm_request *dlm_req;
56         int rc, size = sizeof(*dlm_rep);
57         ldlm_error_t err;
58         struct ldlm_lock *lock;
59         ENTRY;
60
61         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
62         if (rc) {
63                 CERROR("out of memory\n");
64                 req->rq_status = -ENOMEM;
65                 RETURN(0);
66         }
67         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
68         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
69
70         memcpy(&dlm_rep->lock_extent, &dlm_req->lock_desc.l_extent,
71                sizeof(dlm_rep->lock_extent));
72         dlm_rep->flags = dlm_req->flags;
73
74         err = ldlm_local_lock_create(dlm_req->lock_desc.l_resource.lr_ns_id,
75                                      &dlm_req->lock_handle2,
76                                      dlm_req->lock_desc.l_resource.lr_name,
77                                      dlm_req->lock_desc.l_resource.lr_type,
78                                      &dlm_rep->lock_handle);
79         if (err != ELDLM_OK)
80                 GOTO(out, err);
81
82         err = ldlm_local_lock_enqueue(&dlm_rep->lock_handle,
83                                       dlm_req->lock_desc.l_req_mode,
84                                       &dlm_rep->lock_extent,
85                                       &dlm_rep->flags,
86                                       NULL, //ldlm_cli_callback,
87                                       ldlm_cli_callback,
88                                       lustre_msg_buf(req->rq_reqmsg, 1),
89                                       req->rq_reqmsg->buflens[1]);
90         if (err != ELDLM_OK)
91                 GOTO(out, err);
92
93         lock = ldlm_handle2object(&dlm_rep->lock_handle);
94         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
95                sizeof(lock->l_remote_handle));
96         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
97         EXIT;
98  out:
99         req->rq_status = err;
100         CERROR("err = %d\n", err);
101
102         return 0;
103 }
104
105 static int _ldlm_convert(struct ptlrpc_request *req)
106 {
107         struct ldlm_request *dlm_req;
108         int rc;
109         ENTRY;
110
111         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
112         if (rc) {
113                 CERROR("out of memory\n");
114                 req->rq_status = -ENOMEM;
115                 RETURN(0);
116         }
117         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
118
119         req->rq_status =
120                 ldlm_local_lock_convert(&dlm_req->lock_handle1,
121                                         dlm_req->lock_desc.l_req_mode,
122                                         &dlm_req->flags);
123         RETURN(0);
124 }
125
126 static int _ldlm_cancel(struct ptlrpc_request *req)
127 {
128         struct ldlm_request *dlm_req;
129         int rc;
130         ENTRY;
131
132         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
133         if (rc) {
134                 CERROR("out of memory\n");
135                 req->rq_status = -ENOMEM;
136                 RETURN(0);
137         }
138         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
139
140         req->rq_status = ldlm_local_lock_cancel(&dlm_req->lock_handle1);
141         RETURN(0);
142 }
143
144 static int _ldlm_callback(struct ptlrpc_request *req)
145 {
146         struct ldlm_request *dlm_req;
147         struct ldlm_lock *lock;
148         int rc;
149         ENTRY;
150
151         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
152         if (rc) {
153                 CERROR("out of memory\n");
154                 req->rq_status = -ENOMEM;
155                 RETURN(0);
156         }
157         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
158
159         lock = ldlm_handle2object(&dlm_req->lock_handle1);
160         ldlm_lock_dump(lock);
161
162         req->rq_status = 0;
163
164         RETURN(0);
165 }
166
167 static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
168                        struct ptlrpc_request *req)
169 {
170         int rc;
171         ENTRY;
172
173         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
174         if (rc) {
175                 CERROR("lustre_ldlm: Invalid request\n");
176                 GOTO(out, rc);
177         }
178
179         if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
180                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
181                        req->rq_reqmsg->type);
182                 GOTO(out, rc = -EINVAL);
183         }
184
185         switch (req->rq_reqmsg->opc) {
186         case LDLM_NAMESPACE_NEW:
187                 CDEBUG(D_INODE, "namespace_new\n");
188                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_NAMESPACE_NEW, 0);
189                 rc = _ldlm_namespace_new(dev, req);
190                 break;
191
192         case LDLM_ENQUEUE:
193                 CDEBUG(D_INODE, "enqueue\n");
194                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
195                 rc = _ldlm_enqueue(req);
196                 break;
197
198         case LDLM_CONVERT:
199                 CDEBUG(D_INODE, "convert\n");
200                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
201                 rc = _ldlm_convert(req);
202                 break;
203
204         case LDLM_CANCEL:
205                 CDEBUG(D_INODE, "cancel\n");
206                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
207                 rc = _ldlm_cancel(req);
208                 break;
209
210         case LDLM_CALLBACK:
211                 CDEBUG(D_INODE, "callback\n");
212                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
213                 rc = _ldlm_callback(req);
214                 break;
215
216         default:
217                 rc = ptlrpc_error(svc, req);
218                 RETURN(rc);
219         }
220
221 out:
222         if (rc)
223                 RETURN(ptlrpc_error(svc, req));
224         else
225                 RETURN(ptlrpc_reply(svc, req));
226 }
227
228 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
229                           void *uarg)
230 {
231         struct obd_device *obddev = conn->oc_dev;
232         int err;
233         ENTRY;
234
235         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
236             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
237                 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
238                                 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
239                 EXIT;
240                 return -EINVAL;
241         }
242
243 #if 0
244         /* XX phil -- put the peer back in */
245
246         ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, &cl);
247         err = ptlrpc_connect_client("ldlm", &cl, NULL);
248 #endif
249         if (err) {
250                 CERROR("cannot create client\n");
251                 RETURN(-EINVAL);
252         }
253
254         switch (cmd) {
255         case IOC_LDLM_TEST: {
256                 err = ldlm_test(obddev);
257                 CERROR("-- done err %d\n", err);
258                 GOTO(out, err);
259         }
260         default:
261                 GOTO(out, err = -EINVAL);
262         }
263
264  out:
265         return err;
266 }
267
268 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
269 {
270         struct ldlm_obd *ldlm = &obddev->u.ldlm;
271         int err;
272         ENTRY;
273
274         INIT_LIST_HEAD(&ldlm_namespaces);
275         ldlm_spinlock = SPIN_LOCK_UNLOCKED;
276
277         ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024,
278                                              LDLM_REQUEST_PORTAL,
279                                              LDLM_REPLY_PORTAL,
280                                              "self", ldlm_handle);
281         if (!ldlm->ldlm_service)
282                 LBUG();
283
284         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
285         if (err) {
286                 CERROR("cannot start thread\n");
287                 LBUG();
288         }
289
290         OBD_ALLOC(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
291         if (ldlm->ldlm_client == NULL)
292                 LBUG();
293
294         ptlrpc_init_client(NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
295                            ldlm->ldlm_client);
296         ldlm->ldlm_server_conn = ptlrpc_uuid_to_connection("ldlm");
297         if (!ldlm->ldlm_server_conn) {
298                 CERROR("cannot create client\n");
299                 LBUG();
300         }
301
302         MOD_INC_USE_COUNT;
303         RETURN(0);
304 }
305
306 static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
307 {
308         struct list_head *tmp, *pos;
309         int rc = 0;
310
311         list_for_each_safe(tmp, pos, q) {
312                 struct ldlm_lock *lock;
313
314                 if (rc) {
315                         /* Res was already cleaned up. */
316                         LBUG();
317                 }
318
319                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
320
321                 ldlm_resource_del_lock(lock);
322                 ldlm_lock_free(lock);
323                 rc = ldlm_resource_put(res);
324         }
325
326         return rc;
327 }
328
329 static int do_free_namespace(struct ldlm_namespace *ns)
330 {
331         struct list_head *tmp, *pos;
332         int i, rc;
333
334         for (i = 0; i < RES_HASH_SIZE; i++) {
335                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
336                         struct ldlm_resource *res;
337                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
338                         list_del_init(&res->lr_hash);
339
340                         rc = cleanup_resource(res, &res->lr_granted);
341                         if (!rc)
342                                 rc = cleanup_resource(res, &res->lr_converting);
343                         if (!rc)
344                                 rc = cleanup_resource(res, &res->lr_waiting);
345
346                         while (rc == 0)
347                                 rc = ldlm_resource_put(res);
348                 }
349         }
350
351         return ldlm_namespace_free(ns);
352 }
353
354 static int ldlm_free_all(struct obd_device *obddev)
355 {
356         struct list_head *tmp, *pos;
357         int rc = 0;
358
359         ldlm_lock();
360
361         list_for_each_safe(tmp, pos, &ldlm_namespaces) {
362                 struct ldlm_namespace *ns;
363                 ns = list_entry(tmp, struct ldlm_namespace, ns_link);
364
365                 rc |= do_free_namespace(ns);
366         }
367
368         ldlm_unlock();
369
370         return rc;
371 }
372
373 static int ldlm_cleanup(struct obd_device *obddev)
374 {
375         struct ldlm_obd *ldlm = &obddev->u.ldlm;
376         ENTRY;
377
378         ptlrpc_stop_thread(ldlm->ldlm_service);
379         rpc_unregister_service(ldlm->ldlm_service);
380
381         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
382                 // XXX reply with errors and clean up
383                 CERROR("Request list not empty!\n");
384         }
385
386         OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
387         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
388         ptlrpc_put_connection(ldlm->ldlm_server_conn);
389
390         if (ldlm_free_all(obddev)) {
391                 CERROR("ldlm_free_all could not complete.\n");
392                 RETURN(-1);
393         }
394
395         MOD_DEC_USE_COUNT;
396         RETURN(0);
397 }
398
399 struct obd_ops ldlm_obd_ops = {
400         o_iocontrol:   ldlm_iocontrol,
401         o_setup:       ldlm_setup,
402         o_cleanup:     ldlm_cleanup,
403         o_connect:     gen_connect,
404         o_disconnect:  gen_disconnect
405 };
406
407
408 static int __init ldlm_init(void)
409 {
410         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
411         if (rc != 0)
412                 return rc;
413
414         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
415                                                sizeof(struct ldlm_resource), 0,
416                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
417         if (ldlm_resource_slab == NULL)
418                 return -ENOMEM;
419
420         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
421                                            sizeof(struct ldlm_lock), 0,
422                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
423         if (ldlm_lock_slab == NULL) {
424                 kmem_cache_destroy(ldlm_resource_slab);
425                 return -ENOMEM;
426         }
427
428         return 0;
429 }
430
431 static void __exit ldlm_exit(void)
432 {
433         obd_unregister_type(OBD_LDLM_DEVICENAME);
434         kmem_cache_destroy(ldlm_resource_slab);
435         kmem_cache_destroy(ldlm_lock_slab);
436 }
437
438 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
439 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
440 MODULE_LICENSE("GPL");
441
442 module_init(ldlm_init);
443 module_exit(ldlm_exit);