Whamcloud - gitweb
- Use l_wait_event in ldlm_completion_ast to both trigger recovery and make
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15 #include <linux/obd.h>
16
17 static int interrupted_completion_wait(void *data)
18 {
19         RETURN(1);
20 }
21
22 static int expired_completion_wait(void *data)
23 {
24         struct ldlm_lock *lock = data;
25         class_signal_connection_failure(lock->l_export->exp_connection);
26         RETURN(0);
27 }
28
29 int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
30 {
31         struct l_wait_info lwi = 
32                 LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_completion_wait,
33                                  interrupted_completion_wait, lock);
34         int rc = 0;
35         ENTRY;
36
37         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
38                       LDLM_FL_BLOCK_CONV)) {
39                 /* Go to sleep until the lock is granted. */
40                 /* FIXME: or cancelled. */
41                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
42                            " sleeping");
43                 ldlm_lock_dump(lock);
44                 ldlm_reprocess_all(lock->l_resource);
45                 rc = l_wait_event(lock->l_waitq,
46                                   (lock->l_req_mode == lock->l_granted_mode),
47                                   &lwi);
48                 if (rc) {
49                         LDLM_DEBUG(lock,
50                                    "client-side enqueue waking up: failed (%d)",
51                                    rc);
52                 } else {
53                         LDLM_DEBUG(lock, 
54                                    "client-side enqueue waking up: granted");
55                 }
56         } else if (flags == LDLM_FL_WAIT_NOREPROC) {
57                 rc = l_wait_event(lock->l_waitq,
58                                   (lock->l_req_mode == lock->l_granted_mode),
59                                   &lwi);
60         } else if (flags == 0) {
61                 wake_up(&lock->l_waitq);
62         }
63
64         RETURN(rc);
65 }
66
67 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
68                                   struct lustre_handle *parent_lockh,
69                                   __u64 *res_id,
70                                   __u32 type,
71                                   void *cookie, int cookielen,
72                                   ldlm_mode_t mode,
73                                   int *flags,
74                                   ldlm_completion_callback completion,
75                                   ldlm_blocking_callback blocking,
76                                   void *data,
77                                   __u32 data_len,
78                                   struct lustre_handle *lockh)
79 {
80         struct ldlm_lock *lock;
81         int err;
82
83         if (ns->ns_client) {
84                 CERROR("Trying to cancel local lock\n");
85                 LBUG();
86         }
87
88         lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, data, data_len);
89         if (!lock)
90                 GOTO(out_nolock, err = -ENOMEM);
91         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
92
93         ldlm_lock_addref_internal(lock, mode);
94         ldlm_lock2handle(lock, lockh);
95         lock->l_connh = NULL;
96
97         err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
98                                 blocking);
99         if (err != ELDLM_OK)
100                 GOTO(out, err);
101
102         if (type == LDLM_EXTENT)
103                 memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
104         if ((*flags) & LDLM_FL_LOCK_CHANGED)
105                 memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
106
107         LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
108                           lock);
109
110         if (lock->l_completion_ast)
111                 lock->l_completion_ast(lock, *flags);
112
113         LDLM_DEBUG(lock, "client-side local enqueue END");
114         EXIT;
115  out:
116         LDLM_LOCK_PUT(lock);
117  out_nolock:
118         return err;
119 }
120
121 int ldlm_cli_enqueue(struct lustre_handle *connh,
122                      struct ptlrpc_request *req,
123                      struct ldlm_namespace *ns,
124                      struct lustre_handle *parent_lock_handle,
125                      __u64 *res_id,
126                      __u32 type,
127                      void *cookie, int cookielen,
128                      ldlm_mode_t mode,
129                      int *flags,
130                      ldlm_completion_callback completion,
131                      ldlm_blocking_callback blocking,
132                      void *data,
133                      __u32 data_len,
134                      struct lustre_handle *lockh)
135 {
136         struct ldlm_lock *lock;
137         struct ldlm_request *body;
138         struct ldlm_reply *reply;
139         int rc, size = sizeof(*body), req_passed_in = 1;
140         ENTRY;
141
142         if (connh == NULL)
143                 return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
144                                               type, cookie, cookielen, mode,
145                                               flags, completion, blocking, data,
146                                               data_len, lockh);
147
148         *flags = 0;
149         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
150                                 data, data_len);
151         if (lock == NULL)
152                 GOTO(out_nolock, rc = -ENOMEM);
153         LDLM_DEBUG(lock, "client-side enqueue START");
154         /* for the local lock, add the reference */
155         ldlm_lock_addref_internal(lock, mode);
156         ldlm_lock2handle(lock, lockh);
157
158         if (req == NULL) {
159                 req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
160                                       &size, NULL);
161                 if (!req)
162                         GOTO(out, rc = -ENOMEM);
163                 req_passed_in = 0;
164         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
165                 LBUG();
166
167         /* Dump all of this data into the request buffer */
168         body = lustre_msg_buf(req->rq_reqmsg, 0);
169         ldlm_lock2desc(lock, &body->lock_desc);
170         /* Phil: make this part of ldlm_lock2desc */
171         if (type == LDLM_EXTENT)
172                 memcpy(&body->lock_desc.l_extent, cookie,
173                        sizeof(body->lock_desc.l_extent));
174         body->lock_flags = *flags;
175
176         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
177         if (parent_lock_handle)
178                 memcpy(&body->lock_handle2, parent_lock_handle,
179                        sizeof(body->lock_handle2));
180
181         /* Continue as normal. */
182         if (!req_passed_in) {
183                 size = sizeof(*reply);
184                 req->rq_replen = lustre_msg_size(1, &size);
185         }
186         lock->l_connh = connh;
187         lock->l_export = NULL;
188
189         rc = ptlrpc_queue_wait(req);
190         /* FIXME: status check here? */
191         rc = ptlrpc_check_status(req, rc);
192
193         if (rc != ELDLM_OK) {
194                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
195                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
196                 ldlm_lock_decref(lockh, mode);
197                 /* FIXME: if we've already received a completion AST, this will
198                  * LBUG! */
199                 ldlm_lock_destroy(lock);
200                 GOTO(out, rc);
201         }
202
203         reply = lustre_msg_buf(req->rq_repmsg, 0);
204         memcpy(&lock->l_remote_handle, &reply->lock_handle,
205                sizeof(lock->l_remote_handle));
206         if (type == LDLM_EXTENT)
207                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
208         *flags = reply->lock_flags;
209
210         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
211                (void *)(unsigned long)reply->lock_handle.addr, *flags);
212         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
213                (unsigned long long)reply->lock_extent.start,
214                (unsigned long long)reply->lock_extent.end);
215
216         /* If enqueue returned a blocked lock but the completion handler has
217          * already run, then it fixed up the resource and we don't need to do it
218          * again. */
219         if ((*flags) & LDLM_FL_LOCK_CHANGED) {
220                 int newmode = reply->lock_mode;
221                 if (newmode && newmode != lock->l_req_mode) {
222                         LDLM_DEBUG(lock, "server returned different mode %s",
223                                    ldlm_lockname[newmode]);
224                         lock->l_req_mode = newmode;
225                 }
226
227                 if (reply->lock_resource_name[0] !=
228                     lock->l_resource->lr_name[0]) {
229                         CDEBUG(D_INFO, "remote intent success, locking %ld "
230                                "instead of %ld\n",
231                                (long)reply->lock_resource_name[0],
232                                (long)lock->l_resource->lr_name[0]);
233
234                         ldlm_lock_change_resource(lock,
235                                                   reply->lock_resource_name);
236                         if (lock->l_resource == NULL) {
237                                 LBUG();
238                                 RETURN(-ENOMEM);
239                         }
240                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
241                 }
242         }
243
244         if (!req_passed_in)
245                 ptlrpc_free_req(req);
246
247         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
248                                blocking);
249         if (lock->l_completion_ast)
250                 lock->l_completion_ast(lock, *flags);
251
252         LDLM_DEBUG(lock, "client-side enqueue END");
253         EXIT;
254  out:
255         LDLM_LOCK_PUT(lock);
256  out_nolock:
257         return rc;
258 }
259
260 int ldlm_match_or_enqueue(struct lustre_handle *connh,
261                           struct ptlrpc_request *req,
262                           struct ldlm_namespace *ns,
263                           struct lustre_handle *parent_lock_handle,
264                           __u64 *res_id,
265                           __u32 type,
266                           void *cookie, int cookielen,
267                           ldlm_mode_t mode,
268                           int *flags,
269                           ldlm_completion_callback completion,
270                           ldlm_blocking_callback blocking,
271                           void *data,
272                           __u32 data_len,
273                           struct lustre_handle *lockh)
274 {
275         int rc;
276         ENTRY;
277         rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
278         if (rc == 0) {
279                 rc = ldlm_cli_enqueue(connh, req, ns,
280                                       parent_lock_handle, res_id, type, cookie,
281                                       cookielen, mode, flags, completion,
282                                       blocking, data, data_len, lockh);
283                 if (rc != ELDLM_OK)
284                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
285                 RETURN(rc);
286         } else
287                 RETURN(0);
288 }
289
290 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
291                                   int *flags)
292 {
293
294         if (lock->l_resource->lr_namespace->ns_client) {
295                 CERROR("Trying to cancel local lock\n");
296                 LBUG();
297         }
298         LDLM_DEBUG(lock, "client-side local convert");
299
300         ldlm_lock_convert(lock, new_mode, flags);
301         ldlm_reprocess_all(lock->l_resource);
302
303         LDLM_DEBUG(lock, "client-side local convert handler END");
304         LDLM_LOCK_PUT(lock);
305         RETURN(0);
306 }
307
308 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
309 {
310         struct ldlm_request *body;
311         struct lustre_handle *connh;
312         struct ldlm_reply *reply;
313         struct ldlm_lock *lock;
314         struct ldlm_resource *res;
315         struct ptlrpc_request *req;
316         int rc, size = sizeof(*body);
317         ENTRY;
318
319         lock = ldlm_handle2lock(lockh);
320         if (!lock) {
321                 LBUG();
322                 RETURN(-EINVAL);
323         }
324         *flags = 0;
325         connh = lock->l_connh;
326
327         if (!connh)
328                 return ldlm_cli_convert_local(lock, new_mode, flags);
329
330         LDLM_DEBUG(lock, "client-side convert");
331
332         req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size,
333                               NULL);
334         if (!req)
335                 GOTO(out, rc = -ENOMEM);
336
337         body = lustre_msg_buf(req->rq_reqmsg, 0);
338         memcpy(&body->lock_handle1, &lock->l_remote_handle,
339                sizeof(body->lock_handle1));
340
341         body->lock_desc.l_req_mode = new_mode;
342         body->lock_flags = *flags;
343
344         size = sizeof(*reply);
345         req->rq_replen = lustre_msg_size(1, &size);
346
347         rc = ptlrpc_queue_wait(req);
348         rc = ptlrpc_check_status(req, rc);
349         if (rc != ELDLM_OK)
350                 GOTO(out, rc);
351
352         reply = lustre_msg_buf(req->rq_repmsg, 0);
353         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
354         if (res != NULL)
355                 ldlm_reprocess_all(res);
356         /* Go to sleep until the lock is granted. */
357         /* FIXME: or cancelled. */
358         if (lock->l_completion_ast)
359                 lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
360         EXIT;
361  out:
362         LDLM_LOCK_PUT(lock);
363         ptlrpc_free_req(req);
364         return rc;
365 }
366
367 int ldlm_cli_cancel(struct lustre_handle *lockh)
368 {
369         struct ptlrpc_request *req;
370         struct ldlm_lock *lock;
371         struct ldlm_request *body;
372         int rc = 0, size = sizeof(*body);
373         ENTRY;
374
375         lock = ldlm_handle2lock(lockh);
376         if (!lock) {
377                 /* It's possible that the decref that we did just before this
378                  * cancel was the last reader/writer, and caused a cancel before
379                  * we could call this function.  If we want to make this
380                  * impossible (by adding a dec_and_cancel() or similar), then
381                  * we can put the LBUG back. */
382                 //LBUG();
383                 RETURN(-EINVAL);
384         }
385
386         if (lock->l_connh) {
387                 LDLM_DEBUG(lock, "client-side cancel");
388                 /* Set this flag to prevent others from getting new references*/
389                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
390                 lock->l_flags |= LDLM_FL_CBPENDING;
391                 ldlm_cancel_callback(lock);
392                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
393
394                 req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), 
395                                       LDLM_CANCEL, 1, &size, NULL);
396                 if (!req)
397                         GOTO(out, rc = -ENOMEM);
398
399                 body = lustre_msg_buf(req->rq_reqmsg, 0);
400                 memcpy(&body->lock_handle1, &lock->l_remote_handle,
401                        sizeof(body->lock_handle1));
402
403                 req->rq_replen = lustre_msg_size(0, NULL);
404
405                 rc = ptlrpc_queue_wait(req);
406                 rc = ptlrpc_check_status(req, rc);
407                 ptlrpc_free_req(req);
408                 if (rc != ELDLM_OK)
409                         GOTO(out, rc);
410
411                 ldlm_lock_cancel(lock);
412         } else {
413                 LDLM_DEBUG(lock, "client-side local cancel");
414                 if (lock->l_resource->lr_namespace->ns_client) {
415                         CERROR("Trying to cancel local lock\n");
416                         LBUG();
417                 }
418                 ldlm_lock_cancel(lock);
419                 ldlm_reprocess_all(lock->l_resource);
420                 LDLM_DEBUG(lock, "client-side local cancel handler END");
421         }
422
423         EXIT;
424  out:
425         LDLM_LOCK_PUT(lock);
426         return rc;
427 }
428
429 /* Cancel all locks on a given resource that have 0 readers/writers.
430  *
431  * If 'local_only' is true, throw the locks away without trying to notify the
432  * server. */
433 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
434                            int local_only)
435 {
436         struct ldlm_resource *res;
437         struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
438         struct ldlm_ast_work *w;
439         ENTRY;
440
441         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
442         if (res == NULL)
443                 RETURN(-EINVAL);
444
445         l_lock(&ns->ns_lock);
446         list_for_each(tmp, &res->lr_granted) {
447                 struct ldlm_lock *lock;
448                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
449
450                 if (lock->l_readers || lock->l_writers)
451                         continue;
452
453                 /* Setting the CBPENDING flag is a little misleading, but
454                  * prevents an important race; namely, once CBPENDING is set,
455                  * the lock can accumulate no more readers/writers.  Since
456                  * readers and writers are already zero here, ldlm_lock_decref
457                  * won't see this flag and call l_blocking_ast */
458                 lock->l_flags |= LDLM_FL_CBPENDING;
459
460                 OBD_ALLOC(w, sizeof(*w));
461                 LASSERT(w);
462
463                 w->w_lock = LDLM_LOCK_GET(lock);
464                 list_add(&w->w_list, &list);
465         }
466         l_unlock(&ns->ns_lock);
467
468         list_for_each_safe(tmp, next, &list) {
469                 struct lustre_handle lockh;
470                 int rc;
471                 w = list_entry(tmp, struct ldlm_ast_work, w_list);
472
473                 if (local_only)
474                         ldlm_lock_cancel(w->w_lock);
475                 else {
476                         ldlm_lock2handle(w->w_lock, &lockh);
477                         rc = ldlm_cli_cancel(&lockh);
478                         if (rc != ELDLM_OK)
479                                 CERROR("ldlm_cli_cancel: %d\n", rc);
480                 }
481                 LDLM_LOCK_PUT(w->w_lock);
482                 list_del(&w->w_list);
483                 OBD_FREE(w, sizeof(*w));
484         }
485
486         ldlm_resource_put(res);
487
488         RETURN(0);
489 }