Whamcloud - gitweb
c1d31820aa8e2dec0a0139bea6aa66279c8936c9
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LDLM
26
27 #ifdef __KERNEL__
28 #include <linux/module.h>
29 #include <linux/slab.h>
30 #include <linux/init.h>
31 #else 
32 #include <liblustre.h>
33 #endif
34
35 #include <linux/lustre_dlm.h>
36 #include <linux/obd_class.h>
37
38
39 extern kmem_cache_t *ldlm_resource_slab;
40 extern kmem_cache_t *ldlm_lock_slab;
41 extern struct lustre_lock ldlm_handle_lock;
42 extern struct list_head ldlm_namespace_list;
43 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
44 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
45
46 inline unsigned long round_timeout(unsigned long timeout)
47 {
48         return ((timeout / HZ) + 1) * HZ;
49 }
50
51 /* XXX should this be per-ldlm? */
52 static struct list_head waiting_locks_list;
53 static spinlock_t waiting_locks_spinlock;
54 static struct timer_list waiting_locks_timer;
55 static int ldlm_already_setup = 0;
56
57 static void waiting_locks_callback(unsigned long unused)
58 {
59         struct list_head *liter, *n;
60
61         spin_lock_bh(&waiting_locks_spinlock);
62         list_for_each_safe(liter, n, &waiting_locks_list) {
63                 struct ldlm_lock *l = list_entry(liter, struct ldlm_lock,
64                                                  l_pending_chain);
65                 if (l->l_callback_timeout > jiffies)
66                         break;
67                 CERROR("lock timer expired, lock %p\n", l);
68                 LDLM_DEBUG(l, "timer expired, recovering exp %p on conn %p",
69                            l->l_export, l->l_export->exp_connection);
70                 recovd_conn_fail(l->l_export->exp_connection);
71         }
72         spin_unlock_bh(&waiting_locks_spinlock);
73 }
74
75 /*
76  * Indicate that we're waiting for a client to call us back cancelling a given
77  * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
78  * timer to fire appropriately.  (We round up to the next second, to avoid
79  * floods of timer firings during periods of high lock contention and traffic).
80  */
81 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
82 {
83         unsigned long timeout_rounded;
84         ENTRY;
85
86         LASSERT(list_empty(&lock->l_pending_chain));
87
88         spin_lock_bh(&waiting_locks_spinlock);
89         lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
90
91         timeout_rounded = round_timeout(lock->l_callback_timeout);
92
93         if (timeout_rounded < waiting_locks_timer.expires ||
94             !timer_pending(&waiting_locks_timer)) {
95                 mod_timer(&waiting_locks_timer, timeout_rounded);
96         }
97         list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
98         spin_unlock_bh(&waiting_locks_spinlock);
99         RETURN(1);
100 }
101
102 /*
103  * Remove a lock from the pending list, likely because it had its cancellation
104  * callback arrive without incident.  This adjusts the lock-timeout timer if
105  * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
106  */
107 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
108 {
109         struct list_head *list_next;
110
111         ENTRY;
112
113         spin_lock_bh(&waiting_locks_spinlock);
114
115         if (list_empty(&lock->l_pending_chain)) {
116                 spin_unlock_bh(&waiting_locks_spinlock);
117                 RETURN(0);
118         }
119
120         list_next = lock->l_pending_chain.next;
121         if (lock->l_pending_chain.prev == &waiting_locks_list) {
122                 /* Removing the head of the list, adjust timer. */
123                 if (list_next == &waiting_locks_list) {
124                         /* No more, just cancel. */
125                         del_timer(&waiting_locks_timer);
126                 } else {
127                         struct ldlm_lock *next;
128                         next = list_entry(list_next, struct ldlm_lock,
129                                           l_pending_chain);
130                         mod_timer(&waiting_locks_timer,
131                                   round_timeout(next->l_callback_timeout));
132                 }
133         }
134         list_del_init(&lock->l_pending_chain);
135         spin_unlock_bh(&waiting_locks_spinlock);
136         RETURN(1);
137 }
138
139 static inline void ldlm_failed_ast(struct ldlm_lock *lock)
140 {
141         /* XXX diagnostic */
142         recovd_conn_fail(lock->l_export->exp_connection);
143 }
144
145 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
146                              struct ldlm_lock_desc *desc,
147                              void *data, int flag)
148 {
149         struct ldlm_request *body;
150         struct ptlrpc_request *req;
151         int rc = 0, size = sizeof(*body);
152         ENTRY;
153
154         if (flag == LDLM_CB_CANCELING) {
155                 /* Don't need to do anything here. */
156                 RETURN(0);
157         }
158
159         LASSERT(lock);
160
161         l_lock(&lock->l_resource->lr_namespace->ns_lock);
162         /* XXX This is necessary because, with the lock re-tasking, we actually
163          * _can_ get called in here twice.  (bug 830) */
164         if (!list_empty(&lock->l_pending_chain)) {
165                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
166                 RETURN(0);
167         }
168
169         if (lock->l_destroyed) {
170                 /* What's the point? */
171                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
172                 RETURN(0);
173         }
174
175         req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
176                               LDLM_BL_CALLBACK, 1, &size, NULL);
177         if (!req)
178                 RETURN(-ENOMEM);
179
180         body = lustre_msg_buf(req->rq_reqmsg, 0);
181         memcpy(&body->lock_handle1, &lock->l_remote_handle,
182                sizeof(body->lock_handle1));
183         memcpy(&body->lock_desc, desc, sizeof(*desc));
184
185         LDLM_DEBUG(lock, "server preparing blocking AST");
186         req->rq_replen = lustre_msg_size(0, NULL);
187
188         ldlm_add_waiting_lock(lock);
189         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
190
191         req->rq_level = LUSTRE_CONN_RECOVD;
192         rc = ptlrpc_queue_wait(req);
193         if (rc == -ETIMEDOUT || rc == -EINTR) {
194                 ldlm_del_waiting_lock(lock);
195                 ldlm_failed_ast(lock);
196         } else if (rc) {
197                 CERROR("client returned %d from blocking AST for lock %p\n",
198                        req->rq_status, lock);
199                 LDLM_DEBUG(lock, "client returned error %d from blocking AST",
200                            req->rq_status);
201                 ldlm_lock_cancel(lock);
202                 /* Server-side AST functions are called from ldlm_reprocess_all,
203                  * which needs to be told to please restart its reprocessing. */
204                 rc = -ERESTART;
205         }
206
207         ptlrpc_req_finished(req);
208
209         RETURN(rc);
210 }
211
212 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
213 {
214         struct ldlm_request *body;
215         struct ptlrpc_request *req;
216         int rc = 0, size = sizeof(*body);
217         ENTRY;
218
219         if (lock == NULL) {
220                 LBUG();
221                 RETURN(-EINVAL);
222         }
223
224         req = ptlrpc_prep_req(&lock->l_export->exp_ldlm_data.led_import,
225                               LDLM_CP_CALLBACK, 1, &size, NULL);
226         if (!req)
227                 RETURN(-ENOMEM);
228
229         body = lustre_msg_buf(req->rq_reqmsg, 0);
230         memcpy(&body->lock_handle1, &lock->l_remote_handle,
231                sizeof(body->lock_handle1));
232         body->lock_flags = flags;
233         ldlm_lock2desc(lock, &body->lock_desc);
234
235         LDLM_DEBUG(lock, "server preparing completion AST");
236         req->rq_replen = lustre_msg_size(0, NULL);
237
238         req->rq_level = LUSTRE_CONN_RECOVD;
239         rc = ptlrpc_queue_wait(req);
240         if (rc == -ETIMEDOUT || rc == -EINTR) {
241                 ldlm_del_waiting_lock(lock);
242                 ldlm_failed_ast(lock);
243         } else if (rc) {
244                 CERROR("client returned %d from completion AST for lock %p\n",
245                        req->rq_status, lock);
246                 LDLM_DEBUG(lock, "client returned error %d from completion AST",
247                            req->rq_status);
248                 ldlm_lock_cancel(lock);
249                 /* Server-side AST functions are called from ldlm_reprocess_all,
250                  * which needs to be told to please restart its reprocessing. */
251                 rc = -ERESTART;
252         }
253         ptlrpc_req_finished(req);
254
255         RETURN(rc);
256 }
257
258 int ldlm_handle_enqueue(struct ptlrpc_request *req,
259                         ldlm_completion_callback completion_callback,
260                         ldlm_blocking_callback blocking_callback)
261 {
262         struct obd_device *obddev = req->rq_export->exp_obd;
263         struct ldlm_reply *dlm_rep;
264         struct ldlm_request *dlm_req;
265         int rc, size = sizeof(*dlm_rep), cookielen = 0;
266         __u32 flags;
267         ldlm_error_t err;
268         struct ldlm_lock *lock = NULL;
269         void *cookie = NULL;
270         ENTRY;
271
272         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
273
274         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
275         flags = dlm_req->lock_flags;
276         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
277             (flags & LDLM_FL_HAS_INTENT)) {
278                 /* In this case, the reply buffer is allocated deep in
279                  * local_lock_enqueue by the policy function. */
280                 cookie = req;
281                 cookielen = sizeof(*req);
282         } else {
283                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
284                                      &req->rq_repmsg);
285                 if (rc) {
286                         CERROR("out of memory\n");
287                         RETURN(-ENOMEM);
288                 }
289                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
290                         cookie = &dlm_req->lock_desc.l_extent;
291                         cookielen = sizeof(struct ldlm_extent);
292                 }
293         }
294
295         /* The lock's callback data might be set in the policy function */
296         lock = ldlm_lock_create(obddev->obd_namespace,
297                                 &dlm_req->lock_handle2,
298                                 dlm_req->lock_desc.l_resource.lr_name,
299                                 dlm_req->lock_desc.l_resource.lr_type,
300                                 dlm_req->lock_desc.l_req_mode, NULL, 0);
301         if (!lock)
302                 GOTO(out, err = -ENOMEM);
303
304         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
305                sizeof(lock->l_remote_handle));
306         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
307
308         LASSERT(req->rq_export);
309         lock->l_export = req->rq_export;
310         l_lock(&lock->l_resource->lr_namespace->ns_lock);
311         list_add(&lock->l_export_chain,
312                  &lock->l_export->exp_ldlm_data.led_held_locks);
313         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
314
315         err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
316                                 &flags, completion_callback, blocking_callback);
317         if (err)
318                 GOTO(out, err);
319
320         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
321         dlm_rep->lock_flags = flags;
322
323         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
324         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
325                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
326                        sizeof(lock->l_extent));
327         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
328                 memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
329                        sizeof(dlm_rep->lock_resource_name));
330                 dlm_rep->lock_mode = lock->l_req_mode;
331         }
332
333         EXIT;
334  out:
335         if (lock)
336                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
337                            "(err=%d)", err);
338         req->rq_status = err;
339
340         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
341          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
342         if (lock) {
343                 if (!err)
344                         ldlm_reprocess_all(lock->l_resource);
345                 LDLM_LOCK_PUT(lock);
346         }
347         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
348
349         return 0;
350 }
351
352 int ldlm_handle_convert(struct ptlrpc_request *req)
353 {
354         struct ldlm_request *dlm_req;
355         struct ldlm_reply *dlm_rep;
356         struct ldlm_lock *lock;
357         int rc, size = sizeof(*dlm_rep);
358         ENTRY;
359
360         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
361         if (rc) {
362                 CERROR("out of memory\n");
363                 RETURN(-ENOMEM);
364         }
365         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
366         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
367         dlm_rep->lock_flags = dlm_req->lock_flags;
368
369         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
370         if (!lock) {
371                 req->rq_status = EINVAL;
372         } else {
373                 LDLM_DEBUG(lock, "server-side convert handler START");
374                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
375                                   &dlm_rep->lock_flags);
376                 if (ldlm_del_waiting_lock(lock))
377                         CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
378                 req->rq_status = 0;
379         }
380
381         if (lock) {
382                 ldlm_reprocess_all(lock->l_resource);
383                 LDLM_DEBUG(lock, "server-side convert handler END");
384                 LDLM_LOCK_PUT(lock);
385         } else
386                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
387
388         RETURN(0);
389 }
390
391 int ldlm_handle_cancel(struct ptlrpc_request *req)
392 {
393         struct ldlm_request *dlm_req;
394         struct ldlm_lock *lock;
395         int rc;
396         ENTRY;
397
398         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
399         if (rc) {
400                 CERROR("out of memory\n");
401                 RETURN(-ENOMEM);
402         }
403         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
404         if (!dlm_req) {
405                 CERROR("bad request buffer for cancel\n");
406                 RETURN(-EINVAL);
407         }
408
409         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
410         if (!lock) {
411                 CERROR("received cancel for unknown lock cookie "LPX64"\n",
412                        dlm_req->lock_handle1.cookie);
413                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
414                                   "(cookie "LPU64")",
415                                   dlm_req->lock_handle1.cookie);
416                 req->rq_status = ESTALE;
417         } else {
418                 LDLM_DEBUG(lock, "server-side cancel handler START");
419                 ldlm_lock_cancel(lock);
420                 if (ldlm_del_waiting_lock(lock))
421                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
422                 req->rq_status = 0;
423         }
424
425         if (ptlrpc_reply(req->rq_svc, req) != 0)
426                 LBUG();
427
428         if (lock) {
429                 ldlm_reprocess_all(lock->l_resource);
430                 LDLM_DEBUG(lock, "server-side cancel handler END");
431                 LDLM_LOCK_PUT(lock);
432         }
433
434         RETURN(0);
435 }
436
437 struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
438                                       struct lustre_handle *handle);
439
440 static int ldlm_handle_bl_callback(struct ptlrpc_request *req,
441                                    struct ldlm_namespace *ns)
442 {
443         struct ldlm_request *dlm_req;
444         struct ldlm_lock *lock;
445         int do_ast;
446         ENTRY;
447
448         OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_BL_AST, 0);
449
450         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
451
452         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
453         if (!lock) {
454                 CDEBUG(D_INFO, "blocking callback on lock "LPX64
455                        " - lock disappeared\n", dlm_req->lock_handle1.cookie);
456                 RETURN(-EINVAL);
457         }
458
459         LDLM_DEBUG(lock, "client blocking AST callback handler START");
460
461         l_lock(&ns->ns_lock);
462         lock->l_flags |= LDLM_FL_CBPENDING;
463         do_ast = (!lock->l_readers && !lock->l_writers);
464         l_unlock(&ns->ns_lock);
465
466         if (do_ast) {
467                 LDLM_DEBUG(lock, "already unused, calling "
468                            "callback (%p)", lock->l_blocking_ast);
469                 if (lock->l_blocking_ast != NULL)
470                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
471                                              lock->l_data, LDLM_CB_BLOCKING);
472         } else {
473                 LDLM_DEBUG(lock, "Lock still has references, will be"
474                            " cancelled later");
475         }
476
477         LDLM_DEBUG(lock, "client blocking callback handler END");
478         LDLM_LOCK_PUT(lock);
479         RETURN(0);
480 }
481
482 static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
483                                    struct ldlm_namespace *ns)
484 {
485         struct list_head ast_list = LIST_HEAD_INIT(ast_list);
486         struct ldlm_request *dlm_req;
487         struct ldlm_lock *lock;
488         ENTRY;
489
490         OBD_FAIL_RETURN(OBD_FAIL_OSC_LOCK_CP_AST, 0);
491
492         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
493
494         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
495         if (!lock) {
496                 CERROR("completion callback on lock "LPX64" - lock "
497                        "disappeared\n", dlm_req->lock_handle1.cookie);
498                 RETURN(-EINVAL);
499         }
500
501         LDLM_DEBUG(lock, "client completion callback handler START");
502
503         l_lock(&ns->ns_lock);
504
505         /* If we receive the completion AST before the actual enqueue returned,
506          * then we might need to switch lock modes, resources, or extents. */
507         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
508                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
509                 LDLM_DEBUG(lock, "completion AST, new lock mode");
510         }
511         if (lock->l_resource->lr_type == LDLM_EXTENT)
512                 memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
513                        sizeof(lock->l_extent));
514         ldlm_resource_unlink_lock(lock);
515         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
516                    &lock->l_resource->lr_name,
517                    sizeof(lock->l_resource->lr_name)) != 0) {
518                 ldlm_lock_change_resource(ns, lock,
519                                          dlm_req->lock_desc.l_resource.lr_name);
520                 LDLM_DEBUG(lock, "completion AST, new resource");
521         }
522         lock->l_resource->lr_tmp = &ast_list;
523         ldlm_grant_lock(lock, req, sizeof(*req));
524         lock->l_resource->lr_tmp = NULL;
525         l_unlock(&ns->ns_lock);
526         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
527         LDLM_LOCK_PUT(lock);
528
529         ldlm_run_ast_work(&ast_list);
530
531         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
532                           lock);
533         RETURN(0);
534 }
535
536 static int ldlm_callback_handler(struct ptlrpc_request *req)
537 {
538         struct ldlm_namespace *ns;
539         int rc;
540         ENTRY;
541
542         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
543         if (rc) {
544                 CERROR("Invalid request: %d\n", rc);
545                 RETURN(rc);
546         }
547
548         if (req->rq_export == NULL) {
549                 struct ldlm_request *dlm_req;
550
551                 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
552                        req->rq_reqmsg->opc, req->rq_request_portal,
553                        req->rq_reply_portal);
554                 CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
555                        req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
556                 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
557                 CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
558                        dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
559                 rc = -ENOTCONN;
560                 goto out;
561         }
562
563         LASSERT(req->rq_export != NULL);
564         LASSERT(req->rq_export->exp_obd != NULL);
565         ns = req->rq_export->exp_obd->obd_namespace;
566         LASSERT(ns != NULL);
567
568         switch (req->rq_reqmsg->opc) {
569         case LDLM_BL_CALLBACK:
570                 CDEBUG(D_INODE, "blocking ast\n");
571                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
572                 rc = ldlm_handle_bl_callback(req, ns);
573                 break;
574         case LDLM_CP_CALLBACK:
575                 CDEBUG(D_INODE, "completion ast\n");
576                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
577                 rc = ldlm_handle_cp_callback(req, ns);
578                 break;
579         default:
580                 CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
581                 RETURN(-EINVAL);
582         }
583  out:
584         req->rq_status = rc;
585         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
586         if (rc)
587                 RETURN(rc);
588         ptlrpc_reply(req->rq_svc, req);
589
590         RETURN(0);
591 }
592
593 static int ldlm_cancel_handler(struct ptlrpc_request *req)
594 {
595         int rc;
596         ENTRY;
597
598         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
599         if (rc) {
600                 CERROR("lustre_ldlm: Invalid request: %d\n", rc);
601                 RETURN(rc);
602         }
603
604         if (req->rq_export == NULL) {
605                 struct ldlm_request *dlm_req;
606                 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
607                        req->rq_reqmsg->opc, req->rq_request_portal,
608                        req->rq_reply_portal);
609                 CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
610                        req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
611                 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
612                 ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
613                 CERROR("--> ignoring this error as a temporary workaround!  "
614                        "beware!\n");
615                 //RETURN(-ENOTCONN);
616         }
617
618         switch (req->rq_reqmsg->opc) {
619
620         /* XXX FIXME move this back to mds/handler.c, bug 625069 */
621         case LDLM_CANCEL:
622                 CDEBUG(D_INODE, "cancel\n");
623                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
624                 rc = ldlm_handle_cancel(req);
625                 if (rc)
626                         break;
627                 RETURN(0);
628
629         default:
630                 CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
631                 RETURN(-EINVAL);
632         }
633
634         RETURN(0);
635 }
636
637 static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
638                           void *karg, void *uarg)
639 {
640         struct obd_device *obddev = class_conn2obd(conn);
641         struct ptlrpc_connection *connection;
642         struct obd_uuid uuid = { "ldlm" };
643         int err = 0;
644         ENTRY;
645
646         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
647             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
648                 CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
649                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
650                 RETURN(-EINVAL);
651         }
652
653         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
654                   sizeof(*obddev->u.ldlm.ldlm_client));
655         connection = ptlrpc_uuid_to_connection(&uuid);
656         if (!connection)
657                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
658
659         switch (cmd) {
660         case IOC_LDLM_TEST:
661                 //err = ldlm_test(obddev, conn);
662                 err = 0;
663                 CERROR("-- NO TESTS WERE RUN done err %d\n", err);
664                 GOTO(out, err);
665         case IOC_LDLM_DUMP:
666                 ldlm_dump_all_namespaces();
667                 GOTO(out, err);
668         default:
669                 GOTO(out, err = -EINVAL);
670         }
671
672  out:
673         if (connection)
674                 ptlrpc_put_connection(connection);
675         OBD_FREE(obddev->u.ldlm.ldlm_client,
676                  sizeof(*obddev->u.ldlm.ldlm_client));
677         return err;
678 }
679
680 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
681 {
682         struct ldlm_obd *ldlm = &obddev->u.ldlm;
683         int rc, i;
684         ENTRY;
685
686         if (ldlm_already_setup)
687                 RETURN(-EALREADY);
688
689         rc = ldlm_proc_setup(obddev);
690         if (rc != 0)
691                 RETURN(rc);
692
693 #ifdef __KERNEL__
694         ldlm->ldlm_cb_service =
695                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
696                                 LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
697                                 LDLM_CB_REPLY_PORTAL,
698                                 ldlm_callback_handler, "ldlm_cbd");
699
700         if (!ldlm->ldlm_cb_service) {
701                 CERROR("failed to start service\n");
702                 GOTO(out_proc, rc = -ENOMEM);
703         }
704
705         ldlm->ldlm_cancel_service =
706                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
707                                 LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
708                                 LDLM_CANCEL_REPLY_PORTAL,
709                                 ldlm_cancel_handler, "ldlm_canceld");
710
711         if (!ldlm->ldlm_cancel_service) {
712                 CERROR("failed to start service\n");
713                 GOTO(out_proc, rc = -ENOMEM);
714         }
715
716         for (i = 0; i < LDLM_NUM_THREADS; i++) {
717                 char name[32];
718                 sprintf(name, "ldlm_cn_%02d", i);
719                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cancel_service,
720                                          name);
721                 if (rc) {
722                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
723                         LBUG();
724                         GOTO(out_thread, rc);
725                 }
726         }
727
728         for (i = 0; i < LDLM_NUM_THREADS; i++) {
729                 char name[32];
730                 sprintf(name, "ldlm_cb_%02d", i);
731                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cb_service, name);
732                 if (rc) {
733                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
734                         LBUG();
735                         GOTO(out_thread, rc);
736                 }
737         }
738
739 #endif
740         INIT_LIST_HEAD(&waiting_locks_list);
741         spin_lock_init(&waiting_locks_spinlock);
742         waiting_locks_timer.function = waiting_locks_callback;
743         waiting_locks_timer.data = 0;
744         init_timer(&waiting_locks_timer);
745
746         ldlm_already_setup = 1;
747
748         RETURN(0);
749
750  out_thread:
751 #ifdef __KERNEL__
752         ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
753         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
754         ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
755         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
756 #endif
757  out_proc:
758         ldlm_proc_cleanup(obddev);
759
760         return rc;
761 }
762
763 static int ldlm_cleanup(struct obd_device *obddev)
764 {
765         struct ldlm_obd *ldlm = &obddev->u.ldlm;
766         ENTRY;
767
768         if (!list_empty(&ldlm_namespace_list)) {
769                 CERROR("ldlm still has namespaces; clean these up first.\n");
770                 RETURN(-EBUSY);
771         }
772
773 #ifdef __KERNEL__
774         ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
775         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
776         ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
777         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
778         ldlm_proc_cleanup(obddev);
779 #endif
780         ldlm_already_setup = 0;
781         RETURN(0);
782 }
783
784 static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
785                         struct obd_uuid *cluuid, struct recovd_obd *recovd,
786                         ptlrpc_recovery_cb_t recover)
787 {
788         return class_connect(conn, src, cluuid);
789 }
790
791 struct obd_ops ldlm_obd_ops = {
792         o_owner:       THIS_MODULE,
793         o_iocontrol:   ldlm_iocontrol,
794         o_setup:       ldlm_setup,
795         o_cleanup:     ldlm_cleanup,
796         o_connect:     ldlm_connect,
797         o_disconnect:  class_disconnect
798 };
799
800 int __init ldlm_init(void)
801 {
802         int rc = class_register_type(&ldlm_obd_ops, 0, OBD_LDLM_DEVICENAME);
803         if (rc != 0)
804                 return rc;
805
806         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
807                                                sizeof(struct ldlm_resource), 0,
808                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
809         if (ldlm_resource_slab == NULL)
810                 return -ENOMEM;
811
812         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
813                                            sizeof(struct ldlm_lock), 0,
814                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
815         if (ldlm_lock_slab == NULL) {
816                 kmem_cache_destroy(ldlm_resource_slab);
817                 return -ENOMEM;
818         }
819
820         l_lock_init(&ldlm_handle_lock);
821
822         return 0;
823 }
824
825 static void __exit ldlm_exit(void)
826 {
827         class_unregister_type(OBD_LDLM_DEVICENAME);
828         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
829                 CERROR("couldn't free ldlm resource slab\n");
830         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
831                 CERROR("couldn't free ldlm lock slab\n");
832 }
833
834 /* ldlm_lock.c */
835 EXPORT_SYMBOL(ldlm_lock2desc);
836 EXPORT_SYMBOL(ldlm_register_intent);
837 EXPORT_SYMBOL(ldlm_unregister_intent);
838 EXPORT_SYMBOL(ldlm_lockname);
839 EXPORT_SYMBOL(ldlm_typename);
840 EXPORT_SYMBOL(ldlm_lock2handle);
841 EXPORT_SYMBOL(__ldlm_handle2lock);
842 EXPORT_SYMBOL(ldlm_lock_put);
843 EXPORT_SYMBOL(ldlm_lock_match);
844 EXPORT_SYMBOL(ldlm_lock_cancel);
845 EXPORT_SYMBOL(ldlm_lock_addref);
846 EXPORT_SYMBOL(ldlm_lock_decref);
847 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
848 EXPORT_SYMBOL(ldlm_lock_change_resource);
849 EXPORT_SYMBOL(ldlm_lock_set_data);
850 EXPORT_SYMBOL(ldlm_it2str);
851 EXPORT_SYMBOL(ldlm_lock_dump);
852 EXPORT_SYMBOL(ldlm_lock_dump_handle);
853 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
854 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
855
856 /* ldlm_request.c */
857 EXPORT_SYMBOL(ldlm_completion_ast);
858 EXPORT_SYMBOL(ldlm_expired_completion_wait);
859 EXPORT_SYMBOL(ldlm_cli_convert);
860 EXPORT_SYMBOL(ldlm_cli_enqueue);
861 EXPORT_SYMBOL(ldlm_cli_cancel);
862 EXPORT_SYMBOL(ldlm_cli_cancel_unused);
863 EXPORT_SYMBOL(ldlm_match_or_enqueue);
864 EXPORT_SYMBOL(ldlm_replay_locks);
865 EXPORT_SYMBOL(ldlm_resource_foreach);
866 EXPORT_SYMBOL(ldlm_namespace_foreach);
867 EXPORT_SYMBOL(ldlm_namespace_foreach_res);
868
869 /* ldlm_lockd.c */
870 EXPORT_SYMBOL(ldlm_server_blocking_ast);
871 EXPORT_SYMBOL(ldlm_server_completion_ast);
872 EXPORT_SYMBOL(ldlm_handle_enqueue);
873 EXPORT_SYMBOL(ldlm_handle_cancel);
874 EXPORT_SYMBOL(ldlm_handle_convert);
875 EXPORT_SYMBOL(ldlm_del_waiting_lock);
876
877 #if 0
878 /* ldlm_test.c */
879 EXPORT_SYMBOL(ldlm_test);
880 EXPORT_SYMBOL(ldlm_regression_start);
881 EXPORT_SYMBOL(ldlm_regression_stop);
882 #endif
883
884 /* ldlm_resource.c */
885 EXPORT_SYMBOL(ldlm_namespace_new);
886 EXPORT_SYMBOL(ldlm_namespace_cleanup);
887 EXPORT_SYMBOL(ldlm_namespace_free);
888 EXPORT_SYMBOL(ldlm_namespace_dump);
889
890 /* l_lock.c */
891 EXPORT_SYMBOL(l_lock);
892 EXPORT_SYMBOL(l_unlock);
893
894 #ifdef __KERNEL__
895 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
896 MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
897 MODULE_LICENSE("GPL");
898
899 module_init(ldlm_init);
900 module_exit(ldlm_exit);
901 #endif