1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
13 #define DEBUG_SUBSYSTEM S_LDLM
15 #include <linux/lustre_dlm.h>
/* LOOPBACK(x): non-zero when x, masked with cpu_to_be32(0xff000000), equals
 * cpu_to_be32(0x7f000000) -- i.e. for a 32-bit big-endian value, its most
 * significant byte is 0x7f.
 * NOTE(review): presumably x is an IPv4-style address and this matches the
 * 127.0.0.0/8 range -- confirm against how peer NIDs are encoded. */
17 #define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
/* Tell whether a connection's peer is a loopback peer.
 *
 * The visible return value is LOOPBACK() applied to conn->c_peer.peer_nid.
 * The ldlm_cli_* functions in this file test this result before preparing
 * their RPC request. */
19 static int is_local_conn(struct ptlrpc_connection *conn)
25 RETURN(LOOPBACK(conn->c_peer.peer_nid));
/*
 * ldlm_cli_enqueue - client entry point for enqueueing a lock.
 *
 * Steps visible in this function:
 *   - create a lock locally via ldlm_local_lock_create() and resolve
 *     local_lockh to a struct ldlm_lock;
 *   - test is_local_conn(conn);
 *   - prepare an LDLM_ENQUEUE request with two buffers (an ldlm_request body
 *     and the caller's data/data_len), fill in the namespace id, resource
 *     type and name, requested mode, requested extent, the local lock handle
 *     and, if given, the parent lock handle;
 *   - wait for the reply, then record the connection and the reply's lock
 *     handle on the lock and overwrite *flags with the reply's flags;
 *   - call ldlm_local_lock_enqueue() and, if *flags has any of
 *     LDLM_FL_BLOCK_WAIT, LDLM_FL_BLOCK_GRANTED or LDLM_FL_BLOCK_CONV set,
 *     sleep until the lock's requested mode equals its granted mode.
 *
 * NOTE(review): presumably the RPC portion is skipped when is_local_conn()
 * is true and only the local enqueue runs -- confirm.
 */
28 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
30 struct ldlm_handle *parent_lock_handle,
33 struct ldlm_extent *req_ex,
36 ldlm_lock_callback completion,
37 ldlm_lock_callback blocking,
40 struct ldlm_handle *lockh,
41 struct ptlrpc_request **request)
43 struct ldlm_handle local_lockh;
44 struct ldlm_lock *lock;
45 struct ldlm_request *body;
46 struct ldlm_reply *reply;
47 struct ptlrpc_request *req = NULL;
/* Request buffer 0 is the ldlm_request body (its pointer is NULL here and the
 * body is filled in below); buffer 1 is the caller's data. */
48 char *bufs[2] = {NULL, data};
49 int rc, size[2] = {sizeof(*body), data_len};
/* Create the lock locally first.
 * NOTE(review): presumably this call fills in local_lockh, which is resolved
 * to `lock` right after -- confirm. */
53 err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
58 lock = ldlm_handle2object(&local_lockh);
59 /* Is this lock locally managed? */
60 if (is_local_conn(conn))
63 req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
65 GOTO(out, rc = -ENOMEM);
67 /* Dump all of this data into the request buffer */
68 body = lustre_msg_buf(req->rq_reqmsg, 0);
69 body->lock_desc.l_resource.lr_ns_id = ns_id;
70 body->lock_desc.l_resource.lr_type = type;
71 memcpy(body->lock_desc.l_resource.lr_name, res_id,
72 sizeof(body->lock_desc.l_resource.lr_name));
74 body->lock_desc.l_req_mode = mode;
76 memcpy(&body->lock_desc.l_extent, req_ex,
77 sizeof(body->lock_desc.l_extent));
/* lock_handle1 carries our local handle; lock_handle2 carries the parent
 * lock's handle when one was supplied. */
80 memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
82 if (parent_lock_handle)
83 memcpy(&body->lock_handle2, parent_lock_handle,
84 sizeof(body->lock_handle2));
86 /* Continue as normal. */
/* size[0] is reused for the reply: the expected reply is a single buffer of
 * sizeof(struct ldlm_reply). */
87 size[0] = sizeof(*reply);
88 req->rq_replen = lustre_msg_size(1, size);
/* The result of ptlrpc_queue_wait() is passed to ptlrpc_check_status(), whose
 * return value becomes rc. */
90 rc = ptlrpc_queue_wait(req);
91 rc = ptlrpc_check_status(req, rc);
/* NOTE(review): presumably this drops the lock's resource reference on the
 * RPC failure path -- confirm the guarding condition. */
93 ldlm_resource_put(lock->l_resource);
/* Remember the connection on the lock and copy the handle and flags returned
 * in the reply. */
