/* Source: fs/lustre-release.git, lustre/ldlm/ldlm_lockd.c
 * (merge b_devel into HEAD, 2003-07-03) — retrieved via the Whamcloud gitweb. */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LDLM
26
27 #ifdef __KERNEL__
28 # include <linux/module.h>
29 # include <linux/slab.h>
30 # include <linux/init.h>
31 # include <linux/wait.h>
32 #else
33 # include <liblustre.h>
34 #endif
35
36 #include <linux/lustre_dlm.h>
37 #include <linux/obd_class.h>
/* Symbols defined in sibling LDLM files (resource/lock slabs, handle lock,
 * global namespace list). */
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
extern struct list_head ldlm_namespace_list;
/* MDS entry points reached through function pointers — presumably filled in
 * by the MDS module when it loads, to avoid a hard link dependency.
 * NOTE(review): confirm where these are assigned. */
extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);

/* Guards against setting up the LDLM service twice. */
static int ldlm_already_setup = 0;
46
47 #ifdef __KERNEL__
48
49 inline unsigned long round_timeout(unsigned long timeout)
50 {
51         return ((timeout / HZ) + 1) * HZ;
52 }
53
/* XXX should this be per-ldlm? */
static struct list_head waiting_locks_list;   /* locks awaiting a client callback, FIFO by deadline */
static spinlock_t waiting_locks_spinlock;     /* protects waiting_locks_list */
static struct timer_list waiting_locks_timer; /* fires at the head lock's (rounded) deadline */

/* State for the single "ldlm_elt" kernel thread that evicts the exports of
 * locks whose callback timers expired; fed by waiting_locks_callback(). */
static struct expired_lock_thread {
        wait_queue_head_t         elt_waitq;          /* thread sleeps here */
        int                       elt_state;          /* one of the ELT_* values below */
        struct list_head          elt_expired_locks;  /* expired locks queued for eviction */
        spinlock_t                elt_lock;           /* protects elt_expired_locks */
} expired_lock_thread;

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2
69
70 static inline int have_expired_locks(void)
71 {
72         int need_to_run;
73
74         spin_lock_bh(&expired_lock_thread.elt_lock);
75         need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
76         spin_unlock_bh(&expired_lock_thread.elt_lock);
77
78         RETURN(need_to_run);
79 }
80
/* Body of the dedicated "ldlm_elt" kernel thread: sleeps until locks whose
 * callback timers expired appear on elt_expired_locks, then fails the export
 * (evicts the client) of each one.  Runs until elt_state is set to
 * ELT_TERMINATE; signals ELT_READY/ELT_STOPPED back through elt_waitq. */
