Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / mdc / mdc_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 #ifndef AUTOCONF_INCLUDED
44 # include <linux/config.h>
45 #endif
46 # include <linux/module.h>
47 # include <linux/kernel.h>
48 #else
49 # include <liblustre.h>
50 #endif
51
52 #include <obd_class.h>
53 #include "mdc_internal.h"
54
55 /* mdc_setattr does its own semaphore handling */
56 static int mdc_reint(struct ptlrpc_request *request,
57                      struct mdc_rpc_lock *rpc_lock, int level)
58 {
59         int rc;
60
61         request->rq_send_state = level;
62
63         mdc_get_rpc_lock(rpc_lock, NULL);
64         rc = ptlrpc_queue_wait(request);
65         mdc_put_rpc_lock(rpc_lock, NULL);
66         if (rc)
67                 CDEBUG(D_INFO, "error in handling %d\n", rc);
68         else if (!lustre_swab_repbuf(request, REPLY_REC_OFF,
69                                      sizeof(struct mds_body),
70                                      lustre_swab_mds_body)) {
71                 CERROR ("Can't unpack mds_body\n");
72                 rc = -EPROTO;
73         }
74         return rc;
75 }
76
77 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
78  * found by @fid. Found locks are added into @cancel list. Returns the amount of
79  * locks added to @cancels list. */
80 int mdc_resource_get_unused(struct obd_export *exp, struct ll_fid *fid,
81                             struct list_head *cancels, ldlm_mode_t mode,
82                             __u64 bits)
83 {
84         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
85         struct ldlm_res_id res_id;
86         struct ldlm_resource *res;
87         ldlm_policy_data_t policy = {{0}};
88         int count;
89         ENTRY;
90
91         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
92         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
93
94         if (res == NULL)
95                 RETURN(0);
96
97         /* Initialize ibits lock policy. */
98         policy.l_inodebits.bits = bits;
99         count = ldlm_cancel_resource_local(res, cancels, &policy,
100                                            mode, 0, 0, NULL);
101         ldlm_resource_putref(res);
102         RETURN(count);
103 }
104
105 struct ptlrpc_request *mdc_prep_elc_req(struct obd_export *exp,
106                                         int bufcount, __u32 *size, int off,
107                                         struct list_head *cancels, int count)
108 {
109         return ldlm_prep_elc_req(exp, LUSTRE_MDS_VERSION, MDS_REINT,
110                                  bufcount, size, off, 0, cancels, count);
111 }
112
113 /* If mdc_setattr is called with an 'iattr', then it is a normal RPC that
114  * should take the normal semaphore and go to the normal portal.
115  *
116  * If it is called with iattr->ia_valid & ATTR_FROM_OPEN, then it is a
117  * magic open-path setattr that should take the setattr semaphore and
118  * go to the setattr portal. */
119 int mdc_setattr(struct obd_export *exp, struct mdc_op_data *op_data,
120                 struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len,
121                 struct ptlrpc_request **request)
122 {
123         CFS_LIST_HEAD(cancels);
124         struct ptlrpc_request *req;
125         struct mdc_rpc_lock *rpc_lock;
126         struct obd_device *obd = exp->exp_obd;
127         __u32 size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
128                         [REQ_REC_OFF] = sizeof(struct mds_rec_setattr),
129                         [REQ_REC_OFF + 1] = ealen,
130                         [REQ_REC_OFF + 2] = ea2len,
131                         [REQ_REC_OFF + 3] = sizeof(struct ldlm_request) };
132         __u32 replysize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
133                              [REPLY_REC_OFF] = sizeof(struct mdt_body),
134                              [REPLY_REC_OFF+1] = obd->u.cli.cl_max_mds_easize,
135                              [REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
136                              [REPLY_REC_OFF+3] = sizeof(struct lustre_capa),
137                              [REPLY_REC_OFF+4] = sizeof(struct lustre_capa)};
138
139         int count, bufcount = 2, rc, replybufcount = 2;
140         int offset = REQ_REC_OFF + 3;
141         __u64 bits;
142         ENTRY;
143
144         LASSERT(iattr != NULL);
145
146         if (mdc_exp_is_2_0_server(exp)) {
147                 size[REQ_REC_OFF] = sizeof(struct mdt_rec_setattr);
148                 size[REQ_REC_OFF + 1] = 0; /* capa */
149                 size[REQ_REC_OFF + 2] = 0; //sizeof (struct mdt_epoch);
150                 size[REQ_REC_OFF + 3] = ealen;
151                 size[REQ_REC_OFF + 4] = ea2len;
152                 size[REQ_REC_OFF + 5] = sizeof(struct ldlm_request);
153                 offset = REQ_REC_OFF + 5;
154                 bufcount = 6;
155                 replybufcount = 6;
156         } else {
157                 bufcount = 4;
158         }
159
160         bits = MDS_INODELOCK_UPDATE;
161         if (iattr->ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
162                 bits |= MDS_INODELOCK_LOOKUP;
163         count = mdc_resource_get_unused(exp, &op_data->fid1,
164                                         &cancels, LCK_EX, bits);
165         if (exp_connect_cancelset(exp))
166                 bufcount ++ ;
167         req = mdc_prep_elc_req(exp, bufcount, size,
168                                offset, &cancels, count);
169         if (req == NULL)
170                 RETURN(-ENOMEM);
171
172         if (iattr->ia_valid & ATTR_FROM_OPEN) {
173                 req->rq_request_portal = MDS_SETATTR_PORTAL;
174                 ptlrpc_at_set_req_timeout(req);
175                 rpc_lock = obd->u.cli.cl_setattr_lock;
176         } else {
177                 rpc_lock = obd->u.cli.cl_rpc_lock;
178         }
179
180         if (iattr->ia_valid & (ATTR_MTIME | ATTR_CTIME))
181                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
182                        LTIME_S(iattr->ia_mtime), LTIME_S(iattr->ia_ctime));
183         mdc_setattr_pack(req, REQ_REC_OFF, op_data, iattr,
184                          ea, ealen, ea2, ea2len);
185
186         ptlrpc_req_set_repsize(req, replybufcount, replysize);
187
188         rc = mdc_reint(req, rpc_lock, LUSTRE_IMP_FULL);
189         *request = req;
190         if (rc == -ERESTARTSYS)
191                 rc = 0;
192
193         RETURN(rc);
194 }
195
196 int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data,
197                const void *data, int datalen, int mode, __u32 uid, __u32 gid,
198                cfs_cap_t cap_effective, __u64 rdev,
199                struct ptlrpc_request **request)
200 {
201         CFS_LIST_HEAD(cancels);
202         struct obd_device *obd = exp->exp_obd;
203         struct ptlrpc_request *req;
204         int level, bufcount = 3, rc;
205         __u32 size[6] = { sizeof(struct ptlrpc_body),
206                         sizeof(struct mds_rec_create),
207                         op_data->namelen + 1, 0, sizeof(struct ldlm_request) };
208         int offset = REQ_REC_OFF + 3;
209         int count;
210         ENTRY;
211
212         if (mdc_exp_is_2_0_server(exp)) {
213                 size[REQ_REC_OFF] = sizeof(struct mdt_rec_create);
214                 size[REQ_REC_OFF + 1] = 0; /* capa */
215                 size[REQ_REC_OFF + 2] = op_data->namelen + 1;
216                 size[REQ_REC_OFF + 4] = sizeof(struct ldlm_request);
217                 bufcount++;
218                 offset ++;
219         }
220         if (data && datalen) {
221                 size[bufcount] = datalen;
222                 bufcount++;
223         }
224
225         count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
226                                         LCK_EX, MDS_INODELOCK_UPDATE);
227         if (exp_connect_cancelset(exp)) {
228                 if (mdc_exp_is_2_0_server(exp)) {
229                         bufcount = 6;
230                 } else {
231                         bufcount = 5;
232                 }
233         }
234
235         if (mdc_exp_is_2_0_server(exp)) {
236                 struct client_obd *cli = &obd->u.cli;
237                 rc = mdc_fid_alloc(cli->cl_seq, (void *)&op_data->fid2);
238                 if (rc) {
239                         CERROR("fid allocation result: %d\n", rc);
240                         RETURN(rc);
241                 }
242         }
243
244         req = mdc_prep_elc_req(exp, bufcount, size,
245                                offset, &cancels, count);
246         if (req == NULL)
247                 RETURN(-ENOMEM);
248
249         /* mdc_create_pack fills msg->bufs[1] with name
250          * and msg->bufs[2] with tgt, for symlinks or lov MD data */
251         mdc_create_pack(req, REQ_REC_OFF, op_data, data, datalen, mode, uid,
252                         gid, cap_effective, rdev);
253
254         size[REPLY_REC_OFF] = sizeof(struct mdt_body);
255         size[REPLY_REC_OFF+1] = sizeof(struct ost_lvb);
256         ptlrpc_req_set_repsize(req, 3, size);
257
258         level = LUSTRE_IMP_FULL;
259  resend:
260         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, level);
261         /* Resend if we were told to. */
262         if (rc == -ERESTARTSYS) {
263                 level = LUSTRE_IMP_RECOVER;
264                 goto resend;
265         }
266
267         if (!rc)
268                 mdc_store_inode_generation(req, REQ_REC_OFF, REPLY_REC_OFF);
269
270         *request = req;
271         RETURN(rc);
272 }
273
274 int mdc_unlink(struct obd_export *exp, struct mdc_op_data *op_data,
275                struct ptlrpc_request **request)
276 {
277         CFS_LIST_HEAD(cancels);
278         struct obd_device *obd = class_exp2obd(exp);
279         struct ptlrpc_request *req = *request;
280         __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
281                         [REQ_REC_OFF] = sizeof(struct mds_rec_unlink),
282                         [REQ_REC_OFF + 1] = op_data->namelen + 1,
283                         [REQ_REC_OFF + 2] = sizeof(struct ldlm_request) };
284         int count, rc, bufcount = 3;
285         int offset = REQ_REC_OFF + 2;
286         ENTRY;
287
288         if (mdc_exp_is_2_0_server(exp)) {
289                 size[REQ_REC_OFF] = sizeof(struct mdt_rec_unlink);
290                 size[REQ_REC_OFF + 1] = 0 /* capa */;
291                 size[REQ_REC_OFF + 2] = op_data->namelen + 1;
292                 size[REQ_REC_OFF + 3] = sizeof(struct ldlm_request);
293                 bufcount ++;
294                 offset ++;
295         }
296
297         LASSERT(req == NULL);
298         count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
299                                         LCK_EX, MDS_INODELOCK_UPDATE);
300         if (op_data->fid3.id)
301                 count += mdc_resource_get_unused(exp, &op_data->fid3, &cancels,
302                                                  LCK_EX, MDS_INODELOCK_FULL);
303         if (exp_connect_cancelset(exp))
304                 bufcount ++;
305
306         req = mdc_prep_elc_req(exp, bufcount, size,
307                                offset, &cancels, count);
308         if (req == NULL)
309                 RETURN(-ENOMEM);
310         *request = req;
311
312         size[REPLY_REC_OFF] = sizeof(struct mdt_body);
313         size[REPLY_REC_OFF + 1] = obd->u.cli.cl_max_mds_easize;
314         size[REPLY_REC_OFF + 2] = obd->u.cli.cl_max_mds_cookiesize;
315         ptlrpc_req_set_repsize(req, 4, size);
316
317         mdc_unlink_pack(req, REQ_REC_OFF, op_data);
318
319         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
320         if (rc == -ERESTARTSYS)
321                 rc = 0;
322         RETURN(rc);
323 }
324
325 int mdc_link(struct obd_export *exp, struct mdc_op_data *op_data,
326              struct ptlrpc_request **request)
327 {
328         CFS_LIST_HEAD(cancels);
329         struct obd_device *obd = exp->exp_obd;
330         struct ptlrpc_request *req;
331         __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
332                         [REQ_REC_OFF] = sizeof(struct mds_rec_link),
333                         [REQ_REC_OFF + 1] = op_data->namelen + 1,
334                         [REQ_REC_OFF + 2] = sizeof(struct ldlm_request)};
335         int count, rc, bufcount = 3;
336         int offset = REQ_REC_OFF + 2;
337         ENTRY;
338
339         if (mdc_exp_is_2_0_server(exp)) {
340                 size[REQ_REC_OFF] = sizeof(struct mdt_rec_link);
341                 size[REQ_REC_OFF + 1] = 0; /* capa */
342                 size[REQ_REC_OFF + 2] = 0; /* capa */
343                 size[REQ_REC_OFF + 3] = op_data->namelen + 1;
344                 size[REQ_REC_OFF + 4] = sizeof(struct ldlm_request);
345                 bufcount = 5;
346                 offset += 2;
347         }
348
349         count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
350                                         LCK_EX, MDS_INODELOCK_UPDATE);
351         count += mdc_resource_get_unused(exp, &op_data->fid2, &cancels,
352                                          LCK_EX, MDS_INODELOCK_UPDATE);
353         if (exp_connect_cancelset(exp))
354                 bufcount++;
355
356         req = mdc_prep_elc_req(exp, bufcount, size,
357                                offset, &cancels, count);
358         if (req == NULL)
359                 RETURN(-ENOMEM);
360
361         mdc_link_pack(req, REQ_REC_OFF, op_data);
362
363         size[REPLY_REC_OFF] = sizeof(struct mdt_body);
364         ptlrpc_req_set_repsize(req, 2, size);
365
366         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
367         *request = req;
368         if (rc == -ERESTARTSYS)
369                 rc = 0;
370
371         RETURN(rc);
372 }
373
374 int mdc_rename(struct obd_export *exp, struct mdc_op_data *op_data,
375                const char *old, int oldlen, const char *new, int newlen,
376                struct ptlrpc_request **request)
377 {
378         CFS_LIST_HEAD(cancels);
379         struct obd_device *obd = exp->exp_obd;
380         struct ptlrpc_request *req;
381         __u32 size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
382                         [REQ_REC_OFF] = sizeof(struct mds_rec_rename),
383                         [REQ_REC_OFF + 1] = oldlen + 1,
384                         [REQ_REC_OFF + 2] = newlen + 1,
385                         [REQ_REC_OFF + 3] = sizeof(struct ldlm_request) };
386         int count, rc, bufcount = 4;
387         int offset = REQ_REC_OFF + 3;
388         ENTRY;
389
390         if (mdc_exp_is_2_0_server(exp)) {
391                 size[REQ_REC_OFF] = sizeof(struct mdt_rec_rename);
392                 size[REQ_REC_OFF + 1] = 0; /* capa */
393                 size[REQ_REC_OFF + 2] = 0; /* capa */
394                 size[REQ_REC_OFF + 3] = oldlen + 1;
395                 size[REQ_REC_OFF + 4] = newlen + 1;
396                 size[REQ_REC_OFF + 5] = sizeof(struct ldlm_request);
397                 bufcount = 6;
398                 offset += 2;
399         }
400
401         count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
402                                         LCK_EX, MDS_INODELOCK_UPDATE);
403         count += mdc_resource_get_unused(exp, &op_data->fid2, &cancels,
404                                          LCK_EX, MDS_INODELOCK_UPDATE);
405         if (op_data->fid3.id)
406                 count += mdc_resource_get_unused(exp, &op_data->fid3, &cancels,
407                                                  LCK_EX, MDS_INODELOCK_LOOKUP);
408         if (op_data->fid4.id)
409                 count += mdc_resource_get_unused(exp, &op_data->fid4, &cancels,
410                                                  LCK_EX, MDS_INODELOCK_FULL);
411         if (exp_connect_cancelset(exp))
412                 bufcount ++;
413
414         req = mdc_prep_elc_req(exp, bufcount, size,
415                                offset, &cancels, count);
416         if (req == NULL)
417                 RETURN(-ENOMEM);
418
419         mdc_rename_pack(req, REQ_REC_OFF, op_data, old, oldlen, new, newlen);
420
421         size[REPLY_REC_OFF] = sizeof(struct mdt_body);
422         size[REPLY_REC_OFF + 1] = obd->u.cli.cl_max_mds_easize;
423         size[REPLY_REC_OFF + 2] = obd->u.cli.cl_max_mds_cookiesize;
424         ptlrpc_req_set_repsize(req, 4, size);
425
426         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
427         *request = req;
428         if (rc == -ERESTARTSYS)
429                 rc = 0;
430
431         RETURN(rc);
432 }