Whamcloud - gitweb
7c20bd9d4e6f4b3793d9cd27597123c8c10bec7c
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define EXPORT_SYMTAB
13
14 #define DEBUG_SUBSYSTEM S_LDLM
15
16 #include <linux/lustre_dlm.h>
17
18 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
19                      __u32 ns_id,
20                      struct ldlm_handle *parent_lock_handle,
21                      __u64 *res_id,
22                      __u32 type,
23                      struct ldlm_extent *req_ex,
24                      ldlm_mode_t mode,
25                      int *flags,
26                      void *data,
27                      __u32 data_len,
28                      struct ldlm_handle *lockh,
29                      struct ptlrpc_request **request)
30 {
31         struct ldlm_handle local_lockh;
32         struct ldlm_lock *lock;
33         struct ldlm_request *body;
34         struct ldlm_reply *reply;
35         struct ptlrpc_request *req = NULL;
36         char *bufs[2] = {NULL, data};
37         int rc, size[2] = {sizeof(*body), data_len};
38         ldlm_error_t err;
39         ENTRY;
40
41         err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
42                                      type, &local_lockh);
43         if (err != ELDLM_OK)
44                 RETURN(err);
45
46         /* Is this lock locally managed? */
47         if (conn == NULL)
48                 GOTO(local, 0);
49
50         req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
51         if (!req)
52                 GOTO(out, rc = -ENOMEM);
53
54         /* Dump all of this data into the request buffer */
55         body = lustre_msg_buf(req->rq_reqmsg, 0);
56         body->lock_desc.l_resource.lr_ns_id = ns_id;
57         body->lock_desc.l_resource.lr_type = type;
58         memcpy(body->lock_desc.l_resource.lr_name, res_id,
59                sizeof(body->lock_desc.l_resource.lr_name));
60
61         body->lock_desc.l_req_mode = mode;
62         if (req_ex)
63                 memcpy(&body->lock_desc.l_extent, req_ex,
64                        sizeof(body->lock_desc.l_extent));
65         body->flags = *flags;
66
67         memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
68
69         if (parent_lock_handle)
70                 memcpy(&body->lock_handle2, parent_lock_handle,
71                        sizeof(body->lock_handle2));
72
73         /* Continue as normal. */
74         size[0] = sizeof(*reply);
75         req->rq_replen = lustre_msg_size(1, size);
76
77         rc = ptlrpc_queue_wait(req);
78         rc = ptlrpc_check_status(req, rc);
79         if (rc != ELDLM_OK)
80                 GOTO(out, rc);
81
82         reply = lustre_msg_buf(req->rq_repmsg, 0);
83         lock = ldlm_handle2object(&local_lockh);
84         memcpy(&lock->l_remote_handle, &reply->lock_handle,
85                sizeof(lock->l_remote_handle));
86         *flags = reply->flags;
87
88         CERROR("remote handle: %p, flags: %d\n",
89                (void *)(unsigned long)reply->lock_handle.addr, *flags);
90         CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start,
91                reply->lock_extent.end);
92
93         EXIT;
94  local:
95         rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags, NULL,
96                                      NULL, data, data_len);
97  out:
98         *request = req;
99         return rc;
100 }
101
102 int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
103                            struct ptlrpc_connection *conn, __u32 ns_id,
104                            struct ptlrpc_request **request)
105 {
106         struct ldlm_namespace *ns;
107         struct ldlm_request *body;
108         struct ptlrpc_request *req;
109         int rc, size = sizeof(*body);
110         int err;
111
112         if (conn == NULL)
113                 GOTO(local, 0);
114
115         req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
116         if (!req)
117                 GOTO(out, rc = -ENOMEM);
118
119         body = lustre_msg_buf(req->rq_reqmsg, 0);
120         body->lock_desc.l_resource.lr_ns_id = ns_id;
121
122         req->rq_replen = lustre_msg_size(0, NULL);
123
124         rc = ptlrpc_queue_wait(req);
125         rc = ptlrpc_check_status(req, rc);
126         if (rc)
127                 GOTO(out, rc);
128
129         EXIT;
130  local:
131         err = ldlm_namespace_new(obddev, ns_id, &ns);
132         if (err != ELDLM_OK) {
133                 /* XXX: It succeeded remotely but failed locally. What to do? */
134                 return err;
135         }
136  out:
137         *request = req;
138         return rc;
139 }
140
141 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
142                       void *data, __u32 data_len)
143 {
144         struct ldlm_request *body;
145         struct ptlrpc_request *req;
146         struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
147         struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
148         int rc, size[2] = {sizeof(*body), data_len};
149         char *bufs[2] = {NULL, data};
150
151         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
152                               bufs);
153         if (!req)
154                 GOTO(out, rc = -ENOMEM);
155
156         body = lustre_msg_buf(req->rq_reqmsg, 0);
157         memcpy(&body->lock_handle1, &lock->l_remote_handle,
158                sizeof(body->lock_handle1));
159
160         if (new != NULL)
161                 ldlm_lock2desc(new, &body->lock_desc);
162
163         req->rq_replen = lustre_msg_size(0, NULL);
164
165         rc = ptlrpc_queue_wait(req);
166         rc = ptlrpc_check_status(req, rc);
167         ptlrpc_free_req(req);
168
169         EXIT;
170  out:
171         return rc;
172 }
173
174 int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
175                      int new_mode, int *flags, struct ptlrpc_request **request)
176 {
177         struct ldlm_request *body;
178         struct ldlm_reply *reply;
179         struct ldlm_lock *lock;
180         struct ptlrpc_request *req;
181         int rc, size[2] = {sizeof(*body), lock->l_data_len};
182         char *bufs[2] = {NULL, lock->l_data};
183
184         lock = ldlm_handle2object(lockh);
185
186         if (lock->l_connection == NULL)
187                 GOTO(local, 0);
188
189         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
190                               bufs);
191         if (!req)
192                 GOTO(out, rc = -ENOMEM);
193
194         body = lustre_msg_buf(req->rq_reqmsg, 0);
195         memcpy(&body->lock_handle1, &lock->l_remote_handle,
196                sizeof(body->lock_handle1));
197
198         body->lock_desc.l_req_mode = new_mode;
199         body->flags = *flags;
200
201         req->rq_replen = lustre_msg_size(1, size);
202
203         rc = ptlrpc_queue_wait(req);
204         rc = ptlrpc_check_status(req, rc);
205         if (rc != ELDLM_OK)
206                 GOTO(out, rc);
207
208         reply = lustre_msg_buf(req->rq_repmsg, 0);
209         *flags = reply->flags;
210
211         EXIT;
212  local:
213         rc = ldlm_local_lock_convert(lockh, new_mode, flags);
214  out:
215         *request = req;
216         return rc;
217 }
218
219 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
220                     struct ptlrpc_request **request)
221 {
222         struct ldlm_request *body;
223         struct ldlm_lock *lock;
224         struct ptlrpc_request *req;
225         int rc, size[2] = {sizeof(*body), lock->l_data_len};
226         char *bufs[2] = {NULL, lock->l_data};
227
228         lock = ldlm_handle2object(lockh);
229
230         if (lock->l_connection == NULL)
231                 GOTO(local, 0);
232
233         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
234                               bufs);
235         if (!req)
236                 GOTO(out, rc = -ENOMEM);
237
238         body = lustre_msg_buf(req->rq_reqmsg, 0);
239         memcpy(&body->lock_handle1, &lock->l_remote_handle,
240                sizeof(body->lock_handle1));
241
242         req->rq_replen = lustre_msg_size(0, NULL);
243
244         rc = ptlrpc_queue_wait(req);
245         rc = ptlrpc_check_status(req, rc);
246         if (rc != ELDLM_OK)
247                 GOTO(out, rc);
248
249         EXIT;
250  local:
251         rc = ldlm_local_lock_cancel(lockh);
252  out:
253         *request = req;
254         return rc;
255 }