static int expired_lock_main(void *arg)
{
        struct list_head *expired = &expired_lock_thread.elt_expired_locks;
        struct l_wait_info lwi = { 0 };
        unsigned long flags;

        ENTRY;
        lock_kernel();
        kportal_daemonize("ldlm_elt");

        /* Block every signal; this thread is controlled only via elt_state. */
        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);

        unlock_kernel();

        /* Tell the starter we are up and running. */
        expired_lock_thread.elt_state = ELT_READY;
        wake_up(&expired_lock_thread.elt_waitq);

        while (1) {
                l_wait_event(expired_lock_thread.elt_waitq,
                             have_expired_locks() ||
                             expired_lock_thread.elt_state == ELT_TERMINATE,
                             &lwi);

                spin_lock_bh(&expired_lock_thread.elt_lock);
                while (!list_empty(expired)) {
                        struct ldlm_lock *lock = list_entry(expired->next,
                                                            struct ldlm_lock,
                                                            l_pending_chain);
                        /* Drop the spinlock across the eviction, which can
                         * block.  NOTE(review): this loop assumes failing the
                         * export unlinks the lock from this list; otherwise
                         * the same head entry would be fetched forever —
                         * confirm against ptlrpc_fail_export(). */
                        spin_unlock_bh(&expired_lock_thread.elt_lock);

                        ptlrpc_fail_export(lock->l_export);

                        spin_lock_bh(&expired_lock_thread.elt_lock);
                }
                spin_unlock_bh(&expired_lock_thread.elt_lock);

                if (expired_lock_thread.elt_state == ELT_TERMINATE)
                        break;
        }

        expired_lock_thread.elt_state = ELT_STOPPED;
        wake_up(&expired_lock_thread.elt_waitq);
        RETURN(0);
}
128
129 static void waiting_locks_callback(unsigned long unused)
130 {
131         struct ldlm_lock *lock;
132
133         spin_lock_bh(&waiting_locks_spinlock);
134         while (!list_empty(&waiting_locks_list)) {
135                 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
136                                   l_pending_chain);
137
138                 if (lock->l_callback_timeout > jiffies)
139                         break;
140
141                 LDLM_ERROR(lock, "lock callback timer expired: evicting client "
142                            "%s@%s nid "LPU64,
143                            lock->l_export->exp_client_uuid.uuid,
144                            lock->l_export->exp_connection->c_remote_uuid.uuid,
145                            lock->l_export->exp_connection->c_peer.peer_nid);
146
147                 spin_lock_bh(&expired_lock_thread.elt_lock);
148                 list_del(&lock->l_pending_chain);
149                 list_add(&lock->l_pending_chain,
150                          &expired_lock_thread.elt_expired_locks);
151                 spin_unlock_bh(&expired_lock_thread.elt_lock);
152                 wake_up(&expired_lock_thread.elt_waitq);
153         }
154
155         spin_unlock_bh(&waiting_locks_spinlock);
156 }
157
/*
 * Indicate that we're waiting for a client to call us back cancelling a given
 * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
 * timer to fire appropriately.  (We round up to the next second, to avoid
 * floods of timer firings during periods of high lock contention and traffic).
 *
 * Always returns 1.  Takes a reference on the lock's export that is dropped
 * when the lock leaves the list (see ldlm_del_waiting_lock()).
 */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        unsigned long timeout_rounded;

        LDLM_DEBUG(lock, "adding to wait list");
        LASSERT(list_empty(&lock->l_pending_chain));

        spin_lock_bh(&waiting_locks_spinlock);
        /* Deadline: half of obd_timeout from now. */
        lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);

        timeout_rounded = round_timeout(lock->l_callback_timeout);

        /* Only pull the timer earlier (or start it if idle); deadlines added
         * later are handled when earlier ones fire. */
        if (timeout_rounded < waiting_locks_timer.expires ||
            !timer_pending(&waiting_locks_timer)) {
                mod_timer(&waiting_locks_timer, timeout_rounded);
        }
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
        spin_unlock_bh(&waiting_locks_spinlock);
        /* We drop this ref when we get removed from the list. */
        class_export_get(lock->l_export);
        return 1;
}
186
/*
 * Remove a lock from the pending list, likely because it had its cancellation
 * callback arrive without incident.  This adjusts the lock-timeout timer if
 * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
 *
 * Drops the export reference taken by ldlm_add_waiting_lock().
 */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        struct list_head *list_next;

        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);

        if (list_empty(&lock->l_pending_chain)) {
                /* Never armed, or already removed. */
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
                return 0;
        }

        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        del_timer(&waiting_locks_timer);
                } else {
                        /* Retarget the timer at the new head's deadline. */
                        struct ldlm_lock *next;
                        next = list_entry(list_next, struct ldlm_lock,
                                          l_pending_chain);
                        mod_timer(&waiting_locks_timer,
                                  round_timeout(next->l_callback_timeout));
                }
        }
        /* list_del_init so a later list_empty() check sees "not pending". */
        list_del_init(&lock->l_pending_chain);
        spin_unlock_bh(&waiting_locks_spinlock);
        /* We got this ref when we were added to the list. */
        class_export_put(lock->l_export);
        LDLM_DEBUG(lock, "removed");
        return 1;
}
231
232 #else /* !__KERNEL__ */
233
/* liblustre (userspace) build: there is no waiting-locks machinery on
 * clients; pretend the lock was added. */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(1);
}
238
/* liblustre (userspace) build: nothing to delete; report "wasn't waiting". */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}
243
244 #endif /* __KERNEL__ */
245
/* An AST to a client failed fatally (timed out or was interrupted): log the
 * details and evict the client by failing its export.  ast_type is a
 * human-readable tag ("blocking"/"completion") used only in the message. */
static inline void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                                   char *ast_type)
{
        CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
               ", mode %s: evicting client %s@%s NID "LPU64"\n",
               ast_type, rc,
               lock->l_resource->lr_name.name[0],
               lock->l_resource->lr_name.name[1],
               ldlm_lockname[lock->l_granted_mode],
               lock->l_export->exp_client_uuid.uuid,
               lock->l_export->exp_connection->c_remote_uuid.uuid,
               lock->l_export->exp_connection->c_peer.peer_nid);
        ptlrpc_fail_export(lock->l_export);
}
260
/* Server side: send a blocking AST (LDLM_BL_CALLBACK) to the client holding
 * 'lock', asking it to cancel.  The lock is placed on the waiting-locks list
 * first, so the client is evicted if it never answers with a cancel.
 *
 * Returns 0 on success (or for the no-op CANCELING phase), -ENOMEM if the
 * request cannot be allocated, or -ERESTART when the client returned an
 * error and the lock was cancelled locally — the caller
 * (ldlm_reprocess_all) must then restart its scan. */
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int rc = 0, size = sizeof(*body);
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        LASSERT(lock);

        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        /* XXX This is necessary because, with the lock re-tasking, we actually
         * _can_ get called in here twice.  (bug 830) */
        if (!list_empty(&lock->l_pending_chain)) {
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(0);
        }

        if (lock->l_destroyed) {
                /* What's the point? */
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(0);
        }

