1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * linux/mdt/mdt_reint.c
5 * Lustre Metadata Target (mdt) reintegration routines
7 * Copyright (C) 2002-2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Huang Hua <huanghua@clusterfs.com>
12 * Author: Yury Umanets <umka@clusterfs.com>
14 * This file is part of the Lustre file system, http://www.lustre.org
15 * Lustre is a trademark of Cluster File Systems, Inc.
17 * You may have signed or agreed to another license before downloading
18 * this software. If so, you are bound by the terms and conditions
19 * of that agreement, and the following does not apply to you. See the
20 * LICENSE file included with this distribution for more information.
22 * If you did not agree to a different license, then this copy of Lustre
23 * is open source software; you can redistribute it and/or modify it
24 * under the terms of version 2 of the GNU General Public License as
25 * published by the Free Software Foundation.
27 * In either case, Lustre is distributed in the hope that it will be
28 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * license text for more details.
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include "mdt_internal.h"
/*
 * mdt_md_create(): create a named entry in a parent directory.
 *
 * The parent is identified by rr_fid1, the new child by rr_fid2 and the
 * entry name by rr_name/rr_namelen, all taken from info->mti_rr.  The
 * parent is looked up and locked, mdo_create() is called on it, and when
 * the returned attributes have MA_INODE set they are packed into the
 * RMF_MDT_BODY reply buffer together with the child fid.
 *
 * NOTE(review): several statements of this function (local declarations,
 * error checks, braces, the final return) are not visible in this view;
 * the comments below describe only what the visible statements do.
 */
40 static int mdt_md_create(struct mdt_thread_info *info)
42 struct mdt_device *mdt = info->mti_mdt;
43 struct mdt_object *parent;
44 struct mdt_object *child;
45 struct mdt_lock_handle *lh;
46 struct mdt_body *repbody;
47 struct md_attr *ma = &info->mti_attr;
48 struct mdt_reint_record *rr = &info->mti_rr;
52 DEBUG_REQ(D_INODE, mdt_info_req(info), "Create (%s->"DFID") in "DFID,
53 rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
55 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
/*
 * Use the MDT_LH_PARENT lock handle, initialised as a PW pdo lock on the
 * entry name, to find and lock the parent with the UPDATE inodebit.
 */
57 lh = &info->mti_lh[MDT_LH_PARENT];
58 mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
60 parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
61 MDS_INODELOCK_UPDATE);
/*
 * NOTE(review): the condition guarding this early return is not visible
 * here; presumably an IS_ERR(parent) test -- confirm.
 */
63 RETURN(PTR_ERR(parent));
65 child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
67 struct md_object *next = mdt_object_child(parent);
/* Ask the lower layer to return inode attributes for the new object. */
69 ma->ma_need = MA_INODE;
71 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
72 OBD_FAIL_MDS_REINT_CREATE_WRITE);
74 /* Let lower layer know current lock mode. */
75 info->mti_spec.sp_cr_mode =
76 mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
79 * Do perform lookup sanity check. We do not know if name exists
82 info->mti_spec.sp_cr_lookup = 1;
/*
 * NOTE(review): the trailing arguments of this mdo_create() call are not
 * visible in this view.
 */
84 rc = mdo_create(info->mti_env, next, rr->rr_name,
85 mdt_object_child(child),
88 /* Return fid & attr to client. */
89 if (ma->ma_valid & MA_INODE)
90 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
91 mdt_object_fid(child));
/* Drop the child reference, then unlock and release the parent. */
93 mdt_object_put(info->mti_env, child);
96 mdt_object_unlock_put(info, parent, lh, rc);
/*
 * mdt_md_mkobj(): object-only half of a create request.
 *
 * Looks up the object named by rr_fid2.  If mdt_object_exists() reports 1
 * the attributes are simply fetched with mo_attr_get(); the other visible
 * path creates the object with mo_object_create().  When MA_INODE
 * attributes are valid they are packed into the RMF_MDT_BODY reply buffer.
 * No name is inserted into any directory by the visible code.
 *
 * NOTE(review): declarations, error checks, the else keyword between the
 * two paths and the final return are not visible in this view.
 */