98 lock->l_connection = conn;
99 reply = lustre_msg_buf(req->rq_repmsg, 0);
100 memcpy(&lock->l_remote_handle, &reply->lock_handle,
101 sizeof(lock->l_remote_handle));
102 *flags = reply->flags;
/* NOTE(review): these two CERROR() calls read as informational tracing of the
 * reply rather than error reports -- consider a debug-level macro. */
104 CERROR("remote handle: %p, flags: %d\n",
105 (void *)(unsigned long)reply->lock_handle.addr, *flags);
106 CERROR("extent: %Lu -> %Lu\n",
107 (unsigned long long)reply->lock_extent.start,
108 (unsigned long long)reply->lock_extent.end);
/* Enqueue on the local lock, passing the same flags pointer. */
112 rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags,
113 completion, blocking, data, data_len);
114 if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
115 LDLM_FL_BLOCK_CONV)) {
116 /* Go to sleep until the lock is granted. */
117 /* FIXME: or cancelled. */
/* NOTE(review): the return value of wait_event_interruptible() is not
 * captured in this statement, so an interrupted sleep cannot be told apart
 * from a grant here -- confirm that is intended. */
118 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
119 lock->l_granted_mode);
/*
 * ldlm_cli_namespace_new - create lock namespace ns_id.
 *
 * Steps visible in this function:
 *   - test is_local_conn(conn);
 *   - prepare an LDLM_NAMESPACE_NEW request with one buffer (an ldlm_request
 *     body whose lock_desc.l_resource.lr_ns_id is set to ns_id), expecting a
 *     reply sized as an empty message;
 *   - wait for the reply, check its status and free the request;
 *   - call ldlm_namespace_new(obddev, ns_id, &ns) locally and log an error if
 *     it does not return ELDLM_OK.
 *
 * NOTE(review): presumably the RPC portion is skipped for a local
 * connection -- confirm.
 */
126 int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
127 struct ptlrpc_connection *conn, __u32 ns_id)
129 struct ldlm_namespace *ns;
130 struct ldlm_request *body;
131 struct ptlrpc_request *req;
132 int rc, size = sizeof(*body);
135 if (is_local_conn(conn))
138 req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
140 GOTO(out, rc = -ENOMEM);
/* The namespace id is the only request field set here. */
142 body = lustre_msg_buf(req->rq_reqmsg, 0);
143 body->lock_desc.l_resource.lr_ns_id = ns_id;
/* Reply length is that of a message with zero buffers. */
145 req->rq_replen = lustre_msg_size(0, NULL);
/* The result of ptlrpc_queue_wait() is passed to ptlrpc_check_status(); the
 * request is freed right after the status check. */
147 rc = ptlrpc_queue_wait(req);
148 rc = ptlrpc_check_status(req, rc);
149 ptlrpc_free_req(req);
/* Create the namespace locally as well. */
155 rc = ldlm_namespace_new(obddev, ns_id, &ns);
156 if (rc != ELDLM_OK) {
157 /* XXX: It succeeded remotely but failed locally. What to do? */
158 CERROR("Local ldlm_namespace_new failed.\n");
/*
 * ldlm_cli_callback - send an LDLM_CALLBACK request about `lock`.
 *
 * The request goes over lock->l_connection, using the ptlrpc client found on
 * the obd device of the lock's namespace
 * (lock->l_resource->lr_namespace->ns_obddev->u.ldlm.ldlm_client).
 *
 * Request contents visible here:
 *   - lock_handle1: copy of lock->l_remote_handle;
 *   - lock_desc and lock_handle2: filled from `new` by ldlm_lock2desc() and
 *     ldlm_object2handle();
 *   - a second buffer sized data_len, paired with `data` in bufs[].
 * The expected reply is sized as an empty message, and the request is freed
 * after the status check.
 *
 * NOTE(review): presumably the two calls that use `new` are guarded against
 * new == NULL -- confirm.
 */
164 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
165 void *data, __u32 data_len)
167 struct ldlm_request *body;
168 struct ptlrpc_request *req;
169 struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
170 struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
171 int rc, size[2] = {sizeof(*body), data_len};
172 char *bufs[2] = {NULL, data};
175 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
178 GOTO(out, rc = -ENOMEM);
/* Identify the lock to the peer by the handle stored in l_remote_handle. */
180 body = lustre_msg_buf(req->rq_reqmsg, 0);
181 memcpy(&body->lock_handle1, &lock->l_remote_handle,
182 sizeof(body->lock_handle1));
/* Describe `new` in the request: its descriptor and its handle. */
185 ldlm_lock2desc(new, &body->lock_desc);
186 ldlm_object2handle(new, &body->lock_handle2);
/* Reply length is that of a message with zero buffers. */
189 req->rq_replen = lustre_msg_size(0, NULL);
191 rc = ptlrpc_queue_wait(req);
192 rc = ptlrpc_check_status(req, rc);
193 ptlrpc_free_req(req);
/*
 * ldlm_cli_convert - convert the lock named by lockh to new_mode.
 *
 * Steps visible in this function:
 *   - resolve lockh to a struct ldlm_lock and test
 *     is_local_conn(lock->l_connection);
 *   - prepare an LDLM_CONVERT request over lock->l_connection with two
 *     buffers: an ldlm_request body and the lock's own data
 *     (lock->l_data, lock->l_data_len);
 *   - set lock_handle1 to the lock's remote handle, lock_desc.l_req_mode to
 *     new_mode and body->flags to *flags;
 *   - wait for the reply, check its status and overwrite *flags with the
 *     reply's flags;
 *   - call ldlm_local_lock_convert(lockh, new_mode, flags).
 *
 * NOTE(review): the result of ldlm_handle2object() is dereferenced
 * immediately; confirm whether it can return NULL for a stale handle.
 */
