Whamcloud - gitweb
Landing the mds_lock_devel branch on the trunk. Notables:
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/module.h>
16 #include <linux/slab.h>
17 #include <linux/lustre_dlm.h>
18
19 extern kmem_cache_t *ldlm_resource_slab;
20 extern kmem_cache_t *ldlm_lock_slab;
21 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
22 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
23
24 #define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
25
26 static int is_local_conn(struct ptlrpc_connection *conn)
27 {
28         ENTRY;
29         if (conn == NULL)
30                 RETURN(1);
31
32         RETURN(LOOPBACK(conn->c_peer.peer_nid));
33 }
34
35 /* _ldlm_callback and local_callback setup the variables then call this common
36  * code */
37 static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
38                            ldlm_mode_t mode, void *data, __u32 data_len)
39 {
40         ENTRY;
41
42         if (!lock)
43                 LBUG();
44         if (!lock->l_resource)
45                 LBUG();
46
47         ldlm_lock_dump(lock);
48
49         spin_lock(&lock->l_resource->lr_lock);
50         spin_lock(&lock->l_lock);
51         if (!new) {
52                 CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
53                 lock->l_req_mode = mode;
54                 list_del_init(&lock->l_res_link); 
55                 ldlm_grant_lock(lock->l_resource, lock);
56                 wake_up(&lock->l_waitq);
57                 spin_unlock(&lock->l_lock);
58                 spin_unlock(&lock->l_resource->lr_lock);
59         } else {
60                 CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
61                 lock->l_flags |= LDLM_FL_DYING;
62                 spin_unlock(&lock->l_lock);
63                 spin_unlock(&lock->l_resource->lr_lock);
64                 if (!lock->l_readers && !lock->l_writers) {
65                         CDEBUG(D_INFO, "Lock already unused, calling "
66                                "callback (%p).\n", lock->l_blocking_ast);
67                         if (lock->l_blocking_ast != NULL)
68                                 lock->l_blocking_ast(lock, new, lock->l_data,
69                                                      lock->l_data_len);
70                 } else {
71                         CDEBUG(D_INFO, "Lock still has references; lock will be"
72                                " cancelled later.\n");
73                 }
74         }
75         RETURN(0);
76 }
77
78 /* FIXME: I think that this is no longer necessary. */
79 static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new,
80                           void *data, __u32 data_len)
81 {
82         struct ldlm_lock *lock;
83         /* the 'remote handle' is the lock in the FS's namespace */
84         lock = lustre_handle2object(&l->l_remote_handle);
85
86         return common_callback(lock, new, l->l_granted_mode, data, data_len);
87 }
88
89 static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
90                          struct ptlrpc_request *req)
91 {
92         struct ldlm_reply *dlm_rep;
93         struct ldlm_request *dlm_req;
94         int rc, size = sizeof(*dlm_rep), cookielen;
95         __u32 flags;
96         ldlm_error_t err;
97         struct ldlm_lock *lock = NULL;
98         ldlm_lock_callback callback;
99         struct lustre_handle lockh;
100         void *cookie;
101         ENTRY;
102
103         /* Is this lock managed locally? */
104         if (is_local_conn(req->rq_connection))
105                 callback = local_callback;
106         else
107                 callback = ldlm_cli_callback;
108
109         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
110         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
111                 /* In this case, the reply buffer is allocated deep in
112                  * local_lock_enqueue by the policy function. */
113                 cookie = req;
114                 cookielen = sizeof(*req);
115         } else {
116                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
117                                      &req->rq_repmsg);
118                 if (rc) {
119                         CERROR("out of memory\n");
120                         RETURN(-ENOMEM);
121                 }
122                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
123                         cookie = &dlm_req->lock_desc.l_extent;
124                         cookielen = sizeof(struct ldlm_extent);
125                 }
126         }
127
128         err = ldlm_local_lock_create(obddev->obd_namespace,
129                                      &dlm_req->lock_handle2,
130                                      dlm_req->lock_desc.l_resource.lr_name,
131                                      dlm_req->lock_desc.l_resource.lr_type,
132                                      dlm_req->lock_desc.l_req_mode,
133                                      NULL, 0, &lockh);
134         if (err != ELDLM_OK)
135                 GOTO(out, err);
136
137         flags = dlm_req->lock_flags;
138         err = ldlm_local_lock_enqueue(&lockh,
139                                       cookie, cookielen,
140                                       &flags,
141                                       callback, callback);
142         if (err != ELDLM_OK)
143                 GOTO(out, err);
144
145         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
146         dlm_rep->lock_flags = flags;
147
148         memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
149         lock = lustre_handle2object(&lockh);
150         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
151                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
152                        sizeof(lock->l_extent));
153         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED)
154                 memcpy(dlm_rep->lock_resource_name, lock->l_resource->lr_name,
155                        sizeof(dlm_rep->lock_resource_name));
156
157         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
158                sizeof(lock->l_remote_handle));
159         lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
160         EXIT;
161  out:
162         req->rq_status = err;
163         CDEBUG(D_INFO, "err = %d\n", err);
164
165         if (ptlrpc_reply(svc, req))
166                 LBUG();
167
168         if (!err) {
169                 ldlm_reprocess_all(lock->l_resource);
170         }
171
172         return 0;
173 }
174
175 static int _ldlm_convert(struct ptlrpc_service *svc, struct ptlrpc_request *req)
176 {
177         struct ldlm_request *dlm_req, *dlm_rep;
178         struct ldlm_resource *res;
179         int rc, size = sizeof(*dlm_rep);
180         ENTRY;
181
182         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
183         if (rc) {
184                 CERROR("out of memory\n");
185                 RETURN(-ENOMEM);
186         }
187         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
188         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
189         dlm_rep->lock_flags = dlm_req->lock_flags;
190
191         res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
192                                       dlm_req->lock_desc.l_req_mode,
193                                       &dlm_rep->lock_flags);
194         req->rq_status = 0;
195         if (ptlrpc_reply(svc, req) != 0)
196                 LBUG();
197
198         ldlm_reprocess_all(res);
199
200         RETURN(0);
201 }
202
203 static int _ldlm_cancel(struct ptlrpc_service *svc, struct ptlrpc_request *req)
204 {
205         struct ldlm_request *dlm_req;
206         struct ldlm_lock *lock;
207         struct ldlm_resource *res;
208         int rc;
209         ENTRY;
210
211         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
212         if (rc) {
213                 CERROR("out of memory\n");
214                 RETURN(-ENOMEM);
215         }
216         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
217
218         lock = lustre_handle2object(&dlm_req->lock_handle1);
219         res = ldlm_local_lock_cancel(lock);
220         req->rq_status = 0;
221         if (ptlrpc_reply(svc, req) != 0)
222                 LBUG();
223
224         if (res != NULL)
225                 ldlm_reprocess_all(res);
226
227         RETURN(0);
228 }
229
230 static int _ldlm_callback(struct ptlrpc_service *svc,
231                           struct ptlrpc_request *req)
232 {
233         struct ldlm_request *dlm_req;
234         struct ldlm_lock *lock1, *lock2;
235         int rc;
236         ENTRY;
237
238         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
239         if (rc) {
240                 CERROR("out of memory\n");
241                 RETURN(-ENOMEM);
242         }
243         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
244
245         /* We must send the reply first, so that the thread is free to handle
246          * any requests made in common_callback() */
247         rc = ptlrpc_reply(svc, req);
248         if (rc != 0)
249                 RETURN(rc);
250
251         lock1 = lustre_handle2object(&dlm_req->lock_handle1);
252         lock2 = lustre_handle2object(&dlm_req->lock_handle2);
253
254         common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
255                         0);
256         RETURN(0);
257 }
258
259 static int lustre_handle(struct obd_device *dev, struct ptlrpc_service *svc,
260                        struct ptlrpc_request *req)
261 {
262         struct obd_device *req_dev;
263         int id, rc;
264         ENTRY;
265
266         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
267         if (rc) {
268                 CERROR("lustre_ldlm: Invalid request\n");
269                 GOTO(out, rc);
270         }
271
272         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
273                 CERROR("lustre_ldlm: wrong packet type sent %d\n",
274                        req->rq_reqmsg->type);
275                 GOTO(out, rc = -EINVAL);
276         }
277
278         id = req->rq_reqmsg->target_id;
279         if (id < 0 || id > MAX_OBD_DEVICES)
280                 GOTO(out, rc = -ENODEV);
281         req_dev = req->rq_obd = &obd_dev[id];
282
283         switch (req->rq_reqmsg->opc) {
284         case LDLM_ENQUEUE:
285                 CDEBUG(D_INODE, "enqueue\n");
286                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
287                 rc = _ldlm_enqueue(req_dev, svc, req);
288                 break;
289
290         case LDLM_CONVERT:
291                 CDEBUG(D_INODE, "convert\n");
292                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
293                 rc = _ldlm_convert(svc, req);
294                 break;
295
296         case LDLM_CANCEL:
297                 CDEBUG(D_INODE, "cancel\n");
298                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
299                 rc = _ldlm_cancel(svc, req);
300                 break;
301
302         case LDLM_CALLBACK:
303                 CDEBUG(D_INODE, "callback\n");
304                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
305                 rc = _ldlm_callback(svc, req);
306                 break;
307
308         default:
309                 rc = ptlrpc_error(svc, req);
310                 RETURN(rc);
311         }
312
313         EXIT;
314 out:
315         if (rc)
316                 RETURN(ptlrpc_error(svc, req));
317         return 0;
318 }
319
320 static int ldlm_iocontrol(long cmd, struct obd_conn *conn, int len, void *karg,
321                           void *uarg)
322 {
323         struct obd_device *obddev = conn->oc_dev;
324         struct ptlrpc_connection *connection;
325         int err;
326         ENTRY;
327
328         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
329             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
330                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
331                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
332                 RETURN(-EINVAL);
333         }
334
335         ptlrpc_init_client(NULL, NULL,
336                            LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
337                            obddev->u.ldlm.ldlm_client);
338         connection = ptlrpc_uuid_to_connection("ldlm");
339         if (!connection)
340                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
341
342         switch (cmd) {
343         case IOC_LDLM_TEST: {
344                 err = ldlm_test(obddev, connection);
345                 CERROR("-- done err %d\n", err);
346                 GOTO(out, err);
347         }
348         default:
349                 GOTO(out, err = -EINVAL);
350         }
351
352  out:
353         if (connection)
354                 ptlrpc_put_connection(connection);
355         return err;
356 }
357
358 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
359 {
360         struct ldlm_obd *ldlm = &obddev->u.ldlm;
361         int err;
362         ENTRY;
363
364         ldlm->ldlm_service =
365                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
366                                 LDLM_REPLY_PORTAL, "self", lustre_handle);
367         if (!ldlm->ldlm_service)
368                 LBUG();
369
370         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
371         if (err) {
372                 CERROR("cannot start thread\n");
373                 LBUG();
374         }
375         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
376         if (err) {
377                 CERROR("cannot start thread\n");
378                 LBUG();
379         }
380         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
381         if (err) {
382                 CERROR("cannot start thread\n");
383                 LBUG();
384         }
385         err = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm");
386         if (err) {
387                 CERROR("cannot start thread\n");
388                 LBUG();
389         }
390
391         MOD_INC_USE_COUNT;
392         RETURN(0);
393 }
394
395 static int ldlm_cleanup(struct obd_device *obddev)
396 {
397         struct ldlm_obd *ldlm = &obddev->u.ldlm;
398         ENTRY;
399
400         ptlrpc_stop_all_threads(ldlm->ldlm_service);
401         rpc_unregister_service(ldlm->ldlm_service);
402
403         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
404                 // XXX reply with errors and clean up
405                 CERROR("Request list not empty!\n");
406         }
407
408         OBD_FREE(ldlm->ldlm_client, sizeof(*ldlm->ldlm_client));
409         OBD_FREE(ldlm->ldlm_service, sizeof(*ldlm->ldlm_service));
410
411         if (mds_reint_p != NULL)
412                 inter_module_put("mds_reint");
413         if (mds_getattr_name_p != NULL)
414                 inter_module_put("mds_getattr_name");
415
416         MOD_DEC_USE_COUNT;
417         RETURN(0);
418 }
419
420 struct obd_ops ldlm_obd_ops = {
421         o_iocontrol:   ldlm_iocontrol,
422         o_setup:       ldlm_setup,
423         o_cleanup:     ldlm_cleanup,
424         o_connect:     gen_connect,
425         o_disconnect:  gen_disconnect
426 };
427
428
429 static int __init ldlm_init(void)
430 {
431         int rc = obd_register_type(&ldlm_obd_ops, OBD_LDLM_DEVICENAME);
432         if (rc != 0)
433                 return rc;
434
435         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
436                                                sizeof(struct ldlm_resource), 0,
437                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
438         if (ldlm_resource_slab == NULL)
439                 return -ENOMEM;
440
441         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
442                                            sizeof(struct ldlm_lock), 0,
443                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
444         if (ldlm_lock_slab == NULL) {
445                 kmem_cache_destroy(ldlm_resource_slab);
446                 return -ENOMEM;
447         }
448
449         return 0;
450 }
451
452 static void __exit ldlm_exit(void)
453 {
454         obd_unregister_type(OBD_LDLM_DEVICENAME);
455         kmem_cache_destroy(ldlm_resource_slab);
456         kmem_cache_destroy(ldlm_lock_slab);
457 }
458
459 EXPORT_SYMBOL(ldlm_local_lock_match);
460 EXPORT_SYMBOL(ldlm_lock_addref);
461 EXPORT_SYMBOL(ldlm_lock_decref);
462 EXPORT_SYMBOL(ldlm_cli_convert);
463 EXPORT_SYMBOL(ldlm_cli_enqueue);
464 EXPORT_SYMBOL(ldlm_cli_cancel);
465 EXPORT_SYMBOL(lustre_handle2object);
466 EXPORT_SYMBOL(ldlm_test);
467 EXPORT_SYMBOL(ldlm_lock_dump);
468 EXPORT_SYMBOL(ldlm_namespace_new);
469 EXPORT_SYMBOL(ldlm_namespace_free);
470
471 MODULE_AUTHOR("Cluster File Systems, Inc. <braam@clusterfs.com>");
472 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
473 MODULE_LICENSE("GPL");
474
475 module_init(ldlm_init);
476 module_exit(ldlm_exit);