Whamcloud - gitweb
The old patch was failing because sometimes the inodes were gone by the time
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15 #include <linux/obd.h>
16
17 int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
18 {
19         ENTRY;
20
21         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
22                       LDLM_FL_BLOCK_CONV)) {
23                 /* Go to sleep until the lock is granted. */
24                 /* FIXME: or cancelled. */
25                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
26                            " sleeping");
27                 ldlm_lock_dump(lock);
28                 ldlm_reprocess_all(lock->l_resource);
29                 wait_event(lock->l_waitq, (lock->l_req_mode ==
30                                            lock->l_granted_mode));
31                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
32         } else if (flags == LDLM_FL_WAIT_NOREPROC) {
33                 wait_event(lock->l_waitq, (lock->l_req_mode ==
34                                            lock->l_granted_mode));
35         } else if (flags == 0) {
36                 wake_up(&lock->l_waitq);
37         }
38
39         RETURN(0);
40 }
41
42 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
43                                   struct lustre_handle *parent_lockh,
44                                   __u64 *res_id,
45                                   __u32 type,
46                                   void *cookie, int cookielen,
47                                   ldlm_mode_t mode,
48                                   int *flags,
49                                   ldlm_completion_callback completion,
50                                   ldlm_blocking_callback blocking,
51                                   void *data,
52                                   __u32 data_len,
53                                   struct lustre_handle *lockh)
54 {
55         struct ldlm_lock *lock;
56         int err;
57
58         if (ns->ns_client) {
59                 CERROR("Trying to cancel local lock\n");
60                 LBUG();
61         }
62
63         lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, data, data_len);
64         if (!lock)
65                 GOTO(out_nolock, err = -ENOMEM);
66         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
67
68         ldlm_lock_addref_internal(lock, mode);
69         ldlm_lock2handle(lock, lockh);
70         lock->l_connh = NULL;
71
72         err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
73                                 blocking);
74         if (err != ELDLM_OK)
75                 GOTO(out, err);
76
77         if (type == LDLM_EXTENT)
78                 memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
79         if ((*flags) & LDLM_FL_LOCK_CHANGED)
80                 memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
81
82         LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
83                           lock);
84
85         if (lock->l_completion_ast)
86                 lock->l_completion_ast(lock, *flags);
87
88         LDLM_DEBUG(lock, "client-side local enqueue END");
89         EXIT;
90  out:
91         LDLM_LOCK_PUT(lock);
92  out_nolock:
93         return err;
94 }
95
96 int ldlm_cli_enqueue(struct lustre_handle *connh,
97                      struct ptlrpc_request *req,
98                      struct ldlm_namespace *ns,
99                      struct lustre_handle *parent_lock_handle,
100                      __u64 *res_id,
101                      __u32 type,
102                      void *cookie, int cookielen,
103                      ldlm_mode_t mode,
104                      int *flags,
105                      ldlm_completion_callback completion,
106                      ldlm_blocking_callback blocking,
107                      void *data,
108                      __u32 data_len,
109                      struct lustre_handle *lockh)
110 {
111         struct ldlm_lock *lock;
112         struct ldlm_request *body;
113         struct ldlm_reply *reply;
114         int rc, size = sizeof(*body), req_passed_in = 1;
115         ENTRY;
116
117         if (connh == NULL)
118                 return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
119                                               type, cookie, cookielen, mode,
120                                               flags, completion, blocking, data,
121                                               data_len, lockh);
122
123         *flags = 0;
124         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
125                                 data, data_len);
126         if (lock == NULL)
127                 GOTO(out_nolock, rc = -ENOMEM);
128         LDLM_DEBUG(lock, "client-side enqueue START");
129         /* for the local lock, add the reference */
130         ldlm_lock_addref_internal(lock, mode);
131         ldlm_lock2handle(lock, lockh);
132
133         if (req == NULL) {
134                 req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
135                                       &size, NULL);
136                 if (!req)
137                         GOTO(out, rc = -ENOMEM);
138                 req_passed_in = 0;
139         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
140                 LBUG();
141
142         /* Dump all of this data into the request buffer */
143         body = lustre_msg_buf(req->rq_reqmsg, 0);
144         ldlm_lock2desc(lock, &body->lock_desc);
145         /* Phil: make this part of ldlm_lock2desc */
146         if (type == LDLM_EXTENT)
147                 memcpy(&body->lock_desc.l_extent, cookie,
148                        sizeof(body->lock_desc.l_extent));
149         body->lock_flags = *flags;
150
151         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
152         if (parent_lock_handle)
153                 memcpy(&body->lock_handle2, parent_lock_handle,
154                        sizeof(body->lock_handle2));
155
156         /* Continue as normal. */
157         if (!req_passed_in) {
158                 size = sizeof(*reply);
159                 req->rq_replen = lustre_msg_size(1, &size);
160         }
161         lock->l_connh = connh;
162         lock->l_export = NULL;
163
164         rc = ptlrpc_queue_wait(req);
165         /* FIXME: status check here? */
166         rc = ptlrpc_check_status(req, rc);
167
168         if (rc != ELDLM_OK) {
169                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
170                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
171                 ldlm_lock_decref(lockh, mode);
172                 /* FIXME: if we've already received a completion AST, this will
173                  * LBUG! */
174                 ldlm_lock_destroy(lock);
175                 GOTO(out, rc);
176         }
177
178         reply = lustre_msg_buf(req->rq_repmsg, 0);
179         memcpy(&lock->l_remote_handle, &reply->lock_handle,
180                sizeof(lock->l_remote_handle));
181         if (type == LDLM_EXTENT)
182                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
183         *flags = reply->lock_flags;
184
185         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
186                (void *)(unsigned long)reply->lock_handle.addr, *flags);
187         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
188                (unsigned long long)reply->lock_extent.start,
189                (unsigned long long)reply->lock_extent.end);
190
191         /* If enqueue returned a blocked lock but the completion handler has
192          * already run, then it fixed up the resource and we don't need to do it
193          * again. */
194         if ((*flags) & LDLM_FL_LOCK_CHANGED) {
195                 int newmode = reply->lock_mode;
196                 if (newmode && newmode != lock->l_req_mode) {
197                         LDLM_DEBUG(lock, "server returned different mode %s",
198                                    ldlm_lockname[newmode]);
199                         lock->l_req_mode = newmode;
200                 }
201
202                 if (reply->lock_resource_name[0] !=
203                     lock->l_resource->lr_name[0]) {
204                         CDEBUG(D_INFO, "remote intent success, locking %ld "
205                                "instead of %ld\n",
206                                (long)reply->lock_resource_name[0],
207                                (long)lock->l_resource->lr_name[0]);
208
209                         ldlm_lock_change_resource(lock,
210                                                   reply->lock_resource_name);
211                         if (lock->l_resource == NULL) {
212                                 LBUG();
213                                 RETURN(-ENOMEM);
214                         }
215                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
216                 }
217         }
218
219         if (!req_passed_in)
220                 ptlrpc_free_req(req);
221
222         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
223                                blocking);
224         if (lock->l_completion_ast)
225                 lock->l_completion_ast(lock, *flags);
226
227         LDLM_DEBUG(lock, "client-side enqueue END");
228         EXIT;
229  out:
230         LDLM_LOCK_PUT(lock);
231  out_nolock:
232         return rc;
233 }
234
235 int ldlm_match_or_enqueue(struct lustre_handle *connh,
236                           struct ptlrpc_request *req,
237                           struct ldlm_namespace *ns,
238                           struct lustre_handle *parent_lock_handle,
239                           __u64 *res_id,
240                           __u32 type,
241                           void *cookie, int cookielen,
242                           ldlm_mode_t mode,
243                           int *flags,
244                           ldlm_completion_callback completion,
245                           ldlm_blocking_callback blocking,
246                           void *data,
247                           __u32 data_len,
248                           struct lustre_handle *lockh)
249 {
250         int rc;
251         ENTRY;
252         rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
253         if (rc == 0) {
254                 rc = ldlm_cli_enqueue(connh, req, ns,
255                                       parent_lock_handle, res_id, type, cookie,
256                                       cookielen, mode, flags, completion,
257                                       blocking, data, data_len, lockh);
258                 if (rc != ELDLM_OK)
259                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
260                 RETURN(rc);
261         } else
262                 RETURN(0);
263 }
264
265 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
266                                   int *flags)
267 {
268
269         if (lock->l_resource->lr_namespace->ns_client) {
270                 CERROR("Trying to cancel local lock\n");
271                 LBUG();
272         }
273         LDLM_DEBUG(lock, "client-side local convert");
274
275         ldlm_lock_convert(lock, new_mode, flags);
276         ldlm_reprocess_all(lock->l_resource);
277
278         LDLM_DEBUG(lock, "client-side local convert handler END");
279         LDLM_LOCK_PUT(lock);
280         RETURN(0);
281 }
282
283 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
284 {
285         struct ldlm_request *body;
286         struct lustre_handle *connh;
287         struct ldlm_reply *reply;
288         struct ldlm_lock *lock;
289         struct ldlm_resource *res;
290         struct ptlrpc_request *req;
291         int rc, size = sizeof(*body);
292         ENTRY;
293
294         lock = ldlm_handle2lock(lockh);
295         if (!lock) {
296                 LBUG();
297                 RETURN(-EINVAL);
298         }
299         *flags = 0;
300         connh = lock->l_connh;
301
302         if (!connh)
303                 return ldlm_cli_convert_local(lock, new_mode, flags);
304
305         LDLM_DEBUG(lock, "client-side convert");
306
307         req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size,
308                               NULL);
309         if (!req)
310                 GOTO(out, rc = -ENOMEM);
311
312         body = lustre_msg_buf(req->rq_reqmsg, 0);
313         memcpy(&body->lock_handle1, &lock->l_remote_handle,
314                sizeof(body->lock_handle1));
315
316         body->lock_desc.l_req_mode = new_mode;
317         body->lock_flags = *flags;
318
319         size = sizeof(*reply);
320         req->rq_replen = lustre_msg_size(1, &size);
321
322         rc = ptlrpc_queue_wait(req);
323         rc = ptlrpc_check_status(req, rc);
324         if (rc != ELDLM_OK)
325                 GOTO(out, rc);
326
327         reply = lustre_msg_buf(req->rq_repmsg, 0);
328         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
329         if (res != NULL)
330                 ldlm_reprocess_all(res);
331         /* Go to sleep until the lock is granted. */
332         /* FIXME: or cancelled. */
333         if (lock->l_completion_ast)
334                 lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
335         EXIT;
336  out:
337         LDLM_LOCK_PUT(lock);
338         ptlrpc_free_req(req);
339         return rc;
340 }
341
342 int ldlm_cli_cancel(struct lustre_handle *lockh)
343 {
344         struct ptlrpc_request *req;
345         struct ldlm_lock *lock;
346         struct ldlm_request *body;
347         int rc = 0, size = sizeof(*body);
348         ENTRY;
349
350         lock = ldlm_handle2lock(lockh);
351         if (!lock) {
352                 /* It's possible that the decref that we did just before this
353                  * cancel was the last reader/writer, and caused a cancel before
354                  * we could call this function.  If we want to make this
355                  * impossible (by adding a dec_and_cancel() or similar), then
356                  * we can put the LBUG back. */
357                 //LBUG();
358                 RETURN(-EINVAL);
359         }
360
361         if (lock->l_connh) {
362                 LDLM_DEBUG(lock, "client-side cancel");
363                 /* Set this flag to prevent others from getting new references*/
364                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
365                 lock->l_flags |= LDLM_FL_CBPENDING;
366                 ldlm_cancel_callback(lock);
367                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
368
369                 req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), 
370                                       LDLM_CANCEL, 1, &size, NULL);
371                 if (!req)
372                         GOTO(out, rc = -ENOMEM);
373
374                 body = lustre_msg_buf(req->rq_reqmsg, 0);
375                 memcpy(&body->lock_handle1, &lock->l_remote_handle,
376                        sizeof(body->lock_handle1));
377
378                 req->rq_replen = lustre_msg_size(0, NULL);
379
380                 rc = ptlrpc_queue_wait(req);
381                 rc = ptlrpc_check_status(req, rc);
382                 ptlrpc_free_req(req);
383                 if (rc != ELDLM_OK)
384                         GOTO(out, rc);
385
386                 ldlm_lock_cancel(lock);
387         } else {
388                 LDLM_DEBUG(lock, "client-side local cancel");
389                 if (lock->l_resource->lr_namespace->ns_client) {
390                         CERROR("Trying to cancel local lock\n");
391                         LBUG();
392                 }
393                 ldlm_lock_cancel(lock);
394                 ldlm_reprocess_all(lock->l_resource);
395                 LDLM_DEBUG(lock, "client-side local cancel handler END");
396         }
397
398         EXIT;
399  out:
400         LDLM_LOCK_PUT(lock);
401         return rc;
402 }
403
404 /* Cancel all locks on a given resource that have 0 readers/writers.
405  *
406  * If 'local_only' is true, throw the locks away without trying to notify the
407  * server. */
408 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
409                            int local_only)
410 {
411         struct ldlm_resource *res;
412         struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
413         struct ldlm_ast_work *w;
414         ENTRY;
415
416         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
417         if (res == NULL)
418                 RETURN(-EINVAL);
419
420         l_lock(&ns->ns_lock);
421         list_for_each(tmp, &res->lr_granted) {
422                 struct ldlm_lock *lock;
423                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
424
425                 if (lock->l_readers || lock->l_writers)
426                         continue;
427
428                 /* Setting the CBPENDING flag is a little misleading, but
429                  * prevents an important race; namely, once CBPENDING is set,
430                  * the lock can accumulate no more readers/writers.  Since
431                  * readers and writers are already zero here, ldlm_lock_decref
432                  * won't see this flag and call l_blocking_ast */
433                 lock->l_flags |= LDLM_FL_CBPENDING;
434
435                 OBD_ALLOC(w, sizeof(*w));
436                 LASSERT(w);
437
438                 w->w_lock = LDLM_LOCK_GET(lock);
439                 list_add(&w->w_list, &list);
440         }
441         l_unlock(&ns->ns_lock);
442
443         list_for_each_safe(tmp, next, &list) {
444                 struct lustre_handle lockh;
445                 int rc;
446                 w = list_entry(tmp, struct ldlm_ast_work, w_list);
447
448                 if (local_only)
449                         ldlm_lock_cancel(w->w_lock);
450                 else {
451                         ldlm_lock2handle(w->w_lock, &lockh);
452                         rc = ldlm_cli_cancel(&lockh);
453                         if (rc != ELDLM_OK)
454                                 CERROR("ldlm_cli_cancel: %d\n", rc);
455                 }
456                 LDLM_LOCK_PUT(w->w_lock);
457                 list_del(&w->w_list);
458                 OBD_FREE(w, sizeof(*w));
459         }
460
461         ldlm_resource_put(res);
462
463         RETURN(0);
464 }