Whamcloud - gitweb
This commit contains probably 92% of the striping infrastructure
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15
16 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
17                      struct lustre_handle *connh, 
18                      struct ptlrpc_request *req,
19                      struct ldlm_namespace *ns,
20                      struct lustre_handle *parent_lock_handle,
21                      __u64 *res_id,
22                      __u32 type,
23                      void *cookie, int cookielen,
24                      ldlm_mode_t mode,
25                      int *flags,
26                      ldlm_lock_callback callback,
27                      void *data,
28                      __u32 data_len,
29                      struct lustre_handle *lockh)
30 {
31         struct ldlm_lock *lock;
32         struct ldlm_request *body;
33         struct ldlm_reply *reply;
34         int rc, size = sizeof(*body), req_passed_in = 1;
35         ENTRY;
36
37         *flags = 0;
38         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
39                                 data, data_len);
40         if (lock == NULL)
41                 GOTO(out, rc = -ENOMEM);
42         LDLM_DEBUG(lock, "client-side enqueue START");
43         /* for the local lock, add the reference */
44         ldlm_lock_addref_internal(lock, mode);
45         ldlm_lock2handle(lock, lockh);
46
47         if (req == NULL) {
48                 req = ptlrpc_prep_req2(cl, conn, connh, 
49                                        LDLM_ENQUEUE, 1, &size, NULL);
50                 if (!req)
51                         GOTO(out, rc = -ENOMEM);
52                 req_passed_in = 0;
53         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
54                 LBUG();
55
56         /* Dump all of this data into the request buffer */
57         body = lustre_msg_buf(req->rq_reqmsg, 0);
58         ldlm_lock2desc(lock, &body->lock_desc);
59         /* Phil: make this part of ldlm_lock2desc */
60         if (type == LDLM_EXTENT)
61                 memcpy(&body->lock_desc.l_extent, cookie,
62                        sizeof(body->lock_desc.l_extent));
63         body->lock_flags = *flags;
64
65         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
66         if (parent_lock_handle)
67                 memcpy(&body->lock_handle2, parent_lock_handle,
68                        sizeof(body->lock_handle2));
69
70         /* Continue as normal. */
71         if (!req_passed_in) {
72                 size = sizeof(*reply);
73                 req->rq_replen = lustre_msg_size(1, &size);
74         }
75
76         lock->l_connection = ptlrpc_connection_addref(conn);
77         lock->l_client = cl;
78
79         rc = ptlrpc_queue_wait(req);
80         /* FIXME: status check here? */
81         rc = ptlrpc_check_status(req, rc);
82
83         if (rc != ELDLM_OK) {
84                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
85                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
86                 LDLM_LOCK_PUT(lock);
87                 ldlm_lock_decref(lockh, mode);
88                 /* FIXME: if we've already received a completion AST, this will
89                  * LBUG! */
90                 ldlm_lock_destroy(lock);
91                 GOTO(out, rc);
92         }
93
94         reply = lustre_msg_buf(req->rq_repmsg, 0);
95         memcpy(&lock->l_remote_handle, &reply->lock_handle,
96                sizeof(lock->l_remote_handle));
97         if (type == LDLM_EXTENT)
98                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
99         *flags = reply->lock_flags;
100
101         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
102                (void *)(unsigned long)reply->lock_handle.addr, *flags);
103         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
104                (unsigned long long)reply->lock_extent.start,
105                (unsigned long long)reply->lock_extent.end);
106
107         /* If enqueue returned a blocked lock but the completion handler has
108          * already run, then it fixed up the resource and we don't need to do it
109          * again. */
110         if ((*flags) & LDLM_FL_LOCK_CHANGED &&
111             lock->l_req_mode != lock->l_granted_mode) {
112                 CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
113                        "%ld\n", (long)reply->lock_resource_name[0],
114                        (long)lock->l_resource->lr_name[0]);
115
116                 ldlm_lock_change_resource(lock, reply->lock_resource_name);
117                 if (lock->l_resource == NULL) {
118                         LBUG();
119                         RETURN(-ENOMEM);
120                 }
121                 LDLM_DEBUG(lock, "client-side enqueue, new resource");
122         }
123
124         if (!req_passed_in)
125                 ptlrpc_free_req(req);
126
127         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
128                                callback);
129
130         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
131                       LDLM_FL_BLOCK_CONV)) {
132                 /* Go to sleep until the lock is granted. */
133                 /* FIXME: or cancelled. */
134                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
135                            " sleeping");
136                 ldlm_lock_dump(lock);
137 #warning ldlm needs to time out
138                 wait_event(lock->l_waitq,
139                            lock->l_req_mode == lock->l_granted_mode);
140                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
141         }
142         LDLM_DEBUG(lock, "client-side enqueue END");
143         LDLM_LOCK_PUT(lock);
144         EXIT;
145  out:
146         return rc;
147 }
148
149 int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
150                           struct ptlrpc_connection *conn,
151                           struct lustre_handle *connh, 
152                           struct ptlrpc_request *req,
153                           struct ldlm_namespace *ns,
154                           struct lustre_handle *parent_lock_handle,
155                           __u64 *res_id,
156                           __u32 type,
157                           void *cookie, int cookielen,
158                           ldlm_mode_t mode,
159                           int *flags,
160                           ldlm_lock_callback callback,
161                           void *data,
162                           __u32 data_len,
163                           struct lustre_handle *lockh)
164 {
165         int rc;
166         ENTRY;
167         rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
168         if (rc == 0) {
169                 rc = ldlm_cli_enqueue(cl, conn, connh, req, ns,
170                                       parent_lock_handle, res_id, type, cookie,
171                                       cookielen, mode, flags, callback, data,
172                                       data_len, lockh);
173                 if (rc != ELDLM_OK)
174                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
175                 RETURN(rc);
176         } else
177                 RETURN(0);
178 }
179
180 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
181                     void *data, __u32 data_len)
182 {
183         struct ldlm_lock *lock;
184         struct ldlm_request *body;
185         struct ptlrpc_request *req;
186         struct ptlrpc_client *cl;
187         int rc = 0, size = sizeof(*body);
188         ENTRY;
189
190         lock = ldlm_handle2lock(lockh);
191         if (lock == NULL) {
192                 LBUG();
193                 RETURN(-EINVAL);
194         }
195         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
196         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
197                               &size, NULL);
198         if (!req)
199                 GOTO(out, rc = -ENOMEM);
200
201         body = lustre_msg_buf(req->rq_reqmsg, 0);
202         memcpy(&body->lock_handle1, &lock->l_remote_handle,
203                sizeof(body->lock_handle1));
204
205         if (desc == NULL) {
206                 CDEBUG(D_NET, "Sending granted AST\n");
207                 ldlm_lock2desc(lock, &body->lock_desc);
208         } else {
209                 CDEBUG(D_NET, "Sending blocked AST\n");
210                 memcpy(&body->lock_desc, desc, sizeof(*desc));
211         }
212
213         LDLM_DEBUG(lock, "server preparing %s AST",
214                    desc == 0 ? "completion" : "blocked");
215
216         req->rq_replen = lustre_msg_size(0, NULL);
217
218         rc = ptlrpc_queue_wait(req);
219         rc = ptlrpc_check_status(req, rc);
220         ptlrpc_free_req(req);
221
222         EXIT;
223  out:
224         LDLM_LOCK_PUT(lock);
225         return rc;
226 }
227
228 int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
229                      struct lustre_handle *connh, 
230                      int new_mode, int *flags)
231 {
232         struct ldlm_request *body;
233         struct ldlm_reply *reply;
234         struct ldlm_lock *lock;
235         struct ldlm_resource *res;
236         struct ptlrpc_request *req;
237         int rc, size = sizeof(*body);
238         ENTRY;
239
240         lock = ldlm_handle2lock(lockh);
241         if (!lock) {
242                 LBUG();
243                 RETURN(-EINVAL);
244         }
245         *flags = 0;
246
247         LDLM_DEBUG(lock, "client-side convert");
248
249         req = ptlrpc_prep_req(cl, lock->l_connection,
250                                LDLM_CONVERT, 1, &size, NULL);
251         if (!req)
252                 GOTO(out, rc = -ENOMEM);
253
254         body = lustre_msg_buf(req->rq_reqmsg, 0);
255         memcpy(&body->lock_handle1, &lock->l_remote_handle,
256                sizeof(body->lock_handle1));
257
258         body->lock_desc.l_req_mode = new_mode;
259         body->lock_flags = *flags;
260
261         size = sizeof(*reply);
262         req->rq_replen = lustre_msg_size(1, &size);
263
264         rc = ptlrpc_queue_wait(req);
265         rc = ptlrpc_check_status(req, rc);
266         if (rc != ELDLM_OK)
267                 GOTO(out, rc);
268
269         reply = lustre_msg_buf(req->rq_repmsg, 0);
270         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
271         if (res != NULL)
272                 ldlm_reprocess_all(res);
273         if (lock->l_req_mode != lock->l_granted_mode) {
274                 /* Go to sleep until the lock is granted. */
275                 /* FIXME: or cancelled. */
276                 CDEBUG(D_NET, "convert returned a blocked lock, "
277                        "going to sleep.\n");
278                 wait_event(lock->l_waitq,
279                            lock->l_req_mode == lock->l_granted_mode);
280                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
281         }
282         LDLM_LOCK_PUT(lock);
283         EXIT;
284  out:
285         ptlrpc_free_req(req);
286         return rc;
287 }
288
289 int ldlm_cli_cancel(struct lustre_handle *lockh)
290 {
291         struct ptlrpc_request *req;
292         struct ldlm_lock *lock;
293         struct ldlm_request *body;
294         int rc, size = sizeof(*body);
295         ENTRY;
296
297         lock = ldlm_handle2lock(lockh); 
298         if (!lock) {
299                 /* It's possible that the decref that we did just before this
300                  * cancel was the last reader/writer, and caused a cancel before
301                  * we could call this function.  If we want to make this
302                  * impossible (by adding a dec_and_cancel() or similar), then
303                  * we can put the LBUG back. */
304                 //LBUG();
305                 RETURN(-EINVAL);
306         }
307
308         LDLM_DEBUG(lock, "client-side cancel");
309         req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
310                               LDLM_CANCEL, 1, &size, NULL);
311         if (!req)
312                 GOTO(out, rc = -ENOMEM);
313
314         body = lustre_msg_buf(req->rq_reqmsg, 0);
315         memcpy(&body->lock_handle1, &lock->l_remote_handle,
316                sizeof(body->lock_handle1));
317
318         req->rq_replen = lustre_msg_size(0, NULL);
319
320         rc = ptlrpc_queue_wait(req);
321         rc = ptlrpc_check_status(req, rc);
322         ptlrpc_free_req(req);
323         if (rc != ELDLM_OK)
324                 GOTO(out, rc);
325
326         ldlm_lock_cancel(lock);
327         LDLM_LOCK_PUT(lock); 
328         EXIT;
329  out:
330         return 0;
331 }