Whamcloud - gitweb
Quiet some well-known error messages.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LDLM
26
27 #ifdef __KERNEL__
28 # include <linux/module.h>
29 # include <linux/slab.h>
30 # include <linux/init.h>
31 # include <linux/wait.h>
32 #else
33 # include <liblustre.h>
34 #endif
35
36 #include <linux/lustre_dlm.h>
37 #include <linux/obd_class.h>
38 extern kmem_cache_t *ldlm_resource_slab;
39 extern kmem_cache_t *ldlm_lock_slab;
40 extern struct lustre_lock ldlm_handle_lock;
41 extern struct list_head ldlm_namespace_list;
42 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
43 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
44
45 static int ldlm_already_setup = 0;
46
47 #ifdef __KERNEL__
48
49 inline unsigned long round_timeout(unsigned long timeout)
50 {
51         return ((timeout / HZ) + 1) * HZ;
52 }
53
54 /* XXX should this be per-ldlm? */
55 static struct list_head waiting_locks_list;
56 static spinlock_t waiting_locks_spinlock;
57 static struct timer_list waiting_locks_timer;
58
59 static struct expired_lock_thread {
60         wait_queue_head_t         elt_waitq;
61         int                       elt_state;
62         struct list_head          elt_expired_locks;
63         spinlock_t                elt_lock;
64 } expired_lock_thread;
65
66 #define ELT_STOPPED   0
67 #define ELT_READY     1
68 #define ELT_TERMINATE 2
69
70 static inline int have_expired_locks(void)
71 {
72         int need_to_run;
73
74         spin_lock_bh(&expired_lock_thread.elt_lock);
75         need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
76         spin_unlock_bh(&expired_lock_thread.elt_lock);
77
78         RETURN(need_to_run);
79 }
80
81 static int expired_lock_main(void *arg)
82 {
83         struct list_head *expired = &expired_lock_thread.elt_expired_locks;
84         struct l_wait_info lwi = { 0 };
85         unsigned long flags;
86
87         ENTRY;
88         lock_kernel();
89         kportal_daemonize("ldlm_elt");
90         
91         SIGNAL_MASK_LOCK(current, flags);
92         sigfillset(&current->blocked);
93         RECALC_SIGPENDING;
94         SIGNAL_MASK_UNLOCK(current, flags);
95         
96         unlock_kernel();
97         
98         expired_lock_thread.elt_state = ELT_READY;
99         wake_up(&expired_lock_thread.elt_waitq);
100         
101         while (1) {
102                 l_wait_event(expired_lock_thread.elt_waitq,
103                              have_expired_locks() ||
104                              expired_lock_thread.elt_state == ELT_TERMINATE,
105                              &lwi);
106
107                 spin_lock_bh(&expired_lock_thread.elt_lock);
108                 while (!list_empty(expired)) {
109                         struct ldlm_lock *lock = list_entry(expired->next,
110                                                             struct ldlm_lock,
111                                                             l_pending_chain);
112                         spin_unlock_bh(&expired_lock_thread.elt_lock);
113                         
114                         ptlrpc_fail_export(lock->l_export);
115
116                         spin_lock_bh(&expired_lock_thread.elt_lock);
117                 }
118                 spin_unlock_bh(&expired_lock_thread.elt_lock);
119
120                 if (expired_lock_thread.elt_state == ELT_TERMINATE)
121                         break;
122         }
123
124         expired_lock_thread.elt_state = ELT_STOPPED;
125         wake_up(&expired_lock_thread.elt_waitq);
126         RETURN(0);
127 }
128
129 static void waiting_locks_callback(unsigned long unused)
130 {
131         struct ldlm_lock *lock;
132
133         spin_lock_bh(&waiting_locks_spinlock);
134         while (!list_empty(&waiting_locks_list)) {
135                 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
136                                   l_pending_chain);
137
138                 if (lock->l_callback_timeout > jiffies)
139                         break;
140
141                 LDLM_ERROR(lock, "lock callback timer expired: evicting client "
142                            "%s@%s nid "LPU64,
143                            lock->l_export->exp_client_uuid.uuid,
144                            lock->l_export->exp_connection->c_remote_uuid.uuid,
145                            lock->l_export->exp_connection->c_peer.peer_nid);
146
147                 spin_lock_bh(&expired_lock_thread.elt_lock);
148                 list_del(&lock->l_pending_chain);
149                 list_add(&lock->l_pending_chain,
150                          &expired_lock_thread.elt_expired_locks);
151                 spin_unlock_bh(&expired_lock_thread.elt_lock);
152                 wake_up(&expired_lock_thread.elt_waitq);
153         }
154
155         spin_unlock_bh(&waiting_locks_spinlock);
156 }
157
158 /*
159  * Indicate that we're waiting for a client to call us back cancelling a given
160  * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
161  * timer to fire appropriately.  (We round up to the next second, to avoid
162  * floods of timer firings during periods of high lock contention and traffic).
163  */
164 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
165 {
166         unsigned long timeout_rounded;
167
168         LDLM_DEBUG(lock, "adding to wait list");
169         LASSERT(list_empty(&lock->l_pending_chain));
170
171         spin_lock_bh(&waiting_locks_spinlock);
172         lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
173
174         timeout_rounded = round_timeout(lock->l_callback_timeout);
175
176         if (timeout_rounded < waiting_locks_timer.expires ||
177             !timer_pending(&waiting_locks_timer)) {
178                 mod_timer(&waiting_locks_timer, timeout_rounded);
179         }
180         list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
181         spin_unlock_bh(&waiting_locks_spinlock);
182         return 1;
183 }
184
185 /*
186  * Remove a lock from the pending list, likely because it had its cancellation
187  * callback arrive without incident.  This adjusts the lock-timeout timer if
188  * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
189  */
190 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
191 {
192         struct list_head *list_next;
193
194         if (lock->l_export == NULL) {
195                 /* We don't have a "waiting locks list" on clients. */
196                 LDLM_DEBUG(lock, "client lock: no-op");
197                 return 0;
198         }
199
200         spin_lock_bh(&waiting_locks_spinlock);
201
202         if (list_empty(&lock->l_pending_chain)) {
203                 spin_unlock_bh(&waiting_locks_spinlock);
204                 LDLM_DEBUG(lock, "wasn't waiting");
205                 return 0;
206         }
207
208         list_next = lock->l_pending_chain.next;
209         if (lock->l_pending_chain.prev == &waiting_locks_list) {
210                 /* Removing the head of the list, adjust timer. */
211                 if (list_next == &waiting_locks_list) {
212                         /* No more, just cancel. */
213                         del_timer(&waiting_locks_timer);
214                 } else {
215                         struct ldlm_lock *next;
216                         next = list_entry(list_next, struct ldlm_lock,
217                                           l_pending_chain);
218                         mod_timer(&waiting_locks_timer,
219                                   round_timeout(next->l_callback_timeout));
220                 }
221         }
222         list_del_init(&lock->l_pending_chain);
223         spin_unlock_bh(&waiting_locks_spinlock);
224         LDLM_DEBUG(lock, "removed");
225         return 1;
226 }
227
228 #else /* !__KERNEL__ */
229
/* Non-kernel build: no waiting-locks list or timer is maintained, so adding
 * a lock is a no-op that reports 1, like the kernel version does. */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(1);
}
234
/* Non-kernel build: nothing is ever queued, so the lock is never "pending"
 * and the answer is always 0. */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}
