1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/mdt/mdt_reint.c
5 * Lustre Metadata Target (mdt) reintegration routines
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Huang Hua <huanghua@clusterfs.com>
13 * This file is part of the Lustre file system, http://www.lustre.org
14 * Lustre is a trademark of Cluster File Systems, Inc.
16 * You may have signed or agreed to another license before downloading
17 * this software. If so, you are bound by the terms and conditions
18 * of that agreement, and the following does not apply to you. See the
19 * LICENSE file included with this distribution for more information.
21 * If you did not agree to a different license, then this copy of Lustre
22 * is open source software; you can redistribute it and/or modify it
23 * under the terms of version 2 of the GNU General Public License as
24 * published by the Free Software Foundation.
26 * In either case, Lustre is distributed in the hope that it will be
27 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 * license text for more details.
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_MDS
37 #include "mdt_internal.h"
/*
 * Create a named entry in a parent object.
 *
 * The parent is identified by rr_fid1 and is found and locked with an
 * LCK_PW lock on MDS_INODELOCK_UPDATE, using the MDT_LH_PARENT lock handle.
 * The child object is looked up by rr_fid2 and handed to mdo_create()
 * together with rr_name, rr_tgt and the attributes in info->mti_attr.
 * When the attributes carry MA_INODE, they are packed into the
 * RMF_MDT_BODY reply buffer along with the child fid.
 *
 * The child reference is dropped with mdt_object_put() and the parent is
 * released with mdt_object_unlock_put(), which is also passed rc.
 * An error encoded in the parent pointer is returned through PTR_ERR().
 */
40 static int mdt_md_create(struct mdt_thread_info *info)
42 struct mdt_device *mdt = info->mti_mdt;
43 struct mdt_object *parent;
44 struct mdt_object *child;
45 struct mdt_lock_handle *lh;
46 struct mdt_body *repbody;
47 struct md_attr *ma = &info->mti_attr;
48 struct mdt_reint_record *rr = &info->mti_rr;
/* reply buffer that receives the fid and attributes of the new object */
52 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
/* the parent is locked through the MDT_LH_PARENT handle in PW mode */
54 lh = &info->mti_lh[MDT_LH_PARENT];
55 lh->mlh_mode = LCK_PW;
57 parent = mdt_object_find_lock(info, rr->rr_fid1,
58 lh, MDS_INODELOCK_UPDATE);
60 RETURN(PTR_ERR(parent));
/* the child is only looked up here; no lock is requested for it */
62 child = mdt_object_find(info->mti_ctxt, mdt, rr->rr_fid2);
64 struct md_object *next = mdt_object_child(parent);
66 rc = mdo_create(info->mti_ctxt, next, rr->rr_name,
67 mdt_object_child(child), rr->rr_tgt, ma);
69 /* return fid & attr to client. */
70 if (ma->ma_valid & MA_INODE)
71 mdt_pack_attr2body(repbody, &ma->ma_attr,
72 mdt_object_fid(child));
74 mdt_object_put(info->mti_ctxt, child);
/* drop the parent lock and reference; rc is passed along to the helper */
77 mdt_object_unlock_put(info, parent, lh, rc);
/*
 * Create an object without inserting a name for it.
 *
 * The object is looked up by rr_fid1 and created with mo_object_create()
 * using the attributes in info->mti_attr. When the attributes carry
 * MA_INODE, they are packed into the RMF_MDT_BODY reply buffer. The object
 * reference is dropped with mdt_object_put(). No lock handle is used here.
 */