100 /* Partial request to create object only */
101 static int mdt_md_mkobj(struct mdt_thread_info *info)
103 struct mdt_device *mdt = info->mti_mdt;
104 struct mdt_object *o;
105 struct mdt_body *repbody;
106 struct md_attr *ma = &info->mti_attr;
110 DEBUG_REQ(D_INODE, mdt_info_req(info), "Partial create "DFID"\n",
111 PFID(info->mti_rr.rr_fid2));
113 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
115 o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
117 struct md_object *next = mdt_object_child(o);
/* Only inode attributes are requested back from the lower layer. */
119 ma->ma_need = MA_INODE;
123 * Cross-ref create can encounter already created obj in case of
124 * recovery, just get attr in that case.
126 if (mdt_object_exists(o) == 1) {
127 rc = mo_attr_get(info->mti_env, next, ma);
129 rc = mo_object_create(info->mti_env, next,
130 &info->mti_spec, ma);
133 /* Return fid & attr to client. */
134 if (ma->ma_valid & MA_INODE)
135 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
/* Release the reference obtained from mdt_object_find(). */
138 mdt_object_put(info->mti_env, o);
/*
 * mdt_attr_set(): apply the attributes held in info->mti_attr to @mo.
 *
 * @info:  per-thread request state; mti_attr holds the attributes and
 *         mti_epoch the epoch data sent by the client.
 * @mo:    object whose attributes are changed.
 * @flags: when MRF_SETATTR_LOCKED is not set, the object is locked here
 *         with the UPDATE inodebit, plus LOOKUP when mode, uid or gid is
 *         being changed.
 *
 * Visible flow: compute som_update from the epoch flags; compare the
 * request epoch with mo->mot_ioepoch before and again after locking (the
 * second mismatch jumps to out_unlock with rc = 0); fail with -ENOENT if
 * lu_object_assert_not_exists() is true; call mo_attr_set(); call
 * mdt_sizeonmds_enable(); finally unlock the object.
 *
 * NOTE(review): info->mti_epoch is dereferenced unconditionally in the
 * visible code, while mdt_reint_setattr() tests it for NULL before use --
 * confirm that callers always supply it.  Declarations, some conditions,
 * labels and the final return are not visible in this view.
 */
145 /* In the raw-setattr case, we lock the child inode.
146 * In the write-back case or if being called from open,
147 * the client holds a lock already.
148 * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
149 * mdt_setattr_unpack()) flag to tell these cases apart. */
150 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
152 struct md_attr *ma = &info->mti_attr;
153 struct mdt_lock_handle *lh;
/* Non-zero when the client's epoch flags carry MF_SOM_CHANGE. */
159 som_update = (info->mti_epoch->flags & MF_SOM_CHANGE);
161 /* Try to avoid object_lock if another epoch has been started
163 if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
166 lh = &info->mti_lh[MDT_LH_PARENT];
167 mdt_lock_pdo_init(lh, LCK_PW, NULL, 0);
169 if (!(flags & MRF_SETATTR_LOCKED)) {
170 __u64 lockpart = MDS_INODELOCK_UPDATE;
171 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
172 lockpart |= MDS_INODELOCK_LOOKUP;
174 rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
179 /* Setattrs are syncronized through dlm lock taken above. If another
180 * epoch started, its attributes may be already flushed on disk,
182 if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
183 GOTO(out_unlock, rc = 0);
185 if (lu_object_assert_not_exists(&mo->mot_obj.mo_lu))
186 GOTO(out_unlock, rc = -ENOENT);
188 /* all attrs are packed into mti_attr in unpack_setattr */
189 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
190 OBD_FAIL_MDS_REINT_SETATTR_WRITE);
192 /* all attrs are packed into mti_attr in unpack_setattr */
193 rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
/*
 * NOTE(review): the condition guarding this jump is not visible here;
 * presumably a test of rc -- confirm.
 */
195 GOTO(out_unlock, rc);
197 /* Re-enable SIZEONMDS. */
/*
 * NOTE(review): the condition enclosing the debug message and the
 * mdt_sizeonmds_enable() call, and the last CDEBUG argument, are not
 * visible in this view.
 */
199 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
200 mo->mot_ioepoch, PFID(mdt_object_fid(mo)),
202 mdt_sizeonmds_enable(info, mo);
/* Release the object lock; rc is handed to the unlock helper. */
207 mdt_object_unlock(info, mo, lh, rc);
/*
 * mdt_reint_setattr(): reint handler for setattr requests.
 *
 * Visible flow:
 *  - find the object named by rr_fid1;
 *  - if the client set MF_EPOCH_OPEN, take write access with
 *    mdt_write_get(), open an epoch, fill in an mfd (mode
 *    FMODE_EPOCHLCK), add it to the export's med_open_head list and
 *    return its handle cookie and the epoch in the reply body;
 *  - apply the attributes through mdt_attr_set();
 *  - if the client set MF_SOM_CHANGE, look up the mfd for the handle sent
 *    by the client, unhash it, remove it from the open list and close it;
 *  - re-read the inode attributes and pack them into the reply;
 *  - optionally fill the RMF_CAPA1 reply buffer with an OSS capability.
 *
 * The lhc argument is not referenced by any visible statement.
 *
 * NOTE(review): declarations, several error checks, labels and the final
 * return are not visible in this view.
 */