239
240 #endif /* __KERNEL__ */
241
242 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, char *ast_type)
243 {
244         CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
245                ", mode %s: evicting client %s@%s NID "LPU64"\n",
246                ast_type, rc,
247                lock->l_resource->lr_name.name[0],
248                lock->l_resource->lr_name.name[1],
249                ldlm_lockname[lock->l_granted_mode],
250                lock->l_export->exp_client_uuid.uuid,
251                lock->l_export->exp_connection->c_remote_uuid.uuid,
252                lock->l_export->exp_connection->c_peer.peer_nid);
253         ptlrpc_fail_export(lock->l_export);
254 }
255
256 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
257                              struct ldlm_lock_desc *desc,
258                              void *data, int flag)
259 {
260         struct ldlm_request *body;
261         struct ptlrpc_request *req;
262         int rc = 0, size = sizeof(*body);
263         ENTRY;
264
265         if (flag == LDLM_CB_CANCELING) {
266                 /* Don't need to do anything here. */
267                 RETURN(0);
268         }
269
270         LASSERT(lock);
271
272         l_lock(&lock->l_resource->lr_namespace->ns_lock);
273         /* XXX This is necessary because, with the lock re-tasking, we actually
274          * _can_ get called in here twice.  (bug 830) */
275         if (!list_empty(&lock->l_pending_chain)) {
276                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
277                 RETURN(0);
278         }
279
280         if (lock->l_destroyed) {
281                 /* What's the point? */
282                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
283                 RETURN(0);
284         }
285
286 #if 0
287         if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
288                 ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
289                 RETURN(-ETIMEDOUT);
290         }
291 #endif
292
293         req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
294                               LDLM_BL_CALLBACK, 1, &size, NULL);
295         if (!req)
296                 RETURN(-ENOMEM);
297
298         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
299         memcpy(&body->lock_handle1, &lock->l_remote_handle,
300                sizeof(body->lock_handle1));
301         memcpy(&body->lock_desc, desc, sizeof(*desc));
302
303         LDLM_DEBUG(lock, "server preparing blocking AST");
304         req->rq_replen = lustre_msg_size(0, NULL);
305
306         ldlm_add_waiting_lock(lock);
307         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
308
309         req->rq_level = LUSTRE_CONN_RECOVER;
310         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
311         rc = ptlrpc_queue_wait(req);
312         if (rc == -ETIMEDOUT || rc == -EINTR) {
313                 ldlm_del_waiting_lock(lock);
314                 ldlm_failed_ast(lock, rc, "blocking");
315         } else if (rc) {
316                 if (rc == -EINVAL)
317                         CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
318                                "from blocking AST for lock %p--normal race\n",
319                                req->rq_connection->c_peer.peer_nid,
320                                req->rq_repmsg->status, lock);
321                 else if (rc == -ENOTCONN)
322                         CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
323                                "from blocking AST for lock %p--this client was "
324                                "probably rebooted while it held a lock, nothing"
325                                " serious\n",req->rq_connection->c_peer.peer_nid,
326                                req->rq_repmsg->status, lock);
327                 else
328                         CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
329                                "from blocking AST for lock %p\n",
330                                req->rq_connection->c_peer.peer_nid,
331                                req->rq_repmsg->status, lock);
332                 LDLM_DEBUG(lock, "client returned error %d from blocking AST",
333                            req->rq_status);
334                 ldlm_lock_cancel(lock);
335                 /* Server-side AST functions are called from ldlm_reprocess_all,
336                  * which needs to be told to please restart its reprocessing. */
337                 rc = -ERESTART;
338         }
339
340         ptlrpc_req_finished(req);
341
342         RETURN(rc);
343 }
344
/* XXX copied from ptlrpc/service.c */
/* Difference @large - @small expressed in microseconds: the microsecond
 * fields are subtracted directly and the seconds difference is scaled by
 * one million.  The result is negative when @small is the later time. */