81 /* partial request to create object only */
82 static int mdt_md_mkobj(struct mdt_thread_info *info)
84 struct mdt_device *mdt = info->mti_mdt;
86 struct mdt_body *repbody;
87 struct md_attr *ma = &info->mti_attr;
/* reply buffer that receives the attributes of the new object */
91 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
93 o = mdt_object_find(info->mti_ctxt, mdt, info->mti_rr.rr_fid1);
95 struct md_object *next = mdt_object_child(o);
97 rc = mo_object_create(info->mti_ctxt, next, ma);
99 /* return fid & attr to client. */
100 if (ma->ma_valid & MA_INODE)
101 mdt_pack_attr2body(repbody, &ma->ma_attr,
104 mdt_object_put(info->mti_ctxt, o);
/*
 * Reint handler for setattr on the object identified by rr_fid1.
 *
 * With ATTR_FROM_OPEN set in la_valid the object is only looked up with
 * mdt_object_find(). Otherwise it is found and locked in LCK_EX mode on
 * MDS_INODELOCK_UPDATE, extended with MDS_INODELOCK_LOOKUP when any of
 * ATTR_MODE, ATTR_UID or ATTR_GID is being changed.
 *
 * The attributes are applied with mo_attr_set(), read back with
 * mo_attr_get() and packed into the RMF_MDT_BODY reply buffer. When
 * rr_eadatalen is positive, rr_eadata is passed to mo_xattr_set().
 * -ENOENT is used when lu_object_exists() reports a value <= 0.
 * All out_unlock paths end in mdt_object_unlock_put() on the object.
 */
112 /* In the raw-setattr case, we lock the child inode.
113 * In the write-back case or if being called from open,
114 * the client holds a lock already.
115 * We use the ATTR_FROM_OPEN flag to tell these cases apart. */
116 static int mdt_reint_setattr(struct mdt_thread_info *info)
118 struct lu_attr *attr = &info->mti_attr.ma_attr;
119 struct mdt_reint_record *rr = &info->mti_rr;
120 struct ptlrpc_request *req = mdt_info_req(info);
121 struct mdt_object *mo;
122 struct md_object *next;
123 struct mdt_lock_handle *lh;
124 struct mdt_body *repbody;
125 /*__u64 valid = attr->la_valid;*/
130 DEBUG_REQ(D_INODE, req, "setattr "DFID3" %x", PFID3(rr->rr_fid1),
131 (unsigned int)attr->la_valid);
133 /* MDS_CHECK_RESENT */
/* the MDT_LH_PARENT handle is reused here for the object being changed */
134 lh = &info->mti_lh[MDT_LH_PARENT];
135 lh->mlh_mode = LCK_EX;
137 if (attr->la_valid & ATTR_FROM_OPEN) {
138 mo = mdt_object_find(info->mti_ctxt, info->mti_mdt,
141 __u64 lockpart = MDS_INODELOCK_UPDATE;
/* mode/owner changes additionally take the LOOKUP lock bit */
142 if (attr->la_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
143 lockpart |= MDS_INODELOCK_LOOKUP;
145 mo = mdt_object_find_lock(info, rr->rr_fid1, lh, lockpart);
148 RETURN(rc = PTR_ERR(mo));
150 next = mdt_object_child(mo);
151 if (lu_object_exists(info->mti_ctxt, &mo->mot_obj.mo_lu) <= 0)
152 GOTO(out_unlock, rc = -ENOENT);
154 rc = mo_attr_set(info->mti_ctxt, next, attr);
156 GOTO(out_unlock, rc);
/* read the attributes back so the reply carries the values now stored */
158 rc = mo_attr_get(info->mti_ctxt, next, attr);
160 GOTO(out_unlock, rc);
162 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
163 mdt_pack_attr2body(repbody, attr, mdt_object_fid(mo));
165 /* don't return OST-specific attributes if we didn't just set them.
166 if (valid & ATTR_SIZE)
167 repbody->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
168 if (valid & (ATTR_MTIME | ATTR_MTIME_SET))
169 repbody->valid |= OBD_MD_FLMTIME;
170 if (valid & (ATTR_ATIME | ATTR_ATIME_SET))
171 repbody->valid |= OBD_MD_FLATIME;
173 /* FIXME: I have to combine the attr_set & xattr_set into one single
174 transaction. How can I?
177 if (rr->rr_eadatalen > 0)
178 rc = mo_xattr_set(info->mti_ctxt, next,
179 rr->rr_eadata, rr->rr_eadatalen,
182 /* FIXME & TODO Please deal with logcookies here*/
183 GOTO(out_unlock, rc);
185 mdt_object_unlock_put(info, mo, lh, rc);
/*
 * Reint handler for create requests.
 *
 * Dispatches on the file type bits of the requested mode
 * (la_mode & S_IFMT). Where rr_name is tested, a non-empty name selects
 * mdt_md_create() and an empty name selects mdt_md_mkobj(). The branch
 * for special files asserts a non-empty name and always calls
 * mdt_md_create().
 */
190 static int mdt_reint_create(struct mdt_thread_info *info)
195 switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
/* an empty name means only the object is created, with no name entry */
198 if (strlen(info->mti_rr.rr_name) > 0)
199 rc = mdt_md_create(info);
201 rc = mdt_md_mkobj(info);
209 /* special file should stay on the same node as parent */
210 LASSERT(strlen(info->mti_rr.rr_name) > 0);
212 rc = mdt_md_create(info);
/*
 * Reint handler for unlink of rr_name under the parent rr_fid1.
 *
 * The parent is found and locked in LCK_EX mode on MDS_INODELOCK_UPDATE.
 * With an empty rr_name the request is a remote partial operation and
 * only mo_ref_del() is called on the object named by rr_fid1.
 * Otherwise the child fid is resolved with mdo_lookup(), the child is
 * locked in LCK_EX mode on MDS_INODELOCK_FULL, mdo_unlink() removes the
 * name, and mdt_handle_last_unlink() is called with
 * RQF_MDS_REINT_UNLINK_LAST.
 *
 * Cleanup releases the child first and then the parent, each through
 * mdt_object_unlock_put().
 */
222 static int mdt_reint_unlink(struct mdt_thread_info *info)
224 struct mdt_reint_record *rr = &info->mti_rr;
225 struct ptlrpc_request *req = mdt_info_req(info);
226 struct mdt_object *mp;
227 struct mdt_object *mc;
228 struct mdt_lock_handle *lhp;
229 struct mdt_lock_handle *lhc;
230 struct mdt_body *repbody;
/* scratch fid in the thread info that receives the looked-up child fid */
231 struct lu_fid *child_fid = &info->mti_tmp_fid1;
236 DEBUG_REQ(D_INODE, req, "unlink "DFID3"/%s\n", PFID3(rr->rr_fid1),
239 /* MDS_CHECK_RESENT here */
241 /* step 1: lock the parent */
242 lhp = &info->mti_lh[MDT_LH_PARENT];
243 lhp->mlh_mode = LCK_EX;
244 mp = mdt_object_find_lock(info, rr->rr_fid1, lhp, MDS_INODELOCK_UPDATE);
248 if (strlen(rr->rr_name) == 0) {
249 /* remote partial operation */
250 rc = mo_ref_del(info->mti_ctxt, mdt_object_child(mp),
252 GOTO(out_unlock_parent, rc);
255 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
257 /*step 2: find & lock the child */
258 lhc = &info->mti_lh[MDT_LH_CHILD];
259 lhc->mlh_mode = LCK_EX;
260 rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mp),
261 rr->rr_name, child_fid);
263 GOTO(out_unlock_parent, rc);
265 /* we lock the child regardless of whether it is local or remote. No harm. */
266 mc = mdt_object_find_lock(info, child_fid, lhc, MDS_INODELOCK_FULL);
268 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
270 /*step 3: do some checking ...*/
272 /* step 4: delete it */
273 /* cmm will take care if child is local or remote */
274 rc = mdo_unlink(info->mti_ctxt, mdt_object_child(mp),
275 mdt_object_child(mc), rr->rr_name, &info->mti_attr);
278 GOTO(out_unlock_child, rc);
280 rc = mdt_handle_last_unlink(info, mc, &RQF_MDS_REINT_UNLINK_LAST);
281 GOTO(out_unlock_child, rc);
/* release in reverse order of acquisition: child, then parent */
284 mdt_object_unlock_put(info, mc, lhc, rc);
286 mdt_object_unlock_put(info, mp, lhp, rc);
/*
 * Reint handler for hard link: link the source object rr_fid1 under the
 * target parent rr_fid2 with the name rr_name.
 *
 * The source is found and locked in LCK_EX mode on MDS_INODELOCK_UPDATE
 * through the MDT_LH_PARENT handle. With an empty rr_name the request is
 * a remote partial operation and only mo_ref_add() is called on the
 * source. Otherwise the target parent is locked the same way through the
 * MDT_LH_CHILD handle and mdo_link() is called on it.
 *
 * Cleanup releases the target parent first and then the source, each
 * through mdt_object_unlock_put().
 */
290 static int mdt_reint_link(struct mdt_thread_info *info)
292 struct mdt_reint_record *rr = &info->mti_rr;
293 struct ptlrpc_request *req = mdt_info_req(info);
294 struct mdt_object *ms;
295 struct mdt_object *mp;
296 struct mdt_lock_handle *lhs;
297 struct mdt_lock_handle *lhp;
302 DEBUG_REQ(D_INODE, req, "link original "DFID3" to "DFID3" %s",
303 PFID3(rr->rr_fid1), PFID3(rr->rr_fid2), rr->rr_name);
305 /* MDS_CHECK_RESENT here */
307 /* step 1: lock the source */
308 lhs = &info->mti_lh[MDT_LH_PARENT];
309 lhs->mlh_mode = LCK_EX;
310 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs, MDS_INODELOCK_UPDATE);
314 if (strlen(rr->rr_name) == 0) {
315 /* remote partial operation */
316 rc = mo_ref_add(info->mti_ctxt, mdt_object_child(ms));
317 GOTO(out_unlock_source, rc);
319 /*step 2: find & lock the target parent dir*/
320 lhp = &info->mti_lh[MDT_LH_CHILD];
321 lhp->mlh_mode = LCK_EX;
322 mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, MDS_INODELOCK_UPDATE);
324 GOTO(out_unlock_source, rc = PTR_ERR(mp));
326 /* step 4: link it */
327 rc = mdo_link(info->mti_ctxt, mdt_object_child(mp),
328 mdt_object_child(ms), rr->rr_name);
329 GOTO(out_unlock_target, rc);
/* release in reverse order of acquisition: target parent, then source */
332 mdt_object_unlock_put(info, mp, lhp, rc);
334 mdt_object_unlock_put(info, ms, lhs, rc);
/*
 * Target-side half of a rename: make the name rr_tgt in the target
 * directory rr_fid1 refer to the object rr_fid2.
 *
 * The target directory is found and locked in LCK_PW mode on
 * MDS_INODELOCK_UPDATE. rr_tgt is then looked up in it; -ENOENT from the
 * lookup is tolerated, any other error aborts. An existing target object
 * is locked in LCK_EX mode on MDS_INODELOCK_LOOKUP. The operation itself
 * is either mdo_rename_tgt(), which is passed the existing target object,
 * or mdo_name_insert(), which only inserts the name.
 *
 * Cleanup releases the target object and then the target directory
 * through mdt_object_unlock_put().
 */
338 /* partial operation for rename */
339 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
341 struct mdt_reint_record *rr = &info->mti_rr;
342 struct ptlrpc_request *req = mdt_info_req(info);
343 struct mdt_object *mtgtdir;
/* stays NULL unless an existing target object is found and locked */
344 struct mdt_object *mtgt = NULL;
345 struct mdt_lock_handle *lh_tgtdir;
346 struct mdt_lock_handle *lh_tgt;
347 struct lu_fid *tgt_fid = &info->mti_tmp_fid1;
352 DEBUG_REQ(D_INODE, req, "rename_tgt "DFID3" to "DFID3" %s",
354 PFID3(rr->rr_fid1), rr->rr_tgt);
356 /* step 1: lookup & lock the tgt dir */
357 lh_tgt = &info->mti_lh[MDT_LH_CHILD];
358 lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
359 lh_tgtdir->mlh_mode = LCK_PW;
360 mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
361 MDS_INODELOCK_UPDATE);
363 GOTO(out, rc = PTR_ERR(mtgtdir));
365 /*step 2: find & lock the target object if exists*/
366 rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
367 rr->rr_tgt, tgt_fid);
/* a missing target name (-ENOENT) is not an error for rename */
368 if (rc != 0 && rc != -ENOENT)
369 GOTO(out_unlock_tgtdir, rc);
372 lh_tgt->mlh_mode = LCK_EX;
374 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
375 MDS_INODELOCK_LOOKUP);
377 GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
380 /* step 3: rename_tgt or name_insert */
382 rc = mdo_rename_tgt(info->mti_ctxt, mdt_object_child(mtgtdir),
383 mdt_object_child(mtgt),
384 rr->rr_fid2, rr->rr_tgt);
386 rc = mdo_name_insert(info->mti_ctxt, mdt_object_child(mtgtdir),
387 rr->rr_tgt, rr->rr_fid2);
388 GOTO(out_unlock_tgt, rc);
392 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
395 mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
/*
 * Reint handler for rename of rr_fid1/rr_name to rr_fid2/rr_tgt.
 *
 * The size of the client RMF_NAME field is queried first; one outcome of
 * that check hands the whole request to mdt_reint_rename_tgt().
 *
 * Otherwise the locks are taken in this order, all in LCK_EX mode:
 *   1. source directory (rr_fid1) on MDS_INODELOCK_UPDATE;
 *   2. target directory (rr_fid2) on MDS_INODELOCK_UPDATE. When both fids
 *      are equal, an extra reference is taken on the source directory
 *      with mdt_object_get() instead;
 *   3. the old object, resolved from rr_name by mdo_lookup(), on
 *      MDS_INODELOCK_LOOKUP;
 *   4. the new object, resolved from rr_tgt by mdo_lookup(); -ENOENT from
 *      this lookup is tolerated and mnew then stays NULL.
 * mdo_rename() performs the rename and is passed NULL for the new object
 * when there is none.
 *
 * Cleanup releases new, old, target directory and source directory, in
 * that order, through mdt_object_unlock_put().
 */
401 static int mdt_reint_rename(struct mdt_thread_info *info)
403 struct mdt_reint_record *rr = &info->mti_rr;
404 struct req_capsule *pill = &info->mti_pill;
405 struct ptlrpc_request *req = mdt_info_req(info);
406 struct mdt_object *msrcdir;
407 struct mdt_object *mtgtdir;
408 struct mdt_object *mold;
/* stays NULL when the target name does not exist yet */
409 struct mdt_object *mnew = NULL;
410 struct mdt_lock_handle *lh_srcdirp;
411 struct mdt_lock_handle *lh_tgtdirp;
412 struct mdt_lock_handle *lh_oldp;
413 struct mdt_lock_handle *lh_newp;
/* scratch fids in the thread info for the two name lookups */
414 struct lu_fid *old_fid = &info->mti_tmp_fid1;
415 struct lu_fid *new_fid = &info->mti_tmp_fid2;
420 DEBUG_REQ(D_INODE, req, "rename "DFID3"/%s to "DFID3"/%s",
421 PFID3(rr->rr_fid1), rr->rr_name,
422 PFID3(rr->rr_fid2), rr->rr_tgt);
424 /* MDS_CHECK_RESENT here */
426 rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
428 /* if (rr->rr_name[0] == 0) {*/
429 RETURN(mdt_reint_rename_tgt(info));
432 lh_newp = &info->mti_lh[MDT_LH_NEW];
434 /* step 1: lock the source dir */
435 lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
436 lh_srcdirp->mlh_mode = LCK_EX;
437 msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
438 MDS_INODELOCK_UPDATE);
440 GOTO(out, rc = PTR_ERR(msrcdir));
442 /*step 2: find & lock the target dir*/
443 lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
444 lh_tgtdirp->mlh_mode = LCK_EX;
/* same directory on both sides: take another reference, not another lock */
445 if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
446 mdt_object_get(info->mti_ctxt, msrcdir);
449 mtgtdir = mdt_object_find_lock(info, rr->rr_fid2, lh_tgtdirp,
450 MDS_INODELOCK_UPDATE);
452 GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
455 /*step 3: find & lock the old object*/
456 rc = mdo_lookup(info->mti_ctxt, mdt_object_child(msrcdir),
457 rr->rr_name, old_fid);
459 GOTO(out_unlock_target, rc);
461 lh_oldp = &info->mti_lh[MDT_LH_OLD];
462 lh_oldp->mlh_mode = LCK_EX;
463 mold = mdt_object_find_lock(info, old_fid, lh_oldp,
464 MDS_INODELOCK_LOOKUP);
466 GOTO(out_unlock_target, rc = PTR_ERR(mold));
468 /*step 4: find & lock the new object*/
469 /* new target object may not exist now */
470 rc = mdo_lookup(info->mti_ctxt, mdt_object_child(mtgtdir),
471 rr->rr_tgt, new_fid);
472 if (rc != 0 && rc != -ENOENT)
473 GOTO(out_unlock_old, rc);
476 /* the new_fid should have been filled at this moment*/
477 lh_newp->mlh_mode = LCK_EX;
478 mnew = mdt_object_find_lock(info, new_fid, lh_newp,
481 GOTO(out_unlock_old, rc = PTR_ERR(mnew));
484 /* step 5: do some checking ...*/
486 /* step 6: rename it */
487 rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir),
488 mdt_object_child(mtgtdir), old_fid,
489 rr->rr_name, mnew ? mdt_object_child(mnew): NULL,
491 GOTO(out_unlock_new, rc);
/* release in reverse order of acquisition */
495 mdt_object_unlock_put(info, mnew, lh_newp, rc);
498 mdt_object_unlock_put(info, mold, lh_oldp, rc);
500 mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
502 mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
/* Signature shared by every reint handler: takes the per-thread info. */
508 typedef int (*mdt_reinter)(struct mdt_thread_info *info);
/*
 * Handler table indexed by REINT_* opcode; mdt_reint_rec() selects the
 * entry with info->mti_rr.rr_opcode. Slots without an initializer below
 * are NULL, since the array has static storage.
 */
510 static mdt_reinter reinters[REINT_MAX] = {
511 [REINT_SETATTR] = mdt_reint_setattr,
512 [REINT_CREATE] = mdt_reint_create,
513 [REINT_LINK] = mdt_reint_link,
514 [REINT_UNLINK] = mdt_reint_unlink,
515 [REINT_RENAME] = mdt_reint_rename,
516 [REINT_OPEN] = mdt_reint_open
519 int mdt_reint_rec(struct mdt_thread_info *info)
524 rc = reinters[info->mti_rr.rr_opcode](info);