#if 0
        if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
                ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
                RETURN(-ETIMEDOUT);
        }
#endif

        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
                              LDLM_BL_CALLBACK, 1, &size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* The client identifies the lock by the handle it knows: our copy of
         * its remote handle. */
        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        memcpy(&body->lock_desc, desc, sizeof(*desc));

        LDLM_DEBUG(lock, "server preparing blocking AST");
        req->rq_replen = lustre_msg_size(0, NULL);

        /* Start the eviction clock before waiting for the reply. */
        ldlm_add_waiting_lock(lock);
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        req->rq_level = LUSTRE_CONN_RECOVER;
        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
        rc = ptlrpc_queue_wait(req);
        if (rc == -ETIMEDOUT || rc == -EINTR) {
                /* No answer at all: evict the client. */
                ldlm_del_waiting_lock(lock);
                ldlm_failed_ast(lock, rc, "blocking");
        } else if (rc) {
                /* NOTE(review): these CDEBUGs print req->rq_repmsg->status
                 * while the LDLM_DEBUG below prints req->rq_status — confirm
                 * which status is the intended one. */
                if (rc == -EINVAL)
                        CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
                               "from blocking AST for lock %p--normal race\n",
                               req->rq_connection->c_peer.peer_nid,
                               req->rq_repmsg->status, lock);
                else if (rc == -ENOTCONN)
                        CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
                               "from blocking AST for lock %p--this client was "
                               "probably rebooted while it held a lock, nothing"
                               " serious\n",req->rq_connection->c_peer.peer_nid,
                               req->rq_repmsg->status, lock);
                else
                        CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
                               "from blocking AST for lock %p\n",
                               req->rq_connection->c_peer.peer_nid,
                               req->rq_repmsg->status, lock);
                LDLM_DEBUG(lock, "client returned error %d from blocking AST",
                           req->rq_status);
                ldlm_lock_cancel(lock);
                /* Server-side AST functions are called from ldlm_reprocess_all,
                 * which needs to be told to please restart its reprocessing. */
                rc = -ERESTART;
        }

        ptlrpc_req_finished(req);

        RETURN(rc);
}
349
/* Server side: send a completion AST (LDLM_CP_CALLBACK) telling the client
 * that its lock was granted (flags/desc describe the granted state).
 *
 * Returns 0 on success, -ENOMEM if the request cannot be allocated, the
 * eviction path result on timeout/interrupt, or -ERESTART when the client
 * returned an error and the lock was cancelled locally (so that
 * ldlm_reprocess_all restarts its scan). */
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int rc = 0, size = sizeof(*body);
        ENTRY;

        if (lock == NULL) {
                LBUG();
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
                              LDLM_CP_CALLBACK, 1, &size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* Identify the lock to the client by its own (remote) handle. */
        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);

        LDLM_DEBUG(lock, "server preparing completion AST");
        req->rq_replen = lustre_msg_size(0, NULL);

        req->rq_level = LUSTRE_CONN_RECOVER;
        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
        rc = ptlrpc_queue_wait(req);
        if (rc == -ETIMEDOUT || rc == -EINTR) {
                /* No answer at all: evict the client. */
                ldlm_del_waiting_lock(lock);
                ldlm_failed_ast(lock, rc, "completion");
        } else if (rc) {
                CERROR("client returned %d from completion AST for lock %p\n",
                       req->rq_status, lock);
                LDLM_DEBUG(lock, "client returned error %d from completion AST",
                           req->rq_status);
                ldlm_lock_cancel(lock);
                /* Server-side AST functions are called from ldlm_reprocess_all,
                 * which needs to be told to please restart its reprocessing. */
                rc = -ERESTART;
        }
        ptlrpc_req_finished(req);

        RETURN(rc);
}
396
397 int ldlm_handle_enqueue(struct ptlrpc_request *req,
398                         ldlm_completion_callback completion_callback,
399                         ldlm_blocking_callback blocking_callback)
400 {
401         struct obd_device *obddev = req->rq_export->exp_obd;
402         struct ldlm_reply *dlm_rep;
403         struct ldlm_request *dlm_req;
404         int rc, size = sizeof(*dlm_rep), cookielen = 0;
405         __u32 flags;
406         ldlm_error_t err;
407         struct ldlm_lock *lock = NULL;
408         void *cookie = NULL;
409         ENTRY;
410
411         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
412
413         dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
414                                       lustre_swab_ldlm_request);
415         if (dlm_req == NULL) {
416                 CERROR ("Can't unpack dlm_req\n");
417                 RETURN (-EFAULT);
418         }
419         
420         flags = dlm_req->lock_flags;
421         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
422             (flags & LDLM_FL_HAS_INTENT)) {
423                 /* In this case, the reply buffer is allocated deep in
424                  * local_lock_enqueue by the policy function. */
425                 cookie = req;
426                 cookielen = sizeof(*req);
427         } else {
428                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
429                                      &req->rq_repmsg);
430                 if (rc) {
431                         CERROR("out of memory\n");
432                         RETURN(-ENOMEM);
433                 }
434                 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
435                         cookie = &dlm_req->lock_desc.l_extent;
436                         cookielen = sizeof(struct ldlm_extent);
437                 }
438         }
439
440         /* The lock's callback data might be set in the policy function */
441         lock = ldlm_lock_create(obddev->obd_namespace,
442                                 &dlm_req->lock_handle2,
443                                 dlm_req->lock_desc.l_resource.lr_name,
444                                 dlm_req->lock_desc.l_resource.lr_type,
445                                 dlm_req->lock_desc.l_req_mode,
446                                 blocking_callback, NULL);
447         if (!lock)
448                 GOTO(out, err = -ENOMEM);
449
450         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
451                sizeof(lock->l_remote_handle));
452         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
453
454         LASSERT(req->rq_export);
455         lock->l_export = req->rq_export;
456         l_lock(&lock->l_resource->lr_namespace->ns_lock);
457         list_add(&lock->l_export_chain,
458                  &lock->l_export->exp_ldlm_data.led_held_locks);
459         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
460
461         err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
462                                 &flags, completion_callback);
463         if (err)
464                 GOTO(out, err);
465
466         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
467         dlm_rep->lock_flags = flags;
468
469         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
470         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
471                 memcpy(&dlm_rep->lock_extent, &lock->l_extent,
472                        sizeof(lock->l_extent));
473         if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
474                 memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
475                        sizeof(dlm_rep->lock_resource_name));
476                 dlm_rep->lock_mode = lock->l_req_mode;
477         }
478
479         EXIT;
480  out:
481         if (lock)
482                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
483                            "(err=%d)", err);
484         req->rq_status = err;
485
486         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
487          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
488         if (lock) {
489                 if (!err)
490                         ldlm_reprocess_all(lock->l_resource);
491                 LDLM_LOCK_PUT(lock);
492         }
493         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
494
495         return 0;
496 }
497
/* Server-side handler for LDLM_CONVERT: look the lock up by handle and
 * convert it to the requested mode.  req->rq_status is 0 on success or
 * EINVAL (positive — Lustre wire convention) for a stale handle.  The
 * function itself always returns 0 unless unpack/pack fails. */
