Whamcloud - gitweb
- removed lock list from inode info
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15
16 int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
17 {
18         ENTRY;
19
20         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
21                       LDLM_FL_BLOCK_CONV)) {
22                 /* Go to sleep until the lock is granted. */
23                 /* FIXME: or cancelled. */
24                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
25                            " sleeping");
26                 ldlm_lock_dump(lock);
27                 ldlm_reprocess_all(lock->l_resource);
28                 wait_event(lock->l_waitq, (lock->l_req_mode ==
29                                            lock->l_granted_mode));
30                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
31         } else if (flags == LDLM_FL_WAIT_NOREPROC) {
32                 wait_event(lock->l_waitq, (lock->l_req_mode ==
33                                            lock->l_granted_mode));
34         } else if (flags == 0) {
35                 wake_up(&lock->l_waitq);
36         }
37
38         RETURN(0);
39 }
40
41 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
42                                   struct lustre_handle *parent_lockh,
43                                   __u64 *res_id,
44                                   __u32 type,
45                                   void *cookie, int cookielen,
46                                   ldlm_mode_t mode,
47                                   int *flags,
48                                   ldlm_completion_callback completion,
49                                   ldlm_blocking_callback blocking,
50                                   void *data,
51                                   __u32 data_len,
52                                   struct lustre_handle *lockh)
53 {
54         struct ldlm_lock *lock;
55         int err;
56
57         if (ns->ns_client) {
58                 CERROR("Trying to cancel local lock\n");
59                 LBUG();
60         }
61
62         lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, NULL, 0);
63         if (!lock)
64                 GOTO(out_nolock, err = -ENOMEM);
65         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
66
67         ldlm_lock_addref_internal(lock, mode);
68         ldlm_lock2handle(lock, lockh);
69         lock->l_connh = NULL;
70
71         err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
72                                 blocking);
73         if (err != ELDLM_OK)
74                 GOTO(out, err);
75
76         if (type == LDLM_EXTENT)
77                 memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
78         if ((*flags) & LDLM_FL_LOCK_CHANGED)
79                 memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
80
81         LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
82                           lock);
83
84         if (lock->l_completion_ast)
85                 lock->l_completion_ast(lock, *flags);
86
87         LDLM_DEBUG(lock, "client-side local enqueue END");
88         EXIT;
89  out:
90         LDLM_LOCK_PUT(lock);
91  out_nolock:
92         return err;
93 }
94
95 int ldlm_cli_enqueue(struct lustre_handle *connh,
96                      struct ptlrpc_request *req,
97                      struct ldlm_namespace *ns,
98                      struct lustre_handle *parent_lock_handle,
99                      __u64 *res_id,
100                      __u32 type,
101                      void *cookie, int cookielen,
102                      ldlm_mode_t mode,
103                      int *flags,
104                      ldlm_completion_callback completion,
105                      ldlm_blocking_callback blocking,
106                      void *data,
107                      __u32 data_len,
108                      struct lustre_handle *lockh)
109 {
110         struct ldlm_lock *lock;
111         struct ldlm_request *body;
112         struct ldlm_reply *reply;
113         int rc, size = sizeof(*body), req_passed_in = 1;
114         ENTRY;
115
116         if (connh == NULL)
117                 return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
118                                               type, cookie, cookielen, mode,
119                                               flags, completion, blocking, data,
120                                               data_len, lockh);
121
122         *flags = 0;
123         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
124                                 data, data_len);
125         if (lock == NULL)
126                 GOTO(out_nolock, rc = -ENOMEM);
127         LDLM_DEBUG(lock, "client-side enqueue START");
128         /* for the local lock, add the reference */
129         ldlm_lock_addref_internal(lock, mode);
130         ldlm_lock2handle(lock, lockh);
131
132         if (req == NULL) {
133                 req = ptlrpc_prep_req2(connh, LDLM_ENQUEUE, 1, &size, NULL);
134                 if (!req)
135                         GOTO(out, rc = -ENOMEM);
136                 req_passed_in = 0;
137         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
138                 LBUG();
139
140         /* Dump all of this data into the request buffer */
141         body = lustre_msg_buf(req->rq_reqmsg, 0);
142         ldlm_lock2desc(lock, &body->lock_desc);
143         /* Phil: make this part of ldlm_lock2desc */
144         if (type == LDLM_EXTENT)
145                 memcpy(&body->lock_desc.l_extent, cookie,
146                        sizeof(body->lock_desc.l_extent));
147         body->lock_flags = *flags;
148
149         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
150         if (parent_lock_handle)
151                 memcpy(&body->lock_handle2, parent_lock_handle,
152                        sizeof(body->lock_handle2));
153
154         /* Continue as normal. */
155         if (!req_passed_in) {
156                 size = sizeof(*reply);
157                 req->rq_replen = lustre_msg_size(1, &size);
158         }
159         lock->l_connh = connh;
160         lock->l_export = NULL;
161         lock->l_client = client_conn2cli(connh)->cl_client;
162
163         rc = ptlrpc_queue_wait(req);
164         /* FIXME: status check here? */
165         rc = ptlrpc_check_status(req, rc);
166
167         if (rc != ELDLM_OK) {
168                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
169                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
170                 ldlm_lock_decref(lockh, mode);
171                 /* FIXME: if we've already received a completion AST, this will
172                  * LBUG! */
173                 ldlm_lock_destroy(lock);
174                 GOTO(out, rc);
175         }
176
177         reply = lustre_msg_buf(req->rq_repmsg, 0);
178         memcpy(&lock->l_remote_handle, &reply->lock_handle,
179                sizeof(lock->l_remote_handle));
180         if (type == LDLM_EXTENT)
181                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
182         *flags = reply->lock_flags;
183
184         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
185                (void *)(unsigned long)reply->lock_handle.addr, *flags);
186         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
187                (unsigned long long)reply->lock_extent.start,
188                (unsigned long long)reply->lock_extent.end);
189
190         /* If enqueue returned a blocked lock but the completion handler has
191          * already run, then it fixed up the resource and we don't need to do it
192          * again. */
193         if ((*flags) & LDLM_FL_LOCK_CHANGED) {
194                 int newmode = reply->lock_mode;
195                 if (newmode && newmode != lock->l_req_mode) {
196                         LDLM_DEBUG(lock, "server returned different mode %s",
197                                    ldlm_lockname[newmode]);
198                         lock->l_req_mode = newmode;
199                 }
200
201                 if (reply->lock_resource_name[0] !=
202                     lock->l_resource->lr_name[0]) {
203                         CDEBUG(D_INFO, "remote intent success, locking %ld "
204                                "instead of %ld\n",
205                                (long)reply->lock_resource_name[0],
206                                (long)lock->l_resource->lr_name[0]);
207
208                         ldlm_lock_change_resource(lock,
209                                                   reply->lock_resource_name);
210                         if (lock->l_resource == NULL) {
211                                 LBUG();
212                                 RETURN(-ENOMEM);
213                         }
214                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
215                 }
216         }
217
218         if (!req_passed_in)
219                 ptlrpc_free_req(req);
220
221         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
222                                blocking);
223         if (lock->l_completion_ast)
224                 lock->l_completion_ast(lock, *flags);
225
226         LDLM_DEBUG(lock, "client-side enqueue END");
227         EXIT;
228  out:
229         LDLM_LOCK_PUT(lock);
230  out_nolock:
231         return rc;
232 }
233
234 int ldlm_match_or_enqueue(struct lustre_handle *connh,
235                           struct ptlrpc_request *req,
236                           struct ldlm_namespace *ns,
237                           struct lustre_handle *parent_lock_handle,
238                           __u64 *res_id,
239                           __u32 type,
240                           void *cookie, int cookielen,
241                           ldlm_mode_t mode,
242                           int *flags,
243                           ldlm_completion_callback completion,
244                           ldlm_blocking_callback blocking,
245                           void *data,
246                           __u32 data_len,
247                           struct lustre_handle *lockh)
248 {
249         int rc;
250         ENTRY;
251         rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
252         if (rc == 0) {
253                 rc = ldlm_cli_enqueue(connh, req, ns,
254                                       parent_lock_handle, res_id, type, cookie,
255                                       cookielen, mode, flags, completion,
256                                       blocking, data, data_len, lockh);
257                 if (rc != ELDLM_OK)
258                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
259                 RETURN(rc);
260         } else
261                 RETURN(0);
262 }
263
264 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
265                                   int *flags)
266 {
267
268         if (lock->l_resource->lr_namespace->ns_client) {
269                 CERROR("Trying to cancel local lock\n");
270                 LBUG();
271         }
272         LDLM_DEBUG(lock, "client-side local convert");
273
274         ldlm_lock_convert(lock, new_mode, flags);
275         ldlm_reprocess_all(lock->l_resource);
276
277         LDLM_DEBUG(lock, "client-side local convert handler END");
278         LDLM_LOCK_PUT(lock);
279         RETURN(0);
280 }
281
282 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
283 {
284         struct ldlm_request *body;
285         struct lustre_handle *connh;
286         struct ldlm_reply *reply;
287         struct ldlm_lock *lock;
288         struct ldlm_resource *res;
289         struct ptlrpc_request *req;
290         int rc, size = sizeof(*body);
291         ENTRY;
292
293         lock = ldlm_handle2lock(lockh);
294         if (!lock) {
295                 LBUG();
296                 RETURN(-EINVAL);
297         }
298         *flags = 0;
299         connh = lock->l_connh;
300
301         if (!connh)
302                 return ldlm_cli_convert_local(lock, new_mode, flags);
303
304         LDLM_DEBUG(lock, "client-side convert");
305
306         req = ptlrpc_prep_req2(connh, LDLM_CONVERT, 1, &size, NULL);
307         if (!req)
308                 GOTO(out, rc = -ENOMEM);
309
310         body = lustre_msg_buf(req->rq_reqmsg, 0);
311         memcpy(&body->lock_handle1, &lock->l_remote_handle,
312                sizeof(body->lock_handle1));
313
314         body->lock_desc.l_req_mode = new_mode;
315         body->lock_flags = *flags;
316
317         size = sizeof(*reply);
318         req->rq_replen = lustre_msg_size(1, &size);
319
320         rc = ptlrpc_queue_wait(req);
321         rc = ptlrpc_check_status(req, rc);
322         if (rc != ELDLM_OK)
323                 GOTO(out, rc);
324
325         reply = lustre_msg_buf(req->rq_repmsg, 0);
326         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
327         if (res != NULL)
328                 ldlm_reprocess_all(res);
329         /* Go to sleep until the lock is granted. */
330         /* FIXME: or cancelled. */
331         if (lock->l_completion_ast)
332                 lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
333         EXIT;
334  out:
335         LDLM_LOCK_PUT(lock);
336         ptlrpc_free_req(req);
337         return rc;
338 }
339
340 int ldlm_cli_cancel(struct lustre_handle *lockh)
341 {
342         struct ptlrpc_request *req;
343         struct ldlm_lock *lock;
344         struct ldlm_request *body;
345         int rc = 0, size = sizeof(*body);
346         ENTRY;
347
348         lock = ldlm_handle2lock(lockh);
349         if (!lock) {
350                 /* It's possible that the decref that we did just before this
351                  * cancel was the last reader/writer, and caused a cancel before
352                  * we could call this function.  If we want to make this
353                  * impossible (by adding a dec_and_cancel() or similar), then
354                  * we can put the LBUG back. */
355                 //LBUG();
356                 RETURN(-EINVAL);
357         }
358
359         if (lock->l_connh) {
360                 LDLM_DEBUG(lock, "client-side cancel");
361                 /* Set this flag to prevent others from getting new references*/
362                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
363                 lock->l_flags |= LDLM_FL_CBPENDING;
364                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
365
366                 req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size,
367                                        NULL);
368                 if (!req)
369                         GOTO(out, rc = -ENOMEM);
370
371                 body = lustre_msg_buf(req->rq_reqmsg, 0);
372                 memcpy(&body->lock_handle1, &lock->l_remote_handle,
373                        sizeof(body->lock_handle1));
374
375                 req->rq_replen = lustre_msg_size(0, NULL);
376
377                 rc = ptlrpc_queue_wait(req);
378                 rc = ptlrpc_check_status(req, rc);
379                 ptlrpc_free_req(req);
380                 if (rc != ELDLM_OK)
381                         GOTO(out, rc);
382
383                 ldlm_lock_cancel(lock);
384         } else {
385                 LDLM_DEBUG(lock, "client-side local cancel");
386                 if (lock->l_resource->lr_namespace->ns_client) {
387                         CERROR("Trying to cancel local lock\n");
388                         LBUG();
389                 }
390                 ldlm_lock_cancel(lock);
391                 ldlm_reprocess_all(lock->l_resource);
392                 LDLM_DEBUG(lock, "client-side local cancel handler END");
393         }
394
395         EXIT;
396  out:
397         LDLM_LOCK_PUT(lock);
398         return rc;
399 }
400
401 /* Cancel all locks on a given resource that have 0 readers/writers */
402 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id)
403 {
404         struct ldlm_resource *res;
405         struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
406         struct ldlm_ast_work *w;
407         ENTRY;
408
409         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
410         if (res == NULL)
411                 RETURN(-ENOMEM);
412
413         l_lock(&ns->ns_lock);
414         list_for_each(tmp, &res->lr_granted) {
415                 struct ldlm_lock *lock;
416                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
417
418                 if (lock->l_readers || lock->l_writers)
419                         continue;
420
421                 /* Setting the CBPENDING flag is a little misleading, but
422                  * prevents an important race; namely, once CBPENDING is set,
423                  * the lock can accumulate no more readers/writers.  Since
424                  * readers and writers are already zero here, ldlm_lock_decref
425                  * won't see this flag and call l_blocking_ast */
426                 lock->l_flags |= LDLM_FL_CBPENDING;
427
428                 OBD_ALLOC(w, sizeof(*w));
429                 LASSERT(w);
430
431                 w->w_lock = LDLM_LOCK_GET(lock);
432                 list_add(&w->w_list, &list);
433         }
434         l_unlock(&ns->ns_lock);
435
436         list_for_each_safe(tmp, next, &list) {
437                 struct lustre_handle lockh;
438                 int rc;
439                 w = list_entry(tmp, struct ldlm_ast_work, w_list);
440
441                 ldlm_lock2handle(w->w_lock, &lockh);
442                 rc = ldlm_cli_cancel(&lockh);
443                 if (rc != ELDLM_OK)
444                         CERROR("ldlm_cli_cancel: %d\n", rc);
445
446                 LDLM_LOCK_PUT(w->w_lock);
447                 list_del(&w->w_list);
448                 OBD_FREE(w, sizeof(*w));
449         }
450
451         RETURN(0);
452 }