static long timeval_sub(struct timeval *large, struct timeval *small)
{
        return (large->tv_usec - small->tv_usec) +
                (large->tv_sec - small->tv_sec) * 1000000;
}
351
352 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
353 {
354         struct ldlm_request *body;
355         struct ptlrpc_request *req;
356         struct timeval granted_time;
357         long total_enqueue_wait;
358         int rc = 0, size = sizeof(*body);
359         ENTRY;
360
361         if (lock == NULL) {
362                 LBUG();
363                 RETURN(-EINVAL);
364         }
365
366         do_gettimeofday(&granted_time);
367         total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);
368
369         if (total_enqueue_wait / 1000000 > obd_timeout)
370                 LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);
371
372         req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
373                               LDLM_CP_CALLBACK, 1, &size, NULL);
374         if (!req)
375                 RETURN(-ENOMEM);
376
377         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
378         memcpy(&body->lock_handle1, &lock->l_remote_handle,
379                sizeof(body->lock_handle1));
380         body->lock_flags = flags;
381         ldlm_lock2desc(lock, &body->lock_desc);
382
383         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
384                    total_enqueue_wait);
385         req->rq_replen = lustre_msg_size(0, NULL);
386
387         req->rq_level = LUSTRE_CONN_RECOVER;
388         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
389         rc = ptlrpc_queue_wait(req);
390         if (rc == -ETIMEDOUT || rc == -EINTR) {
391                 ldlm_del_waiting_lock(lock);
392                 ldlm_failed_ast(lock, rc, "completion");
393         } else if (rc) {
394                 CERROR("client returned %d from completion AST for lock %p\n",
395                        req->rq_status, lock);
396                 LDLM_DEBUG(lock, "client returned error %d from completion AST",
397                            req->rq_status);
398                 ldlm_lock_cancel(lock);
399                 /* Server-side AST functions are called from ldlm_reprocess_all,
400                  * which needs to be told to please restart its reprocessing. */
401                 rc = -ERESTART;
402         }
403         ptlrpc_req_finished(req);
404
405         RETURN(rc);
406 }
407
408 int ldlm_handle_enqueue(struct ptlrpc_request *req,
409                         ldlm_completion_callback completion_callback,
410                         ldlm_blocking_callback blocking_callback)
411 {
412         struct obd_device *obddev = req->rq_export->exp_obd;
413         struct ldlm_reply *dlm_rep;
414         struct ldlm_request *dlm_req;
415         int rc, size = sizeof(*dlm_rep), cookielen = 0;
416         __u32 flags;
417         ldlm_error_t err;
418         struct ldlm_lock *lock = NULL;
419         void *cookie = NULL;
420         ENTRY;
421
422         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
423
424         dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
425                                       lustre_swab_ldlm_request);
426         if (dlm_req == NULL) {
427                 CERROR ("Can't unpack dlm_req\n");
428                 RETURN (-EFAULT);
429         }
430         
431         flags = dlm_req->lock_flags;
432         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
433             (flags & LDLM_FL_HAS_INTENT)) {
434                 /* In this case, the reply buffer is allocated deep in
435                  * local_lock_enqueue by the policy function. */
436                 cookie = req;
437                 cookielen = sizeof(*req);
438         } else {
439                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
440                                      &req->rq_repmsg);
441                 if (rc) {
442                         CERROR("out of memory\n");
443                         RETURN(-ENOMEM);
444                 }
445                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
446                         cookie = &dlm_req->lock_desc.l_extent;
447                         cookielen = sizeof(struct ldlm_extent);
448                 }
449         }
450
451         /* The lock's callback data might be set in the policy function */
452         lock = ldlm_lock_create(obddev->obd_namespace,
453                                 &dlm_req->lock_handle2,
454                                 dlm_req->lock_desc.l_resource.lr_name,
455                                 dlm_req->lock_desc.l_resource.lr_type,
456                                 dlm_req->lock_desc.l_req_mode,
457                                 blocking_callback, NULL);
458         if (!lock)
459                 GOTO(out, err = -ENOMEM);
460
461         do_gettimeofday(&lock->l_enqueued_time);
462         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
463                sizeof(lock->l_remote_handle));
464         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
465
466         LASSERT(req->rq_export);
467         lock->l_export = class_export_get(req->rq_export);
468         l_lock(&lock->l_resource->lr_namespace->ns_lock);
469         list_add(&lock->l_export_chain,
470                  &lock->l_export->exp_ldlm_data.led_held_locks);
471         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
472
473         err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
474                                 &flags, completion_callback);
475         if (err)
476                 GOTO(out, err);
477
478         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
479         dlm_rep->lock_flags = flags;
480
481         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
482         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
483                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
484                        sizeof(lock->l_extent));
485         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
486                 memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
487                        sizeof(dlm_rep->lock_resource_name));
488                 dlm_rep->lock_mode = lock->l_req_mode;
489         }
490
491         EXIT;
492  out:
493         if (lock)
494                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
495                            "(err=%d)", err);
496         req->rq_status = err;
497
498         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
499          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
500         if (lock) {
501                 if (!err)
502                         ldlm_reprocess_all(lock->l_resource);
503                 LDLM_LOCK_PUT(lock);
504         }
505         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
506
507         return 0;
508 }
509
510 int ldlm_handle_convert(struct ptlrpc_request *req)
511 {
512         struct ldlm_request *dlm_req;
513         struct ldlm_reply *dlm_rep;
514         struct ldlm_lock *lock;
515         int rc, size = sizeof(*dlm_rep);
516         ENTRY;
517
518         dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
519                                       lustre_swab_ldlm_request);
520         if (dlm_req == NULL) {
521                 CERROR ("Can't unpack dlm_req\n");
522                 RETURN (-EFAULT);
523         }
524         
525         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
526         if (rc) {
527                 CERROR("out of memory\n");
528                 RETURN(-ENOMEM);
529         }
530         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
531         dlm_rep->lock_flags = dlm_req->lock_flags;
532
533         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
534         if (!lock) {
535                 req->rq_status = EINVAL;
536         } else {
537                 LDLM_DEBUG(lock, "server-side convert handler START");
538                 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
539                                   &dlm_rep->lock_flags);
540                 if (ldlm_del_waiting_lock(lock))
541                         CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
542                 req->rq_status = 0;
543         }
544
545         if (lock) {
546                 ldlm_reprocess_all(lock->l_resource);
547                 LDLM_DEBUG(lock, "server-side convert handler END");
548                 LDLM_LOCK_PUT(lock);
549         } else
550                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
551
552         RETURN(0);
553 }
554
555 int ldlm_handle_cancel(struct ptlrpc_request *req)
556 {
557         struct ldlm_request *dlm_req;
558         struct ldlm_lock *lock;
559         int rc;
560         ENTRY;
561
562         dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
563                                       lustre_swab_ldlm_request);
564         if (dlm_req == NULL) {
565                 CERROR("bad request buffer for cancel\n");
566                 RETURN(-EFAULT);
567         }
568
569         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
570         if (rc) {
571                 CERROR("out of memory\n");
572                 RETURN(-ENOMEM);
573         }
574
575         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
576         if (!lock) {
577                 CERROR("received cancel for unknown lock cookie "LPX64
578                        " from nid "LPU64"\n", dlm_req->lock_handle1.cookie,
579                        req->rq_connection->c_peer.peer_nid);
580                 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
581                                   "(cookie "LPU64")",
582                                   dlm_req->lock_handle1.cookie);
583                 req->rq_status = ESTALE;
584         } else {
585                 LDLM_DEBUG(lock, "server-side cancel handler START");
586                 ldlm_lock_cancel(lock);
587                 if (ldlm_del_waiting_lock(lock))
588                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
589                 req->rq_status = 0;
590         }
591
592         if (ptlrpc_reply(req) != 0)
593                 LBUG();
594
595         if (lock) {
596                 ldlm_reprocess_all(lock->l_resource);
597                 LDLM_DEBUG(lock, "server-side cancel handler END");
598                 LDLM_LOCK_PUT(lock);
599         }
600
601         RETURN(0);
602 }
603
604 static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
605                                     struct ldlm_namespace *ns,
606                                     struct ldlm_request *dlm_req,
607                                     struct ldlm_lock *lock)
608 {
609         int do_ast;
610         ENTRY;
611
612         l_lock(&ns->ns_lock);
613         LDLM_DEBUG(lock, "client blocking AST callback handler START");
614
615         lock->l_flags |= LDLM_FL_CBPENDING;
616         do_ast = (!lock->l_readers && !lock->l_writers);
617
618         if (do_ast) {
619                 LDLM_DEBUG(lock, "already unused, calling "
620                            "callback (%p)", lock->l_blocking_ast);
621                 if (lock->l_blocking_ast != NULL) {
622                         l_unlock(&ns->ns_lock);
623                         lock->l_blocking_ast(lock, &dlm_req->lock_desc,
624                                              lock->l_data, LDLM_CB_BLOCKING);
625                         l_lock(&ns->ns_lock);
626                 }
627         } else {
628                 LDLM_DEBUG(lock, "Lock still has references, will be"
629                            " cancelled later");
630         }
631
632         LDLM_DEBUG(lock, "client blocking callback handler END");
633         l_unlock(&ns->ns_lock);
634         LDLM_LOCK_PUT(lock);
635         EXIT;
636 }
637
638 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
639                                     struct ldlm_namespace *ns,
640                                     struct ldlm_request *dlm_req,
641                                     struct ldlm_lock *lock)
642 {
643         LIST_HEAD(ast_list);
644         ENTRY;
645
646         l_lock(&ns->ns_lock);
647         LDLM_DEBUG(lock, "client completion callback handler START");
648
649         /* If we receive the completion AST before the actual enqueue returned,
650          * then we might need to switch lock modes, resources, or extents. */
651         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
652                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
653                 LDLM_DEBUG(lock, "completion AST, new lock mode");
654         }
655         if (lock->l_resource->lr_type == LDLM_EXTENT)
656                 memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
657                        sizeof(lock->l_extent));
658
659         ldlm_resource_unlink_lock(lock);
660         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
661                    &lock->l_resource->lr_name,
662                    sizeof(lock->l_resource->lr_name)) != 0) {
663                 ldlm_lock_change_resource(ns, lock,
664                                          dlm_req->lock_desc.l_resource.lr_name);
665                 LDLM_DEBUG(lock, "completion AST, new resource");
666         }
667         lock->l_resource->lr_tmp = &ast_list;
668         ldlm_grant_lock(lock, req, sizeof(*req));
669         lock->l_resource->lr_tmp = NULL;
670         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
671         l_unlock(&ns->ns_lock);
672         LDLM_LOCK_PUT(lock);
673
674         ldlm_run_ast_work(&ast_list);
675
676         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
677                           lock);
678         EXIT;
679 }
680
681 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
682 {
683         req->rq_status = rc;
684         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
685                              &req->rq_repmsg);
686         if (rc)
687                 return rc;
688         return ptlrpc_reply(req);
689 }
690
691 static int ldlm_callback_handler(struct ptlrpc_request *req)
692 {
693         struct ldlm_namespace *ns;
694         struct ldlm_request *dlm_req;
695         struct ldlm_lock *lock;
696         ENTRY;
697
698         /* Requests arrive in sender's byte order.  The ptlrpc service
699          * handler has already checked and, if necessary, byte-swapped the
700          * incoming request message body, but I am responsible for the
701          * message buffers. */
702
703         if (req->rq_export == NULL) {
704                 struct ldlm_request *dlm_req;
705
706                 CDEBUG(D_RPCTRACE, "operation %d from nid "LPU64" with bad "
707                        "export cookie "LPX64" (ptl req %d/rep %d); this is "
708                        "normal if this node rebooted with a lock held\n",
709                        req->rq_reqmsg->opc, req->rq_connection->c_peer.peer_nid,
710                        req->rq_reqmsg->handle.cookie,
711                        req->rq_request_portal, req->rq_reply_portal);
712
713                 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
714                                              lustre_swab_ldlm_request);
715                 if (dlm_req != NULL)
716                         CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
717                                dlm_req->lock_handle1.cookie);
718
719                 ldlm_callback_reply(req, -ENOTCONN);
720                 RETURN(0);
721         }
722
723         if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
724                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
725         } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
726                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
727         } else {
728                 ldlm_callback_reply(req, -EPROTO);
729                 RETURN(0);
730         }
731
732         LASSERT(req->rq_export != NULL);
733         LASSERT(req->rq_export->exp_obd != NULL);
734         ns = req->rq_export->exp_obd->obd_namespace;
735         LASSERT(ns != NULL);
736
737         dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
738                                       lustre_swab_ldlm_request);
739         if (dlm_req == NULL) {
740                 CERROR ("can't unpack dlm_req\n");
741                 ldlm_callback_reply (req, -EPROTO);
742                 RETURN (0);
743         }
744         
745         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
746         if (!lock) {
747                 CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
748                        dlm_req->lock_handle1.cookie);
749                 ldlm_callback_reply(req, -EINVAL);
750                 RETURN(0);
751         }
752
753         /* we want the ost thread to get this reply so that it can respond
754          * to ost requests (write cache writeback) that might be triggered
755          * in the callback */
756         ldlm_callback_reply(req, 0);
757
758         switch (req->rq_reqmsg->opc) {
759         case LDLM_BL_CALLBACK:
760                 CDEBUG(D_INODE, "blocking ast\n");
761                 ldlm_handle_bl_callback(req, ns, dlm_req, lock);
762                 break;
763         case LDLM_CP_CALLBACK:
764                 CDEBUG(D_INODE, "completion ast\n");
765                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
766                 break;
767         default:
768                 LBUG();                         /* checked above */
769         }
770
771         RETURN(0);
772 }
773
774 static int ldlm_cancel_handler(struct ptlrpc_request *req)
775 {
776         int rc;
777         ENTRY;
778
779         /* Requests arrive in sender's byte order.  The ptlrpc service
780          * handler has already checked and, if necessary, byte-swapped the
781          * incoming request message body, but I am responsible for the
782          * message buffers. */
783
784         if (req->rq_export == NULL) {
785                 struct ldlm_request *dlm_req;
786                 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
787                        req->rq_reqmsg->opc, req->rq_request_portal,
788                        req->rq_reply_portal);
789                 CERROR("--> export cookie: "LPX64"\n",
790                        req->rq_reqmsg->handle.cookie);
791                 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
792                                              lustre_swab_ldlm_request);
793                 if (dlm_req != NULL)
794                         ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
795                 RETURN(-ENOTCONN);
796         }
797
798         switch (req->rq_reqmsg->opc) {
799
800         /* XXX FIXME move this back to mds/handler.c, bug 249 */
801         case LDLM_CANCEL:
802                 CDEBUG(D_INODE, "cancel\n");
803                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
804                 rc = ldlm_handle_cancel(req);
805                 if (rc)
806                         break;
807                 RETURN(0);
808
809         default:
810                 CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
811                 RETURN(-EINVAL);
812         }
813
814         RETURN(0);
815 }
816
817 static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
818                           void *karg, void *uarg)
819 {
820         struct obd_device *obddev = class_conn2obd(conn);
821         struct ptlrpc_connection *connection;
822         struct obd_uuid uuid = { "ldlm" };
823         int err = 0;
824         ENTRY;
825
826         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
827             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
828                 CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
829                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
830                 RETURN(-EINVAL);
831         }
832
833         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
834                   sizeof(*obddev->u.ldlm.ldlm_client));
835         connection = ptlrpc_uuid_to_connection(&uuid);
836         if (!connection)
837                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
838
839         switch (cmd) {
840         case IOC_LDLM_TEST:
841                 //err = ldlm_test(obddev, conn);
842                 err = 0;
843                 CERROR("-- NO TESTS WERE RUN done err %d\n", err);
844                 GOTO(out, err);
845         case IOC_LDLM_DUMP:
846                 ldlm_dump_all_namespaces();
847                 GOTO(out, err);
848         default:
849                 GOTO(out, err = -EINVAL);
850         }
851
852  out:
853         if (connection)
854                 ptlrpc_put_connection(connection);
855         OBD_FREE(obddev->u.ldlm.ldlm_client,
856                  sizeof(*obddev->u.ldlm.ldlm_client));
857         return err;
858 }
859
860 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
861 {
862         struct ldlm_obd *ldlm = &obddev->u.ldlm;
863         int rc, i;
864         ENTRY;
865
866         if (ldlm_already_setup)
867                 RETURN(-EALREADY);
868
869         rc = ldlm_proc_setup(obddev);
870         if (rc != 0)
871                 RETURN(rc);
872
873 #ifdef __KERNEL__
874         inter_module_register("ldlm_cli_cancel_unused", THIS_MODULE,
875                               ldlm_cli_cancel_unused);
876         inter_module_register("ldlm_namespace_cleanup", THIS_MODULE,
877                               ldlm_namespace_cleanup);
878         inter_module_register("ldlm_replay_locks", THIS_MODULE,
879                               ldlm_replay_locks);
880
881         ldlm->ldlm_cb_service =
882                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
883                                 LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
884                                 LDLM_CB_REPLY_PORTAL,
885                                 ldlm_callback_handler, "ldlm_cbd", obddev);
886
887         if (!ldlm->ldlm_cb_service) {
888                 CERROR("failed to start service\n");
889                 GOTO(out_proc, rc = -ENOMEM);
890         }
891
892         ldlm->ldlm_cancel_service =
893                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
894                                 LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
895                                 LDLM_CANCEL_REPLY_PORTAL,
896                                 ldlm_cancel_handler, "ldlm_canceld", obddev);
897
898         if (!ldlm->ldlm_cancel_service) {
899                 CERROR("failed to start service\n");
900                 GOTO(out_proc, rc = -ENOMEM);
901         }
902
903         for (i = 0; i < LDLM_NUM_THREADS; i++) {
904                 char name[32];
905                 sprintf(name, "ldlm_cn_%02d", i);
906                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cancel_service,
907                                          name);
908                 if (rc) {
909                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
910                         LBUG();
911                         GOTO(out_thread, rc);
912                 }
913         }
914
915         for (i = 0; i < LDLM_NUM_THREADS; i++) {
916                 char name[32];
917                 sprintf(name, "ldlm_cb_%02d", i);
918                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cb_service, name);
919                 if (rc) {
920                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
921                         LBUG();
922                         GOTO(out_thread, rc);
923                 }
924         }
925
926         INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
927         spin_lock_init(&expired_lock_thread.elt_lock);
928         expired_lock_thread.elt_state = ELT_STOPPED;
929         init_waitqueue_head(&expired_lock_thread.elt_waitq);
930
931         rc = kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
932         if (rc < 0) {
933                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
934                 GOTO(out_thread, rc);
935         }
936
937         wait_event(expired_lock_thread.elt_waitq,
938                    expired_lock_thread.elt_state == ELT_READY);
939
940         INIT_LIST_HEAD(&waiting_locks_list);
941         spin_lock_init(&waiting_locks_spinlock);
942         waiting_locks_timer.function = waiting_locks_callback;
943         waiting_locks_timer.data = 0;
944         init_timer(&waiting_locks_timer);
945 #endif
946
947         ldlm_already_setup = 1;
948
949         RETURN(0);
950
951  out_thread:
952 #ifdef __KERNEL__
953         ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
954         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
955         ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
956         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
957 #endif
958  out_proc:
959         ldlm_proc_cleanup(obddev);
960
961         return rc;
962 }
963
964 static int ldlm_cleanup(struct obd_device *obddev, int flags)
965 {
966         struct ldlm_obd *ldlm = &obddev->u.ldlm;
967         ENTRY;
968
969         if (!list_empty(&ldlm_namespace_list)) {
970                 CERROR("ldlm still has namespaces; clean these up first.\n");
971                 ldlm_dump_all_namespaces();
972                 RETURN(-EBUSY);
973         }
974
975 #ifdef __KERNEL__
976         if (flags & OBD_OPT_FORCE) {
977                 ptlrpc_put_ldlm_hooks();
978         } else if (ptlrpc_ldlm_hooks_referenced()) {
979                 CERROR("Some connections weren't cleaned up; run lconf with "
980                        "--force to forcibly unload.\n");
981                 ptlrpc_dump_connections();
982                 RETURN(-EBUSY);
983         }
984
985         ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
986         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
987         ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
988         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
989         ldlm_proc_cleanup(obddev);
990
991         expired_lock_thread.elt_state = ELT_TERMINATE;
992         wake_up(&expired_lock_thread.elt_waitq);
993         wait_event(expired_lock_thread.elt_waitq,
994                    expired_lock_thread.elt_state == ELT_STOPPED);
995
996         inter_module_unregister("ldlm_namespace_cleanup");
997         inter_module_unregister("ldlm_cli_cancel_unused");
998         inter_module_unregister("ldlm_replay_locks");
999 #endif
1000
1001         ldlm_already_setup = 0;
1002         RETURN(0);
1003 }
1004
/* o_connect method for the LDLM device: forwards its arguments unchanged
 * to class_connect() and returns that call's result. */
