1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
14 #define DEBUG_SUBSYSTEM S_LDLM
16 #include <linux/lustre_dlm.h>
18 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
20 struct ldlm_handle *parent_lock_handle,
23 struct ldlm_extent *req_ex,
28 struct ldlm_handle *lockh,
29 struct ptlrpc_request **request)
31 struct ldlm_handle local_lockh;
32 struct ldlm_lock *lock;
33 struct ldlm_request *body;
34 struct ldlm_reply *reply;
35 struct ptlrpc_request *req = NULL;
36 char *bufs[2] = {NULL, data};
37 int rc, size[2] = {sizeof(*body), data_len};
41 err = ldlm_local_lock_create(ns_id, parent_lock_handle, res_id,
46 /* Is this lock locally managed? */
50 req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
52 GOTO(out, rc = -ENOMEM);
54 /* Dump all of this data into the request buffer */
55 body = lustre_msg_buf(req->rq_reqmsg, 0);
56 body->lock_desc.l_resource.lr_ns_id = ns_id;
57 body->lock_desc.l_resource.lr_type = type;
58 memcpy(body->lock_desc.l_resource.lr_name, res_id,
59 sizeof(body->lock_desc.l_resource.lr_name));
61 body->lock_desc.l_req_mode = mode;
63 memcpy(&body->lock_desc.l_extent, req_ex,
64 sizeof(body->lock_desc.l_extent));
67 memcpy(&body->lock_handle1, &local_lockh, sizeof(body->lock_handle1));
69 if (parent_lock_handle)
70 memcpy(&body->lock_handle2, parent_lock_handle,
71 sizeof(body->lock_handle2));
73 /* Continue as normal. */
74 size[0] = sizeof(*reply);
75 req->rq_replen = lustre_msg_size(1, size);
77 rc = ptlrpc_queue_wait(req);
78 rc = ptlrpc_check_status(req, rc);
82 reply = lustre_msg_buf(req->rq_repmsg, 0);
83 lock = ldlm_handle2object(&local_lockh);
84 memcpy(&lock->l_remote_handle, &reply->lock_handle,
85 sizeof(lock->l_remote_handle));
86 *flags = reply->flags;
88 CERROR("remote handle: %p, flags: %d\n",
89 (void *)(unsigned long)reply->lock_handle.addr, *flags);
90 CERROR("extent: %Lu -> %Lu\n", reply->lock_extent.start,
91 reply->lock_extent.end);
95 rc = ldlm_local_lock_enqueue(&local_lockh, mode, req_ex, flags, NULL,
96 NULL, data, data_len);
102 int ldlm_cli_namespace_new(struct obd_device *obddev, struct ptlrpc_client *cl,
103 struct ptlrpc_connection *conn, __u32 ns_id,
104 struct ptlrpc_request **request)
106 struct ldlm_namespace *ns;
107 struct ldlm_request *body;
108 struct ptlrpc_request *req;
109 int rc, size = sizeof(*body);
115 req = ptlrpc_prep_req(cl, conn, LDLM_NAMESPACE_NEW, 1, &size, NULL);
117 GOTO(out, rc = -ENOMEM);
119 body = lustre_msg_buf(req->rq_reqmsg, 0);
120 body->lock_desc.l_resource.lr_ns_id = ns_id;
122 req->rq_replen = lustre_msg_size(0, NULL);
124 rc = ptlrpc_queue_wait(req);
125 rc = ptlrpc_check_status(req, rc);
131 err = ldlm_namespace_new(obddev, ns_id, &ns);
132 if (err != ELDLM_OK) {
133 /* XXX: It succeeded remotely but failed locally. What to do? */
141 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
142 void *data, __u32 data_len)
144 struct ldlm_request *body;
145 struct ptlrpc_request *req;
146 struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
147 struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
148 int rc, size[2] = {sizeof(*body), data_len};
149 char *bufs[2] = {NULL, data};
151 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
154 GOTO(out, rc = -ENOMEM);
156 body = lustre_msg_buf(req->rq_reqmsg, 0);
157 memcpy(&body->lock_handle1, &lock->l_remote_handle,
158 sizeof(body->lock_handle1));
161 ldlm_lock2desc(new, &body->lock_desc);
163 req->rq_replen = lustre_msg_size(0, NULL);
165 rc = ptlrpc_queue_wait(req);
166 rc = ptlrpc_check_status(req, rc);
167 ptlrpc_free_req(req);
174 int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
175 int new_mode, int *flags, struct ptlrpc_request **request)
177 struct ldlm_request *body;
178 struct ldlm_reply *reply;
179 struct ldlm_lock *lock;
180 struct ptlrpc_request *req;
181 int rc, size[2] = {sizeof(*body), lock->l_data_len};
182 char *bufs[2] = {NULL, lock->l_data};
184 lock = ldlm_handle2object(lockh);
186 if (lock->l_connection == NULL)
189 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
192 GOTO(out, rc = -ENOMEM);
194 body = lustre_msg_buf(req->rq_reqmsg, 0);
195 memcpy(&body->lock_handle1, &lock->l_remote_handle,
196 sizeof(body->lock_handle1));
198 body->lock_desc.l_req_mode = new_mode;
199 body->flags = *flags;
201 req->rq_replen = lustre_msg_size(1, size);
203 rc = ptlrpc_queue_wait(req);
204 rc = ptlrpc_check_status(req, rc);
208 reply = lustre_msg_buf(req->rq_repmsg, 0);
209 *flags = reply->flags;
213 rc = ldlm_local_lock_convert(lockh, new_mode, flags);
219 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
220 struct ptlrpc_request **request)
222 struct ldlm_request *body;
223 struct ldlm_lock *lock;
224 struct ptlrpc_request *req;
225 int rc, size[2] = {sizeof(*body), lock->l_data_len};
226 char *bufs[2] = {NULL, lock->l_data};
228 lock = ldlm_handle2object(lockh);
230 if (lock->l_connection == NULL)
233 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
236 GOTO(out, rc = -ENOMEM);
238 body = lustre_msg_buf(req->rq_reqmsg, 0);
239 memcpy(&body->lock_handle1, &lock->l_remote_handle,
240 sizeof(body->lock_handle1));
242 req->rq_replen = lustre_msg_size(0, NULL);
244 rc = ptlrpc_queue_wait(req);
245 rc = ptlrpc_check_status(req, rc);
251 rc = ldlm_local_lock_cancel(lockh);