/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <linux/module.h>
# include <linux/slab.h>
# include <linux/init.h>
# include <linux/wait.h>
#else
# include <liblustre.h>
#endif

#include <linux/lustre_dlm.h>
#include <linux/obd_class.h>
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
extern struct list_head ldlm_namespace_list;
extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);

static int ldlm_already_setup = 0;

#ifdef __KERNEL__

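/* Round a jiffies value up to the next whole second, so that nearby lock
 * timeouts collapse onto a single timer firing. */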
inline unsigned long round_timeout(unsigned long timeout)
{
        return ((timeout / HZ) + 1) * HZ;
}

/* XXX should this be per-ldlm? */
static struct list_head waiting_locks_list;
static spinlock_t waiting_locks_spinlock;
static struct timer_list waiting_locks_timer;

static struct expired_lock_thread {
        wait_queue_head_t         elt_waitq;
        int                       elt_state;
        struct list_head          elt_expired_locks;
        spinlock_t                elt_lock;
} expired_lock_thread;

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2

static inline int have_expired_locks(void)
{
        int need_to_run;

        spin_lock_bh(&expired_lock_thread.elt_lock);
        need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
        spin_unlock_bh(&expired_lock_thread.elt_lock);

        RETURN(need_to_run);
}

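/* Main loop of the dedicated "ldlm_elt" thread: evict the export of every
 * lock whose callback timer has expired.  Woken by waiting_locks_callback(). */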
static int expired_lock_main(void *arg)
{
        struct list_head *expired = &expired_lock_thread.elt_expired_locks;
        struct l_wait_info lwi = { 0 };
        unsigned long flags;

        ENTRY;
        lock_kernel();
        kportal_daemonize("ldlm_elt");

        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);

        unlock_kernel();

        expired_lock_thread.elt_state = ELT_READY;
        wake_up(&expired_lock_thread.elt_waitq);

        while (1) {
                l_wait_event(expired_lock_thread.elt_waitq,
                             have_expired_locks() ||
                             expired_lock_thread.elt_state == ELT_TERMINATE,
                             &lwi);

                spin_lock_bh(&expired_lock_thread.elt_lock);
                while (!list_empty(expired)) {
                        struct ldlm_lock *lock = list_entry(expired->next,
                                                            struct ldlm_lock,
                                                            l_pending_chain);
                        spin_unlock_bh(&expired_lock_thread.elt_lock);

                        ptlrpc_fail_export(lock->l_export);

                        spin_lock_bh(&expired_lock_thread.elt_lock);
                }
                spin_unlock_bh(&expired_lock_thread.elt_lock);

                if (expired_lock_thread.elt_state == ELT_TERMINATE)
                        break;
        }

        expired_lock_thread.elt_state = ELT_STOPPED;
        wake_up(&expired_lock_thread.elt_waitq);
        RETURN(0);
}

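/* waiting_locks_timer callback: move every lock whose callback timeout has
 * passed onto the expired-lock thread's list and wake that thread. */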
static void waiting_locks_callback(unsigned long unused)
{
        struct ldlm_lock *lock;

        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);

                if (lock->l_callback_timeout > jiffies)
                        break;

                LDLM_ERROR(lock, "lock callback timer expired: evicting client "
                           "%s@%s nid "LPU64,
                           lock->l_export->exp_client_uuid.uuid,
                           lock->l_export->exp_connection->c_remote_uuid.uuid,
                           lock->l_export->exp_connection->c_peer.peer_nid);

                spin_lock_bh(&expired_lock_thread.elt_lock);
                list_del(&lock->l_pending_chain);
                list_add(&lock->l_pending_chain,
                         &expired_lock_thread.elt_expired_locks);
                spin_unlock_bh(&expired_lock_thread.elt_lock);
                wake_up(&expired_lock_thread.elt_waitq);
        }

        spin_unlock_bh(&waiting_locks_spinlock);
}

/*
 * Indicate that we're waiting for a client to call us back cancelling a given
 * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
 * timer to fire appropriately.  (We round up to the next second, to avoid
 * floods of timer firings during periods of high lock contention and traffic).
 */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        unsigned long timeout_rounded;

        LDLM_DEBUG(lock, "adding to wait list");
        LASSERT(list_empty(&lock->l_pending_chain));

        spin_lock_bh(&waiting_locks_spinlock);
        lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);

        timeout_rounded = round_timeout(lock->l_callback_timeout);

        if (timeout_rounded < waiting_locks_timer.expires ||
            !timer_pending(&waiting_locks_timer)) {
                mod_timer(&waiting_locks_timer, timeout_rounded);
        }
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
        spin_unlock_bh(&waiting_locks_spinlock);
        /* We drop this ref when we get removed from the list. */
        class_export_get(lock->l_export);
        return 1;
}

/*
 * Remove a lock from the pending list, likely because it had its cancellation
 * callback arrive without incident.  This adjusts the lock-timeout timer if
 * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
 */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        struct list_head *list_next;

        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);

        if (list_empty(&lock->l_pending_chain)) {
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
                return 0;
        }

        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        del_timer(&waiting_locks_timer);
                } else {
                        struct ldlm_lock *next;
                        next = list_entry(list_next, struct ldlm_lock,
                                          l_pending_chain);
                        mod_timer(&waiting_locks_timer,
                                  round_timeout(next->l_callback_timeout));
                }
        }
        list_del_init(&lock->l_pending_chain);
        spin_unlock_bh(&waiting_locks_spinlock);
        /* We got this ref when we were added to the list. */
        class_export_put(lock->l_export);
        LDLM_DEBUG(lock, "removed");
        return 1;
}

#else /* !__KERNEL__ */

static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(1);
}

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}

#endif /* __KERNEL__ */

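/* An AST to this client failed or timed out: log the details and evict the
 * export so its locks can be cleaned up. */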
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, char *ast_type)
{
        CERROR("%s AST failed (%d) for res "LPU64"/"LPU64
               ", mode %s: evicting client %s@%s NID "LPU64"\n",
               ast_type, rc,
               lock->l_resource->lr_name.name[0],
               lock->l_resource->lr_name.name[1],
               ldlm_lockname[lock->l_granted_mode],
               lock->l_export->exp_client_uuid.uuid,
               lock->l_export->exp_connection->c_remote_uuid.uuid,
               lock->l_export->exp_connection->c_peer.peer_nid);
        ptlrpc_fail_export(lock->l_export);
}

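/* Server side: send a blocking AST (LDLM_BL_CALLBACK) to the client holding
 * the lock, asking it to cancel, and start the callback timer that will evict
 * the client if it does not cancel in time. */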
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int rc = 0, size = sizeof(*body);
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        LASSERT(lock);

        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        /* XXX This is necessary because, with the lock re-tasking, we actually
         * _can_ get called in here twice.  (bug 830) */
        if (!list_empty(&lock->l_pending_chain)) {
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(0);
        }

        if (lock->l_destroyed) {
                /* What's the point? */
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(0);
        }

#if 0
        if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
                ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
                RETURN(-ETIMEDOUT);
        }
#endif

        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
                              LDLM_BL_CALLBACK, 1, &size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        memcpy(&body->lock_desc, desc, sizeof(*desc));

        LDLM_DEBUG(lock, "server preparing blocking AST");
        req->rq_replen = lustre_msg_size(0, NULL);

        ldlm_add_waiting_lock(lock);
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        req->rq_level = LUSTRE_CONN_RECOVER;
        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
        rc = ptlrpc_queue_wait(req);
        if (rc == -ETIMEDOUT || rc == -EINTR) {
                ldlm_del_waiting_lock(lock);
                ldlm_failed_ast(lock, rc, "blocking");
        } else if (rc) {
                if (rc == -EINVAL)
                        CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
                               "from blocking AST for lock %p--normal race\n",
                               req->rq_connection->c_peer.peer_nid,
                               req->rq_repmsg->status, lock);
                else if (rc == -ENOTCONN)
                        CDEBUG(D_DLMTRACE, "client (nid "LPU64") returned %d "
                               "from blocking AST for lock %p--this client was "
                               "probably rebooted while it held a lock, nothing"
                               " serious\n",req->rq_connection->c_peer.peer_nid,
                               req->rq_repmsg->status, lock);
                else
                        CDEBUG(D_ERROR, "client (nid "LPU64") returned %d "
                               "from blocking AST for lock %p\n",
                               req->rq_connection->c_peer.peer_nid,
                               req->rq_repmsg->status, lock);
                LDLM_DEBUG(lock, "client returned error %d from blocking AST",
                           req->rq_status);
                ldlm_lock_cancel(lock);
                /* Server-side AST functions are called from ldlm_reprocess_all,
                 * which needs to be told to please restart its reprocessing. */
                rc = -ERESTART;
        }

        ptlrpc_req_finished(req);

        RETURN(rc);
}

/* XXX copied from ptlrpc/service.c */
static long timeval_sub(struct timeval *large, struct timeval *small)
{
        return (large->tv_sec - small->tv_sec) * 1000000 +
                (large->tv_usec - small->tv_usec);
}

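/* Server side: send a completion AST (LDLM_CP_CALLBACK) telling the client
 * that its lock has been granted. */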
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        struct timeval granted_time;
        long total_enqueue_wait;
        int rc = 0, size = sizeof(*body);
        ENTRY;

        if (lock == NULL) {
                LBUG();
                RETURN(-EINVAL);
        }

        do_gettimeofday(&granted_time);
        total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);

        if (total_enqueue_wait / 1000000 > obd_timeout)
                LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);

        req = ptlrpc_prep_req(lock->l_export->exp_ldlm_data.led_import,
                              LDLM_CP_CALLBACK, 1, &size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);

        LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
                   total_enqueue_wait);
        req->rq_replen = lustre_msg_size(0, NULL);

        req->rq_level = LUSTRE_CONN_RECOVER;
        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
        rc = ptlrpc_queue_wait(req);
        if (rc == -ETIMEDOUT || rc == -EINTR) {
                ldlm_del_waiting_lock(lock);
                ldlm_failed_ast(lock, rc, "completion");
        } else if (rc) {
                CERROR("client returned %d from completion AST for lock %p\n",
                       req->rq_status, lock);
                LDLM_DEBUG(lock, "client returned error %d from completion AST",
                           req->rq_status);
                ldlm_lock_cancel(lock);
                /* Server-side AST functions are called from ldlm_reprocess_all,
                 * which needs to be told to please restart its reprocessing. */
                rc = -ERESTART;
        }
        ptlrpc_req_finished(req);

        RETURN(rc);
}

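/* Server-side handler for LDLM_ENQUEUE requests: create the lock, run it
 * through ldlm_lock_enqueue(), and pack the result into the reply. */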
int ldlm_handle_enqueue(struct ptlrpc_request *req,
                        ldlm_completion_callback completion_callback,
                        ldlm_blocking_callback blocking_callback)
{
        struct obd_device *obddev = req->rq_export->exp_obd;
        struct ldlm_reply *dlm_rep;
        struct ldlm_request *dlm_req;
        int rc, size = sizeof(*dlm_rep), cookielen = 0;
        __u32 flags;
        ldlm_error_t err;
        struct ldlm_lock *lock = NULL;
        void *cookie = NULL;
        ENTRY;

        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
                RETURN (-EFAULT);
        }

        flags = dlm_req->lock_flags;
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN &&
            (flags & LDLM_FL_HAS_INTENT)) {
                /* In this case, the reply buffer is allocated deep in
                 * local_lock_enqueue by the policy function. */
                cookie = req;
                cookielen = sizeof(*req);
        } else {
                rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
                                     &req->rq_repmsg);
                if (rc) {
                        CERROR("out of memory\n");
                        RETURN(-ENOMEM);
                }
                if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
                        cookie = &dlm_req->lock_desc.l_extent;
                        cookielen = sizeof(struct ldlm_extent);
                }
        }

        /* The lock's callback data might be set in the policy function */
        lock = ldlm_lock_create(obddev->obd_namespace,
                                &dlm_req->lock_handle2,
                                dlm_req->lock_desc.l_resource.lr_name,
                                dlm_req->lock_desc.l_resource.lr_type,
                                dlm_req->lock_desc.l_req_mode,
                                blocking_callback, NULL);
        if (!lock)
                GOTO(out, err = -ENOMEM);

        do_gettimeofday(&lock->l_enqueued_time);
        memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
               sizeof(lock->l_remote_handle));
        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");

        LASSERT(req->rq_export);
        lock->l_export = req->rq_export;
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        list_add(&lock->l_export_chain,
                 &lock->l_export->exp_ldlm_data.led_held_locks);
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, cookielen,
                                &flags, completion_callback);
        if (err)
                GOTO(out, err);

        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
        dlm_rep->lock_flags = flags;

        ldlm_lock2handle(lock, &dlm_rep->lock_handle);
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                memcpy(&dlm_rep->lock_extent, &lock->l_extent,
                       sizeof(lock->l_extent));
        if (dlm_rep->lock_flags & LDLM_FL_LOCK_CHANGED) {
                memcpy(&dlm_rep->lock_resource_name, &lock->l_resource->lr_name,
                       sizeof(dlm_rep->lock_resource_name));
                dlm_rep->lock_mode = lock->l_req_mode;
        }

        EXIT;
 out:
        if (lock)
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply "
                           "(err=%d)", err);
        req->rq_status = err;

        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
        if (lock) {
                if (!err)
                        ldlm_reprocess_all(lock->l_resource);
                LDLM_LOCK_PUT(lock);
        }
        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);

        return 0;
}

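/* Server-side handler for LDLM_CONVERT requests. */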
int ldlm_handle_convert(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
        int rc, size = sizeof(*dlm_rep);
        ENTRY;

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
                RETURN (-EFAULT);
        }

        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
        if (rc) {
                CERROR("out of memory\n");
                RETURN(-ENOMEM);
        }
        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
        dlm_rep->lock_flags = dlm_req->lock_flags;

        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
        if (!lock) {
                req->rq_status = EINVAL;
        } else {
                LDLM_DEBUG(lock, "server-side convert handler START");
                ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                  &dlm_rep->lock_flags);
                if (ldlm_del_waiting_lock(lock))
                        CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
                req->rq_status = 0;
        }

        if (lock) {
                ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side convert handler END");
                LDLM_LOCK_PUT(lock);
        } else
                LDLM_DEBUG_NOLOCK("server-side convert handler END");

        RETURN(0);
}

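/* Server-side handler for LDLM_CANCEL requests. */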
int ldlm_handle_cancel(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        int rc;
        ENTRY;

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("bad request buffer for cancel\n");
                RETURN(-EFAULT);
        }

        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
        if (rc) {
                CERROR("out of memory\n");
                RETURN(-ENOMEM);
        }

        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
        if (!lock) {
                CERROR("received cancel for unknown lock cookie "LPX64
                       " from nid "LPU64"\n", dlm_req->lock_handle1.cookie,
                       req->rq_connection->c_peer.peer_nid);
                LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
                                  "(cookie "LPU64")",
                                  dlm_req->lock_handle1.cookie);
                req->rq_status = ESTALE;
        } else {
                LDLM_DEBUG(lock, "server-side cancel handler START");
                ldlm_lock_cancel(lock);
                if (ldlm_del_waiting_lock(lock))
                        CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
                req->rq_status = 0;
        }

        if (ptlrpc_reply(req) != 0)
                LBUG();

        if (lock) {
                ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side cancel handler END");
                LDLM_LOCK_PUT(lock);
        }

        RETURN(0);
}

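/* Client side: handle a blocking AST from the server.  Mark the lock
 * CBPENDING; if it is no longer in use, run its blocking callback right
 * away, otherwise it will be cancelled once the last reference is dropped. */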
static void ldlm_handle_bl_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int do_ast;
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client blocking AST callback handler START");

        lock->l_flags |= LDLM_FL_CBPENDING;
        do_ast = (!lock->l_readers && !lock->l_writers);

        if (do_ast) {
                LDLM_DEBUG(lock, "already unused, calling "
                           "callback (%p)", lock->l_blocking_ast);
                if (lock->l_blocking_ast != NULL) {
                        l_unlock(&ns->ns_lock);
                        lock->l_blocking_ast(lock, &dlm_req->lock_desc,
                                             lock->l_data, LDLM_CB_BLOCKING);
                        l_lock(&ns->ns_lock);
                }
        } else {
                LDLM_DEBUG(lock, "Lock still has references, will be"
                           " cancelled later");
        }

        LDLM_DEBUG(lock, "client blocking callback handler END");
        l_unlock(&ns->ns_lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}

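/* Client side: handle a completion AST.  The server may have changed the
 * lock's mode, resource, or extent before granting it, so update our copy,
 * then grant the lock locally and run any AST work that generates. */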
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        LIST_HEAD(ast_list);
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client completion callback handler START");

        /* If we receive the completion AST before the actual enqueue returned,
         * then we might need to switch lock modes, resources, or extents. */
        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                LDLM_DEBUG(lock, "completion AST, new lock mode");
        }
        if (lock->l_resource->lr_type == LDLM_EXTENT)
                memcpy(&lock->l_extent, &dlm_req->lock_desc.l_extent,
                       sizeof(lock->l_extent));

        ldlm_resource_unlink_lock(lock);
        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                   &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) != 0) {
                ldlm_lock_change_resource(ns, lock,
                                         dlm_req->lock_desc.l_resource.lr_name);
                LDLM_DEBUG(lock, "completion AST, new resource");
        }
        lock->l_resource->lr_tmp = &ast_list;
        ldlm_grant_lock(lock, req, sizeof(*req));
        lock->l_resource->lr_tmp = NULL;
        LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
        l_unlock(&ns->ns_lock);
        LDLM_LOCK_PUT(lock);

        ldlm_run_ast_work(&ast_list);

        LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                          lock);
        EXIT;
}

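/* Set the request status, pack an empty reply, and send it. */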
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
        req->rq_status = rc;
        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
                             &req->rq_repmsg);
        if (rc)
                return rc;
        return ptlrpc_reply(req);
}

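/* Request handler for the "ldlm_cbd" callback service: dispatches blocking
 * and completion ASTs that servers send for this node's locks. */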
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
        struct ldlm_namespace *ns;
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        ENTRY;

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;

                CDEBUG(D_RPCTRACE, "operation %d from nid "LPU64" with bad "
                       "export cookie "LPX64" (ptl req %d/rep %d); this is "
                       "normal if this node rebooted with a lock held\n",
                       req->rq_reqmsg->opc, req->rq_connection->c_peer.peer_nid,
                       req->rq_reqmsg->handle.cookie,
                       req->rq_request_portal, req->rq_reply_portal);

                dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                             lustre_swab_ldlm_request);
                if (dlm_req != NULL)
                        CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
                               dlm_req->lock_handle1.cookie);

                ldlm_callback_reply(req, -ENOTCONN);
                RETURN(0);
        }

        if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
        } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
        } else {
                ldlm_callback_reply(req, -EPROTO);
                RETURN(0);
        }

        LASSERT(req->rq_export != NULL);
        LASSERT(req->rq_export->exp_obd != NULL);
        ns = req->rq_export->exp_obd->obd_namespace;
        LASSERT(ns != NULL);

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("can't unpack dlm_req\n");
                ldlm_callback_reply (req, -EPROTO);
                RETURN (0);
        }

        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
        if (!lock) {
                CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
                       dlm_req->lock_handle1.cookie);
                ldlm_callback_reply(req, -EINVAL);
                RETURN(0);
        }

        /* we want the ost thread to get this reply so that it can respond
         * to ost requests (write cache writeback) that might be triggered
         * in the callback */
        ldlm_callback_reply(req, 0);

        switch (req->rq_reqmsg->opc) {
        case LDLM_BL_CALLBACK:
                CDEBUG(D_INODE, "blocking ast\n");
                ldlm_handle_bl_callback(req, ns, dlm_req, lock);
                break;
        case LDLM_CP_CALLBACK:
                CDEBUG(D_INODE, "completion ast\n");
                ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                break;
        default:
                LBUG();                         /* checked above */
        }

        RETURN(0);
}

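/* Request handler for the "ldlm_canceld" service; currently it handles only
 * LDLM_CANCEL requests. */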
static int ldlm_cancel_handler(struct ptlrpc_request *req)
{
        int rc;
        ENTRY;

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;
                CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
                       req->rq_reqmsg->opc, req->rq_request_portal,
                       req->rq_reply_portal);
                CERROR("--> export cookie: "LPX64"\n",
                       req->rq_reqmsg->handle.cookie);
                dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                             lustre_swab_ldlm_request);
                if (dlm_req != NULL)
                        ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
                RETURN(-ENOTCONN);
        }

        switch (req->rq_reqmsg->opc) {

        /* XXX FIXME move this back to mds/handler.c, bug 249 */
        case LDLM_CANCEL:
                CDEBUG(D_INODE, "cancel\n");
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
                rc = ldlm_handle_cancel(req);
                if (rc)
                        break;
                RETURN(0);

        default:
                CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
                RETURN(-EINVAL);
        }

        RETURN(0);
}

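/* ioctl entry point for the ldlm obd device; only test and dump commands. */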
static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                          void *karg, void *uarg)
{
        struct obd_device *obddev = class_conn2obd(conn);
        struct ptlrpc_connection *connection;
        struct obd_uuid uuid = { "ldlm" };
        int err = 0;
        ENTRY;

        if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
            _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
                CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
                       _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
                RETURN(-EINVAL);
        }

        OBD_ALLOC(obddev->u.ldlm.ldlm_client,
                  sizeof(*obddev->u.ldlm.ldlm_client));
        connection = ptlrpc_uuid_to_connection(&uuid);
        if (!connection)
                CERROR("No LDLM UUID found: assuming ldlm is local.\n");

        switch (cmd) {
        case IOC_LDLM_TEST:
                //err = ldlm_test(obddev, conn);
                err = 0;
                CERROR("-- NO TESTS WERE RUN done err %d\n", err);
                GOTO(out, err);
        case IOC_LDLM_DUMP:
                ldlm_dump_all_namespaces();
                GOTO(out, err);
        default:
                GOTO(out, err = -EINVAL);
        }

 out:
        if (connection)
                ptlrpc_put_connection(connection);
        OBD_FREE(obddev->u.ldlm.ldlm_client,
                 sizeof(*obddev->u.ldlm.ldlm_client));
        return err;
}

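/* obd setup method: register the inter-module hooks, start the callback and
 * cancel services with their thread pools, start the expired-lock thread, and
 * initialize the waiting-locks list and timer. */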
static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
{
        struct ldlm_obd *ldlm = &obddev->u.ldlm;
        int rc, i;
        ENTRY;

        if (ldlm_already_setup)
                RETURN(-EALREADY);

        rc = ldlm_proc_setup(obddev);
        if (rc != 0)
                RETURN(rc);

#ifdef __KERNEL__
        inter_module_register("ldlm_cli_cancel_unused", THIS_MODULE,
                              ldlm_cli_cancel_unused);
        inter_module_register("ldlm_namespace_cleanup", THIS_MODULE,
                              ldlm_namespace_cleanup);
        inter_module_register("ldlm_replay_locks", THIS_MODULE,
                              ldlm_replay_locks);

        ldlm->ldlm_cb_service =
                ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
                                LDLM_MAXREQSIZE, LDLM_CB_REQUEST_PORTAL,
                                LDLM_CB_REPLY_PORTAL,
                                ldlm_callback_handler, "ldlm_cbd", obddev);

        if (!ldlm->ldlm_cb_service) {
                CERROR("failed to start service\n");
                GOTO(out_proc, rc = -ENOMEM);
        }

        ldlm->ldlm_cancel_service =
                ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
                                LDLM_MAXREQSIZE, LDLM_CANCEL_REQUEST_PORTAL,
                                LDLM_CANCEL_REPLY_PORTAL,
                                ldlm_cancel_handler, "ldlm_canceld", obddev);

        if (!ldlm->ldlm_cancel_service) {
                CERROR("failed to start service\n");
                GOTO(out_proc, rc = -ENOMEM);
        }

        for (i = 0; i < LDLM_NUM_THREADS; i++) {
                char name[32];
                sprintf(name, "ldlm_cn_%02d", i);
                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cancel_service,
                                         name);
                if (rc) {
                        CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
                        LBUG();
                        GOTO(out_thread, rc);
                }
        }

        for (i = 0; i < LDLM_NUM_THREADS; i++) {
                char name[32];
                sprintf(name, "ldlm_cb_%02d", i);
                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_cb_service, name);
                if (rc) {
                        CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
                        LBUG();
                        GOTO(out_thread, rc);
                }
        }

        INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
        spin_lock_init(&expired_lock_thread.elt_lock);
        expired_lock_thread.elt_state = ELT_STOPPED;
        init_waitqueue_head(&expired_lock_thread.elt_waitq);

        rc = kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
        if (rc < 0) {
                CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                GOTO(out_thread, rc);
        }

        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_READY);

        INIT_LIST_HEAD(&waiting_locks_list);
        spin_lock_init(&waiting_locks_spinlock);
        waiting_locks_timer.function = waiting_locks_callback;
        waiting_locks_timer.data = 0;
        init_timer(&waiting_locks_timer);
#endif

        ldlm_already_setup = 1;

        RETURN(0);

 out_thread:
#ifdef __KERNEL__
        ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
        ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
#endif
 out_proc:
        ldlm_proc_cleanup(obddev);

        return rc;
}

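/* obd cleanup method: refuse to unload while namespaces or connections are
 * still in use, then stop the services, the expired-lock thread, and the
 * inter-module hooks. */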
static int ldlm_cleanup(struct obd_device *obddev, int flags)
{
        struct ldlm_obd *ldlm = &obddev->u.ldlm;
        ENTRY;

        if (!list_empty(&ldlm_namespace_list)) {
                CERROR("ldlm still has namespaces; clean these up first.\n");
                ldlm_dump_all_namespaces();
                RETURN(-EBUSY);
        }

#ifdef __KERNEL__
        if (flags & OBD_OPT_FORCE) {
                ptlrpc_put_ldlm_hooks();
        } else if (ptlrpc_ldlm_hooks_referenced()) {
                CERROR("Some connections weren't cleaned up; run lconf with "
                       "--force to forcibly unload.\n");
                ptlrpc_dump_connections();
                RETURN(-EBUSY);
        }

        ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
        ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
        ldlm_proc_cleanup(obddev);

        expired_lock_thread.elt_state = ELT_TERMINATE;
        wake_up(&expired_lock_thread.elt_waitq);
        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_STOPPED);

        inter_module_unregister("ldlm_namespace_cleanup");
        inter_module_unregister("ldlm_cli_cancel_unused");
        inter_module_unregister("ldlm_replay_locks");
#endif

        ldlm_already_setup = 0;
        RETURN(0);
}

static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
                        struct obd_uuid *cluuid)
{
        return class_connect(conn, src, cluuid);
}

struct obd_ops ldlm_obd_ops = {
        o_owner:       THIS_MODULE,
        o_iocontrol:   ldlm_iocontrol,
        o_setup:       ldlm_setup,
        o_cleanup:     ldlm_cleanup,
        o_connect:     ldlm_connect,
        o_disconnect:  class_disconnect
};

int __init ldlm_init(void)
{
        int rc = class_register_type(&ldlm_obd_ops, 0, OBD_LDLM_DEVICENAME);
        if (rc != 0)
                return rc;

        ldlm_resource_slab = kmem_cache_create("ldlm_resources",
                                               sizeof(struct ldlm_resource), 0,
                                               SLAB_HWCACHE_ALIGN, NULL, NULL);
        if (ldlm_resource_slab == NULL)
                return -ENOMEM;

        ldlm_lock_slab = kmem_cache_create("ldlm_locks",
                                           sizeof(struct ldlm_lock), 0,
                                           SLAB_HWCACHE_ALIGN, NULL, NULL);
        if (ldlm_lock_slab == NULL) {
                kmem_cache_destroy(ldlm_resource_slab);
                return -ENOMEM;
        }

        l_lock_init(&ldlm_handle_lock);

        return 0;
}

static void __exit ldlm_exit(void)
{
        class_unregister_type(OBD_LDLM_DEVICENAME);
        if (kmem_cache_destroy(ldlm_resource_slab) != 0)
                CERROR("couldn't free ldlm resource slab\n");
        if (kmem_cache_destroy(ldlm_lock_slab) != 0)
                CERROR("couldn't free ldlm lock slab\n");
}

/* ldlm_lock.c */
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_unregister_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_set_data);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);

/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_change_cbdata);

/* ldlm_lockd.c */
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_del_waiting_lock);

#if 0
/* ldlm_test.c */
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_regression_start);
EXPORT_SYMBOL(ldlm_regression_stop);
#endif

/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);

/* l_lock.c */
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);

/* ldlm_lib.c */
EXPORT_SYMBOL(client_import_connect);
EXPORT_SYMBOL(client_import_disconnect);
EXPORT_SYMBOL(target_abort_recovery);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cancel_recovery_timer);
EXPORT_SYMBOL(target_send_reply);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_handle_ping);
EXPORT_SYMBOL(target_handle_disconnect);
EXPORT_SYMBOL(target_queue_final_reply);

#ifdef __KERNEL__
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Lock Management Module v0.1");
MODULE_LICENSE("GPL");

module_init(ldlm_init);
module_exit(ldlm_exit);
#endif