static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
                        struct obd_uuid *cluuid)
{
        int rc = class_connect(conn, src, cluuid);

        return rc;
}
1010
1011 struct obd_ops ldlm_obd_ops = {
1012         o_owner:       THIS_MODULE,
1013         o_iocontrol:   ldlm_iocontrol,
1014         o_setup:       ldlm_setup,
1015         o_cleanup:     ldlm_cleanup,
1016         o_connect:     ldlm_connect,
1017         o_disconnect:  class_disconnect
1018 };
1019
1020 int __init ldlm_init(void)
1021 {
1022         int rc = class_register_type(&ldlm_obd_ops, 0, OBD_LDLM_DEVICENAME);
1023         if (rc != 0)
1024                 return rc;
1025
1026         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1027                                                sizeof(struct ldlm_resource), 0,
1028                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
1029         if (ldlm_resource_slab == NULL)
1030                 return -ENOMEM;
1031
1032         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1033                                            sizeof(struct ldlm_lock), 0,
1034                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
1035         if (ldlm_lock_slab == NULL) {
1036                 kmem_cache_destroy(ldlm_resource_slab);
1037                 return -ENOMEM;
1038         }
1039
1040         l_lock_init(&ldlm_handle_lock);
1041
1042         return 0;
1043 }
1044
1045 static void __exit ldlm_exit(void)
1046 {
1047         class_unregister_type(OBD_LDLM_DEVICENAME);
1048         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
1049                 CERROR("couldn't free ldlm resource slab\n");
1050         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
1051                 CERROR("couldn't free ldlm lock slab\n");
1052 }
1053
/* Symbols exported to other modules, grouped by the source file named in
 * each heading and sorted alphabetically within each group. */

/* ldlm_lock.c */
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_set_data);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_unregister_intent);

/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_change_cbdata);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);

/* ldlm_lockd.c */
EXPORT_SYMBOL(ldlm_del_waiting_lock);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);

#if 0
/* ldlm_test.c (compiled out) */
EXPORT_SYMBOL(ldlm_regression_start);
EXPORT_SYMBOL(ldlm_regression_stop);
EXPORT_SYMBOL(ldlm_test);
#endif

/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_dump);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_new);

/* l_lock.c */
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);

/* ldlm_lib.c */
EXPORT_SYMBOL(client_import_connect);
EXPORT_SYMBOL(client_import_disconnect);
EXPORT_SYMBOL(target_abort_recovery);
EXPORT_SYMBOL(target_cancel_recovery_timer);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_handle_disconnect);
EXPORT_SYMBOL(target_handle_ping);
EXPORT_SYMBOL(target_queue_final_reply);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_send_reply);
1125
#ifdef __KERNEL__
/* Kernel module entry points: ldlm_init() on load, ldlm_exit() on unload. */
module_init(ldlm_init);
module_exit(ldlm_exit);

/* Module metadata. */
MODULE_LICENSE("GPL");
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
#endif