int ldlm_handle_convert(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
        int rc, size = sizeof(*dlm_rep);
        ENTRY;

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
                RETURN (-EFAULT);
        }

        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
        if (rc) {
                CERROR("out of memory\n");
                RETURN(-ENOMEM);
        }
        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
        dlm_rep->lock_flags = dlm_req->lock_flags;

        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
        if (!lock) {
                req->rq_status = EINVAL;
        } else {
                LDLM_DEBUG(lock, "server-side convert handler START");
                ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                  &dlm_rep->lock_flags);
                /* A converted lock no longer needs its eviction timer. */
                if (ldlm_del_waiting_lock(lock))
                        CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
                req->rq_status = 0;
        }

        if (lock) {
                /* The conversion may have unblocked other waiters. */
                ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side convert handler END");
                LDLM_LOCK_PUT(lock);
        } else
                LDLM_DEBUG_NOLOCK("server-side convert handler END");

        RETURN(0);
}
542
/* Server-side handler for LDLM_CANCEL: look the lock up by handle and cancel
 * it.  req->rq_status is 0 on success or ESTALE (positive — Lustre wire
 * convention) for an unknown handle.  The reply is sent explicitly here,
 * before reprocessing, so the client is unblocked as soon as possible.
 * Returns 0 unless unpack/pack fails. */
