Whamcloud - gitweb
- move the peter branch changes to the head
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15
16 int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
17 {
18
19         ENTRY;
20
21         if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
22                       LDLM_FL_BLOCK_CONV)) {
23                 /* Go to sleep until the lock is granted. */
24                 /* FIXME: or cancelled. */
25                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
26                            " sleeping");
27                 ldlm_lock_dump(lock);
28                 ldlm_reprocess_all(lock->l_resource);
29                 wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
30                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
31         } else if (flags == LDLM_FL_WAIT_NOREPROC) { 
32                 wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
33         } else if (flags == 0) { 
34                 wake_up(&lock->l_waitq);
35
36         }
37
38         RETURN(0);
39 }
40
41
42 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
43                                   struct lustre_handle *parent_lockh,
44                                   __u64 *res_id,
45                                   __u32 type,
46                                   void *cookie, int cookielen,
47                                   ldlm_mode_t mode,
48                                   int *flags,
49                                   ldlm_completion_callback completion,
50                                   ldlm_lock_callback callback,
51                                   void *data,
52                                   __u32 data_len,
53                                   struct lustre_handle *lockh)
54 {
55         struct ldlm_lock *lock; 
56         int err; 
57
58         if (ns->ns_client) { 
59                 CERROR("Trying to cancel local lock\n"); 
60                 LBUG();
61         }
62
63         lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, NULL, 0);
64         if (!lock)
65                 GOTO(out_nolock, err = -ENOMEM);
66         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
67
68         ldlm_lock_addref_internal(lock, mode);
69         ldlm_lock2handle(lock, lockh);
70         lock->l_connh = NULL; 
71
72         err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, callback);
73         if (err != ELDLM_OK)
74                 GOTO(out, err);
75
76         if (type == LDLM_EXTENT)
77                 memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
78         if ((*flags) & LDLM_FL_LOCK_CHANGED)
79                 memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
80
81         LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", lock);
82
83         if (lock->l_completion_ast)
84                 lock->l_completion_ast(lock, *flags); 
85
86         LDLM_DEBUG(lock, "client-side local enqueue END");
87         EXIT;
88  out:
89         LDLM_LOCK_PUT(lock);
90  out_nolock:
91         return err;
92 }
93
94
95 int ldlm_cli_enqueue(struct lustre_handle *connh, 
96                      struct ptlrpc_request *req,
97                      struct ldlm_namespace *ns,
98                      struct lustre_handle *parent_lock_handle,
99                      __u64 *res_id,
100                      __u32 type,
101                      void *cookie, int cookielen,
102                      ldlm_mode_t mode,
103                      int *flags,
104                      ldlm_completion_callback completion,
105                      ldlm_lock_callback callback,
106                      void *data,
107                      __u32 data_len,
108                      struct lustre_handle *lockh)
109 {
110         struct ptlrpc_connection *connection;
111         struct ldlm_lock *lock;
112         struct ldlm_request *body;
113         struct ldlm_reply *reply;
114         int rc, size = sizeof(*body), req_passed_in = 1;
115         ENTRY;
116
117         if (connh == NULL) 
118                 return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, 
119                                               type, cookie, cookielen, mode, 
120                                               flags, completion, callback, data, data_len, lockh);
121         connection = client_conn2cli(connh)->cl_conn;
122
123         *flags = 0;
124         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
125                                 data, data_len);
126         if (lock == NULL)
127                 GOTO(out_nolock, rc = -ENOMEM);
128         LDLM_DEBUG(lock, "client-side enqueue START");
129         /* for the local lock, add the reference */
130         ldlm_lock_addref_internal(lock, mode);
131         ldlm_lock2handle(lock, lockh);
132
133         if (req == NULL) {
134                 req = ptlrpc_prep_req2(connh, LDLM_ENQUEUE, 1, &size, NULL);
135                 if (!req)
136                         GOTO(out, rc = -ENOMEM);
137                 req_passed_in = 0;
138         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
139                 LBUG();
140
141         /* Dump all of this data into the request buffer */
142         body = lustre_msg_buf(req->rq_reqmsg, 0);
143         ldlm_lock2desc(lock, &body->lock_desc);
144         /* Phil: make this part of ldlm_lock2desc */
145         if (type == LDLM_EXTENT)
146                 memcpy(&body->lock_desc.l_extent, cookie,
147                        sizeof(body->lock_desc.l_extent));
148         body->lock_flags = *flags;
149
150         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
151         if (parent_lock_handle)
152                 memcpy(&body->lock_handle2, parent_lock_handle,
153                        sizeof(body->lock_handle2));
154
155         /* Continue as normal. */
156         if (!req_passed_in) {
157                 size = sizeof(*reply);
158                 req->rq_replen = lustre_msg_size(1, &size);
159         }
160         lock->l_connh = connh; 
161         lock->l_connection = ptlrpc_connection_addref(connection);
162         lock->l_client = client_conn2cli(connh)->cl_client;
163
164         rc = ptlrpc_queue_wait(req);
165         /* FIXME: status check here? */
166         rc = ptlrpc_check_status(req, rc);
167
168         if (rc != ELDLM_OK) {
169                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
170                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
171                 ldlm_lock_decref(lockh, mode);
172                 /* FIXME: if we've already received a completion AST, this will
173                  * LBUG! */
174                 ldlm_lock_destroy(lock);
175                 GOTO(out, rc);
176         }
177
178         reply = lustre_msg_buf(req->rq_repmsg, 0);
179         memcpy(&lock->l_remote_handle, &reply->lock_handle,
180                sizeof(lock->l_remote_handle));
181         if (type == LDLM_EXTENT)
182                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
183         *flags = reply->lock_flags;
184
185         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
186                (void *)(unsigned long)reply->lock_handle.addr, *flags);
187         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
188                (unsigned long long)reply->lock_extent.start,
189                (unsigned long long)reply->lock_extent.end);
190
191         /* If enqueue returned a blocked lock but the completion handler has
192          * already run, then it fixed up the resource and we don't need to do it
193          * again. */
194         if ((*flags) & LDLM_FL_LOCK_CHANGED &&
195             lock->l_req_mode != lock->l_granted_mode) {
196                 CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
197                        "%ld\n", (long)reply->lock_resource_name[0],
198                        (long)lock->l_resource->lr_name[0]);
199
200                 ldlm_lock_change_resource(lock, reply->lock_resource_name);
201                 if (lock->l_resource == NULL) {
202                         LBUG();
203                         RETURN(-ENOMEM);
204                 }
205                 LDLM_DEBUG(lock, "client-side enqueue, new resource");
206         }
207
208         if (!req_passed_in)
209                 ptlrpc_free_req(req);
210
211         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
212                                callback);
213         if (lock->l_completion_ast)
214                 lock->l_completion_ast(lock, *flags); 
215
216         LDLM_DEBUG(lock, "client-side enqueue END");
217         EXIT;
218  out:
219         LDLM_LOCK_PUT(lock);
220  out_nolock:
221         return rc;
222 }
223
224 int ldlm_match_or_enqueue(struct lustre_handle *connh, 
225                           struct ptlrpc_request *req,
226                           struct ldlm_namespace *ns,
227                           struct lustre_handle *parent_lock_handle,
228                           __u64 *res_id,
229                           __u32 type,
230                           void *cookie, int cookielen,
231                           ldlm_mode_t mode,
232                           int *flags,
233                           ldlm_completion_callback completion,
234                           ldlm_lock_callback callback,
235                           void *data,
236                           __u32 data_len,
237                           struct lustre_handle *lockh)
238 {
239         int rc;
240         ENTRY;
241         rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
242         if (rc == 0) {
243                 rc = ldlm_cli_enqueue(connh, req, ns,
244                                       parent_lock_handle, res_id, type, cookie,
245                                       cookielen, mode, flags, completion, callback, data,
246                                       data_len, lockh);
247                 if (rc != ELDLM_OK)
248                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
249                 RETURN(rc);
250         } else
251                 RETURN(0);
252 }
253
254 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
255                     void *data, __u32 data_len)
256 {
257         struct ldlm_lock *lock;
258         struct ldlm_request *body;
259         struct ptlrpc_request *req;
260         struct ptlrpc_client *cl;
261         int rc = 0, size = sizeof(*body);
262         ENTRY;
263
264         lock = ldlm_handle2lock(lockh);
265         if (lock == NULL) {
266                 LBUG();
267                 RETURN(-EINVAL);
268         }
269         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
270         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
271                               &size, NULL);
272         if (!req)
273                 GOTO(out, rc = -ENOMEM);
274
275         body = lustre_msg_buf(req->rq_reqmsg, 0);
276         memcpy(&body->lock_handle1, &lock->l_remote_handle,
277                sizeof(body->lock_handle1));
278
279         if (desc == NULL) {
280                 CDEBUG(D_NET, "Sending granted AST\n");
281                 ldlm_lock2desc(lock, &body->lock_desc);
282         } else {
283                 CDEBUG(D_NET, "Sending blocked AST\n");
284                 memcpy(&body->lock_desc, desc, sizeof(*desc));
285         }
286
287         LDLM_DEBUG(lock, "server preparing %s AST",
288                    desc == 0 ? "completion" : "blocked");
289
290         req->rq_replen = lustre_msg_size(0, NULL);
291
292         rc = ptlrpc_queue_wait(req);
293         rc = ptlrpc_check_status(req, rc);
294         ptlrpc_free_req(req);
295
296         EXIT;
297  out:
298         LDLM_LOCK_PUT(lock);
299         return rc;
300 }
301
302
303
304 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags)
305 {
306
307         if (lock->l_resource->lr_namespace->ns_client) { 
308                 CERROR("Trying to cancel local lock\n"); 
309                 LBUG();
310         }
311         LDLM_DEBUG(lock, "client-side local convert");
312
313         ldlm_lock_convert(lock, new_mode, flags); 
314         ldlm_reprocess_all(lock->l_resource);
315
316         LDLM_DEBUG(lock, "client-side local convert handler END");
317         LDLM_LOCK_PUT(lock);
318         RETURN(0);
319 }
320
321 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
322 {
323         struct ldlm_request *body;
324         struct lustre_handle *connh;
325         struct ldlm_reply *reply;
326         struct ldlm_lock *lock;
327         struct ldlm_resource *res;
328         struct ptlrpc_request *req;
329         int rc, size = sizeof(*body);
330         ENTRY;
331
332         lock = ldlm_handle2lock(lockh);
333         if (!lock) {
334                 LBUG();
335                 RETURN(-EINVAL);
336         }
337         *flags = 0;
338         connh = lock->l_connh; 
339
340         if (!connh)
341                 return ldlm_cli_convert_local(lock, new_mode, flags);
342
343         LDLM_DEBUG(lock, "client-side convert");
344
345         req = ptlrpc_prep_req2(connh, LDLM_CONVERT, 1, &size, NULL);
346         if (!req)
347                 GOTO(out, rc = -ENOMEM);
348
349         body = lustre_msg_buf(req->rq_reqmsg, 0);
350         memcpy(&body->lock_handle1, &lock->l_remote_handle,
351                sizeof(body->lock_handle1));
352
353         body->lock_desc.l_req_mode = new_mode;
354         body->lock_flags = *flags;
355
356         size = sizeof(*reply);
357         req->rq_replen = lustre_msg_size(1, &size);
358
359         rc = ptlrpc_queue_wait(req);
360         rc = ptlrpc_check_status(req, rc);
361         if (rc != ELDLM_OK)
362                 GOTO(out, rc);
363
364         reply = lustre_msg_buf(req->rq_repmsg, 0);
365         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
366         if (res != NULL)
367                 ldlm_reprocess_all(res);
368         /* Go to sleep until the lock is granted. */
369         /* FIXME: or cancelled. */
370         if (lock->l_completion_ast)
371                 lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); 
372         EXIT;
373  out:
374         LDLM_LOCK_PUT(lock);
375         ptlrpc_free_req(req);
376         return rc;
377 }
378
379 int ldlm_cli_cancel(struct lustre_handle *lockh)
380 {
381         struct ptlrpc_request *req;
382         struct ldlm_lock *lock;
383         struct ldlm_request *body;
384         int rc = 0, size = sizeof(*body);
385         ENTRY;
386
387         lock = ldlm_handle2lock(lockh); 
388         if (!lock) {
389                 /* It's possible that the decref that we did just before this
390                  * cancel was the last reader/writer, and caused a cancel before
391                  * we could call this function.  If we want to make this
392                  * impossible (by adding a dec_and_cancel() or similar), then
393                  * we can put the LBUG back. */
394                 //LBUG();
395                 RETURN(-EINVAL);
396         }
397
398         if (lock->l_connh) { 
399                 LDLM_DEBUG(lock, "client-side cancel");
400                 req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size, NULL);
401                 if (!req)
402                         GOTO(out, rc = -ENOMEM);
403                 
404                 body = lustre_msg_buf(req->rq_reqmsg, 0);
405                 memcpy(&body->lock_handle1, &lock->l_remote_handle,
406                        sizeof(body->lock_handle1));
407                 
408                 req->rq_replen = lustre_msg_size(0, NULL);
409                 
410                 rc = ptlrpc_queue_wait(req);
411                 rc = ptlrpc_check_status(req, rc);
412                 ptlrpc_free_req(req);
413                 if (rc != ELDLM_OK)
414                         GOTO(out, rc);
415                 
416                 ldlm_lock_cancel(lock);
417         } else { 
418                 LDLM_DEBUG(lock, "client-side local cancel");
419                 if (lock->l_resource->lr_namespace->ns_client) { 
420                         CERROR("Trying to cancel local lock\n"); 
421                         LBUG();
422                 }
423                 ldlm_lock_cancel(lock); 
424                 ldlm_reprocess_all(lock->l_resource); 
425                 LDLM_DEBUG(lock, "client-side local cancel handler END");
426         }
427
428         EXIT;
429  out:
430         LDLM_LOCK_PUT(lock);
431         return rc;
432 }