1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This code is issued under the GNU General Public License.
7 * See the file COPYING in this distribution
9 * by Cluster File Systems, Inc.
12 #define DEBUG_SUBSYSTEM S_LDLM
14 #include <linux/lustre_dlm.h>
16 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
17 struct ldlm_namespace *ns,
18 struct ldlm_handle *parent_lock_handle,
21 struct ldlm_extent *req_ex,
26 struct ldlm_handle *lockh)
28 struct ldlm_lock *lock;
29 struct ldlm_request *body;
30 struct ldlm_reply *reply;
31 struct ptlrpc_request *req;
32 char *bufs[2] = {NULL, data};
33 int rc, size[2] = {sizeof(*body), data_len};
37 rc = ldlm_local_lock_create(ns, parent_lock_handle, res_id, type, mode,
42 lock = ldlm_handle2object(lockh);
44 req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
46 GOTO(out, rc = -ENOMEM);
48 /* Dump all of this data into the request buffer */
49 body = lustre_msg_buf(req->rq_reqmsg, 0);
50 body->lock_desc.l_resource.lr_type = type;
51 memcpy(body->lock_desc.l_resource.lr_name, res_id,
52 sizeof(body->lock_desc.l_resource.lr_name));
54 body->lock_desc.l_req_mode = mode;
56 memcpy(&body->lock_desc.l_extent, req_ex,
57 sizeof(body->lock_desc.l_extent));
60 memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1));
62 if (parent_lock_handle)
63 memcpy(&body->lock_handle2, parent_lock_handle,
64 sizeof(body->lock_handle2));
66 /* Continue as normal. */
67 size[0] = sizeof(*reply);
68 req->rq_replen = lustre_msg_size(1, size);
70 rc = ptlrpc_queue_wait(req);
71 rc = ptlrpc_check_status(req, rc);
73 spin_lock(&lock->l_resource->lr_lock);
74 ldlm_resource_put(lock->l_resource);
75 spin_unlock(&lock->l_resource->lr_lock);
80 lock->l_connection = conn;
82 reply = lustre_msg_buf(req->rq_repmsg, 0);
83 memcpy(&lock->l_remote_handle, &reply->lock_handle,
84 sizeof(lock->l_remote_handle));
85 memcpy(req_ex, &reply->lock_extent, sizeof(*req_ex));
86 *flags = reply->flags;
88 CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
89 (void *)(unsigned long)reply->lock_handle.addr, *flags);
90 CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
91 (unsigned long long)reply->lock_extent.start,
92 (unsigned long long)reply->lock_extent.end);
96 rc = ldlm_local_lock_enqueue(lockh, req_ex, flags, NULL, NULL);
98 if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
99 LDLM_FL_BLOCK_CONV)) {
100 /* Go to sleep until the lock is granted. */
101 /* FIXME: or cancelled. */
102 CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
103 "going to sleep.\n", lock);
104 ldlm_lock_dump(lock);
105 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
106 lock->l_granted_mode);
107 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
114 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
115 void *data, __u32 data_len)
117 struct ldlm_request *body;
118 struct ptlrpc_request *req;
119 struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
120 struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
121 int rc, size[2] = {sizeof(*body), data_len};
122 char *bufs[2] = {NULL, data};
125 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
128 GOTO(out, rc = -ENOMEM);
130 body = lustre_msg_buf(req->rq_reqmsg, 0);
131 memcpy(&body->lock_handle1, &lock->l_remote_handle,
132 sizeof(body->lock_handle1));
135 CDEBUG(D_NET, "Sending granted AST\n");
136 ldlm_lock2desc(lock, &body->lock_desc);
138 CDEBUG(D_NET, "Sending blocked AST\n");
139 ldlm_lock2desc(new, &body->lock_desc);
140 ldlm_object2handle(new, &body->lock_handle2);
143 req->rq_replen = lustre_msg_size(0, NULL);
145 rc = ptlrpc_queue_wait(req);
146 rc = ptlrpc_check_status(req, rc);
147 ptlrpc_free_req(req);
154 int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
155 int new_mode, int *flags)
157 struct ldlm_request *body;
158 struct ldlm_lock *lock;
159 struct ldlm_resource *res;
160 struct ptlrpc_request *req;
161 int rc, size[2] = {sizeof(*body), 0};
162 char *bufs[2] = {NULL, NULL};
165 lock = ldlm_handle2object(lockh);
168 size[1] = lock->l_data_len;
169 bufs[1] = lock->l_data;
170 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
173 GOTO(out, rc = -ENOMEM);
175 body = lustre_msg_buf(req->rq_reqmsg, 0);
176 memcpy(&body->lock_handle1, &lock->l_remote_handle,
177 sizeof(body->lock_handle1));
179 body->lock_desc.l_req_mode = new_mode;
180 body->flags = *flags;
182 req->rq_replen = lustre_msg_size(1, size);
184 rc = ptlrpc_queue_wait(req);
185 rc = ptlrpc_check_status(req, rc);
189 body = lustre_msg_buf(req->rq_repmsg, 0);
190 res = ldlm_local_lock_convert(lockh, new_mode, &body->flags);
192 ldlm_reprocess_all(res);
193 if (lock->l_req_mode != lock->l_granted_mode) {
194 /* Go to sleep until the lock is granted. */
195 /* FIXME: or cancelled. */
196 CDEBUG(D_NET, "convert returned a blocked lock, "
197 "going to sleep.\n");
198 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
199 lock->l_granted_mode);
200 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
204 ptlrpc_free_req(req);
208 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
210 struct ldlm_request *body;
211 struct ptlrpc_request *req;
212 struct ldlm_resource *res;
213 int rc, size[2] = {sizeof(*body), 0};
214 char *bufs[2] = {NULL, NULL};
217 if (lock->l_data_len == sizeof(struct inode)) {
218 /* FIXME: do something better than throwing away everything */
219 struct inode *inode = lock->l_data;
223 invalidate_inode_pages(inode);
227 size[1] = lock->l_data_len;
228 bufs[1] = lock->l_data;
229 req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
232 GOTO(out, rc = -ENOMEM);
234 body = lustre_msg_buf(req->rq_reqmsg, 0);
235 memcpy(&body->lock_handle1, &lock->l_remote_handle,
236 sizeof(body->lock_handle1));
238 req->rq_replen = lustre_msg_size(0, NULL);
240 rc = ptlrpc_queue_wait(req);
241 rc = ptlrpc_check_status(req, rc);
242 ptlrpc_free_req(req);
246 res = ldlm_local_lock_cancel(lock);
248 ldlm_reprocess_all(res);