Whamcloud - gitweb
- Fixed remote dlm lock conversion
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
23
24 #define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
25
26 static int is_local_conn(struct ptlrpc_connection *conn)
27 {
28         ENTRY;
29         if (conn == NULL)
30                 RETURN(1);
31
32         RETURN(LOOPBACK(conn->c_peer.peer_nid));
33 }
34
35 /* _ldlm_callback and local_callback setup the variables then call this common
36  * code */
37 static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
38                            ldlm_mode_t mode, void *data, __u32 data_len)
39 {
40         ENTRY;
41
42         if (!lock)
43                 LBUG();
44         if (!lock->l_resource)
45                 LBUG();
46
47         ldlm_lock_dump(lock);
48
49         spin_lock(&lock->l_resource->lr_lock);
50         spin_lock(&lock->l_lock);
51         if (!new) {
52                 CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
53                 lock->l_req_mode = mode;
54                 list_del_init(&lock->l_res_link); 
55                 ldlm_grant_lock(lock->l_resource, lock);
56                 wake_up(&lock->l_waitq);
57                 spin_unlock(&lock->l_lock);
58                 spin_unlock(&lock->l_resource->lr_lock);
59         } else {
60                 CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
61                 lock->l_flags |= LDLM_FL_DYING;
62                 spin_unlock(&lock->l_lock);
63                 spin_unlock(&lock->l_resource->lr_lock);
64                 if (!lock->l_readers && !lock->l_writers) {
65                         CDEBUG(D_INFO, "Lock already unused, calling "
66                                "callback (%p).\n", lock->l_blocking_ast);
67                         if (lock->l_blocking_ast != NULL)
68                                 lock->l_blocking_ast(lock, new, lock->l_data,
69                                                      lock->l_data_len);
70                 } else {
71                         CDEBUG(D_INFO, "Lock still has references; lock will be"
72                                " cancelled later.\n");
73                 }
74         }
75         RETURN(0);
76 }
77
78 /* FIXME: I think that this is no longer necessary. */
79 static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new,
80                           void *data, __u32 data_len)
81 {
82         struct ldlm_lock *lock;
83         /* the 'remote handle' is the lock in the FS's namespace */
84         lock = lustre_handle2object(&l->l_remote_handle);
85
86         return common_callback(lock, new, l->l_granted_mode, data, data_len);
87 }
88
89 static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
90                          struct ptlrpc_request *req)
91 {
92         struct ldlm_reply *dlm_rep;
93         struct ldlm_request *dlm_req;
94         int rc, size = sizeof(*dlm_rep), cookielen;
95         __u32 flags;
96         ldlm_error_t err;
97         struct ldlm_lock *lock = NULL;
98         ldlm_lock_callback callback;
99         struct lustre_handle lockh;
100         void *cookie;
101         ENTRY;
102
103         /* Is this lock managed locally? */
104         if (is_local_conn(req->rq_connection))
105                 callback = local_callback;
106         else
107                 callback = ldlm_cli_callback;
108
109         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
110         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
111                 /* In this case, the reply buffer is allocated deep in
112                  * local_lock_enqueue by the policy function. */
113                 cookie = req;
114                 cookielen = sizeof(*req);
115         } else {
116                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
117                                      &req->rq_repmsg);
118                 if (rc) {
119                         CERROR("out of memory\n");
120                         RETURN(-ENOMEM);
121                 }
122                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
123                         cookie = &dlm_req->lock_desc.l_extent;
124                         cookielen = sizeof(struct ldlm_extent);
125                 }
126         }
127
128         err = ldlm_local_lock_create(obddev->obd_namespace,
129                                      &dlm_req->lock_handle2,
130                                      dlm_req->lock_desc.l_resource.lr_name,
131                                      dlm_req->lock_desc.l_resource.lr_type,
132                                      dlm_req->lock_desc.l_req_mode,
133                                      NULL, 0, &lockh);
134         if (err != ELDLM_OK)
135                 GOTO(out, err);
136
137         flags = dlm_req->lock_flags;
138         err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
139                                       callback, callback);
140         if (err != ELDLM_OK)
141                 GOTO(out, err);
142
143         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
144         dlm_rep->lock_flags = flags;
145
146         memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
147         lock = lustre_handle2object(&lockh);
148         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
149                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
150                        sizeof(lock->l_extent));
151         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
152                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
153                        sizeof(dlm_rep->lock_resource_name));
154
155         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
156                sizeof(lock->l_remote_handle));
157         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
158         EXIT;
159  out:
160         req->rq_status = err;
161         CDEBUG(D_INFO, "err = %d\n", err);
162
163         if (ptlrpc_reply(svc, req))
164                 LBUG();
165
166         if (!err)
167                 ldlm_reprocess_all(lock->l_resource);
168
169         return 0;
170 }
171
172 static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
173 {
174         struct ldlm_request *dlm_req;
175         struct ldlm_reply *dlm_rep;
176         struct ldlm_resource *res;
177         int rc, size = sizeof(*dlm_rep);
178         ENTRY;
179
180         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
181         if (rc) {
182                 CERROR("out of memory\n");
183                 RETURN(-ENOMEM);
184         }
185         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
186         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
187         dlm_rep->lock_flags = dlm_req->lock_flags;
188
189         res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
190                                       dlm_req->lock_desc.l_req_mode,
191                                       &dlm_rep->lock_flags);
192         req->rq_status = 0;
193         if (ptlrpc_reply(svc, req) != 0)
194                 LBUG();
195
196         ldlm_reprocess_all(res);
197
198         RETURN(0);
199 }
200
201 static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
202 {
203         struct ldlm_request *dlm_req;
204         struct ldlm_lock *lock;
205         struct ldlm_resource *res;
206         int rc;
207         ENTRY;
208
209         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
210         if (rc) {
211                 CERROR("out of memory\n");
212                 RETURN(-ENOMEM);
213         }
214         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
215
216         lock = lustre_handle2object(&dlm_req->lock_handle1);
217         res = ldlm_local_lock_cancel(lock);
218         req->rq_status = 0;
219         if (ptlrpc_reply(svc, req) != 0)
220                 LBUG();
221
222         if (res != NULL)
223                 ldlm_reprocess_all(res);
224
225         RETURN(0);
226 }
227
228 static int _ldlm_callback(struct ptlrpc_service *svc,
229                           struct ptlrpc_request *req)
230 {
231         struct ldlm_request *dlm_req;
232         struct ldlm_lock *lock1, *lock2;
233         int rc;
234         ENTRY;
235
236         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
237         if (rc) {
238                 CERROR("out of memory\n");
239                 RETURN(-ENOMEM);
240         }
241         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
242
243         /* We must send the reply first, so that the thread is free to handle
244          * any requests made in common_callback() */
245         rc = ptlrpc_reply(svc, req);
246         if (rc != 0)
247                 RETURN(rc);
248
249         lock1 = lustre_handle2object(&dlm_req->lock_handle1);
250         lock2 = lustre_handle2object(&dlm_req->lock_handle2);
251
252         common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
253                         0);
254         RETURN(0);
255 }
256
257 static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
258                        struct ptlrpc_request *req)
259 {
260         struct obd_device *req_dev;
261         int id, rc;
262         ENTRY;
263
264         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
265         if (rc) {
266                 CERROR("lustre_ldlm: Invalid request\n");
267                 GOTO(out, rc);
268         }
269
270         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
271                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
272                        req->rq_reqmsg->type);
273                 GOTO(out, rc = -EINVAL);
274         }
275
276         id = req->rq_reqmsg->target_id;
277         if (id < 0 || id > MAX_OBD_DEVICES)
278                 GOTO(out, rc = -ENODEV);
279         req_dev = req->rq_obd = &obd_dev[id];
280
281         switch (req->rq_reqmsg->opc) {
282         case LDLM_ENQUEUE:
283                 CDEBUG(D_INODE, "enqueue\n");
284                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
285                 rc = _ldlm_enqueue(req_dev, svc, req);
286                 break;
287
288         case LDLM_CONVERT:
289                 CDEBUG(D_INODE, "convert\n");
290                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
291                 rc = _ldlm_convert(svc, req);
292                 break;
293
294         case LDLM_CANCEL:
295                 CDEBUG(D_INODE, "cancel\n");
296                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
297                 rc = _ldlm_cancel(svc, req);
298                 break;
299
300         case LDLM_CALLBACK:
301                 CDEBUG(D_INODE, "callback\n");
302                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
303                 rc = _ldlm_callback(svc, req);
304                 break;
305
306         default:
307                 rc = ptlrpc_error(svc, req);
308                 RETURN(rc);
309         }
310
311         EXIT;
312 out:
313         if (rc)
314                 RETURN(ptlrpc_error(svc, req));
315         return 0;
316 }
317
318 static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg,
319                           void *uarg)
320 {
321         struct obd_device *obddev = conn->oc_dev;
322         struct ptlrpc_connection *connection;
323         int err;
324         ENTRY;
325
326         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
327             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
328                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
329                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
330                 RETURN(-EINVAL);
331         }
332
333         ptlrpc_init_client(NULL, NULL,
334                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
335                            obddev->u.ldlm.ldlm_client);
336         connection = ptlrpc_uuid_to_connection("ldlm");
337         if (!connection)
338                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
339
340         switch (cmd) {
341         case IOC_LDLM_TEST: {
342                 err = ldlm_test(obddev, connection);
343                 CERROR("-- done err %d\n", err);
344                 GOTO(out, err);
345         }
346         default:
347                 GOTO(out, err = -EINVAL);
348         }
349
350  out:
351         if (connection)
352                 ptlrpc_put_connection(connection);
353         return err;
354 }
355
356 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
357 {
358         struct ldlm_obd *ldlm = &obddev->u.ldlm;
359         int err;
360         ENTRY;
361
362         ldlm->ldlm_service =
363                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
364                                 LDLM_REPLY_PORTAL, "self", lustre_handle);
365         if (!ldlm->ldlm_service)
366                 LBUG();
367
368         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
369         if (err) {
370                 CERROR("cannot start thread\n");
371                 LBUG();
372         }
373         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
374         if (err) {
375                 CERROR("cannot start thread\n");
376                 LBUG();
377         }
378         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
379         if (err) {
380                 CERROR("cannot start thread\n");
381                 LBUG();
382         }
383         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
384         if (err) {
385                 CERROR("cannot start thread\n");
386                 LBUG();
387         }
388
389         MOD_INC_USE_COUNT;
390         RETURN(0);
391 }
392
393 static int ldlm_cleanup(struct obd_device *obddev)
394 {
395         struct ldlm_obd *ldlm = &obddev->u.ldlm;
396         ENTRY;
397
398         ptlrpc_stop_all_threads(ldlm->ldlm_service);
399         rpc_unregister_service(ldlm->ldlm_service);
400
401         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
402                 // XXX reply with errors and clean up
403                 CERROR("Request list not empty!\n");
404         }
405
406         OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
407         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
408
409         if (mds_reint_p != NULL)
410                 inter_module_put("mds_reint");
411         if (mds_getattr_name_p != NULL)
412                 inter_module_put("mds_getattr_name");
413
414         MOD_DEC_USE_COUNT;
415         RETURN(0);
416 }
417
418 struct obd_ops ldlm_obd_ops = {
419         o_iocontrol:   ldlm_iocontrol,
420         o_setup:       ldlm_setup,
421         o_cleanup:     ldlm_cleanup,
422         o_connect:     gen_connect,
423         o_disconnect:  gen_disconnect
424 };
425
426
427 static int __init ldlm_init(void)
428 {
429         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
430         if (rc != 0)
431                 return rc;
432
433         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
434                                                sizeof(struct ldlm_resource), 0,
435                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
436         if (ldlm_resource_slab == NULL)
437                 return -ENOMEM;
438
439         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
440                                            sizeof(struct ldlm_lock), 0,
441                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
442         if (ldlm_lock_slab == NULL) {
443                 kmem_cache_destroy(ldlm_resource_slab);
444                 return -ENOMEM;
445         }
446
447         return 0;
448 }
449
450 static void __exit ldlm_exit(void)
451 {
452         obd_unregister_type(OBD_LDLM_DEVICENAME);
453         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
454                 CERROR("couldn't free ldlm resource slab\n");
455         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
456                 CERROR("couldn't free ldlm lock slab\n");
457 }
458
459 EXPORT_SYMBOL(ldlm_local_lock_match);
460 EXPORT_SYMBOL(ldlm_lock_addref);
461 EXPORT_SYMBOL(ldlm_lock_decref);
462 EXPORT_SYMBOL(ldlm_cli_convert);
463 EXPORT_SYMBOL(ldlm_cli_enqueue);
464 EXPORT_SYMBOL(ldlm_cli_cancel);
465 EXPORT_SYMBOL(lustre_handle2object);
466 EXPORT_SYMBOL(ldlm_test);
467 EXPORT_SYMBOL(ldlm_lock_dump);
468 EXPORT_SYMBOL(ldlm_namespace_new);
469 EXPORT_SYMBOL(ldlm_namespace_free);
470
471 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
472 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
473 MODULE_LICENSE("GPL");
474
475 module_init(ldlm_init);
476 module_exit(ldlm_exit);