211 static int mdt_reint_setattr(struct mdt_thread_info *info,
212 struct mdt_lock_handle *lhc)
214 struct mdt_device *mdt = info->mti_mdt;
215 struct md_attr *ma = &info->mti_attr;
216 struct mdt_reint_record *rr = &info->mti_rr;
217 struct ptlrpc_request *req = mdt_info_req(info);
218 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
219 struct mdt_file_data *mfd;
220 struct mdt_object *mo;
221 struct md_object *next;
222 struct mdt_body *repbody;
226 mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
227 LPROC_MDT_REINT_SETATTR);
229 DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
230 (unsigned int)ma->ma_attr.la_valid);
232 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
233 mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
/*
 * NOTE(review): the test guarding this jump is not visible here;
 * presumably IS_ERR(mo) -- confirm.
 */
235 GOTO(out, rc = PTR_ERR(mo));
/* Epoch-open request: set up write access, the epoch and an mfd. */
237 if (info->mti_epoch && (info->mti_epoch->flags & MF_EPOCH_OPEN)) {
239 rc = mdt_write_get(info->mti_mdt, mo);
/*
 * NOTE(review): the check of rc from mdt_write_get(), the allocation of
 * mfd and the NULL test leading to -ENOMEM are not visible in this view.
 */
245 GOTO(out_put, rc = -ENOMEM);
247 mdt_epoch_open(info, mo);
248 repbody->ioepoch = mo->mot_ioepoch;
/* Extra object reference, stored in mfd->mfd_object just below. */
250 mdt_object_get(info->mti_env, mo);
251 mfd->mfd_mode = FMODE_EPOCHLCK;
252 mfd->mfd_object = mo;
253 mfd->mfd_xid = req->rq_xid;
/* Publish the mfd on the export's open list under med_open_lock. */
255 spin_lock(&med->med_open_lock);
256 list_add(&mfd->mfd_list, &med->med_open_head);
257 spin_unlock(&med->med_open_lock);
258 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
/* Apply the attributes; rr_flags tells mdt_attr_set() about locking. */
261 rc = mdt_attr_set(info, mo, rr->rr_flags);
265 if (info->mti_epoch && (info->mti_epoch->flags & MF_SOM_CHANGE)) {
/*
 * NOTE(review): this assertion is redundant in the visible code, since
 * info->mti_epoch was already tested and dereferenced by the enclosing
 * condition on the previous line.
 */
266 LASSERT(info->mti_epoch);
268 /* Size-on-MDS Update. Find and free mfd. */
269 spin_lock(&med->med_open_lock);
270 mfd = mdt_handle2mfd(&(info->mti_epoch->handle));
/*
 * The next statements drop the lock, log and fail with -ESTALE.
 * NOTE(review): the test selecting this path is not visible; presumably
 * mfd == NULL -- confirm.
 */
272 spin_unlock(&med->med_open_lock);
273 CDEBUG(D_INODE, "no handle for file close: "
274 "fid = "DFID": cookie = "LPX64"\n",
275 PFID(info->mti_rr.rr_fid1),
276 info->mti_epoch->handle.cookie);
277 GOTO(out_put, rc = -ESTALE);
280 LASSERT(mfd->mfd_mode == FMODE_SOM);
281 LASSERT(ma->ma_attr.la_valid & LA_SIZE);
282 LASSERT(!(info->mti_epoch->flags & MF_EPOCH_CLOSE));
/* Unhash and unlink the mfd while locked; close it after unlocking. */
284 class_handle_unhash(&mfd->mfd_handle);
285 list_del_init(&mfd->mfd_list);
286 spin_unlock(&med->med_open_lock);
287 mdt_mfd_close(info, mfd);
/* Re-read the inode attributes so the reply carries current values. */
290 ma->ma_need = MA_INODE;
292 next = mdt_object_child(mo);
293 rc = mo_attr_get(info->mti_env, next, ma);
297 mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
/*
 * When OSS capabilities are enabled, the object is a regular file and the
 * size was among the attributes set, fill the RMF_CAPA1 reply buffer with
 * a capability for the default and truncate opcodes and flag it in the
 * reply body.
 */
299 if (mdt->mdt_opts.mo_oss_capa &&
300 S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) &&
301 (ma->ma_attr.la_valid & LA_SIZE)) {
302 struct lustre_capa *capa;
304 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
306 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
307 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0);
310 repbody->valid |= OBD_MD_FLOSSCAPA;
/* Drop the reference from mdt_object_find() and stop the timer. */
315 mdt_object_put(info->mti_env, mo);
317 mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
318 LPROC_MDT_REINT_SETATTR);
/*
 * mdt_reint_create(): reint handler for create requests.
 *
 * Switches on the file-type bits (S_IFMT) of the requested mode.  The
 * visible branches call mdt_md_mkobj() for the cross-ref case,
 * mdt_md_create() after asserting that a name was supplied, and return
 * err_serious(-EOPNOTSUPP) otherwise.  The OBD_FAIL_MDS_REINT_CREATE
 * fail check makes the handler return err_serious(-ESTALE) up front.
 * The lhc argument is not referenced by any visible statement.
 *
 * NOTE(review): the case labels, breaks, the "out" label and the final
 * return are not visible in this view, so which file types map to which
 * branch cannot be confirmed from here.
 */
