1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
13 #define DEBUG_SUBSYSTEM S_LDLM
15 #include <linux/lustre_dlm.h>
17 #define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
19 static int is_local_conn(struct ptlrpc_connection *conn)
25 RETURN(LOOPBACK(conn->c_peer.peer_nid));
28 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
30 struct ldlm_handle *parent_lock_handle,
33 struct ldlm_extent *req_ex,
36 ldlm_lock_callback completion,
37 ldlm_lock_callback blocking,
40 struct ldlm_handle *lockh,
41 struct ptlrpc_request **request)
43 struct ldlm_handle local_lockh;
44 struct ldlm_lock *lock;
45 struct ldlm_request *body;
46 struct ldlm_reply *reply;
47 struct ptlrpc_request *req = NULL;
48 char *bufs[2] = {NULL, data};
49 int rc, size[2] = {sizeof(*body), data_len};
53 err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
58 lock = ldlm_handle2object(&local_lockh);
59 /* Is this lock locally managed? */
60 if (is_local_conn(conn))
63 req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
65 GOTO(out, rc = -ENOMEM);
67 /* Dump all of this data into the request buffer */
68 body = lustre_msg_buf(req->rq_reqmsg, 0);
69 body->lock_desc.l_resource.lr_ns_id = ns_id;
70 body->lock_desc.l_resource.lr_type = type;
71 memcpy(body->lock_desc.l_resource.lr_name, res_id,
72 sizeof(body->lock_desc.l_resource.lr_name));
74 body->lock_desc.l_req_mode = mode;
76 memcpy(&body->lock_desc.l_extent, req_ex,
77 sizeof(body->lock_desc.l_extent));
80 memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
82 if (parent_lock_handle)
83 memcpy(&body->lock_handle2, parent_lock_handle,
84 sizeof(body->lock_handle2));
86 /* Continue as normal. */
87 size[0] = sizeof(*reply);
88 req->rq_replen = lustre_msg_size(1, size);
90 rc = ptlrpc_queue_wait(req);
91 rc = ptlrpc_check_status(req, rc);
93 ldlm_resource_put(lock->l_resource);
98 lock->l_connection = conn;
99 reply = lustre_msg_buf(req->rq_repmsg, 0);
100 memcpy(&lock->l_remote_handle, &reply->lock_handle,
101 sizeof(lock->l_remote_handle));
102 *flags = reply->flags;
104 CERROR("remote handle: %p, flags: %d\n",
105 (void *)(unsigned long)reply->lock_handle.addr, *flags);
106 CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start,
107 reply->lock_extent.end);
111 rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags,
112 completion, blocking, data, data_len);
113 if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
114 LDLM_FL_BLOCK_CONV)) {
115 /* Go to sleep until the lock is granted. */
116 /* FIXME: or cancelled. */
117 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
118 lock->l_granted_mode);
125 int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
126 struct ptlrpc_connection *conn, __u32 ns_id)
128 struct ldlm_namespace *ns;
129 struct ldlm_request *body;
130 struct ptlrpc_request *req;
131 int rc, size = sizeof(*body);
134 if (is_local_conn(conn))
137 req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
139 GOTO(out, rc = -ENOMEM);
141 body = lustre_msg_buf(req->rq_reqmsg, 0);
142 body->lock_desc.l_resource.lr_ns_id = ns_id;
144 req->rq_replen = lustre_msg_size(0, NULL);
146 rc = ptlrpc_queue_wait(req);
147 rc = ptlrpc_check_status(req, rc);
148 ptlrpc_free_req(req);
154 rc = ldlm_namespace_new(obddev, ns_id, &ns);
155 if (rc != ELDLM_OK) {
156 /* XXX: It succeeded remotely but failed locally. What to do? */
157 CERROR("Local ldlm_namespace_new failed.\n");
163 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
164 void *data, __u32 data_len)
166 struct ldlm_request *body;
167 struct ptlrpc_request *req;
168 struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
169 struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
170 int rc, size[2] = {sizeof(*body), data_len};
171 char *bufs[2] = {NULL, data};
174 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
177 GOTO(out, rc = -ENOMEM);
179 body = lustre_msg_buf(req->rq_reqmsg, 0);
180 memcpy(&body->lock_handle1, &lock->l_remote_handle,
181 sizeof(body->lock_handle1));
184 ldlm_lock2desc(new, &body->lock_desc);
185 ldlm_object2handle(new, &body->lock_handle2);
188 req->rq_replen = lustre_msg_size(0, NULL);
190 rc = ptlrpc_queue_wait(req);
191 rc = ptlrpc_check_status(req, rc);
192 ptlrpc_free_req(req);
199 int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
200 int new_mode, int *flags, struct ptlrpc_request **request)
202 struct ldlm_request *body;
203 struct ldlm_reply *reply;
204 struct ldlm_lock *lock;
205 struct ptlrpc_request *req = NULL;
206 int rc, size[2] = {sizeof(*body), 0};
207 char *bufs[2] = {NULL, NULL};
210 lock = ldlm_handle2object(lockh);
212 if (is_local_conn(lock->l_connection))
215 size[1] = lock->l_data_len;
216 bufs[1] = lock->l_data;
217 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
220 GOTO(out, rc = -ENOMEM);
222 body = lustre_msg_buf(req->rq_reqmsg, 0);
223 memcpy(&body->lock_handle1, &lock->l_remote_handle,
224 sizeof(body->lock_handle1));
226 body->lock_desc.l_req_mode = new_mode;
227 body->flags = *flags;
229 req->rq_replen = lustre_msg_size(1, size);
231 rc = ptlrpc_queue_wait(req);
232 rc = ptlrpc_check_status(req, rc);
236 reply = lustre_msg_buf(req->rq_repmsg, 0);
237 *flags = reply->flags;
241 rc = ldlm_local_lock_convert(lockh, new_mode, flags);
247 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
248 struct ptlrpc_request **request)
250 struct ldlm_request *body;
251 struct ldlm_lock *lock;
252 struct ptlrpc_request *req = NULL;
253 int rc, size[2] = {sizeof(*body), 0};
254 char *bufs[2] = {NULL, NULL};
257 lock = ldlm_handle2object(lockh);
259 if (is_local_conn(lock->l_connection))
262 size[1] = lock->l_data_len;
263 bufs[1] = lock->l_data;
264 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
267 GOTO(out, rc = -ENOMEM);
269 body = lustre_msg_buf(req->rq_reqmsg, 0);
270 memcpy(&body->lock_handle1, &lock->l_remote_handle,
271 sizeof(body->lock_handle1));
273 req->rq_replen = lustre_msg_size(0, NULL);
275 rc = ptlrpc_queue_wait(req);
276 rc = ptlrpc_check_status(req, rc);
282 rc = ldlm_local_lock_cancel(lockh);