Whamcloud - gitweb
- Fixed some 64bit warnings
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15
16 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
17                      struct ldlm_namespace *ns,
18                      struct ldlm_handle *parent_lock_handle,
19                      __u64 *res_id,
20                      __u32 type,
21                      struct ldlm_extent *req_ex,
22                      ldlm_mode_t mode,
23                      int *flags,
24                      void *data,
25                      __u32 data_len,
26                      struct ldlm_handle *lockh)
27 {
28         struct ldlm_lock *lock;
29         struct ldlm_request *body;
30         struct ldlm_reply *reply;
31         struct ptlrpc_request *req;
32         char *bufs[2] = {NULL, data};
33         int rc, size[2] = {sizeof(*body), data_len};
34         ENTRY;
35
36         *flags = 0;
37         rc = ldlm_local_lock_create(ns, parent_lock_handle, res_id, type, mode,
38                                     NULL, 0, lockh);
39         if (rc != ELDLM_OK)
40                 GOTO(out, rc);
41
42         lock = ldlm_handle2object(lockh);
43
44         req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs);
45         if (!req)
46                 GOTO(out, rc = -ENOMEM);
47
48         /* Dump all of this data into the request buffer */
49         body = lustre_msg_buf(req->rq_reqmsg, 0);
50         body->lock_desc.l_resource.lr_type = type;
51         memcpy(body->lock_desc.l_resource.lr_name, res_id,
52                sizeof(body->lock_desc.l_resource.lr_name));
53
54         body->lock_desc.l_req_mode = mode;
55         if (req_ex)
56                 memcpy(&body->lock_desc.l_extent, req_ex,
57                        sizeof(body->lock_desc.l_extent));
58         body->flags = *flags;
59
60         memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1));
61
62         if (parent_lock_handle)
63                 memcpy(&body->lock_handle2, parent_lock_handle,
64                        sizeof(body->lock_handle2));
65
66         /* Continue as normal. */
67         size[0] = sizeof(*reply);
68         req->rq_replen = lustre_msg_size(1, size);
69
70         rc = ptlrpc_queue_wait(req);
71         rc = ptlrpc_check_status(req, rc);
72         if (rc != ELDLM_OK) {
73                 spin_lock(&lock->l_resource->lr_lock);
74                 ldlm_resource_put(lock->l_resource);
75                 spin_unlock(&lock->l_resource->lr_lock);
76                 ldlm_lock_free(lock);
77                 GOTO(out, rc);
78         }
79
80         lock->l_connection = conn;
81         lock->l_client = cl;
82         reply = lustre_msg_buf(req->rq_repmsg, 0);
83         memcpy(&lock->l_remote_handle, &reply->lock_handle,
84                sizeof(lock->l_remote_handle));
85         memcpy(req_ex, &reply->lock_extent, sizeof(*req_ex));
86         *flags = reply->flags;
87
88         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
89                (void *)(unsigned long)reply->lock_handle.addr, *flags);
90         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
91                (unsigned long long)reply->lock_extent.start,
92                (unsigned long long)reply->lock_extent.end);
93
94         ptlrpc_free_req(req);
95
96         rc = ldlm_local_lock_enqueue(lockh, req_ex, flags, NULL, NULL);
97
98         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
99                       LDLM_FL_BLOCK_CONV)) {
100                 /* Go to sleep until the lock is granted. */
101                 /* FIXME: or cancelled. */
102                 CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
103                        "going to sleep.\n", lock);
104                 ldlm_lock_dump(lock);
105                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
106                                          lock->l_granted_mode);
107                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
108         }
109         EXIT;
110  out:
111         return rc;
112 }
113
114 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
115                       void *data, __u32 data_len)
116 {
117         struct ldlm_request *body;
118         struct ptlrpc_request *req;
119         struct obd_device *obddev = lock->l_resource->lr_namespace->ns_obddev;
120         struct ptlrpc_client *cl = obddev->u.ldlm.ldlm_client;
121         int rc, size[2] = {sizeof(*body), data_len};
122         char *bufs[2] = {NULL, data};
123         ENTRY;
124
125         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 2, size,
126                               bufs);
127         if (!req)
128                 GOTO(out, rc = -ENOMEM);
129
130         body = lustre_msg_buf(req->rq_reqmsg, 0);
131         memcpy(&body->lock_handle1, &lock->l_remote_handle,
132                sizeof(body->lock_handle1));
133
134         if (new == NULL) {
135                 CDEBUG(D_NET, "Sending granted AST\n");
136                 ldlm_lock2desc(lock, &body->lock_desc);
137         } else {
138                 CDEBUG(D_NET, "Sending blocked AST\n");
139                 ldlm_lock2desc(new, &body->lock_desc);
140                 ldlm_object2handle(new, &body->lock_handle2);
141         }
142
143         req->rq_replen = lustre_msg_size(0, NULL);
144
145         rc = ptlrpc_queue_wait(req);
146         rc = ptlrpc_check_status(req, rc);
147         ptlrpc_free_req(req);
148
149         EXIT;
150  out:
151         return rc;
152 }
153
154 int ldlm_cli_convert(struct ptlrpc_client *cl, struct ldlm_handle *lockh,
155                      int new_mode, int *flags)
156 {
157         struct ldlm_request *body;
158         struct ldlm_lock *lock;
159         struct ldlm_resource *res;
160         struct ptlrpc_request *req;
161         int rc, size[2] = {sizeof(*body), 0};
162         char *bufs[2] = {NULL, NULL};
163         ENTRY;
164
165         lock = ldlm_handle2object(lockh);
166         *flags = 0;
167
168         size[1] = lock->l_data_len;
169         bufs[1] = lock->l_data;
170         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 2, size,
171                               bufs);
172         if (!req)
173                 GOTO(out, rc = -ENOMEM);
174
175         body = lustre_msg_buf(req->rq_reqmsg, 0);
176         memcpy(&body->lock_handle1, &lock->l_remote_handle,
177                sizeof(body->lock_handle1));
178
179         body->lock_desc.l_req_mode = new_mode;
180         body->flags = *flags;
181
182         req->rq_replen = lustre_msg_size(1, size);
183
184         rc = ptlrpc_queue_wait(req);
185         rc = ptlrpc_check_status(req, rc);
186         if (rc != ELDLM_OK)
187                 GOTO(out, rc);
188
189         body = lustre_msg_buf(req->rq_repmsg, 0);
190         res = ldlm_local_lock_convert(lockh, new_mode, &body->flags);
191         if (res != NULL)
192                 ldlm_reprocess_all(res);
193         if (lock->l_req_mode != lock->l_granted_mode) {
194                 /* Go to sleep until the lock is granted. */
195                 /* FIXME: or cancelled. */
196                 CDEBUG(D_NET, "convert returned a blocked lock, "
197                        "going to sleep.\n");
198                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
199                                          lock->l_granted_mode);
200                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
201         }
202         EXIT;
203  out:
204         ptlrpc_free_req(req);
205         return rc;
206 }
207
208 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
209 {
210         struct ldlm_request *body;
211         struct ptlrpc_request *req;
212         struct ldlm_resource *res;
213         int rc, size[2] = {sizeof(*body), 0};
214         char *bufs[2] = {NULL, NULL};
215         ENTRY;
216
217         if (lock->l_data_len == sizeof(struct inode)) {
218                 /* FIXME: do something better than throwing away everything */
219                 struct inode *inode = lock->l_data;
220                 if (inode == NULL)
221                         LBUG();
222                 down(&inode->i_sem);
223                 invalidate_inode_pages(inode);
224                 up(&inode->i_sem);
225         }
226
227         size[1] = lock->l_data_len;
228         bufs[1] = lock->l_data;
229         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 2, size,
230                               bufs);
231         if (!req)
232                 GOTO(out, rc = -ENOMEM);
233
234         body = lustre_msg_buf(req->rq_reqmsg, 0);
235         memcpy(&body->lock_handle1, &lock->l_remote_handle,
236                sizeof(body->lock_handle1));
237
238         req->rq_replen = lustre_msg_size(0, NULL);
239
240         rc = ptlrpc_queue_wait(req);
241         rc = ptlrpc_check_status(req, rc);
242         ptlrpc_free_req(req);
243         if (rc != ELDLM_OK)
244                 GOTO(out, rc);
245
246         res = ldlm_local_lock_cancel(lock);
247         if (res != NULL)
248                 ldlm_reprocess_all(res);
249         EXIT;
250  out:
251         return rc;
252 }