322 static int mdt_reint_create(struct mdt_thread_info *info,
323 struct mdt_lock_handle *lhc)
328 mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
329 LPROC_MDT_REINT_CREATE);
331 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
332 GOTO(out, rc = err_serious(-ESTALE));
334 switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
336 /* Cross-ref case. */
337 if (info->mti_cross_ref) {
338 rc = mdt_md_mkobj(info);
348 /* Special file should stay on the same node as parent. */
349 LASSERT(info->mti_rr.rr_namelen > 0);
350 rc = mdt_md_create(info);
/* Fallback for modes handled by none of the branches above. */
354 rc = err_serious(-EOPNOTSUPP);
358 mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
359 LPROC_MDT_REINT_CREATE);
/*
 * mdt_reint_unlink(): reint handler for unlink requests.
 *
 * Visible flow:
 *  - lock the parent (rr_fid1) with a PW pdo lock on the entry name;
 *  - point ma_lmm and ma_cookie at reply buffers, failing with -EINVAL
 *    if either buffer is missing;
 *  - cross-ref case: if the object exists, drop one reference with
 *    mo_ref_del() and run mdt_handle_last_unlink(), then finish;
 *  - otherwise look the name up in the parent, find the child, lock it
 *    EX with MDS_INODELOCK_FULL, call mdo_unlink() and run
 *    mdt_handle_last_unlink();
 *  - unlock and release child and parent, shrink the reply, stop the
 *    timer.
 *
 * The lhc argument is not referenced by any visible statement.
 *
 * NOTE(review): declarations, several error checks, labels and the final
 * return are not visible in this view.
 */
363 static int mdt_reint_unlink(struct mdt_thread_info *info,
364 struct mdt_lock_handle *lhc)
366 struct mdt_reint_record *rr = &info->mti_rr;
367 struct ptlrpc_request *req = mdt_info_req(info);
368 struct md_attr *ma = &info->mti_attr;
369 struct lu_fid *child_fid = &info->mti_tmp_fid1;
370 struct mdt_object *mp;
371 struct mdt_object *mc;
372 struct mdt_lock_handle *parent_lh;
373 struct mdt_lock_handle *child_lh;
377 mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
378 LPROC_MDT_REINT_UNLINK);
380 DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s\n", PFID(rr->rr_fid1),
383 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
384 GOTO(out, rc = err_serious(-ENOENT));
386 /* step 1: lock the parent */
387 parent_lh = &info->mti_lh[MDT_LH_PARENT];
388 mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
391 mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
392 MDS_INODELOCK_UPDATE);
/*
 * NOTE(review): the test guarding this jump is not visible here;
 * presumably IS_ERR(mp) -- confirm.
 */
394 GOTO(out, rc = PTR_ERR(mp));
/*
 * Point the attribute descriptor at the reply buffers that receive the
 * LOV EA (RMF_MDT_MD) and the cookies.
 */
396 ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
397 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
398 &RMF_MDT_MD, RCL_SERVER);
400 ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
402 ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
405 ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
407 if (!ma->ma_lmm || !ma->ma_cookie)
408 GOTO(out_unlock_parent, rc = -EINVAL);
410 if (info->mti_cross_ref) {
412 * Remote partial operation. It is possible that replay may
413 * happen on parent MDT and this operation will be repeated.
414 * Therefore the object absense is allowed case and nothing
415 * should be done here.
417 if (mdt_object_exists(mp) > 0) {
418 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
419 rc = mo_ref_del(info->mti_env,
420 mdt_object_child(mp), ma);
421 mdt_handle_last_unlink(info, mp, ma);
424 GOTO(out_unlock_parent, rc);
427 /* step 2: find & lock the child */
428 rc = mdo_lookup(info->mti_env, mdt_object_child(mp),
429 rr->rr_name, child_fid, &info->mti_spec);
431 GOTO(out_unlock_parent, rc);
433 /* We will lock the child regardless it is local or remote. No harm. */
434 mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
436 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
437 child_lh = &info->mti_lh[MDT_LH_CHILD];
438 mdt_lock_reg_init(child_lh, LCK_EX);
439 rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
/*
 * Lock failure path: only the reference is dropped because no lock is
 * held on the child.  NOTE(review): the rc test is not visible here.
 */
442 mdt_object_put(info->mti_env, mc);
443 GOTO(out_unlock_parent, rc);
446 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
447 OBD_FAIL_MDS_REINT_UNLINK_WRITE);
450 * Now we can only make sure we need MA_INODE, in mdd layer, will check
451 * whether need MA_LOV and MA_COOKIE.
453 ma->ma_need = MA_INODE;
455 mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
456 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
457 mdt_object_child(mc), rr->rr_name, ma);
459 mdt_handle_last_unlink(info, mc, ma);
/* Cleanup: child first, then parent, then trim the reply buffers. */
462 mdt_object_unlock_put(info, mc, child_lh, rc);
464 mdt_object_unlock_put(info, mp, parent_lh, rc);
466 mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
467 mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
468 LPROC_MDT_REINT_UNLINK);
/*
 * mdt_reint_link(): reint handler for hard-link requests.
 *
 * Cross-ref case: lock the source object (rr_fid1) EX with the UPDATE
 * inodebit, add one reference with mo_ref_add(), unlock and finish.
 *
 * Otherwise: reject rr_fid1 == rr_fid2 with -EPERM, lock the target
 * parent directory (rr_fid2) with an EX pdo lock on the new name, find
 * and lock the source (rr_fid1) EX with the UPDATE inodebit, then call
 * mdo_link() to insert rr_name into the parent.
 *
 * The lhc argument is not referenced by any visible statement.
 *
 * NOTE(review): declarations, IS_ERR/rc checks, labels and the returns
 * are not visible in this view.
 */