200 int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
201 int new_mode, int *flags, struct ptlrpc_request **request)
203 struct ldlm_request *body;
204 struct ldlm_reply *reply;
205 struct ldlm_lock *lock;
206 struct ptlrpc_request *req = NULL;
207 int rc, size[2] = {sizeof(*body), 0};
208 char *bufs[2] = {NULL, NULL};
211 lock = ldlm_handle2object(lockh);
213 if (is_local_conn(lock->l_connection))
/* The second request buffer carries the data stored on the lock. */
216 size[1] = lock->l_data_len;
217 bufs[1] = lock->l_data;
218 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
221 GOTO(out, rc = -ENOMEM);
223 body = lustre_msg_buf(req->rq_reqmsg, 0);
224 memcpy(&body->lock_handle1, &lock->l_remote_handle,
225 sizeof(body->lock_handle1));
227 body->lock_desc.l_req_mode = new_mode;
228 body->flags = *flags;
/* NOTE(review): the reply length is computed from size[0]; confirm that
 * size[0] holds sizeof(*reply) at this point, as ldlm_cli_enqueue arranges
 * before its equivalent call. */
230 req->rq_replen = lustre_msg_size(1, size);
232 rc = ptlrpc_queue_wait(req);
233 rc = ptlrpc_check_status(req, rc);
/* Hand the reply's flags back to the caller through *flags. */
237 reply = lustre_msg_buf(req->rq_repmsg, 0);
238 *flags = reply->flags;
/* Apply the conversion to the local lock. */
242 rc = ldlm_local_lock_convert(lockh, new_mode, flags);
/*
 * ldlm_cli_cancel - cancel the lock named by lockh.
 *
 * Steps visible in this function:
 *   - resolve lockh to a struct ldlm_lock and test
 *     is_local_conn(lock->l_connection);
 *   - prepare an LDLM_CANCEL request over lock->l_connection with two
 *     buffers: an ldlm_request body and the lock's own data
 *     (lock->l_data, lock->l_data_len);
 *   - set lock_handle1 to the lock's remote handle, expect a reply sized as
 *     an empty message, wait for it and check its status;
 *   - call ldlm_local_lock_cancel(lockh).
 *
 * NOTE(review): the result of ldlm_handle2object() is dereferenced
 * immediately; confirm whether it can return NULL for a stale handle.
 */
248 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
249 struct ptlrpc_request **request)
251 struct ldlm_request *body;
252 struct ldlm_lock *lock;
253 struct ptlrpc_request *req = NULL;
254 int rc, size[2] = {sizeof(*body), 0};
255 char *bufs[2] = {NULL, NULL};
258 lock = ldlm_handle2object(lockh);
260 if (is_local_conn(lock->l_connection))
/* The second request buffer carries the data stored on the lock. */
263 size[1] = lock->l_data_len;
264 bufs[1] = lock->l_data;
265 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
268 GOTO(out, rc = -ENOMEM);
/* Identify the lock to the peer by the handle stored in l_remote_handle. */
270 body = lustre_msg_buf(req->rq_reqmsg, 0);
271 memcpy(&body->lock_handle1, &lock->l_remote_handle,
272 sizeof(body->lock_handle1));
/* Reply length is that of a message with zero buffers. */
274 req->rq_replen = lustre_msg_size(0, NULL);
276 rc = ptlrpc_queue_wait(req);
277 rc = ptlrpc_check_status(req, rc);
/* Cancel the local lock. */
283 rc = ldlm_local_lock_cancel(lockh);