int ldlm_handle_cancel(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        int rc;
        ENTRY;

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("bad request buffer for cancel\n");
                RETURN(-EFAULT);
        }

        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
        if (rc) {
                CERROR("out of memory\n");
                RETURN(-ENOMEM);
        }

        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
        if (!lock) {
                /* Stale handles can happen after evictions/reboots. */
                CERROR("received cancel for unknown lock cookie "LPX64
                       " from nid "LPU64"\n", dlm_req->lock_handle1.cookie,
                       req->rq_connection->c_peer.peer_nid);
                LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
                                  "(cookie "LPU64")",
                                  dlm_req->lock_handle1.cookie);
                req->rq_status = ESTALE;
        } else {
                LDLM_DEBUG(lock, "server-side cancel handler START");
                ldlm_lock_cancel(lock);
                /* The client answered its blocking AST; stop the eviction
                 * timer. */
                if (ldlm_del_waiting_lock(lock))
                        CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
                req->rq_status = 0;
        }

        if (ptlrpc_reply(req) != 0)
                LBUG();

        if (lock) {
                /* The cancellation may have unblocked other waiters. */
                ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side cancel handler END");
                LDLM_LOCK_PUT(lock);
        }

        RETURN(0);
}
591
/* Client side: process a blocking AST from the server.  Mark the lock
 * CBPENDING; if it has no local readers or writers, run its blocking
 * callback now (which starts cancellation), otherwise cancellation happens
 * when the last local reference is dropped.  Consumes the lock reference the
 * caller obtained from ldlm_handle2lock_ns(). */
static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int do_ast;
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client blocking AST callback handler START");

        lock->l_flags |= LDLM_FL_CBPENDING;
        do_ast = (!lock->l_readers && !lock->l_writers);

        if (do_ast) {
                LDLM_DEBUG(lock, "already unused, calling "
                           "callback (%p)", lock->l_blocking_ast);
                if (lock->l_blocking_ast != NULL) {
                        /* Drop ns_lock across the callback, which may block. */
                        l_unlock(&ns->ns_lock);
                        lock->l_blocking_ast(lock, &dlm_req->lock_desc,
                                             lock->l_data, LDLM_CB_BLOCKING);
                        l_lock(&ns->ns_lock);
                }
        } else {
                LDLM_DEBUG(lock, "Lock still has references, will be"
                           " cancelled later");
        }

        LDLM_DEBUG(lock, "client blocking callback handler END");
        l_unlock(&ns->ns_lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}