472 static int mdt_reint_link(struct mdt_thread_info *info,
473 struct mdt_lock_handle *lhc)
475 struct mdt_reint_record *rr = &info->mti_rr;
476 struct ptlrpc_request *req = mdt_info_req(info);
477 struct md_attr *ma = &info->mti_attr;
478 struct mdt_object *ms;
479 struct mdt_object *mp;
480 struct mdt_lock_handle *lhs;
481 struct mdt_lock_handle *lhp;
485 mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
486 LPROC_MDT_REINT_LINK);
488 DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/%s",
489 PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
491 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
492 GOTO(out, rc = err_serious(-ENOENT));
494 if (info->mti_cross_ref) {
495 /* MDT holding name ask us to add ref. */
496 lhs = &info->mti_lh[MDT_LH_CHILD];
497 mdt_lock_reg_init(lhs, LCK_EX);
498 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
499 MDS_INODELOCK_UPDATE);
501 GOTO(out, rc = PTR_ERR(ms));
503 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
504 rc = mo_ref_add(info->mti_env, mdt_object_child(ms));
505 mdt_object_unlock_put(info, ms, lhs, rc);
/* The cross-ref path stops the timer itself before leaving. */
506 mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
507 LPROC_MDT_REINT_LINK);
511 /* Invalid case so return error immediately instead of
513 if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
514 GOTO(out, rc = -EPERM);
516 /* step 1: find & lock the target parent dir */
517 lhp = &info->mti_lh[MDT_LH_PARENT];
518 mdt_lock_pdo_init(lhp, LCK_EX, rr->rr_name,
520 mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
521 MDS_INODELOCK_UPDATE);
523 GOTO(out, rc = PTR_ERR(mp));
525 /* step 2: find & lock the source */
526 lhs = &info->mti_lh[MDT_LH_CHILD];
527 mdt_lock_reg_init(lhs, LCK_EX);
529 ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
531 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
533 rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
/*
 * Lock failure path: only the reference is dropped because no lock is
 * held on the source.  NOTE(review): the rc test is not visible here.
 */
536 mdt_object_put(info->mti_env, ms);
537 GOTO(out_unlock_parent, rc);
540 /* step 3: link it */
541 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
542 OBD_FAIL_MDS_REINT_LINK_WRITE);
544 rc = mdo_link(info->mti_env, mdt_object_child(mp),
545 mdt_object_child(ms), rr->rr_name, ma);
/* Cleanup: source first, then the parent directory. */
548 mdt_object_unlock_put(info, ms, lhs, rc);
550 mdt_object_unlock_put(info, mp, lhp, rc);
552 mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
553 LPROC_MDT_REINT_LINK);
/*
 * mdt_reint_rename_tgt(): target-side half of a rename.
 *
 * Locks the target directory (rr_fid1) with a PW pdo lock on rr_tgt and
 * looks rr_tgt up in it:
 *  - lookup error other than -ENOENT: fail with that error;
 *  - name found and already pointing at rr_fid2: nothing to do (replay);
 *  - name found otherwise: lock the existing target EX with the LOOKUP
 *    inodebit and call mdo_rename_tgt();
 *  - -ENOENT: insert the name with mdo_name_insert().
 * Afterwards mdt_handle_last_unlink() is run on the old target, locks and
 * references are released and the reply is shrunk.
 *
 * NOTE(review): mtgt starts as NULL and lh_tgt is only assigned on the
 * "name found" path; the conditions guarding the final uses of mtgt and
 * lh_tgt are not visible in this view -- confirm they test mtgt.
 * Declarations, labels and the final return are not visible either.
 */
