Whamcloud - gitweb
93c9e192a8c2939126d688427665213f3177dd30
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
23
24 #define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
25
26 static int is_local_conn(struct ptlrpc_connection *conn)
27 {
28         ENTRY;
29         if (conn == NULL)
30                 RETURN(1);
31
32         RETURN(LOOPBACK(conn->c_peer.peer_nid));
33 }
34
35 /* _ldlm_callback and local_callback setup the variables then call this common
36  * code */
37 static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
38                            ldlm_mode_t mode, void *data, __u32 data_len)
39 {
40         ENTRY;
41
42         if (!lock)
43                 LBUG();
44         if (!lock->l_resource)
45                 LBUG();
46
47         ldlm_lock_dump(lock);
48
49         spin_lock(&lock->l_resource->lr_lock);
50         spin_lock(&lock->l_lock);
51         if (!new) {
52                 CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
53                 lock->l_req_mode = mode;
54                 list_del_init(&lock->l_res_link); 
55                 ldlm_grant_lock(lock->l_resource, lock);
56                 wake_up(&lock->l_waitq);
57                 spin_unlock(&lock->l_lock);
58                 spin_unlock(&lock->l_resource->lr_lock);
59         } else {
60                 CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
61                 lock->l_flags |= LDLM_FL_DYING;
62                 spin_unlock(&lock->l_lock);
63                 spin_unlock(&lock->l_resource->lr_lock);
64                 if (!lock->l_readers && !lock->l_writers) {
65                         CDEBUG(D_INFO, "Lock already unused, calling "
66                                "callback (%p).\n", lock->l_blocking_ast);
67                         if (lock->l_blocking_ast != NULL)
68                                 lock->l_blocking_ast(lock, new, lock->l_data,
69                                                      lock->l_data_len);
70                 } else {
71                         CDEBUG(D_INFO, "Lock still has references; lock will be"
72                                " cancelled later.\n");
73                 }
74         }
75         RETURN(0);
76 }
77
78 /* FIXME: I think that this is no longer necessary. */
79 static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new,
80                           void *data, __u32 data_len)
81 {
82         struct ldlm_lock *lock;
83         /* the 'remote handle' is the lock in the FS's namespace */
84         lock = lustre_handle2object(&l->l_remote_handle);
85
86         return common_callback(lock, new, l->l_granted_mode, data, data_len);
87 }
88
89 static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
90                          struct ptlrpc_request *req)
91 {
92         struct ldlm_reply *dlm_rep;
93         struct ldlm_request *dlm_req;
94         int rc, size = sizeof(*dlm_rep), cookielen;
95         __u32 flags;
96         ldlm_error_t err;
97         struct ldlm_lock *lock = NULL;
98         ldlm_lock_callback callback;
99         struct lustre_handle lockh;
100         void *cookie;
101         ENTRY;
102
103         /* Is this lock managed locally? */
104         if (is_local_conn(req->rq_connection))
105                 callback = local_callback;
106         else
107                 callback = ldlm_cli_callback;
108
109         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
110         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
111                 /* In this case, the reply buffer is allocated deep in
112                  * local_lock_enqueue by the policy function. */
113                 cookie = req;
114                 cookielen = sizeof(*req);
115         } else {
116                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
117                                      &req->rq_repmsg);
118                 if (rc) {
119                         CERROR("out of memory\n");
120                         RETURN(-ENOMEM);
121                 }
122                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
123                         cookie = &dlm_req->lock_desc.l_extent;
124                         cookielen = sizeof(struct ldlm_extent);
125                 }
126         }
127
128         err = ldlm_local_lock_create(obddev->obd_namespace,
129                                      &dlm_req->lock_handle2,
130                                      dlm_req->lock_desc.l_resource.lr_name,
131                                      dlm_req->lock_desc.l_resource.lr_type,
132                                      dlm_req->lock_desc.l_req_mode,
133                                      NULL, 0, &lockh);
134         if (err != ELDLM_OK)
135                 GOTO(out, err);
136
137         flags = dlm_req->lock_flags;
138         err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
139                                       callback, callback);
140         if (err != ELDLM_OK)
141                 GOTO(out, err);
142
143         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
144         dlm_rep->lock_flags = flags;
145
146         memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
147         lock = lustre_handle2object(&lockh);
148         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
149                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
150                        sizeof(lock->l_extent));
151         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
152                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
153                        sizeof(dlm_rep->lock_resource_name));
154
155         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
156                sizeof(lock->l_remote_handle));
157         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
158         EXIT;
159  out:
160         req->rq_status = err;
161         CDEBUG(D_INFO, "err = %d\n", err);
162
163         if (ptlrpc_reply(svc, req))
164                 LBUG();
165
166         if (!err)
167                 ldlm_reprocess_all(lock->l_resource);
168
169         return 0;
170 }
171
172 static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
173 {
174         struct ldlm_request *dlm_req, *dlm_rep;
175         struct ldlm_resource *res;
176         int rc, size = sizeof(*dlm_rep);
177         ENTRY;
178
179         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
180         if (rc) {
181                 CERROR("out of memory\n");
182                 RETURN(-ENOMEM);
183         }
184         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
185         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
186         dlm_rep->lock_flags = dlm_req->lock_flags;
187
188         res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
189                                       dlm_req->lock_desc.l_req_mode,
190                                       &dlm_rep->lock_flags);
191         req->rq_status = 0;
192         if (ptlrpc_reply(svc, req) != 0)
193                 LBUG();
194
195         ldlm_reprocess_all(res);
196
197         RETURN(0);
198 }
199
200 static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
201 {
202         struct ldlm_request *dlm_req;
203         struct ldlm_lock *lock;
204         struct ldlm_resource *res;
205         int rc;
206         ENTRY;
207
208         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
209         if (rc) {
210                 CERROR("out of memory\n");
211                 RETURN(-ENOMEM);
212         }
213         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
214
215         lock = lustre_handle2object(&dlm_req->lock_handle1);
216         res = ldlm_local_lock_cancel(lock);
217         req->rq_status = 0;
218         if (ptlrpc_reply(svc, req) != 0)
219                 LBUG();
220
221         if (res != NULL)
222                 ldlm_reprocess_all(res);
223
224         RETURN(0);
225 }
226
227 static int _ldlm_callback(struct ptlrpc_service *svc,
228                           struct ptlrpc_request *req)
229 {
230         struct ldlm_request *dlm_req;
231         struct ldlm_lock *lock1, *lock2;
232         int rc;
233         ENTRY;
234
235         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
236         if (rc) {
237                 CERROR("out of memory\n");
238                 RETURN(-ENOMEM);
239         }
240         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
241
242         /* We must send the reply first, so that the thread is free to handle
243          * any requests made in common_callback() */
244         rc = ptlrpc_reply(svc, req);
245         if (rc != 0)
246                 RETURN(rc);
247
248         lock1 = lustre_handle2object(&dlm_req->lock_handle1);
249         lock2 = lustre_handle2object(&dlm_req->lock_handle2);
250
251         common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
252                         0);
253         RETURN(0);
254 }
255
256 static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
257                        struct ptlrpc_request *req)
258 {
259         struct obd_device *req_dev;
260         int id, rc;
261         ENTRY;
262
263         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
264         if (rc) {
265                 CERROR("lustre_ldlm: Invalid request\n");
266                 GOTO(out, rc);
267         }
268
269         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
270                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
271                        req->rq_reqmsg->type);
272                 GOTO(out, rc = -EINVAL);
273         }
274
275         id = req->rq_reqmsg->target_id;
276         if (id < 0 || id > MAX_OBD_DEVICES)
277                 GOTO(out, rc = -ENODEV);
278         req_dev = req->rq_obd = &obd_dev[id];
279
280         switch (req->rq_reqmsg->opc) {
281         case LDLM_ENQUEUE:
282                 CDEBUG(D_INODE, "enqueue\n");
283                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
284                 rc = _ldlm_enqueue(req_dev, svc, req);
285                 break;
286
287         case LDLM_CONVERT:
288                 CDEBUG(D_INODE, "convert\n");
289                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
290                 rc = _ldlm_convert(svc, req);
291                 break;
292
293         case LDLM_CANCEL:
294                 CDEBUG(D_INODE, "cancel\n");
295                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
296                 rc = _ldlm_cancel(svc, req);
297                 break;
298
299         case LDLM_CALLBACK:
300                 CDEBUG(D_INODE, "callback\n");
301                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
302                 rc = _ldlm_callback(svc, req);
303                 break;
304
305         default:
306                 rc = ptlrpc_error(svc, req);
307                 RETURN(rc);
308         }
309
310         EXIT;
311 out:
312         if (rc)
313                 RETURN(ptlrpc_error(svc, req));
314         return 0;
315 }
316
317 static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg,
318                           void *uarg)
319 {
320         struct obd_device *obddev = conn->oc_dev;
321         struct ptlrpc_connection *connection;
322         int err;
323         ENTRY;
324
325         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
326             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
327                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
328                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
329                 RETURN(-EINVAL);
330         }
331
332         ptlrpc_init_client(NULL, NULL,
333                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
334                            obddev->u.ldlm.ldlm_client);
335         connection = ptlrpc_uuid_to_connection("ldlm");
336         if (!connection)
337                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
338
339         switch (cmd) {
340         case IOC_LDLM_TEST: {
341                 err = ldlm_test(obddev, connection);
342                 CERROR("-- done err %d\n", err);
343                 GOTO(out, err);
344         }
345         default:
346                 GOTO(out, err = -EINVAL);
347         }
348
349  out:
350         if (connection)
351                 ptlrpc_put_connection(connection);
352         return err;
353 }
354
355 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
356 {
357         struct ldlm_obd *ldlm = &obddev->u.ldlm;
358         int err;
359         ENTRY;
360
361         ldlm->ldlm_service =
362                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
363                                 LDLM_REPLY_PORTAL, "self", lustre_handle);
364         if (!ldlm->ldlm_service)
365                 LBUG();
366
367         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
368         if (err) {
369                 CERROR("cannot start thread\n");
370                 LBUG();
371         }
372         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
373         if (err) {
374                 CERROR("cannot start thread\n");
375                 LBUG();
376         }
377         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
378         if (err) {
379                 CERROR("cannot start thread\n");
380                 LBUG();
381         }
382         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
383         if (err) {
384                 CERROR("cannot start thread\n");
385                 LBUG();
386         }
387
388         MOD_INC_USE_COUNT;
389         RETURN(0);
390 }
391
392 static int ldlm_cleanup(struct obd_device *obddev)
393 {
394         struct ldlm_obd *ldlm = &obddev->u.ldlm;
395         ENTRY;
396
397         ptlrpc_stop_all_threads(ldlm->ldlm_service);
398         rpc_unregister_service(ldlm->ldlm_service);
399
400         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
401                 // XXX reply with errors and clean up
402                 CERROR("Request list not empty!\n");
403         }
404
405         OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
406         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
407
408         if (mds_reint_p != NULL)
409                 inter_module_put("mds_reint");
410         if (mds_getattr_name_p != NULL)
411                 inter_module_put("mds_getattr_name");
412
413         MOD_DEC_USE_COUNT;
414         RETURN(0);
415 }
416
417 struct obd_ops ldlm_obd_ops = {
418         o_iocontrol:   ldlm_iocontrol,
419         o_setup:       ldlm_setup,
420         o_cleanup:     ldlm_cleanup,
421         o_connect:     gen_connect,
422         o_disconnect:  gen_disconnect
423 };
424
425
426 static int __init ldlm_init(void)
427 {
428         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
429         if (rc != 0)
430                 return rc;
431
432         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
433                                                sizeof(struct ldlm_resource), 0,
434                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
435         if (ldlm_resource_slab == NULL)
436                 return -ENOMEM;
437
438         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
439                                            sizeof(struct ldlm_lock), 0,
440                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
441         if (ldlm_lock_slab == NULL) {
442                 kmem_cache_destroy(ldlm_resource_slab);
443                 return -ENOMEM;
444         }
445
446         return 0;
447 }
448
449 static void __exit ldlm_exit(void)
450 {
451         obd_unregister_type(OBD_LDLM_DEVICENAME);
452         kmem_cache_destroy(ldlm_resource_slab);
453         kmem_cache_destroy(ldlm_lock_slab);
454 }
455
456 EXPORT_SYMBOL(ldlm_local_lock_match);
457 EXPORT_SYMBOL(ldlm_lock_addref);
458 EXPORT_SYMBOL(ldlm_lock_decref);
459 EXPORT_SYMBOL(ldlm_cli_convert);
460 EXPORT_SYMBOL(ldlm_cli_enqueue);
461 EXPORT_SYMBOL(ldlm_cli_cancel);
462 EXPORT_SYMBOL(lustre_handle2object);
463 EXPORT_SYMBOL(ldlm_test);
464 EXPORT_SYMBOL(ldlm_lock_dump);
465 EXPORT_SYMBOL(ldlm_namespace_new);
466 EXPORT_SYMBOL(ldlm_namespace_free);
467
468 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
469 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
470 MODULE_LICENSE("GPL");
471
472 module_init(ldlm_init);
473 module_exit(ldlm_exit);