Whamcloud - gitweb
64-bit warning fixes. Someone should take a closer look at ext2_obd.c
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13 #define DEBUG_SUBSYSTEM S_LDLM
14
15 #include <linux/lustre_dlm.h>
16
/*
 * Nonzero when the leading byte of x is 0x7f (127), comparing against masks
 * that have been converted to big-endian order.  x is presumably already a
 * big-endian 32-bit value -- confirm against the type of peer_nid.
 */
#define LOOPBACK(x)                                                     \
        (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
18
19 static int is_local_conn(struct ptlrpc_connection *conn)
20 {
21         ENTRY;
22         if (conn == NULL)
23                 RETURN(1);
24
25         RETURN(LOOPBACK(conn->c_peer.peer_nid));
26 }
27
28 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
29                      __u32 ns_id,
30                      struct ldlm_handle *parent_lock_handle,
31                      __u64 *res_id,
32                      __u32 type,
33                      struct ldlm_extent *req_ex,
34                      ldlm_mode_t mode,
35                      int *flags,
36                      ldlm_lock_callback completion,
37                      ldlm_lock_callback blocking,
38                      void *data,
39                      __u32 data_len,
40                      struct ldlm_handle *lockh,
41                      struct ptlrpc_request **request)
42 {
43         struct ldlm_handle local_lockh;
44         struct ldlm_lock *lock;
45         struct ldlm_request *body;
46         struct ldlm_reply *reply;
47         struct ptlrpc_request *req = NULL;
48         char *bufs[2] = {NULL, data};
49         int rc, size[2] = {sizeof(*body), data_len};
50         ldlm_error_t err;
51         ENTRY;
52
53         err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
54                                      type, &local_lockh);
55         if (err != ELDLM_OK)
56                 RETURN(err);
57
58         lock = ldlm_handle2object(&local_lockh);
59         /* Is this lock locally managed? */
60         if (is_local_conn(conn))
61                 GOTO(local, 0);
62
63         req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
64         if (!req)
65                 GOTO(out, rc = -ENOMEM);
66
67         /* Dump all of this data into the request buffer */
68         body = lustre_msg_buf(req->rq_reqmsg, 0);
69         body->lock_desc.l_resource.lr_ns_id = ns_id;
70         body->lock_desc.l_resource.lr_type = type;
71         memcpy(body->lock_desc.l_resource.lr_name, res_id,
72                sizeof(body->lock_desc.l_resource.lr_name));
73
74         body->lock_desc.l_req_mode = mode;
75         if (req_ex)
76                 memcpy(&body->lock_desc.l_extent, req_ex,
77                        sizeof(body->lock_desc.l_extent));
78         body->flags = *flags;
79
80         memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
81
82         if (parent_lock_handle)
83                 memcpy(&body->lock_handle2, parent_lock_handle,
84                        sizeof(body->lock_handle2));
85
86         /* Continue as normal. */
87         size[0] = sizeof(*reply);
88         req->rq_replen = lustre_msg_size(1, size);
89
90         rc = ptlrpc_queue_wait(req);
91         rc = ptlrpc_check_status(req, rc);
92         if (rc != ELDLM_OK) {
93                 ldlm_resource_put(lock->l_resource);
94                 ldlm_lock_free(lock);
95                 GOTO(out, rc);
96         }
97
98         lock->l_connection = conn;
99         reply = lustre_msg_buf(req->rq_repmsg, 0);
100         memcpy(&lock->l_remote_handle, &reply->lock_handle,
101                sizeof(lock->l_remote_handle));
102         *flags = reply->flags;
103
104         CERROR("remote handle: %p, flags: %d\n",
105                (void *)(unsigned long)reply->lock_handle.addr, *flags);
106         CERROR("extent: %Lu -> %Lu\n",
107                (unsigned long long)reply->lock_extent.start,
108                (unsigned long long)reply->lock_extent.end);
109
110         EXIT;
111  local:
112         rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags,
113                                      completion, blocking, data, data_len);
114         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
115                      LDLM_FL_BLOCK_CONV)) {
116                 /* Go to sleep until the lock is granted. */
117                 /* FIXME: or cancelled. */
118                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
119                                          lock->l_granted_mode);
120         }
121  out:
122         *request = req;
123         return rc;
124 }
125
126 int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
127                            struct ptlrpc_connection *conn, __u32 ns_id)
128 {
129         struct ldlm_namespace *ns;
130         struct ldlm_request *body;
131         struct ptlrpc_request *req;
132         int rc, size = sizeof(*body);
133         ENTRY;
134
135         if (is_local_conn(conn))
136                 GOTO(local, 0);
137
138         req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
139         if (!req)
140                 GOTO(out, rc = -ENOMEM);
141
142         body = lustre_msg_buf(req->rq_reqmsg, 0);
143         body->lock_desc.l_resource.lr_ns_id = ns_id;
144
145         req->rq_replen = lustre_msg_size(0, NULL);
146
147         rc = ptlrpc_queue_wait(req);
148         rc = ptlrpc_check_status(req, rc);
149         ptlrpc_free_req(req);
150         if (rc)
151                 GOTO(out, rc);
152
153         EXIT;
154  local:
155         rc = ldlm_namespace_new(obddev, ns_id, &ns);
156         if (rc != ELDLM_OK) {
157                 /* XXX: It succeeded remotely but failed locally. What to do? */
158                 CERROR("Local ldlm_namespace_new failed.\n");
159         }
160  out:
161         return rc;
162 }
163
164 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
165                       void *data, __u32 data_len)
166 {
167         struct ldlm_request *body;
168         struct ptlrpc_request *req;
169         struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
170         struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
171         int rc, size[2] = {sizeof(*body), data_len};
172         char *bufs[2] = {NULL, data};
173         ENTRY;
174
175         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
176                               bufs);
177         if (!req)
178                 GOTO(out, rc = -ENOMEM);
179
180         body = lustre_msg_buf(req->rq_reqmsg, 0);
181         memcpy(&body->lock_handle1, &lock->l_remote_handle,
182                sizeof(body->lock_handle1));
183
184         if (new != NULL) {
185                 ldlm_lock2desc(new, &body->lock_desc);
186                 ldlm_object2handle(new, &body->lock_handle2);
187         }
188
189         req->rq_replen = lustre_msg_size(0, NULL);
190
191         rc = ptlrpc_queue_wait(req);
192         rc = ptlrpc_check_status(req, rc);
193         ptlrpc_free_req(req);
194
195         EXIT;
196  out:
197         return rc;
198 }
199
200 int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
201                      int new_mode, int *flags, struct ptlrpc_request **request)
202 {
203         struct ldlm_request *body;
204         struct ldlm_reply *reply;
205         struct ldlm_lock *lock;
206         struct ptlrpc_request *req = NULL;
207         int rc, size[2] = {sizeof(*body), 0};
208         char *bufs[2] = {NULL, NULL};
209         ENTRY;
210
211         lock = ldlm_handle2object(lockh);
212
213         if (is_local_conn(lock->l_connection))
214                 GOTO(local, 0);
215
216         size[1] = lock->l_data_len;
217         bufs[1] = lock->l_data;
218         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
219                               bufs);
220         if (!req)
221                 GOTO(out, rc = -ENOMEM);
222
223         body = lustre_msg_buf(req->rq_reqmsg, 0);
224         memcpy(&body->lock_handle1, &lock->l_remote_handle,
225                sizeof(body->lock_handle1));
226
227         body->lock_desc.l_req_mode = new_mode;
228         body->flags = *flags;
229
230         req->rq_replen = lustre_msg_size(1, size);
231
232         rc = ptlrpc_queue_wait(req);
233         rc = ptlrpc_check_status(req, rc);
234         if (rc != ELDLM_OK)
235                 GOTO(out, rc);
236
237         reply = lustre_msg_buf(req->rq_repmsg, 0);
238         *flags = reply->flags;
239
240         EXIT;
241  local:
242         rc = ldlm_local_lock_convert(lockh, new_mode, flags);
243  out:
244         *request = req;
245         return rc;
246 }
247
248 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
249                     struct ptlrpc_request **request)
250 {
251         struct ldlm_request *body;
252         struct ldlm_lock *lock;
253         struct ptlrpc_request *req = NULL;
254         int rc, size[2] = {sizeof(*body), 0};
255         char *bufs[2] = {NULL, NULL};
256         ENTRY;
257
258         lock = ldlm_handle2object(lockh);
259
260         if (is_local_conn(lock->l_connection))
261                 GOTO(local, 0);
262
263         size[1] = lock->l_data_len;
264         bufs[1] = lock->l_data;
265         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
266                               bufs);
267         if (!req)
268                 GOTO(out, rc = -ENOMEM);
269
270         body = lustre_msg_buf(req->rq_reqmsg, 0);
271         memcpy(&body->lock_handle1, &lock->l_remote_handle,
272                sizeof(body->lock_handle1));
273
274         req->rq_replen = lustre_msg_size(0, NULL);
275
276         rc = ptlrpc_queue_wait(req);
277         rc = ptlrpc_check_status(req, rc);
278         if (rc != ELDLM_OK)
279                 GOTO(out, rc);
280
281         EXIT;
282  local:
283         rc = ldlm_local_lock_cancel(lockh);
284  out:
285         *request = req;
286         return rc;
287 }