Whamcloud - gitweb
- more LDLM refcount locking infrastructure
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15
16 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
17                      struct lustre_handle *connh, 
18                      struct ptlrpc_request *req,
19                      struct ldlm_namespace *ns,
20                      struct lustre_handle *parent_lock_handle,
21                      __u64 *res_id,
22                      __u32 type,
23                      void *cookie, int cookielen,
24                      ldlm_mode_t mode,
25                      int *flags,
26                      ldlm_lock_callback callback,
27                      void *data,
28                      __u32 data_len,
29                      struct lustre_handle *lockh)
30 {
31         struct ldlm_lock *lock;
32         struct ldlm_request *body;
33         struct ldlm_reply *reply;
34         int rc, size = sizeof(*body), req_passed_in = 1;
35         ENTRY;
36
37         *flags = 0;
38         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
39                                 data, data_len);
40         if (lock == NULL)
41                 GOTO(out, rc = -ENOMEM);
42         LDLM_DEBUG(lock, "client-side enqueue START");
43         /* for the local lock, add the reference */
44         ldlm_lock_addref_internal(lock, mode);
45         ldlm_lock2handle(lock, lockh);
46
47         if (req == NULL) {
48                 req = ptlrpc_prep_req2(cl, conn, connh, 
49                                        LDLM_ENQUEUE, 1, &size, NULL);
50                 if (!req)
51                         GOTO(out, rc = -ENOMEM);
52                 req_passed_in = 0;
53         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
54                 LBUG();
55
56         /* Dump all of this data into the request buffer */
57         body = lustre_msg_buf(req->rq_reqmsg, 0);
58         ldlm_lock2desc(lock, &body->lock_desc);
59         /* Phil: make this part of ldlm_lock2desc */
60         if (type == LDLM_EXTENT)
61                 memcpy(&body->lock_desc.l_extent, cookie,
62                        sizeof(body->lock_desc.l_extent));
63         body->lock_flags = *flags;
64
65         memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
66         if (parent_lock_handle)
67                 memcpy(&body->lock_handle2, parent_lock_handle,
68                        sizeof(body->lock_handle2));
69
70         /* Continue as normal. */
71         if (!req_passed_in) {
72                 size = sizeof(*reply);
73                 req->rq_replen = lustre_msg_size(1, &size);
74         }
75
76         lock->l_connection = ptlrpc_connection_addref(conn);
77         lock->l_client = cl;
78
79         rc = ptlrpc_queue_wait(req);
80         /* FIXME: status check here? */
81         rc = ptlrpc_check_status(req, rc);
82
83         if (rc != ELDLM_OK) {
84                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
85                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
86                 ldlm_lock_put(lock);
87                 ldlm_lock_decref(lockh, mode);
88                 /* FIXME: if we've already received a completion AST, this will
89                  * LBUG! */
90                 ldlm_lock_destroy(lock);
91                 GOTO(out, rc);
92         }
93
94         reply = lustre_msg_buf(req->rq_repmsg, 0);
95         memcpy(&lock->l_remote_handle, &reply->lock_handle,
96                sizeof(lock->l_remote_handle));
97         if (type == LDLM_EXTENT)
98                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
99         *flags = reply->lock_flags;
100
101         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
102                (void *)(unsigned long)reply->lock_handle.addr, *flags);
103         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
104                (unsigned long long)reply->lock_extent.start,
105                (unsigned long long)reply->lock_extent.end);
106
107         /* If enqueue returned a blocked lock but the completion handler has
108          * already run, then it fixed up the resource and we don't need to do it
109          * again. */
110         if (*flags & LDLM_FL_LOCK_CHANGED &&
111             lock->l_req_mode != lock->l_granted_mode) {
112                 CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
113                        "%ld\n", (long)reply->lock_resource_name[0],
114                        (long)lock->l_resource->lr_name[0]);
115
116                 ldlm_lock_change_resource(lock, reply->lock_resource_name);
117                 if (lock->l_resource == NULL) {
118                         LBUG();
119                         RETURN(-ENOMEM);
120                 }
121                 LDLM_DEBUG(lock, "client-side enqueue, new resource");
122         }
123
124         if (!req_passed_in)
125                 ptlrpc_free_req(req);
126
127         rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
128                                      callback);
129
130         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
131                       LDLM_FL_BLOCK_CONV)) {
132                 /* Go to sleep until the lock is granted. */
133                 /* FIXME: or cancelled. */
134                 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
135                            " sleeping");
136                 ldlm_lock_dump(lock);
137                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
138                                          lock->l_granted_mode);
139                 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
140         }
141         LDLM_DEBUG(lock, "client-side enqueue END");
142         ldlm_lock_put(lock);
143         EXIT;
144  out:
145         return rc;
146 }
147
148 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
149                     void *data, __u32 data_len)
150 {
151         struct ldlm_lock *lock;
152         struct ldlm_request *body;
153         struct ptlrpc_request *req;
154         struct ptlrpc_client *cl;
155         int rc = 0, size = sizeof(*body);
156         ENTRY;
157
158         lock = ldlm_handle2lock(lockh);
159         if (lock == NULL)
160                 LBUG();
161         cl = &lock->l_resource->lr_namespace->ns_rpc_client;
162         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
163                               &size, NULL);
164         if (!req)
165                 GOTO(out, rc = -ENOMEM);
166
167         body = lustre_msg_buf(req->rq_reqmsg, 0);
168         memcpy(&body->lock_handle1, &lock->l_remote_handle,
169                sizeof(body->lock_handle1));
170
171         if (desc == NULL) {
172                 CDEBUG(D_NET, "Sending granted AST\n");
173                 ldlm_lock2desc(lock, &body->lock_desc);
174         } else {
175                 CDEBUG(D_NET, "Sending blocked AST\n");
176                 memcpy(&body->lock_desc, desc, sizeof(*desc));
177         }
178
179         LDLM_DEBUG(lock, "server preparing %s AST",
180                    desc == 0 ? "completion" : "blocked");
181
182         req->rq_replen = lustre_msg_size(0, NULL);
183
184         rc = ptlrpc_queue_wait(req);
185         rc = ptlrpc_check_status(req, rc);
186         ptlrpc_free_req(req);
187
188         EXIT;
189  out:
190         ldlm_lock_put(lock);
191         return rc;
192 }
193
194 int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
195                      struct lustre_handle *connh, 
196                      int new_mode, int *flags)
197 {
198         struct ldlm_request *body;
199         struct ldlm_reply *reply;
200         struct ldlm_lock *lock;
201         struct ldlm_resource *res;
202         struct ptlrpc_request *req;
203         int rc, size = sizeof(*body);
204         ENTRY;
205
206         lock = ldlm_handle2lock(lockh);
207         if (!lock)
208                 LBUG(); 
209         *flags = 0;
210
211         LDLM_DEBUG(lock, "client-side convert");
212
213         req = ptlrpc_prep_req(cl, lock->l_connection,
214                                LDLM_CONVERT, 1, &size, NULL);
215         if (!req)
216                 GOTO(out, rc = -ENOMEM);
217
218         body = lustre_msg_buf(req->rq_reqmsg, 0);
219         memcpy(&body->lock_handle1, &lock->l_remote_handle,
220                sizeof(body->lock_handle1));
221
222         body->lock_desc.l_req_mode = new_mode;
223         body->lock_flags = *flags;
224
225         size = sizeof(*reply);
226         req->rq_replen = lustre_msg_size(1, &size);
227
228         rc = ptlrpc_queue_wait(req);
229         rc = ptlrpc_check_status(req, rc);
230         if (rc != ELDLM_OK)
231                 GOTO(out, rc);
232
233         reply = lustre_msg_buf(req->rq_repmsg, 0);
234         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
235         if (res != NULL)
236                 ldlm_reprocess_all(res);
237         if (lock->l_req_mode != lock->l_granted_mode) {
238                 /* Go to sleep until the lock is granted. */
239                 /* FIXME: or cancelled. */
240                 CDEBUG(D_NET, "convert returned a blocked lock, "
241                        "going to sleep.\n");
242                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
243                                          lock->l_granted_mode);
244                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
245         }
246         ldlm_lock_put(lock);
247         EXIT;
248  out:
249         ptlrpc_free_req(req);
250         return rc;
251 }
252
253 int ldlm_cli_cancel(struct lustre_handle *lockh, 
254                     struct lustre_handle *connh)
255 {
256         struct ptlrpc_request *req;
257         struct ldlm_lock *lock;
258         struct ldlm_request *body;
259         int rc, size = sizeof(*body);
260         ENTRY;
261
262         lock = ldlm_handle2lock(lockh); 
263         if (!lock)
264                 LBUG();
265
266         LDLM_DEBUG(lock, "client-side cancel");
267         req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
268                               LDLM_CANCEL, 1, &size, NULL);
269         if (!req)
270                 GOTO(out, rc = -ENOMEM);
271
272         body = lustre_msg_buf(req->rq_reqmsg, 0);
273         memcpy(&body->lock_handle1, &lock->l_remote_handle,
274                sizeof(body->lock_handle1));
275
276         req->rq_replen = lustre_msg_size(0, NULL);
277
278         rc = ptlrpc_queue_wait(req);
279         rc = ptlrpc_check_status(req, rc);
280         ptlrpc_free_req(req);
281         if (rc != ELDLM_OK)
282                 GOTO(out, rc);
283
284         ldlm_lock_cancel(lock);
285         ldlm_lock_put(lock); 
286         EXIT;
287  out:
288         return 0;
289 }