557 /* partial operation for rename */
558 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
560 struct mdt_reint_record *rr = &info->mti_rr;
561 struct ptlrpc_request *req = mdt_info_req(info);
562 struct md_attr *ma = &info->mti_attr;
563 struct mdt_object *mtgtdir;
564 struct mdt_object *mtgt = NULL;
565 struct mdt_lock_handle *lh_tgtdir;
566 struct mdt_lock_handle *lh_tgt;
567 struct lu_fid *tgt_fid = &info->mti_tmp_fid1;
571 DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
572 rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
574 /* step 1: lookup & lock the tgt dir. */
575 lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
576 mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
578 mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
579 MDS_INODELOCK_UPDATE);
/*
 * NOTE(review): the test guarding this jump is not visible here;
 * presumably IS_ERR(mtgtdir) -- confirm.
 */
581 GOTO(out, rc = PTR_ERR(mtgtdir));
583 /* step 2: find & lock the target object if exists. */
584 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
585 rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
586 rr->rr_tgt, tgt_fid, &info->mti_spec);
587 if (rc != 0 && rc != -ENOENT) {
588 GOTO(out_unlock_tgtdir, rc);
589 } else if (rc == 0) {
591 * In case of replay that name can be already inserted, check
592 * that and do nothing if so.
594 if (lu_fid_eq(tgt_fid, rr->rr_fid2))
595 GOTO(out_unlock_tgtdir, rc);
597 lh_tgt = &info->mti_lh[MDT_LH_CHILD];
598 mdt_lock_reg_init(lh_tgt, LCK_EX);
600 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
601 MDS_INODELOCK_LOOKUP);
603 GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
/* Replace the existing target entry so that the name refers to rr_fid2. */
605 rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
606 mdt_object_child(mtgt), rr->rr_fid2,
608 } else /* -ENOENT */ {
609 rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
610 rr->rr_tgt, rr->rr_fid2,
611 S_ISDIR(ma->ma_attr.la_mode));
614 /* handle last link of tgt object */
616 mdt_handle_last_unlink(info, mtgt, ma);
/* Cleanup: old target first, then the target directory. */
619 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
622 mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
624 mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
/*
 * mdt_rename_lock(): take the rename lock.
 *
 * @info: per-thread request state; supplies the namespace, a scratch
 *        policy and a scratch resource id.
 * @lh:   receives the handle of the granted lock (passed to the remote
 *        enqueue call below; the tail of the local call is not visible).
 *
 * The lock is an LCK_EX inodebits lock (UPDATE bit) on the resource built
 * from LUSTRE_BFL_FID.  When the site has no control export
 * (ls_control_exp == NULL) it is enqueued locally; otherwise it is
 * requested through ls_control_exp with ldlm_cli_enqueue().
 *
 * NOTE(review): declarations of ls and rc, the else keyword, the tail of
 * the local enqueue call and the final return are not visible in this
 * view.
 */
628 static int mdt_rename_lock(struct mdt_thread_info *info,
629 struct lustre_handle *lh)
631 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
632 ldlm_policy_data_t *policy = &info->mti_policy;
633 struct ldlm_res_id *res_id = &info->mti_res_id;
638 ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
639 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
/* Start from a zeroed policy and request only the UPDATE inodebit. */
641 memset(policy, 0, sizeof *policy);
642 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
644 if (ls->ls_control_exp == NULL) {
645 int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
648 * Current node is controller, that is mdt0, where we should
651 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
652 LCK_EX, &flags, ldlm_blocking_ast,
653 ldlm_completion_ast, NULL, NULL, 0,
659 * This is the case mdt0 is remote node, issue DLM lock like
/*
 * NOTE(review): `flags` is used by this call, but the only visible
 * declaration is inside the local-enqueue branch above; the declaration
 * for this branch is not visible in this view -- confirm it exists and
 * is initialised.
 */
662 rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, res_id,
663 LDLM_IBITS, policy, LCK_EX, &flags,
664 ldlm_blocking_ast, ldlm_completion_ast,
665 NULL, NULL, NULL, 0, NULL, lh, 0);
/*
 * mdt_rename_unlock(): drop the LCK_EX reference taken on the rename lock.
 *
 * @lh: lock handle; the function asserts that it is in use, so it must
 *      not be called with an unused handle.
 */
