Whamcloud - gitweb
5dd68e92e9bf20848a8826e8275940a3d7ec046f
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13
14 #include <linux/version.h>
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <asm/unistd.h>
18
19 #define DEBUG_SUBSYSTEM S_LDLM
20
21 #include <linux/obd_class.h>
22 #include <linux/lustre_dlm.h>
23 #include <linux/lustre_net.h>
24
25 extern kmem_cache_t *ldlm_resource_slab;
26 extern kmem_cache_t *ldlm_lock_slab;
27
28 static int ldlm_client_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
29                                 void *data, __u32 data_len)
30 {
31         LBUG();
32         return 0;
33 }
34
35 static int ldlm_enqueue(struct ptlrpc_request *req)
36 {
37         struct ldlm_reply *dlm_rep;
38         struct ldlm_request *dlm_req;
39         struct lustre_msg *msg, *req_msg;
40         ldlm_error_t err;
41         int rc;
42         int bufsize = sizeof(*dlm_rep);
43         
44         rc = lustre_pack_msg(1, &bufsize, NULL, &req->rq_replen,
45                              &req->rq_repbuf);
46         if (rc) {
47                 CERROR("out of memory\n");
48                 req->rq_status = -ENOMEM;
49                 RETURN(0);
50         }
51         msg = (struct lustre_msg *)req->rq_repbuf;
52         req_msg = req->rq_req.lustre;
53         dlm_rep = lustre_msg_buf(0, msg);
54         dlm_req = lustre_msg_buf(0, req_msg);
55
56         msg->xid = req_msg->xid;
57
58         err = ldlm_local_lock_enqueue(req->rq_obd,
59                                       dlm_req->ns_id,
60                                       &dlm_req->parent_lock_handle,
61                                       dlm_req->res_id,
62                                       dlm_req->res_type,
63                                       &dlm_req->lock_extent,
64                                       dlm_req->mode,
65                                       &dlm_req->flags,
66                                       ldlm_client_callback,
67                                       ldlm_client_callback,
68                                       lustre_msg_buf(1, req_msg),
69                                       req_msg->buflens[1],
70                                       &dlm_rep->lock_handle);
71         msg->status = HTON__u32(err);
72
73         /* XXX unfinished */
74         return 0;
75 }
76
77 static int ldlm_handle(struct obd_device *dev, struct ptlrpc_service *svc,
78                        struct ptlrpc_request *req)
79 {
80         int rc;
81         struct ptlreq_hdr *hdr;
82
83         ENTRY;
84
85         hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
86
87         if (NTOH__u32(hdr->type) != PTL_RPC_REQUEST) {
88                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
89                        NTOH__u32(hdr->type));
90                 rc = -EINVAL;
91                 GOTO(out, rc);
92         }
93
94         rc = lustre_unpack_msg(req->rq_reqbuf, req->rq_reqlen);
95         req->rq_reqhdr = (void *)req->rq_reqbuf;
96         if (rc) {
97                 CERROR("lustre_ldlm: Invalid request\n");
98                 GOTO(out, rc);
99         }
100
101         switch (req->rq_reqhdr->opc) {
102         case LDLM_ENQUEUE:
103                 CDEBUG(D_INODE, "enqueue\n");
104                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
105                 rc = ldlm_enqueue(req);
106                 break;
107 #if 0
108         case LDLM_CONVERT:
109                 CDEBUG(D_INODE, "convert\n");
110                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
111                 rc = ldlm_convert(req);
112                 break;
113
114         case LDLM_CANCEL:
115                 CDEBUG(D_INODE, "cancel\n");
116                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
117                 rc = ldlm_cancel(req);
118                 break;
119
120         case LDLM_CALLBACK:
121                 CDEBUG(D_INODE, "callback\n");
122                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
123                 rc = ldlm_callback(req);
124                 break;
125 #endif
126
127         default:
128                 rc = ptlrpc_error(dev, svc, req);
129                 RETURN(rc);
130         }
131
132         EXIT;
133 out:
134         if (rc) {
135                 CERROR("no header\n");
136                 return 0;
137         }
138
139         if( req->rq_status) {
140                 ptlrpc_error(dev, svc, req);
141         } else {
142                 CDEBUG(D_NET, "sending reply\n");
143                 ptlrpc_reply(dev, svc, req);
144         }
145
146         return 0;
147 }
148
149
150
151 static int ldlm_iocontrol(int cmd, struct obd_conn *conn, int len, void *karg,
152                           void *uarg)
153 {
154         struct obd_device *obddev = conn->oc_dev;
155         int err;
156
157         ENTRY;
158
159         if ( _IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
160              _IOC_NR(cmd) < IOC_LDLM_MIN_NR  ||
161              _IOC_NR(cmd) > IOC_LDLM_MAX_NR ) {
162                 CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
163                                 _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
164                 EXIT;
165                 return -EINVAL;
166         }
167
168         switch (cmd) {
169         case IOC_LDLM_TEST: {
170                 err = ldlm_test(obddev);
171                 CERROR("-- done err %d\n", err);
172                 EXIT;
173                 break;
174         }
175         default:
176                 err = -EINVAL;
177                 EXIT;
178                 break;
179         }
180
181         return err;
182 }
183
184 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *data)
185 {
186         struct ldlm_obd *ldlm = &obddev->u.ldlm;
187         int err;
188         ENTRY;
189
190         INIT_LIST_HEAD(&obddev->u.ldlm.ldlm_namespaces);
191         obddev->u.ldlm.ldlm_lock = SPIN_LOCK_UNLOCKED;
192
193         ldlm->ldlm_service = ptlrpc_init_svc(64 * 1024,
194                                              LDLM_REQUEST_PORTAL,
195                                              LDLM_REPLY_PORTAL,
196                                              "self", ldlm_handle);
197         if (!ldlm->ldlm_service)
198                 LBUG();
199
200         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
201         if (err)
202                 CERROR("cannot start thread\n");
203
204         MOD_INC_USE_COUNT;
205         RETURN(0);
206 }
207
208 static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
209 {
210         struct list_head *tmp, *pos;
211         int rc = 0;
212
213         list_for_each_safe(tmp, pos, q) {
214                 struct ldlm_lock *lock;
215
216                 if (rc) {
217                         /* Res was already cleaned up. */
218                         LBUG();
219                 }
220
221                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
222
223                 ldlm_resource_del_lock(lock);
224                 ldlm_lock_free(lock);
225                 rc = ldlm_resource_put(res);
226         }
227
228         return rc;
229 }
230
231 static int do_free_namespace(struct ldlm_namespace *ns)
232 {
233         struct list_head *tmp, *pos;
234         int i, rc;
235
236         for (i = 0; i < RES_HASH_SIZE; i++) {
237                 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
238                         struct ldlm_resource *res;
239                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
240                         list_del_init(&res->lr_hash);
241
242                         rc = cleanup_resource(res, &res->lr_granted);
243                         if (!rc)
244                                 rc = cleanup_resource(res, &res->lr_converting);
245                         if (!rc)
246                                 rc = cleanup_resource(res, &res->lr_waiting);
247
248                         while (rc == 0)
249                                 rc = ldlm_resource_put(res);
250                 }
251         }
252
253         return ldlm_namespace_free(ns);
254 }
255
256 static int ldlm_free_all(struct obd_device *obddev)
257 {
258         struct list_head *tmp, *pos;
259         int rc = 0;
260
261         ldlm_lock(obddev);
262
263         list_for_each_safe(tmp, pos, &obddev->u.ldlm.ldlm_namespaces) { 
264                 struct ldlm_namespace *ns;
265                 ns = list_entry(tmp, struct ldlm_namespace, ns_link);
266
267                 rc |= do_free_namespace(ns);
268         }
269
270         ldlm_unlock(obddev);
271
272         return rc;
273 }
274
275 static int ldlm_cleanup(struct obd_device *obddev)
276 {
277         struct ldlm_obd *ldlm = &obddev->u.ldlm;
278         ENTRY;
279
280         ptlrpc_stop_thread(ldlm->ldlm_service);
281         rpc_unregister_service(ldlm->ldlm_service);
282
283         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
284                 // XXX reply with errors and clean up
285                 CERROR("Request list not empty!\n");
286         }
287
288         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
289
290         if (ldlm_free_all(obddev)) {
291                 CERROR("ldlm_free_all could not complete.\n");
292                 RETURN(-1);
293         }
294
295         MOD_DEC_USE_COUNT;
296         RETURN(0);
297 }
298
299 struct obd_ops ldlm_obd_ops = {
300         o_iocontrol:   ldlm_iocontrol,
301         o_setup:       ldlm_setup,
302         o_cleanup:     ldlm_cleanup,
303         o_connect:     gen_connect,
304         o_disconnect:  gen_disconnect
305 };
306
307
308 static int __init ldlm_init(void)
309 {
310         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
311         if (rc != 0)
312                 return rc;
313
314         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
315                                                sizeof(struct ldlm_resource), 0,
316                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
317         if (ldlm_resource_slab == NULL)
318                 return -ENOMEM;
319
320         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
321                                            sizeof(struct ldlm_lock), 0,
322                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
323         if (ldlm_lock_slab == NULL) {
324                 kmem_cache_destroy(ldlm_resource_slab);
325                 return -ENOMEM;
326         }
327
328         return 0;
329 }
330
331 static void __exit ldlm_exit(void)
332 {
333         obd_unregister_type(OBD_LDLM_DEVICENAME);
334         kmem_cache_destroy(ldlm_resource_slab);
335         kmem_cache_destroy(ldlm_lock_slab);
336 }
337
338 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
339 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
340 MODULE_LICENSE("GPL"); 
341
342 module_init(ldlm_init);
343 module_exit(ldlm_exit);