Whamcloud - gitweb
- Added match_or_enqueue helper function
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15
16 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
17                      struct lustre_handle *connh, 
18                      struct ptlrpc_request *req,
19                      struct ldlm_namespace *ns,
20                      struct lustre_handle *parent_lock_handle,
21                      __u64 *res_id,
22                      __u32 type,
23                      void *cookie, int cookielen,
24                      ldlm_mode_t mode,
25                      int *flags,
26                      ldlm_lock_callback callback,
27                      void *data,
28                      __u32 data_len,
29                      struct lustre_handle *lockh)
30 {
31         struct ldlm_lock *lock;
32         struct ldlm_request *body;
33         struct ldlm_reply *reply;
34         int rc, size = sizeof(*body), req_passed_in = 1;
35         ENTRY;
36
37         *flags = 0;
38         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
39                                 data, data_len);
40         if (lock == NULL)
41                 GOTO(out, rc = -ENOMEM);
42         LDLM_DEBUG(lock, "client-side enqueue START");
43         /* for the local lock, add the reference */
44         ldlm_lock_addref_internal(lock, mode);
45         ldlm_lock2handle(lock, lockh);
46
47         if (req == NULL) {
48                 req = ptlrpc_prep_req2(cl, conn, connh, 
49                                        LDLM_ENQUEUE, 1, &size, NULL);
50                 if (!req)
51                         GOTO(out, rc = -ENOMEM);
52                 req_passed_in = 0;
53         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
54                 LBUG();
55
56         /* Dump all of this data into the request buffer */
57         body = lustre_msg_buf(req->rq_reqmsg, 0);
58         ldlm_lock2desc(lock, &body->lock_desc);
59         /* Phil: make this part of ldlm_lock2desc */
60         if (type == LDLM_EXTENT)
61                 memcpy(&body->lock_desc.l_extent, cookie,
62                        sizeof(body->lock_desc.l_extent));
63         body->lock_flags = *flags;
64
65         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
66         if (parent_lock_handle)
67                 memcpy(&body->lock_handle2, parent_lock_handle,
68                        sizeof(body->lock_handle2));
69
70         /* Continue as normal. */
71         if (!req_passed_in) {
72                 size = sizeof(*reply);
73                 req->rq_replen = lustre_msg_size(1, &size);
74         }
75
76         lock->l_connection = ptlrpc_connection_addref(conn);
77         lock->l_client = cl;
78
79         rc = ptlrpc_queue_wait(req);
80         /* FIXME: status check here? */
81         rc = ptlrpc_check_status(req, rc);
82
83         if (rc != ELDLM_OK) {
84                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
85                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
86                 LDLM_LOCK_PUT(lock);
87                 ldlm_lock_decref(lockh, mode);
88                 /* FIXME: if we've already received a completion AST, this will
89                  * LBUG! */
90                 ldlm_lock_destroy(lock);
91                 GOTO(out, rc);
92         }
93
94         reply = lustre_msg_buf(req->rq_repmsg, 0);
95         memcpy(&lock->l_remote_handle, &reply->lock_handle,
96                sizeof(lock->l_remote_handle));
97         if (type == LDLM_EXTENT)
98                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
99         *flags = reply->lock_flags;
100
101         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
102                (void *)(unsigned long)reply->lock_handle.addr, *flags);
103         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
104                (unsigned long long)reply->lock_extent.start,
105                (unsigned long long)reply->lock_extent.end);
106
107         /* If enqueue returned a blocked lock but the completion handler has
108          * already run, then it fixed up the resource and we don't need to do it
109          * again. */
110         if ((*flags) & LDLM_FL_LOCK_CHANGED &&
111             lock->l_req_mode != lock->l_granted_mode) {
112                 CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
113                        "%ld\n", (long)reply->lock_resource_name[0],
114                        (long)lock->l_resource->lr_name[0]);
115
116                 ldlm_lock_change_resource(lock, reply->lock_resource_name);
117                 if (lock->l_resource == NULL) {
118                         LBUG();
119                         RETURN(-ENOMEM);
120                 }
121                 LDLM_DEBUG(lock, "client-side enqueue, new resource");
122         }
123
124         if (!req_passed_in)
125                 ptlrpc_free_req(req);
126
127         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
128                                callback);
129
130         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
131                       LDLM_FL_BLOCK_CONV)) {
132                 /* Go to sleep until the lock is granted. */
133                 /* FIXME: or cancelled. */
134                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
135                            " sleeping");
136                 ldlm_lock_dump(lock);
137                 wait_event(lock->l_waitq,
138                            lock->l_req_mode == lock->l_granted_mode);
139                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
140         }
141         LDLM_DEBUG(lock, "client-side enqueue END");
142         LDLM_LOCK_PUT(lock);
143         EXIT;
144  out:
145         return rc;
146 }
147
148 int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
149                           struct ptlrpc_connection *conn,
150                           struct lustre_handle *connh, 
151                           struct ptlrpc_request *req,
152                           struct ldlm_namespace *ns,
153                           struct lustre_handle *parent_lock_handle,
154                           __u64 *res_id,
155                           __u32 type,
156                           void *cookie, int cookielen,
157                           ldlm_mode_t mode,
158                           int *flags,
159                           ldlm_lock_callback callback,
160                           void *data,
161                           __u32 data_len,
162                           struct lustre_handle *lockh)
163 {
164         int rc;
165         ENTRY;
166         rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
167         if (rc == 0) {
168                 rc = ldlm_cli_enqueue(cl, conn, connh, req, ns,
169                                       parent_lock_handle, res_id, type, cookie,
170                                       cookielen, mode, flags, callback, data,
171                                       data_len, lockh);
172                 if (rc != ELDLM_OK)
173                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
174                 RETURN(rc);
175         } else
176                 RETURN(0);
177 }
178
179 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
180                     void *data, __u32 data_len)
181 {
182         struct ldlm_lock *lock;
183         struct ldlm_request *body;
184         struct ptlrpc_request *req;
185         struct ptlrpc_client *cl;
186         int rc = 0, size = sizeof(*body);
187         ENTRY;
188
189         lock = ldlm_handle2lock(lockh);
190         if (lock == NULL) {
191                 LBUG();
192                 RETURN(-EINVAL);
193         }
194         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
195         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
196                               &size, NULL);
197         if (!req)
198                 GOTO(out, rc = -ENOMEM);
199
200         body = lustre_msg_buf(req->rq_reqmsg, 0);
201         memcpy(&body->lock_handle1, &lock->l_remote_handle,
202                sizeof(body->lock_handle1));
203
204         if (desc == NULL) {
205                 CDEBUG(D_NET, "Sending granted AST\n");
206                 ldlm_lock2desc(lock, &body->lock_desc);
207         } else {
208                 CDEBUG(D_NET, "Sending blocked AST\n");
209                 memcpy(&body->lock_desc, desc, sizeof(*desc));
210         }
211
212         LDLM_DEBUG(lock, "server preparing %s AST",
213                    desc == 0 ? "completion" : "blocked");
214
215         req->rq_replen = lustre_msg_size(0, NULL);
216
217         rc = ptlrpc_queue_wait(req);
218         rc = ptlrpc_check_status(req, rc);
219         ptlrpc_free_req(req);
220
221         EXIT;
222  out:
223         LDLM_LOCK_PUT(lock);
224         return rc;
225 }
226
227 int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
228                      struct lustre_handle *connh, 
229                      int new_mode, int *flags)
230 {
231         struct ldlm_request *body;
232         struct ldlm_reply *reply;
233         struct ldlm_lock *lock;
234         struct ldlm_resource *res;
235         struct ptlrpc_request *req;
236         int rc, size = sizeof(*body);
237         ENTRY;
238
239         lock = ldlm_handle2lock(lockh);
240         if (!lock) {
241                 LBUG();
242                 RETURN(-EINVAL);
243         }
244         *flags = 0;
245
246         LDLM_DEBUG(lock, "client-side convert");
247
248         req = ptlrpc_prep_req(cl, lock->l_connection,
249                                LDLM_CONVERT, 1, &size, NULL);
250         if (!req)
251                 GOTO(out, rc = -ENOMEM);
252
253         body = lustre_msg_buf(req->rq_reqmsg, 0);
254         memcpy(&body->lock_handle1, &lock->l_remote_handle,
255                sizeof(body->lock_handle1));
256
257         body->lock_desc.l_req_mode = new_mode;
258         body->lock_flags = *flags;
259
260         size = sizeof(*reply);
261         req->rq_replen = lustre_msg_size(1, &size);
262
263         rc = ptlrpc_queue_wait(req);
264         rc = ptlrpc_check_status(req, rc);
265         if (rc != ELDLM_OK)
266                 GOTO(out, rc);
267
268         reply = lustre_msg_buf(req->rq_repmsg, 0);
269         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
270         if (res != NULL)
271                 ldlm_reprocess_all(res);
272         if (lock->l_req_mode != lock->l_granted_mode) {
273                 /* Go to sleep until the lock is granted. */
274                 /* FIXME: or cancelled. */
275                 CDEBUG(D_NET, "convert returned a blocked lock, "
276                        "going to sleep.\n");
277                 wait_event(lock->l_waitq,
278                            lock->l_req_mode == lock->l_granted_mode);
279                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
280         }
281         LDLM_LOCK_PUT(lock);
282         EXIT;
283  out:
284         ptlrpc_free_req(req);
285         return rc;
286 }
287
288 int ldlm_cli_cancel(struct lustre_handle *lockh)
289 {
290         struct ptlrpc_request *req;
291         struct ldlm_lock *lock;
292         struct ldlm_request *body;
293         int rc, size = sizeof(*body);
294         ENTRY;
295
296         lock = ldlm_handle2lock(lockh); 
297         if (!lock) {
298                 /* It's possible that the decref that we did just before this
299                  * cancel was the last reader/writer, and caused a cancel before
300                  * we could call this function.  If we want to make this
301                  * impossible (by adding a dec_and_cancel() or similar), then
302                  * we can put the LBUG back. */
303                 //LBUG();
304                 RETURN(-EINVAL);
305         }
306
307         LDLM_DEBUG(lock, "client-side cancel");
308         req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
309                               LDLM_CANCEL, 1, &size, NULL);
310         if (!req)
311                 GOTO(out, rc = -ENOMEM);
312
313         body = lustre_msg_buf(req->rq_reqmsg, 0);
314         memcpy(&body->lock_handle1, &lock->l_remote_handle,
315                sizeof(body->lock_handle1));
316
317         req->rq_replen = lustre_msg_size(0, NULL);
318
319         rc = ptlrpc_queue_wait(req);
320         rc = ptlrpc_check_status(req, rc);
321         ptlrpc_free_req(req);
322         if (rc != ELDLM_OK)
323                 GOTO(out, rc);
324
325         ldlm_lock_cancel(lock);
326         LDLM_LOCK_PUT(lock); 
327         EXIT;
328  out:
329         return 0;
330 }