671 static void mdt_rename_unlock(struct lustre_handle *lh)
674 LASSERT(lustre_handle_is_used(lh));
675 ldlm_lock_decref(lh, LCK_EX);
/*
 * mdt_rename_sanity(): rename ancestry check.
 *
 * @info: per-thread request state; rr_fid2 supplies the starting
 *        destination fid, which is copied into a local.
 * @fid:  fid passed to mdo_is_subdir() and compared against dst_fid.
 *
 * In a loop that repeats while rc == -EREMOTE, the object for dst_fid is
 * looked up, mdo_is_subdir() is called on it with @fid and the object
 * reference is dropped.  A negative rc other than -EREMOTE is logged with
 * CERROR.
 *
 * NOTE(review): the loop opener, the IS_ERR check on dst, the last
 * argument of mdo_is_subdir() (presumably &dst_fid, which would explain
 * how the loop advances), the result of the lu_fid_eq() test and the
 * final return are not visible in this view.
 */
680 * This is is_subdir() variant, it is CMD if cmm forwards it to correct
681 * target. Source should not be ancestor of target dir. May be other rename
682 * checks can be moved here later.
684 static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
686 struct mdt_reint_record *rr = &info->mti_rr;
687 struct lu_fid dst_fid = *rr->rr_fid2;
688 struct mdt_object *dst;
693 LASSERT(fid_is_sane(&dst_fid));
694 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
696 rc = mdo_is_subdir(info->mti_env,
697 mdt_object_child(dst), fid,
/* The object is only needed for the call above; release it right away. */
699 mdt_object_put(info->mti_env, dst);
700 if (rc != -EREMOTE && rc < 0) {
701 CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
703 /* check the found fid */
704 if (lu_fid_eq(&dst_fid, fid))
710 } while (rc == -EREMOTE);
/*
 * mdt_reint_rename(): reint handler for rename requests.
 *
 * Cross-ref requests are handed to mdt_reint_rename_tgt().  Otherwise the
 * visible flow is:
 *  - take the rename lock with mdt_rename_lock();
 *  - step 1: lock the source directory (rr_fid1), PW pdo lock on rr_name;
 *  - step 2: obtain the target directory (rr_fid2); when it equals the
 *    source directory an extra reference on msrcdir is taken, otherwise
 *    the object is looked up, checked for existence (-ESTALE) and locked;
 *  - step 3: look rr_name up in the source directory and lock the old
 *    object EX with the LOOKUP inodebit; -EINVAL if it is one of the two
 *    directories;
 *  - step 4: look rr_tgt up in the target directory; if it resolves to
 *    the old object nothing is done, if it is one of the directories
 *    -EINVAL is returned, otherwise it is locked EX with
 *    MDS_INODELOCK_FULL; -EREMOTE and -ENOENT from the lookup are
 *    tolerated;
 *  - step 5: set up reply buffers, run mdt_rename_sanity() and call
 *    mdo_rename(), then mdt_handle_last_unlink() on the replaced object;
 *  - release locks in reverse order, drop the rename lock, shrink the
 *    reply and stop the timer.
 *
 * The lhc argument is not referenced by any visible statement.
 *
 * NOTE(review): declarations, most IS_ERR/rc tests, labels, else branches
 * and the returns are not visible in this view.
 */
715 static int mdt_reint_rename(struct mdt_thread_info *info,
716 struct mdt_lock_handle *lhc)
718 struct mdt_reint_record *rr = &info->mti_rr;
719 struct md_attr *ma = &info->mti_attr;
720 struct ptlrpc_request *req = mdt_info_req(info);
721 struct mdt_object *msrcdir;
722 struct mdt_object *mtgtdir;
723 struct mdt_object *mold;
724 struct mdt_object *mnew = NULL;
725 struct mdt_lock_handle *lh_srcdirp;
726 struct mdt_lock_handle *lh_tgtdirp;
727 struct mdt_lock_handle *lh_oldp;
728 struct mdt_lock_handle *lh_newp;
729 struct lu_fid *old_fid = &info->mti_tmp_fid1;
730 struct lu_fid *new_fid = &info->mti_tmp_fid2;
731 struct lustre_handle rename_lh = { 0 };
735 mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
736 LPROC_MDT_REINT_RENAME);
/* Cross-ref request: only the target-side half runs on this node. */
738 if (info->mti_cross_ref) {
739 rc = mdt_reint_rename_tgt(info);
740 mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
741 LPROC_MDT_REINT_RENAME);
745 DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
746 PFID(rr->rr_fid1), rr->rr_name,
747 PFID(rr->rr_fid2), rr->rr_tgt);
749 rc = mdt_rename_lock(info, &rename_lh);
751 CERROR("Can't lock FS for rename, rc %d\n", rc);
755 lh_newp = &info->mti_lh[MDT_LH_NEW];
757 /* step 1: lock the source dir. */
758 lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
759 mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
761 msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
762 MDS_INODELOCK_UPDATE);
764 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
766 /* step 2: find & lock the target dir. */
767 lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
768 mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
/*
 * Same directory for source and target: take a second reference on the
 * already locked source directory.  NOTE(review): the assignment of
 * mtgtdir on this path is not visible in this view.
 */
