Whamcloud - gitweb
The more controversial changes, although my tree finally works:
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/lustre_dlm.h>
16
17 #define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
18
19 static int is_local_conn(struct ptlrpc_connection *conn)
20 {
21         ENTRY;
22         if (conn == NULL)
23                 RETURN(1);
24
25         RETURN(LOOPBACK(conn->c_peer.peer_nid));
26 }
27
28 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
29                      __u32 ns_id,
30                      struct ldlm_handle *parent_lock_handle,
31                      __u64 *res_id,
32                      __u32 type,
33                      struct ldlm_extent *req_ex,
34                      ldlm_mode_t mode,
35                      int *flags,
36                      ldlm_lock_callback completion,
37                      ldlm_lock_callback blocking,
38                      void *data,
39                      __u32 data_len,
40                      struct ldlm_handle *lockh,
41                      struct ptlrpc_request **request)
42 {
43         struct ldlm_handle local_lockh;
44         struct ldlm_lock *lock;
45         struct ldlm_request *body;
46         struct ldlm_reply *reply;
47         struct ptlrpc_request *req = NULL;
48         char *bufs[2] = {NULL, data};
49         int rc, size[2] = {sizeof(*body), data_len};
50         ldlm_error_t err;
51         ENTRY;
52
53         err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
54                                      type, &local_lockh);
55         if (err != ELDLM_OK)
56                 RETURN(err);
57
58         lock = ldlm_handle2object(&local_lockh);
59         /* Is this lock locally managed? */
60         if (is_local_conn(conn))
61                 GOTO(local, 0);
62
63         req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
64         if (!req)
65                 GOTO(out, rc = -ENOMEM);
66
67         /* Dump all of this data into the request buffer */
68         body = lustre_msg_buf(req->rq_reqmsg, 0);
69         body->lock_desc.l_resource.lr_ns_id = ns_id;
70         body->lock_desc.l_resource.lr_type = type;
71         memcpy(body->lock_desc.l_resource.lr_name, res_id,
72                sizeof(body->lock_desc.l_resource.lr_name));
73
74         body->lock_desc.l_req_mode = mode;
75         if (req_ex)
76                 memcpy(&body->lock_desc.l_extent, req_ex,
77                        sizeof(body->lock_desc.l_extent));
78         body->flags = *flags;
79
80         memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
81
82         if (parent_lock_handle)
83                 memcpy(&body->lock_handle2, parent_lock_handle,
84                        sizeof(body->lock_handle2));
85
86         /* Continue as normal. */
87         size[0] = sizeof(*reply);
88         req->rq_replen = lustre_msg_size(1, size);
89
90         rc = ptlrpc_queue_wait(req);
91         rc = ptlrpc_check_status(req, rc);
92         if (rc != ELDLM_OK) {
93                 ldlm_resource_put(lock->l_resource);
94                 ldlm_lock_free(lock);
95                 GOTO(out, rc);
96         }
97
98         lock->l_connection = conn;
99         reply = lustre_msg_buf(req->rq_repmsg, 0);
100         memcpy(&lock->l_remote_handle, &reply->lock_handle,
101                sizeof(lock->l_remote_handle));
102         *flags = reply->flags;
103
104         CERROR("remote handle: %p, flags: %d\n",
105                (void *)(unsigned long)reply->lock_handle.addr, *flags);
106         CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start,
107                reply->lock_extent.end);
108
109         EXIT;
110  local:
111         rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags,
112                                      completion, blocking, data, data_len);
113         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
114                      LDLM_FL_BLOCK_CONV)) {
115                 /* Go to sleep until the lock is granted. */
116                 /* FIXME: or cancelled. */
117                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
118                                          lock->l_granted_mode);
119         }
120  out:
121         *request = req;
122         return rc;
123 }
124
125 int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
126                            struct ptlrpc_connection *conn, __u32 ns_id)
127 {
128         struct ldlm_namespace *ns;
129         struct ldlm_request *body;
130         struct ptlrpc_request *req;
131         int rc, size = sizeof(*body);
132         ENTRY;
133
134         if (is_local_conn(conn))
135                 GOTO(local, 0);
136
137         req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
138         if (!req)
139                 GOTO(out, rc = -ENOMEM);
140
141         body = lustre_msg_buf(req->rq_reqmsg, 0);
142         body->lock_desc.l_resource.lr_ns_id = ns_id;
143
144         req->rq_replen = lustre_msg_size(0, NULL);
145
146         rc = ptlrpc_queue_wait(req);
147         rc = ptlrpc_check_status(req, rc);
148         ptlrpc_free_req(req);
149         if (rc)
150                 GOTO(out, rc);
151
152         EXIT;
153  local:
154         rc = ldlm_namespace_new(obddev, ns_id, &ns);
155         if (rc != ELDLM_OK) {
156                 /* XXX: It succeeded remotely but failed locally. What to do? */
157                 CERROR("Local ldlm_namespace_new failed.\n");
158         }
159  out:
160         return rc;
161 }
162
163 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
164                       void *data, __u32 data_len)
165 {
166         struct ldlm_request *body;
167         struct ptlrpc_request *req;
168         struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
169         struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
170         int rc, size[2] = {sizeof(*body), data_len};
171         char *bufs[2] = {NULL, data};
172         ENTRY;
173
174         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
175                               bufs);
176         if (!req)
177                 GOTO(out, rc = -ENOMEM);
178
179         body = lustre_msg_buf(req->rq_reqmsg, 0);
180         memcpy(&body->lock_handle1, &lock->l_remote_handle,
181                sizeof(body->lock_handle1));
182
183         if (new != NULL) {
184                 ldlm_lock2desc(new, &body->lock_desc);
185                 ldlm_object2handle(new, &body->lock_handle2);
186         }
187
188         req->rq_replen = lustre_msg_size(0, NULL);
189
190         rc = ptlrpc_queue_wait(req);
191         rc = ptlrpc_check_status(req, rc);
192         ptlrpc_free_req(req);
193
194         EXIT;
195  out:
196         return rc;
197 }
198
199 int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
200                      int new_mode, int *flags, struct ptlrpc_request **request)
201 {
202         struct ldlm_request *body;
203         struct ldlm_reply *reply;
204         struct ldlm_lock *lock;
205         struct ptlrpc_request *req;
206         int rc, size[2] = {sizeof(*body), lock->l_data_len};
207         char *bufs[2] = {NULL, lock->l_data};
208         ENTRY;
209
210         lock = ldlm_handle2object(lockh);
211
212         if (is_local_conn(lock->l_connection))
213                 GOTO(local, 0);
214
215         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
216                               bufs);
217         if (!req)
218                 GOTO(out, rc = -ENOMEM);
219
220         body = lustre_msg_buf(req->rq_reqmsg, 0);
221         memcpy(&body->lock_handle1, &lock->l_remote_handle,
222                sizeof(body->lock_handle1));
223
224         body->lock_desc.l_req_mode = new_mode;
225         body->flags = *flags;
226
227         req->rq_replen = lustre_msg_size(1, size);
228
229         rc = ptlrpc_queue_wait(req);
230         rc = ptlrpc_check_status(req, rc);
231         if (rc != ELDLM_OK)
232                 GOTO(out, rc);
233
234         reply = lustre_msg_buf(req->rq_repmsg, 0);
235         *flags = reply->flags;
236
237         EXIT;
238  local:
239         rc = ldlm_local_lock_convert(lockh, new_mode, flags);
240  out:
241         *request = req;
242         return rc;
243 }
244
245 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
246                     struct ptlrpc_request **request)
247 {
248         struct ldlm_request *body;
249         struct ldlm_lock *lock;
250         struct ptlrpc_request *req;
251         int rc, size[2] = {sizeof(*body), lock->l_data_len};
252         char *bufs[2] = {NULL, lock->l_data};
253         ENTRY;
254
255         lock = ldlm_handle2object(lockh);
256
257         if (is_local_conn(lock->l_connection))
258                 GOTO(local, 0);
259
260         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
261                               bufs);
262         if (!req)
263                 GOTO(out, rc = -ENOMEM);
264
265         body = lustre_msg_buf(req->rq_reqmsg, 0);
266         memcpy(&body->lock_handle1, &lock->l_remote_handle,
267                sizeof(body->lock_handle1));
268
269         req->rq_replen = lustre_msg_size(0, NULL);
270
271         rc = ptlrpc_queue_wait(req);
272         rc = ptlrpc_check_status(req, rc);
273         if (rc != ELDLM_OK)
274                 GOTO(out, rc);
275
276         EXIT;
277  local:
278         rc = ldlm_local_lock_cancel(lockh);
279  out:
280         *request = req;
281         return rc;
282 }