625
/* Client side: process a completion AST — the server has granted our lock,
 * possibly with a different mode, resource or extent than we requested.
 * Update the local lock to match what was actually granted, grant it, and
 * run whatever ASTs became runnable as a result.  Consumes the lock
 * reference the caller obtained from ldlm_handle2lock_ns(). */
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        LIST_HEAD(ast_list);
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client completion callback handler START");

        /* If we receive the completion AST before the actual enqueue returned,
         * then we might need to switch lock modes, resources, or extents. */
        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                LDLM_DEBUG(lock, "completion AST, new lock mode");
        }
        if (lock->l_resource->lr_type == LDLM_EXTENT) {
                memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
                       sizeof(lock->l_extent));

                if ((lock->l_extent.end & ~PAGE_MASK) != ~PAGE_MASK) {
                        /* XXX Old versions of BA OST code have a fencepost bug
                         * which will cause them to grant a lock that's one
                         * byte too large.  This can be safely removed after BA
                         * ships their next release -phik (02 Apr 2003) */
                        lock->l_extent.end--;
                } else if ((lock->l_extent.start & ~PAGE_MASK) ==
                           ~PAGE_MASK) {
                        lock->l_extent.start++;
                }
        }

        /* Move the lock to its (possibly new) resource before granting. */
        ldlm_resource_unlink_lock(lock);
        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                   &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) != 0) {
                ldlm_lock_change_resource(ns, lock,
                                         dlm_req->lock_desc.l_resource.lr_name);
                LDLM_DEBUG(lock, "completion AST, new resource");
        }
        /* Collect ASTs triggered by the grant into ast_list; they are run
         * below, after ns_lock is dropped. */
        lock->l_resource->lr_tmp = &ast_list;
        ldlm_grant_lock(lock, req, sizeof(*req));
        lock->l_resource->lr_tmp = NULL;
        LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
        l_unlock(&ns->ns_lock);
        LDLM_LOCK_PUT(lock);

        ldlm_run_ast_work(&ast_list);

        LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                          lock);
        EXIT;
}
680
681 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
682 {
683         req->rq_status = rc;
684         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
685                              &req->rq_repmsg);
686         if (rc)
687                 return rc;
688         return ptlrpc_reply(req);
689 }
690
/* Service handler for the LDLM callback portal: dispatches blocking
 * (LDLM_BL_CALLBACK) and completion (LDLM_CP_CALLBACK) ASTs that a server
 * sends to this node.  The reply is sent *before* the AST is processed so
 * the sender's service thread is not held up by work the AST may trigger.
 * Always returns 0 — errors are reported to the peer via
 * ldlm_callback_reply(). */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
        struct ldlm_namespace *ns;
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        ENTRY;

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;

                /* No export: the peer thinks it is connected but we have no
                 * record of it — typical after this node rebooted. */
                CDEBUG(D_RPCTRACE, "operation %d from nid "LPU64" with bad "
                       "export cookie "LPX64" (ptl req %d/rep %d); this is "
                       "normal if this node rebooted with a lock held\n",
                       req->rq_reqmsg->opc, req->rq_connection->c_peer.peer_nid,
                       req->rq_reqmsg->handle.cookie,
                       req->rq_request_portal, req->rq_reply_portal);

                dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                             lustre_swab_ldlm_request);
                if (dlm_req != NULL)
                        CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
                               dlm_req->lock_handle1.cookie);

                ldlm_callback_reply(req, -ENOTCONN);
                RETURN(0);
        }

        /* Validate the opcode (and honor fault-injection test hooks). */
        if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
        } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
        } else {
                ldlm_callback_reply(req, -EPROTO);
                RETURN(0);
        }

        LASSERT(req->rq_export != NULL);
        LASSERT(req->rq_export->exp_obd != NULL);
        ns = req->rq_export->exp_obd->obd_namespace;
        LASSERT(ns != NULL);

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("can't unpack dlm_req\n");
                ldlm_callback_reply (req, -EPROTO);
                RETURN (0);
        }

        /* Takes a lock reference; dropped by the bl/cp handlers below. */
        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
        if (!lock) {
                CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
                       dlm_req->lock_handle1.cookie);
                ldlm_callback_reply(req, -EINVAL);
                RETURN(0);
        }

        /* we want the ost thread to get this reply so that it can respond
         * to ost requests (write cache writeback) that might be triggered
         * in the callback */
        ldlm_callback_reply(req, 0);

        switch (req->rq_reqmsg->opc) {
        case LDLM_BL_CALLBACK:
                CDEBUG(D_INODE, "blocking ast\n");
                ldlm_handle_bl_callback(req, ns, dlm_req, lock);
                break;
        case LDLM_CP_CALLBACK:
                CDEBUG(D_INODE, "completion ast\n");
                ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                break;
        default:
                LBUG();                         /* checked above */
        }

        RETURN(0);
}
773
774 static int ldlm_cancel_handler(struct ptlrpc_request *req)
775 {
776         int rc;
777         ENTRY;
778
779         /* Requests arrive in sender's byte order.  The ptlrpc service
780          * handler has already checked and, if necessary, byte-swapped the
781          * incoming request message body, but I am responsible for the
782          * message buffers. */
783
784         if (req->rq_export == NULL) {
785                 struct ldlm_request *dlm_req;
786                 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
787                        req->rq_reqmsg->opc, req->rq_request_portal,
788                        req->rq_reply_portal);
789                 CERROR("--> export cookie: "LPX64"\n",
790                        req->rq_reqmsg->handle.cookie);
791                 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
792                                              lustre_swab_ldlm_request);
793                 if (dlm_req != NULL)
794                         ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
795                 RETURN(-ENOTCONN);
796         }
797
798         switch (req->rq_reqmsg->opc) {
799
800         /* XXX FIXME move this back to mds/handler.c, bug 249 */
801         case LDLM_CANCEL:
802                 CDEBUG(D_INODE, "cancel\n");
803                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
804                 rc = ldlm_handle_cancel(req);
805                 if (rc)
806                         break;
807                 RETURN(0);
808
809         default:
810                 CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
811                 RETURN(-EINVAL);
812         }
813
814         RETURN(0);
815 }
816
817 static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
818                           void *karg, void *uarg)
819 {
820         struct obd_device *obddev = class_conn2obd(conn);
821         struct ptlrpc_connection *connection;
822         struct obd_uuid uuid = { "ldlm" };
823         int err = 0;
824         ENTRY;
825
826         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
827             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
828                 CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
829                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
830                 RETURN(-EINVAL);
831         }
832
833         OBD_ALLOC(obddev->u.ldlm.ldlm_client,
834                   sizeof(*obddev->u.ldlm.ldlm_client));
835         connection = ptlrpc_uuid_to_connection(&uuid);
836         if (!connection)
837                 CERROR("No LDLM UUID found: assuming ldlm is local.\n");
838
839         switch (cmd) {
840         case IOC_LDLM_TEST:
841                 //err = ldlm_test(obddev, conn);
842                 err = 0;
843                 CERROR("-- NO TESTS WERE RUN done err %d\n", err);
844                 GOTO(out, err);
845         case IOC_LDLM_DUMP:
846                 ldlm_dump_all_namespaces();
847                 GOTO(out, err);
848         default:
849                 GOTO(out, err = -EINVAL);
850         }
851
852  out:
853         if (connection)
854                 ptlrpc_put_connection(connection);
855         OBD_FREE(obddev->u.ldlm.ldlm_client,
856                  sizeof(*obddev->u.ldlm.ldlm_client));
857         return err;
858 }
859
860 static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
861 {
862         struct ldlm_obd *ldlm = &obddev->u.ldlm;
863         int rc, i;
864         ENTRY;
865
866         if (ldlm_already_setup)
867                 RETURN(-EALREADY);
868
869         rc = ldlm_proc_setup(obddev);
870         if (rc != 0)
871                 RETURN(rc);
872
873 #ifdef __KERNEL__
874         inter_module_register("ldlm_cli_cancel_unused", THIS_MODULE,
875                               ldlm_cli_cancel_unused);
876         inter_module_register("ldlm_namespace_cleanup", THIS_MODULE,
877                               ldlm_namespace_cleanup);
878         inter_module_register("ldlm_replay_locks", THIS_MODULE,
879                               ldlm_replay_locks);
880
881         ldlm->ldlm_cb_service =
882                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
883                                 LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
884                                 LDLM_CB_REPLY_PORTAL,
885                                 ldlm_callback_handler, "ldlm_cbd", obddev);
886
887         if (!ldlm->ldlm_cb_service) {
888                 CERROR("failed to start service\n");
889                 GOTO(out_proc, rc = -ENOMEM);
890         }
891
892         ldlm->ldlm_cancel_service =
893                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
894                                 LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
895                                 LDLM_CANCEL_REPLY_PORTAL,
896                                 ldlm_cancel_handler, "ldlm_canceld", obddev);
897
898         if (!ldlm->ldlm_cancel_service) {
899                 CERROR("failed to start service\n");
900                 GOTO(out_proc, rc = -ENOMEM);
901         }
902
903         for (i = 0; i < LDLM_NUM_THREADS; i++) {
904                 char name[32];
905                 sprintf(name, "ldlm_cn_%02d", i);
906                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cancel_service,
907                                          name);
908                 if (rc) {
909                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
910                         LBUG();
911                         GOTO(out_thread, rc);
912                 }
913         }
914
915         for (i = 0; i < LDLM_NUM_THREADS; i++) {
916                 char name[32];
917                 sprintf(name, "ldlm_cb_%02d", i);
918                 rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cb_service, name);
919                 if (rc) {
920                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
921                         LBUG();
922                         GOTO(out_thread, rc);
923                 }
924         }
925
926         INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
927         spin_lock_init(&expired_lock_thread.elt_lock);
928         expired_lock_thread.elt_state = ELT_STOPPED;
929         init_waitqueue_head(&expired_lock_thread.elt_waitq);
930
931         rc = kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
932         if (rc < 0) {
933                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
934                 GOTO(out_thread, rc);
935         }
936
937         wait_event(expired_lock_thread.elt_waitq,
938                    expired_lock_thread.elt_state == ELT_READY);
939
940         INIT_LIST_HEAD(&waiting_locks_list);
941         spin_lock_init(&waiting_locks_spinlock);
942         waiting_locks_timer.function = waiting_locks_callback;
943         waiting_locks_timer.data = 0;
944         init_timer(&waiting_locks_timer);
945 #endif
946
947         ldlm_already_setup = 1;
948
949         RETURN(0);
950
951  out_thread:
952 #ifdef __KERNEL__
953         ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
954         ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
955         ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
956         ptlrpc_unregister_service(ldlm->ldlm_cb_service);
957 #endif
958  out_proc:
959         ldlm_proc_cleanup(obddev);
960
961         return rc;
962 }
963
/* obd cleanup method: undo ldlm_setup().  Refuses with -EBUSY while any
 * namespace still exists, or (unless @force) while ptlrpc connections
 * still hold ldlm hooks.  @failover is accepted but unused here.
 *
 * NOTE(review): teardown order matters — threads are stopped before
 * their service is unregistered, and the expired-lock thread is asked
 * to terminate and waited for before the module state is cleared. */