770 if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
771 mdt_object_get(info->mti_env, msrcdir);
774 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
777 GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
779 rc = mdt_object_exists(mtgtdir);
781 GOTO(out_unlock_target, rc = -ESTALE);
783 /* we lock the target dir if it is local */
784 rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
785 MDS_INODELOCK_UPDATE,
788 GOTO(out_unlock_target, rc);
792 /* step 3: find & lock the old object. */
793 rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
794 rr->rr_name, old_fid, &info->mti_spec);
796 GOTO(out_unlock_target, rc);
/* The entry being renamed must not be either of the two directories. */
798 if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
799 GOTO(out_unlock_target, rc = -EINVAL);
801 mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
803 GOTO(out_unlock_target, rc = PTR_ERR(mold));
805 lh_oldp = &info->mti_lh[MDT_LH_OLD];
806 mdt_lock_reg_init(lh_oldp, LCK_EX);
807 rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
/*
 * Lock failure path: only the reference is dropped because no lock is
 * held on the old object.  NOTE(review): the rc test is not visible here.
 */
810 mdt_object_put(info->mti_env, mold);
811 GOTO(out_unlock_target, rc);
814 /* step 4: find & lock the new object. */
815 /* new target object may not exist now */
816 rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
817 rr->rr_tgt, new_fid, &info->mti_spec);
819 /* the new_fid should have been filled at this moment */
820 if (lu_fid_eq(old_fid, new_fid))
821 GOTO(out_unlock_old, rc);
823 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
824 lu_fid_eq(new_fid, rr->rr_fid2))
825 GOTO(out_unlock_old, rc = -EINVAL);
827 mdt_lock_reg_init(lh_newp, LCK_EX);
828 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
830 GOTO(out_unlock_old, rc = PTR_ERR(mnew));
832 rc = mdt_object_lock(info, mnew, lh_newp,
833 MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
835 mdt_object_put(info->mti_env, mnew);
836 GOTO(out_unlock_old, rc);
/* Lookup failures other than -EREMOTE and -ENOENT abort the rename. */
838 } else if (rc != -EREMOTE && rc != -ENOENT)
839 GOTO(out_unlock_old, rc);
841 /* step 5: rename it */
842 ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
843 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
844 &RMF_MDT_MD, RCL_SERVER);
846 ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
848 ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
849 &RMF_LOGCOOKIES, RCL_SERVER);
851 if (!ma->ma_lmm || !ma->ma_cookie)
852 GOTO(out_unlock_new, rc = -EINVAL);
854 ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
856 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
857 OBD_FAIL_MDS_REINT_RENAME_WRITE);
859 mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
860 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
862 /* Check if @dst is subdir of @src. */
863 rc = mdt_rename_sanity(info, old_fid);
865 GOTO(out_unlock_new, rc);
/* mnew stays NULL when no existing target was found and locked above. */
867 rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
868 mdt_object_child(mtgtdir), old_fid, rr->rr_name,
869 (mnew ? mdt_object_child(mnew) : NULL),
872 /* handle last link of tgt object */
874 mdt_handle_last_unlink(info, mnew, ma);
/*
 * Cleanup in reverse order of acquisition.  NOTE(review): the labels and
 * the test guarding the mnew unlock are not visible in this view.
 */
878 mdt_object_unlock_put(info, mnew, lh_newp, rc);
880 mdt_object_unlock_put(info, mold, lh_oldp, rc);
882 mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
884 mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
886 mdt_rename_unlock(&rename_lh);
888 mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
889 mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
890 LPROC_MDT_REINT_RENAME);
/*
 * Signature shared by all reint handlers: the per-thread request state
 * and a lock handle, returning an int result code.
 */
894 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
895 struct mdt_lock_handle *lhc);
/*
 * Dispatch table indexed by reint opcode.  Slots below REINT_MAX that are
 * not named in the initializer are NULL, as for any static array element
 * without an explicit initializer.
 * NOTE(review): mdt_reint_open is not defined in the visible part of
 * this file -- presumably declared in mdt_internal.h; confirm.
 */
897 static mdt_reinter reinters[REINT_MAX] = {
898 [REINT_SETATTR] = mdt_reint_setattr,
899 [REINT_CREATE] = mdt_reint_create,
900 [REINT_LINK] = mdt_reint_link,
901 [REINT_UNLINK] = mdt_reint_unlink,
902 [REINT_RENAME] = mdt_reint_rename,
903 [REINT_OPEN] = mdt_reint_open
906 int mdt_reint_rec(struct mdt_thread_info *info,
907 struct mdt_lock_handle *lhc)
912 rc = reinters[info->mti_rr.rr_opcode](info, lhc);