Whamcloud - gitweb
87e5f0bdcfc20280c24d12a7f602260a1e85fc3b
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15 #include <linux/obd_class.h>
16 #include <linux/obd.h>
17
/*
 * Interruption callback given to LWI_TIMEOUT_INTR() by ldlm_completion_ast().
 * The argument is not examined; the callback unconditionally reports 1.
 */
static int interrupted_completion_wait(void *data)
{
        (void)data;     /* nothing to inspect for an interrupted wait */

        RETURN(1);
}
22
23 static int expired_completion_wait(void *data)
24 {
25         struct ldlm_lock *lock = data;
26         if (!lock)
27                 CERROR("NULL lock\n");
28         else if (!lock->l_export)
29                 CERROR("lock %p has NULL export\n", lock);
30         else
31                 class_signal_connection_failure(lock->l_export->exp_connection);
32         RETURN(0);
33 }
34
35 int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
36 {
37         struct l_wait_info lwi = 
38                 LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_completion_wait,
39                                  interrupted_completion_wait, lock);
40         int rc = 0;
41         ENTRY;
42
43         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
44                       LDLM_FL_BLOCK_CONV)) {
45                 /* Go to sleep until the lock is granted. */
46                 /* FIXME: or cancelled. */
47                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
48                            " sleeping");
49                 ldlm_lock_dump(lock);
50                 ldlm_reprocess_all(lock->l_resource);
51                 rc = l_wait_event(lock->l_waitq,
52                                   (lock->l_req_mode == lock->l_granted_mode),
53                                   &lwi);
54                 if (rc) {
55                         LDLM_DEBUG(lock,
56                                    "client-side enqueue waking up: failed (%d)",
57                                    rc);
58                 } else {
59                         LDLM_DEBUG(lock, 
60                                    "client-side enqueue waking up: granted");
61                 }
62         } else if (flags == LDLM_FL_WAIT_NOREPROC) {
63                 rc = l_wait_event(lock->l_waitq,
64                                   (lock->l_req_mode == lock->l_granted_mode),
65                                   &lwi);
66         } else if (flags == 0) {
67                 wake_up(&lock->l_waitq);
68         }
69
70         RETURN(rc);
71 }
72
73 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
74                                   struct lustre_handle *parent_lockh,
75                                   __u64 *res_id,
76                                   __u32 type,
77                                   void *cookie, int cookielen,
78                                   ldlm_mode_t mode,
79                                   int *flags,
80                                   ldlm_completion_callback completion,
81                                   ldlm_blocking_callback blocking,
82                                   void *data,
83                                   __u32 data_len,
84                                   struct lustre_handle *lockh)
85 {
86         struct ldlm_lock *lock;
87         int err;
88
89         if (ns->ns_client) {
90                 CERROR("Trying to cancel local lock\n");
91                 LBUG();
92         }
93
94         lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, data, data_len);
95         if (!lock)
96                 GOTO(out_nolock, err = -ENOMEM);
97         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
98
99         ldlm_lock_addref_internal(lock, mode);
100         ldlm_lock2handle(lock, lockh);
101         lock->l_connh = NULL;
102
103         err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
104                                 blocking);
105         if (err != ELDLM_OK)
106                 GOTO(out, err);
107
108         if (type == LDLM_EXTENT)
109                 memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
110         if ((*flags) & LDLM_FL_LOCK_CHANGED)
111                 memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
112
113         LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
114                           lock);
115
116         if (lock->l_completion_ast)
117                 lock->l_completion_ast(lock, *flags);
118
119         LDLM_DEBUG(lock, "client-side local enqueue END");
120         EXIT;
121  out:
122         LDLM_LOCK_PUT(lock);
123  out_nolock:
124         return err;
125 }
126
127 int ldlm_cli_enqueue(struct lustre_handle *connh,
128                      struct ptlrpc_request *req,
129                      struct ldlm_namespace *ns,
130                      struct lustre_handle *parent_lock_handle,
131                      __u64 *res_id,
132                      __u32 type,
133                      void *cookie, int cookielen,
134                      ldlm_mode_t mode,
135                      int *flags,
136                      ldlm_completion_callback completion,
137                      ldlm_blocking_callback blocking,
138                      void *data,
139                      __u32 data_len,
140                      struct lustre_handle *lockh)
141 {
142         struct ldlm_lock *lock;
143         struct ldlm_request *body;
144         struct ldlm_reply *reply;
145         int rc, size = sizeof(*body), req_passed_in = 1;
146         ENTRY;
147
148         if (connh == NULL)
149                 return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
150                                               type, cookie, cookielen, mode,
151                                               flags, completion, blocking, data,
152                                               data_len, lockh);
153
154         *flags = 0;
155         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
156                                 data, data_len);
157         if (lock == NULL)
158                 GOTO(out_nolock, rc = -ENOMEM);
159         LDLM_DEBUG(lock, "client-side enqueue START");
160         /* for the local lock, add the reference */
161         ldlm_lock_addref_internal(lock, mode);
162         ldlm_lock2handle(lock, lockh);
163
164         if (req == NULL) {
165                 req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
166                                       &size, NULL);
167                 if (!req)
168                         GOTO(out, rc = -ENOMEM);
169                 req_passed_in = 0;
170         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
171                 LBUG();
172
173         /* Dump all of this data into the request buffer */
174         body = lustre_msg_buf(req->rq_reqmsg, 0);
175         ldlm_lock2desc(lock, &body->lock_desc);
176         /* Phil: make this part of ldlm_lock2desc */
177         if (type == LDLM_EXTENT)
178                 memcpy(&body->lock_desc.l_extent, cookie,
179                        sizeof(body->lock_desc.l_extent));
180         body->lock_flags = *flags;
181
182         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
183         if (parent_lock_handle)
184                 memcpy(&body->lock_handle2, parent_lock_handle,
185                        sizeof(body->lock_handle2));
186
187         /* Continue as normal. */
188         if (!req_passed_in) {
189                 size = sizeof(*reply);
190                 req->rq_replen = lustre_msg_size(1, &size);
191         }
192         lock->l_connh = connh;
193         lock->l_export = NULL;
194
195         rc = ptlrpc_queue_wait(req);
196         /* FIXME: status check here? */
197         rc = ptlrpc_check_status(req, rc);
198
199         if (rc != ELDLM_OK) {
200                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
201                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
202                 ldlm_lock_decref(lockh, mode);
203                 /* FIXME: if we've already received a completion AST, this will
204                  * LBUG! */
205                 ldlm_lock_destroy(lock);
206                 GOTO(out, rc);
207         }
208
209         reply = lustre_msg_buf(req->rq_repmsg, 0);
210         memcpy(&lock->l_remote_handle, &reply->lock_handle,
211                sizeof(lock->l_remote_handle));
212         if (type == LDLM_EXTENT)
213                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
214         *flags = reply->lock_flags;
215
216         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
217                (void *)(unsigned long)reply->lock_handle.addr, *flags);
218         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
219                (unsigned long long)reply->lock_extent.start,
220                (unsigned long long)reply->lock_extent.end);
221
222         /* If enqueue returned a blocked lock but the completion handler has
223          * already run, then it fixed up the resource and we don't need to do it
224          * again. */
225         if ((*flags) & LDLM_FL_LOCK_CHANGED) {
226                 int newmode = reply->lock_mode;
227                 if (newmode && newmode != lock->l_req_mode) {
228                         LDLM_DEBUG(lock, "server returned different mode %s",
229                                    ldlm_lockname[newmode]);
230                         lock->l_req_mode = newmode;
231                 }
232
233                 if (reply->lock_resource_name[0] !=
234                     lock->l_resource->lr_name[0]) {
235                         CDEBUG(D_INFO, "remote intent success, locking %ld "
236                                "instead of %ld\n",
237                                (long)reply->lock_resource_name[0],
238                                (long)lock->l_resource->lr_name[0]);
239
240                         ldlm_lock_change_resource(lock,
241                                                   reply->lock_resource_name);
242                         if (lock->l_resource == NULL) {
243                                 LBUG();
244                                 RETURN(-ENOMEM);
245                         }
246                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
247                 }
248         }
249
250         if (!req_passed_in)
251                 ptlrpc_free_req(req);
252
253         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
254                                blocking);
255         if (lock->l_completion_ast)
256                 lock->l_completion_ast(lock, *flags);
257
258         LDLM_DEBUG(lock, "client-side enqueue END");
259         EXIT;
260  out:
261         LDLM_LOCK_PUT(lock);
262  out_nolock:
263         return rc;
264 }
265
266 int ldlm_match_or_enqueue(struct lustre_handle *connh,
267                           struct ptlrpc_request *req,
268                           struct ldlm_namespace *ns,
269                           struct lustre_handle *parent_lock_handle,
270                           __u64 *res_id,
271                           __u32 type,
272                           void *cookie, int cookielen,
273                           ldlm_mode_t mode,
274                           int *flags,
275                           ldlm_completion_callback completion,
276                           ldlm_blocking_callback blocking,
277                           void *data,
278                           __u32 data_len,
279                           struct lustre_handle *lockh)
280 {
281         int rc;
282         ENTRY;
283         rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
284         if (rc == 0) {
285                 rc = ldlm_cli_enqueue(connh, req, ns,
286                                       parent_lock_handle, res_id, type, cookie,
287                                       cookielen, mode, flags, completion,
288                                       blocking, data, data_len, lockh);
289                 if (rc != ELDLM_OK)
290                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
291                 RETURN(rc);
292         } else
293                 RETURN(0);
294 }
295
296 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
297                                   int *flags)
298 {
299
300         if (lock->l_resource->lr_namespace->ns_client) {
301                 CERROR("Trying to cancel local lock\n");
302                 LBUG();
303         }
304         LDLM_DEBUG(lock, "client-side local convert");
305
306         ldlm_lock_convert(lock, new_mode, flags);
307         ldlm_reprocess_all(lock->l_resource);
308
309         LDLM_DEBUG(lock, "client-side local convert handler END");
310         LDLM_LOCK_PUT(lock);
311         RETURN(0);
312 }
313
314 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
315 {
316         struct ldlm_request *body;
317         struct lustre_handle *connh;
318         struct ldlm_reply *reply;
319         struct ldlm_lock *lock;
320         struct ldlm_resource *res;
321         struct ptlrpc_request *req;
322         int rc, size = sizeof(*body);
323         ENTRY;
324
325         lock = ldlm_handle2lock(lockh);
326         if (!lock) {
327                 LBUG();
328                 RETURN(-EINVAL);
329         }
330         *flags = 0;
331         connh = lock->l_connh;
332
333         if (!connh)
334                 return ldlm_cli_convert_local(lock, new_mode, flags);
335
336         LDLM_DEBUG(lock, "client-side convert");
337
338         req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size,
339                               NULL);
340         if (!req)
341                 GOTO(out, rc = -ENOMEM);
342
343         body = lustre_msg_buf(req->rq_reqmsg, 0);
344         memcpy(&body->lock_handle1, &lock->l_remote_handle,
345                sizeof(body->lock_handle1));
346
347         body->lock_desc.l_req_mode = new_mode;
348         body->lock_flags = *flags;
349
350         size = sizeof(*reply);
351         req->rq_replen = lustre_msg_size(1, &size);
352
353         rc = ptlrpc_queue_wait(req);
354         rc = ptlrpc_check_status(req, rc);
355         if (rc != ELDLM_OK)
356                 GOTO(out, rc);
357
358         reply = lustre_msg_buf(req->rq_repmsg, 0);
359         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
360         if (res != NULL)
361                 ldlm_reprocess_all(res);
362         /* Go to sleep until the lock is granted. */
363         /* FIXME: or cancelled. */
364         if (lock->l_completion_ast)
365                 lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
366         EXIT;
367  out:
368         LDLM_LOCK_PUT(lock);
369         ptlrpc_free_req(req);
370         return rc;
371 }
372
373 int ldlm_cli_cancel(struct lustre_handle *lockh)
374 {
375         struct ptlrpc_request *req;
376         struct ldlm_lock *lock;
377         struct ldlm_request *body;
378         int rc = 0, size = sizeof(*body);
379         ENTRY;
380
381         lock = ldlm_handle2lock(lockh);
382         if (!lock) {
383                 /* It's possible that the decref that we did just before this
384                  * cancel was the last reader/writer, and caused a cancel before
385                  * we could call this function.  If we want to make this
386                  * impossible (by adding a dec_and_cancel() or similar), then
387                  * we can put the LBUG back. */
388                 //LBUG();
389                 RETURN(-EINVAL);
390         }
391
392         if (lock->l_connh) {
393                 LDLM_DEBUG(lock, "client-side cancel");
394                 /* Set this flag to prevent others from getting new references*/
395                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
396                 lock->l_flags |= LDLM_FL_CBPENDING;
397                 ldlm_cancel_callback(lock);
398                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
399
400                 req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), 
401                                       LDLM_CANCEL, 1, &size, NULL);
402                 if (!req)
403                         GOTO(out, rc = -ENOMEM);
404
405                 body = lustre_msg_buf(req->rq_reqmsg, 0);
406                 memcpy(&body->lock_handle1, &lock->l_remote_handle,
407                        sizeof(body->lock_handle1));
408
409                 req->rq_replen = lustre_msg_size(0, NULL);
410
411                 rc = ptlrpc_queue_wait(req);
412                 rc = ptlrpc_check_status(req, rc);
413                 ptlrpc_free_req(req);
414                 if (rc != ELDLM_OK)
415                         GOTO(out, rc);
416
417                 ldlm_lock_cancel(lock);
418         } else {
419                 LDLM_DEBUG(lock, "client-side local cancel");
420                 if (lock->l_resource->lr_namespace->ns_client) {
421                         CERROR("Trying to cancel local lock\n");
422                         LBUG();
423                 }
424                 ldlm_lock_cancel(lock);
425                 ldlm_reprocess_all(lock->l_resource);
426                 LDLM_DEBUG(lock, "client-side local cancel handler END");
427         }
428
429         EXIT;
430  out:
431         LDLM_LOCK_PUT(lock);
432         return rc;
433 }
434
435 /* Cancel all locks on a given resource that have 0 readers/writers.
436  *
437  * If 'local_only' is true, throw the locks away without trying to notify the
438  * server. */
439 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
440                            int local_only)
441 {
442         struct ldlm_resource *res;
443         struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
444         struct ldlm_ast_work *w;
445         ENTRY;
446
447         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
448         if (res == NULL)
449                 RETURN(-EINVAL);
450
451         l_lock(&ns->ns_lock);
452         list_for_each(tmp, &res->lr_granted) {
453                 struct ldlm_lock *lock;
454                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
455
456                 if (lock->l_readers || lock->l_writers)
457                         continue;
458
459                 /* Setting the CBPENDING flag is a little misleading, but
460                  * prevents an important race; namely, once CBPENDING is set,
461                  * the lock can accumulate no more readers/writers.  Since
462                  * readers and writers are already zero here, ldlm_lock_decref
463                  * won't see this flag and call l_blocking_ast */
464                 lock->l_flags |= LDLM_FL_CBPENDING;
465
466                 OBD_ALLOC(w, sizeof(*w));
467                 LASSERT(w);
468
469                 w->w_lock = LDLM_LOCK_GET(lock);
470                 list_add(&w->w_list, &list);
471         }
472         l_unlock(&ns->ns_lock);
473
474         list_for_each_safe(tmp, next, &list) {
475                 struct lustre_handle lockh;
476                 int rc;
477                 w = list_entry(tmp, struct ldlm_ast_work, w_list);
478
479                 if (local_only)
480                         ldlm_lock_cancel(w->w_lock);
481                 else {
482                         ldlm_lock2handle(w->w_lock, &lockh);
483                         rc = ldlm_cli_cancel(&lockh);
484                         if (rc != ELDLM_OK)
485                                 CERROR("ldlm_cli_cancel: %d\n", rc);
486                 }
487                 LDLM_LOCK_PUT(w->w_lock);
488                 list_del(&w->w_list);
489                 OBD_FREE(w, sizeof(*w));
490         }
491
492         ldlm_resource_put(res);
493
494         RETURN(0);
495 }