Whamcloud - gitweb
Landing the mds_lock_devel branch on the trunk. Notables:
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  * This code is issued under the GNU General Public License.
7  * See the file COPYING in this distribution
8  *
9  * by Cluster File Systems, Inc.
10  */
11
12 #define DEBUG_SUBSYSTEM S_LDLM
13
14 #include <linux/lustre_dlm.h>
15
16 int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
17                      struct ptlrpc_request *req,
18                      struct ldlm_namespace *ns,
19                      struct lustre_handle *parent_lock_handle,
20                      __u64 *res_id,
21                      __u32 type,
22                      void *cookie, int cookielen,
23                      ldlm_mode_t mode,
24                      int *flags,
25                      ldlm_lock_callback callback,
26                      void *data,
27                      __u32 data_len,
28                      struct lustre_handle *lockh)
29 {
30         struct ldlm_lock *lock;
31         struct ldlm_request *body;
32         struct ldlm_reply *reply;
33         int rc, size = sizeof(*body), req_passed_in = 1;
34         ENTRY;
35
36         *flags = 0;
37         rc = ldlm_local_lock_create(ns, parent_lock_handle, res_id, type, mode,
38                                     data, data_len, lockh);
39         if (rc != ELDLM_OK)
40                 GOTO(out, rc);
41
42         lock = lustre_handle2object(lockh);
43
44         if (req == NULL) {
45                 req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 1, &size, NULL);
46                 if (!req)
47                         GOTO(out, rc = -ENOMEM);
48                 req_passed_in = 0;
49         } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
50                 LBUG();
51
52         /* Dump all of this data into the request buffer */
53         body = lustre_msg_buf(req->rq_reqmsg, 0);
54         body->lock_desc.l_resource.lr_type = type;
55         memcpy(body->lock_desc.l_resource.lr_name, res_id,
56                sizeof(body->lock_desc.l_resource.lr_name));
57
58         body->lock_desc.l_req_mode = mode;
59         if (type == LDLM_EXTENT)
60                 memcpy(&body->lock_desc.l_extent, cookie,
61                        sizeof(body->lock_desc.l_extent));
62         body->lock_flags = *flags;
63
64         memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1));
65
66         if (parent_lock_handle)
67                 memcpy(&body->lock_handle2, parent_lock_handle,
68                        sizeof(body->lock_handle2));
69
70         /* Continue as normal. */
71         if (!req_passed_in) {
72                 size = sizeof(*reply);
73                 req->rq_replen = lustre_msg_size(1, &size);
74         }
75
76         rc = ptlrpc_queue_wait(req);
77         rc = ptlrpc_check_status(req, rc);
78         if (rc != ELDLM_OK) {
79                 spin_lock(&lock->l_resource->lr_lock);
80                 ldlm_resource_put(lock->l_resource);
81                 spin_unlock(&lock->l_resource->lr_lock);
82                 ldlm_lock_free(lock);
83                 GOTO(out, rc);
84         }
85
86         lock->l_connection = conn;
87         lock->l_client = cl;
88         reply = lustre_msg_buf(req->rq_repmsg, 0);
89         memcpy(&lock->l_remote_handle, &reply->lock_handle,
90                sizeof(lock->l_remote_handle));
91         if (type == LDLM_EXTENT)
92                 memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent));
93         *flags = reply->lock_flags;
94
95         CDEBUG(D_INFO, "remote handle: %p, flags: %d\n",
96                (void *)(unsigned long)reply->lock_handle.addr, *flags);
97         CDEBUG(D_INFO, "extent: %Lu -> %Lu\n",
98                (unsigned long long)reply->lock_extent.start,
99                (unsigned long long)reply->lock_extent.end);
100
101         if (*flags & LDLM_FL_LOCK_CHANGED) {
102                 CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
103                        "%ld\n", (long)reply->lock_resource_name[0],
104                        (long)lock->l_resource->lr_name[0]);
105                 ldlm_resource_put(lock->l_resource);
106
107                 lock->l_resource =
108                         ldlm_resource_get(ns, NULL, reply->lock_resource_name,
109                                           type, 1);
110                 if (lock->l_resource == NULL) {
111                         LBUG();
112                         RETURN(-ENOMEM);
113                 }
114         }
115
116         if (!req_passed_in)
117                 ptlrpc_free_req(req);
118
119         rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback,
120                                      callback);
121
122         if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
123                       LDLM_FL_BLOCK_CONV)) {
124                 /* Go to sleep until the lock is granted. */
125                 /* FIXME: or cancelled. */
126                 CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
127                        "going to sleep.\n", lock);
128                 ldlm_lock_dump(lock);
129                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
130                                          lock->l_granted_mode);
131                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
132         }
133         EXIT;
134  out:
135         return rc;
136 }
137
138 int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
139                       void *data, __u32 data_len)
140 {
141         struct ldlm_request *body;
142         struct ptlrpc_request *req;
143         struct ptlrpc_client *cl = &lock->l_resource->lr_namespace->ns_client;
144         int rc, size = sizeof(*body);
145         ENTRY;
146
147         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, &size,
148                               NULL);
149         if (!req)
150                 GOTO(out, rc = -ENOMEM);
151
152         body = lustre_msg_buf(req->rq_reqmsg, 0);
153         memcpy(&body->lock_handle1, &lock->l_remote_handle,
154                sizeof(body->lock_handle1));
155
156         if (new == NULL) {
157                 CDEBUG(D_NET, "Sending granted AST\n");
158                 ldlm_lock2desc(lock, &body->lock_desc);
159         } else {
160                 CDEBUG(D_NET, "Sending blocked AST\n");
161                 ldlm_lock2desc(new, &body->lock_desc);
162                 ldlm_object2handle(new, &body->lock_handle2);
163         }
164
165         req->rq_replen = lustre_msg_size(0, NULL);
166
167         rc = ptlrpc_queue_wait(req);
168         rc = ptlrpc_check_status(req, rc);
169         ptlrpc_free_req(req);
170
171         EXIT;
172  out:
173         return rc;
174 }
175
176 int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
177                      int new_mode, int *flags)
178 {
179         struct ldlm_request *body;
180         struct ldlm_reply *reply;
181         struct ldlm_lock *lock;
182         struct ldlm_resource *res;
183         struct ptlrpc_request *req;
184         int rc, size = sizeof(*body);
185         ENTRY;
186
187         lock = lustre_handle2object(lockh);
188         *flags = 0;
189
190         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 1, &size,
191                               NULL);
192         if (!req)
193                 GOTO(out, rc = -ENOMEM);
194
195         body = lustre_msg_buf(req->rq_reqmsg, 0);
196         memcpy(&body->lock_handle1, &lock->l_remote_handle,
197                sizeof(body->lock_handle1));
198
199         body->lock_desc.l_req_mode = new_mode;
200         body->lock_flags = *flags;
201
202         size = sizeof(*reply);
203         req->rq_replen = lustre_msg_size(1, &size);
204
205         rc = ptlrpc_queue_wait(req);
206         rc = ptlrpc_check_status(req, rc);
207         if (rc != ELDLM_OK)
208                 GOTO(out, rc);
209
210         reply = lustre_msg_buf(req->rq_repmsg, 0);
211         res = ldlm_local_lock_convert(lockh, new_mode, &reply->lock_flags);
212         if (res != NULL)
213                 ldlm_reprocess_all(res);
214         if (lock->l_req_mode != lock->l_granted_mode) {
215                 /* Go to sleep until the lock is granted. */
216                 /* FIXME: or cancelled. */
217                 CDEBUG(D_NET, "convert returned a blocked lock, "
218                        "going to sleep.\n");
219                 wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
220                                          lock->l_granted_mode);
221                 CDEBUG(D_NET, "waking up, the lock must be granted.\n");
222         }
223         EXIT;
224  out:
225         ptlrpc_free_req(req);
226         return rc;
227 }
228
229 int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
230 {
231         struct ldlm_request *body;
232         struct ptlrpc_request *req;
233         struct ldlm_resource *res;
234         int rc, size = sizeof(*body);
235         ENTRY;
236
237         req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size,
238                               NULL);
239         if (!req)
240                 GOTO(out, rc = -ENOMEM);
241
242         body = lustre_msg_buf(req->rq_reqmsg, 0);
243         memcpy(&body->lock_handle1, &lock->l_remote_handle,
244                sizeof(body->lock_handle1));
245
246         req->rq_replen = lustre_msg_size(0, NULL);
247
248         rc = ptlrpc_queue_wait(req);
249         rc = ptlrpc_check_status(req, rc);
250         ptlrpc_free_req(req);
251         if (rc != ELDLM_OK)
252                 GOTO(out, rc);
253
254         res = ldlm_local_lock_cancel(lock);
255         if (res != NULL)
256                 ldlm_reprocess_all(res);
257         else
258                 rc = ELDLM_RESOURCE_FREED;
259         EXIT;
260  out:
261         return rc;
262 }