static int ldlm_cleanup(struct obd_device *obddev, int force, int failover)
{
        struct ldlm_obd *ldlm = &obddev->u.ldlm;
        ENTRY;

        /* Live namespaces still reference resources/locks; the caller must
         * free them before the LDLM can be torn down. */
        if (!list_empty(&ldlm_namespace_list)) {
                CERROR("ldlm still has namespaces; clean these up first.\n");
                ldlm_dump_all_namespaces();
                RETURN(-EBUSY);
        }

#ifdef __KERNEL__
        if (force) {
                /* Forced unload: drop the hook references ourselves. */
                ptlrpc_put_ldlm_hooks();
        } else if (ptlrpc_ldlm_hooks_referenced()) {
                CERROR("Some connections weren't cleaned up; run lconf with "
                       "--force to forcibly unload.\n");
                ptlrpc_dump_connections();
                RETURN(-EBUSY);
        }

        /* Stop each service's threads before unregistering the service. */
        ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
        ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
        ldlm_proc_cleanup(obddev);

        /* Signal the expired-lock thread to exit and wait until it has. */
        expired_lock_thread.elt_state = ELT_TERMINATE;
        wake_up(&expired_lock_thread.elt_waitq);
        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_STOPPED);

        /* Remove the hooks published in ldlm_setup(). */
        inter_module_unregister("ldlm_namespace_cleanup");
        inter_module_unregister("ldlm_cli_cancel_unused");
        inter_module_unregister("ldlm_replay_locks");
