Whamcloud - gitweb
- first fixes to get things working again: current state:
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15
16 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
17                      struct ptlrpc_request *req,
18                      struct ldlm_namespace *ns,
19                      struct lustre_handle *parent_lock_handle,
20                      __u64 *res_id,
21                      __u32 type,
22                      void *cookie, int cookielen,
23                      ldlm_mode_t mode,
24                      int *flags,
25                      ldlm_lock_callback callback,
26                      void *data,
27                      __u32 data_len,
28                      struct lustre_handle *lockh)
29 {
30         struct ldlm_lock *lock;
31         struct ldlm_request *body;
32         struct ldlm_reply *reply;
33         int rc, size = sizeof(*body), req_passed_in = 1;
34         ENTRY;
35
36         *flags = 0;
37         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
38                                 data, data_len);
39         if (lock == NULL)
40                 GOTO(out, rc = -ENOMEM);
41         ldlm_lock2handle(lock, lockh);
42
43         LDLM_DEBUG(lock, "client-side enqueue START");
44
45         if (req == NULL) {
46                 req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 1, &size, NULL);
47                 if (!req)
48                         GOTO(out, rc = -ENOMEM);
49                 req_passed_in = 0;
50         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
51                 LBUG();
52
53         /* Dump all of this data into the request buffer */
54         body = lustre_msg_buf(req->rq_reqmsg, 0);
55         ldlm_lock2desc(lock, &body->lock_desc);
56         /* Phil: make this part of ldlm_lock2desc */
57         if (type == LDLM_EXTENT)
58                 memcpy(&body->lock_desc.l_extent, cookie,
59                        sizeof(body->lock_desc.l_extent));
60         body->lock_flags = *flags;
61
62         ldlm_lock2handle(lock, &body->lock_handle1);
63         if (parent_lock_handle)
64                 memcpy(&body->lock_handle2, parent_lock_handle,
65                        sizeof(body->lock_handle2));
66
67         /* Continue as normal. */
68         if (!req_passed_in) {
69                 size = sizeof(*reply);
70                 req->rq_replen = lustre_msg_size(1, &size);
71         }
72
73         lock->l_connection = conn;
74         lock->l_client = cl;
75
76         rc = ptlrpc_queue_wait(req);
77         /* FIXME: status check here? */
78         rc = ptlrpc_check_status(req, rc);
79
80         if (rc != ELDLM_OK) {
81                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
82                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
83                 ldlm_lock_put(lock);
84                 GOTO(out, rc);
85         }
86
87         reply = lustre_msg_buf(req->rq_repmsg, 0);
88         memcpy(&lock->l_remote_handle, &reply->lock_handle,
89                sizeof(lock->l_remote_handle));
90         if (type == LDLM_EXTENT)
91                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
92         *flags = reply->lock_flags;
93
94         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
95                (void *)(unsigned long)reply->lock_handle.addr, *flags);
96         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
97                (unsigned long long)reply->lock_extent.start,
98                (unsigned long long)reply->lock_extent.end);
99
100         /* If enqueue returned a blocked lock but the completion handler has
101          * already run, then it fixed up the resource and we don't need to do it
102          * again. */
103         if (*flags & LDLM_FL_LOCK_CHANGED &&
104             lock->l_req_mode != lock->l_granted_mode) {
105                 CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
106                        "%ld\n", (long)reply->lock_resource_name[0],
107                        (long)lock->l_resource->lr_name[0]);
108
109                 ldlm_lock_change_resource(lock, reply->lock_resource_name);
110                 if (lock->l_resource == NULL) {
111                         LBUG();
112                         RETURN(-ENOMEM);
113                 }
114                 LDLM_DEBUG(lock, "client-side enqueue, new resource");
115         }
116
117         if (!req_passed_in)
118                 ptlrpc_free_req(req);
119
120         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
121                                      callback);
122
123         LDLM_DEBUG(lock, "client-side enqueue END");
124         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
125                       LDLM_FL_BLOCK_CONV)) {
126                 /* Go to sleep until the lock is granted. */
127                 /* FIXME: or cancelled. */
128                 CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
129                        "going to sleep.\n", lock);
130                 ldlm_lock_dump(lock);
131                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
132                                          lock->l_granted_mode);
133                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
134         }
135         ldlm_lock_put(lock);
136         EXIT;
137  out:
138         return rc;
139 }
140
141 int ldlm_cli_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
142                       void *data, __u32 data_len, struct ptlrpc_request **reqp)
143 {
144         struct ldlm_lock *lock;
145         struct ldlm_request *body;
146         struct ptlrpc_request *req;
147         struct ptlrpc_client *cl =
148                 &lock->l_resource->lr_namespace->ns_rpc_client;
149         int rc = 0, size = sizeof(*body);
150         ENTRY;
151
152         lock = ldlm_handle2lock(lockh);
153         if (lock == NULL)
154                 LBUG();
155
156         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
157                               &size, NULL);
158         if (!req)
159                 GOTO(out, rc = -ENOMEM);
160
161         body = lustre_msg_buf(req->rq_reqmsg, 0);
162         memcpy(&body->lock_handle1, &lock->l_remote_handle,
163                sizeof(body->lock_handle1));
164
165         if (desc == NULL) {
166                 CDEBUG(D_NET, "Sending granted AST\n");
167                 ldlm_lock2desc(lock, &body->lock_desc);
168         } else {
169                 CDEBUG(D_NET, "Sending blocked AST\n");
170                 memcpy(&body->lock_desc, desc, sizeof(*desc));
171         }
172
173         LDLM_DEBUG(lock, "server preparing %s AST",
174                    desc->l_req_mode == 0 ? "completion" : "blocked");
175
176         req->rq_replen = lustre_msg_size(0, NULL);
177
178         if (reqp == NULL) {
179                 LBUG();
180                 rc = ptlrpc_queue_wait(req);
181                 rc = ptlrpc_check_status(req, rc);
182                 ptlrpc_free_req(req);
183         } else
184                 *reqp = req;
185
186
187         EXIT;
188  out:
189         ldlm_lock_put(lock);
190         return rc;
191 }
192
193 int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
194                      int new_mode, int *flags)
195 {
196         struct ldlm_request *body;
197         struct ldlm_reply *reply;
198         struct ldlm_lock *lock;
199         struct ldlm_resource *res;
200         struct ptlrpc_request *req;
201         int rc, size = sizeof(*body);
202         ENTRY;
203
204         lock = ldlm_handle2lock(lockh);
205         if (!lock)
206                 LBUG(); 
207         *flags = 0;
208
209         LDLM_DEBUG(lock, "client-side convert");
210
211         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 1, &size,
212                               NULL);
213         if (!req)
214                 GOTO(out, rc = -ENOMEM);
215
216         body = lustre_msg_buf(req->rq_reqmsg, 0);
217         memcpy(&body->lock_handle1, &lock->l_remote_handle,
218                sizeof(body->lock_handle1));
219
220         body->lock_desc.l_req_mode = new_mode;
221         body->lock_flags = *flags;
222
223         size = sizeof(*reply);
224         req->rq_replen = lustre_msg_size(1, &size);
225
226         rc = ptlrpc_queue_wait(req);
227         rc = ptlrpc_check_status(req, rc);
228         if (rc != ELDLM_OK)
229                 GOTO(out, rc);
230
231         reply = lustre_msg_buf(req->rq_repmsg, 0);
232         res = ldlm_convert(lock, new_mode, &reply->lock_flags);
233         if (res != NULL)
234                 ldlm_reprocess_all(res);
235         if (lock->l_req_mode != lock->l_granted_mode) {
236                 /* Go to sleep until the lock is granted. */
237                 /* FIXME: or cancelled. */
238                 CDEBUG(D_NET, "convert returned a blocked lock, "
239                        "going to sleep.\n");
240                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
241                                          lock->l_granted_mode);
242                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
243         }
244         ldlm_lock_put(lock);
245         EXIT;
246  out:
247         ptlrpc_free_req(req);
248         return rc;
249 }
250
251 int ldlm_cli_cancel(struct lustre_handle *lockh)
252 {
253         struct ptlrpc_request *req;
254         struct ldlm_lock *lock;
255         struct ldlm_request *body;
256         struct ldlm_resource *res;
257         int rc, size = sizeof(*body);
258         ENTRY;
259
260         lock = ldlm_handle2lock(lockh); 
261         if (!lock)
262                 LBUG();
263
264         LDLM_DEBUG(lock, "client-side cancel");
265         req = ptlrpc_prep_req(lock->l_client, lock->l_connection, LDLM_CANCEL,
266                               1, &size, NULL);
267         if (!req)
268                 GOTO(out, rc = -ENOMEM);
269
270         body = lustre_msg_buf(req->rq_reqmsg, 0);
271         memcpy(&body->lock_handle1, &lock->l_remote_handle,
272                sizeof(body->lock_handle1));
273
274         req->rq_replen = lustre_msg_size(0, NULL);
275
276         rc = ptlrpc_queue_wait(req);
277         rc = ptlrpc_check_status(req, rc);
278         ptlrpc_free_req(req);
279         if (rc != ELDLM_OK)
280                 GOTO(out, rc);
281
282         ldlm_lock_cancel(lock);
283         ldlm_reprocess_all(res);
284         ldlm_lock_put(lock); 
285         EXIT;
286  out:
287         return 0;
288 }