Whamcloud - gitweb
More connection-level hackery for the DLM
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15
16 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
17                      struct lustre_handle *connh, 
18                      struct ptlrpc_request *req,
19                      struct ldlm_namespace *ns,
20                      struct lustre_handle *parent_lock_handle,
21                      __u64 *res_id,
22                      __u32 type,
23                      void *cookie, int cookielen,
24                      ldlm_mode_t mode,
25                      int *flags,
26                      ldlm_lock_callback callback,
27                      void *data,
28                      __u32 data_len,
29                      struct lustre_handle *lockh)
30 {
31         struct ldlm_lock *lock;
32         struct ldlm_request *body;
33         struct ldlm_reply *reply;
34         int rc, size = sizeof(*body), req_passed_in = 1;
35         ENTRY;
36
37         *flags = 0;
38         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
39                                 data, data_len);
40         if (lock == NULL)
41                 GOTO(out, rc = -ENOMEM);
42         LDLM_DEBUG(lock, "client-side enqueue START");
43         /* for the local lock, add the reference */
44         ldlm_lock_addref_internal(lock, mode);
45         ldlm_lock2handle(lock, lockh);
46
47         if (req == NULL) {
48                 req = ptlrpc_prep_req2(cl, conn, connh, 
49                                        LDLM_ENQUEUE, 1, &size, NULL);
50                 if (!req)
51                         GOTO(out, rc = -ENOMEM);
52                 req_passed_in = 0;
53         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
54                 LBUG();
55
56         /* Dump all of this data into the request buffer */
57         body = lustre_msg_buf(req->rq_reqmsg, 0);
58         ldlm_lock2desc(lock, &body->lock_desc);
59         /* Phil: make this part of ldlm_lock2desc */
60         if (type == LDLM_EXTENT)
61                 memcpy(&body->lock_desc.l_extent, cookie,
62                        sizeof(body->lock_desc.l_extent));
63         body->lock_flags = *flags;
64
65         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
66         if (parent_lock_handle)
67                 memcpy(&body->lock_handle2, parent_lock_handle,
68                        sizeof(body->lock_handle2));
69
70         /* Continue as normal. */
71         if (!req_passed_in) {
72                 size = sizeof(*reply);
73                 req->rq_replen = lustre_msg_size(1, &size);
74         }
75
76         lock->l_connection = ptlrpc_connection_addref(conn);
77         lock->l_client = cl;
78
79         /* I apologize in advance for what I am about to do.
80          *
81          * The MDS needs to send lock requests to itself, but it doesn't want to
82          * go through the whole hassle of setting up proper connections.  Soon,
83          * these will just be function calls, but until I have the time, just
84          * set this connection to level FULL so that they go through.
85          *
86          * Since the MDS is the only user of PLAIN locks right now, we can use
87          * that to distinguish.  Sorry. */
88         if (type == LDLM_PLAIN)
89                 conn->c_level = LUSTRE_CONN_FULL;
90
91         rc = ptlrpc_queue_wait(req);
92         /* FIXME: status check here? */
93         rc = ptlrpc_check_status(req, rc);
94
95         if (rc != ELDLM_OK) {
96                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
97                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
98                 LDLM_LOCK_PUT(lock);
99                 ldlm_lock_decref(lockh, mode);
100                 /* FIXME: if we've already received a completion AST, this will
101                  * LBUG! */
102                 ldlm_lock_destroy(lock);
103                 GOTO(out, rc);
104         }
105
106         reply = lustre_msg_buf(req->rq_repmsg, 0);
107         memcpy(&lock->l_remote_handle, &reply->lock_handle,
108                sizeof(lock->l_remote_handle));
109         if (type == LDLM_EXTENT)
110                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
111         *flags = reply->lock_flags;
112
113         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
114                (void *)(unsigned long)reply->lock_handle.addr, *flags);
115         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
116                (unsigned long long)reply->lock_extent.start,
117                (unsigned long long)reply->lock_extent.end);
118
119         /* If enqueue returned a blocked lock but the completion handler has
120          * already run, then it fixed up the resource and we don't need to do it
121          * again. */
122         if ((*flags) & LDLM_FL_LOCK_CHANGED &&
123             lock->l_req_mode != lock->l_granted_mode) {
124                 CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
125                        "%ld\n", (long)reply->lock_resource_name[0],
126                        (long)lock->l_resource->lr_name[0]);
127
128                 ldlm_lock_change_resource(lock, reply->lock_resource_name);
129                 if (lock->l_resource == NULL) {
130                         LBUG();
131                         RETURN(-ENOMEM);
132                 }
133                 LDLM_DEBUG(lock, "client-side enqueue, new resource");
134         }
135
136         if (!req_passed_in)
137                 ptlrpc_free_req(req);
138
139         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
140                                callback);
141
142         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
143                       LDLM_FL_BLOCK_CONV)) {
144                 /* Go to sleep until the lock is granted. */
145                 /* FIXME: or cancelled. */
146                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
147                            " sleeping");
148                 ldlm_lock_dump(lock);
149 #warning ldlm needs to time out
150                 wait_event(lock->l_waitq,
151                            lock->l_req_mode == lock->l_granted_mode);
152                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
153         }
154         LDLM_DEBUG(lock, "client-side enqueue END");
155         LDLM_LOCK_PUT(lock);
156         EXIT;
157  out:
158         return rc;
159 }
160
161 int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
162                           struct ptlrpc_connection *conn,
163                           struct lustre_handle *connh, 
164                           struct ptlrpc_request *req,
165                           struct ldlm_namespace *ns,
166                           struct lustre_handle *parent_lock_handle,
167                           __u64 *res_id,
168                           __u32 type,
169                           void *cookie, int cookielen,
170                           ldlm_mode_t mode,
171                           int *flags,
172                           ldlm_lock_callback callback,
173                           void *data,
174                           __u32 data_len,
175                           struct lustre_handle *lockh)
176 {
177         int rc;
178         ENTRY;
179         rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
180         if (rc == 0) {
181                 rc = ldlm_cli_enqueue(cl, conn, connh, req, ns,
182                                       parent_lock_handle, res_id, type, cookie,
183                                       cookielen, mode, flags, callback, data,
184                                       data_len, lockh);
185                 if (rc != ELDLM_OK)
186                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
187                 RETURN(rc);
188         } else
189                 RETURN(0);
190 }
191
192 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
193                     void *data, __u32 data_len)
194 {
195         struct ldlm_lock *lock;
196         struct ldlm_request *body;
197         struct ptlrpc_request *req;
198         struct ptlrpc_client *cl;
199         int rc = 0, size = sizeof(*body);
200         ENTRY;
201
202         lock = ldlm_handle2lock(lockh);
203         if (lock == NULL) {
204                 LBUG();
205                 RETURN(-EINVAL);
206         }
207         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
208         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
209                               &size, NULL);
210         if (!req)
211                 GOTO(out, rc = -ENOMEM);
212
213         body = lustre_msg_buf(req->rq_reqmsg, 0);
214         memcpy(&body->lock_handle1, &lock->l_remote_handle,
215                sizeof(body->lock_handle1));
216
217         if (desc == NULL) {
218                 CDEBUG(D_NET, "Sending granted AST\n");
219                 ldlm_lock2desc(lock, &body->lock_desc);
220         } else {
221                 CDEBUG(D_NET, "Sending blocked AST\n");
222                 memcpy(&body->lock_desc, desc, sizeof(*desc));
223         }
224
225         LDLM_DEBUG(lock, "server preparing %s AST",
226                    desc == 0 ? "completion" : "blocked");
227
228         req->rq_replen = lustre_msg_size(0, NULL);
229
230         rc = ptlrpc_queue_wait(req);
231         rc = ptlrpc_check_status(req, rc);
232         ptlrpc_free_req(req);
233
234         EXIT;
235  out:
236         LDLM_LOCK_PUT(lock);
237         return rc;
238 }
239
240 int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
241                      struct lustre_handle *connh, 
242                      int new_mode, int *flags)
243 {
244         struct ldlm_request *body;
245         struct ldlm_reply *reply;
246         struct ldlm_lock *lock;
247         struct ldlm_resource *res;
248         struct ptlrpc_request *req;
249         int rc, size = sizeof(*body);
250         ENTRY;
251
252         lock = ldlm_handle2lock(lockh);
253         if (!lock) {
254                 LBUG();
255                 RETURN(-EINVAL);
256         }
257         *flags = 0;
258
259         LDLM_DEBUG(lock, "client-side convert");
260
261         req = ptlrpc_prep_req(cl, lock->l_connection,
262                                LDLM_CONVERT, 1, &size, NULL);
263         if (!req)
264                 GOTO(out, rc = -ENOMEM);
265
266         body = lustre_msg_buf(req->rq_reqmsg, 0);
267         memcpy(&body->lock_handle1, &lock->l_remote_handle,
268                sizeof(body->lock_handle1));
269
270         body->lock_desc.l_req_mode = new_mode;
271         body->lock_flags = *flags;
272
273         size = sizeof(*reply);
274         req->rq_replen = lustre_msg_size(1, &size);
275
276         rc = ptlrpc_queue_wait(req);
277         rc = ptlrpc_check_status(req, rc);
278         if (rc != ELDLM_OK)
279                 GOTO(out, rc);
280
281         reply = lustre_msg_buf(req->rq_repmsg, 0);
282         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
283         if (res != NULL)
284                 ldlm_reprocess_all(res);
285         if (lock->l_req_mode != lock->l_granted_mode) {
286                 /* Go to sleep until the lock is granted. */
287                 /* FIXME: or cancelled. */
288                 CDEBUG(D_NET, "convert returned a blocked lock, "
289                        "going to sleep.\n");
290                 wait_event(lock->l_waitq,
291                            lock->l_req_mode == lock->l_granted_mode);
292                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
293         }
294         LDLM_LOCK_PUT(lock);
295         EXIT;
296  out:
297         ptlrpc_free_req(req);
298         return rc;
299 }
300
301 int ldlm_cli_cancel(struct lustre_handle *lockh)
302 {
303         struct ptlrpc_request *req;
304         struct ldlm_lock *lock;
305         struct ldlm_request *body;
306         int rc, size = sizeof(*body);
307         ENTRY;
308
309         lock = ldlm_handle2lock(lockh); 
310         if (!lock) {
311                 /* It's possible that the decref that we did just before this
312                  * cancel was the last reader/writer, and caused a cancel before
313                  * we could call this function.  If we want to make this
314                  * impossible (by adding a dec_and_cancel() or similar), then
315                  * we can put the LBUG back. */
316                 //LBUG();
317                 RETURN(-EINVAL);
318         }
319
320         LDLM_DEBUG(lock, "client-side cancel");
321         req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
322                               LDLM_CANCEL, 1, &size, NULL);
323         if (!req)
324                 GOTO(out, rc = -ENOMEM);
325
326         body = lustre_msg_buf(req->rq_reqmsg, 0);
327         memcpy(&body->lock_handle1, &lock->l_remote_handle,
328                sizeof(body->lock_handle1));
329
330         req->rq_replen = lustre_msg_size(0, NULL);
331
332         rc = ptlrpc_queue_wait(req);
333         rc = ptlrpc_check_status(req, rc);
334         ptlrpc_free_req(req);
335         if (rc != ELDLM_OK)
336                 GOTO(out, rc);
337
338         ldlm_lock_cancel(lock);
339         LDLM_LOCK_PUT(lock); 
340         EXIT;
341  out:
342         return 0;
343 }