#endif

        ldlm_already_setup = 0;
        RETURN(0);
}
1004
/* obd connect method: the LDLM keeps no per-connection state of its own,
 * so simply establish a generic class-level connection. */
static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
                        struct obd_uuid *cluuid)
{
        int rc = class_connect(conn, src, cluuid);
        return rc;
}
1010
1011 struct obd_ops ldlm_obd_ops = {
1012         o_owner:       THIS_MODULE,
1013         o_iocontrol:   ldlm_iocontrol,
1014         o_setup:       ldlm_setup,
1015         o_cleanup:     ldlm_cleanup,
1016         o_connect:     ldlm_connect,
1017         o_disconnect:  class_disconnect
1018 };
1019
1020 int __init ldlm_init(void)
1021 {
1022         int rc = class_register_type(&ldlm_obd_ops, 0, OBD_LDLM_DEVICENAME);
1023         if (rc != 0)
1024                 return rc;
1025
1026         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1027                                                sizeof(struct ldlm_resource), 0,
1028                                                SLAB_HWCACHE_ALIGN, NULL, NULL);
1029         if (ldlm_resource_slab == NULL)
1030                 return -ENOMEM;
1031
1032         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1033                                            sizeof(struct ldlm_lock), 0,
1034                                            SLAB_HWCACHE_ALIGN, NULL, NULL);
1035         if (ldlm_lock_slab == NULL) {
1036                 kmem_cache_destroy(ldlm_resource_slab);
1037                 return -ENOMEM;
1038         }
1039
1040         l_lock_init(&ldlm_handle_lock);
1041
1042         return 0;
1043 }
1044
1045 static void __exit ldlm_exit(void)
1046 {
1047         class_unregister_type(OBD_LDLM_DEVICENAME);
1048         if (kmem_cache_destroy(ldlm_resource_slab) != 0)
1049                 CERROR("couldn't free ldlm resource slab\n");
1050         if (kmem_cache_destroy(ldlm_lock_slab) != 0)
1051                 CERROR("couldn't free ldlm lock slab\n");
1052 }
1053
/*
 * Symbols exported to other Lustre modules, grouped by the source file
 * that defines each one.
 */
/* ldlm_lock.c */
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_unregister_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_set_data);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);

/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);

/* ldlm_lockd.c */
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_del_waiting_lock);

/* Test hooks are compiled out; kept for reference. */
#if 0
/* ldlm_test.c */
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_regression_start);
EXPORT_SYMBOL(ldlm_regression_stop);
#endif

/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);

/* l_lock.c */
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);

/* ldlm_lib.c */
EXPORT_SYMBOL(client_import_connect);
EXPORT_SYMBOL(client_import_disconnect);
EXPORT_SYMBOL(target_abort_recovery);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cancel_recovery_timer);
EXPORT_SYMBOL(target_send_reply);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_handle_ping);
EXPORT_SYMBOL(target_handle_disconnect);
EXPORT_SYMBOL(target_queue_final_reply);

/* Module metadata and init/exit hooks (kernel builds only). */
#ifdef __KERNEL__
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
MODULE_LICENSE("GPL");

module_init(ldlm_init);
module_exit(ldlm_exit);
#endif